Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ static void runTriggerExample(TrafficFlowOptions options) throws IOException {
.get(i)
.apply(
"BigQuery Write Rows" + i,
BigQueryIO.writeTableRows().to(tableRef).withSchema(getSchema()));
BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Comment on lines +489 to +492
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Java file change appears unrelated to the Python GCS filesystem lazy loading feature described in the PR. The change adds a write disposition to a BigQuery write operation in a Java example, which has no connection to making GCS filesystem lookup lazy in Python. This should likely be in a separate PR.

Copilot uses AI. Check for mistakes.
}

PipelineResult result = pipeline.run();
Expand Down
20 changes: 18 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,30 @@
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp import gcsio

__all__ = ['GCSFileSystem']


class GCSFileSystem(FileSystem):
"""A GCS ``FileSystem`` implementation for accessing files on GCS.
"""
def _get_gcsio(self):
  """Lazily import and return the ``gcsio`` module.

  Deferring this import keeps ``GCSFileSystem`` importable when the GCP
  extra is not installed; a clear ImportError is raised only when GCS
  functionality is actually used.

  Raises:
    ImportError: if ``apache_beam.io.gcp.gcsio`` cannot be imported.
  """
  try:
    from apache_beam.io.gcp import gcsio
  except ImportError:
    raise ImportError(
        'GCSFileSystem requires apache-beam[gcp]. '
        'Install it with: pip install apache-beam[gcp]')
  # Reaching here means the import succeeded; hand the module back.
  return gcsio

# Class-level default; the first property access below assigns an
# *instance* attribute of the same name (shadowing this default), so the
# cached value is per-instance, not shared between instances.
_CHUNK_SIZE = None

@property
def CHUNK_SIZE(self):
  """Maximum number of operations in a single GCS batch request.

  Resolved lazily from ``gcsio.MAX_BATCH_OPERATION_SIZE`` on first access
  so that merely importing this module does not require the GCP extra.
  """
  if self._CHUNK_SIZE is None:
    self._CHUNK_SIZE = self._get_gcsio().MAX_BATCH_OPERATION_SIZE
  return self._CHUNK_SIZE  # Chunk size in batch operations
Comment on lines +53 to +59
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_CHUNK_SIZE is defined as a class variable (shared across all instances), but it's being used as an instance variable in the property getter. This creates a subtle bug where if one instance initializes _CHUNK_SIZE, all other instances will see it as initialized, even if they haven't accessed the gcsio module yet. Consider making this a true instance variable by initializing it in __init__ as self._chunk_size = None and updating the property accordingly.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: "Chuck size" should be "Chunk size".

Suggested change
return self._CHUNK_SIZE # Chuck size in batch operations
return self._CHUNK_SIZE # Chunk size in batch operations

Copilot uses AI. Check for mistakes.

CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations
GCS_PREFIX = 'gs://'

def __init__(self, pipeline_options):
Expand Down Expand Up @@ -139,6 +153,7 @@ def _list(self, dir_or_prefix):
raise BeamIOError("List operation failed", {dir_or_prefix: e})

def _gcsIO(self):
  """Build a ``GcsIO`` client configured with this filesystem's pipeline options."""
  return self._get_gcsio().GcsIO(pipeline_options=self._pipeline_options)

