From e5608097655ce7977271a1a815d6b089d04edb5e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 4 Feb 2026 11:40:45 -0500 Subject: [PATCH 01/13] Migrate fileSize() and getObject() --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 45 +++- .../sdk/extensions/gcp/util/GcsUtilV2.java | 54 +++++ .../sdk/extensions/gcp/util/GcsUtilIT.java | 204 +++++++++++------- 3 files changed, 224 insertions(+), 79 deletions(-) create mode 100644 sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 220c08c6a7f3..104d8c4ffea3 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -25,6 +25,7 @@ import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; +import com.google.cloud.storage.Blob; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.nio.channels.SeekableByteChannel; @@ -44,6 +45,7 @@ public class GcsUtil { @VisibleForTesting GcsUtilV1 delegate; + @VisibleForTesting @Nullable GcsUtilV2 delegateV2; public static class GcsCountersOptions { final GcsUtilV1.GcsCountersOptions delegate; @@ -91,7 +93,8 @@ public GcsUtil create(PipelineOptions options) { gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() : null), - gcsOptions); + gcsOptions, + ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2")); } } @@ -125,6 +128,36 @@ public static boolean isWildcard(GcsPath spec) { rewriteDataOpBatchLimit, gcsCountersOptions.delegate, gcsOptions); + this.delegateV2 = null; + } + + @VisibleForTesting + GcsUtil( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + Boolean shouldUseGrpc, + Credentials credentials, + @Nullable Integer uploadBufferSizeBytes, + @Nullable Integer rewriteDataOpBatchLimit, + GcsCountersOptions gcsCountersOptions, + GcsOptions gcsOptions, + Boolean shouldUseV2) { + this.delegate = + new GcsUtilV1( + storageClient, + httpRequestInitializer, + executorService, + shouldUseGrpc, + credentials, + uploadBufferSizeBytes, + rewriteDataOpBatchLimit, + gcsCountersOptions.delegate, + gcsOptions); + + if (shouldUseV2) { + this.delegateV2 = new GcsUtilV2(gcsOptions); + } } protected void setStorageClient(Storage storageClient) { @@ -146,18 +179,28 @@ Integer getUploadBufferSizeBytes() { } public long fileSize(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.fileSize(path); return delegate.fileSize(path); } + /** @deprecated use {@link #getBlob(GcsPath)}. */ + @Deprecated public StorageObject getObject(GcsPath gcsPath) throws IOException { return delegate.getObject(gcsPath); } + /** @deprecated use {@link #getBlob(GcsPath)}. */ + @Deprecated @VisibleForTesting StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { return delegate.getObject(gcsPath, backoff, sleeper); } + public Blob getBlob(GcsPath gcsPath) throws IOException { + if (delegateV2 != null) return delegateV2.getBlob(gcsPath); + throw new IOException("GcsUtil2 not initialized."); + } + public List getObjects(List gcsPaths) throws IOException { List legacy = delegate.getObjects(gcsPaths); return legacy.stream() diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java new file mode 100644 index 000000000000..73065c24389f --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.util; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobGetOption; +import com.google.cloud.storage.StorageOptions; +import java.io.IOException; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; + +class GcsUtilV2 { + public static class GcsUtilFactory implements DefaultValueFactory { + @Override + public GcsUtilV2 create(PipelineOptions options) { + // GcsOptions gcsOptions = options.as(GcsOptions.class); + // Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); + return new GcsUtilV2(options); + } + } + + private Storage storage; + + GcsUtilV2(PipelineOptions options) { + String projectId = options.as(GcpOptions.class).getProject(); + storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + } + + public Blob getBlob(GcsPath gcsPath, BlobGetOption... blobGetOptions) throws IOException { + return storage.get(gcsPath.getBucket(), gcsPath.getObject(), blobGetOptions); + } + + public long fileSize(GcsPath gcsPath) throws IOException { + return getBlob(gcsPath).getSize(); + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index b6c92ab9369d..6d80fd2c9ec4 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -17,33 +17,26 @@ */ package org.apache.beam.sdk.extensions.gcp.util; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import com.google.protobuf.ByteString; +import com.google.api.services.storage.model.StorageObject; +import com.google.cloud.storage.Blob; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; +import java.util.Arrays; import java.util.Collections; -import java.util.Date; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.testing.UsesKms; -import org.apache.beam.sdk.util.ByteStringOutputStream; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; /** * Integration tests for {@link GcsUtil}. These tests are designed to run against production Google @@ -52,91 +45,146 @@ *

This is a runnerless integration test, even though the Beam IT framework assumes one. Thus, * this test should only be run against single runner (such as DirectRunner). */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) @Category(UsesKms.class) public class GcsUtilIT { - /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ + + @Parameters(name = "{0}") + public static Iterable data() { + return Arrays.asList("use_gcsutil_v1", "use_gcsutil_v2"); + } + + @Parameter public String experiment; + @Test - public void testRewriteMultiPart() throws IOException { + public void testFileSize() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); + final long expectedSize = 157283L; + TestPipelineOptions options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); - // Using a KMS key is necessary to trigger multi-part rewrites (bucket is created - // with a bucket default key). - assertNotNull(options.getTempRoot()); - options.setTempLocation( - FileSystems.matchNewDirectory(options.getTempRoot(), "testRewriteMultiPart").toString()); + + // set the experimental flag. + ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); + experimentalOptions.setExperiments(Collections.singletonList(experiment)); GcsOptions gcsOptions = options.as(GcsOptions.class); GcsUtil gcsUtil = gcsOptions.getGcsUtil(); - String srcFilename = "gs://dataflow-samples/wikipedia_edits/wiki_data-000000000000.json"; - String dstFilename = - gcsOptions.getGcpTempLocation() - + String.format( - "/GcsUtilIT-%tF-% Date: Wed, 4 Feb 2026 12:56:29 -0500 Subject: [PATCH 02/13] Move getNonWildcardPrefix() and isWildcard() from GcsUtilV1 to GcsPath --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 4 ++-- .../sdk/extensions/gcp/util/GcsUtilV1.java | 19 ++---------------- .../extensions/gcp/util/gcsfs/GcsPath.java | 15 ++++++++++++++ .../gcp/util/gcsfs/GcsPathTest.java | 20 +++++++++++++++++++ 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 104d8c4ffea3..ab3e31962664 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -99,11 +99,11 @@ public GcsUtil create(PipelineOptions options) { } public static String getNonWildcardPrefix(String globExp) { - return GcsUtilV1.getNonWildcardPrefix(globExp); + return GcsPath.getNonWildcardPrefix(globExp); } public static boolean isWildcard(GcsPath spec) { - return GcsUtilV1.isWildcard(spec); + return GcsPath.isWildcard(spec); } @VisibleForTesting diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java index c44eb36c2636..439fa0015a60 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java @@ -165,9 +165,6 @@ public GcsUtilV1 create(PipelineOptions options) { /** Maximum number of items to retrieve per Objects.List request. */ private static final long MAX_LIST_ITEMS_PER_CALL = 1024; - /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ - private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); - /** Maximum number of requests permitted in a GCS batch request. */ private static final int MAX_REQUESTS_PER_BATCH = 100; /** Default maximum number of requests permitted in a GCS batch request where data is copied. */ @@ -224,18 +221,6 @@ public boolean shouldRetry(IOException e) { @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed; - /** Returns the prefix portion of the glob that doesn't contain wildcards. */ - public static String getNonWildcardPrefix(String globExp) { - Matcher m = GLOB_PREFIX.matcher(globExp); - checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); - return m.group("PREFIX"); - } - - /** Returns true if the given {@code spec} contains wildcard. */ - public static boolean isWildcard(GcsPath spec) { - return GLOB_PREFIX.matcher(spec.getObject()).matches(); - } - @VisibleForTesting GcsUtilV1( Storage storageClient, @@ -333,9 +318,9 @@ protected void setBatchRequestSupplier(Supplier supplier) { public List expand(GcsPath gcsPattern) throws IOException { Pattern p = null; String prefix = null; - if (isWildcard(gcsPattern)) { + if (GcsPath.isWildcard(gcsPattern)) { // Part before the first wildcard character. - prefix = getNonWildcardPrefix(gcsPattern.getObject()); + prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); } else { // Not a wildcard. diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java index 29c312d7a5e7..6ee6d46597f7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java @@ -129,6 +129,9 @@ public static GcsPath fromUri(String uri) { /** Pattern that is used to validate a GCS bucket name. */ private static final Pattern GCS_BUCKET_NAME = Pattern.compile("[a-z0-9][-_a-z0-9.]+[a-z0-9]"); + /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ + private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); + /** Creates a GcsPath from a OnePlatform resource name in string form. */ public static GcsPath fromResourceName(String name) { Matcher m = GCS_RESOURCE_NAME.matcher(name); @@ -605,4 +608,16 @@ private String bucketAndObject() { return bucket + "/" + object; } } + + /** Returns the prefix portion of the glob that doesn't contain wildcards. */ + public static String getNonWildcardPrefix(String globExp) { + Matcher m = GLOB_PREFIX.matcher(globExp); + checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); + return m.group("PREFIX"); + } + + /** Returns true if the given {@code spec} contains wildcard. */ + public static boolean isWildcard(GcsPath spec) { + return GLOB_PREFIX.matcher(spec.getObject()).matches(); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java index ea047b0c6ea5..f344c6c2dba7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java @@ -348,4 +348,24 @@ public void testSubPathError() { a.subpath(1, 1); // throws IllegalArgumentException Assert.fail(); } + + @Test + public void testIsWildcard() { + assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo*"))); + assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo?"))); + assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo[a-z]"))); + assertFalse(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo"))); + } + + @Test + public void testGetNonWildcardPrefix() { + assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo*")); + assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo?")); + assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo[a-z]")); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetNonWildcardPrefix_noWildcard() { + GcsPath.getNonWildcardPrefix("gs://bucket/foo/bar"); + } } From 5c96d03d80ac6b596c65bd3ad542b0be7ffb6f80 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 4 Feb 2026 14:37:58 -0500 Subject: [PATCH 03/13] Migrate listObjects() --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 17 +++++++++ .../sdk/extensions/gcp/util/GcsUtilV1.java | 1 - .../sdk/extensions/gcp/util/GcsUtilV2.java | 35 +++++++++++++++++++ .../sdk/extensions/gcp/util/GcsUtilIT.java | 29 +++++++++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index ab3e31962664..5cf841625e37 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -208,17 +208,34 @@ public List getObjects(List gcsPaths) throw .collect(java.util.stream.Collectors.toList()); } + /** @deprecated use {@link #listBlobs(String, String, String)}. */ + @Deprecated public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) throws IOException { return delegate.listObjects(bucket, prefix, pageToken); } + /** @deprecated use {@link #listBlobs(String, String, String, String)}. */ + @Deprecated public Objects listObjects( String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) throws IOException { return delegate.listObjects(bucket, prefix, pageToken, delimiter); } + public List listBlobs(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken); + throw new IOException("GcsUtil2 not initialized."); + } + + public List listBlobs( + String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) + throws IOException { + if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken, delimiter); + throw new IOException("GcsUtil2 not initialized."); + } + @VisibleForTesting List fileSizes(List paths) throws IOException { return delegate.fileSizes(paths); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java index 439fa0015a60..1ade4be6fdb5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java @@ -76,7 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 73065c24389f..40b3871ee48f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -17,15 +17,21 @@ */ package org.apache.beam.sdk.extensions.gcp.util; +import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobGetOption; +import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.StorageOptions; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; class GcsUtilV2 { public static class GcsUtilFactory implements DefaultValueFactory { @@ -39,6 +45,9 @@ public GcsUtilV2 create(PipelineOptions options) { private Storage storage; + /** Maximum number of items to retrieve per Objects.List request. */ + private static final long MAX_LIST_BLOBS_PER_CALL = 1024; + GcsUtilV2(PipelineOptions options) { String projectId = options.as(GcpOptions.class).getProject(); storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); @@ -51,4 +60,30 @@ public Blob getBlob(GcsPath gcsPath, BlobGetOption... blobGetOptions) throws IOE public long fileSize(GcsPath gcsPath) throws IOException { return getBlob(gcsPath).getSize(); } + + /** Lists {@link Blob}s given the {@code bucket}, {@code prefix}, {@code pageToken}. */ + public List listBlobs( + String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) + throws IOException { + List options = new ArrayList<>(); + options.add(BlobListOption.pageSize(MAX_LIST_BLOBS_PER_CALL)); + if (pageToken != null) { + options.add(BlobListOption.pageToken(pageToken)); + } + if (prefix != null) { + options.add(BlobListOption.prefix(prefix)); + } + if (delimiter != null) { + options.add(BlobListOption.delimiter(delimiter)); + } + + Page blobs = storage.list(bucket, options.toArray(new BlobListOption[0])); + List blobList = blobs.streamValues().collect(Collectors.toList()); + return blobList; + } + + public List listBlobs(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + return listBlobs(bucket, prefix, pageToken, null); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index 6d80fd2c9ec4..8c578bcea842 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -20,11 +20,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Blob; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.ExperimentalOptions; @@ -101,6 +104,32 @@ public void testGetObjectOrGetBlob() throws IOException { assertEquals(expectedCRC, crc); } + @Test + public void testListObjectsOrListBlobs() throws IOException { + final String bucket = "apache-beam-samples"; + final String prefix = "shakespeare/kingrichard"; + TestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + + // set the experimental flag. + ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); + experimentalOptions.setExperiments(Collections.singletonList(experiment)); + + GcsOptions gcsOptions = options.as(GcsOptions.class); + GcsUtil gcsUtil = gcsOptions.getGcsUtil(); + + List names; + if (experiment.equals("use_gcsutil_v2")) { + List blobs = gcsUtil.listBlobs(bucket, prefix, null); + names = blobs.stream().map(blob -> blob.getName()).collect(Collectors.toList()); + } else { + Objects objs = gcsUtil.listObjects(bucket, prefix, null); + names = objs.getItems().stream().map(obj -> obj.getName()).collect(Collectors.toList()); + } + assertEquals( + Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names); + } + // /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ // @Test // public void testRewriteMultiPart() throws IOException { From e568f9cfffecd8e8d89f5135a6db61aef7ae9846 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 4 Feb 2026 16:32:30 -0500 Subject: [PATCH 04/13] Migrate getBucket(), bucketAccessible(), verifyBucketAccessible(). --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 7 ++++ .../sdk/extensions/gcp/util/GcsUtilV2.java | 33 +++++++++++++++++++ .../sdk/extensions/gcp/util/GcsUtilIT.java | 21 ++++++++++++ 3 files changed, 61 insertions(+) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 5cf841625e37..27e011f13450 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -314,10 +314,12 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO } public void verifyBucketAccessible(GcsPath path) throws IOException { + if (delegateV2 != null) delegateV2.verifyBucketAccessible(path); delegate.verifyBucketAccessible(path); } public boolean bucketAccessible(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.bucketAccessible(path); return delegate.bucketAccessible(path); } @@ -333,6 +335,11 @@ public void createBucket(String projectId, Bucket bucket) throws IOException { return delegate.getBucket(path); } + public com.google.cloud.storage.@Nullable Bucket getBucketV2(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.getBucket(path); + throw new IOException("GcsUtil2 not initialized."); + } + public void removeBucket(Bucket bucket) throws IOException { delegate.removeBucket(bucket); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 40b3871ee48f..83a1ba52caad 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -19,10 +19,15 @@ import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BucketField; +import com.google.cloud.storage.Storage.BucketGetOption; +import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -86,4 +91,32 @@ public List listBlobs(String bucket, String prefix, @Nullable String pageT throws IOException { return listBlobs(bucket, prefix, pageToken, null); } + + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ + public @Nullable Bucket getBucket(GcsPath path) throws IOException { + Bucket bucket = storage.get(path.getBucket()); + if (bucket != null) { + return bucket; + } + throw new FileNotFoundException( + String.format("The specified bucket does not exist: %s", path.getBucket())); + } + + /** Returns whether the GCS bucket exists and is accessible. */ + public boolean bucketAccessible(GcsPath path) { + try { + // Only select bucket name as a minimal set of returned fields. + return storage.get(path.getBucket(), BucketGetOption.fields(BucketField.NAME)) != null; + } catch (StorageException e) { + return false; + } + } + + /** + * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws + * exception if the bucket is inaccessible due to permissions or does not exist. + */ + public void verifyBucketAccessible(GcsPath path) throws IOException { + storage.get(path.getBucket(), BucketGetOption.fields(BucketField.NAME)); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index 8c578bcea842..47ececf75a30 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -23,6 +23,7 @@ import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Blob; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -130,6 +131,26 @@ public void testListObjectsOrListBlobs() throws IOException { Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names); } + @Test(expected = FileNotFoundException.class) + public void testGetBucketOrGetBucketV2OnNonExistentBucket() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + TestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + + // set the experimental flag. + ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); + experimentalOptions.setExperiments(Collections.singletonList(experiment)); + + GcsOptions gcsOptions = options.as(GcsOptions.class); + GcsUtil gcsUtil = gcsOptions.getGcsUtil(); + + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.getBucketV2(gcsPath); + } else { + gcsUtil.getBucket(gcsPath); + } + } + // /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ // @Test // public void testRewriteMultiPart() throws IOException { From 15d8bccf1c1998d897396b093e885117e7dba43d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 4 Feb 2026 21:39:54 -0500 Subject: [PATCH 05/13] Add tests and extract setUp function. --- .../sdk/extensions/gcp/util/GcsUtilV2.java | 20 ++++- .../sdk/extensions/gcp/util/GcsUtilIT.java | 82 +++++++++++-------- 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 83a1ba52caad..df69e3d2f92f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -29,6 +29,7 @@ import com.google.cloud.storage.StorageOptions; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -92,18 +93,29 @@ public List listBlobs(String bucket, String prefix, @Nullable String pageT return listBlobs(bucket, prefix, pageToken, null); } + @SuppressWarnings({ + "nullness" // For Creating AccessDeniedException with null. + }) /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ public @Nullable Bucket getBucket(GcsPath path) throws IOException { - Bucket bucket = storage.get(path.getBucket()); - if (bucket != null) { - return bucket; + try { + Bucket bucket = storage.get(path.getBucket()); + if (bucket != null) { + return bucket; + } + } catch (StorageException e) { + if (e.getCode() == 403) { // 403 Forbidden + throw new AccessDeniedException(path.toString(), null, e.getMessage()); + } else { + throw e; + } } throw new FileNotFoundException( String.format("The specified bucket does not exist: %s", path.getBucket())); } /** Returns whether the GCS bucket exists and is accessible. */ - public boolean bucketAccessible(GcsPath path) { + public boolean bucketAccessible(GcsPath path) throws IOException { try { // Only select bucket name as a minimal set of returned fields. return storage.get(path.getBucket(), BucketGetOption.fields(BucketField.NAME)) != null; diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index 47ececf75a30..2eb35be9cbd5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -18,13 +18,13 @@ package org.apache.beam.sdk.extensions.gcp.util; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Blob; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.AccessDeniedException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -35,6 +35,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.testing.UsesKms; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -60,21 +61,25 @@ public static Iterable data() { @Parameter public String experiment; - @Test - public void testFileSize() throws IOException { - final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); - final long expectedSize = 157283L; + private TestPipelineOptions options; + private GcsUtil gcsUtil; - TestPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + @Before + public void setUp() { + options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); // set the experimental flag. ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); experimentalOptions.setExperiments(Collections.singletonList(experiment)); GcsOptions gcsOptions = options.as(GcsOptions.class); - GcsUtil gcsUtil = gcsOptions.getGcsUtil(); - assertNotNull(gcsUtil); + gcsUtil = gcsOptions.getGcsUtil(); + } + + @Test + public void testFileSize() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); + final long expectedSize = 157283L; assertEquals(expectedSize, gcsUtil.fileSize(gcsPath)); } @@ -84,16 +89,6 @@ public void testGetObjectOrGetBlob() throws IOException { final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); final String expectedCRC = "s0a3Tg=="; - TestPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); - - // set the experimental flag. - ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); - experimentalOptions.setExperiments(Collections.singletonList(experiment)); - - GcsOptions gcsOptions = options.as(GcsOptions.class); - GcsUtil gcsUtil = gcsOptions.getGcsUtil(); - String crc; if (experiment.equals("use_gcsutil_v2")) { Blob blob = gcsUtil.getBlob(gcsPath); @@ -109,15 +104,6 @@ public void testGetObjectOrGetBlob() throws IOException { public void testListObjectsOrListBlobs() throws IOException { final String bucket = "apache-beam-samples"; final String prefix = "shakespeare/kingrichard"; - TestPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); - - // set the experimental flag. - ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); - experimentalOptions.setExperiments(Collections.singletonList(experiment)); - - GcsOptions gcsOptions = options.as(GcsOptions.class); - GcsUtil gcsUtil = gcsOptions.getGcsUtil(); List names; if (experiment.equals("use_gcsutil_v2")) { @@ -131,18 +117,33 @@ public void testListObjectsOrListBlobs() throws IOException { Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names); } + @Test + public void testGetBucketOrGetBucketV2OnAccessibleBucket() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples"); + + String bucket; + if (experiment.equals("use_gcsutil_v2")) { + bucket = gcsUtil.getBucketV2(gcsPath).getName(); + } else { + bucket = gcsUtil.getBucket(gcsPath).getName(); + } + assertEquals("apache-beam-samples", bucket); + } + @Test(expected = FileNotFoundException.class) public void testGetBucketOrGetBucketV2OnNonExistentBucket() throws IOException { final GcsPath gcsPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); - TestPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); - // set the experimental flag. - ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); - experimentalOptions.setExperiments(Collections.singletonList(experiment)); + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.getBucketV2(gcsPath); + } else { + gcsUtil.getBucket(gcsPath); + } + } - GcsOptions gcsOptions = options.as(GcsOptions.class); - GcsUtil gcsUtil = gcsOptions.getGcsUtil(); + @Test(expected = AccessDeniedException.class) + public void testGetBucketOrGetBucketV2OnForbiddenBucket() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://test-bucket"); if (experiment.equals("use_gcsutil_v2")) { gcsUtil.getBucketV2(gcsPath); @@ -151,6 +152,17 @@ public void testGetBucketOrGetBucketV2OnNonExistentBucket() throws IOException { } } + @Test + public void testBucketAccessible() throws IOException { + final GcsPath accessiblePath = GcsPath.fromUri("gs://apache-beam-samples"); + final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); + + assertEquals(true, gcsUtil.bucketAccessible(accessiblePath)); + assertEquals(false, gcsUtil.bucketAccessible(nonExistentPath)); + assertEquals(false, gcsUtil.bucketAccessible(forbiddenPath)); + } + // /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ // @Test // public void testRewriteMultiPart() throws IOException { From 1f9168ba15ccc86f2b8d2cadf03544e54e86d4a2 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 4 Feb 2026 21:54:22 -0500 Subject: [PATCH 06/13] Migrate bucketOwner() --- .../apache/beam/sdk/extensions/gcp/util/GcsUtil.java | 1 + .../apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java | 10 ++++++++++ .../apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java | 7 +++++++ 3 files changed, 18 insertions(+) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 27e011f13450..010ee58c6271 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -324,6 +324,7 @@ public boolean bucketAccessible(GcsPath path) throws IOException { } public long bucketOwner(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.bucketProject(path); return delegate.bucketOwner(path); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index df69e3d2f92f..1c234540e344 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -131,4 +131,14 @@ public boolean bucketAccessible(GcsPath path) throws IOException { public void verifyBucketAccessible(GcsPath path) throws IOException { storage.get(path.getBucket(), BucketGetOption.fields(BucketField.NAME)); } + + /** + * Returns the project number of the project which owns this bucket. If the bucket exists, it must + * be accessible otherwise the permissions exception will be propagated. If the bucket does not + * exist, an exception will be thrown. + */ + public long bucketProject(GcsPath path) throws IOException { + Bucket bucket = storage.get(path.getBucket(), BucketGetOption.fields(BucketField.PROJECT)); + return bucket.getProject().longValue(); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index 2eb35be9cbd5..83e73527a19c 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -163,6 +163,13 @@ public void testBucketAccessible() throws IOException { assertEquals(false, gcsUtil.bucketAccessible(forbiddenPath)); } + @Test + public void testBucketOwner() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples"); + final long expectedProjectNumber = 844138762903L; // apache-beam-testing + assertEquals(expectedProjectNumber, gcsUtil.bucketOwner(gcsPath)); + } + // /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ // @Test // public void testRewriteMultiPart() throws IOException { From 8186a0a92b59b9e905a0c3dc97a4797235f935ee Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 5 Feb 2026 12:17:28 -0500 Subject: [PATCH 07/13] Migrate createBucket() and removeBucket() --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 22 ++++++- .../sdk/extensions/gcp/util/GcsUtilV1.java | 2 +- .../sdk/extensions/gcp/util/GcsUtilV2.java | 61 +++++++++++++++--- .../sdk/extensions/gcp/util/GcsUtilIT.java | 62 +++++++++++++++++++ 4 files changed, 137 insertions(+), 10 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 010ee58c6271..43def6fd89bb 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -26,6 +26,7 @@ import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BucketInfo; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.nio.channels.SeekableByteChannel; @@ -314,7 +315,10 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO } public void verifyBucketAccessible(GcsPath path) throws IOException { - if (delegateV2 != null) delegateV2.verifyBucketAccessible(path); + if (delegateV2 != null) { + delegateV2.verifyBucketAccessible(path); + return; + } delegate.verifyBucketAccessible(path); } @@ -332,6 +336,14 @@ public void createBucket(String projectId, Bucket bucket) throws IOException { delegate.createBucket(projectId, bucket); } + public void createBucket(BucketInfo bucketInfo) throws IOException { + if (delegateV2 != null) { + delegateV2.createBucket(bucketInfo); + } else { + throw new IOException("GcsUtil2 not initialized."); + } + } + public @Nullable Bucket getBucket(GcsPath path) throws IOException { return delegate.getBucket(path); } @@ -345,6 +357,14 @@ public void removeBucket(Bucket bucket) throws IOException { delegate.removeBucket(bucket); } + public void removeBucket(BucketInfo bucketInfo) throws IOException { + if (delegateV2 != null) { + delegateV2.removeBucket(bucketInfo); + } else { + throw new IOException("GcsUtil2 not initialized."); + } + } + @VisibleForTesting boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { return delegate.bucketAccessible(path, backoff, sleeper); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java index 1ade4be6fdb5..a245f7d3f3f1 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java @@ -104,7 +104,7 @@ /** Provides operations on GCS. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // For Creating AccessDeniedException with null. }) class GcsUtilV1 { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 1c234540e344..4194c54e3cb1 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -20,6 +20,7 @@ import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; @@ -30,6 +31,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -98,20 +100,22 @@ public List listBlobs(String bucket, String prefix, @Nullable String pageT }) /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ public @Nullable Bucket getBucket(GcsPath path) throws IOException { + String bucketName = path.getBucket(); try { - Bucket bucket = storage.get(path.getBucket()); - if (bucket != null) { - return bucket; + Bucket bucket = storage.get(bucketName); + if (bucket == null) { + throw new FileNotFoundException( + String.format("The specified bucket does not exist: %s", bucketName)); } + return bucket; } catch (StorageException e) { if (e.getCode() == 403) { // 403 Forbidden - throw new AccessDeniedException(path.toString(), null, e.getMessage()); - } else { - throw e; + throw new AccessDeniedException(String.format("gs://%s", bucketName), null, e.getMessage()); } + + // rethrow other exceptions + throw e; } - throw new FileNotFoundException( - String.format("The specified bucket does not exist: %s", path.getBucket())); } /** Returns whether the GCS bucket exists and is accessible. */ @@ -141,4 +145,45 @@ public long bucketProject(GcsPath path) throws IOException { Bucket bucket = storage.get(path.getBucket(), BucketGetOption.fields(BucketField.PROJECT)); return bucket.getProject().longValue(); } + + @SuppressWarnings({ + "nullness" // For Creating AccessDeniedException with null. + }) + public void createBucket(BucketInfo bucketInfo) throws IOException { + String bucketName = bucketInfo.getName(); + try { + storage.create(bucketInfo); + } catch (StorageException e) { + if (e.getCode() == 403) { // 403 Forbidden + throw new AccessDeniedException(String.format("gs://%s", bucketName), null, e.getMessage()); + } else if (e.getCode() == 409) { // 409 Conflict + throw new FileAlreadyExistsException(bucketName, null, e.getMessage()); + } + + // rethrow other exceptions + throw e; + } + } + + @SuppressWarnings({ + "nullness" // For Creating AccessDeniedException with null. + }) + public void removeBucket(BucketInfo bucketInfo) throws IOException { + String bucketName = bucketInfo.getName(); + try { + Bucket bucket = storage.get(bucketName, BucketGetOption.fields(BucketField.NAME)); + if (bucket == null) { + throw new FileNotFoundException( + String.format("The specified bucket does not exist: %s", bucketName)); + } + bucket.delete(); + } catch (StorageException e) { + if (e.getCode() == 403) { // 403 Forbidden + throw new AccessDeniedException(String.format("gs://%s", bucketName), null, e.getMessage()); + } + + // rethrow other exceptions + throw e; + } + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index 83e73527a19c..d56ae86ac4db 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -18,13 +18,19 @@ package org.apache.beam.sdk.extensions.gcp.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BucketInfo; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -170,6 +176,62 @@ public void testBucketOwner() throws IOException { assertEquals(expectedProjectNumber, gcsUtil.bucketOwner(gcsPath)); } + @Test + public void testCreateAndRemoveBucket() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-test-bucket-12345"); + + if (experiment.equals("use_gcsutil_v2")) { + BucketInfo bucketInfo = BucketInfo.of(gcsPath.getBucket()); + try { + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.createBucket(bucketInfo); + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket already exists during creation + assertThrows(FileAlreadyExistsException.class, () -> gcsUtil.createBucket(bucketInfo)); + + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.removeBucket(bucketInfo); + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket does not exist during removal + assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucketInfo)); + } finally { + // clean up and ignore errors no matter what + try { + gcsUtil.removeBucket(bucketInfo); + } catch (IOException e) { + } + } + } else { + Bucket bucket = new Bucket().setName(gcsPath.getBucket()); + GcsOptions gcsOptions = options.as(GcsOptions.class); + String projectId = gcsOptions.getProject(); + try { + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.createBucket(projectId, bucket); + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket already exists during creation + assertThrows( + FileAlreadyExistsException.class, () -> gcsUtil.createBucket(projectId, bucket)); + + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.removeBucket(bucket); + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket does not exist during removal + assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucket)); + } finally { + // clean up and ignore errors no matter what + try { + gcsUtil.removeBucket(bucket); + } catch (IOException e) { + } + } + } + } + // /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ // @Test // public void testRewriteMultiPart() throws IOException { From e436313b068e6b1398101f304dac97150956df00 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 5 Feb 2026 20:30:06 -0500 Subject: [PATCH 08/13] Refactor exception handling. --- .../sdk/extensions/gcp/util/GcsUtilV2.java | 92 +++++++++---------- .../sdk/extensions/gcp/util/GcsUtilIT.java | 55 +++++------ 2 files changed, 75 insertions(+), 72 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 4194c54e3cb1..fd0a93d7e1a8 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -61,8 +61,34 @@ public GcsUtilV2 create(PipelineOptions options) { storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); } + @SuppressWarnings({ + "nullness" // For Creating AccessDeniedException and FileAlreadyExistsException with null. + }) + private IOException translateStorageException( + String bucketName, @Nullable String blobName, StorageException e) { + String path = "gs://" + bucketName + (blobName == null ? "" : "/" + blobName); + + switch (e.getCode()) { + case 403: + return new AccessDeniedException(path, null, e.getMessage()); + case 409: + return new FileAlreadyExistsException(path, null, e.getMessage()); + default: + return new IOException(e); + } + } + public Blob getBlob(GcsPath gcsPath, BlobGetOption... blobGetOptions) throws IOException { - return storage.get(gcsPath.getBucket(), gcsPath.getObject(), blobGetOptions); + try { + Blob blob = storage.get(gcsPath.getBucket(), gcsPath.getObject(), blobGetOptions); + if (blob == null) { + throw new FileNotFoundException( + String.format("The specified file does not exist: %s", gcsPath.toString())); + } + return blob; + } catch (StorageException e) { + throw translateStorageException(gcsPath.getBucket(), gcsPath.getObject(), e); + } } public long fileSize(GcsPath gcsPath) throws IOException { @@ -95,35 +121,28 @@ public List listBlobs(String bucket, String prefix, @Nullable String pageT return listBlobs(bucket, prefix, pageToken, null); } - @SuppressWarnings({ - "nullness" // For Creating AccessDeniedException with null. - }) /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ - public @Nullable Bucket getBucket(GcsPath path) throws IOException { + public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException { String bucketName = path.getBucket(); try { - Bucket bucket = storage.get(bucketName); + Bucket bucket = storage.get(bucketName, options); if (bucket == null) { throw new FileNotFoundException( - String.format("The specified bucket does not exist: %s", bucketName)); + String.format("The specified bucket does not exist: gs://%s", bucketName)); } return bucket; } catch (StorageException e) { - if (e.getCode() == 403) { // 403 Forbidden - throw new AccessDeniedException(String.format("gs://%s", bucketName), null, e.getMessage()); - } - - // rethrow other exceptions - throw e; + throw translateStorageException(bucketName, null, e); } } /** Returns whether the GCS bucket exists and is accessible. */ - public boolean bucketAccessible(GcsPath path) throws IOException { + public boolean bucketAccessible(GcsPath path) { try { - // Only select bucket name as a minimal set of returned fields. - return storage.get(path.getBucket(), BucketGetOption.fields(BucketField.NAME)) != null; - } catch (StorageException e) { + // Fetch only the name field to minimize data transfer + getBucket(path, BucketGetOption.fields(BucketField.NAME)); + return true; + } catch (IOException e) { return false; } } @@ -133,7 +152,8 @@ public boolean bucketAccessible(GcsPath path) throws IOException { * exception if the bucket is inaccessible due to permissions or does not exist. */ public void verifyBucketAccessible(GcsPath path) throws IOException { - storage.get(path.getBucket(), BucketGetOption.fields(BucketField.NAME)); + // Fetch only the name field to minimize data transfer + getBucket(path, BucketGetOption.fields(BucketField.NAME)); } /** @@ -142,48 +162,28 @@ public void verifyBucketAccessible(GcsPath path) throws IOException { * exist, an exception will be thrown. */ public long bucketProject(GcsPath path) throws IOException { - Bucket bucket = storage.get(path.getBucket(), BucketGetOption.fields(BucketField.PROJECT)); + Bucket bucket = getBucket(path, BucketGetOption.fields(BucketField.PROJECT)); return bucket.getProject().longValue(); } - @SuppressWarnings({ - "nullness" // For Creating AccessDeniedException with null. - }) public void createBucket(BucketInfo bucketInfo) throws IOException { - String bucketName = bucketInfo.getName(); try { storage.create(bucketInfo); } catch (StorageException e) { - if (e.getCode() == 403) { // 403 Forbidden - throw new AccessDeniedException(String.format("gs://%s", bucketName), null, e.getMessage()); - } else if (e.getCode() == 409) { // 409 Conflict - throw new FileAlreadyExistsException(bucketName, null, e.getMessage()); - } - - // rethrow other exceptions - throw e; + throw translateStorageException(bucketInfo.getName(), null, e); } } - @SuppressWarnings({ - "nullness" // For Creating AccessDeniedException with null. - }) public void removeBucket(BucketInfo bucketInfo) throws IOException { - String bucketName = bucketInfo.getName(); + Bucket bucket = + getBucket( + GcsPath.fromComponents(bucketInfo.getName(), null), + BucketGetOption.fields(BucketField.NAME)); + try { - Bucket bucket = storage.get(bucketName, BucketGetOption.fields(BucketField.NAME)); - if (bucket == null) { - throw new FileNotFoundException( - String.format("The specified bucket does not exist: %s", bucketName)); - } bucket.delete(); } catch (StorageException e) { - if (e.getCode() == 403) { // 403 Forbidden - throw new AccessDeniedException(String.format("gs://%s", bucketName), null, e.getMessage()); - } - - // rethrow other exceptions - throw e; + throw translateStorageException(bucketInfo.getName(), null, e); } } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index d56ae86ac4db..f91195a95b76 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -92,18 +92,32 @@ public void testFileSize() throws IOException { @Test public void testGetObjectOrGetBlob() throws IOException { - final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); + final GcsPath existingPath = + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); final String expectedCRC = "s0a3Tg=="; String crc; if (experiment.equals("use_gcsutil_v2")) { - Blob blob = gcsUtil.getBlob(gcsPath); + Blob blob = gcsUtil.getBlob(existingPath); crc = blob.getCrc32c(); } else { - StorageObject obj = gcsUtil.getObject(gcsPath); + StorageObject obj = gcsUtil.getObject(existingPath); crc = obj.getCrc32c(); } assertEquals(expectedCRC, crc); + + final GcsPath nonExistentPath = + GcsPath.fromUri("gs://my-random-test-bucket-12345/unknown-12345.txt"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket/unknown-12345.txt"); + + if (experiment.equals("use_gcsutil_v2")) { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(nonExistentPath)); + // For V2, we are returning AccessDeniedException (a subclass of IOException) for forbidden paths. + assertThrows(AccessDeniedException.class, () -> gcsUtil.getBlob(forbiddenPath)); + } else { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(nonExistentPath)); + assertThrows(IOException.class, () -> gcsUtil.getObject(forbiddenPath)); + } } @Test @@ -124,47 +138,36 @@ public void testListObjectsOrListBlobs() throws IOException { } @Test - public void testGetBucketOrGetBucketV2OnAccessibleBucket() throws IOException { - final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples"); + public void testGetBucketOrGetBucketV2() throws IOException { + final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); String bucket; if (experiment.equals("use_gcsutil_v2")) { - bucket = gcsUtil.getBucketV2(gcsPath).getName(); + bucket = gcsUtil.getBucketV2(existingPath).getName(); } else { - bucket = gcsUtil.getBucket(gcsPath).getName(); + bucket = gcsUtil.getBucket(existingPath).getName(); } assertEquals("apache-beam-samples", bucket); - } - @Test(expected = FileNotFoundException.class) - public void testGetBucketOrGetBucketV2OnNonExistentBucket() throws IOException { - final GcsPath gcsPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); - - if (experiment.equals("use_gcsutil_v2")) { - gcsUtil.getBucketV2(gcsPath); - } else { - gcsUtil.getBucket(gcsPath); - } - } - - @Test(expected = AccessDeniedException.class) - public void testGetBucketOrGetBucketV2OnForbiddenBucket() throws IOException { - final GcsPath gcsPath = GcsPath.fromUri("gs://test-bucket"); + final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); if (experiment.equals("use_gcsutil_v2")) { - gcsUtil.getBucketV2(gcsPath); + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucketV2(nonExistentPath)); + assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucketV2(forbiddenPath)); } else { - gcsUtil.getBucket(gcsPath); + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(nonExistentPath)); + assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucket(forbiddenPath)); } } @Test public void testBucketAccessible() throws IOException { - final GcsPath accessiblePath = GcsPath.fromUri("gs://apache-beam-samples"); + final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); - assertEquals(true, gcsUtil.bucketAccessible(accessiblePath)); + assertEquals(true, gcsUtil.bucketAccessible(existingPath)); assertEquals(false, gcsUtil.bucketAccessible(nonExistentPath)); assertEquals(false, gcsUtil.bucketAccessible(forbiddenPath)); } From b6a0aea01a910795471f29360e07bfa4e34dc3b5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 5 Feb 2026 21:10:06 -0500 Subject: [PATCH 09/13] Change returned value of listBlobs. Migrate expand(). --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 6 +- .../sdk/extensions/gcp/util/GcsUtilV2.java | 61 +++++++++++++++++-- .../sdk/extensions/gcp/util/GcsUtilIT.java | 20 +++++- 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 43def6fd89bb..799aac228138 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -20,6 +20,7 @@ import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; +import com.google.api.gax.paging.Page; import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; @@ -170,6 +171,7 @@ protected void setBatchRequestSupplier(Supplier suppli } public List expand(GcsPath gcsPattern) throws IOException { + if (delegateV2 != null) return delegateV2.expand(gcsPattern); return delegate.expand(gcsPattern); } @@ -224,13 +226,13 @@ public Objects listObjects( return delegate.listObjects(bucket, prefix, pageToken, delimiter); } - public List listBlobs(String bucket, String prefix, @Nullable String pageToken) + public Page listBlobs(String bucket, String prefix, @Nullable String pageToken) throws IOException { if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken); throw new IOException("GcsUtil2 not initialized."); } - public List listBlobs( + public Page listBlobs( String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) throws IOException { if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken, delimiter); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index fd0a93d7e1a8..0ce35c9af377 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.gcp.util; +import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp; + import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Bucket; @@ -34,14 +36,17 @@ import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; class GcsUtilV2 { + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(GcsUtilV2.class); + public static class GcsUtilFactory implements DefaultValueFactory { @Override public GcsUtilV2 create(PipelineOptions options) { @@ -96,7 +101,7 @@ public long fileSize(GcsPath gcsPath) throws IOException { } /** Lists {@link Blob}s given the {@code bucket}, {@code prefix}, {@code pageToken}. */ - public List listBlobs( + public Page listBlobs( String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) throws IOException { List options = new ArrayList<>(); @@ -111,16 +116,60 @@ public List listBlobs( options.add(BlobListOption.delimiter(delimiter)); } - Page blobs = storage.list(bucket, options.toArray(new BlobListOption[0])); - List blobList = blobs.streamValues().collect(Collectors.toList()); - return blobList; + return storage.list(bucket, options.toArray(new BlobListOption[0])); } - public List listBlobs(String bucket, String prefix, @Nullable String pageToken) + public Page listBlobs(String bucket, String prefix, @Nullable String pageToken) throws IOException { return listBlobs(bucket, prefix, pageToken, null); } + /** + * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in + * the result. For patterns that only match a single object, we ensure that the object exists. + */ + public List expand(GcsPath gcsPattern) throws IOException { + Pattern p = null; + String prefix = null; + if (GcsPath.isWildcard(gcsPattern)) { + // Part before the first wildcard character. + prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); + p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); + } else { + // Not a wildcard. + try { + // Use a get request to fetch the metadata of the object, and ignore the return value. + // The request has strong global consistency. + getBlob(gcsPattern); + return ImmutableList.of(gcsPattern); + } catch (FileNotFoundException e) { + // If the path was not found, return an empty list. + return ImmutableList.of(); + } + } + + LOG.debug( + "matching files in bucket {}, prefix {} against pattern {}", + gcsPattern.getBucket(), + prefix, + p.toString()); + + String pageToken = null; + List results = new ArrayList<>(); + Page blobs = listBlobs(gcsPattern.getBucket(), prefix, pageToken); + + // Filter objects based on the regex. + for (Blob b : blobs.iterateAll()) { + String name = b.getName(); + // Skip directories, which end with a slash. + if (p.matcher(name).matches() && !name.endsWith("/")) { + LOG.debug("Matched object: {}", name); + results.add(GcsPath.fromComponents(b.getBucket(), b.getName())); + } + } + return results; + } + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException { String bucketName = path.getBucket(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index f91195a95b76..b435eeedb009 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import com.google.api.gax.paging.Page; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; @@ -112,7 +113,8 @@ public void testGetObjectOrGetBlob() throws IOException { if (experiment.equals("use_gcsutil_v2")) { assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(nonExistentPath)); - // For V2, we are returning AccessDeniedException (a subclass of IOException) for forbidden paths. + // For V2, we are returning AccessDeniedException (a subclass of IOException) for forbidden + // paths. assertThrows(AccessDeniedException.class, () -> gcsUtil.getBlob(forbiddenPath)); } else { assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(nonExistentPath)); @@ -127,8 +129,8 @@ public void testListObjectsOrListBlobs() throws IOException { List names; if (experiment.equals("use_gcsutil_v2")) { - List blobs = gcsUtil.listBlobs(bucket, prefix, null); - names = blobs.stream().map(blob -> blob.getName()).collect(Collectors.toList()); + Page blobs = gcsUtil.listBlobs(bucket, prefix, null); + names = blobs.streamAll().map(blob -> blob.getName()).collect(Collectors.toList()); } else { Objects objs = gcsUtil.listObjects(bucket, prefix, null); names = objs.getItems().stream().map(obj -> obj.getName()).collect(Collectors.toList()); @@ -137,6 +139,18 @@ public void testListObjectsOrListBlobs() throws IOException { Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names); } + @Test + public void testExpand() throws IOException { + final GcsPath gcsPattern = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/king*iii.txt"); + List paths = gcsUtil.expand(gcsPattern); + + assertEquals( + Arrays.asList( + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinghenryviii.txt"), + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt")), + paths); + } + @Test public void testGetBucketOrGetBucketV2() throws IOException { final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); From b4355c6311ba118a7928cffab9422a70e02c1ae3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 5 Feb 2026 21:15:21 -0500 Subject: [PATCH 10/13] Add deprecated annotation to some V1 apis. Refactor expand(). --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 6 +++++ .../sdk/extensions/gcp/util/GcsUtilV2.java | 22 ++++++++----------- .../extensions/gcp/util/gcsfs/GcsPath.java | 2 +- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 799aac228138..6ef2dda5fd8a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -334,6 +334,8 @@ public long bucketOwner(GcsPath path) throws IOException { return delegate.bucketOwner(path); } + /** @deprecated use {@link #createBucket(BucketInfo)}. */ + @Deprecated public void createBucket(String projectId, Bucket bucket) throws IOException { delegate.createBucket(projectId, bucket); } @@ -346,6 +348,8 @@ public void createBucket(BucketInfo bucketInfo) throws IOException { } } + /** @deprecated use {@link #getBucketV2(GcsPath)}. */ + @Deprecated public @Nullable Bucket getBucket(GcsPath path) throws IOException { return delegate.getBucket(path); } @@ -355,6 +359,8 @@ public void createBucket(BucketInfo bucketInfo) throws IOException { throw new IOException("GcsUtil2 not initialized."); } + /** @deprecated use {@link #removeBucket(BucketInfo)}. */ + @Deprecated public void removeBucket(Bucket bucket) throws IOException { delegate.removeBucket(bucket); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 0ce35c9af377..e1f8ec2c18e5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -129,14 +129,8 @@ public Page listBlobs(String bucket, String prefix, @Nullable String pageT * the result. For patterns that only match a single object, we ensure that the object exists. */ public List expand(GcsPath gcsPattern) throws IOException { - Pattern p = null; - String prefix = null; - if (GcsPath.isWildcard(gcsPattern)) { - // Part before the first wildcard character. - prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); - p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); - } else { - // Not a wildcard. + // Handle Non-Wildcard Path + if (!GcsPath.isWildcard(gcsPattern)) { try { // Use a get request to fetch the metadata of the object, and ignore the return value. // The request has strong global consistency. @@ -148,20 +142,22 @@ public List expand(GcsPath gcsPattern) throws IOException { } } + // Handle Non-Wildcard Path + String prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); + Pattern p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); + LOG.debug( "matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), prefix, p.toString()); - String pageToken = null; List results = new ArrayList<>(); - Page blobs = listBlobs(gcsPattern.getBucket(), prefix, pageToken); - - // Filter objects based on the regex. + Page blobs = listBlobs(gcsPattern.getBucket(), prefix, null); + // Iterate through all elements page by page (lazily) for (Blob b : blobs.iterateAll()) { String name = b.getName(); - // Skip directories, which end with a slash. + // Filter objects based on the regex. Skip directories, which end with a slash. if (p.matcher(name).matches() && !name.endsWith("/")) { LOG.debug("Matched object: {}", name); results.add(GcsPath.fromComponents(b.getBucket(), b.getName())); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java index 6ee6d46597f7..745ff4d36302 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java @@ -609,7 +609,7 @@ private String bucketAndObject() { } } - /** Returns the prefix portion of the glob that doesn't contain wildcards. */ + /** Returns the prefix portion of the glob before the first wildcard character. */ public static String getNonWildcardPrefix(String globExp) { Matcher m = GLOB_PREFIX.matcher(globExp); checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); From 1416c723f7e5e16b40c10e965097a1fb6e3b1411 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 5 Feb 2026 21:34:20 -0500 Subject: [PATCH 11/13] Add exception handling for listBlobs() --- .../org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index e1f8ec2c18e5..93b833b47bb5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -116,7 +116,11 @@ public Page listBlobs( options.add(BlobListOption.delimiter(delimiter)); } - return storage.list(bucket, options.toArray(new BlobListOption[0])); + try { + return storage.list(bucket, options.toArray(new BlobListOption[0])); + } catch (StorageException e) { + throw translateStorageException(bucket, prefix, e); + } } public Page listBlobs(String bucket, String prefix, @Nullable String pageToken) From 0b0df583ce8a44f4591ccac0e654154ab071b8e5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 6 Feb 2026 10:03:01 -0500 Subject: [PATCH 12/13] Add some more tests and comments. --- .../sdk/extensions/gcp/util/GcsUtilV2.java | 3 +- .../sdk/extensions/gcp/util/GcsUtilIT.java | 33 ++++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 93b833b47bb5..a62c1af0429e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -146,7 +146,8 @@ public List expand(GcsPath gcsPattern) throws IOException { } } - // Handle Non-Wildcard Path + // Handle Wildcard Path + // TODO: check out BlobListOption.matchGlob() for a similar function. String prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); Pattern p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index b435eeedb009..9830491d2237 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -137,18 +137,36 @@ public void testListObjectsOrListBlobs() throws IOException { } assertEquals( Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names); + + final String randomPrefix = "my-random-prefix/random"; + if (experiment.equals("use_gcsutil_v2")) { + Page blobs = gcsUtil.listBlobs(bucket, randomPrefix, null); + assertEquals(0, blobs.streamAll().count()); + } else { + Objects objs = gcsUtil.listObjects(bucket, randomPrefix, null); + assertEquals(null, objs.getItems()); + } } @Test public void testExpand() throws IOException { - final GcsPath gcsPattern = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/king*iii.txt"); - List paths = gcsUtil.expand(gcsPattern); + final GcsPath existingPattern = + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii*.txt"); + List paths = gcsUtil.expand(existingPattern); assertEquals( Arrays.asList( - GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinghenryviii.txt"), + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt")), paths); + + final GcsPath nonExistentPattern1 = + GcsPath.fromUri("gs://apache-beam-samples/my_random_folder/random*.txt"); + assertTrue(gcsUtil.expand(nonExistentPattern1).isEmpty()); + + final GcsPath nonExistentPattern2 = + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/king*.csv"); + assertTrue(gcsUtil.expand(nonExistentPattern2).isEmpty()); } @Test @@ -188,9 +206,14 @@ public void testBucketAccessible() throws IOException { @Test public void testBucketOwner() throws IOException { - final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples"); + final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); final long expectedProjectNumber = 844138762903L; // apache-beam-testing - assertEquals(expectedProjectNumber, gcsUtil.bucketOwner(gcsPath)); + assertEquals(expectedProjectNumber, gcsUtil.bucketOwner(existingPath)); + + final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); + assertThrows(FileNotFoundException.class, () -> gcsUtil.bucketOwner(nonExistentPath)); + assertThrows(AccessDeniedException.class, () -> gcsUtil.bucketOwner(forbiddenPath)); } @Test From 6c8eef767791052a2c8553620eab2e4e7c8b65fc Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 6 Feb 2026 10:35:20 -0500 Subject: [PATCH 13/13] Fetch only the required field to minimize data transfer. --- .../sdk/extensions/gcp/util/GcsUtilV2.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index a62c1af0429e..c3244b087192 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -24,6 +24,7 @@ import com.google.cloud.storage.Bucket; import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.Storage.BucketField; @@ -97,12 +98,16 @@ public Blob getBlob(GcsPath gcsPath, BlobGetOption... blobGetOptions) throws IOE } public long fileSize(GcsPath gcsPath) throws IOException { - return getBlob(gcsPath).getSize(); + return getBlob(gcsPath, BlobGetOption.fields(BlobField.SIZE)).getSize(); } /** Lists {@link Blob}s given the {@code bucket}, {@code prefix}, {@code pageToken}. */ public Page listBlobs( - String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) + String bucket, + String prefix, + @Nullable String pageToken, + @Nullable String delimiter, + BlobListOption... extraOptions) throws IOException { List options = new ArrayList<>(); options.add(BlobListOption.pageSize(MAX_LIST_BLOBS_PER_CALL)); @@ -115,6 +120,11 @@ public Page listBlobs( if (delimiter != null) { options.add(BlobListOption.delimiter(delimiter)); } + if (extraOptions != null && extraOptions.length > 0) { + for (BlobListOption option : extraOptions) { + options.add(option); + } + } try { return storage.list(bucket, options.toArray(new BlobListOption[0])); @@ -123,9 +133,10 @@ public Page listBlobs( } } - public Page listBlobs(String bucket, String prefix, @Nullable String pageToken) + public Page listBlobs( + String bucket, String prefix, @Nullable String pageToken, BlobListOption... extraOptions) throws IOException { - return listBlobs(bucket, prefix, pageToken, null); + return listBlobs(bucket, prefix, pageToken, null, extraOptions); } /** @@ -138,7 +149,7 @@ public List expand(GcsPath gcsPattern) throws IOException { try { // Use a get request to fetch the metadata of the object, and ignore the return value. // The request has strong global consistency. - getBlob(gcsPattern); + getBlob(gcsPattern, BlobGetOption.fields(BlobField.NAME)); return ImmutableList.of(gcsPattern); } catch (FileNotFoundException e) { // If the path was not found, return an empty list. @@ -158,7 +169,12 @@ public List expand(GcsPath gcsPattern) throws IOException { p.toString()); List results = new ArrayList<>(); - Page blobs = listBlobs(gcsPattern.getBucket(), prefix, null); + Page blobs = + listBlobs( + gcsPattern.getBucket(), + prefix, + null, + BlobListOption.fields(BlobField.NAME, BlobField.BUCKET)); // Iterate through all elements page by page (lazily) for (Blob b : blobs.iterateAll()) { String name = b.getName();