Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.claude
CLAUDE.md
.cursor*
.kiro*

# intellij files
.idea/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.FieldAssignments;
import org.opensearch.index.engine.exec.FieldSupportRegistry;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import com.parquet.parquetdataformat.bridge.RustBridge;
import com.parquet.parquetdataformat.engine.ParquetExecutionEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FormatStoreDirectory;
import org.opensearch.index.store.GenericStoreDirectory;
import org.opensearch.plugins.DataSourcePlugin;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.spi.vectorized.DataSourceCodec;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -82,8 +85,15 @@ public class ParquetDataFormatPlugin extends Plugin implements DataSourcePlugin

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath, indexSettings);
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(EngineConfig engineConfig, MapperService mapperService, boolean isPrimary, ShardPath shardPath, IndexSettings indexSettings, FieldAssignments fieldAssignments) {
ParquetExecutionEngine engine = new ParquetExecutionEngine(
settings,
isPrimary,
() -> ArrowSchemaBuilder.getSchema(mapperService, isPrimary),
shardPath,
indexSettings
);
return (IndexingExecutionEngine<T>) engine;
}

@Override
Expand All @@ -109,6 +119,12 @@ public DataFormat getDataFormat() {
return new ParquetDataFormat();
}

/**
 * Marks the Parquet format as the primary (source-of-truth) data source when
 * multiple data sources are configured for the same index.
 */
@Override
public boolean isPrimary() {
return true;
}

