From 2df38df5bf94baee2e009b5500cb62367c5f404e Mon Sep 17 00:00:00 2001 From: Jms Date: Thu, 21 Jan 2016 11:08:33 +0100 Subject: [PATCH 1/4] Recordinality implementation first approach. The main algorithm is working. Recordinality extends from ICardinality and Serialize but we need to performance some TODOs such as serialize, merge etc... There are a new test class for Recordinality with three tests. --- .../stream/cardinality/Recordinality.java | 178 ++++++++++++++++++ .../stream/cardinality/TestRecordinality.java | 95 ++++++++++ 2 files changed, 273 insertions(+) create mode 100644 src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java create mode 100644 src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java new file mode 100644 index 000000000..4b889d049 --- /dev/null +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java @@ -0,0 +1,178 @@ +package com.clearspring.analytics.stream.cardinality; + + +/** + * Java implementation of Recordinality (R) algorithm from this paper: + *

+ * http://www.dmtcs.org/pdfpapers/dmAQ0124.pdf + *

+ * Recordinality counts the number of records (more + * generally, k-records) in the sequence + *

+ * It depends in the underlying permutation of the first + * occurrences of distinct values, very different from the other + * estimators + *

+ * The Recordinality estimator => + * Z = k * (1 + 1/k)^(rk - k + 1) -1 + *

+ * E_n[Z] = n (It's an unbiased estimator of n) + *

+ * The accuracy of Recordinality in terms of SE, asymptotically, satisfacted: + *

+ * SE_n[Z] = sqrt( (n/ke)^(1/k) - 1 ) + *

+ * You can find more information in these slides: + *

+ * https://www.cs.upc.edu/~conrado/research/talks/aofa2012.pdf + *

+ *

+ * Users have different motivations to use different types of hashing functions. + * Rather than try to keep up with all available hash functions and to remove + * the concern of causing future binary incompatibilities this class allows clients + * to offer the value in hashed int or long form. This way clients are free + * to change their hash function on their own time line. We recommend using Google's + * Guava Murmur3_128 implementation as it provides good performance and speed when + * high precision is required. In our tests the 32bit MurmurHash function included + * in this project is faster and produces better results than the 32 bit murmur3 + * implementation google provides. + *

+ */ + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.PriorityQueue; +import java.util.HashSet; +import java.util.Iterator; + +import com.clearspring.analytics.hash.MurmurHash; +import com.clearspring.analytics.util.Bits; +import com.clearspring.analytics.util.IBuilder; +import com.clearspring.analytics.util.Varint; + + +public class Recordinality implements ICardinality, Serializable { + private final int sampleSize; + private final PriorityQueue sampleSet = new PriorityQueue<>(); + private final HashSet elements = new HashSet<>(); + private long rk; + + + /** + * Initializes a new Recordinality instance with a configurable 'k'-size. + */ + public Recordinality(int sampleSize) { + this.sampleSize = sampleSize; + this.rk = 0; + } + + public boolean offerHashed(long hashedLong) { + if (!elements.contains(hashedLong)) { + if (sampleSize > sampleSet.size()) { + elements.add(hashedLong); + sampleSet.add(hashedLong); + rk+=1; + return true; + } else if (sampleSet.peek() < hashedLong) { + elements.remove(sampleSet.peek()); + elements.add(hashedLong); + sampleSet.poll(); + sampleSet.add(hashedLong); + rk+=1; + return true; + } + } + return false; + } + + @Override + public boolean offerHashed(int hashedInt) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(Object o) { + long x = MurmurHash.hash64(o); + return offerHashed(x); + } + + @Override + public long cardinality() { + if (sampleSet.size() < sampleSize) return sampleSet.size(); + else { + long pow = rk - sampleSize + 1; + double estimate = (sampleSize * (Math.pow(1 + (1.0 / sampleSize), pow))) - 1; + return (long) estimate; + } + } + + + public double estimatedStandarError(){ + if (sampleSet.size() < sampleSize) return 0; + else { + long estimateCardinality = cardinality(); + double pow = 1.0/sampleSize; + return Math.sqrt( Math.pow( + estimateCardinality/(sampleSize*Math.E), pow) + - 1); + } + } + + //TODO check the sizeof, getBytes and writeBytes + @Override + public int sizeof() { + return sampleSet.size() * 8; + } + + @Override + public byte[] getBytes() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dos = new DataOutputStream(baos); + writeBytes(dos); + + return baos.toByteArray(); + } + + private void writeBytes(DataOutput serializedByteStream) throws IOException { + serializedByteStream.writeInt(sampleSize); + serializedByteStream.writeInt(elements.size() * 8); + + for (Long e : elements) { + serializedByteStream.writeLong(e); + } + } + + //TODO check merge function + @Override + public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException { + throw new RecordinalityMergeException("Cannot merge Recordinality"); + } + + @SuppressWarnings("serial") + protected static class RecordinalityMergeException extends CardinalityMergeException { + public RecordinalityMergeException(String message) { + super(message); + } + } + + + +} diff --git a/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java new file mode 100644 index 000000000..4c3bf479c --- /dev/null +++ b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java @@ -0,0 +1,95 @@ +package com.clearspring.analytics.stream.cardinality; + + +/** + * Copyright (C) 2011 Clearspring Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.IOException; + +import java.util.Arrays; + +import com.clearspring.analytics.TestUtils; + +import com.google.common.base.Charsets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; + +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRecordinality { + + @Test + public void testComputeCount() { + Recordinality recordinality = new Recordinality(16); + recordinality.offer(0); + recordinality.offer(1); + recordinality.offer(2); + recordinality.offer(3); + recordinality.offer(16); + recordinality.offer(17); + recordinality.offer(18); + recordinality.offer(19); + recordinality.offer(19); + assertEquals(8, recordinality.cardinality()); + } + + @Test + public void testSerialization() throws IOException, ClassNotFoundException { + Recordinality r = new Recordinality(8); + r.offer("a"); + r.offer("b"); + r.offer("c"); + r.offer("d"); + r.offer("e"); + + Recordinality r2 = (Recordinality) TestUtils.deserialize(TestUtils.serialize(r)); + assertEquals(r.cardinality(), r2.cardinality()); + } + + @Test + public void testHighCardinality() { + int counter = 0; + for (int j = 0; j < 3; ++j) { + long start = System.currentTimeMillis(); + Recordinality recordinality = new Recordinality(10); + int size = 10000000; + for (int i = 0; i < size; i++) { + recordinality.offer(TestICardinality.streamElement(i)); + } + System.out.println("time: " + (System.currentTimeMillis() - start)); + /** + * the algorithm RECORDINALITY is expected to provide estimates + * within σ, 2σ, 3σ of the exact count in respectively at least + * 68%, 95% and 99% of all cases. + */ + long estimate = recordinality.cardinality(); + double estimatedError = recordinality.estimatedStandarError(); + long permittedError = (long) (3*size*estimatedError); + long err = Math.abs(estimate - size); + + if (err > permittedError) ++counter; + + } + System.out.println("If counter (> 1) rerun the test. \nIf you have already done it, something is broken"); + System.out.println("Counter: " + counter); + assertTrue(counter < 2); + } +} From aad72b24dab32ff7157595ddd3dd5cd21a373d6b Mon Sep 17 00:00:00 2001 From: Jms Date: Fri, 22 Jan 2016 11:52:10 +0100 Subject: [PATCH 2/4] Added some comments to make easier the pull request correction. --- .../stream/cardinality/Recordinality.java | 18 ++++++++++++++++++ .../stream/cardinality/TestRecordinality.java | 3 +++ 2 files changed, 21 insertions(+) diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java index 4b889d049..7fb8104e5 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java @@ -84,13 +84,20 @@ public Recordinality(int sampleSize) { this.rk = 0; } + /** + * Process the offered hash. + * You can find a pseudocode in the description links + */ public boolean offerHashed(long hashedLong) { + // if the element is not in the hashmap... if (!elements.contains(hashedLong)) { + //if we don't have k-values this is a k-max if (sampleSize > sampleSet.size()) { elements.add(hashedLong); sampleSet.add(hashedLong); rk+=1; return true; + // if we have k values but this is a k-max insert it and remove the minimum } else if (sampleSet.peek() < hashedLong) { elements.remove(sampleSet.peek()); elements.add(hashedLong); @@ -103,7 +110,9 @@ public boolean offerHashed(long hashedLong) { return false; } + @Override + //I don't give support to ints yet... public boolean offerHashed(int hashedInt) { throw new UnsupportedOperationException(); } @@ -114,6 +123,10 @@ public boolean offer(Object o) { return offerHashed(x); } + /** + * Return a estimation of distinct values + * You can find a pseudocode in the description links + */ @Override public long cardinality() { if (sampleSet.size() < sampleSize) return sampleSet.size(); @@ -125,6 +138,9 @@ public long cardinality() { } + /** + * Return a estimated Standar Error from the estimated cardinality + */ public double estimatedStandarError(){ if (sampleSet.size() < sampleSize) return 0; else { @@ -136,6 +152,8 @@ public double estimatedStandarError(){ } } + //Below I am not sure about how to process... + //TODO check the sizeof, getBytes and writeBytes @Override public int sizeof() { diff --git a/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java index 4c3bf479c..a1c33be08 100644 --- a/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java +++ b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java @@ -64,6 +64,9 @@ public void testSerialization() throws IOException, ClassNotFoundException { assertEquals(r.cardinality(), r2.cardinality()); } + /** + * should not fail with probability 1/100 + */ @Test public void testHighCardinality() { int counter = 0; From f5085ad421da674a5cc801aae81ce7c70abaf7fa Mon Sep 17 00:00:00 2001 From: Jms Date: Fri, 22 Jan 2016 12:19:53 +0100 Subject: [PATCH 3/4] Added Recordinality paper information in the ReadMe. --- README.mdown | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.mdown b/README.mdown index b75d8f8af..86b118b8b 100644 --- a/README.mdown +++ b/README.mdown @@ -108,6 +108,9 @@ pages 693–703, London, UK, 2002. Springer-Verlag. Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm. Proceedings of the EDBT 2013 Conference, ACM, Genoa, Italy +* Ahmed Helmi, Jerémie Lumbroso, Conrado Martínez and Alfredo Viola. Data Streams as Random Permutations: +the Distinct Element Problem. 22nd International Meeting on Probabilistic, Combinatorial, and +Asymptotic Methods in the Analysis of Algorithms (AofA'12), 2012, Montreal, Canada. #### Top-K From 233fe8a76f5793c682731a988bf0953800f7d826 Mon Sep 17 00:00:00 2001 From: Jms Date: Wed, 3 Feb 2016 10:09:13 +0100 Subject: [PATCH 4/4] Some format errors fixed. Merge function throw an UnsupportedOperationException. Added an Ignore decorator to one test. --- .../stream/cardinality/Recordinality.java | 37 +------------------ .../stream/cardinality/TestRecordinality.java | 9 +---- 2 files changed, 3 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java index 7fb8104e5..94fcfaf83 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java @@ -40,33 +40,15 @@ */ -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; -import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.io.Serializable; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.PriorityQueue; import java.util.HashSet; -import java.util.Iterator; - import com.clearspring.analytics.hash.MurmurHash; -import com.clearspring.analytics.util.Bits; -import com.clearspring.analytics.util.IBuilder; -import com.clearspring.analytics.util.Varint; + public class Recordinality implements ICardinality, Serializable { @@ -112,7 +94,6 @@ public boolean offerHashed(long hashedLong) { @Override - //I don't give support to ints yet... public boolean offerHashed(int hashedInt) { throw new UnsupportedOperationException(); } @@ -152,9 +133,6 @@ public double estimatedStandarError(){ } } - //Below I am not sure about how to process... - - //TODO check the sizeof, getBytes and writeBytes @Override public int sizeof() { return sampleSet.size() * 8; @@ -178,19 +156,8 @@ private void writeBytes(DataOutput serializedByteStream) throws IOException { } } - //TODO check merge function @Override public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException { - throw new RecordinalityMergeException("Cannot merge Recordinality"); - } - - @SuppressWarnings("serial") - protected static class RecordinalityMergeException extends CardinalityMergeException { - public RecordinalityMergeException(String message) { - super(message); - } + throw new UnsupportedOperationException(); } - - - } diff --git a/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java index a1c33be08..109ed52a4 100644 --- a/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java +++ b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java @@ -19,15 +19,7 @@ import java.io.IOException; - -import java.util.Arrays; - import com.clearspring.analytics.TestUtils; - -import com.google.common.base.Charsets; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - import org.junit.Ignore; import org.junit.Test; @@ -68,6 +60,7 @@ public void testSerialization() throws IOException, ClassNotFoundException { * should not fail with probability 1/100 */ @Test + @Ignore public void testHighCardinality() { int counter = 0; for (int j = 0; j < 3; ++j) {