From 8f4949d36ec15245e59c50485e46e8dc27a57f45 Mon Sep 17 00:00:00 2001 From: James Prince Date: Fri, 14 Feb 2020 14:37:41 +0000 Subject: [PATCH 1/4] Add support for double partitioning --- petastorm/reader.py | 4 +--- petastorm/tests/test_predicates.py | 8 ++++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/petastorm/reader.py b/petastorm/reader.py index 008bfac9d..6e5a7126d 100644 --- a/petastorm/reader.py +++ b/petastorm/reader.py @@ -532,9 +532,7 @@ def _apply_predicate_to_row_groups(self, dataset, row_groups, predicate): raise ValueError('predicate parameter is expected to be derived from PredicateBase') predicate_fields = predicate.get_fields() - if set(predicate_fields) == dataset.partitions.partition_names: - assert len(dataset.partitions.partition_names) == 1, \ - 'Datasets with only a single partition level supported at the moment' + if set(predicate_fields).issubset(dataset.partitions.partition_names): filtered_row_group_indexes = [] for piece_index, piece in enumerate(row_groups): diff --git a/petastorm/tests/test_predicates.py b/petastorm/tests/test_predicates.py index 07333bc0d..8f4830eb5 100644 --- a/petastorm/tests/test_predicates.py +++ b/petastorm/tests/test_predicates.py @@ -146,7 +146,6 @@ def test_predicate_on_single_column(synthetic_dataset): assert actual['id2'] < 2 assert counter == len(synthetic_dataset.data) - def test_predicate_on_partitioned_dataset(tmpdir): """ Generates a partitioned dataset and ensures that readers evaluate the type of the partition @@ -154,12 +153,14 @@ def test_predicate_on_partitioned_dataset(tmpdir): """ TestSchema = Unischema('TestSchema', [ UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False), + UnischemaField('id2', np.int32, (), ScalarCodec(IntegerType()), False), UnischemaField('test_field', np.int32, (), ScalarCodec(IntegerType()), False), ]) def test_row_generator(x): """Returns a single entry in the generated dataset.""" return {'id': x, + 'id2': x+1, 'test_field': x*x} rowgroup_size_mb = 256 @@ -177,12 +178,15 @@ def test_row_generator(x): spark.createDataFrame(rows_rdd, TestSchema.as_spark_schema()) \ .write \ - .partitionBy('id') \ + .partitionBy('id', 'id2') \ .parquet(dataset_url) with make_reader(dataset_url, predicate=in_lambda(['id'], lambda x: x == 3)) as reader: assert next(reader).id == 3 + with make_reader(dataset_url, predicate=in_lambda(['id2'], lambda x: x == 5)) as reader: + assert next(reader).id == 5 with make_reader(dataset_url, predicate=in_lambda(['id'], lambda x: x == '3')) as reader: with pytest.raises(StopIteration): # Predicate should have selected none, so a StopIteration should be raised. next(reader) + From db6f15c9dae09068dc35465d4f8f6db728633f3e Mon Sep 17 00:00:00 2001 From: James Prince Date: Fri, 14 Feb 2020 16:31:26 +0000 Subject: [PATCH 2/4] Rm blank line --- petastorm/tests/test_predicates.py | 1 - 1 file changed, 1 deletion(-) diff --git a/petastorm/tests/test_predicates.py b/petastorm/tests/test_predicates.py index 8f4830eb5..e61367572 100644 --- a/petastorm/tests/test_predicates.py +++ b/petastorm/tests/test_predicates.py @@ -189,4 +189,3 @@ def test_row_generator(x): with pytest.raises(StopIteration): # Predicate should have selected none, so a StopIteration should be raised. next(reader) - From bd6ddfda095c73a237494d7566b42410c8fc9ded Mon Sep 17 00:00:00 2001 From: James Prince Date: Fri, 14 Feb 2020 16:39:12 +0000 Subject: [PATCH 3/4] add empty line --- petastorm/tests/test_predicates.py | 1 + 1 file changed, 1 insertion(+) diff --git a/petastorm/tests/test_predicates.py b/petastorm/tests/test_predicates.py index e61367572..ff2747fa9 100644 --- a/petastorm/tests/test_predicates.py +++ b/petastorm/tests/test_predicates.py @@ -146,6 +146,7 @@ def test_predicate_on_single_column(synthetic_dataset): assert actual['id2'] < 2 assert counter == len(synthetic_dataset.data) + def test_predicate_on_partitioned_dataset(tmpdir): """ Generates a partitioned dataset and ensures that readers evaluate the type of the partition From 697f5c2294f4c01f457f2f1f87d1cf3333b6e5fe Mon Sep 17 00:00:00 2001 From: James Prince Date: Fri, 14 Feb 2020 16:39:29 +0000 Subject: [PATCH 4/4] Update test_predicates.py --- petastorm/tests/test_predicates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/petastorm/tests/test_predicates.py b/petastorm/tests/test_predicates.py index ff2747fa9..13f8eb06f 100644 --- a/petastorm/tests/test_predicates.py +++ b/petastorm/tests/test_predicates.py @@ -146,7 +146,7 @@ def test_predicate_on_single_column(synthetic_dataset): assert actual['id2'] < 2 assert counter == len(synthetic_dataset.data) - + def test_predicate_on_partitioned_dataset(tmpdir): """ Generates a partitioned dataset and ensures that readers evaluate the type of the partition