From f71893a1ba2f46cbd878da66803a09f8f6df593b Mon Sep 17 00:00:00 2001 From: RushabhRatnaparkhi Date: Sat, 24 Jan 2026 00:46:07 +0530 Subject: [PATCH 1/5] Fix TriggerExample BigQuery write disposition --- .../org/apache/beam/examples/cookbook/TriggerExample.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index cd3c6dd84157..ca65a83931e7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -486,7 +486,10 @@ static void runTriggerExample(TrafficFlowOptions options) throws IOException { .get(i) .apply( "BigQuery Write Rows" + i, - BigQueryIO.writeTableRows().to(tableRef).withSchema(getSchema())); + BigQueryIO.writeTableRows() + .to(tableRef) + .withSchema(getSchema()) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); } PipelineResult result = pipeline.run(); From 525d9bf934763f6d6d9a0ee15cc9f4d55cf815d2 Mon Sep 17 00:00:00 2001 From: RushabhRatnaparkhi Date: Sat, 24 Jan 2026 23:11:41 +0530 Subject: [PATCH 2/5] Apply Spotless formatting to TriggerExample.java --- .../org/apache/beam/examples/cookbook/TriggerExample.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index ca65a83931e7..64a4f4d73d32 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -487,9 +487,9 @@ static void runTriggerExample(TrafficFlowOptions options) throws IOException { .apply( "BigQuery Write Rows" + i, BigQueryIO.writeTableRows() - .to(tableRef) - .withSchema(getSchema()) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + .to(tableRef) + .withSchema(getSchema()) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); } PipelineResult result = pipeline.run(); From 2cee81c0dca304e0862bdd7c48e8c2d9f3decff2 Mon Sep 17 00:00:00 2001 From: RushabhRatnaparkhi Date: Sat, 31 Jan 2026 22:49:06 +0530 Subject: [PATCH 3/5] Fix GCSFileSystem to lazily load gcsio dependency Ensure FileSystems.get_filesystem(gs://) does not require the GCP extra at lookup time, aligning behavior with S3FileSystem. Import errors are raised only when GCS operations are used. --- .../apache_beam/io/gcp/gcsfilesystem.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3763e21abc9f..a5905e6bc6dc 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,7 +34,7 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem -from apache_beam.io.gcp import gcsio + __all__ = ['GCSFileSystem'] @@ -42,8 +42,24 @@ class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ - - CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations + + def _get_gcsio(self): + try: + from apache_beam.io.gcp import gcsio + return gcsio + except ImportError: + raise ImportError( + 'GCSFileSystem requires apache-beam[gcp]. ' + 'Install it with: pip install apache-beam[gcp]' + ) + + _CHUNK_SIZE = None + + @property + def CHUNK_SIZE(self): + if self._CHUNK_SIZE is None: + self._CHUNK_SIZE = self._get_gcsio().MAX_BATCH_OPERATION_SIZE + return self._CHUNK_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): @@ -139,6 +155,7 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): + gcsio = self._get_gcsio() return gcsio.GcsIO(pipeline_options=self._pipeline_options) def _path_open( @@ -370,6 +387,7 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: + gcsio = self._get_gcsio() components = gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe From 1ecf7f2e05940dc0f8fa346abea2d76209bfa30a Mon Sep 17 00:00:00 2001 From: RushabhRatnaparkhi Date: Tue, 3 Feb 2026 11:46:33 +0530 Subject: [PATCH 4/5] Make GCS filesystem lookup lazy Align GCSFileSystem behavior with S3 by deferring GCP dependency validation until filesystem usage instead of lookup time. Adds a regression test for lazy lookup. --- .../apache_beam/io/gcp/gcsfilesystem_test.py | 120 ++++++++++++------ 1 file changed, 78 insertions(+), 42 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 08fdd6302887..fd05db0b0ae8 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -28,6 +28,7 @@ from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystem import FileMetadata from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.io.filesystems import FileSystems # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -38,6 +39,13 @@ # pylint: enable=wrong-import-order, wrong-import-position +class GCSFileSystemLazyLoadTest(unittest.TestCase): + + def test_get_filesystem_does_not_require_gcp_extra(self): + fs = FileSystems.get_filesystem('gs://test-bucket/path') + self.assertEqual(fs.scheme(), 'gs') + + @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): def setUp(self): @@ -77,11 +85,13 @@ def test_split(self): with self.assertRaises(ValueError): self.fs.split('/no/gcs/prefix') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiples(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiples(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock.list_files.return_value = iter([ ('gs://bucket/file1', (1, 99999.0)), ('gs://bucket/file2', (2, 88888.0)) @@ -95,12 +105,14 @@ def test_match_multiples(self, mock_gcsio): gcsio_mock.list_files.assert_called_once_with( 'gs://bucket/', with_metadata=True) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiples_limit(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiples_limit(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() limit = 1 - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock.list_files.return_value = iter([ ('gs://bucket/file1', (1, 99999.0)) ]) @@ -111,11 +123,13 @@ def test_match_multiples_limit(self, mock_gcsio): gcsio_mock.list_files.assert_called_once_with( 'gs://bucket/', with_metadata=True) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiples_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiples_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module exception = IOError('Failed') gcsio_mock.list_files.side_effect = exception @@ -127,11 +141,13 @@ def test_match_multiples_error(self, mock_gcsio): gcsio_mock.list_files.assert_called_once_with( 'gs://bucket/', with_metadata=True) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiple_patterns(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiple_patterns(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock.list_files.side_effect = [ iter([('gs://bucket/file1', (1, 99999.0))]), iter([('gs://bucket/file2', (2, 88888.0))]), @@ -141,33 +157,39 @@ def test_match_multiple_patterns(self, mock_gcsio): result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*']) self.assertEqual([mr.metadata_list for mr in result], expected_results) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_create(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_create(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module # Issue file copy _ = self.fs.create('gs://bucket/from1', 'application/octet-stream') gcsio_mock.open.assert_called_once_with( 'gs://bucket/from1', 'wb', mime_type='application/octet-stream') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_open(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_open(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module # Issue file copy _ = self.fs.open('gs://bucket/from1', 'application/octet-stream') gcsio_mock.open.assert_called_once_with( 'gs://bucket/from1', 'rb', mime_type='application/octet-stream') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_copy_file(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_copy_file(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = ['gs://bucket/from1'] destinations = ['gs://bucket/to1'] @@ -177,11 +199,13 @@ def test_copy_file(self, mock_gcsio): gcsio_mock.copy.assert_called_once_with( 'gs://bucket/from1', 'gs://bucket/to1') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_copy_file_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_copy_file_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = ['gs://bucket/from1'] destinations = ['gs://bucket/to1'] @@ -201,11 +225,13 @@ def test_copy_file_error(self, mock_gcsio): gcsio_mock.copy.assert_called_once_with( 'gs://bucket/from1', 'gs://bucket/to1') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_copy_tree(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_copy_tree(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = ['gs://bucket1/'] destinations = ['gs://bucket2/'] @@ -215,11 +241,13 @@ def test_copy_tree(self, mock_gcsio): gcsio_mock.copytree.assert_called_once_with( 'gs://bucket1/', 'gs://bucket2/') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_rename(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_rename(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -255,11 +283,13 @@ def test_rename(self, mock_gcsio): 'gs://bucket/from3', ]) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_rename_delete_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_rename_delete_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -296,11 +326,13 @@ def test_rename_delete_error(self, mock_gcsio): 'gs://bucket/from3', ]) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_rename_copy_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_rename_copy_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -335,11 +367,13 @@ def test_rename_copy_error(self, mock_gcsio): 'gs://bucket/from3', ]) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_delete(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_delete(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0} files = [ 'gs://bucket/from1', @@ -351,11 +385,13 @@ def test_delete(self, mock_gcsio): self.fs.delete(files) gcsio_mock.delete_batch.assert_called() - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_delete_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_delete_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0} files = [ From 3a66fff9b26451273052cc325f5aa1ba576979d2 Mon Sep 17 00:00:00 2001 From: RushabhRatnaparkhi Date: Wed, 4 Feb 2026 13:50:03 +0530 Subject: [PATCH 5/5] Fix Python formatting and lint issues --- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 8 +++----- sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index a5905e6bc6dc..4cf4c560b680 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -35,23 +35,20 @@ from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem - __all__ = ['GCSFileSystem'] class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ - def _get_gcsio(self): try: from apache_beam.io.gcp import gcsio return gcsio except ImportError: raise ImportError( - 'GCSFileSystem requires apache-beam[gcp]. ' - 'Install it with: pip install apache-beam[gcp]' - ) + 'GCSFileSystem requires apache-beam[gcp]. ' + 'Install it with: pip install apache-beam[gcp]') _CHUNK_SIZE = None @@ -60,6 +57,7 @@ def CHUNK_SIZE(self): if self._CHUNK_SIZE is None: self._CHUNK_SIZE = self._get_gcsio().MAX_BATCH_OPERATION_SIZE return self._CHUNK_SIZE # Chuck size in batch operations + GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index fd05db0b0ae8..66bb9bd48dd3 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -40,7 +40,6 @@ class GCSFileSystemLazyLoadTest(unittest.TestCase): - def test_get_filesystem_does_not_require_gcp_extra(self): fs = FileSystems.get_filesystem('gs://test-bucket/path') self.assertEqual(fs.scheme(), 'gs')