diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index cd3c6dd84157..64a4f4d73d32 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -486,7 +486,10 @@ static void runTriggerExample(TrafficFlowOptions options) throws IOException { .get(i) .apply( "BigQuery Write Rows" + i, - BigQueryIO.writeTableRows().to(tableRef).withSchema(getSchema())); + BigQueryIO.writeTableRows() + .to(tableRef) + .withSchema(getSchema()) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); } PipelineResult result = pipeline.run(); diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3763e21abc9f..4cf4c560b680 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,7 +34,6 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem -from apache_beam.io.gcp import gcsio __all__ = ['GCSFileSystem'] @@ -42,8 +41,23 @@ class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ + def _get_gcsio(self): + try: + from apache_beam.io.gcp import gcsio + return gcsio + except ImportError: + raise ImportError( + 'GCSFileSystem requires apache-beam[gcp]. ' + 'Install it with: pip install apache-beam[gcp]') + + _CHUNK_SIZE = None + + @property + def CHUNK_SIZE(self): + if self._CHUNK_SIZE is None: + self._CHUNK_SIZE = self._get_gcsio().MAX_BATCH_OPERATION_SIZE + return self._CHUNK_SIZE # Chuck size in batch operations - CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): @@ -139,6 +153,7 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): + gcsio = self._get_gcsio() return gcsio.GcsIO(pipeline_options=self._pipeline_options) def _path_open( @@ -370,6 +385,7 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: + gcsio = self._get_gcsio() components = gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 08fdd6302887..66bb9bd48dd3 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -28,6 +28,7 @@ from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystem import FileMetadata from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.io.filesystems import FileSystems # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -38,6 +39,12 @@ # pylint: enable=wrong-import-order, wrong-import-position +class GCSFileSystemLazyLoadTest(unittest.TestCase): + def test_get_filesystem_does_not_require_gcp_extra(self): + fs = FileSystems.get_filesystem('gs://test-bucket/path') + self.assertEqual(fs.scheme(), 'gs') + + @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): def setUp(self): @@ -77,11 +84,13 @@ def test_split(self): with self.assertRaises(ValueError): self.fs.split('/no/gcs/prefix') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiples(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiples(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock.list_files.return_value = iter([ ('gs://bucket/file1', (1, 99999.0)), ('gs://bucket/file2', (2, 88888.0)) @@ -95,12 +104,14 @@ def test_match_multiples(self, mock_gcsio): gcsio_mock.list_files.assert_called_once_with( 'gs://bucket/', with_metadata=True) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiples_limit(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiples_limit(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() limit = 1 - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock.list_files.return_value = iter([ ('gs://bucket/file1', (1, 99999.0)) ]) @@ -111,11 +122,13 @@ def test_match_multiples_limit(self, mock_gcsio): gcsio_mock.list_files.assert_called_once_with( 'gs://bucket/', with_metadata=True) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiples_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiples_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module exception = IOError('Failed') gcsio_mock.list_files.side_effect = exception @@ -127,11 +140,13 @@ def test_match_multiples_error(self, mock_gcsio): gcsio_mock.list_files.assert_called_once_with( 'gs://bucket/', with_metadata=True) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_match_multiple_patterns(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_match_multiple_patterns(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock.list_files.side_effect = [ iter([('gs://bucket/file1', (1, 99999.0))]), iter([('gs://bucket/file2', (2, 88888.0))]), @@ -141,33 +156,39 @@ def test_match_multiple_patterns(self, mock_gcsio): result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*']) self.assertEqual([mr.metadata_list for mr in result], expected_results) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_create(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_create(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module # Issue file copy _ = self.fs.create('gs://bucket/from1', 'application/octet-stream') gcsio_mock.open.assert_called_once_with( 'gs://bucket/from1', 'wb', mime_type='application/octet-stream') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_open(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_open(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module # Issue file copy _ = self.fs.open('gs://bucket/from1', 'application/octet-stream') gcsio_mock.open.assert_called_once_with( 'gs://bucket/from1', 'rb', mime_type='application/octet-stream') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_copy_file(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_copy_file(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = ['gs://bucket/from1'] destinations = ['gs://bucket/to1'] @@ -177,11 +198,13 @@ def test_copy_file(self, mock_gcsio): gcsio_mock.copy.assert_called_once_with( 'gs://bucket/from1', 'gs://bucket/to1') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_copy_file_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_copy_file_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = ['gs://bucket/from1'] destinations = ['gs://bucket/to1'] @@ -201,11 +224,13 @@ def test_copy_file_error(self, mock_gcsio): gcsio_mock.copy.assert_called_once_with( 'gs://bucket/from1', 'gs://bucket/to1') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_copy_tree(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_copy_tree(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = ['gs://bucket1/'] destinations = ['gs://bucket2/'] @@ -215,11 +240,13 @@ def test_copy_tree(self, mock_gcsio): gcsio_mock.copytree.assert_called_once_with( 'gs://bucket1/', 'gs://bucket2/') - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_rename(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_rename(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -255,11 +282,13 @@ def test_rename(self, mock_gcsio): 'gs://bucket/from3', ]) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_rename_delete_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_rename_delete_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -296,11 +325,13 @@ def test_rename_delete_error(self, mock_gcsio): 'gs://bucket/from3', ]) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_rename_copy_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_rename_copy_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -335,11 +366,13 @@ def test_rename_copy_error(self, mock_gcsio): 'gs://bucket/from3', ]) - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_delete(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_delete(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0} files = [ 'gs://bucket/from1', @@ -351,11 +384,13 @@ def test_delete(self, mock_gcsio): self.fs.delete(files) gcsio_mock.delete_batch.assert_called() - @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') - def test_delete_error(self, mock_gcsio): + @mock.patch('apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._get_gcsio') + def test_delete_error(self, mock_get_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock + mock_gcsio_module = mock.MagicMock() + mock_gcsio_module.GcsIO.return_value = gcsio_mock + mock_get_gcsio.return_value = mock_gcsio_module gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0} files = [