From 1e68808cd8291bdd669d911727336724c3837764 Mon Sep 17 00:00:00 2001 From: David Stuebe Date: Sun, 30 Dec 2018 12:17:58 -0500 Subject: [PATCH 1/3] Add stream performance tests --- .../java/com/upserve/uppend/BlockedLongs.java | 12 +- .../upserve/uppend/AppendOnlyStoreTest.java | 31 +---- .../java/com/upserve/uppend/TestHelper.java | 28 +++++ .../performance/ArrayLongStreamTest.java | 70 +++++++++++ .../uppend/performance/MultiKeyTest.java | 94 +++++++++++++++ .../uppend/performance/SingleKeyTest.java | 102 ++++++++++++++++ .../performance/StreamTimerMethods.java | 109 ++++++++++++++++++ 7 files changed, 416 insertions(+), 30 deletions(-) create mode 100644 src/test/java/com/upserve/uppend/performance/ArrayLongStreamTest.java create mode 100644 src/test/java/com/upserve/uppend/performance/MultiKeyTest.java create mode 100644 src/test/java/com/upserve/uppend/performance/SingleKeyTest.java create mode 100644 src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java diff --git a/src/main/java/com/upserve/uppend/BlockedLongs.java b/src/main/java/com/upserve/uppend/BlockedLongs.java index a0387dcb..84dcc572 100644 --- a/src/main/java/com/upserve/uppend/BlockedLongs.java +++ b/src/main/java/com/upserve/uppend/BlockedLongs.java @@ -12,7 +12,6 @@ import java.util.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.Lock; -import java.util.function.Supplier; import java.util.stream.*; public class BlockedLongs implements AutoCloseable, Flushable { @@ -214,7 +213,14 @@ public LongStream values(Long pos){ return LongStream.empty(); } - return Arrays.stream(valuesArray(pos)); + long[] longs = valuesArray(pos); + + + return Arrays.stream(longs); + +// return StreamSupport.longStream( +// Spliterators.spliterator(longs, 0, longs.length, Spliterator.CONCURRENT | Spliterator.IMMUTABLE |Spliterator.NONNULL), +// true); } public long[] valuesArray(Long pos) { @@ -259,6 +265,8 @@ public long[] valuesArray(Long pos) { } } + // Lazy values is much slower in Performance tests with a large number of blocks. + // Might be workable with a custom spliterator that was block aware for parallelization public LongStream lazyValues(Long pos) { log.trace("streaming values from {} at {}", file, pos); diff --git a/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java b/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java index 495fecab..0411df2b 100644 --- a/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java +++ b/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.*; import java.util.stream.Collectors; +import static com.upserve.uppend.TestHelper.genBytes; import static org.junit.Assert.*; public class AppendOnlyStoreTest { @@ -389,37 +390,11 @@ private void tester(int number, int size) { assertEquals(inputBytes.size(), outputBytes.size()); - inputBytes.sort(AppendOnlyStoreTest::compareByteArrays); - outputBytes.sort(AppendOnlyStoreTest::compareByteArrays); + inputBytes.sort(TestHelper::compareByteArrays); + outputBytes.sort(TestHelper::compareByteArrays); for (int i = 0; i < number; i++) { assertArrayEquals("input and output byte arrays differ at index " + i, inputBytes.get(i), outputBytes.get(i)); } } - - private static int compareByteArrays(byte[] o1, byte[] o2) { - if (o1 == null) { - if (o2 == null) { - return 0; - } - return -1; - } - if (o2 == null) { - return 1; - } - for (int i = 0; i < o1.length && i < o2.length; i++) { - int v1 = 0xff & o1[i]; - int v2 = 0xff & o2[i]; - if (v1 != v2) { - return v1 < v2 ? -1 : 1; - } - } - return Integer.compare(o1.length, o2.length); - } - - private byte[] genBytes(int len) { - byte[] bytes = new byte[len]; - new Random().nextBytes(bytes); - return bytes; - } } diff --git a/src/test/java/com/upserve/uppend/TestHelper.java b/src/test/java/com/upserve/uppend/TestHelper.java index 78ef86bb..ff3ffd6c 100644 --- a/src/test/java/com/upserve/uppend/TestHelper.java +++ b/src/test/java/com/upserve/uppend/TestHelper.java @@ -3,9 +3,11 @@ import org.slf4j.*; import java.lang.reflect.*; +import java.util.Random; import java.util.concurrent.*; public class TestHelper { + public static void resetLogger(Class clazz, String fieldName) throws Exception { setLogger(clazz, fieldName, LoggerFactory.getLogger(clazz)); } @@ -43,4 +45,30 @@ public static CounterStoreBuilder getDefaultCounterStoreTestBuilder() { .withLookupPageSize(16 * 1024); } + public static int compareByteArrays(byte[] o1, byte[] o2) { + if (o1 == null) { + if (o2 == null) { + return 0; + } + return -1; + } + if (o2 == null) { + return 1; + } + for (int i = 0; i < o1.length && i < o2.length; i++) { + int v1 = 0xff & o1[i]; + int v2 = 0xff & o2[i]; + if (v1 != v2) { + return v1 < v2 ? -1 : 1; + } + } + return Integer.compare(o1.length, o2.length); + } + + public static byte[] genBytes(int len) { + byte[] bytes = new byte[len]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + } + } diff --git a/src/test/java/com/upserve/uppend/performance/ArrayLongStreamTest.java b/src/test/java/com/upserve/uppend/performance/ArrayLongStreamTest.java new file mode 100644 index 00000000..f1516791 --- /dev/null +++ b/src/test/java/com/upserve/uppend/performance/ArrayLongStreamTest.java @@ -0,0 +1,70 @@ +package com.upserve.uppend.performance; + +import org.junit.*; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.*; +import java.util.stream.*; + +import static com.upserve.uppend.performance.StreamTimerMethods.*; + +// Parallel is slower - usually about 2X, but variable! +public class ArrayLongStreamTest { + + private static final int values = 10_000_000; + + private final long[] longs = new long[values]; + + private final int repeats = 5; + + private final Supplier parallelStream = () -> Arrays.stream(longs).parallel(); + private final Supplier sequentialStream = () -> Arrays.stream(longs).sequential(); + + @Before + public void loadStore() { + Arrays.setAll(longs, (v) -> ThreadLocalRandom.current().nextLong(0, 512)); + long val = Arrays.stream(longs, 0, 100).parallel().sum(); + } + + @Test + public void sumTest() { + for (int i=0; i Arrays.stream(longs)).parallel())); + sequentialTime(flatMapSum(LongStream.of(1,2,3,4,5,6,7,8,9,10).flatMap(val -> Arrays.stream(longs)).sequential())); + } + } +} diff --git a/src/test/java/com/upserve/uppend/performance/MultiKeyTest.java b/src/test/java/com/upserve/uppend/performance/MultiKeyTest.java new file mode 100644 index 00000000..b67c6579 --- /dev/null +++ b/src/test/java/com/upserve/uppend/performance/MultiKeyTest.java @@ -0,0 +1,94 @@ +package com.upserve.uppend.performance; + +import com.upserve.uppend.*; +import com.upserve.uppend.util.SafeDeleting; +import org.junit.*; + +import java.io.IOException; +import java.nio.file.*; +import java.util.*; +import java.util.function.Supplier; +import java.util.stream.*; + +import static com.upserve.uppend.performance.StreamTimerMethods.*; + +public class MultiKeyTest { + + private static final int numPartitions = 12; + private static final int hashSize = 32; + private static final int values = 1_000; + private static final int blobsPerBlock = 64; + + private static final int keyCount = 1_000; + + private final Path path = Paths.get("build/test/tmp/performance"); + + private AppendOnlyStore appendOnlyStore; + + private final String[] keys = new String[keyCount]; // Just use a single key - it will be hashed appropriately for partition and hashsize + + private final int repeats = 5; + + private final Supplier parallelStream = () -> Arrays.stream(keys).flatMapToLong(key -> appendOnlyStore.read(key, key).mapToLong(bytes -> bytes.length).parallel()).parallel(); + private final Supplier sequentialStream = () -> Arrays.stream(keys).flatMapToLong(key -> appendOnlyStore.read(key, key).mapToLong(bytes -> bytes.length).sequential()).sequential(); + + @Before + public void loadStore() throws IOException { + SafeDeleting.removeTempPath(path); + + appendOnlyStore = new AppendOnlyStoreBuilder() + .withPartitionSize(numPartitions) + .withLongLookupHashSize(hashSize) + .withBlobsPerBlock(blobsPerBlock) + .withDir(path).build(); + + for (int value=0; value appendOnlyStore.append(key, key, bytes)); + } + + } + + @After + public void cleanup() throws IOException { + SafeDeleting.removeTempPath(path); + } + + @Test + public void sumTest() { + for (int i=0; i parallelStream = () -> appendOnlyStore.read(partition, key).mapToLong(bytes -> bytes.length).parallel(); + private final Supplier sequentialStream = () -> appendOnlyStore.read(partition, key).mapToLong(bytes -> bytes.length).sequential(); + + + @Before + public void loadStore() throws IOException { + SafeDeleting.removeTempPath(path); + + appendOnlyStore = new AppendOnlyStoreBuilder() + .withPartitionSize(numPartitions) + .withLongLookupHashSize(hashSize) + .withBlobsPerBlock(blobsPerBlock) + .withDir(path).build(); + + new Random() + .ints(values, 0, 512) + .parallel() + .mapToObj(TestHelper::genBytes) + .forEach(bytes -> appendOnlyStore.append(partition, key, bytes)); + } + + @After + public void cleanup() throws IOException { + SafeDeleting.removeTempPath(path); + } + + @Test + public void sumTest() { + for (int i=0; i appendOnlyStore.read(partition, key)).mapToLong(bytes -> bytes.length).parallel() + )); + sequentialTime(flatMapSum( + Stream.of(1,2,3,4,5).flatMap(val -> appendOnlyStore.read(partition, key)).mapToLong(bytes -> bytes.length).sequential() + )); + } + } +} diff --git a/src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java b/src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java new file mode 100644 index 00000000..c713b482 --- /dev/null +++ b/src/test/java/com/upserve/uppend/performance/StreamTimerMethods.java @@ -0,0 +1,109 @@ +package com.upserve.uppend.performance; + +import java.util.*; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.*; +import java.util.stream.*; + +import static junit.framework.TestCase.assertTrue; + +public class StreamTimerMethods { + + private static final int repeats = 5; + + public static void parallelTime(double time) { + System.out.println(String.format("Parallel execution time : %.2f ms", time)); + } + + public static void sequentialTime(double time) { + System.out.println(String.format("Sequential execution time: %.2f ms", time)); + } + + public static double sum(Supplier supplier) { + LongStream[] longStreams = new LongStream[repeats]; + long[] sums = new long[repeats]; + + for (int i=0; i 1_000_000L); + } + + return (toc - tic)/1_000_000.0D; + } + + public static double groupByCounting(Supplier supplier, boolean concurrent) { + LongStream[] longStreams = new LongStream[repeats]; + + List> groups = new ArrayList<>(repeats); + + for (int i=0; i group: groups){ + assertTrue("Should be large", group.values().stream().mapToLong(Long::longValue).sum() >= 1_000_000L); + } + + return (toc - tic)/1_000_000.0D; + } + + public static double forEachAdder(Supplier supplier) { + LongStream[] longStreams = new LongStream[repeats]; + long[] sums = new long[repeats]; + LongAdder[] longAdders = new LongAdder[repeats]; + + for (int i=0; i 1_000_000L); + } + + return (toc - tic)/1_000_000.0D; + } + + public static double flatMapSum(LongStream stream){ + long tic = System.nanoTime(); + long sum = stream.sum(); + long toc = System.nanoTime(); + + assertTrue("Should be large", sum > 1_000_000L); + + return (toc - tic)/1_000_000.0D; + } + +} From 0e816ad031f42807e9909e809ea37c977c9ade6f Mon Sep 17 00:00:00 2001 From: David Stuebe Date: Mon, 31 Dec 2018 13:03:19 -0500 Subject: [PATCH 2/3] Use array of ThreadLocal to avoid expensive duplicate --- .../com/upserve/uppend/blobs/MappedPage.java | 16 ++++----- .../upserve/uppend/blobs/VirtualPageFile.java | 36 +++++++++++-------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/upserve/uppend/blobs/MappedPage.java b/src/main/java/com/upserve/uppend/blobs/MappedPage.java index f49b6298..95908c41 100644 --- a/src/main/java/com/upserve/uppend/blobs/MappedPage.java +++ b/src/main/java/com/upserve/uppend/blobs/MappedPage.java @@ -8,7 +8,7 @@ * Mapped Byte Buffer backed implementation of Page */ public class MappedPage implements Page { - private final MappedByteBuffer buffer; + private final ByteBuffer buffer; private final int pageSize; private final int startingPosition; @@ -19,7 +19,7 @@ public class MappedPage implements Page { * @param startingPosition the starting offset in a larger buffer * @param pageSize the size of the page to create */ - public MappedPage(MappedByteBuffer buffer, int startingPosition, int pageSize) { + public MappedPage(ByteBuffer buffer, int startingPosition, int pageSize) { this.pageSize = pageSize; this.buffer = buffer; this.startingPosition = startingPosition; @@ -30,7 +30,7 @@ public MappedPage(MappedByteBuffer buffer, int startingPosition, int pageSize) { * * @param buffer the mapped byte buffer representing a page of a file */ - public MappedPage(MappedByteBuffer buffer) { + public MappedPage(ByteBuffer buffer) { this(buffer, 0, buffer.capacity()); } @@ -42,9 +42,8 @@ public int get(int pagePosition, byte[] dst, int bufferOffset) { final int actualRead = min(desiredRead, availableToRead); // Make a local buffer with local position - ByteBuffer localBuffer = buffer.duplicate(); - localBuffer.position(pagePosition + startingPosition); - localBuffer.get(dst, bufferOffset, actualRead); + buffer.position(pagePosition + startingPosition); + buffer.get(dst, bufferOffset, actualRead); return actualRead; } @@ -56,9 +55,8 @@ public int put(int pagePosition, byte[] src, int bufferOffset) { final int actualWrite = min(desiredWrite, availableToWrite); // Make a local buffer with local position - ByteBuffer localBuffer = buffer.duplicate(); - localBuffer.position(pagePosition + startingPosition); - localBuffer.put(src, bufferOffset, actualWrite); + buffer.position(pagePosition + startingPosition); + buffer.put(src, bufferOffset, actualWrite); return actualWrite; } diff --git a/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java b/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java index f30ea62c..e3ee9b3e 100644 --- a/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java +++ b/src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java @@ -60,7 +60,7 @@ public class VirtualPageFile implements Closeable { private static final int PAGE_TABLE_SIZE = 1000; private static final int MAX_BUFFERS = 1024 * 64; // 128 TB per partition for 2Gb Bufffers - private final MappedByteBuffer[] mappedByteBuffers; + private final ThreadLocal[] localMappedByteBuffers; private final int bufferSize; private final Path filePath; @@ -96,8 +96,7 @@ public Path getFilePath() { @Override public void close() throws IOException { if (!channel.isOpen()) return; - Arrays.fill(mappedByteBuffers, null); - + Arrays.fill(localMappedByteBuffers, null); if (!readOnly) { channel.truncate(nextPagePosition.get()); } @@ -239,7 +238,7 @@ MappedPage mappedPage(long startPosition) { final int mapIndex = (int) (postHeaderPosition / bufferSize); final int mapPosition = (int) (postHeaderPosition % bufferSize); - MappedByteBuffer bigbuffer = ensureBuffered(mapIndex); + ByteBuffer bigbuffer = ensureBuffered(mapIndex); return new MappedPage(bigbuffer, mapPosition, pageSize); } @@ -262,7 +261,7 @@ public VirtualPageFile(Path filePath, int virtualFiles, int pageSize, int target this.virtualFiles = virtualFiles; this.pageSize = pageSize; - this.mappedByteBuffers = new MappedByteBuffer[MAX_BUFFERS]; + this.localMappedByteBuffers = new ThreadLocal[MAX_BUFFERS]; if (targetBufferSize < (pageSize)) throw new IllegalArgumentException("Target buffer size " + targetBufferSize + " must be larger than a page " + pageSize); @@ -460,22 +459,28 @@ private void allocatePage(int virtualFileNumber, int currentPageCount, int pageN pageAllocationCount.add(pagesToAllocate); } - private MappedByteBuffer ensureBuffered(int bufferIndex) { - MappedByteBuffer buffer = mappedByteBuffers[bufferIndex]; - if (buffer == null) { - synchronized (mappedByteBuffers) { - buffer = mappedByteBuffers[bufferIndex]; - if (buffer == null) { + @SuppressWarnings("unchecked") + private ByteBuffer ensureBuffered(int bufferIndex) { + ThreadLocal local = localMappedByteBuffers[bufferIndex]; + if (local == null) { + synchronized (localMappedByteBuffers) { + local = localMappedByteBuffers[bufferIndex]; + if (local == null) { long bufferStart = ((long) bufferIndex * bufferSize) + totalHeaderSize; try { - buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize); + ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize); + local = ThreadLocal.withInitial(buffer::duplicate); + local.set(buffer); + localMappedByteBuffers[bufferIndex] = local; } catch (IOException e) { throw new UncheckedIOException("Unable to map buffer for index " + bufferIndex + " at (" + bufferStart + " start position) in file " + filePath, e); } - mappedByteBuffers[bufferIndex] = buffer; } } } + + ByteBuffer buffer = (ByteBuffer) local.get(); + return buffer; } @@ -513,6 +518,7 @@ private LongBuffer ensurePageTable(int pageNumber) { } // Called during initialize only - no need to synchronize + @SuppressWarnings("unchecked") private void preloadBuffers(long nextPagePosition){ for (int bufferIndex=0; bufferIndex Date: Mon, 31 Dec 2018 14:14:06 -0500 Subject: [PATCH 3/3] Add comments --- src/main/java/com/upserve/uppend/blobs/MappedPage.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/upserve/uppend/blobs/MappedPage.java b/src/main/java/com/upserve/uppend/blobs/MappedPage.java index 95908c41..76963488 100644 --- a/src/main/java/com/upserve/uppend/blobs/MappedPage.java +++ b/src/main/java/com/upserve/uppend/blobs/MappedPage.java @@ -5,7 +5,9 @@ import static java.lang.Integer.min; /** - * Mapped Byte Buffer backed implementation of Page + * {@link ByteBuffer} backed implementation of {@link Page} + * The buffer used in the constructor must be thread local + * Buffer are not thread safe! */ public class MappedPage implements Page { private final ByteBuffer buffer; @@ -15,7 +17,7 @@ public class MappedPage implements Page { /** * Constructor for a MappedPage * - * @param buffer the mapped byte buffer representing a page of a file + * @param buffer a {@link ThreadLocal} {@link ByteBuffer} (mapped from a file) containing a {@link Page} of a {@link VirtualPageFile} * @param startingPosition the starting offset in a larger buffer * @param pageSize the size of the page to create */