diff --git a/src/main/java/com/upserve/uppend/BlockedLongs.java b/src/main/java/com/upserve/uppend/BlockedLongs.java
index a0387dcb..84dcc572 100644
--- a/src/main/java/com/upserve/uppend/BlockedLongs.java
+++ b/src/main/java/com/upserve/uppend/BlockedLongs.java
@@ -12,7 +12,6 @@
 import java.util.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
 import java.util.stream.*;
 
 public class BlockedLongs implements AutoCloseable, Flushable {
@@ -214,7 +213,14 @@ public LongStream values(Long pos){
             return LongStream.empty();
         }
 
-        return Arrays.stream(valuesArray(pos));
+        long[] longs = valuesArray(pos);
+
+
+        return Arrays.stream(longs);
+
+//        return StreamSupport.longStream(
+//                Spliterators.spliterator(longs, 0, longs.length, Spliterator.CONCURRENT | Spliterator.IMMUTABLE | Spliterator.NONNULL),
+//                true);
     }
 
     public long[] valuesArray(Long pos) {
@@ -259,6 +265,8 @@ public long[] valuesArray(Long pos) {
         }
     }
 
+    // Lazy values are much slower in performance tests with a large number of blocks.
+    // It might be workable with a custom spliterator that is block aware for parallelization.
    public LongStream lazyValues(Long pos) {
        log.trace("streaming values from {} at {}", file, pos);
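Note on the comment above `lazyValues`: a "block aware" spliterator would split work on block boundaries instead of element boundaries, so each worker thread walks whole blocks. A minimal sketch of the idea, hypothetical and not part of this change, assuming the value blocks have already been materialized as `long[]` chunks (a real version would walk the on-disk block chain lazily):

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

// Hypothetical: splits only on block boundaries so each worker stays on
// contiguous memory. BlockAwareSpliterator is not part of this change.
class BlockAwareSpliterator implements Spliterator.OfLong {
    private final List<long[]> blocks;
    private int block;          // index of the current block
    private int offset;         // position within the current block
    private final int endBlock; // exclusive upper bound

    BlockAwareSpliterator(List<long[]> blocks, int block, int endBlock) {
        this.blocks = blocks;
        this.block = block;
        this.endBlock = endBlock;
    }

    @Override
    public boolean tryAdvance(LongConsumer action) {
        while (block < endBlock) {
            long[] current = blocks.get(block);
            if (offset < current.length) {
                action.accept(current[offset++]);
                return true;
            }
            block++; // exhausted this block, move to the next one
            offset = 0;
        }
        return false;
    }

    @Override
    public Spliterator.OfLong trySplit() {
        int remaining = endBlock - block;
        if (remaining < 2) return null; // too few blocks left to hand off
        int mid = block + remaining / 2;
        BlockAwareSpliterator prefix = new BlockAwareSpliterator(blocks, block, mid);
        prefix.offset = offset; // any partial progress stays with the prefix
        block = mid;            // this spliterator keeps the suffix
        offset = 0;
        return prefix;
    }

    @Override
    public long estimateSize() {
        long size = -offset;
        for (int i = block; i < endBlock; i++) size += blocks.get(i).length;
        return size;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | IMMUTABLE | NONNULL;
    }

    static LongStream parallelValues(List<long[]> blocks) {
        return StreamSupport.longStream(new BlockAwareSpliterator(blocks, 0, blocks.size()), true);
    }
}
```

Splitting at block granularity avoids the per-element coordination that makes the current lazy stream slow.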
diff --git a/src/main/java/com/upserve/uppend/blobs/MappedPage.java b/src/main/java/com/upserve/uppend/blobs/MappedPage.java
index f49b6298..76963488 100644
--- a/src/main/java/com/upserve/uppend/blobs/MappedPage.java
+++ b/src/main/java/com/upserve/uppend/blobs/MappedPage.java
@@ -5,21 +5,23 @@
 import static java.lang.Integer.min;
 
 /**
- * Mapped Byte Buffer backed implementation of Page
+ * {@link ByteBuffer} backed implementation of {@link Page}.
+ * The buffer passed to the constructor must be thread local:
+ * buffers are not thread safe!
  */
 public class MappedPage implements Page {
-    private final MappedByteBuffer buffer;
+    private final ByteBuffer buffer;
     private final int pageSize;
     private final int startingPosition;
 
     /**
      * Constructor for a MappedPage
      *
-     * @param buffer the mapped byte buffer representing a page of a file
+     * @param buffer a {@link ThreadLocal} {@link ByteBuffer} (mapped from a file) containing a {@link Page} of a {@link VirtualPageFile}
      * @param startingPosition the starting offset in a larger buffer
      * @param pageSize the size of the page to create
      */
-    public MappedPage(MappedByteBuffer buffer, int startingPosition, int pageSize) {
+    public MappedPage(ByteBuffer buffer, int startingPosition, int pageSize) {
         this.pageSize = pageSize;
         this.buffer = buffer;
         this.startingPosition = startingPosition;
@@ -30,7 +32,7 @@ public MappedPage(MappedByteBuffer buffer, int startingPosition, int pageSize) {
      *
      * @param buffer the mapped byte buffer representing a page of a file
      */
-    public MappedPage(MappedByteBuffer buffer) {
+    public MappedPage(ByteBuffer buffer) {
         this(buffer, 0, buffer.capacity());
     }
 
@@ -42,9 +44,8 @@ public int get(int pagePosition, byte[] dst, int bufferOffset) {
         final int actualRead = min(desiredRead, availableToRead);
 
-        // Make a local buffer with local position
-        ByteBuffer localBuffer = buffer.duplicate();
-        localBuffer.position(pagePosition + startingPosition);
-        localBuffer.get(dst, bufferOffset, actualRead);
+        // The buffer is thread local, so its position can be set directly
+        buffer.position(pagePosition + startingPosition);
+        buffer.get(dst, bufferOffset, actualRead);
 
         return actualRead;
     }
@@ -56,9 +57,8 @@ public int put(int pagePosition, byte[] src, int bufferOffset) {
         final int actualWrite = min(desiredWrite, availableToWrite);
 
-        // Make a local buffer with local position
-        ByteBuffer localBuffer = buffer.duplicate();
-        localBuffer.position(pagePosition + startingPosition);
-        localBuffer.put(src, bufferOffset, actualWrite);
+        // The buffer is thread local, so its position can be set directly
+        buffer.position(pagePosition + startingPosition);
+        buffer.put(src, bufferOffset, actualWrite);
 
         return actualWrite;
     }
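For context on why the buffer handed to `MappedPage` must now be thread local: a `ByteBuffer`'s position is mutable state shared by every user of that instance, so setting it directly (as `get` and `put` now do) is only safe when no other thread touches the same buffer. `duplicate()` creates views that share the underlying bytes but carry independent cursors, which is what `VirtualPageFile` gives each thread. A small illustration, not part of the change:

```java
import java.nio.ByteBuffer;

public class DuplicateDemo {
    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocate(16);
        ByteBuffer view = shared.duplicate(); // same bytes, independent cursor

        shared.position(8);
        System.out.println(shared.position()); // 8
        System.out.println(view.position());   // 0, unaffected by the other view

        view.put(0, (byte) 42);                // content changes are visible through both
        System.out.println(shared.get(0));     // 42
    }
}
```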
diff --git a/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java b/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java
index f30ea62c..e3ee9b3e 100644
--- a/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java
+++ b/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java
@@ -60,7 +60,7 @@ public class VirtualPageFile implements Closeable {
     private static final int PAGE_TABLE_SIZE = 1000;
     private static final int MAX_BUFFERS = 1024 * 64; // 128 TB per partition for 2GB buffers
 
-    private final MappedByteBuffer[] mappedByteBuffers;
+    private final ThreadLocal[] localMappedByteBuffers;
     private final int bufferSize;
 
     private final Path filePath;
@@ -96,8 +96,7 @@ public Path getFilePath() {
     @Override
     public void close() throws IOException {
         if (!channel.isOpen()) return;
-        Arrays.fill(mappedByteBuffers, null);
-
+        Arrays.fill(localMappedByteBuffers, null);
         if (!readOnly) {
             channel.truncate(nextPagePosition.get());
         }
@@ -239,7 +238,7 @@ MappedPage mappedPage(long startPosition) {
         final int mapIndex = (int) (postHeaderPosition / bufferSize);
         final int mapPosition = (int) (postHeaderPosition % bufferSize);
 
-        MappedByteBuffer bigbuffer = ensureBuffered(mapIndex);
+        ByteBuffer bigbuffer = ensureBuffered(mapIndex);
 
         return new MappedPage(bigbuffer, mapPosition, pageSize);
     }
@@ -262,7 +261,7 @@ public VirtualPageFile(Path filePath, int virtualFiles, int pageSize, int targetBufferSize
         this.virtualFiles = virtualFiles;
         this.pageSize = pageSize;
 
-        this.mappedByteBuffers = new MappedByteBuffer[MAX_BUFFERS];
+        this.localMappedByteBuffers = new ThreadLocal[MAX_BUFFERS];
 
         if (targetBufferSize < (pageSize)) throw new IllegalArgumentException("Target buffer size " + targetBufferSize + " must be larger than a page " + pageSize);
@@ -460,22 +459,28 @@ private void allocatePage(int virtualFileNumber, int currentPageCount, int pageNumber) {
         pageAllocationCount.add(pagesToAllocate);
     }
 
-    private MappedByteBuffer ensureBuffered(int bufferIndex) {
-        MappedByteBuffer buffer = mappedByteBuffers[bufferIndex];
-        if (buffer == null) {
-            synchronized (mappedByteBuffers) {
-                buffer = mappedByteBuffers[bufferIndex];
-                if (buffer == null) {
+    @SuppressWarnings("unchecked")
+    private ByteBuffer ensureBuffered(int bufferIndex) {
+        ThreadLocal local = localMappedByteBuffers[bufferIndex];
+        if (local == null) {
+            synchronized (localMappedByteBuffers) {
+                local = localMappedByteBuffers[bufferIndex];
+                if (local == null) {
                     long bufferStart = ((long) bufferIndex * bufferSize) + totalHeaderSize;
                     try {
-                        buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize);
+                        ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize);
+                        local = ThreadLocal.withInitial(buffer::duplicate);
+                        local.set(buffer);
+                        localMappedByteBuffers[bufferIndex] = local;
                     } catch (IOException e) {
                         throw new UncheckedIOException("Unable to map buffer for index " + bufferIndex + " at (" + bufferStart + " start position) in file " + filePath, e);
                     }
-                    mappedByteBuffers[bufferIndex] = buffer;
                 }
            }
        }
+
+        ByteBuffer buffer = (ByteBuffer) local.get();
+        return buffer;
    }
@@ -513,6 +518,7 @@ private LongBuffer ensurePageTable(int pageNumber) {
     }
 
     // Called during initialize only - no need to synchronize
+    @SuppressWarnings("unchecked")
     private void preloadBuffers(long nextPagePosition){
         for (int bufferIndex=0; bufferIndex[...]

diff --git [...]
+    private final Supplier<LongStream> parallelStream = () -> Arrays.stream(longs).parallel();
+    private final Supplier<LongStream> sequentialStream = () -> Arrays.stream(longs).sequential();
+
+    @Before
+    public void loadStore() {
+        Arrays.setAll(longs, (v) -> ThreadLocalRandom.current().nextLong(0, 512));
+        long val = Arrays.stream(longs, 0, 100).parallel().sum();
+    }
+
+    @Test
+    public void sumTest() {
+        for (int i=0; i<repeats; i++) {
+            parallelTime(sum(parallelStream));
+            sequentialTime(sum(sequentialStream));
+        }
+
+        parallelTime(flatMapSum(LongStream.of(1,2,3,4,5,6,7,8,9,10).flatMap(val -> Arrays.stream(longs)).parallel()));
+        sequentialTime(flatMapSum(LongStream.of(1,2,3,4,5,6,7,8,9,10).flatMap(val -> Arrays.stream(longs)).sequential()));
+    }
+}
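A note on these performance tests: the streams under test are wrapped in `Supplier<LongStream>` because a `java.util.stream` pipeline can be consumed only once, so each timing repetition has to build a fresh stream. For example:

```java
import java.util.stream.LongStream;

public class SingleUseDemo {
    public static void main(String[] args) {
        LongStream stream = LongStream.rangeClosed(1, 100);
        System.out.println(stream.sum()); // 5050

        try {
            stream.sum(); // the pipeline has already been consumed
        } catch (IllegalStateException e) {
            System.out.println("second use rejected: " + e.getMessage());
        }
    }
}
```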
diff --git a/src/test/java/com/upserve/uppend/performance/MultiKeyTest.java b/src/test/java/com/upserve/uppend/performance/MultiKeyTest.java
new file mode 100644
index 00000000..b67c6579
--- /dev/null
+++ b/src/test/java/com/upserve/uppend/performance/MultiKeyTest.java
@@ -0,0 +1,94 @@
+package com.upserve.uppend.performance;
+
+import com.upserve.uppend.*;
+import com.upserve.uppend.util.SafeDeleting;
+import org.junit.*;
+
+import java.io.IOException;
+import java.nio.file.*;
+import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.*;
+
+import static com.upserve.uppend.performance.StreamTimerMethods.*;
+
+public class MultiKeyTest {
+
+    private static final int numPartitions = 12;
+    private static final int hashSize = 32;
+    private static final int values = 1_000;
+    private static final int blobsPerBlock = 64;
+
+    private static final int keyCount = 1_000;
+
+    private final Path path = Paths.get("build/test/tmp/performance");
+
+    private AppendOnlyStore appendOnlyStore;
+
+    private final String[] keys = new String[keyCount]; // Each key is hashed appropriately for partition and hash size
+
+    private final int repeats = 5;
+
+    private final Supplier<LongStream> parallelStream = () -> Arrays.stream(keys).flatMapToLong(key -> appendOnlyStore.read(key, key).mapToLong(bytes -> bytes.length).parallel()).parallel();
+    private final Supplier<LongStream> sequentialStream = () -> Arrays.stream(keys).flatMapToLong(key -> appendOnlyStore.read(key, key).mapToLong(bytes -> bytes.length).sequential()).sequential();
+
+    @Before
+    public void loadStore() throws IOException {
+        SafeDeleting.removeTempPath(path);
+
+        appendOnlyStore = new AppendOnlyStoreBuilder()
+                .withPartitionSize(numPartitions)
+                .withLongLookupHashSize(hashSize)
+                .withBlobsPerBlock(blobsPerBlock)
+                .withDir(path).build();
+
+        for (int value=0; value<keyCount; value++) {
+            String key = String.valueOf(value);
+            keys[value] = key;
+            new Random()
+                    .ints(values, 0, 512)
+                    .parallel()
+                    .mapToObj(TestHelper::genBytes)
+                    .forEach(bytes -> appendOnlyStore.append(key, key, bytes));
+        }
+
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        SafeDeleting.removeTempPath(path);
+    }
+
+    @Test
+    public void sumTest() {
+        for (int i=0; i<repeats; i++) {
+            parallelTime(sum(parallelStream));
+            sequentialTime(sum(sequentialStream));
+        }
+    }
+}

diff --git [...]
+    private final Supplier<LongStream> parallelStream = () -> appendOnlyStore.read(partition, key).mapToLong(bytes -> bytes.length).parallel();
+    private final Supplier<LongStream> sequentialStream = () -> appendOnlyStore.read(partition, key).mapToLong(bytes -> bytes.length).sequential();
+
+
+    @Before
+    public void loadStore() throws IOException {
+        SafeDeleting.removeTempPath(path);
+
+        appendOnlyStore = new AppendOnlyStoreBuilder()
+                .withPartitionSize(numPartitions)
+                .withLongLookupHashSize(hashSize)
+                .withBlobsPerBlock(blobsPerBlock)
+                .withDir(path).build();
+
+        new Random()
+                .ints(values, 0, 512)
+                .parallel()
+                .mapToObj(TestHelper::genBytes)
+                .forEach(bytes -> appendOnlyStore.append(partition, key, bytes));
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        SafeDeleting.removeTempPath(path);
+    }
+
+    @Test
+    public void sumTest() {
+        for (int i=0; i<repeats; i++) {
+            parallelTime(sum(parallelStream));
+            sequentialTime(sum(sequentialStream));
+        }
+
+        parallelTime(flatMapSum(
+                Stream.of(1,2,3,4,5).flatMap(val -> appendOnlyStore.read(partition, key)).mapToLong(bytes -> bytes.length).parallel()
+        ));
+        sequentialTime(flatMapSum(
+                Stream.of(1,2,3,4,5).flatMap(val -> appendOnlyStore.read(partition, key)).mapToLong(bytes -> bytes.length).sequential()
+        ));
+    }
+}
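One caveat worth illustrating about the suppliers above: `parallel()` and `sequential()` do not apply to a segment of a stream; they set a single execution-mode flag for the entire pipeline, and the last call before the terminal operation wins. The inner `.parallel()` inside a `flatMap`/`flatMapToLong` mapper also does not parallelize the inner stream on its own. A small demonstration, not part of the change:

```java
import java.util.stream.LongStream;

public class ParallelFlagDemo {
    public static void main(String[] args) {
        // The whole pipeline is either parallel or sequential; the last call wins.
        LongStream pipeline = LongStream.rangeClosed(1, 10).parallel().sequential();
        System.out.println(pipeline.isParallel()); // false

        LongStream flipped = LongStream.rangeClosed(1, 10).sequential().parallel();
        System.out.println(flipped.isParallel()); // true
    }
}
```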
diff --git a/src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java b/src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java
new file mode 100644
index 00000000..c713b482
--- /dev/null
+++ b/src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java
@@ -0,0 +1,109 @@
+package com.upserve.uppend.performance;
+
+import java.util.*;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.*;
+import java.util.stream.*;
+
+import static junit.framework.TestCase.assertTrue;
+
+public class StreamTimerMethods {
+
+    private static final int repeats = 5;
+
+    public static void parallelTime(double time) {
+        System.out.println(String.format("Parallel execution time  : %.2f ms", time));
+    }
+
+    public static void sequentialTime(double time) {
+        System.out.println(String.format("Sequential execution time: %.2f ms", time));
+    }
+
+    public static double sum(Supplier<LongStream> supplier) {
+        LongStream[] longStreams = new LongStream[repeats];
+        long[] sums = new long[repeats];
+
+        for (int i=0; i<repeats; i++) {
+            longStreams[i] = supplier.get();
+        }
+
+        long tic = System.nanoTime();
+        for (int i=0; i<repeats; i++) {
+            sums[i] = longStreams[i].sum();
+        }
+        long toc = System.nanoTime();
+
+        for (long sum : sums) {
+            assertTrue("Should be large", sum > 1_000_000L);
+        }
+
+        return (toc - tic)/1_000_000.0D;
+    }
+
+    public static double groupByCounting(Supplier<LongStream> supplier, boolean concurrent) {
+        LongStream[] longStreams = new LongStream[repeats];
+
+        List<Map<Long, Long>> groups = new ArrayList<>(repeats);
+
+        for (int i=0; i<repeats; i++) {
+            longStreams[i] = supplier.get();
+        }
+
+        long tic = System.nanoTime();
+        for (int i=0; i<repeats; i++) {
+            if (concurrent) {
+                groups.add(longStreams[i].boxed().collect(Collectors.groupingByConcurrent(Function.identity(), Collectors.counting())));
+            } else {
+                groups.add(longStreams[i].boxed().collect(Collectors.groupingBy(Function.identity(), Collectors.counting())));
+            }
+        }
+        long toc = System.nanoTime();
+
+        for (Map<Long, Long> group : groups) {
+            assertTrue("Should be large", group.values().stream().mapToLong(Long::longValue).sum() >= 1_000_000L);
+        }
+
+        return (toc - tic)/1_000_000.0D;
+    }
+
+    public static double forEachAdder(Supplier<LongStream> supplier) {
+        LongStream[] longStreams = new LongStream[repeats];
+        long[] sums = new long[repeats];
+        LongAdder[] longAdders = new LongAdder[repeats];
+
+        for (int i=0; i<repeats; i++) {
+            longStreams[i] = supplier.get();
+            longAdders[i] = new LongAdder();
+        }
+
+        long tic = System.nanoTime();
+        for (int i=0; i<repeats; i++) {
+            longStreams[i].forEach(longAdders[i]::add);
+            sums[i] = longAdders[i].sum();
+        }
+        long toc = System.nanoTime();
+
+        for (long sum : sums) {
+            assertTrue("Should be large", sum > 1_000_000L);
+        }
+
+        return (toc - tic)/1_000_000.0D;
+    }
+
+    public static double flatMapSum(LongStream stream){
+        long tic = System.nanoTime();
+        long sum = stream.sum();
+        long toc = System.nanoTime();
+
+        assertTrue("Should be large", sum > 1_000_000L);
+
+        return (toc - tic)/1_000_000.0D;
+    }
+
+}
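The `concurrent` flag in `groupByCounting` corresponds to the two grouping collectors the JDK offers: `Collectors.groupingBy` merges per-thread maps after a parallel run, while `Collectors.groupingByConcurrent` inserts into one shared `ConcurrentMap` and is usually faster for unordered parallel counting. A small comparison, assuming those two collectors are what the flag selects between:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class GroupingDemo {
    public static void main(String[] args) {
        // Ordered variant: per-thread maps are built and then merged.
        Map<Long, Long> ordered = LongStream.range(0, 1_000).parallel().boxed()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // Concurrent variant: all threads update one shared ConcurrentMap.
        ConcurrentMap<Long, Long> concurrent = LongStream.range(0, 1_000).parallel().boxed()
                .collect(Collectors.groupingByConcurrent(Function.identity(), Collectors.counting()));

        System.out.println(ordered.equals(concurrent)); // true: same counts either way
    }
}
```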