From 8ec8b81078bd80b89f840842c8714318ab79f25c Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 14 Oct 2025 18:51:31 +0000 Subject: [PATCH 01/15] Address circular dependencies. --- .../benchmarks/nexmark/models/auction_bid.py | 4 +- .../nexmark/models/nexmark_model.py | 8 ++-- .../nexmark/models/nextmark_json_util.py | 46 +++++++++++++++++++ .../benchmarks/nexmark/nexmark_util.py | 32 ++----------- 4 files changed, 57 insertions(+), 33 deletions(-) create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py index 7424a3a48355..a8844fa65b5e 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -18,7 +18,7 @@ """Result of WinningBid transform.""" from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder -from apache_beam.testing.benchmarks.nexmark import nexmark_util +from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util from apache_beam.testing.benchmarks.nexmark.models import nexmark_model @@ -41,7 +41,7 @@ def __init__(self, auction, bid): self.bid = bid def __repr__(self): - return nexmark_util.model_to_json(self) + return nexmark_json_util.model_to_json(self) class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 4613d7f90c26..694b9d1768c7 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -29,7 +29,7 @@ from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import StrUtf8Coder -from apache_beam.testing.benchmarks.nexmark import nexmark_util +from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util class PersonCoder(FastCoder): @@ -59,7 +59,7 @@ def __init__( self.extra = extra def __repr__(self): - return nexmark_util.model_to_json(self) + return nexmark_json_util.model_to_json(self) class AuctionCoder(FastCoder): @@ -101,7 +101,7 @@ def __init__( self.extra = extra def __repr__(self): - return nexmark_util.model_to_json(self) + return nexmark_json_util.model_to_json(self) class BidCoder(FastCoder): @@ -127,7 +127,7 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.extra = extra def __repr__(self): - return nexmark_util.model_to_json(self) + return nexmark_json_util.model_to_json(self) class AuctionCoderImpl(coder_impl.StreamCoderImpl): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py new file mode 100644 index 000000000000..83b2db1bef59 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""JSON utilities for the Nexmark suite.""" + +import json + +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.utils.timestamp import Timestamp + + +def model_to_json(model): + return json.dumps(construct_json_dict(model), separators=(',', ':')) + + +def construct_json_dict(model): + return {k: unnest_to_json(v) for k, v in model.__dict__.items()} + + +def unnest_to_json(cand): + if isinstance(cand, Timestamp): + return cand.micros // 1000 + elif isinstance( + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): + return construct_json_dict(cand) + else: + return cand + + +def millis_to_timestamp(millis: int) -> Timestamp: + micro_second = millis * 1000 + return Timestamp(micros=micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index ef53156d8be0..3ca94fb6048b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -42,6 +42,7 @@ from apache_beam.metrics import MetricsFilter from apache_beam.runners.runner import PipelineResult # pylint: disable=unused-import from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames from apache_beam.transforms import window @@ -155,7 +156,7 @@ def process(self, elem): json_dict[FieldNames.CREDIT_CARD], json_dict[FieldNames.CITY], json_dict[FieldNames.STATE], - millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA]) elif FieldNames.ITEM_NAME in json_dict: if type(json_dict[FieldNames.EXPIRES]) is dict: @@ -166,8 +167,8 @@ def process(self, elem): json_dict[FieldNames.DESCRIPTION], json_dict[FieldNames.INITIAL_BID], json_dict[FieldNames.RESERVE], - millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), - millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.EXPIRES]), json_dict[FieldNames.SELLER], json_dict[FieldNames.CATEGORY], json_dict[FieldNames.EXTRA]) @@ -176,7 +177,7 @@ def process(self, elem): json_dict[FieldNames.AUCTION], json_dict[FieldNames.BIDDER], json_dict[FieldNames.PRICE], - millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA]) else: raise ValueError('Invalid event: %s.' % str(json_dict)) @@ -201,29 +202,6 @@ def display(elm): return elm -def model_to_json(model): - return json.dumps(construct_json_dict(model), separators=(',', ':')) - - -def construct_json_dict(model): - return {k: unnest_to_json(v) for k, v in model.__dict__.items()} - - -def unnest_to_json(cand): - if isinstance(cand, Timestamp): - return cand.micros // 1000 - elif isinstance( - cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): - return construct_json_dict(cand) - else: - return cand - - -def millis_to_timestamp(millis: int) -> Timestamp: - micro_second = millis * 1000 - return Timestamp(micros=micro_second) - - def get_counter_metric( result: PipelineResult, namespace: str, name: str) -> int: """ From 2268152446596bb83a33d105ad3d7869aa0d5557 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 14 Oct 2025 19:50:44 +0000 Subject: [PATCH 02/15] Fix formatting. --- .../nexmark/models/nextmark_json_util.py | 3 ++- .../benchmarks/nexmark/nexmark_util.py | 21 +++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py index 83b2db1bef59..8d01c1f68223 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py @@ -35,7 +35,8 @@ def unnest_to_json(cand): if isinstance(cand, Timestamp): return cand.micros // 1000 elif isinstance( - cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person) + ): return construct_json_dict(cand) else: return cand diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 3ca94fb6048b..cb6085017630 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -156,8 +156,11 @@ def process(self, elem): json_dict[FieldNames.CREDIT_CARD], json_dict[FieldNames.CITY], json_dict[FieldNames.STATE], - nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), - json_dict[FieldNames.EXTRA]) + nexmark_json_util.millis_to_timestamp( + json_dict[FieldNames.DATE_TIME] + ), + json_dict[FieldNames.EXTRA], + ) elif FieldNames.ITEM_NAME in json_dict: if type(json_dict[FieldNames.EXPIRES]) is dict: json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] @@ -167,18 +170,24 @@ def process(self, elem): json_dict[FieldNames.DESCRIPTION], json_dict[FieldNames.INITIAL_BID], json_dict[FieldNames.RESERVE], - nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + nexmark_json_util.millis_to_timestamp( + json_dict[FieldNames.DATE_TIME] + ), nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.EXPIRES]), json_dict[FieldNames.SELLER], json_dict[FieldNames.CATEGORY], - json_dict[FieldNames.EXTRA]) + json_dict[FieldNames.EXTRA], + ) elif FieldNames.AUCTION in json_dict: yield nexmark_model.Bid( json_dict[FieldNames.AUCTION], json_dict[FieldNames.BIDDER], json_dict[FieldNames.PRICE], - nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), - json_dict[FieldNames.EXTRA]) + nexmark_json_util.millis_to_timestamp( + json_dict[FieldNames.DATE_TIME] + ), + json_dict[FieldNames.EXTRA], + ) else: raise ValueError('Invalid event: %s.' % str(json_dict)) From 60d97e001d755efc15bad9abd11368ec1459f59a Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 14 Oct 2025 21:16:31 +0000 Subject: [PATCH 03/15] Fix tests. --- .../benchmarks/nexmark/models/nextmark_json_util.py | 3 +-- .../testing/benchmarks/nexmark/nexmark_util.py | 9 +++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py index 8d01c1f68223..83b2db1bef59 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py @@ -35,8 +35,7 @@ def unnest_to_json(cand): if isinstance(cand, Timestamp): return cand.micros // 1000 elif isinstance( - cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person) - ): + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): return construct_json_dict(cand) else: return cand diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index cb6085017630..6aa01cb73e4a 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -157,8 +157,7 @@ def process(self, elem): json_dict[FieldNames.CITY], json_dict[FieldNames.STATE], nexmark_json_util.millis_to_timestamp( - json_dict[FieldNames.DATE_TIME] - ), + json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA], ) elif FieldNames.ITEM_NAME in json_dict: @@ -171,8 +170,7 @@ def process(self, elem): json_dict[FieldNames.INITIAL_BID], json_dict[FieldNames.RESERVE], nexmark_json_util.millis_to_timestamp( - json_dict[FieldNames.DATE_TIME] - ), + json_dict[FieldNames.DATE_TIME]), nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.EXPIRES]), json_dict[FieldNames.SELLER], json_dict[FieldNames.CATEGORY], @@ -184,8 +182,7 @@ def process(self, elem): json_dict[FieldNames.BIDDER], json_dict[FieldNames.PRICE], nexmark_json_util.millis_to_timestamp( - json_dict[FieldNames.DATE_TIME] - ), + json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA], ) else: From cb2a021beacfb9a5f8f862c5d36b3c265052a269 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Wed, 15 Oct 2025 02:35:48 +0000 Subject: [PATCH 04/15] Fix lint. --- .../testing/benchmarks/nexmark/models/nexmark_model.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 694b9d1768c7..258058e23e53 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -29,7 +29,6 @@ from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import StrUtf8Coder -from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util class PersonCoder(FastCoder): @@ -58,9 +57,6 @@ def __init__( self.date_time = date_time self.extra = extra - def __repr__(self): - return nexmark_json_util.model_to_json(self) - class AuctionCoder(FastCoder): def to_type_hint(self): @@ -100,9 +96,6 @@ def __init__( self.category = category self.extra = extra - def __repr__(self): - return nexmark_json_util.model_to_json(self) - class BidCoder(FastCoder): def to_type_hint(self): @@ -126,9 +119,6 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.date_time = date_time self.extra = extra - def __repr__(self): - return nexmark_json_util.model_to_json(self) - class AuctionCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() From 82b3a60f525e190d33b7029c290cdbffe59f64d4 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Wed, 15 Oct 2025 05:05:03 +0000 Subject: [PATCH 05/15] Remove unused import. --- .../apache_beam/testing/benchmarks/nexmark/nexmark_util.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 6aa01cb73e4a..a430b6680481 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -46,7 +46,6 @@ from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames from apache_beam.transforms import window -from apache_beam.utils.timestamp import Timestamp _LOGGER = logging.getLogger(__name__) From 598cf39513e7519e599de0c0819c3762b59a7cf7 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 21 Oct 2025 00:20:07 +0000 Subject: [PATCH 06/15] Resolve circular dependency without removing __repr__. --- .../nexmark/models/nexmark_model.py | 13 ++++++++++++ .../nexmark/models/nextmark_json_util.py | 20 ++++++++++--------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 258058e23e53..10638615d2bb 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -29,6 +29,7 @@ from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import StrUtf8Coder +from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util class PersonCoder(FastCoder): @@ -57,6 +58,10 @@ def __init__( self.date_time = date_time self.extra = extra + def __repr__(self): + nexmark_types = (Person, Auction, Bid) + return nexmark_json_util.model_to_json(self, nexmark_types) + class AuctionCoder(FastCoder): def to_type_hint(self): @@ -96,6 +101,10 @@ def __init__( self.category = category self.extra = extra + def __repr__(self): + nexmark_types = (Person, Auction, Bid) + return nexmark_json_util.model_to_json(self, nexmark_types) + class BidCoder(FastCoder): def to_type_hint(self): @@ -119,6 +128,10 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.date_time = date_time self.extra = extra + def __repr__(self): + nexmark_types = (Person, Auction, Bid) + return nexmark_json_util.model_to_json(self, nexmark_types) + class AuctionCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py index 83b2db1bef59..52fc09ec68da 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py @@ -19,24 +19,26 @@ import json -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.utils.timestamp import Timestamp -def model_to_json(model): - return json.dumps(construct_json_dict(model), separators=(',', ':')) +def model_to_json(model, nexmark_types): + return json.dumps( + construct_json_dict(model, nexmark_types), separators=(',', ':') + ) -def construct_json_dict(model): - return {k: unnest_to_json(v) for k, v in model.__dict__.items()} +def construct_json_dict(model, nexmark_types): + return { + k: unnest_to_json(v, nexmark_types) for k, v in model.__dict__.items() + } -def unnest_to_json(cand): +def unnest_to_json(cand, nexmark_types): if isinstance(cand, Timestamp): return cand.micros // 1000 - elif isinstance( - cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): - return construct_json_dict(cand) + elif isinstance(cand, nexmark_types): + return construct_json_dict(cand, nexmark_types) else: return cand From 40562f333edb9d216aa47bf63df9165dc28ea14d Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 21 Oct 2025 02:45:36 +0000 Subject: [PATCH 07/15] Fix formatting. --- .../testing/benchmarks/nexmark/models/nextmark_json_util.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py index 52fc09ec68da..6df670413a60 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py @@ -24,13 +24,13 @@ def model_to_json(model, nexmark_types): return json.dumps( - construct_json_dict(model, nexmark_types), separators=(',', ':') - ) + construct_json_dict(model, nexmark_types), separators=(',', ':')) def construct_json_dict(model, nexmark_types): return { - k: unnest_to_json(v, nexmark_types) for k, v in model.__dict__.items() + k: unnest_to_json(v, nexmark_types) + for k, v in model.__dict__.items() } From 56edfe7b3dbba881b06cd4aac0cde961e6741ca0 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 21 Oct 2025 21:35:32 +0000 Subject: [PATCH 08/15] Remove nextmark_json_util and move all its methods into nextmark_model. --- .../benchmarks/nexmark/models/auction_bid.py | 3 +- .../nexmark/models/nexmark_model.py | 35 +++++++++++--- .../nexmark/models/nextmark_json_util.py | 48 ------------------- .../benchmarks/nexmark/nexmark_util.py | 12 ++--- 4 files changed, 33 insertions(+), 65 deletions(-) delete mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py index a8844fa65b5e..8cdb55686ab3 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -18,7 +18,6 @@ """Result of WinningBid transform.""" from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder -from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util from apache_beam.testing.benchmarks.nexmark.models import nexmark_model @@ -41,7 +40,7 @@ def __init__(self, auction, bid): self.bid = bid def __repr__(self): - return nexmark_json_util.model_to_json(self) + return nexmark_model.model_to_json(self) class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 10638615d2bb..d50ac1f19cb6 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,10 +26,34 @@ - The bid on an item for auction (Bid). """ +import json + from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import StrUtf8Coder -from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util +from apache_beam.utils.timestamp import Timestamp + + +def model_to_json(model): + return json.dumps(construct_json_dict(model), separators=(",", ":")) + + +def construct_json_dict(model): + return {k: unnest_to_json(v) for k, v in model.__dict__.items()} + + +def unnest_to_json(cand): + if isinstance(cand, Timestamp): + return cand.micros // 1000 + elif isinstance(cand, (Auction, Bid, Person)): + return construct_json_dict(cand) + else: + return cand + + +def millis_to_timestamp(millis: int) -> Timestamp: + micro_second = millis * 1000 + return Timestamp(micros=micro_second) class PersonCoder(FastCoder): @@ -59,8 +83,7 @@ def __init__( self.extra = extra def __repr__(self): - nexmark_types = (Person, Auction, Bid) - return nexmark_json_util.model_to_json(self, nexmark_types) + return model_to_json(self) class AuctionCoder(FastCoder): @@ -102,8 +125,7 @@ def __init__( self.extra = extra def __repr__(self): - nexmark_types = (Person, Auction, Bid) - return nexmark_json_util.model_to_json(self, nexmark_types) + return model_to_json(self) class BidCoder(FastCoder): @@ -129,8 +151,7 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.extra = extra def __repr__(self): - nexmark_types = (Person, Auction, Bid) - return nexmark_json_util.model_to_json(self, nexmark_types) + return model_to_json(self) class AuctionCoderImpl(coder_impl.StreamCoderImpl): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py deleted file mode 100644 index 6df670413a60..000000000000 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nextmark_json_util.py +++ /dev/null @@ -1,48 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""JSON utilities for the Nexmark suite.""" - -import json - -from apache_beam.utils.timestamp import Timestamp - - -def model_to_json(model, nexmark_types): - return json.dumps( - construct_json_dict(model, nexmark_types), separators=(',', ':')) - - -def construct_json_dict(model, nexmark_types): - return { - k: unnest_to_json(v, nexmark_types) - for k, v in model.__dict__.items() - } - - -def unnest_to_json(cand, nexmark_types): - if isinstance(cand, Timestamp): - return cand.micros // 1000 - elif isinstance(cand, nexmark_types): - return construct_json_dict(cand, nexmark_types) - else: - return cand - - -def millis_to_timestamp(millis: int) -> Timestamp: - micro_second = millis * 1000 - return Timestamp(micros=micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index a430b6680481..c4e2fdea0394 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -42,7 +42,6 @@ from apache_beam.metrics import MetricsFilter from apache_beam.runners.runner import PipelineResult # pylint: disable=unused-import from apache_beam.testing.benchmarks.nexmark.models import auction_bid -from apache_beam.testing.benchmarks.nexmark.models import nexmark_json_util from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames from apache_beam.transforms import window @@ -155,8 +154,7 @@ def process(self, elem): json_dict[FieldNames.CREDIT_CARD], json_dict[FieldNames.CITY], json_dict[FieldNames.STATE], - nexmark_json_util.millis_to_timestamp( - json_dict[FieldNames.DATE_TIME]), + nexmark_model.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA], ) elif FieldNames.ITEM_NAME in json_dict: @@ -168,9 +166,8 @@ def process(self, elem): json_dict[FieldNames.DESCRIPTION], json_dict[FieldNames.INITIAL_BID], json_dict[FieldNames.RESERVE], - nexmark_json_util.millis_to_timestamp( - json_dict[FieldNames.DATE_TIME]), - nexmark_json_util.millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + nexmark_model.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + nexmark_model.millis_to_timestamp(json_dict[FieldNames.EXPIRES]), json_dict[FieldNames.SELLER], json_dict[FieldNames.CATEGORY], json_dict[FieldNames.EXTRA], @@ -180,8 +177,7 @@ def process(self, elem): json_dict[FieldNames.AUCTION], json_dict[FieldNames.BIDDER], json_dict[FieldNames.PRICE], - nexmark_json_util.millis_to_timestamp( - json_dict[FieldNames.DATE_TIME]), + nexmark_model.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA], ) else: From bc740d65e3fa266ab706ece33695908fa74e6f50 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Wed, 22 Oct 2025 18:54:44 +0000 Subject: [PATCH 09/15] Restore millis_to_timestamp. --- .../nexmark/models/nexmark_model.py | 5 ---- .../benchmarks/nexmark/nexmark_util.py | 23 +++++++++++-------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index d50ac1f19cb6..c16739741407 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -51,11 +51,6 @@ def unnest_to_json(cand): return cand -def millis_to_timestamp(millis: int) -> Timestamp: - micro_second = millis * 1000 - return Timestamp(micros=micro_second) - - class PersonCoder(FastCoder): def to_type_hint(self): return Person diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index c4e2fdea0394..dc9e3721f417 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -45,6 +45,7 @@ from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames from apache_beam.transforms import window +from apache_beam.utils.timestamp import Timestamp _LOGGER = logging.getLogger(__name__) @@ -154,9 +155,8 @@ def process(self, elem): json_dict[FieldNames.CREDIT_CARD], json_dict[FieldNames.CITY], json_dict[FieldNames.STATE], - nexmark_model.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), - json_dict[FieldNames.EXTRA], - ) + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) elif FieldNames.ITEM_NAME in json_dict: if type(json_dict[FieldNames.EXPIRES]) is dict: json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] @@ -166,20 +166,18 @@ def process(self, elem): json_dict[FieldNames.DESCRIPTION], json_dict[FieldNames.INITIAL_BID], json_dict[FieldNames.RESERVE], - nexmark_model.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), - nexmark_model.millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + millis_to_timestamp(json_dict[FieldNames.EXPIRES]), json_dict[FieldNames.SELLER], json_dict[FieldNames.CATEGORY], - json_dict[FieldNames.EXTRA], - ) + json_dict[FieldNames.EXTRA]) elif FieldNames.AUCTION in json_dict: yield nexmark_model.Bid( json_dict[FieldNames.AUCTION], json_dict[FieldNames.BIDDER], json_dict[FieldNames.PRICE], - nexmark_model.millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), - json_dict[FieldNames.EXTRA], - ) + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) else: raise ValueError('Invalid event: %s.' % str(json_dict)) @@ -203,6 +201,11 @@ def display(elm): return elm +def millis_to_timestamp(millis: int) -> Timestamp: + micro_second = millis * 1000 + return Timestamp(micros=micro_second) + + def get_counter_metric( result: PipelineResult, namespace: str, name: str) -> int: """ From f1fd0350340b7f1d07db6a5331cdf41765292644 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Sat, 31 Jan 2026 00:56:46 +0000 Subject: [PATCH 10/15] Plumb custom batch params and add tests. --- .../java/org/apache/beam/sdk/io/FileIO.java | 76 +++++++++++++-- .../org/apache/beam/sdk/io/FileIOTest.java | 93 ++++++++++++++++++- 2 files changed, 160 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index cfa06f3cf0d5..7679ef299592 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -19,8 +19,8 @@ import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.apache.beam.sdk.transforms.Contextful.fn; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import java.io.IOException; @@ -74,10 +74,10 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -548,8 +548,9 @@ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) public MatchConfiguration continuously( Duration interval, TerminationCondition condition, boolean matchUpdatedFiles) { LOG.warn( - "Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub " - + "Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if possible"); + "Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub" + + " Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if" + + " possible"); return toBuilder() .setWatchInterval(interval) .setWatchTerminationCondition(condition) @@ -1059,6 +1060,12 @@ public static FileNaming relativeFileNaming( abstract @Nullable Integer getMaxNumWritersPerBundle(); + abstract @Nullable Integer getBatchSize(); + + abstract @Nullable Integer getBatchSizeBytes(); + + abstract @Nullable Duration getBatchMaxBufferingDuration(); + abstract @Nullable ErrorHandler getBadRecordErrorHandler(); abstract Builder toBuilder(); @@ -1112,6 +1119,13 @@ abstract Builder setSharding( abstract Builder setMaxNumWritersPerBundle( @Nullable Integer maxNumWritersPerBundle); + abstract Builder setBatchSize(@Nullable Integer batchSize); + + abstract Builder setBatchSizeBytes(@Nullable Integer batchSizeBytes); + + abstract Builder setBatchMaxBufferingDuration( + @Nullable Duration batchMaxBufferingDuration); + abstract Builder setBadRecordErrorHandler( @Nullable ErrorHandler badRecordErrorHandler); @@ -1301,6 +1315,7 @@ public Write withDestinationCoder(Coder desti */ public Write withNumShards(int numShards) { checkArgument(numShards >= 0, "numShards must be non-negative, but was: %s", numShards); + checkArgument(!getAutoSharding(), "Cannot set numShards when withAutoSharding() is used"); if (numShards == 0) { return withNumShards(null); } @@ -1311,6 +1326,7 @@ public Write withNumShards(int numShards) { * Like {@link #withNumShards(int)}. Specifying {@code null} means runner-determined sharding. */ public Write withNumShards(@Nullable ValueProvider numShards) { + checkArgument(!getAutoSharding(), "Cannot set numShards when withAutoSharding() is used"); return toBuilder().setNumShards(numShards).build(); } @@ -1321,6 +1337,7 @@ public Write withNumShards(@Nullable ValueProvider public Write withSharding( PTransform, PCollectionView> sharding) { checkArgument(sharding != null, "sharding can not be null"); + checkArgument(!getAutoSharding(), "Cannot set sharding when withAutoSharding() is used"); return toBuilder().setSharding(sharding).build(); } @@ -1337,6 +1354,9 @@ public Write withIgnoreWindowing() { } public Write withAutoSharding() { + checkArgument( + getNumShards() == null && getSharding() == null, + "Cannot use withAutoSharding() when withNumShards() or withSharding() is set"); return toBuilder().setAutoSharding(true).build(); } @@ -1366,6 +1386,37 @@ public Write withBadRecordErrorHandler( return toBuilder().setBadRecordErrorHandler(errorHandler).build(); } + /** + * Returns a new {@link Write} that will batch the input records using specified batch size. The + * default value is {@link WriteFiles#FILE_TRIGGERING_RECORD_COUNT}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public Write withBatchSize(@Nullable Integer batchSize) { + return toBuilder().setBatchSize(batchSize).build(); + } + + /** + * Returns a new {@link Write} that will batch the input records using specified batch size in + * bytes. The default value is {@link WriteFiles#FILE_TRIGGERING_BYTE_COUNT}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public Write withBatchSizeBytes(@Nullable Integer batchSizeBytes) { + return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); + } + + /** + * Returns a new {@link Write} that will batch the input records using specified max buffering + * duration. The default value is {@link WriteFiles#FILE_TRIGGERING_RECORD_BUFFERING_DURATION}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public Write withBatchMaxBufferingDuration( + @Nullable Duration batchMaxBufferingDuration) { + return toBuilder().setBatchMaxBufferingDuration(batchMaxBufferingDuration).build(); + } + @VisibleForTesting Contextful> resolveFileNamingFn() { if (getDynamic()) { @@ -1482,6 +1533,15 @@ public WriteFilesResult expand(PCollection input) { if (getBadRecordErrorHandler() != null) { writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler()); } + if (getBatchSize() != null) { + writeFiles = writeFiles.withBatchSize(getBatchSize()); + } + if (getBatchSizeBytes() != null) { + writeFiles = writeFiles.withBatchSizeBytes(getBatchSizeBytes()); + } + if (getBatchMaxBufferingDuration() != null) { + writeFiles = writeFiles.withBatchMaxBufferingDuration(getBatchMaxBufferingDuration()); + } return input.apply(writeFiles); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index d3c1f6680bee..a3ebfe55e2b5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -17,13 +17,17 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Lists; +import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -38,7 +42,9 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.nio.file.attribute.FileTime; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.zip.GZIPOutputStream; @@ -46,6 +52,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -53,20 +60,26 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesUnboundedPCollections; import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo; import org.apache.beam.sdk.transforms.Contextful; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Requirements; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; @@ -91,6 +104,11 @@ public class FileIOTest implements Serializable { @Rule public transient Timeout globalTimeout = Timeout.seconds(1200); + private static final int CUSTOM_FILE_TRIGGERING_RECORD_COUNT = 50000; + private static final int CUSTOM_FILE_TRIGGERING_BYTE_COUNT = 32 * 1024 * 1024; // 32MiB + private static final Duration CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION = + Duration.standardSeconds(4); + @Test @Category(NeedsRunner.class) public void testMatchAndMatchAll() throws IOException { @@ -547,4 +565,77 @@ public void testFileIoDynamicNaming() throws IOException { "Output file shard 0 exists after pipeline completes", new File(outputFileName + "-0").exists()); } + + @Test + @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) + public void testWriteUnboundedWithCustomBatchParameters() throws IOException { + File root = tmpFolder.getRoot(); + List inputs = Arrays.asList("one", "two", "three", "four", "five", "six"); + + PTransform, PCollection> transform = + Window.into(FixedWindows.of(Duration.standardSeconds(10))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes(); + + FileIO.Write write = + FileIO.write() + .via(TextIO.sink()) + .to(root.getAbsolutePath()) + .withPrefix("output") + .withSuffix(".txt") + .withAutoSharding() + .withBatchSize(CUSTOM_FILE_TRIGGERING_RECORD_COUNT) + .withBatchSizeBytes(CUSTOM_FILE_TRIGGERING_BYTE_COUNT) + .withBatchMaxBufferingDuration(CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION); + + // Prepare timestamps for the elements. + List timestamps = new ArrayList<>(); + for (long i = 0; i < inputs.size(); i++) { + timestamps.add(i + 1); + } + + p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) + .setIsBoundedInternal(IsBounded.UNBOUNDED) + .apply(transform) + .apply(write); + p.run().waitUntilFinish(); + + // Verify that the custom batch parameters are set. + assertEquals(CUSTOM_FILE_TRIGGERING_RECORD_COUNT, write.getBatchSize().intValue()); + assertEquals(CUSTOM_FILE_TRIGGERING_BYTE_COUNT, write.getBatchSizeBytes().intValue()); + assertEquals( + CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION, write.getBatchMaxBufferingDuration()); + + checkFileContents(root, "output", inputs); + } + + static void checkFileContents(File rootDir, String prefix, List inputs) + throws IOException { + List outputFiles = Lists.newArrayList(); + final String pattern = new File(rootDir, prefix).getAbsolutePath() + "*"; + List metadata = + FileSystems.match(Collections.singletonList(pattern)).get(0).metadata(); + for (Metadata meta : metadata) { + outputFiles.add(new File(meta.resourceId().toString())); + } + assertFalse("Should have produced at least 1 output file", outputFiles.isEmpty()); + + List actual = Lists.newArrayList(); + for (File outputFile : outputFiles) { + List actualShard = Lists.newArrayList(); + try (BufferedReader reader = + Files.newBufferedReader(outputFile.toPath(), StandardCharsets.UTF_8)) { + for (; ; ) { + String line = reader.readLine(); + if (line == null) { + break; + } + actualShard.add(line); + } + } + actual.addAll(actualShard); + } + assertThat(actual, containsInAnyOrder(inputs.toArray())); + } } From 99fbc299e048d6b4d763502eb0d3f2f72305b30d Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 3 Feb 2026 08:39:17 +0000 Subject: [PATCH 11/15] Fix formatting and imports. --- .../java/org/apache/beam/sdk/io/FileIO.java | 17 ++++++++--------- .../java/org/apache/beam/sdk/io/FileIOTest.java | 7 ++----- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 7679ef299592..0bdc5fcbd77a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -19,8 +19,8 @@ import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.apache.beam.sdk.transforms.Contextful.fn; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import java.io.IOException; @@ -74,10 +74,10 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; -import com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -548,9 +548,8 @@ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) public MatchConfiguration continuously( Duration interval, TerminationCondition condition, boolean matchUpdatedFiles) { LOG.warn( - "Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub" - + " Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if" - + " possible"); + "Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub " + + "Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if possible"); return toBuilder() .setWatchInterval(interval) .setWatchTerminationCondition(condition) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index a3ebfe55e2b5..4f91e1cf5382 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -626,11 +626,8 @@ static void checkFileContents(File rootDir, String prefix, List inputs) List actualShard = Lists.newArrayList(); try (BufferedReader reader = Files.newBufferedReader(outputFile.toPath(), StandardCharsets.UTF_8)) { - for (; ; ) { - String line = reader.readLine(); - if (line == null) { - break; - } + String line; + while ((line = reader.readLine()) != null) { actualShard.add(line); } } From 81fe42e032ce4a716d5a9201c9f5f74a566fe62f Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 3 Feb 2026 22:24:04 +0000 Subject: [PATCH 12/15] Fix imports and test. --- .../org/apache/beam/sdk/io/FileIOTest.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index 4f91e1cf5382..b7d7184dea4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.isA; @@ -26,7 +26,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Lists; import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; @@ -104,11 +103,6 @@ public class FileIOTest implements Serializable { @Rule public transient Timeout globalTimeout = Timeout.seconds(1200); - private static final int CUSTOM_FILE_TRIGGERING_RECORD_COUNT = 50000; - private static final int CUSTOM_FILE_TRIGGERING_BYTE_COUNT = 32 * 1024 * 1024; // 32MiB - private static final Duration CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION = - Duration.standardSeconds(4); - @Test @Category(NeedsRunner.class) public void testMatchAndMatchAll() throws IOException { @@ -585,9 +579,10 @@ public void testWriteUnboundedWithCustomBatchParameters() throws IOException { .withPrefix("output") .withSuffix(".txt") .withAutoSharding() - .withBatchSize(CUSTOM_FILE_TRIGGERING_RECORD_COUNT) - .withBatchSizeBytes(CUSTOM_FILE_TRIGGERING_BYTE_COUNT) - .withBatchMaxBufferingDuration(CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION); + .withBatchSize(3) + .withBatchSizeBytes(1024 * 1024) // Set high to avoid triggering flushing. + .withBatchMaxBufferingDuration( + Duration.standardMinutes(1)); // Set high to avoid triggering flushing. // Prepare timestamps for the elements. List timestamps = new ArrayList<>(); @@ -602,12 +597,19 @@ public void testWriteUnboundedWithCustomBatchParameters() throws IOException { p.run().waitUntilFinish(); // Verify that the custom batch parameters are set. - assertEquals(CUSTOM_FILE_TRIGGERING_RECORD_COUNT, write.getBatchSize().intValue()); - assertEquals(CUSTOM_FILE_TRIGGERING_BYTE_COUNT, write.getBatchSizeBytes().intValue()); - assertEquals( - CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION, write.getBatchMaxBufferingDuration()); + assertEquals(3, write.getBatchSize().intValue()); + assertEquals(1024 * 1024, write.getBatchSizeBytes().intValue()); + assertEquals(Duration.standardMinutes(1), write.getBatchMaxBufferingDuration()); + // Verify file contents. checkFileContents(root, "output", inputs); + + // With auto-sharding, we can't assert on the exact number of output files, but because + // batch size is 3 and there are 6 elements, we expect at least 2 files. + final String pattern = new File(root, "output").getAbsolutePath() + "*"; + List metadata = + FileSystems.match(Collections.singletonList(pattern)).get(0).metadata(); + assertTrue(metadata.size() >= 2); } static void checkFileContents(File rootDir, String prefix, List inputs) From b9fb646e04d9f300106c34a981387d209721e461 Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Tue, 3 Feb 2026 22:35:43 +0000 Subject: [PATCH 13/15] Add missing import. --- .../core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index b7d7184dea4c..f360d3396dc9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; From 40d7c0f4ab41852971771ba813d5943f4690139c Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Wed, 4 Feb 2026 21:03:33 +0000 Subject: [PATCH 14/15] Add another test case for byte count. --- .../org/apache/beam/sdk/io/FileIOTest.java | 54 +++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index f360d3396dc9..dffc4943bfab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -563,7 +563,7 @@ public void testFileIoDynamicNaming() throws IOException { @Test @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) - public void testWriteUnboundedWithCustomBatchParameters() throws IOException { + public void testWriteUnboundedWithCustomBatchSize() throws IOException { File root = tmpFolder.getRoot(); List inputs = Arrays.asList("one", "two", "three", "four", "five", "six"); @@ -581,9 +581,9 @@ public void testWriteUnboundedWithCustomBatchParameters() throws IOException { .withSuffix(".txt") .withAutoSharding() .withBatchSize(3) - .withBatchSizeBytes(1024 * 1024) // Set high to avoid triggering flushing. + .withBatchSizeBytes(1024 * 1024) // Set high to avoid triggering flushing by byte count. .withBatchMaxBufferingDuration( - Duration.standardMinutes(1)); // Set high to avoid triggering flushing. + Duration.standardMinutes(1)); // Set high to avoid triggering flushing by duration. // Prepare timestamps for the elements. List timestamps = new ArrayList<>(); @@ -613,6 +613,54 @@ public void testWriteUnboundedWithCustomBatchParameters() throws IOException { assertTrue(metadata.size() >= 2); } + @Test + @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) + public void testWriteUnboundedWithCustomBatchSizeBytes() throws IOException { + File root = tmpFolder.getRoot(); + // The elements plus newline characters give a total of 4+4+6+5+5+4=28 bytes. + List inputs = Arrays.asList("one", "two", "three", "four", "five", "six"); + // Assign timestamps so that all elements fall into the same 10s window. + List timestamps = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L); + + FileIO.Write write = + FileIO.write() + .via(TextIO.sink()) + .to(root.getAbsolutePath()) + .withPrefix("output") + .withSuffix(".txt") + .withAutoSharding() + .withBatchSize(1000) // Set high to avoid flushing by record count. + .withBatchSizeBytes(10) + .withBatchMaxBufferingDuration( + Duration.standardMinutes(1)); // Set high to avoid flushing by duration. + + p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) + .setIsBoundedInternal(IsBounded.UNBOUNDED) + .apply( + Window.into(FixedWindows.of(Duration.standardSeconds(10))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(write); + + p.run().waitUntilFinish(); + + // Verify that the custom batch parameters are set. + assertEquals(1000, write.getBatchSize().intValue()); + assertEquals(10, write.getBatchSizeBytes().intValue()); + assertEquals(Duration.standardMinutes(1), write.getBatchMaxBufferingDuration()); + checkFileContents(root, "output", inputs); + + // With auto-sharding, we cannot assert on the exact number of output files. The BatchSizeBytes + // acts as a threshold for flushing; once buffer size reaches 10 bytes, a flush is triggered, + // but more items may be added before it completes. With 28 bytes total, we can only guarantee + // at least 2 files are produced. + final String pattern = new File(root, "output").getAbsolutePath() + "*"; + List metadata = + FileSystems.match(Collections.singletonList(pattern)).get(0).metadata(); + assertTrue(metadata.size() >= 2); + } + static void checkFileContents(File rootDir, String prefix, List inputs) throws IOException { List outputFiles = Lists.newArrayList(); From c342f81494931ca54130d05ab375003e1c66d57b Mon Sep 17 00:00:00 2001 From: Celeste Zeng <61256376+celeste-zeng@users.noreply.github.com> Date: Wed, 4 Feb 2026 22:04:34 +0000 Subject: [PATCH 15/15] Added checks for positive values. --- .../core/src/main/java/org/apache/beam/sdk/io/FileIO.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 0bdc5fcbd77a..5c9e19da160e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -1392,6 +1392,7 @@ public Write withBadRecordErrorHandler( *

This option is used only for writing unbounded data with auto-sharding. */ public Write withBatchSize(@Nullable Integer batchSize) { + checkArgument(batchSize > 0, "batchSize must be positive, but was: %s", batchSize); return toBuilder().setBatchSize(batchSize).build(); } @@ -1402,6 +1403,8 @@ public Write withBatchSize(@Nullable Integer batchSize) { *

This option is used only for writing unbounded data with auto-sharding. */ public Write withBatchSizeBytes(@Nullable Integer batchSizeBytes) { + checkArgument( + batchSizeBytes > 0, "batchSizeBytes must be positive, but was: %s", batchSizeBytes); return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); } @@ -1413,6 +1416,10 @@ public Write withBatchSizeBytes(@Nullable Integer batchSize */ public Write withBatchMaxBufferingDuration( @Nullable Duration batchMaxBufferingDuration) { + checkArgument( + batchMaxBufferingDuration.isLongerThan(Duration.ZERO), + "batchMaxBufferingDuration must be positive, but was: %s", + batchMaxBufferingDuration); return toBuilder().setBatchMaxBufferingDuration(batchMaxBufferingDuration).build(); }