@Override
public Optional<Map<org.opensearch.plugins.spi.vectorized.DataFormat, DataSourceCodec>> getDataSourceCodecs() {
Map<org.opensearch.plugins.spi.vectorized.DataFormat, DataSourceCodec> codecs = new HashMap<>();
Expand Down Expand Up @@ -136,6 +152,15 @@ public BlobContainer createBlobContainer(BlobStore blobStore, BlobPath baseBlobP
return blobStore.blobContainer(formatPath);
}

/**
 * Registers this plugin's field-type support with the given registry: every
 * field type known to ArrowFieldRegistry is registered against the Parquet
 * data format together with that field's capabilities.
 *
 * @param registry destination registry for (fieldType, format, capabilities) entries
 */
@Override
public void registerFieldSupport(FieldSupportRegistry registry) {
    final DataFormat parquetFormat = getDataFormat();
    com.parquet.parquetdataformat.fields.ArrowFieldRegistry.getRegisteredFields()
        .forEach((fieldType, parquetField) ->
            registry.register(fieldType, parquetFormat, parquetField.getFieldCapabilities()));
}

@Override
public List<Setting<?>> getSettings() {
return List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.index.engine.exec.DocumentInput;
import org.opensearch.index.mapper.MappedFieldType;
Expand All @@ -27,10 +26,13 @@ public static Schema getSchema() {
/**
 * Populates the given document input with four randomly generated fields:
 * an int id, UTF-8 name and designation strings, and an int salary.
 * Presumably used to generate sample/test data — TODO confirm against callers.
 *
 * @param documentInput the document to append the generated fields to
 */
public static void populateDocumentInput(DocumentInput<?> documentInput) {
// 32-bit signed integer id
MappedFieldType idField = FieldTypeConverter.convertToMappedFieldType(ID, new ArrowType.Int(32, true));
documentInput.addField(idField, generateRandomId());

MappedFieldType nameField = FieldTypeConverter.convertToMappedFieldType(NAME, new ArrowType.Utf8());
documentInput.addField(nameField, generateRandomName());

MappedFieldType designationField = FieldTypeConverter.convertToMappedFieldType(DESIGNATION, new ArrowType.Utf8());
documentInput.addField(designationField, generateRandomDesignation());

// salary in [0, 100000); NOTE(review): uses the shared `random` instance — confirm thread-safety needs
MappedFieldType salaryField = FieldTypeConverter.convertToMappedFieldType(SALARY, new ArrowType.Int(32, true));
documentInput.addField(salaryField, random.nextInt(100000));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ public void configureStore() {

/**
 * Name-based equality: two DataFormat instances are equal when they report the
 * same {@code name()}. The previous stub returned {@code true} unconditionally,
 * violating the equals contract; the stale line also left the real comparison
 * unreachable.
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    // instanceof is null-safe: null can never be equal
    if (!(obj instanceof DataFormat)) {
        return false;
    }
    return name().equals(((DataFormat) obj).name());
}

/**
 * Hash code derived from the format name, keeping hashCode consistent with the
 * name-based equals. The stale {@code return 0;} left the real computation
 * unreachable (and a constant hash degrades every HashMap/HashSet of formats).
 */
@Override
public int hashCode() {
    return name().hashCode();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import com.parquet.parquetdataformat.fields.ArrowFieldRegistry;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.EngineRole;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.engine.exec.Merger;
import org.opensearch.index.engine.exec.RefreshInput;
Expand Down Expand Up @@ -75,9 +77,11 @@ public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDa
private final ParquetMerger parquetMerger;
private final ArrowBufferPool arrowBufferPool;
private final IndexSettings indexSettings;
private final boolean isPrimaryEngine;

public ParquetExecutionEngine(
Settings settings,
boolean isPrimaryEngine,
Supplier<Schema> schema,
ShardPath shardPath,
IndexSettings indexSettings
Expand All @@ -87,7 +91,7 @@ public ParquetExecutionEngine(
this.arrowBufferPool = new ArrowBufferPool(settings);
this.indexSettings = indexSettings;
this.parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH, indexSettings.getIndex().getName());

this.isPrimaryEngine = isPrimaryEngine;
// Push current settings to Rust store once on construction, then keep in sync on updates
pushSettingsToRust(indexSettings);

Expand Down Expand Up @@ -131,7 +135,7 @@ public void deleteFiles(Map<String, Collection<String>> filesToDelete) {
Collection<String> parquetFilesToDelete = filesToDelete.get(PARQUET_DATA_FORMAT.name());
for (String fileName : parquetFilesToDelete) {
Path filePath = Paths.get(fileName);
logger.info("Deleting file [ParquetExecutionEngine]: {}", filePath);
// logger.info("Deleting file [ParquetExecutionEngine]: {}", filePath);
try {
Files.delete(filePath);
} catch (Exception e) {
Expand All @@ -143,14 +147,15 @@ public void deleteFiles(Map<String, Collection<String>> filesToDelete) {
}

@Override
public List<String> supportedFieldTypes() {
return List.of();
public List<String> supportedFieldTypes(boolean isPrimaryEngine) {
return new java.util.ArrayList<>(ArrowFieldRegistry.getRegisteredFieldNames());
}

/**
 * Creates a Parquet writer for the given writer generation. The target file lives
 * under the shard data path, in a directory named after this data format, and the
 * writer's role mirrors whether this engine is primary or secondary.
 *
 * @param writerGeneration generation number embedded in the output file name
 * @return a writer bound to the generation-specific Parquet file
 */
@Override
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) {
    String fileName = Path.of(shardPath.getDataPath().toString(), getDataFormat().name(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
    EngineRole role = isPrimaryEngine ? EngineRole.PRIMARY : EngineRole.SECONDARY;
    return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings, role);
}

@Override
Expand All @@ -174,8 +179,8 @@ public long getNativeBytesUsed() {
long vsrMemory = arrowBufferPool.getTotalAllocatedBytes();
String shardDataPath = shardPath.getDataPath().toString();
long filteredArrowWriterMemory = RustBridge.getFilteredNativeBytesUsed(shardDataPath);
logger.debug("Native memory used by VSR Buffer Pool: {}", vsrMemory);
logger.debug("Native memory used by ArrowWriters in shard path {}: {}", shardDataPath, filteredArrowWriterMemory);
// logger.debug("Native memory used by VSR Buffer Pool: {}", vsrMemory);
// logger.debug("Native memory used by ArrowWriters in shard path {}: {}", shardDataPath, filteredArrowWriterMemory);
return vsrMemory + filteredArrowWriterMemory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ParquetDataSourceCodec implements DataSourceCodec {
static {
try {
//JniLibraryLoader.loadLibrary();
logger.info("DataFusion JNI library loaded successfully");
// logger.info("DataFusion JNI library loaded successfully");
} catch (Exception e) {
logger.error("Failed to load DataFusion JNI library", e);
throw new RuntimeException("Failed to initialize DataFusion JNI library", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static void registerCorePlugins() {
// Register core data fields
registerPlugin(new CoreDataFieldPlugin(), "CoreDataFields");

// REgister metadata fields
// Register metadata fields
registerPlugin(new MetadataFieldPlugin(), "MetadataFields");
}
/**
Expand Down Expand Up @@ -141,6 +141,13 @@ public static ParquetField getParquetField(String fieldType) {
return FIELD_REGISTRY.get(fieldType);
}

/**
 * Exposes every registered field-type to ParquetField mapping as a read-only map.
 * The result is an unmodifiable live view of the internal registry, not a snapshot:
 * later registrations become visible through it, and mutating calls throw
 * {@link UnsupportedOperationException}.
 *
 * @return unmodifiable view of the field registry
 */
public static Map<String, ParquetField> getRegisteredFields() {
    final Map<String, ParquetField> readOnlyView = Collections.unmodifiableMap(FIELD_REGISTRY);
    return readOnlyView;
}

public static class RegistryStats {
private final int totalFields;
private final Set<String> allFieldTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
* Utility class for creating Apache Arrow schemas from OpenSearch mapper services.
* This class provides methods to convert OpenSearch field mappings into Arrow schema definitions
* that can be used for Parquet data format operations.
*
* <p>Uses {@link ArrowFieldRegistry} to determine which fields are eligible for the
* engine's role via {@code getParquetFieldAfterMatchingRole()}. For non-primary contexts,
* fields not eligible for the role are excluded from the schema.</p>
*/
public final class ArrowSchemaBuilder {

Expand All @@ -38,18 +42,18 @@ private ArrowSchemaBuilder() {
}

/**
* Creates an Apache Arrow Schema from the provided MapperService.
* This method extracts all non-metadata field mappers and converts them to Arrow fields.
* Creates an Apache Arrow Schema from the provided MapperService using the ArrowFieldRegistry.
*
* @param mapperService the OpenSearch mapper service containing field definitions
* @return a new Schema containing Arrow field definitions for all mapped fields
* @param isPrimary whether this is a primary engine context
* @return a new Schema containing Arrow field definitions for all eligible mapped fields
* @throws IllegalArgumentException if mapperService is null
* @throws IllegalStateException if no valid fields are found or if a field type is not supported
* @throws IllegalStateException if no valid fields are found or if a field type is not supported in primary context
*/
public static Schema getSchema(final MapperService mapperService) {
public static Schema getSchema(final MapperService mapperService, boolean isPrimary) {
Objects.requireNonNull(mapperService, "MapperService cannot be null");

final List<Field> fields = extractFieldsFromMappers(mapperService);
final List<Field> fields = extractFieldsFromMappers(mapperService, isPrimary);

if (fields.isEmpty()) {
throw new IllegalStateException("No valid fields found in mapper service");
Expand All @@ -59,25 +63,30 @@ public static Schema getSchema(final MapperService mapperService) {
}

/**
* Extracts Arrow fields from the mapper service, filtering out metadata fields.
* Extracts Arrow fields from the mapper service, filtering out metadata fields
* and fields not eligible for the engine's role.
*
* @param mapperService the mapper service to extract fields from
* @param isPrimary whether this is a primary engine context
* @return a list of Arrow fields
*/
private static List<Field> extractFieldsFromMappers(final MapperService mapperService) {
private static List<Field> extractFieldsFromMappers(final MapperService mapperService, boolean isPrimary) {
final List<Field> fields = new ArrayList<>();

for (final Mapper mapper : mapperService.documentMapper().mappers()) {
if (notSupportedMetadataField(mapper)) {
continue;
}

final Field arrowField = createArrowField(mapper);
fields.add(arrowField);
final Field arrowField = createArrowField(mapper, isPrimary);
if (arrowField != null) {
fields.add(arrowField);
}
}

fields.add(new Field(CompositeDataFormatWriter.ROW_ID, new LongParquetField().getFieldType(), null));
fields.add(new Field(SeqNoFieldMapper.PRIMARY_TERM_NAME, new LongParquetField().getFieldType(), null));
LongParquetField longField = new LongParquetField();
fields.add(new Field(CompositeDataFormatWriter.ROW_ID, longField.getFieldType(), null));
fields.add(new Field(SeqNoFieldMapper.PRIMARY_TERM_NAME, longField.getFieldType(), null));

return fields;
}
Expand All @@ -98,20 +107,27 @@ private static boolean notSupportedMetadataField(final Mapper mapper) {
}

/**
* Creates an Arrow Field from an OpenSearch Mapper.
* Creates an Arrow Field from an OpenSearch Mapper using the ArrowFieldRegistry.
* For non-primary contexts, returns null if the field type has no eligible ParquetField,
* allowing the caller to skip the field. For primary contexts, throws IllegalStateException
* if no ParquetField is found.
*
* @param mapper the mapper to convert
* @return a new Arrow Field
* @throws IllegalStateException if the mapper type is not supported
* @param isPrimary whether this is a primary engine context
* @return a new Arrow Field, or null if the field is not eligible for the role
* @throws IllegalStateException if the mapper type is not supported in primary context
*/
private static Field createArrowField(final Mapper mapper) {
private static Field createArrowField(final Mapper mapper, boolean isPrimary) {
final ParquetField parquetField = ArrowFieldRegistry.getParquetField(mapper.typeName());

if (parquetField == null) {
throw new IllegalStateException(
String.format("Unsupported field type '%s' for field '%s'",
mapper.typeName(), mapper.name())
);
if (isPrimary) {
throw new IllegalStateException(
String.format("Unsupported field type '%s' for field '%s'",
mapper.typeName(), mapper.name())
);
}
return null;
}

return new Field(mapper.name(), parquetField.getFieldType(), null);
Expand Down
Loading
Loading