From fad2a5f6b66311bc340c0e9a9f91b10a76072709 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 13 Nov 2025 10:51:43 +0800 Subject: [PATCH] Init all series writer for AlignedChunkGroupWriter --- .../org/apache/tsfile/write/TsFileWriter.java | 12 ++- .../tsfile/write/TsFileWriteApiTest.java | 84 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java index d5ac5dedf..981a043bb 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java @@ -462,11 +462,13 @@ private List checkIsAllMeasurementsInGroup( return schemas; } - private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) { + private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) + throws IOException { IChunkGroupWriter groupWriter; if (!groupWriters.containsKey(deviceId)) { if (isAligned) { groupWriter = new AlignedChunkGroupWriterImpl(deviceId); + initAllSeriesWriterForAlignedSeries((AlignedChunkGroupWriterImpl) groupWriter, deviceId); if (!isUnseq) { // Sequence File ((AlignedChunkGroupWriterImpl) groupWriter) .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); @@ -486,6 +488,14 @@ private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean is return groupWriter; } + private void initAllSeriesWriterForAlignedSeries( + AlignedChunkGroupWriterImpl alignedChunkGroupWriter, IDeviceID deviceID) throws IOException { + MeasurementGroup deviceSchema = schema.getSeriesSchema(new Path(deviceID)); + for (MeasurementSchema measurementSchema : deviceSchema.getMeasurementSchemaMap().values()) { + alignedChunkGroupWriter.tryToAddSeriesWriter(measurementSchema); + } + } + /** * write a record in type of T. * diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java index 5586fb222..ad6bd79d5 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java @@ -25,10 +25,13 @@ import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.fileSystem.FSFactoryProducer; +import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.Chunk; @@ -36,6 +39,7 @@ import org.apache.tsfile.read.expression.QueryExpression; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.chunk.ChunkWriterImpl; @@ -54,6 +58,7 @@ import java.nio.ByteBuffer; import java.time.LocalDate; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -810,4 +815,83 @@ public void writeTsFileByFlushingPageDirectly() throws IOException, WriteProcess throw throwable; } } + + @Test + public void testWriteSomeColumnsOfTree() throws IOException, WriteProcessException { + List fullMeasurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + List measurementSchemas1 = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT32)); + String device = "root.test.d1"; + Tablet tablet1 = new Tablet(device, fullMeasurementSchemas); + Tablet tablet2 = new Tablet(device, measurementSchemas1); + for (int i = 0; i < 1000; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue("s1", i, 1); + tablet1.addValue("s2", i, 1); + tablet1.addValue("s3", i, 1); + } + tablet1.rowSize = 1000; + for (int i = 0; i < 1000; i++) { + tablet2.addTimestamp(i, i + 1005); + tablet2.addValue("s1", i, 0); + } + tablet2.rowSize = 1000; + try (TsFileWriter writer = new TsFileWriter(f)) { + writer.registerAlignedTimeseries(new Path(device), fullMeasurementSchemas); + writer.writeAligned(tablet1); + writer.flushAllChunkGroups(); + writer.writeAligned(tablet2); + writer.flushAllChunkGroups(); + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + Pair pair = deviceIterator.next(); + List alignedChunkMetadataList = + reader.getAlignedChunkMetadataByMetadataIndexNode( + pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice()); + Assert.assertFalse(alignedChunkMetadataList.isEmpty()); + Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(1) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(2) + .getStatistics() + .getCount()); + Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(1)); + Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(2)); + } + } + } }