def _path_open(
Expand Down Expand Up @@ -370,6 +385,7 @@ def delete(self, paths):

def report_lineage(self, path, lineage):
try:
gcsio = self._get_gcsio()
components = gcsio.parse_gcs_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
Expand Down
119 changes: 77 additions & 42 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import FileMetadata
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.filesystems import FileSystems

# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
Expand All @@ -38,6 +39,12 @@
# pylint: enable=wrong-import-order, wrong-import-position


class GCSFileSystemLazyLoadTest(unittest.TestCase):
  """Verifies the gs:// filesystem can be resolved without the GCP extra."""
  def test_get_filesystem_does_not_require_gcp_extra(self):
    # Looking up the filesystem for a gs:// path must not import gcsio.
    filesystem = FileSystems.get_filesystem('gs://test-bucket/path')
    self.assertEqual('gs', filesystem.scheme())
Comment on lines +43 to +45
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test validates that get_filesystem() succeeds without GCP extras, but it doesn't verify the second part of the lazy loading behavior: that actual filesystem operations (like open(), match(), etc.) properly raise an ImportError when GCP dependencies are missing. Consider adding a test that verifies an ImportError is raised when attempting to use the filesystem without GCP dependencies installed, to ensure the lazy loading works as intended.

Copilot uses AI. Check for mistakes.


@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
class GCSFileSystemTest(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -77,11 +84,13 @@ def test_split(self):
with self.assertRaises(ValueError):
self.fs.split('/no/gcs/prefix')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_match_multiples(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
gcsio_mock.list_files.return_value = iter([
('gs://bucket/file1', (1, 99999.0)),
('gs://bucket/file2', (2, 88888.0))
Expand All @@ -95,12 +104,14 @@ def test_match_multiples(self, mock_gcsio):
gcsio_mock.list_files.assert_called_once_with(
'gs://bucket/', with_metadata=True)

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples_limit(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_match_multiples_limit(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
limit = 1
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
gcsio_mock.list_files.return_value = iter([
('gs://bucket/file1', (1, 99999.0))
])
Expand All @@ -111,11 +122,13 @@ def test_match_multiples_limit(self, mock_gcsio):
gcsio_mock.list_files.assert_called_once_with(
'gs://bucket/', with_metadata=True)

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples_error(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_match_multiples_error(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
exception = IOError('Failed')
gcsio_mock.list_files.side_effect = exception

Expand All @@ -127,11 +140,13 @@ def test_match_multiples_error(self, mock_gcsio):
gcsio_mock.list_files.assert_called_once_with(
'gs://bucket/', with_metadata=True)

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiple_patterns(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_match_multiple_patterns(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
gcsio_mock.list_files.side_effect = [
iter([('gs://bucket/file1', (1, 99999.0))]),
iter([('gs://bucket/file2', (2, 88888.0))]),
Expand All @@ -141,33 +156,39 @@ def test_match_multiple_patterns(self, mock_gcsio):
result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
self.assertEqual([mr.metadata_list for mr in result], expected_results)

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_create(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_create(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
# Issue file copy
_ = self.fs.create('gs://bucket/from1', 'application/octet-stream')

gcsio_mock.open.assert_called_once_with(
'gs://bucket/from1', 'wb', mime_type='application/octet-stream')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_open(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_open(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
# Issue file copy
_ = self.fs.open('gs://bucket/from1', 'application/octet-stream')

gcsio_mock.open.assert_called_once_with(
'gs://bucket/from1', 'rb', mime_type='application/octet-stream')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_copy_file(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_copy_file(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
sources = ['gs://bucket/from1']
destinations = ['gs://bucket/to1']

Expand All @@ -177,11 +198,13 @@ def test_copy_file(self, mock_gcsio):
gcsio_mock.copy.assert_called_once_with(
'gs://bucket/from1', 'gs://bucket/to1')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_copy_file_error(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_copy_file_error(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
sources = ['gs://bucket/from1']
destinations = ['gs://bucket/to1']

Expand All @@ -201,11 +224,13 @@ def test_copy_file_error(self, mock_gcsio):
gcsio_mock.copy.assert_called_once_with(
'gs://bucket/from1', 'gs://bucket/to1')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_copy_tree(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_copy_tree(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
sources = ['gs://bucket1/']
destinations = ['gs://bucket2/']

Expand All @@ -215,11 +240,13 @@ def test_copy_tree(self, mock_gcsio):
gcsio_mock.copytree.assert_called_once_with(
'gs://bucket1/', 'gs://bucket2/')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_rename(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_rename(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
sources = [
'gs://bucket/from1',
'gs://bucket/from2',
Expand Down Expand Up @@ -255,11 +282,13 @@ def test_rename(self, mock_gcsio):
'gs://bucket/from3',
])

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_rename_delete_error(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_rename_delete_error(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
sources = [
'gs://bucket/from1',
'gs://bucket/from2',
Expand Down Expand Up @@ -296,11 +325,13 @@ def test_rename_delete_error(self, mock_gcsio):
'gs://bucket/from3',
])

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_rename_copy_error(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_rename_copy_error(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
sources = [
'gs://bucket/from1',
'gs://bucket/from2',
Expand Down Expand Up @@ -335,11 +366,13 @@ def test_rename_copy_error(self, mock_gcsio):
'gs://bucket/from3',
])

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_delete(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_delete(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module
gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
files = [
'gs://bucket/from1',
Expand All @@ -351,11 +384,13 @@ def test_delete(self, mock_gcsio):
self.fs.delete(files)
gcsio_mock.delete_batch.assert_called()

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_delete_error(self, mock_gcsio):
@mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio')
def test_delete_error(self, mock_get_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
mock_gcsio_module = mock.MagicMock()
mock_gcsio_module.GcsIO.return_value = gcsio_mock
mock_get_gcsio.return_value = mock_gcsio_module

gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
files = [
Expand Down
Loading