From 6ca23cd1e570fba34f020583201a23d269a67852 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Thu, 9 Oct 2025 08:14:11 +0200
Subject: [PATCH 01/16] add support ApacheIcebergSource in Wayang Basic
---
wayang-commons/wayang-basic/pom.xml | 44 +++
.../basic/operators/ApacheIcebergSource.java | 265 ++++++++++++++++++
2 files changed, 309 insertions(+)
create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
diff --git a/wayang-commons/wayang-basic/pom.xml b/wayang-commons/wayang-basic/pom.xml
index 166a957bf..eea9ae40b 100644
--- a/wayang-commons/wayang-basic/pom.xml
+++ b/wayang-commons/wayang-basic/pom.xml
@@ -34,6 +34,11 @@
This modules represents the base Wayang package with the default operators and functions.
+
+ 1.6.0
+ 3.3.6
+
+
@@ -132,6 +137,45 @@
com.azure
azure-identity
+
+
+
+
+
+ org.apache.iceberg
+ iceberg-core
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-api
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-parquet
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-data
+ ${iceberg.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+
+
+ org.slf4j
+ slf4j-simple
+ 2.0.16
+
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
new file mode 100644
index 000000000..d88335fb4
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -0,0 +1,265 @@
+package org.apache.wayang.basic.operators;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.types.RecordType;
+import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
+import org.apache.wayang.core.plan.wayangplan.UnarySource;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.fs.FileSystems;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.IcebergGenerics.ScanBuilder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+
+public class ApacheIcebergSource extends UnarySource {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private Catalog catalog;
+ private String icebergTableName;
+ private String icebergTableFolderBase;
+
+ private List whereExpressions;
+ private Collection columns;
+
+ private org.apache.iceberg.Table cachedTable = null;
+
+ private static final Double defaultSelectivityValue = 0.10;
+
+    /**
+     * Factory method for an Iceberg source.
+     *
+     * @param catalog                catalog from which the table is loaded
+     * @param icebergTableName       name of the table to read
+     * @param IcebergTableFolderBase first part of the table identifier, combined with the table name
+     *                               when the table is loaded
+     * @param whereExpressions       filter expressions of the source
+     * @param columns                names of the columns to project; determines the output type
+     * @return the newly constructed {@link ApacheIcebergSource}
+     */
+    public static ApacheIcebergSource create(Catalog catalog, String icebergTableName, String IcebergTableFolderBase,
+            List whereExpressions, Collection columns) {
+        return new ApacheIcebergSource(
+                catalog,
+                icebergTableName,
+                IcebergTableFolderBase,
+                whereExpressions,
+                columns);
+    }
+
+ private ApacheIcebergSource(Catalog catalog, String icebergTableName, String IcebergTableFolderBase,
+ List whereExpressions, Collection columns) {
+ super(createOutputDataSetType(columns));
+ this.catalog = catalog;
+ this.icebergTableName = icebergTableName;
+ this.icebergTableFolderBase = IcebergTableFolderBase;
+
+ this.whereExpressions = whereExpressions;
+ this.columns = columns;
+ }
+
+ /**
+ * Copies an instance (exclusive of broadcasts).
+ *
+ * @param that that should be copied
+ */
+ public ApacheIcebergSource(ApacheIcebergSource that) {
+ super(that);
+ this.catalog = that.getCatalog();
+ this.columns = that.getColumns();
+ this.icebergTableName = that.getIcebergTableName();
+ this.icebergTableFolderBase = that.getIcebergTableFolderBase();
+ this.whereExpressions = that.getWhereExpressions();
+ }
+
+    /**
+     * Supplies the cardinality estimator of this source.
+     *
+     * @param outputIndex   index of the requested output; must lie between {@code 0} and
+     *                      {@code getNumOutputs() - 1} (inclusive)
+     * @param configuration not consulted by this implementation
+     * @return an {@link Optional} wrapping a fresh {@link ApacheIcebergSource.CardinalityEstimator}
+     */
+    @Override
+    public Optional createCardinalityEstimator(
+            final int outputIndex,
+            final Configuration configuration) {
+        final int lastOutputIndex = this.getNumOutputs() - 1;
+        Validate.inclusiveBetween(0, lastOutputIndex, outputIndex);
+
+        final ApacheIcebergSource.CardinalityEstimator estimator = new ApacheIcebergSource.CardinalityEstimator();
+        return Optional.of(estimator);
+    }
+
+    /**
+     * Custom
+     * {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for
+     * {@link ApacheIcebergSource}s.
+     * <p>
+     * The estimate is taken from the job cache if present; otherwise it is derived from
+     * {@code getFileSize()} and {@code ExtractNumberOfRows()} of the enclosing source and then cached.
+     */
+    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
+
+        /** Estimate returned when neither the file size nor the row count can be determined. */
+        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);
+
+        /**
+         * Estimates the output cardinality of the enclosing source.
+         *
+         * @param optimizationContext provides the job's stop watch and the job cache
+         * @param inputEstimates      must contain one entry per input of the enclosing source
+         * @return the cached estimate if one exists, an exact-zero estimate for an empty table,
+         *         {@link #FALLBACK_ESTIMATE} if size or row count are unavailable, otherwise an
+         *         estimate built from the extracted row count
+         */
+        @Override
+        public CardinalityEstimate estimate(OptimizationContext optimizationContext,
+                CardinalityEstimate... inputEstimates) {
+            Validate.isTrue(ApacheIcebergSource.this.getNumInputs() == inputEstimates.length);
+
+            // see Job for StopWatch measurements
+            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
+                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities");
+
+            // The measurement is stopped in the finally block so that every exit path (including the
+            // cache hit and unexpected exceptions) stops it.
+            try {
+                // Query the job cache first to see if there is already an estimate.
+                // NOTE(review): the key contains only the table name, so sources on the same table with
+                // different filters share one cached estimate - confirm this is intended.
+                String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(),
+                        ApacheIcebergSource.this.icebergTableName);
+                CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey,
+                        CardinalityEstimate.class);
+                if (cardinalityEstimate != null) {
+                    return cardinalityEstimate;
+                }
+
+                // Otherwise calculate the cardinality.
+                // First, inspect the total size of the table's files.
+                OptionalLong fileSize = getFileSize();
+                if (fileSize.isEmpty()) {
+                    ApacheIcebergSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
+                            ApacheIcebergSource.this.icebergTableName);
+                    return this.FALLBACK_ESTIMATE;
+                }
+                if (fileSize.getAsLong() == 0L) {
+                    return new CardinalityEstimate(0L, 0L, 1d);
+                }
+
+                OptionalLong numberRows = ApacheIcebergSource.this.ExtractNumberOfRows();
+                if (numberRows.isEmpty()) {
+                    ApacheIcebergSource.this.logger
+                            .warn("Could not determine the cardinality of {}... deliver fallback estimate.", ApacheIcebergSource.this.icebergTableName);
+                    return this.FALLBACK_ESTIMATE;
+                }
+
+                long rowCount = numberRows.getAsLong();
+                cardinalityEstimate = new CardinalityEstimate(rowCount, rowCount, 1d);
+
+                // Cache the result, so that it will not be recalculated again.
+                optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);
+
+                return cardinalityEstimate;
+            } finally {
+                timeMeasurement.stop();
+            }
+        }
+    }
+
+    /**
+     * Derives the output {@link DataSetType} from the projected column names.
+     *
+     * @param columnNames names of the projected columns; {@code null} or empty when no columns are given
+     * @return the default {@link Record} type when no column names are given, otherwise a type
+     *         based on a {@link RecordType} carrying the column names
+     */
+    private static DataSetType createOutputDataSetType(Collection columnNames) {
+        if (columnNames == null || columnNames.isEmpty()) {
+            return DataSetType.createDefault(Record.class);
+        }
+        // The no-arg Collection#toArray() is declared to return Object[] (e.g. ArrayList really returns an
+        // Object[]), so casting its result to String[] throws a ClassCastException. The typed overload
+        // allocates a genuine String[]; the cast only matters if the parameter is a raw Collection.
+        String[] columnNamesAsArray = (String[]) columnNames.toArray(new String[0]);
+        return DataSetType.createDefault(new RecordType(columnNamesAsArray));
+    }
+
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+ public String getIcebergTableName() {
+ return icebergTableName;
+ }
+
+ public String getIcebergTableFolderBase() {
+ return icebergTableFolderBase;
+ }
+
+ public List getWhereExpressions() {
+ return whereExpressions;
+ }
+
+ public Collection getColumns() {
+ return columns;
+ }
+
+ private void setCachedTable(Table table) {
+ this.cachedTable = table;
+ }
+
+ // private void setCachedScanBuilder(ScanBuilder scanBuilder) {
+ // this.cachedScanBuilder = scanBuilder;
+ // }
+
+ private org.apache.iceberg.Table getTable() {
+ if (this.cachedTable != null) {
+ return this.cachedTable;
+ }
+
+ TableIdentifier tableId = TableIdentifier.of(getIcebergTableFolderBase(), getIcebergTableName());
+ Table table = this.catalog.loadTable(tableId);
+ setCachedTable(table);
+ return table;
+ }
+
+ // private ScanBuilder getScanBuilder() {
+ // if (this.cachedScanBuilder != null) {
+ // return cachedScanBuilder;
+ // }
+ // ScanBuilder scanBuilder = IcebergGenerics.read(getTable());
+ // if (this.whereExpressions.size() > 0) {
+ // for (Expression whereExpression : this.whereExpressions) {
+ // scanBuilder = scanBuilder.where(whereExpression);
+ // }
+ // }
+ // setCachedScanBuilder(scanBuilder);
+ // return scanBuilder;
+ // }
+
+    /**
+     * Estimates the number of rows this source emits.
+     * <p>
+     * The estimated row counts of all files planned by an unfiltered table scan are summed up. The
+     * scan itself does not apply the where expressions, so if at least one expression is configured
+     * the sum is scaled by the fixed default selectivity instead.
+     *
+     * @return the estimated row count, or {@link OptionalLong#empty()} if the scan reports no rows
+     *         or the metadata cannot be read
+     */
+    private OptionalLong ExtractNumberOfRows() {
+        try {
+            long rowCount = 0L;
+
+            Table table = getTable();
+
+            try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+                for (FileScanTask fileScanTask : tasks) {
+                    rowCount += fileScanTask.estimatedRowsCount();
+                }
+            }
+
+            if (rowCount == 0) {
+                return OptionalLong.empty();
+            }
+
+            // Apply the default selectivity as soon as ANY filter is present. The former check
+            // "size() > 1" left the estimate unadjusted when exactly one filter was configured.
+            if (this.whereExpressions != null && !this.whereExpressions.isEmpty()) {
+                return OptionalLong.of((long) (rowCount * defaultSelectivityValue));
+            }
+
+            return OptionalLong.of(rowCount);
+
+        } catch (Exception e) {
+            // Hand the exception to the logger as a throwable so that its stack trace is kept.
+            this.logger.warn("Could not extract the number of rows. Returning empty.", e);
+            return OptionalLong.empty();
+        }
+    }
+
+    /**
+     * Adds up the sizes in bytes of all files planned by a scan of the table.
+     *
+     * @return the summed file size in bytes, or {@link OptionalLong#empty()} if the scan could not
+     *         be planned or read
+     */
+    private OptionalLong getFileSize() {
+        long totalBytes = 0L;
+        try (CloseableIterable<FileScanTask> plannedTasks = getTable().newScan().planFiles()) {
+            for (FileScanTask task : plannedTasks) {
+                totalBytes += task.file().fileSizeInBytes();
+            }
+        } catch (Exception e) {
+            this.logger.warn("Could not get file size. Returning empty. Got error: " + e);
+            return OptionalLong.empty();
+        }
+        return OptionalLong.of(totalBytes);
+    }
+
+
+}
From 3c2f21bb7dd1ddeaacdd7917607ebaa82954451d Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Sat, 11 Oct 2025 11:16:24 +0200
Subject: [PATCH 02/16] Add support for a Basic ApacheIcebergSource
---
.../basic/operators/ApacheIcebergSource.java | 173 ++++++++++++------
1 file changed, 115 insertions(+), 58 deletions(-)
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
index d88335fb4..bc7cfb2bb 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.wayang.basic.operators;
import org.apache.logging.log4j.LogManager;
@@ -7,12 +25,10 @@
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
-import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
-import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -21,8 +37,6 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -32,13 +46,18 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
+/**
+ * This source reads an Iceberg Table and outputs the lines as
+ * {@link org.apache.wayang.basic.data.Record}
+ * units.
+ */
public class ApacheIcebergSource extends UnarySource {
private final Logger logger = LogManager.getLogger(this.getClass());
- private Catalog catalog;
- private String icebergTableName;
- private String icebergTableFolderBase;
+ private final Catalog catalog;
+
+ private final TableIdentifier tableIdentifier;
private List whereExpressions;
private Collection columns;
@@ -47,19 +66,29 @@ public class ApacheIcebergSource extends UnarySource {
private static final Double defaultSelectivityValue = 0.10;
- public static ApacheIcebergSource create(Catalog catalog, String icebergTableName, String IcebergTableFolderBase,
- List whereExpressions, Collection columns) {
- ApacheIcebergSource ApacheIcebergSource = new ApacheIcebergSource(catalog, icebergTableName,
- IcebergTableFolderBase, whereExpressions, columns);
- return ApacheIcebergSource;
+ /**
+ * Creates a new Iceberg source instance.
+ *
+ * @param catalog Iceberg catalog used to load the table
+ * @param tableIdentifier identifier of the target table
+ * @param whereExpressions list of
+ * {@link org.apache.iceberg.expressions.Expression}
+ * filters; empty list for none
+ * @param columns collection of column names to project; empty list for
+ * all columns
+ * @return a new {@link ApacheIcebergSource} instance
+ */
+ public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier,
+ List whereExpressions,
+ Collection columns) {
+ return new ApacheIcebergSource(catalog, tableIdentifier, whereExpressions, columns);
}
- private ApacheIcebergSource(Catalog catalog, String icebergTableName, String IcebergTableFolderBase,
- List whereExpressions, Collection columns) {
+ private ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
+ List whereExpressions, Collection columns) {
super(createOutputDataSetType(columns));
this.catalog = catalog;
- this.icebergTableName = icebergTableName;
- this.icebergTableFolderBase = IcebergTableFolderBase;
+ this.tableIdentifier = tableIdentifier;
this.whereExpressions = whereExpressions;
this.columns = columns;
@@ -74,8 +103,7 @@ public ApacheIcebergSource(ApacheIcebergSource that) {
super(that);
this.catalog = that.getCatalog();
this.columns = that.getColumns();
- this.icebergTableName = that.getIcebergTableName();
- this.icebergTableFolderBase = that.getIcebergTableFolderBase();
+ this.tableIdentifier = that.getTableIdentifier();
this.whereExpressions = that.getWhereExpressions();
}
@@ -87,11 +115,6 @@ public Optional createOutputDataSetType(Collection columnNames) {
String[] columnNamesAsArray = (String[]) columnNames.toArray();
return columnNamesAsArray.length == 0 ? DataSetType.createDefault(Record.class)
@@ -157,12 +191,8 @@ public Catalog getCatalog() {
return catalog;
}
- public String getIcebergTableName() {
- return icebergTableName;
- }
-
- public String getIcebergTableFolderBase() {
- return icebergTableFolderBase;
+ public TableIdentifier getTableIdentifier() {
+ return tableIdentifier;
}
public List getWhereExpressions() {
@@ -177,36 +207,62 @@ private void setCachedTable(Table table) {
this.cachedTable = table;
}
- // private void setCachedScanBuilder(ScanBuilder scanBuilder) {
- // this.cachedScanBuilder = scanBuilder;
- // }
+ /**
+ * Returns the Iceberg table name.
+ *
+ * @return the table name from the
+ * {@link org.apache.iceberg.catalog.TableIdentifier}
+ */
+ public String getIcebergTableName() {
+ return tableIdentifier.name();
+ }
+ /**
+ * Loads and returns the Iceberg {@link org.apache.iceberg.Table}.
+ * Uses a cached instance if available.
+ *
+ * @return the loaded Iceberg table
+ */
private org.apache.iceberg.Table getTable() {
if (this.cachedTable != null) {
return this.cachedTable;
}
- TableIdentifier tableId = TableIdentifier.of(getIcebergTableFolderBase(), getIcebergTableName());
- Table table = this.catalog.loadTable(tableId);
+ Table table = this.catalog.loadTable(this.tableIdentifier);
setCachedTable(table);
return table;
}
- // private ScanBuilder getScanBuilder() {
- // if (this.cachedScanBuilder != null) {
- // return cachedScanBuilder;
- // }
- // ScanBuilder scanBuilder = IcebergGenerics.read(getTable());
- // if (this.whereExpressions.size() > 0) {
- // for (Expression whereExpression : this.whereExpressions) {
- // scanBuilder = scanBuilder.where(whereExpression);
- // }
- // }
- // setCachedScanBuilder(scanBuilder);
- // return scanBuilder;
- // }
-
- // does not account if a filter is needed
+ /**
+ * Builds a {@link org.apache.iceberg.data.ScanBuilder} for the current table.
+ * Applies selected columns and filter expressions if provided.
+ *
+ * @return configured {@link org.apache.iceberg.data.ScanBuilder}
+ */
+ public ScanBuilder getScanBuilder() {
+
+ ScanBuilder scanBuilder = IcebergGenerics.read(getTable());
+
+ if (this.columns != null && this.columns.size() > 0) {
+ scanBuilder = scanBuilder.select(columns);
+ }
+
+ if (this.whereExpressions != null && this.whereExpressions.size() > 0) {
+ for (Expression whereExpression : this.whereExpressions) {
+ scanBuilder = scanBuilder.where(whereExpression);
+ }
+ }
+
+ return scanBuilder;
+ }
+
+ /**
+ * Estimates the number of rows in the table.
+ * Applies a selectivity adjustment if filter expressions are present.
+ *
+ * @return estimated number of rows, or {@link OptionalLong#empty()} if
+ * unavailable
+ */
private OptionalLong ExtractNumberOfRows() {
try {
long rowCount = 0L;
@@ -223,13 +279,9 @@ private OptionalLong ExtractNumberOfRows() {
return OptionalLong.empty();
}
- //if there are any filter conditions applied we use the formula nr of rows = total nr of rows * (1 / number of distinct values) -> where distinct values is
- //in this case calculated by a default number as we do not have acccess to distinct values without reading the entire table
if (this.whereExpressions != null && this.whereExpressions.size() > 1) {
- //the selectivity is set to 0,10 for each where expression. If there is 1 the seelctivty will be 0,10 if there is 20 it will be 0,20 and etc.
- //Ask zoi about this!
- Double updatedRowCount = rowCount * defaultSelectivityValue;
+ Double updatedRowCount = rowCount * Math.pow(defaultSelectivityValue, this.whereExpressions.size());
return OptionalLong.of(updatedRowCount.longValue());
}
@@ -242,6 +294,12 @@ private OptionalLong ExtractNumberOfRows() {
}
+ /**
+ * Calculates the total file size in bytes of all table files.
+ *
+ * @return total file size in bytes, or {@link OptionalLong#empty()} if
+ * unavailable
+ */
private OptionalLong getFileSize() {
try {
@@ -261,5 +319,4 @@ private OptionalLong getFileSize() {
}
-
}
From b1d311f5ddb852ed463b1705376fbef208cfbfe4 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Sat, 11 Oct 2025 11:17:05 +0200
Subject: [PATCH 03/16] Add implementation for a JavaApacheIcebergSource that
allows to read from an Apache Iceberg table
---
.../operators/JavaApacheIcebergSource.java | 163 ++++++++++++++++++
1 file changed, 163 insertions(+)
create mode 100644 wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
new file mode 100644
index 000000000..f97abd220
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.java.operators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics.ScanBuilder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.wayang.basic.operators.ApacheIcebergSource;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+/**
+ * This execution operator implements the {@link ApacheIcebergSource}.
+ */
+public class JavaApacheIcebergSource extends ApacheIcebergSource implements JavaExecutionOperator {
+
+ /**
+ * Creates a new Java Iceberg source instance.
+ *
+ * @param catalog {@link org.apache.iceberg.catalog.Catalog} catalog
+ * used to load the table
+ * @param tableIdentifier {@link org.apache.iceberg.catalog.TableIdentifier} identifier
+ * of the target table
+ * @param whereExpressions list of
+ * {@link org.apache.iceberg.expressions.Expression}
+ * filters; empty list for none
+ * @param columns collection of column names to project; empty list for
+ * all columns
+ * @return a new {@link JavaApacheIcebergSource} instance
+ */
+
+ public JavaApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
+ List whereExpressions, Collection columns) {
+ super(ApacheIcebergSource.create(catalog, tableIdentifier, whereExpressions, columns));
+ }
+
+ // TODO make support for parallel?
+ /**
+ * Creates a {@link Stream} of {@link org.apache.wayang.basic.data.Record}
+ * objects
+ * from the given Iceberg {@link org.apache.iceberg.data.ScanBuilder}.
+ *
+ * @param scanBuilder the configured {@link org.apache.iceberg.data.ScanBuilder}
+ * used to read table data
+ * @return a sequential {@link Stream} of Wayang
+ * {@link org.apache.wayang.basic.data.Record} instances
+ */
+ private static Stream getStreamFromIcebergTable(ScanBuilder scanBuilder) {
+ return StreamSupport.stream(scanBuilder.build().spliterator(), false)
+ .map(record -> getWayangRecord(record));
+ }
+
+ /**
+ * Converts an Iceberg {@link org.apache.iceberg.data.Record} to a Wayang
+ * {@link org.apache.wayang.basic.data.Record}.
+ *
+ * @param icebergRecord the Iceberg record to convert
+ * @return a new Wayang {@link org.apache.wayang.basic.data.Record} containing
+ * the same field values
+ */
+ private static org.apache.wayang.basic.data.Record getWayangRecord(org.apache.iceberg.data.Record icebergRecord) {
+ Object[] values = new Object[icebergRecord.size()];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = icebergRecord.get(i);
+ }
+ return new org.apache.wayang.basic.data.Record(values);
+ }
+
+ /**
+ * Reads the Iceberg table and feeds the resulting record stream into the first output channel.
+ *
+ * @param inputs          input channel instances; asserted to match {@code getNumInputs()}
+ * @param outputs         output channel instances; {@code outputs[0]} is cast to
+ *                        {@link StreamChannel.Instance} and receives the stream
+ * @param javaExecutor    supplies the configuration used to create the load profile estimators
+ * @param operatorContext context handed to the created {@link ExecutionLineageNode}s
+ * @return the result of {@code collectAndMark()} on the "prepare" lineage node
+ * @throws WayangException wrapping any exception raised while setting up the read
+ */
+ @Override
+ public Tuple, Collection> evaluate(
+ ChannelInstance[] inputs,
+ ChannelInstance[] outputs,
+ JavaExecutor javaExecutor,
+ OptimizationContext.OperatorContext operatorContext) {
+
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ // Resolved up front so that the error message below can name the table.
+ String tableName = this.getIcebergTableName();
+
+ try {
+
+ ScanBuilder scanBuilder = this.getScanBuilder();
+
+ ((StreamChannel.Instance) outputs[0]).accept(getStreamFromIcebergTable(scanBuilder));
+
+ // NOTE(review): the estimators below are looked up under "wayang.java.parquetsource.*", whereas
+ // getLoadProfileEstimatorConfigurationKeys() reports "wayang.java.apacheicebergsource.*" -
+ // confirm which keys are intended.
+ ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
+ prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
+ "wayang.java.parquetsource.load.prepare", javaExecutor.getConfiguration()));
+ ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
+ mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
+ "wayang.java.parquetsource.load.main", javaExecutor.getConfiguration()));
+
+ // The "main" node becomes a predecessor of the output's lineage; the "prepare" node is
+ // collected and marked right away.
+ outputs[0].getLineage().addPredecessor(mainLineageNode);
+
+ return prepareLineageNode.collectAndMark();
+
+ } catch (Exception e) {
+ throw new WayangException(String.format("Reading from Apache Iceberg Source table %s failed.", tableName),
+ e);
+
+ }
+
+ }
+
+ /**
+ * Copies an instance (exclusive of broadcasts).
+ *
+ * @param that that should be copied
+ */
+ public JavaApacheIcebergSource(ApacheIcebergSource that) {
+ super(that);
+ }
+
+ @Override
+ public Collection getLoadProfileEstimatorConfigurationKeys() {
+ return Arrays.asList("wayang.java.apacheicebergsource.load.prepare",
+ "wayang.java.apacheicebergsource.load.main");
+ }
+
+ @Override
+ public List getSupportedInputChannels(int index) {
+ throw new UnsupportedOperationException(String.format("%s does not have input channels.", this));
+ }
+
+ @Override
+ public List getSupportedOutputChannels(int index) {
+ assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
+ return Collections.singletonList(StreamChannel.DESCRIPTOR);
+ }
+
+}
From cee5f6959cfc2043e1f3c3556d03700632580f93 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Sat, 11 Oct 2025 11:17:25 +0200
Subject: [PATCH 04/16] Add support for a basic ApacheIcebergSink that allows
to write data to an Iceberg Table location
---
.../basic/operators/ApacheIcebergSink.java | 104 ++++++++++++++++++
1 file changed, 104 insertions(+)
create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
new file mode 100644
index 000000000..f333b39da
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.basic.operators;
+
+import org.apache.wayang.core.plan.wayangplan.UnarySink;
+import org.apache.wayang.core.types.DataSetType;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.FileFormat;
+
+/**
+ * This {@link UnarySink} writes all incoming data quanta to an iceberg table.
+ * If the table does not exist, it is created; otherwise the data is appended.
+ *
+ * @param <T> data type of the incoming data quanta
+ */
+public class ApacheIcebergSink extends UnarySink {
+
+ protected final Catalog catalog;
+ protected final Schema schema;
+ protected final TableIdentifier tableIdentifier;
+ protected final Class tClass;
+
+ /**
+ * Creates a new sink.
+ *
+ * @param catalog Iceberg catalog used to resolve the target table; must
+ * not be {@code null}
+ * @param schema Iceberg write schema; must be compatible with the
+ * target table
+ * @param tableIdentifier fully qualified identifier of the target table
+ * @param type {@link DataSetType} of the incoming data quanta
+ */
+ public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier,
+ DataSetType type) {
+
+ super(type);
+ this.catalog = catalog;
+ this.schema = schema;
+ this.tableIdentifier = tableIdentifier;
+ this.tClass = type.getDataUnitType().getTypeClass();
+
+ }
+
+ /**
+ * Creates a new sink.
+ *
+ * @param catalog Iceberg catalog used to resolve the target table; must
+ * not be {@code null}
+ * @param schema Iceberg write schema; must be compatible with the
+ * target table
+ * @param tableIdentifier fully qualified identifier of the target table
+ * @param typeClass {@link Class} of incoming data quanta
+ */
+ public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier,
+ Class typeClass) {
+ super(DataSetType.createDefault(typeClass));
+ this.catalog = catalog;
+ this.schema = schema;
+ this.tableIdentifier = tableIdentifier;
+ this.tClass = typeClass;
+ }
+
+ /**
+ * Creates a copied instance.
+ *
+ * @param that should be copied
+ */
+ public ApacheIcebergSink(ApacheIcebergSink that) {
+ super(that);
+ this.catalog = that.catalog;
+ this.schema = that.schema;
+ this.tableIdentifier = that.tableIdentifier;
+ this.tClass = that.tClass;
+ }
+
+ /**
+ * Returns the {@link FileFormat} used for writing.
+ * Subclasses should override this method to specify a concrete format.
+ *
+ * @return the file format
+ * @throws UnsupportedOperationException if not overridden
+ */
+ public FileFormat getFileFormat() {
+ throw new UnsupportedOperationException("Subclasses must implement getFileFormat().");
+ }
+}
From 736f4cd769fc72075f9ff50dacc2898b24bdc745 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Sat, 11 Oct 2025 11:18:09 +0200
Subject: [PATCH 05/16] Add implementation for a JavaApacheIcebergSink that
allows to write Wayang Records to an Iceberg location for the Java Platform
---
.../java/operators/JavaApacheIcebergSink.java | 201 ++++++++++++++++++
1 file changed, 201 insertions(+)
create mode 100644 wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
new file mode 100644
index 000000000..c7d16e478
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.data.Record;
+
+import org.apache.wayang.basic.operators.ApacheIcebergSink;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.plan.wayangplan.Operator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * {@link Operator} for the {@link JavaPlatform} that writes to an Iceberg table.
+ * If the table does not exist, it is created first; otherwise the records are
+ * appended to the existing table.
+ * {@code R} is the input type of the incoming data stream. It must be a
+ * {@link org.apache.wayang.basic.data.Record} (or a subclass).
+ */
+
+public class JavaApacheIcebergSink extends ApacheIcebergSink
+ implements JavaExecutionOperator {
+
+ // private final Logger logger = LogManager.getLogger(this.getClass());
+ private final int defaultPartitionId = 1;
+ private final int defaultTaskId = 1;
+ private FileFormat fileFormat = FileFormat.PARQUET;
+
+ /**
+ * Creates a new sink for the Java Platform.
+ *
+ * @param catalog Iceberg catalog used to resolve the target table; must
+ * not be {@code null}
+ * @param schema Iceberg write schema; must be compatible with the
+ * target table
+ * @param tableIdentifier fully qualified identifier of the target table
+ * @param fileFormat file format used for writing (e.g., Parquet, Avro)
+ * @param type {@link DataSetType} of the incoming data quanta
+ */
+ public JavaApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, FileFormat fileFormat,
+ DataSetType type) {
+ super(catalog, schema, tableIdentifier, type);
+ this.fileFormat = fileFormat;
+ }
+
+ /**
+ * Creates a new sink for the Java Platform using Parquet as the default file
+ * format.
+ *
+ * @param catalog Iceberg catalog used to resolve the target table; must
+ * not be {@code null}
+ * @param schema Iceberg write schema; must be compatible with the
+ * target table
+ * @param tableIdentifier fully qualified identifier of the target table
+ * @param type {@link DataSetType} of the incoming data quanta
+ */
+ public JavaApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, DataSetType type) {
+ super(catalog, schema, tableIdentifier, type);
+ }
+
+ /**
+ * Copy constructor. Keeps the default Parquet format if {@code that} does not provide one.
+ * @param that sink instance to copy
+ */
+ public JavaApacheIcebergSink(ApacheIcebergSink that) {
+ super(that);
+ try {
+ this.fileFormat = that.getFileFormat();
+ } catch (UnsupportedOperationException ignored) { /* not overridden: keep PARQUET default */ }
+ }
+
+ private boolean tableExists() {
+ return catalog.tableExists(tableIdentifier);
+ }
+
+ @Override
+ public FileFormat getFileFormat() {
+ return this.fileFormat;
+ }
+
+ @Override
+ public Tuple, Collection> evaluate(
+ ChannelInstance[] inputs,
+ ChannelInstance[] outputs,
+ JavaExecutor javaExecutor,
+ OptimizationContext.OperatorContext operatorContext) {
+ // Writes the single input channel as one data file and appends that file to the table.
+ try {
+ // This operator is a sink: exactly one input channel and no output channels.
+ assert inputs.length == 1;
+ assert outputs.length == 0;
+
+ JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+ if (!tableExists()) {
+ catalog.createTable(tableIdentifier, schema);
+ }
+
+ Stream inputStream = input
+ .provideStream()
+ .map(r -> wayangRecordToIcebergRecord(r));
+
+ Table table = catalog.loadTable(tableIdentifier);
+ OutputFileFactory outputFileFactory = OutputFileFactory
+ .builderFor(table, this.defaultPartitionId, this.defaultTaskId)
+ .format(fileFormat)
+ .build();
+
+ // TODO HOW SHOULD WE PASS DOWN PARTITIONS OR USE PARTITIONS?
+ EncryptedOutputFile outputFile = outputFileFactory.newOutputFile();
+
+ FileAppenderFactory appenderFactory = new org.apache.iceberg.data.GenericAppenderFactory(
+ this.schema);
+
+ // TODO ADD SUPPORT FOR PARTITION ALSO
+ try (DataWriter writer = appenderFactory.newDataWriter(outputFile,
+ fileFormat, /* Partition */null)) {
+ inputStream.forEach(dataQuanta -> {
+ writer.write(dataQuanta);
+ });
+ // Finish the file first: toDataFile() is only valid on a closed writer.
+ writer.close();
+ table.newAppend().appendFile(writer.toDataFile()).commit();
+ }
+ } catch (Exception e) {
+ throw new WayangException("Coult not write stream to iceberg location.", e);
+ }
+
+ return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
+
+ }
+
+ @Override
+ public String getLoadProfileEstimatorConfigurationKey() {
+ return "wayang.java.apacheicebergsink.load";
+ }
+
+ @Override
+ public List getSupportedInputChannels(int index) {
+ return Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR);
+ }
+
+ @Override
+ public List getSupportedOutputChannels(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ private org.apache.iceberg.data.Record wayangRecordToIcebergRecord(R wayangRecord) {
+ // Positional copy: field i of the Wayang record fills column i of the write schema.
+ final Record icebergRecord = GenericRecord.create(this.schema);
+ final int columnCount = this.schema.columns().size();
+ int position = 0;
+ while (position < columnCount) {
+ icebergRecord.set(position, wayangRecord.getField(position));
+ position++;
+ }
+ return icebergRecord;
+ }
+
+}
From 6fd5874bcd1188c374140de0cb6caa3840945b41 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Sat, 11 Oct 2025 12:01:43 +0200
Subject: [PATCH 06/16] refactor whereExpression to be of type Collection and
not List
---
.../wayang/basic/operators/ApacheIcebergSource.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
index bc7cfb2bb..139f1bcb7 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -59,7 +59,7 @@ public class ApacheIcebergSource extends UnarySource {
private final TableIdentifier tableIdentifier;
- private List whereExpressions;
+ private Collection whereExpressions;
private Collection columns;
private org.apache.iceberg.Table cachedTable = null;
@@ -79,13 +79,13 @@ public class ApacheIcebergSource extends UnarySource {
* @return a new {@link ApacheIcebergSource} instance
*/
public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier,
- List whereExpressions,
+ Collection whereExpressions,
Collection columns) {
return new ApacheIcebergSource(catalog, tableIdentifier, whereExpressions, columns);
}
- private ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
- List whereExpressions, Collection columns) {
+ public ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
+ Collection whereExpressions, Collection columns) {
super(createOutputDataSetType(columns));
this.catalog = catalog;
this.tableIdentifier = tableIdentifier;
@@ -195,7 +195,7 @@ public TableIdentifier getTableIdentifier() {
return tableIdentifier;
}
- public List getWhereExpressions() {
+ public Collection getWhereExpressions() {
return whereExpressions;
}
From 9597d52d5d66ef8327c633daf35ffb117f4e6f25 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Sat, 11 Oct 2025 12:02:03 +0200
Subject: [PATCH 07/16] Add mappings for iceberg source and sinks
---
.../mapping/ApacheIcebergSinkMapping.java | 74 +++++++++++++++++++
.../mapping/ApacheIcebergSourceMapping.java | 67 +++++++++++++++++
2 files changed, 141 insertions(+)
create mode 100644 wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
create mode 100644 wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
new file mode 100644
index 000000000..08a2d53b1
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.wayang.basic.operators.ApacheIcebergSink;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaApacheIcebergSink;
+import org.apache.wayang.java.operators.JavaTextFileSink;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * Mapping from {@link ApacheIcebergSink} to
+ * {@link JavaApacheIcebergSink}.
+ */
+public class ApacheIcebergSinkMapping implements Mapping {
+
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ JavaPlatform.getInstance()));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+ ApacheIcebergSink icebergSink = new ApacheIcebergSink<>(null, null, null,
+ DataSetType.none().getDataUnitType().getTypeClass());
+ final OperatorPattern operatorPattern = new OperatorPattern<>(
+ "sink", icebergSink, false);
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators>(
+ (matchedOperator, epoch) -> copyAny(matchedOperator).at(epoch));
+ }
+
+ private static JavaApacheIcebergSink> copyAny(ApacheIcebergSink> that) {
+ return copyUnchecked(that);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static JavaApacheIcebergSink copyUnchecked(
+ ApacheIcebergSink> that) {
+ return new JavaApacheIcebergSink<>((ApacheIcebergSink) that);
+ }
+
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java
new file mode 100644
index 000000000..996540492
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.wayang.basic.operators.ApacheIcebergSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.java.operators.JavaApacheIcebergSource;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.iceberg.expressions.Expression;
+
+
+/**
+ * Mapping from {@link ApacheIcebergSource} to {@link JavaApacheIcebergSource}.
+ */
+public class ApacheIcebergSourceMapping implements Mapping {
+
+
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ JavaPlatform.getInstance()
+ ));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+ ApacheIcebergSource icebergSource = new ApacheIcebergSource((Catalog) null, (TableIdentifier) null, (Collection) null, (Collection) null);
+
+ final OperatorPattern operatorPattern = new OperatorPattern(
+ "source", new ApacheIcebergSource(icebergSource), false
+ );
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators(
+ (matchedOperator, epoch) -> new JavaApacheIcebergSource(matchedOperator).at(epoch)
+ );
+ }
+
+}
From f720435bd0abf0a6c157e8d169a092287ca2e1c9 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Thu, 23 Oct 2025 09:49:34 +0200
Subject: [PATCH 08/16] Support for Source iceberg Tables
---
.../scala/org/apache/wayang/api/JavaPlanBuilder.scala | 10 +++++++++-
.../main/scala/org/apache/wayang/api/PlanBuilder.scala | 9 ++++++++-
.../wayang/basic/operators/ApacheIcebergSource.java | 8 +++++---
.../java/org/apache/wayang/java/mapping/Mappings.java | 4 +++-
.../wayang/java/operators/JavaApacheIcebergSource.java | 4 ++--
5 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index d1f9a118a..00d54b5e3 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -25,7 +25,7 @@ import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource}
+import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource, ApacheIcebergSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -72,6 +72,14 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record]))
+
+ def readApacheIcebergTable(
+ catalog: org.apache.iceberg.catalog.Catalog,
+ tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
+ filterExpressions: Array[org.apache.iceberg.expressions.Expression] = null,
+ projectionColumns: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
+ createSourceBuilder(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))(ClassTag(classOf[Record]))
+
/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 648755492..151f2021d 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -24,7 +24,7 @@ package org.apache.wayang.api
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource}
+import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource, ApacheIcebergSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -140,6 +140,13 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection))
+
+ def readApacheIcebergTable(
+ catalog: org.apache.iceberg.catalog.Catalog,
+ tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
+ filterExpressions: Array[org.apache.iceberg.expressions.Expression] = null,
+ projectionColumns: Array[String] = null): DataQuanta[Record] = load(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))
+
/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
index 139f1bcb7..df8e3b022 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -79,9 +80,10 @@ public class ApacheIcebergSource extends UnarySource {
* @return a new {@link ApacheIcebergSource} instance
*/
public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier,
- Collection whereExpressions,
- Collection columns) {
- return new ApacheIcebergSource(catalog, tableIdentifier, whereExpressions, columns);
+ org.apache.iceberg.expressions.Expression[] whereExpressions,
+ String[] columns) {
+
+ return new ApacheIcebergSource(catalog, tableIdentifier, Arrays.asList(whereExpressions), Arrays.asList(columns));
}
public ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
index dfdeca43e..a4418c18a 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
@@ -62,7 +62,9 @@ public class Mappings {
new KafkaTopicSinkMapping(),
new AmazonS3SourceMapping(),
new GoogleCloudStorageSourceMapping(),
- new AzureBlobStorageSourceMapping()
+ new AzureBlobStorageSourceMapping(),
+ new ApacheIcebergSourceMapping(),
+ new ApacheIcebergSinkMapping()
);
public static Collection GRAPH_MAPPINGS = Arrays.asList(
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
index f97abd220..1a489a47f 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
@@ -52,7 +52,7 @@ public class JavaApacheIcebergSource extends ApacheIcebergSource implements Java
* used to load the table
* @param tableIdentifier {@linkorg.apache.iceberg.catalog.TableIdentifier} identifier
* of the target table
- * @param whereExpressions list of
+ * @param whereExpressions Collection of
* {@link org.apache.iceberg.expressions.Expression}
* filters; empty list for none
* @param columns collection of column names to project; empty list for
@@ -61,7 +61,7 @@ public class JavaApacheIcebergSource extends ApacheIcebergSource implements Java
*/
public JavaApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
- List whereExpressions, Collection columns) {
+ Expression[] whereExpressions, String[] columns) {
super(ApacheIcebergSource.create(catalog, tableIdentifier, whereExpressions, columns));
}
From 3c5efc1b2a640e2a588aa7bdf3cc02be06f63bab Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Tue, 13 Jan 2026 19:32:13 +0100
Subject: [PATCH 09/16] add methods to read from and write to iceberg tables
---
.../org/apache/wayang/api/DataQuanta.scala | 41 ++++++++++++++++++-
.../apache/wayang/api/DataQuantaBuilder.scala | 21 ++++++++++
.../apache/wayang/api/JavaPlanBuilder.scala | 10 ++++-
.../org/apache/wayang/api/PlanBuilder.scala | 10 ++++-
4 files changed, 79 insertions(+), 3 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index 99f6f9cc7..e73057ee4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -1013,7 +1013,6 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.buildAndExplain(toJson)
}
-
/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
@@ -1027,6 +1026,44 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
}
+ /**
+ * Write the data quanta in this instance to an Iceberg table. Triggers execution.
+ *
+ * @param catalog Iceberg Catalog
+ * @param schema Iceberg Schema of the table to create
+ * @param tableIdentifier Iceberg Table Identifier of the table to create
+ * @param outputFileFormat File format of the output data files
+ */
+
+ def writeIcebergTable(catalog: org.apache.iceberg.catalog.Catalog,
+ schema: org.apache.iceberg.Schema,
+ tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
+ outputFileFormat: org.apache.iceberg.FileFormat): Unit = {
+ writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
+ }
+
+ /**
+ * Write the data quanta in this instance to an Iceberg table. Triggers execution.
+ *
+ * @param catalog Iceberg Catalog
+ * @param schema Iceberg Schema of the table to create
+ * @param tableIdentifier Iceberg Table Identifier of the table to create
+ * @param outputFileFormat File format of the output data files
+ */
+ def writeIcebergTableJava(
+ catalog: org.apache.iceberg.catalog.Catalog,
+ schema: org.apache.iceberg.Schema,
+ tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
+ outputFileFormat: org.apache.iceberg.FileFormat ): Unit = {
+
+ val sink = new ApacheIcebergSink(catalog, schema, tableIdentifier, outputFileFormat)
+
+ sink.setName(s"*#-> Write to Iceberg Table Sink ")
+ this.connectTo(sink, 0)
+ this.planBuilder.sinks += sink
+ this.planBuilder.buildAndExecute()
+ this.planBuilder.sinks.clear()
+ }
/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
@@ -1090,6 +1127,8 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.sinks.clear()
}
+
+
/**
* Write the data quanta in this instance to a Object file. Triggers execution.
*
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index 6869dd439..65ddc31cb 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -499,6 +499,27 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
}
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ApacheIcebergSink]]. This triggers
+ * execution of the constructed [[WayangPlan]].
+ *
+ * @param catalog Iceberg Catalog
+ * @param schema Iceberg Schema of the table to create
+ * @param tableIdentifier Iceberg Table Identifier of the table to create
+ * @param outputFileFormat File format of the output data files
+ * @param jobName job name passed to the plan builder before execution
+ */
+
+ def writeIcebergTable(catalog: org.apache.iceberg.catalog.Catalog,
+ schema: org.apache.iceberg.Schema,
+ tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
+ outputFileFormat: org.apache.iceberg.FileFormat,
+ jobName: String): Unit = {
+ this.javaPlanBuilder.withJobName(jobName)
+ this.dataQuanta().writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
+
+ }
+
/**
* Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.KafkaTopicSink]]. This triggers
* execution of the constructed [[WayangPlan]].
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index 00d54b5e3..46ff0dbc2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -72,7 +72,15 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record]))
-
+ /**
+ * Read an Apache Iceberg table and provide it as a dataset of [[Record]]s.
+ *
+ * @param catalog the Iceberg catalog containing the table
+ * @param tableIdentifier the identifier of the Iceberg table to read
+ * @param filterExpressions optional array of filter expressions to apply during the read
+ * @param projectionColumns optional array of column names to project (select specific columns)
+ * @return [[DataQuantaBuilder]] for the Iceberg table
+ */
def readApacheIcebergTable(
catalog: org.apache.iceberg.catalog.Catalog,
tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 151f2021d..338d64616 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -140,7 +140,15 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection))
-
+ /**
+ * Read an Apache Iceberg table and provide it as a dataset of [[Record]]s.
+ *
+ * @param catalog the Iceberg catalog containing the table
+ * @param tableIdentifier the identifier of the Iceberg table to read
+ * @param filterExpressions optional array of filter expressions to apply during the read
+ * @param projectionColumns optional array of column names to project (select specific columns)
+ * @return [[DataQuanta]] of [[Record]] for the Iceberg table
+ */
def readApacheIcebergTable(
catalog: org.apache.iceberg.catalog.Catalog,
tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
From 2e5c8859c46f667cbbcdc942799d4974f69b7511 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Tue, 13 Jan 2026 19:34:17 +0100
Subject: [PATCH 10/16] add source operators for apache iceberg for the Java
Platform
---
.../basic/operators/ApacheIcebergSource.java | 19 ++++++++++++++++---
.../operators/JavaApacheIcebergSource.java | 4 ++--
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
index df8e3b022..b6d724f39 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -37,8 +37,10 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
@@ -82,12 +84,20 @@ public class ApacheIcebergSource extends UnarySource {
public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier,
org.apache.iceberg.expressions.Expression[] whereExpressions,
String[] columns) {
-
- return new ApacheIcebergSource(catalog, tableIdentifier, Arrays.asList(whereExpressions), Arrays.asList(columns));
+
+ List whereList =
+ (whereExpressions == null) ? Collections.emptyList() : Arrays.asList(whereExpressions);
+
+ List columnList =
+ (columns == null) ? Collections.emptyList() : Arrays.asList(columns);
+
+ return new ApacheIcebergSource(catalog, tableIdentifier, whereList, columnList);
}
public ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
Collection whereExpressions, Collection columns) {
+
+
super(createOutputDataSetType(columns));
this.catalog = catalog;
this.tableIdentifier = tableIdentifier;
@@ -184,7 +194,10 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext,
*/
private static DataSetType createOutputDataSetType(Collection columnNames) {
- String[] columnNamesAsArray = (String[]) columnNames.toArray();
+ if (columnNames == null) {
+ columnNames = new ArrayList();
+ }
+ String[] columnNamesAsArray = columnNames.toArray(new String[0]);
return columnNamesAsArray.length == 0 ? DataSetType.createDefault(Record.class)
: DataSetType.createDefault(new RecordType(columnNamesAsArray));
}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
index 1a489a47f..ed6e5e735 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
@@ -43,7 +43,8 @@
/**
* This is execution operator implements the {@link ApacheIcebergSource}.
*/
-public class JavaApacheIcebergSource extends ApacheIcebergSource implements JavaExecutionOperator {
+public class JavaApacheIcebergSource
+extends ApacheIcebergSource implements JavaExecutionOperator {
/**
* Creates a new Java Iceberg source instance.
@@ -65,7 +66,6 @@ public JavaApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
super(ApacheIcebergSource.create(catalog, tableIdentifier, whereExpressions, columns));
}
- // TODO make support for parallel?
/**
* Creates a {@link Stream} of {@link org.apache.wayang.basic.data.Record}
* objects
From f552d28bde00a8daa73e9edfe2962dde713ee016 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Tue, 13 Jan 2026 19:34:43 +0100
Subject: [PATCH 11/16] add sink operators for Apache Iceberg for the Java
Platform
---
.../basic/operators/ApacheIcebergSink.java | 52 ++++----------
.../mapping/ApacheIcebergSinkMapping.java | 18 +----
.../java/operators/JavaApacheIcebergSink.java | 70 ++++++++-----------
3 files changed, 46 insertions(+), 94 deletions(-)
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
index f333b39da..e9a2b3626 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.FileFormat;
+import org.apache.wayang.basic.data.Record;
/**
* This {@link UnarySink} writes all incoming data quanta to an iceberg table.
@@ -31,51 +32,29 @@
*
* @param Data Type if the incoming Data Quanta
*/
-public class ApacheIcebergSink extends UnarySink {
+public class ApacheIcebergSink extends UnarySink {
protected final Catalog catalog;
protected final Schema schema;
protected final TableIdentifier tableIdentifier;
- protected final Class tClass;
+ protected final FileFormat outputFileFormat;
/**
- * Creates a new sink.
*
* @param catalog Iceberg catalog used to resolve the target table; must
* not be {@code null}
* @param schema Iceberg write schema; must be compatible with the
* target table
* @param tableIdentifier fully qualified identifier of the target table
- * @param type {@link DataSetType} of the incoming data quanta
- */
- public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier,
- DataSetType type) {
-
- super(type);
- this.catalog = catalog;
- this.schema = schema;
- this.tableIdentifier = tableIdentifier;
- this.tClass = type.getDataUnitType().getTypeClass();
-
- }
-
- /**
- * Creates a new sink.
- *
- * @param catalog Iceberg catalog used to resolve the target table; must
- * not be {@code null}
- * @param schema Iceberg write schema; must be compatible with the
- * target table
- * @param tableIdentifier fully qualified identifier of the target table
- * @param typeClass {@link Class} of incoming data quanta
+ *
+ * @param outputFileFormat {@link FileFormat} the format of the output data files
*/
- public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier,
- Class typeClass) {
- super(DataSetType.createDefault(typeClass));
+ public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, FileFormat outputFileFormat) {
+ super(DataSetType.createDefault(Record.class));
this.catalog = catalog;
this.schema = schema;
this.tableIdentifier = tableIdentifier;
- this.tClass = typeClass;
+ this.outputFileFormat = outputFileFormat;
}
/**
@@ -83,22 +62,15 @@ public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableId
*
* @param that should be copied
*/
- public ApacheIcebergSink(ApacheIcebergSink that) {
+ public ApacheIcebergSink(ApacheIcebergSink that) {
super(that);
this.catalog = that.catalog;
this.schema = that.schema;
this.tableIdentifier = that.tableIdentifier;
- this.tClass = that.tClass;
+ this.outputFileFormat = that.outputFileFormat;
}
- /**
- * Returns the {@link FileFormat} used for writing.
- * Subclasses should override this method to specify a concrete format.
- *
- * @return the file format
- * @throws UnsupportedOperationException if not overridden
- */
- public FileFormat getFileFormat() {
- throw new UnsupportedOperationException("Subclasses must implement getFileFormat().");
+ public FileFormat getOutputFileFormat() {
+ return this.outputFileFormat;
}
}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
index 08a2d53b1..18fc7862f 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
@@ -49,26 +49,14 @@ public Collection getTransformations() {
}
private SubplanPattern createSubplanPattern() {
- ApacheIcebergSink icebergSink = new ApacheIcebergSink<>(null, null, null,
- DataSetType.none().getDataUnitType().getTypeClass());
+ ApacheIcebergSink icebergSink = new ApacheIcebergSink(null, null, null, null);
final OperatorPattern operatorPattern = new OperatorPattern<>(
"sink", icebergSink, false);
return SubplanPattern.createSingleton(operatorPattern);
}
private ReplacementSubplanFactory createReplacementSubplanFactory() {
- return new ReplacementSubplanFactory.OfSingleOperators>(
- (matchedOperator, epoch) -> copyAny(matchedOperator).at(epoch));
+ return new ReplacementSubplanFactory.OfSingleOperators(
+ (matchedOperator, epoch) -> new JavaApacheIcebergSink(matchedOperator).at(epoch));
}
-
- private static JavaApacheIcebergSink> copyAny(ApacheIcebergSink> that) {
- return copyUnchecked(that);
- }
-
- @SuppressWarnings("unchecked")
- private static JavaApacheIcebergSink copyUnchecked(
- ApacheIcebergSink> that) {
- return new JavaApacheIcebergSink<>((ApacheIcebergSink) that);
- }
-
}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
index c7d16e478..f2042f61f 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
@@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
@@ -34,7 +35,7 @@
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.data.Record;
-
+import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.operators.ApacheIcebergSink;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
@@ -59,13 +60,11 @@
* {@link org.apache.wayang.basic.data.Record} (or subclass).
*/
-public class JavaApacheIcebergSink extends ApacheIcebergSink
+public class JavaApacheIcebergSink extends ApacheIcebergSink
implements JavaExecutionOperator {
- // private final Logger logger = LogManager.getLogger(this.getClass());
private final int defaultPartitionId = 1;
private final int defaultTaskId = 1;
- private FileFormat fileFormat = FileFormat.PARQUET;
/**
* Creates a new sink for the Java Platform.
@@ -75,29 +74,17 @@ public class JavaApacheIcebergSink type) {
- super(catalog, schema, tableIdentifier, type);
- this.fileFormat = fileFormat;
+ public JavaApacheIcebergSink(
+ Catalog catalog,
+ Schema schema,
+ TableIdentifier tableIdentifier,
+ FileFormat outputFileFormnat) {
+ super(catalog, schema, tableIdentifier, outputFileFormnat);
}
- /**
- * Creates a new sink for the Java Platform using Parquet as the default file
- * format.
- *
- * @param catalog Iceberg catalog used to resolve the target table; must
- * not be {@code null}
- * @param schema Iceberg write schema; must be compatible with the
- * target table
- * @param tableIdentifier fully qualified identifier of the target table
- * @param type {@link DataSetType} of the incoming data quanta
- */
- public JavaApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, DataSetType type) {
- super(catalog, schema, tableIdentifier, type);
- }
/**
* Copy constructor.
@@ -105,19 +92,11 @@ public JavaApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tab
* @param that sink instance to copy
*/
- public JavaApacheIcebergSink(ApacheIcebergSink that) {
+ public JavaApacheIcebergSink(ApacheIcebergSink that) {
super(that);
- this.fileFormat = that.getFileFormat();
}
- private boolean tableExists() {
- return catalog.tableExists(tableIdentifier);
- }
- @Override
- public FileFormat getFileFormat() {
- return this.fileFormat;
- }
@Override
public Tuple, Collection> evaluate(
@@ -131,6 +110,8 @@ public Tuple, Collection> eval
assert inputs.length == 1;
assert outputs.length == 2;
+ FileFormat outputFileFormat = getOutputFileFormat();
+
JavaChannelInstance input = (JavaChannelInstance) inputs[0];
if (!tableExists()) {
@@ -138,24 +119,26 @@ public Tuple, Collection> eval
}
Stream inputStream = input
- .provideStream()
- .map(r -> wayangRecordToIcebergRecord(r));
+ .provideStream()
+ .peek(r -> {
+ if (!(r instanceof org.apache.wayang.basic.data.Record)) {
+ throw new WayangException("Expected Wayang Record but got " + r.getClass());
+ }})
+ .map(r -> wayangRecordToIcebergRecord((org.apache.wayang.basic.data.Record) r));
Table table = catalog.loadTable(tableIdentifier);
OutputFileFactory outputFileFactory = OutputFileFactory
.builderFor(table, this.defaultPartitionId, this.defaultTaskId)
- .format(fileFormat)
+ .format(outputFileFormat)
.build();
- // TODO HOW SHOULD WE PASS DOWN PARTITIONS OR USE PARTITIONS?
EncryptedOutputFile outputFile = outputFileFactory.newOutputFile();
FileAppenderFactory appenderFactory = new org.apache.iceberg.data.GenericAppenderFactory(
this.schema);
- // TODO ADD SUPPORT FOR PARITTION ALSO
try (DataWriter writer = appenderFactory.newDataWriter(outputFile,
- fileFormat, /* Partition */null)) {
+ outputFileFormat, /* Partition */null)) {
inputStream.forEach(dataQuanta -> {
writer.write(dataQuanta);
@@ -171,6 +154,11 @@ public Tuple, Collection> eval
}
+ @Override
+ protected ExecutionOperator createCopy() {
+ return new JavaApacheIcebergSink(this.catalog, this.schema, this.tableIdentifier, getOutputFileFormat());
+ }
+
@Override
public String getLoadProfileEstimatorConfigurationKey() {
return "wayang.java.apacheicebergsink.load";
@@ -186,7 +174,7 @@ public List getSupportedOutputChannels(int index) {
throw new UnsupportedOperationException();
}
- private org.apache.iceberg.data.Record wayangRecordToIcebergRecord(R wayangRecord) {
+ private org.apache.iceberg.data.Record wayangRecordToIcebergRecord(org.apache.wayang.basic.data.Record wayangRecord) {
GenericRecord template = GenericRecord.create(this.schema);
Record out = template.copy();
@@ -198,4 +186,8 @@ private org.apache.iceberg.data.Record wayangRecordToIcebergRecord(R wayangRecor
return out;
};
+ private boolean tableExists() {
+ return catalog.tableExists(tableIdentifier);
+ }
+
}
From 0e67d6a32e6dbaaca20a3061bf52420bd2d4f9f9 Mon Sep 17 00:00:00 2001
From: Christoffer Kristensen
<73429946+ChristofferEmilKristensen@users.noreply.github.com>
Date: Wed, 14 Jan 2026 18:37:03 +0000
Subject: [PATCH 12/16] update imports for wayang-api-scala-java files
---
.../org/apache/wayang/api/DataQuantaBuilder.scala | 12 ++++++++----
.../org/apache/wayang/api/JavaPlanBuilder.scala | 8 +++++---
.../scala/org/apache/wayang/api/PlanBuilder.scala | 8 +++++---
3 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index ddc1056b4..dad054a2f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -40,6 +40,10 @@ import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
import org.apache.wayang.core.plan.wayangplan.OutputSlot
+import org.apache.iceberg.Schema
+import org.apache.iceberg.FileFormat
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+
import scala.collection.mutable.ListBuffer
@@ -510,10 +514,10 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
* @return the collected data quanta
*/
- def writeIcebergTable(catalog: org.apache.iceberg.catalog.Catalog,
- schema: org.apache.iceberg.Schema,
- tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
- outputFileFormat: org.apache.iceberg.FileFormat,
+ def writeIcebergTable(catalog: Catalog,
+ schema: Schema,
+ tableIdentifier: TableIdentifier,
+ outputFileFormat: FileFormat,
jobName: String): Unit = {
this.javaPlanBuilder.withJobName(jobName)
this.dataQuanta().writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index dd16d9687..1f20bd808 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -32,6 +32,8 @@ import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.types.DataSetType
import scala.reflect.ClassTag
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+import org.apache.iceberg.expressions.Expression
/**
* Utility to build and execute [[WayangPlan]]s.
@@ -89,9 +91,9 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
* @return [[DataQuantaBuilder]] for the Iceberg table
*/
def readApacheIcebergTable(
- catalog: org.apache.iceberg.catalog.Catalog,
- tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
- filterExpressions: Array[org.apache.iceberg.expressions.Expression] = null,
+ catalog: Catalog,
+ tableIdentifier:TableIdentifier,
+ filterExpressions: Array[Expression] = null,
projectionColumns: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))(ClassTag(classOf[Record]))
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 25be11a83..d49deded1 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -34,6 +34,8 @@ import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import scala.reflect._
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+import org.apache.iceberg.expressions.Expression
/**
* Utility to build [[WayangPlan]]s.
@@ -154,9 +156,9 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
* @return [[DataQuanta]] of [[Record]] for the Iceberg table
*/
def readApacheIcebergTable(
- catalog: org.apache.iceberg.catalog.Catalog,
- tableIdentifier: org.apache.iceberg.catalog.TableIdentifier,
- filterExpressions: Array[org.apache.iceberg.expressions.Expression] = null,
+ catalog: Catalog,
+ tableIdentifier: TableIdentifier,
+ filterExpressions: Array[Expression] = null,
projectionColumns: Array[String] = null): DataQuanta[Record] = load(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))
/**
From 813e0506d8907033ff21fee3b4e21dc3cd174632 Mon Sep 17 00:00:00 2001
From: Christoffer Kristensen
<73429946+ChristofferEmilKristensen@users.noreply.github.com>
Date: Wed, 14 Jan 2026 18:41:10 +0000
Subject: [PATCH 13/16] update imports so all class names are not fully
qualified with namespace references
---
.../wayang/basic/operators/ApacheIcebergSource.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
index b6d724f39..e46ad774a 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -48,6 +48,8 @@
import org.apache.commons.lang3.Validate;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
+
/**
* This source reads an Iceberg Table and outputs the lines as
@@ -82,10 +84,10 @@ public class ApacheIcebergSource extends UnarySource {
* @return a new {@link ApacheIcebergSource} instance
*/
public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier,
- org.apache.iceberg.expressions.Expression[] whereExpressions,
+ Expression[] whereExpressions,
String[] columns) {
- List whereList =
+ List whereList =
(whereExpressions == null) ? Collections.emptyList() : Arrays.asList(whereExpressions);
List columnList =
@@ -120,14 +122,14 @@ public ApacheIcebergSource(ApacheIcebergSource that) {
}
@Override
- public Optional createCardinalityEstimator(
+ public Optional createCardinalityEstimator(
final int outputIndex,
final Configuration configuration) {
Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
return Optional.of(new ApacheIcebergSource.CardinalityEstimator());
}
- protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
+ protected class CardinalityEstimator implements CardinalityEstimator {
public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);
@@ -238,7 +240,7 @@ public String getIcebergTableName() {
*
* @return the loaded Iceberg table
*/
- private org.apache.iceberg.Table getTable() {
+ private Table getTable() {
if (this.cachedTable != null) {
return this.cachedTable;
}
From a0df79dafb2e87b1a4ee435bffa2604957711fe9 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Thu, 15 Jan 2026 12:09:53 +0100
Subject: [PATCH 14/16] change references to be apache iceberg source
---
.../apache/wayang/java/operators/JavaApacheIcebergSource.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
index ed6e5e735..1b10eee91 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
@@ -117,10 +117,10 @@ public Tuple, Collection> eval
ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
- "wayang.java.parquetsource.load.prepare", javaExecutor.getConfiguration()));
+ "wayang.java.apacheicebergsource.load.prepare", javaExecutor.getConfiguration()));
ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
- "wayang.java.parquetsource.load.main", javaExecutor.getConfiguration()));
+ "wayang.java.apacheicebergsource.load.main", javaExecutor.getConfiguration()));
outputs[0].getLineage().addPredecessor(mainLineageNode);
From 8f99f1ab375c55931207b626ea8d4dc6c6fa2ed5 Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Thu, 15 Jan 2026 12:15:49 +0100
Subject: [PATCH 15/16] fix ambiguous reference to CardinalityEstimator
---
.../apache/wayang/basic/operators/ApacheIcebergSource.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
index e46ad774a..3f188f418 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -122,14 +122,14 @@ public ApacheIcebergSource(ApacheIcebergSource that) {
}
@Override
- public Optional createCardinalityEstimator(
+ public Optional createCardinalityEstimator(
final int outputIndex,
final Configuration configuration) {
Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
return Optional.of(new ApacheIcebergSource.CardinalityEstimator());
}
- protected class CardinalityEstimator implements CardinalityEstimator {
+ protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);
From d2fc0d55a0c74cac4423cb68d719edd318e7facb Mon Sep 17 00:00:00 2001
From: ChristofferKristensen
<83878502+ChristofferKristensen@users.noreply.github.com>
Date: Thu, 15 Jan 2026 12:34:35 +0100
Subject: [PATCH 16/16] add space between tableIdentifier and Class object.
---
.../src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index 1f20bd808..5e464c76b 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -92,7 +92,7 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
*/
def readApacheIcebergTable(
catalog: Catalog,
- tableIdentifier:TableIdentifier,
+ tableIdentifier: TableIdentifier,
filterExpressions: Array[Expression] = null,
projectionColumns: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))(ClassTag(classOf[Record]))