From c385b6e14dd07967ba9763e5c779cca5469684bc Mon Sep 17 00:00:00 2001 From: yaroslav Date: Tue, 20 Aug 2024 13:02:25 +0300 Subject: [PATCH 1/3] DRILL-8507 Missing parquet columns quoted with backticks conflict with existing ones 1. In ParquetSchema#createMissingColumn replaced col.toExpr() with col.getAsUnescapedPath() so that the missing column name wouldn't be quoted with backticks 2. Fixed a typo in UnionAllRecordBatch ("counthas" -> "counts") 3. In TestParquetFilterPushDown worked around a NumberFormatException with CONVERT_TO 4. Removed testCoalesceWithUntypedNullValues* test methods from TestCaseNullableTypes 5. Moved testCoalesceOnNotExistentColumns* test methods from TestUntypedNull to a separate TestParquetMissingColumns and made them expect Nullable Int instead of Untyped Null 6. Created a new TestParquetPartiallyMissingColumns test class with test cases for the "backticks problem" --- .../impl/union/UnionAllRecordBatch.java | 2 +- .../parquet/columnreaders/ParquetSchema.java | 3 +- .../drill/TestParquetMissingColumns.java | 91 ++++++++++++++++++ .../TestParquetPartiallyMissingColumns.java | 78 +++++++++++++++ .../org/apache/drill/TestUntypedNull.java | 73 -------------- .../logical/TestCaseNullableTypes.java | 21 ---- .../parquet/TestParquetFilterPushDown.java | 2 +- .../parquet/partially_missing/o_m/0.parquet | Bin 0 -> 1094 bytes .../parquet/partially_missing/o_m/1.parquet | Bin 0 -> 562 bytes 9 files changed, 172 insertions(+), 98 deletions(-) create mode 100644 exec/java-exec/src/test/java/org/apache/drill/TestParquetMissingColumns.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/o_m/0.parquet create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/o_m/1.parquet diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index fad14184fa5..d2ab14f5aab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -144,7 +144,7 @@ public int getRecordCount() { private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) { Preconditions.checkArgument(batchStatus.batch.getSchema().getFieldCount() == container.getSchema().getFieldCount(), - "Input batch and output batch have different field counthas!"); + "Input batch and output batch have different field counts!"); if (newSchema) { createUnionAller(batchStatus.batch); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java index 53bced71012..2a99963edd8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java @@ -228,8 +228,7 @@ public void createNonExistentColumns(OutputMutator output, List<NullableIntVector> nullFilledVectors + * - parquet/partially_missing/o_m/0.parquet: id | name | age + * - parquet/partially_missing/o_m/1.parquet: id + * + * So, by querying "age" or "name" columns we would trigger both 0.parquet reader to read the data and + * 1.parquet reader to create the missing column vector. 
+ */ +public class TestParquetPartiallyMissingColumns extends ClusterTest { + + private static final SchemaBuilder ageSchema = + new SchemaBuilder().add("age", Types.optional(TypeProtos.MinorType.INT)); + + @BeforeClass + public static void setup() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "partially_missing")); + } + + /* + Field name for the missing column MUST NOT be quoted with back-ticks, so we should have ONLY ONE + column for that field (unquoted) + */ + + @Test + public void testMissingColumnNamingWithOrderBy() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/o_m` ORDER BY age", ageSchema); + } + + @Test + public void testMissingColumnNamingWithUnionAll() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/o_m` UNION ALL (VALUES (1))", ageSchema); + } + + // Runs the query and verifies the result schema against the expected schema + private void test(String query, SchemaBuilder expectedSchemaBuilder) throws Exception { + BatchSchema expectedSchema = new BatchSchemaBuilder() + .withSchemaBuilder(expectedSchemaBuilder) + .build(); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .go(); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java index bb707c64af4..d959334a6c7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java @@ -18,12 +18,7 @@ package org.apache.drill; import org.apache.drill.categories.SqlFunctionTest; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.BatchSchemaBuilder; -import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.ClusterTest; @@ -38,8 +33,6 @@ @Category(SqlFunctionTest.class) public class TestUntypedNull extends ClusterTest { - private static final TypeProtos.MajorType UNTYPED_NULL_TYPE = Types.optional(TypeProtos.MinorType.NULL); - @BeforeClass public static void setup() throws Exception { ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); @@ -121,72 +114,6 @@ public void testTypeAndMode() throws Exception { assertEquals(0, summary.recordCount()); } - @Test - public void testCoalesceOnNotExistentColumns() throws Exception { - String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` limit 5"; - SchemaBuilder schemaBuilder = new SchemaBuilder() - .add("coal", UNTYPED_NULL_TYPE); - BatchSchema expectedSchema = new BatchSchemaBuilder() - .withSchemaBuilder(schemaBuilder) - .build(); - - testBuilder() - .sqlQuery(query) - .schemaBaseLine(expectedSchema) - .go(); - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("coal") - .baselineValuesForSingleColumn(null, null, null, null, null) - .go(); - } - - @Test - public void testCoalesceOnNotExistentColumnsWithGroupBy() throws Exception { - String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` group by 1"; - SchemaBuilder schemaBuilder = new SchemaBuilder() - .add("coal", UNTYPED_NULL_TYPE); - BatchSchema expectedSchema = new BatchSchemaBuilder() - 
.withSchemaBuilder(schemaBuilder) - .build(); - - testBuilder() - .sqlQuery(query) - .schemaBaseLine(expectedSchema) - .go(); - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("coal") - .baselineValuesForSingleColumn(new Object[] {null}) - .go(); - } - - @Test - public void testCoalesceOnNotExistentColumnsWithOrderBy() throws Exception { - String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` order by 1 limit 5"; - SchemaBuilder schemaBuilder = new SchemaBuilder() - .add("coal", UNTYPED_NULL_TYPE); - BatchSchema expectedSchema = new BatchSchemaBuilder() - .withSchemaBuilder(schemaBuilder) - .build(); - - testBuilder() - .sqlQuery(query) - .schemaBaseLine(expectedSchema) - .go(); - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("coal") - .baselineValuesForSingleColumn(null, null, null, null, null) - .go(); - } - @Test public void testCoalesceOnNotExistentColumnsWithCoalesceInWhereClause() throws Exception { String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` where coalesce(unk1, unk2) > 10"; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java index cf598275e99..02f8210d955 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java @@ -157,25 +157,4 @@ public void testCaseNullableTypesDecimal() throws Exception { resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY); } } - - // Coalesce is being transformed to if-else cases - @Test - public void testCoalesceWithUntypedNullValues() throws Exception { - testBuilder() - .sqlQuery("select coalesce(coalesce(n_name1, n_name2, n_name), coalesce(n_name3, n_name4), n_name3) res from cp.`tpch/nation.parquet` limit 1") - .ordered() - .baselineColumns("res") - .baselineValues("ALGERIA") - .go(); - } - - @Test - public void testCoalesceWithUntypedNullValues_2() throws Exception { - testBuilder() - .sqlQuery("select coalesce(coalesce(n_name1, n_name2), n_name) res from cp.`tpch/nation.parquet` limit 1") - .ordered() - .baselineColumns("res") - .baselineValues("ALGERIA") - .go(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java index 6ab7bbe6744..00c3be6a687 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java @@ -550,7 +550,7 @@ public void testBooleanPredicate() throws Exception { final String queryNotEqualFalse = "select col_bln from dfs.`parquetFilterPush/blnTbl` where not col_bln = false"; testParquetFilterPD(queryNotEqualFalse, 4, 2, false); - final String queryEqualTrueWithAnd = "select col_bln from dfs.`parquetFilterPush/blnTbl` where col_bln = true and unk_col = 'a'"; + final String queryEqualTrueWithAnd = "select col_bln from dfs.`parquetFilterPush/blnTbl` where col_bln = true and convert_to(unk_col, 'UTF8') = 'a'"; testParquetFilterPD(queryEqualTrueWithAnd, 0, 2, false); // File ff1.parquet has column with the values: false, null, false. 
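A note on the CONVERT_TO workaround in the hunk above: unk_col exists in no file of the table, so Drill materializes it as a nullable INT column, and the raw predicate unk_col = 'a' makes the planner cast 'a' to a number, which fails with NumberFormatException. CONVERT_TO(unk_col, 'UTF8') turns the comparison into a byte-level one instead, and since the column is all NULLs the predicate is simply never true. A minimal sketch of the pattern (hypothetical test class; assumes the standard ClusterTest harness and the parquetFilterPush/blnTbl fixture used above):

import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Test;

public class MissingColumnFilterSketch extends ClusterTest {

  @BeforeClass
  public static void setup() throws Exception {
    startCluster(ClusterFixture.builder(dirTestWatcher));
  }

  @Test
  public void convertToAvoidsNumericCast() throws Exception {
    // unk_col is missing from every file, so it reads back as nullable INT;
    // the raw predicate "unk_col = 'a'" would plan an implicit numeric cast
    // of 'a' and fail. CONVERT_TO compares bytes instead, and NULL = 'a'
    // evaluates to NULL, so the query returns no rows rather than throwing.
    testBuilder()
        .sqlQuery("select col_bln from dfs.`parquetFilterPush/blnTbl` "
            + "where col_bln = true and convert_to(unk_col, 'UTF8') = 'a'")
        .unOrdered()
        .expectsEmptyResultSet()
        .go();
  }
}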
diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..75fd2db8ec923686dc998943f32c57bd24c6d9fd GIT binary patch literal 1094 zcma)+Pm9w)7{(`mEOF_<#hq~?ImALk_h8vHyR9u{VOP7_Qg&;%yCMin(o7|gHZe_I zL_ByE4?PH;L{EMM51zzNAc7aaz!YGnIot)uF@ltp)lKq$+Wf0`o)ViT)K*$4< z@4_%kw#l}k)YCfBq6jJ^RizPR2U8#us;{RNBJx_oYviYZYslEsa8}*QaAJ}?VYW!i z*aIdnKW&LvScn#wM44NI$Oj2uCvQYtN5)j(2Ti%WfvtIM3eZ;!_`oBEx4dm-0kbQ;-Vhd+`B zFbLsYr%V$8D1!wkU}rwz@ebUTG{N7!PJS0NS%rZilj>hh5nhg4iUj?U&1?| z%R67gud6p=Q<|LfCFh(Z_4`8yBfJ3Z`{Vsnx3CV-zy_K=Gw?MRHIwQUgsvj6W)fr) z9bRNw#K1jLvm3{AzEpXdU=1d%BC3VrT0=0?GOd8GE&8?Lg4l z)S^b94q-qF?XA~#h`i0`bLaY73c%~-J~D1>Rk9NXEk9kv#f wAsm6%kwSXyUANQO+1e70=y=j~yms4d2iz6X^|k>8eBHDxxZf|hdLRClAIM^doB#j- literal 0 HcmV?d00001 From 5a775e302ce10d875465a9bb77f0425188ba38d5 Mon Sep 17 00:00:00 2001 From: Yaroslav Chernysh Date: Tue, 27 Aug 2024 14:27:34 +0300 Subject: [PATCH 2/3] DRILL-8508 Choosing the best suitable major type for a partially missing parquet column (minor type solution) 1. Passed an overall table schema from AbstractParquetRowGroupScan to ParquetSchema 2. In ParquetSchema#createMissingColumn used the minor type from that schema instead of hardcoding the INT --- .../AbstractParquetScanBatchCreator.java | 4 ++- .../columnreaders/ParquetRecordReader.java | 19 ++++++++---- .../parquet/columnreaders/ParquetSchema.java | 29 +++++++++++++----- .../parquet/columnreaders/ReadState.java | 5 ++- .../TestParquetPartiallyMissingColumns.java | 29 +++++++++++++++--- .../physical/unit/MiniPlanUnitTestBase.java | 3 +- .../parquet/ParquetRecordReaderTest.java | 2 +- .../parquet/partially_missing/m_o/0.parquet | Bin 0 -> 562 bytes .../parquet/partially_missing/m_o/1.parquet | Bin 0 -> 1094 bytes 9 files changed, 67 insertions(+), 24 deletions(-) create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/m_o/0.parquet create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/m_o/1.parquet diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java index 9bc53d6e788..50921554fca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java @@ -335,7 +335,9 @@ private Map createReaderAndImplicitColumns(ExecutorFragmentConte ccf, footer, rowGroupScan.getColumns(), - containsCorruptDates); + containsCorruptDates, + // each parquet SubScan shares the same table schema constructed by a GroupScan + rowGroupScan.getSchema()); } logger.debug("Query {} uses {}", diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index eeca2cabfe6..56aea74aebb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import org.apache.drill.exec.record.metadata.TupleMetadata; import 
org.apache.drill.exec.store.CommonParquetRecordReader; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -72,6 +73,11 @@ public class ParquetRecordReader extends CommonParquetRecordReader { private final boolean useBulkReader; + /** + * See {@link ParquetSchema#tableSchema} + */ + private final TupleMetadata tableSchema; + public ParquetRecordReader(FragmentContext fragmentContext, Path path, int rowGroupIndex, @@ -80,8 +86,8 @@ public ParquetRecordReader(FragmentContext fragmentContext, CompressionCodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { - this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) { + this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus, tableSchema); } public ParquetRecordReader(FragmentContext fragmentContext, @@ -91,9 +97,9 @@ public ParquetRecordReader(FragmentContext fragmentContext, CompressionCodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) { this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory, - footer, columns, dateCorruptionStatus); + footer, columns, dateCorruptionStatus, tableSchema); } public ParquetRecordReader( @@ -105,13 +111,14 @@ public ParquetRecordReader( CompressionCodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) { super(footer, fragmentContext); this.hadoopPath = path; this.fileSystem = fs; this.codecFactory = codecFactory; this.rowGroupIndex = rowGroupIndex; this.dateCorruptionStatus = dateCorruptionStatus; + this.tableSchema = tableSchema; this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer); this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val; this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val; @@ -185,7 +192,7 @@ public ReadState getReadState() { @Override public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { this.operatorContext = operatorContext; - ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns()); + ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? 
null : getColumns(), tableSchema); batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext)); logger.debug("Reading {} records from row group({}) in file {}.", numRecordsToRead, rowGroupIndex, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java index 2a99963edd8..22668ebcf22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java @@ -32,9 +32,10 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; -import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.ValueVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -64,6 +65,14 @@ public final class ParquetSchema { private final int rowGroupIndex; private final ParquetMetadata footer; + /** + * Schema for the whole table constructed by a GroupScan from all the parquet files to read. + * If we don't find a selected column in our parquet file, the type for the null-filled vector + * to create is looked up in this schema. That is, if some other parquet file contains + * the column, we take its type. Otherwise, we default to Nullable Int. + */ + private final TupleMetadata tableSchema; + /** * List of metadata for selected columns. This list does two things. * First, it identifies the Parquet columns we wish to select. 
Second, it @@ -91,11 +100,12 @@ public final class ParquetSchema { * this is a SELECT * query */ - public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols) { + public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols, TupleMetadata tableSchema) { this.options = options; this.rowGroupIndex = rowGroupIndex; this.selectedCols = selectedCols; this.footer = footer; + this.tableSchema = tableSchema; if (selectedCols == null) { columnsFound = null; } else { @@ -206,7 +216,7 @@ private boolean columnSelected(ColumnDescriptor column) { * @throws SchemaChangeException should not occur */ - public void createNonExistentColumns(OutputMutator output, List<NullableIntVector> nullFilledVectors) throws SchemaChangeException { + public void createNonExistentColumns(OutputMutator output, List<ValueVector> nullFilledVectors) throws SchemaChangeException { List<SchemaPath> projectedColumns = Lists.newArrayList(selectedCols); for (int i = 0; i < columnsFound.length; i++) { SchemaPath col = projectedColumns.get(i); @@ -227,11 +237,14 @@ public void createNonExistentColumns(OutputMutator output, List buildChunkMap(BlockMetaData rowGroupMetadata) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java index 1325ddd5e51..f6b30d7a77a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.parquet.ParquetReaderStats; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager; -import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -51,7 +50,7 @@ public class ReadState { * at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors * that need only have their value count set at the end of each call to next(), as the values default to null. */ - private List<NullableIntVector> nullFilledVectors; + private List<ValueVector> nullFilledVectors; private List<ColumnReader<?>> fixedLenColumnReaders = new ArrayList<>(); private final long totalNumRecordsToRead; // number of records to read @@ -229,4 +228,4 @@ public void close() { varLengthReader = null; } } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java b/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java index 34c6786a8b3..c622c153518 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java @@ -14,11 +14,11 @@ /** * Covers querying a table in which some parquet files do contain selected columns, and - * others do not. + * others do not (or have them as OPTIONAL). * - * TODO: Expand this test to assert this behavior: - * TODO: 1) If at least 1 parquet file to be read has the column, take the minor type from there. - * TODO: Otherwise, default to INT. 
+ * Expected behavior for the missing columns is the following: + * 1) If at least 1 parquet file to be read has the column, take the minor type from there. + * Otherwise, default to INT. * TODO: 2) If at least 1 parquet file to be read doesn't have the column, or has it as OPTIONAL, * TODO: enforce the overall scan output schema to have it as OPTIONAL * @@ -28,12 +28,16 @@ * (not guaranteed though, but seems to work). We use such tables for such scenarios: * * - parquet/partially_missing/o_m -- optional, then missing + * - parquet/partially_missing/m_o -- missing, then optional * * These tables have these parquet files with such schemas: * * - parquet/partially_missing/o_m/0.parquet: id | name | age * - parquet/partially_missing/o_m/1.parquet: id * + * - parquet/partially_missing/m_o/0.parquet: id + * - parquet/partially_missing/m_o/1.parquet: id | name | age + * * So, by querying "age" or "name" columns we would trigger both 0.parquet reader to read the data and * 1.parquet reader to create the missing column vector. */ @@ -41,6 +45,8 @@ public class TestParquetPartiallyMissingColumns extends ClusterTest { private static final SchemaBuilder ageSchema = new SchemaBuilder().add("age", Types.optional(TypeProtos.MinorType.INT)); + private static final SchemaBuilder nameSchema = + new SchemaBuilder().add("name", Types.optional(TypeProtos.MinorType.VARCHAR)); @BeforeClass public static void setup() throws Exception { @@ -63,6 +69,21 @@ public void testMissingColumnNamingWithUnionAll() throws Exception { test("SELECT age FROM dfs.`parquet/partially_missing/o_m` UNION ALL (VALUES (1))", ageSchema); } + /* + If at least 1 file in the table has the selected column, the overall scan output schema should + take the MinorType for the column from there (and not default to Int) + */ + + @Test + public void testMissingColumnTypeGuessWithOrderBy() throws Exception { + test("SELECT name FROM dfs.`parquet/partially_missing/o_m` ORDER BY name", nameSchema); + } + + @Test + public void testMissingColumnTypeGuessWithUnionAll() throws Exception { + test("SELECT name FROM dfs.`parquet/partially_missing/m_o` UNION ALL (VALUES ('Bob'))", nameSchema); + } + // Runs the query and verifies the result schema against the expected schema private void test(String query, SchemaBuilder expectedSchemaBuilder) throws Exception { BatchSchema expectedSchema = new BatchSchemaBuilder() diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java index 784a1820292..6f8c2f36f90 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java @@ -460,7 +460,8 @@ private RecordBatch getScanBatch() throws Exception { ccf, footer, columnsToRead, - ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION)); + ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION, + null)); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 5f06d8bed01..5e625417c40 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -640,7 +640,7 @@ public void 
testPerformance() throws Exception { ); final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs, ccf, - f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION); + f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION, null); final TestOutputMutator mutator = new TestOutputMutator(allocator); rr.setup(null, mutator); final Stopwatch watch = Stopwatch.createStarted(); diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..543249e0b3da564cbb079a81dde60a16b1e21994 GIT binary patch literal 562 zcmZWn-AcnS7>%V$8D1!wkU}rwz@ebUTG{N7!PJS0NS%rZilj>hh5nhg4iUj?U&1?| z%R67gud6p=Q<|LfCFh(Z_4`8yBfJ3Z`{Vsnx3CV-zy_K=Gw?MRHIwQUgsvj6W)fr) z9bRNw#K1jLvm3{AzEpXdU=1d%BC3VrT0=0?GOd8GE&8?Lg4l z)S^b94q-qF?XA~#h`i0`bLaY73c%~-J~D1>Rk9NXEk9kv#f wAsm6%kwSXyUANQO+1e70=y=j~yms4d2iz6X^|k>8eBHDxxZf|hdLRClAIM^doB#j- literal 0 HcmV?d00001 diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/1.parquet new file mode 100644 index 0000000000000000000000000000000000000000..75fd2db8ec923686dc998943f32c57bd24c6d9fd GIT binary patch literal 1094 zcma)+Pm9w)7{(`mEOF_<#hq~?ImALk_h8vHyR9u{VOP7_Qg&;%yCMin(o7|gHZe_I zL_ByE4?PH;L{EMM51zzNAc7aaz!YGnIot)uF@ltp)lKq$+Wf0`o)ViT)K*$4< z@4_%kw#l}k)YCfBq6jJ^RizPR2U8#us;{RNBJx_oYviYZYslEsa8}*QaAJ}?VYW!i z*aIdnKW&LvScn#wM44NI$Oj2uCvQYtN5)j(2Ti%WfvtIM3eZ;!_`oBEx4dm-0kbQ;-Vhd+`B zFbLsYr Date: Tue, 27 Aug 2024 14:30:13 +0300 Subject: [PATCH 3/3] DRILL-8508 Choosing the best suitable major type for a partially missing parquet column (data mode solution) 1. Added TypeCastRules#getLeastRestrictiveMajorType method for convenience 2. In Metadata, added resolving data mode (so it always prefers the less restrictive one) when collecting file schemas and merging them into a single table schema. Synchronized merging to accomplish that 3. In ParquetTableMetadataUtils made a column that is either OPTIONAL in or missing from any of the files be OPTIONAL in the overall table schema 4. For such cases, added enforcing OPTIONAL data mode in ParquetSchema, ParquetColumnMetadata and ColumnReaderFactory. Now even if the file has the column as REQUIRED, but we need it as OPTIONAL, a nullable column reader and nullable value vector will be created 5. Added "() -> 1" initialization for definitionLevels in PageReader so that the nullable column reader can read REQUIRED columns 6. 
Added testEnforcingOptional* test cases in TestParquetPartiallyMissingColumns --- .../drill/exec/resolver/TypeCastRules.java | 12 +++++++ .../parquet/ParquetTableMetadataUtils.java | 16 +++++---- .../columnreaders/ColumnReaderFactory.java | 13 +++++-- .../parquet/columnreaders/PageReader.java | 10 ++++++ .../columnreaders/ParquetColumnMetadata.java | 4 +-- .../parquet/columnreaders/ParquetSchema.java | 16 ++++++++- .../exec/store/parquet/metadata/Metadata.java | 31 +++++++++++++--- .../TestParquetPartiallyMissingColumns.java | 33 ++++++++++++++++-- .../parquet/partially_missing/r_m/0.parquet | Bin 0 -> 1081 bytes .../parquet/partially_missing/r_m/1.parquet | Bin 0 -> 562 bytes .../parquet/partially_missing/r_o/0.parquet | Bin 0 -> 1081 bytes .../parquet/partially_missing/r_o/1.parquet | Bin 0 -> 1101 bytes 12 files changed, 115 insertions(+), 20 deletions(-) create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/r_m/0.parquet create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/r_m/1.parquet create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/r_o/0.parquet create mode 100644 exec/java-exec/src/test/resources/parquet/partially_missing/r_o/1.parquet diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java index d2c864683af..c109b573f59 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.resolver; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -751,6 +752,17 @@ public static MinorType getLeastRestrictiveType(MinorType... types) { return result; } + public static MajorType getLeastRestrictiveMajorType(MajorType... majorTypes) { + MinorType[] minorTypes = Arrays.stream(majorTypes).map(MajorType::getMinorType).toArray(MinorType[]::new); + DataMode[] dataModes = Arrays.stream(majorTypes).map(MajorType::getMode).toArray(DataMode[]::new); + MinorType leastRestrictiveMinorType = getLeastRestrictiveType(minorTypes); + DataMode leastRestrictiveDataMode = getLeastRestrictiveDataMode(dataModes); + return MajorType.newBuilder() + .setMinorType(leastRestrictiveMinorType) + .setMode(leastRestrictiveDataMode) + .build(); + } + /** * Finds the type in a given set that has the cheapest cast from a given * starting type. 
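To illustrate the new helper: it resolves the minor type and the data mode independently and recombines them into one MajorType. A small sketch, assuming getLeastRestrictiveType resolves INT vs BIGINT to BIGINT and getLeastRestrictiveDataMode resolves REQUIRED vs OPTIONAL to OPTIONAL (the resolution the rest of this patch relies on):

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.resolver.TypeCastRules;

public class LeastRestrictiveMajorTypeDemo {
  public static void main(String[] args) {
    // One file stores the column as REQUIRED INT, another as OPTIONAL BIGINT.
    MajorType requiredInt = Types.required(MinorType.INT);
    MajorType optionalBigInt = Types.optional(MinorType.BIGINT);

    // Minor types and data modes are merged independently, so the
    // resulting table schema entry becomes OPTIONAL BIGINT.
    MajorType merged = TypeCastRules.getLeastRestrictiveMajorType(requiredInt, optionalBigInt);
    System.out.println(merged.getMinorType() + " " + merged.getMode()); // BIGINT OPTIONAL
  }
}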
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java index 72420c29fd2..b26ff86b9cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java @@ -19,6 +19,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.planner.common.DrillStatsTable; import org.apache.drill.exec.record.SchemaUtil; import org.apache.drill.metastore.metadata.BaseTableMetadata; @@ -52,6 +53,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; @@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT // row groups in the file have the same schema, so using the first one Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file); fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type)); + // If at least 1 parquet file to read doesn't contain a column, enforce this column's + // DataMode to OPTIONAL in the overall table schema + for (SchemaPath column: Sets.symmetricDifference(columns.keySet(), fileColumns.keySet())) { + TypeProtos.MinorType minorType = columns.get(column).getMinorType(); + columns.put(column, Types.optional(minorType)); + } } return columns; } @@ -680,13 +688,7 @@ private static void putType(Map<SchemaPath, TypeProtos.MajorType> columns, Schem if (majorType == null) { columns.put(columnPath, type); } else if (!majorType.equals(type)) { - TypeProtos.MinorType leastRestrictiveType = TypeCastRules.getLeastRestrictiveType( - majorType.getMinorType(), - type.getMinorType() - ); - if (leastRestrictiveType != majorType.getMinorType()) { - columns.put(columnPath, type); - } + columns.put(columnPath, TypeCastRules.getLeastRestrictiveMajorType(majorType, type)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index d22c07d7bcd..aae079d01ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.BitVector; @@ -73,7 +74,10 @@ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader, ConvertedType convertedType = schemaElement.getConverted_type(); // if the column is required, or repeated (in which case we just want to use this to generate our appropriate // ColumnReader for actually transferring data into the data vector inside of our repeated vector - if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) { + // Choose a reader 
based on a ValueVector DataMode since we might want to put + // parquet's REQUIRED column into Drill's OPTIONAL ValueVector + // see ParquetSchema#tableSchema for details + if (v.getField().getDataMode() != TypeProtos.DataMode.OPTIONAL) { return getColumnReader(recordReader, fixedLength, descriptor, columnChunkMetaData, v, schemaElement, convertedType); } else { // if the column is nullable return getNullableColumnReader(recordReader, descriptor, @@ -86,8 +90,11 @@ static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, Colu SchemaElement schemaElement ) throws ExecutionSetupException { ConvertedType convertedType = schemaElement.getConverted_type(); - switch (descriptor.getMaxDefinitionLevel()) { - case 0: + // Choose a reader based on a ValueVector DataMode since we might want to put + // parquet's REQUIRED column into Drill's OPTIONAL ValueVector + // see ParquetSchema#tableSchema for details + switch (v.getField().getDataMode()) { + case REQUIRED: if (convertedType == null) { return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 738d13c4a82..b6af0528c16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -488,6 +488,11 @@ protected int decodeLevels() throws IOException { ValuesReader dlReader = dlEncoding.getValuesReader(columnDescriptor, ValuesType.DEFINITION_LEVEL); dlReader.initFromPage(pageValueCount, dataStream); this.definitionLevels = new ValuesReaderIntIterator(dlReader); + } else { + // Even if all values in a page are REQUIRED, still initialize definitionLevels this way + // to be able to read such a column with NullableColumnReader and treat each value + // definition as 1 + this.definitionLevels = () -> 1; } dataOffset = (int) dataStream.position(); @@ -511,6 +516,11 @@ protected int decodeLevels() throws IOException { maxDefLevel, BytesInput.from(pageData.nioBuffer(repLevelLen, defLevelLen)) ); + } else { + // Even if all values in a page are REQUIRED, still initialize definitionLevels this way + // to be able to read such a column with NullableColumnReader and treat each value + // definition as 1 + this.definitionLevels = () -> 1; } dataOffset = repLevelLen + defLevelLen; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java index 62b3b56c38c..549b725848a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java @@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) { this.column = column; } - public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) { + public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) { se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column)); type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(), - 
getDataMode(column), se, options); + isEnforcedOptional ? DataMode.OPTIONAL : getDataMode(column), se, options); field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type); length = getDataTypeLength(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java index 22668ebcf22..db59ddc6e1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java @@ -70,6 +70,11 @@ public final class ParquetSchema { * If we don't find a selected column in our parquet file, the type for the null-filled vector * to create is looked up in this schema. That is, if some other parquet file contains * the column, we take its type. Otherwise, we default to Nullable Int. + * Also, if at least 1 file does not contain the selected column, then the overall table schema + * should have this field with OPTIONAL data mode. GroupScan catches this case and sets the + * appropriate data mode in this schema. Our mission here is to enforce that OPTIONAL mode in our + * output schema, even if the particular parquet file we're reading from has this field as REQUIRED, + * to provide consistency across all scan batches. */ private final TupleMetadata tableSchema; @@ -137,7 +142,7 @@ private void loadParquetSchema() { // loop to add up the length of the fixed width columns and build the schema for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) { ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column); - columnMetadata.resolveDrillType(schemaElements, options); + columnMetadata.resolveDrillType(schemaElements, options, shouldEnforceOptional(column)); if (!columnSelected(column)) { continue; } @@ -145,6 +150,15 @@ private void loadParquetSchema() { } } + private boolean shouldEnforceOptional(ColumnDescriptor column) { + String columnName = SchemaPath.getCompoundPath(column.getPath()).getAsUnescapedPath(); + MaterializedField tableField; + if (tableSchema == null || (tableField = tableSchema.column(columnName)) == null) { + return false; + } + return tableField.getDataMode() == DataMode.OPTIONAL; + } + /** * Fixed-width fields are the easiest to plan. 
We know the size of each column, * making it easy to determine the total length of each vector, once we know diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index e9850dae3ad..cf718e8b18d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -54,6 +54,7 @@ import java.io.OutputStream; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -279,15 +280,16 @@ private Pair<ParquetTableMetadata_v4, ParquetTableMetadataDirs> createMetaFilesR } else { for (ColumnTypeMetadata_v4.Key key : subTableColumnTypeInfo.keySet()) { ColumnTypeMetadata_v4 columnTypeMetadata_v4 = columnTypeInfoSet.get(key); - if (columnTypeMetadata_v4 == null) { - columnTypeMetadata_v4 = subTableColumnTypeInfo.get(key); + ColumnTypeMetadata_v4 subTableColumnTypeMetadata_v4 = subTableColumnTypeInfo.get(key); + if (columnTypeMetadata_v4 == null || columnTypeMetadata_v4.repetition.isMoreRestrictiveThan(subTableColumnTypeMetadata_v4.repetition)) { + columnTypeMetadata_v4 = subTableColumnTypeMetadata_v4; } else { // If the existing total null count or the null count of the child file is unknown(-1), update the total null count // as unknown - if (subTableColumnTypeInfo.get(key).totalNullCount < 0 || columnTypeMetadata_v4.totalNullCount < 0) { + if (subTableColumnTypeMetadata_v4.totalNullCount < 0 || columnTypeMetadata_v4.totalNullCount < 0) { columnTypeMetadata_v4.totalNullCount = NULL_COUNT_NOT_EXISTS; } else { - columnTypeMetadata_v4.totalNullCount = columnTypeMetadata_v4.totalNullCount + subTableColumnTypeInfo.get(key).totalNullCount; + columnTypeMetadata_v4.totalNullCount = columnTypeMetadata_v4.totalNullCount + subTableColumnTypeMetadata_v4.totalNullCount; } } columnTypeInfoSet.put(key, columnTypeMetadata_v4); @@ -516,10 +518,29 @@ public static ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTa FileMetadataCollector metadataCollector = new FileMetadataCollector(metadata, file, fs, allColumnsInteresting, skipNonInteresting, columnSet, readerConfig); - parquetTableMetadata.metadataSummary.columnTypeInfo.putAll(metadataCollector.getColumnTypeInfo()); + mergeColumns(parquetTableMetadata.metadataSummary.columnTypeInfo, metadataCollector.getColumnTypeInfo()); return metadataCollector.getFileMetadata(); } + /** + * Merges myColumns into resultColumns map with the least restrictive repetition resolution + * @param resultColumns - overall columns map from all the files + * @param myColumns - columns from a particular file to merge into resultColumns + */ + private static synchronized void mergeColumns(Map<ColumnTypeMetadata_v4.Key, ColumnTypeMetadata_v4> resultColumns, + Map<ColumnTypeMetadata_v4.Key, ColumnTypeMetadata_v4> myColumns) { + Map<ColumnTypeMetadata_v4.Key, ColumnTypeMetadata_v4> columnsToMerge = new HashMap<>(myColumns); + for (ColumnTypeMetadata_v4.Key key: columnsToMerge.keySet()) { + ColumnTypeMetadata_v4 columnToMerge = columnsToMerge.get(key); + ColumnTypeMetadata_v4 resultColumn = resultColumns.get(key); + if (resultColumn != null && columnToMerge.repetition.isMoreRestrictiveThan(resultColumn.repetition)) { + columnToMerge.repetition = resultColumn.repetition; + } + } + resultColumns.putAll(columnsToMerge); + } + + /** * Serialize parquet metadata to json and write to a file. 
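The merge in mergeColumns keeps whichever repetition is less restrictive, leaning on Parquet's ordering where REQUIRED is more restrictive than OPTIONAL, which is more restrictive than REPEATED. A standalone sketch of that rule, assuming the repetition field above is Parquet's Type.Repetition (as the isMoreRestrictiveThan call suggests):

import org.apache.parquet.schema.Type.Repetition;

public class RepetitionMergeDemo {
  // Keep the less restrictive of two repetitions, as mergeColumns does.
  static Repetition merge(Repetition existing, Repetition incoming) {
    return incoming.isMoreRestrictiveThan(existing) ? existing : incoming;
  }

  public static void main(String[] args) {
    // A column that is REQUIRED in one file and OPTIONAL in another merges
    // to OPTIONAL, so the reader of the REQUIRED file must still fill a
    // nullable vector -- which is exactly what this patch series enforces.
    System.out.println(merge(Repetition.REQUIRED, Repetition.OPTIONAL)); // OPTIONAL
    System.out.println(merge(Repetition.OPTIONAL, Repetition.REQUIRED)); // OPTIONAL
  }
}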
* diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java b/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java index c622c153518..7e316fdf57a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java @@ -19,8 +19,8 @@ * Expected behavior for the missing columns is the following: * 1) If at least 1 parquet file to be read has the column, take the minor type from there. * Otherwise, default to INT. - * TODO: 2) If at least 1 parquet file to be read doesn't have the column, or has it as OPTIONAL, - * TODO: enforce the overall scan output schema to have it as OPTIONAL + * 2) If at least 1 parquet file to be read doesn't have the column, or has it as OPTIONAL, + * enforce the overall scan output schema to have it as OPTIONAL * * We need to control ordering of scanning batches to cover different erroneous cases, and we assume * that parquet files in a table would be read in alphabetic order (not a real use case though). So @@ -29,6 +29,8 @@ * * - parquet/partially_missing/o_m -- optional, then missing * - parquet/partially_missing/m_o -- missing, then optional + * - parquet/partially_missing/r_m -- required, then missing + * - parquet/partially_missing/r_o -- required, then optional * * These tables have these parquet files with such schemas: * * - parquet/partially_missing/o_m/0.parquet: id | name | age * @@ -38,6 +40,12 @@ * - parquet/partially_missing/m_o/0.parquet: id * - parquet/partially_missing/m_o/1.parquet: id | name | age * + * - parquet/partially_missing/r_m/0.parquet: id | name | age + * - parquet/partially_missing/r_m/1.parquet: id + * + * - parquet/partially_missing/r_o/0.parquet: id | name | age + * - parquet/partially_missing/r_o/1.parquet: id | name | age + * * So, by querying "age" or "name" columns we would trigger both 0.parquet reader to read the data and * 1.parquet reader to create the missing column vector. 
*/ @@ -84,6 +92,27 @@ public void testMissingColumnTypeGuessWithUnionAll() throws Exception { test("SELECT name FROM dfs.`parquet/partially_missing/m_o` UNION ALL (VALUES ('Bob'))", nameSchema); } + /* + If at least 1 file in the table doesn't have the selected column, or has it as OPTIONAL, + the overall scan output schema should have this column as OPTIONAL + */ + + @Test + public void testEnforcingOptionalWithOrderBy() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/r_o` ORDER BY age", ageSchema); + test("SELECT age FROM dfs.`parquet/partially_missing/r_m` ORDER BY age", ageSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_o` ORDER BY name", nameSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_m` ORDER BY name", nameSchema); + } + + @Test + public void testEnforcingOptionalWithUnionAll() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/r_o` UNION ALL (VALUES (1))", ageSchema); + test("SELECT age FROM dfs.`parquet/partially_missing/r_m` UNION ALL (VALUES (1))", ageSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_o` UNION ALL (VALUES ('Bob'))", nameSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_m` UNION ALL (VALUES ('Bob'))", nameSchema); + } + // Runs the query and verifies the result schema against the expected schema private void test(String query, SchemaBuilder expectedSchemaBuilder) throws Exception { BatchSchema expectedSchema = new BatchSchemaBuilder() diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4dfdc2a4978c34b5d91deebcece234046b2fc68d GIT binary patch literal 1081 zcma)+L66cv6vwAf*@mET?<u`WO^)| z`X0`};R2#5#~!2*&cRgltO+ZitD>UBXjn>HkJ%uDpgKivXp#bi0+M8Y3d1UClBOm# zl17rQ2r9;HB@tu?KOhroY$q)u@;Z&T$aewPk=9mlE&;~5W28w;t=*^qhIdbo-B=+J zvak!0cWHc)yh-8fNSh1G>Uh=5^5z26$GxJlalF32(F%20-={PtGq11o$oV^Jvx{m3ubY&r^^Ao4@{Tj1W+)qG-ZK09`pYpc(*s=5#yJI{mJWXQt0 zs>9s1hRm(2-Mt&F?qR=mf6(c-Zozu=EC7=Eu-7{5?3+e8`{E>xLsxm_)A0_>erNyo z|A64wD$h6_@3=lhFRfd16gm+*2-s+P)1L&cV|fu3qj1I$-Hf%cH1TcbmUtsniv1kU z#)w{6=ZsuyI;FY9@5I{^xn|x`Xazm~NFKl>fUh?_Gq&6*B)hx7U73XHQxF{AQ}vR* zThi6yaOS`xs#TfIYL#n7wYs~rW9xRc#tgkysTh@^W!TK9l_45h+j0Yb70Hb{X@q+4 Gr|}1+HRw42 literal 0 HcmV?d00001 diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/1.parquet new file mode 100644 index 0000000000000000000000000000000000000000..543249e0b3da564cbb079a81dde60a16b1e21994 GIT binary patch literal 562 zcmZWn-AcnS7>%V$8D1!wkU}rwz@ebUTG{N7!PJS0NS%rZilj>hh5nhg4iUj?U&1?| z%R67gud6p=Q<|LfCFh(Z_4`8yBfJ3Z`{Vsnx3CV-zy_K=Gw?MRHIwQUgsvj6W)fr) z9bRNw#K1jLvm3{AzEpXdU=1d%BC3VrT0=0?GOd8GE&8?Lg4l z)S^b94q-qF?XA~#h`i0`bLaY73c%~-J~D1>Rk9NXEk9kv#f wAsm6%kwSXyUANQO+1e70=y=j~yms4d2iz6X^|k>8eBHDxxZf|hdLRClAIM^doB#j- literal 0 HcmV?d00001 diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4dfdc2a4978c34b5d91deebcece234046b2fc68d GIT binary patch literal 1081 zcma)+L66cv6vwAf*@mET?<u`WO^)| z`X0`};R2#5#~!2*&cRgltO+ZitD>UBXjn>HkJ%uDpgKivXp#bi0+M8Y3d1UClBOm# zl17rQ2r9;HB@tu?KOhroY$q)u@;Z&T$aewPk=9mlE&;~5W28w;t=*^qhIdbo-B=+J 
zvak!0cWHc)yh-8fNSh1G>Uh=5^5z26$GxJlalF32(F%20-={PtGq11o$oV^Jvx{m3ubY&r^^Ao4@{Tj1W+)qG-ZK09`pYpc(*s=5#yJI{mJWXQt0 zs>9s1hRm(2-Mt&F?qR=mf6(c-Zozu=EC7=Eu-7{5?3+e8`{E>xLsxm_)A0_>erNyo z|A64wD$h6_@3=lhFRfd16gm+*2-s+P)1L&cV|fu3qj1I$-Hf%cH1TcbmUtsniv1kU z#)w{6=ZsuyI;FY9@5I{^xn|x`Xazm~NFKl>fUh?_Gq&6*B)hx7U73XHQxF{AQ}vR* zThi6yaOS`xs#TfIYL#n7wYs~rW9xRc#tgkysTh@^W!TK9l_45h+j0Yb70Hb{X@q+4 Gr|}1+HRw42 literal 0 HcmV?d00001 diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/1.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5f0812d395a6e4042cc5d2588f90f4941682cb34 GIT binary patch literal 1101 zcma)+?`zXQ7{@O^Rzj)ZaCclJNE|e5FD%QlwYHSuoUOKmt#&O#5k|UPC6G2TO&ub> z^@T6@!ngWre~15r;u{6u2>uO%g3skT>^5JRyXNk>=ef^w&o^m0+g%$Yd==V{cW+Kr z1FK+aSVP$*!5!=y&^~^D{Yw$<9Cyp$QltgA@31KJAj*LD^UM2hS1v{=O^B+&PhXUE zs=@@pkg;W*Af2etg>xZ;=~Rpu+|X6slI7Xv3`12;W4$LB=a^Ux6gbPKPs$*}&ifB_cj4T+I6z?hP=MdyK^tz=P zKqw+j=esbhk}a}jX?0PFD2t$#tS(_ec8~=!q57(*5s?!GFOjbjt|6;w;DWxHOJZ-4qLlDXFdPwGL1v$dFvN>d!1fuZ+F{0&)e^HTf6uF1q8|FdCvdw&d!JE&GXV6 zCqc^gV|FyziAHf4c;l3+X)Uu{mUtml%k&&fhlpM~&p8Uc$%Gb!-a^|; zxu)ae#EU!pksQDzhWDK=483py$=WmAtx;k=2S*T%O}k`olx%ZlFb&`lRV&PA)$(np qQrTEr^KHLUWsY4fm!0y!bA0Ah*TEWit9l*&CZeOq3PK(Dko*Dcpz5Fi literal 0 HcmV?d00001