Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e560809
Migrate fileSize() and getObject()
shunping Feb 4, 2026
ace3ac8
Move getNonWildcardPrefix() and isWildcard() from GcsUtilV1 to GcsPath
shunping Feb 4, 2026
5c96d03
Migrate listObjects()
shunping Feb 4, 2026
e568f9c
Migrate getBucket(), bucketAccessible(), verifyBucketAccessible().
shunping Feb 4, 2026
15d8bcc
Add tests and extract setUp function.
shunping Feb 5, 2026
1f9168b
Migrate bucketOwner()
shunping Feb 5, 2026
8186a0a
Migrate createBucket() and removeBucket()
shunping Feb 5, 2026
e436313
Refactor exception handling.
shunping Feb 6, 2026
b6a0aea
Change returned value of listBlobs. Migrate expand().
shunping Feb 6, 2026
b4355c6
Add deprecated annotation to some V1 apis. Refactor expand().
shunping Feb 6, 2026
1416c72
Add exception handling for listBlobs()
shunping Feb 6, 2026
0b0df58
Add some more tests and comments.
shunping Feb 6, 2026
6c8eef7
Fetch only the required field to minimize data transfer.
shunping Feb 6, 2026
b412ff9
Migrate getObjects() and refactor some method arguments.
shunping Feb 6, 2026
5b36816
Refactor translateStorageException
shunping Feb 6, 2026
9825207
Control size of the batch for getBlobs.
shunping Feb 6, 2026
4e12b4b
Formatting
shunping Feb 6, 2026
7e5f4fd
Fix GcpCoreApiSurfaceTest
shunping Feb 6, 2026
4a1ec1e
Add java deps.
shunping Feb 9, 2026
1889894
Change getBucketV2 to getBucketWithOptions. Fix GcpCoreApiSurfaceTest.
shunping Feb 9, 2026
36e640d
Merge branch 'master' into gcs-migration-2
shunping Feb 9, 2026
3bb5786
Put the new IT tests into a different file.
shunping Feb 9, 2026
2b2ef61
Revise according to gemini review.
shunping Feb 9, 2026
bf17160
Change argument from List to Iterable in getBlobs()
shunping Feb 10, 2026
6556d45
Remove redundant comments.
shunping Feb 10, 2026
d4b24bd
Minor change.
shunping Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_cloud_storage : "com.google.cloud:google-cloud-storage", // google_cloud_platform_libraries_bom sets version
google_cloud_tink : "com.google.crypto.tink:tink:1.19.0",
google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/extensions/google-cloud-platform-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:core-java")
implementation library.java.gax
implementation library.java.google_http_client_gson
implementation library.java.google_auth_library_oauth2_http
implementation library.java.google_api_client
implementation library.java.google_cloud_core
implementation library.java.google_cloud_storage
implementation library.java.bigdataoss_gcsio
implementation library.java.bigdataoss_util
implementation library.java.google_api_services_cloudresourcemanager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.gax.paging.Page;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BucketGetOption;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
Expand All @@ -34,6 +40,7 @@
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobOrIOException;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.options.DefaultValueFactory;
Expand All @@ -44,6 +51,7 @@

public class GcsUtil {
@VisibleForTesting GcsUtilV1 delegate;
@VisibleForTesting @Nullable GcsUtilV2 delegateV2;

public static class GcsCountersOptions {
final GcsUtilV1.GcsCountersOptions delegate;
Expand Down Expand Up @@ -91,16 +99,17 @@ public GcsUtil create(PipelineOptions options) {
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null),
gcsOptions);
gcsOptions,
ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2"));
}
}

public static String getNonWildcardPrefix(String globExp) {
return GcsUtilV1.getNonWildcardPrefix(globExp);
return GcsPath.getNonWildcardPrefix(globExp);
}

public static boolean isWildcard(GcsPath spec) {
return GcsUtilV1.isWildcard(spec);
return GcsPath.isWildcard(spec);
}

@VisibleForTesting
Expand All @@ -125,6 +134,36 @@ public static boolean isWildcard(GcsPath spec) {
rewriteDataOpBatchLimit,
gcsCountersOptions.delegate,
gcsOptions);
this.delegateV2 = null;
}

/**
 * Creates a {@code GcsUtil} that always owns a V1 delegate and, on request, a V2 delegate.
 *
 * <p>The V1 delegate is built from every argument except {@code shouldUseV2}. The V2 delegate is
 * built from {@code gcsOptions} alone, and only when {@code shouldUseV2} is {@code true};
 * otherwise {@code delegateV2} is left {@code null}.
 *
 * @param shouldUseV2 whether to create the V2 delegate; {@code null} is treated as {@code false}
 */
@VisibleForTesting
GcsUtil(
    Storage storageClient,
    HttpRequestInitializer httpRequestInitializer,
    ExecutorService executorService,
    Boolean shouldUseGrpc,
    Credentials credentials,
    @Nullable Integer uploadBufferSizeBytes,
    @Nullable Integer rewriteDataOpBatchLimit,
    GcsCountersOptions gcsCountersOptions,
    GcsOptions gcsOptions,
    Boolean shouldUseV2) {
  this.delegate =
      new GcsUtilV1(
          storageClient,
          httpRequestInitializer,
          executorService,
          shouldUseGrpc,
          credentials,
          uploadBufferSizeBytes,
          rewriteDataOpBatchLimit,
          gcsCountersOptions.delegate,
          gcsOptions);

  // Boolean.TRUE.equals avoids the NullPointerException that auto-unboxing a null flag would
  // raise, and the explicit null keeps this constructor in line with the V1-only constructor.
  this.delegateV2 = Boolean.TRUE.equals(shouldUseV2) ? new GcsUtilV2(gcsOptions) : null;
}

protected void setStorageClient(Storage storageClient) {
Expand All @@ -136,6 +175,9 @@ protected void setBatchRequestSupplier(Supplier<GcsUtilV1.BatchInterface> suppli
}

/**
 * Delegates expansion of {@code gcsPattern} to the V2 delegate when one was created, and to the
 * V1 delegate otherwise.
 */
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  return v2 != null ? v2.expand(gcsPattern) : delegate.expand(gcsPattern);
}

Expand All @@ -146,36 +188,86 @@ Integer getUploadBufferSizeBytes() {
}

/**
 * Returns the size reported for {@code path} by the V2 delegate when one was created, and by the
 * V1 delegate otherwise.
 */
public long fileSize(GcsPath path) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  return v2 != null ? v2.fileSize(path) : delegate.fileSize(path);
}

/**
 * Returns the result of the V1 delegate's {@code getObject} for {@code gcsPath}. The V2 delegate
 * is never consulted here.
 *
 * @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}.
 */
@Deprecated
public StorageObject getObject(GcsPath gcsPath) throws IOException {
return delegate.getObject(gcsPath);
}

/**
 * Returns the result of the V1 delegate's {@code getObject} for {@code gcsPath}, forwarding the
 * caller-supplied {@code backoff} and {@code sleeper} unchanged.
 *
 * @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}.
 */
@Deprecated
@VisibleForTesting
StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.getObject(gcsPath, backoff, sleeper);
}

/**
 * Returns the V2 delegate's {@code getBlob} result for {@code gcsPath} and {@code options}.
 *
 * <p>There is no V1 fallback for this method.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return v2.getBlob(gcsPath, options);
}

/**
 * Looks up {@code gcsPaths} through the V1 delegate and converts every legacy result with
 * {@code StorageObjectOrIOException.fromLegacy}, preserving the order of the delegate's list.
 *
 * @deprecated use {@link #getBlobs(Iterable, BlobGetOption...)}.
 */
@Deprecated
public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException {
  List<StorageObjectOrIOException> converted = new java.util.ArrayList<>();
  for (GcsUtilV1.StorageObjectOrIOException legacyResult : delegate.getObjects(gcsPaths)) {
    converted.add(StorageObjectOrIOException.fromLegacy(legacyResult));
  }
  return converted;
}

/**
 * Returns the V2 delegate's {@code getBlobs} result for {@code gcsPaths} and {@code options}.
 *
 * <p>There is no V1 fallback for this method.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public List<BlobOrIOException> getBlobs(Iterable<GcsPath> gcsPaths, BlobGetOption... options)
    throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return v2.getBlobs(gcsPaths, options);
}

/**
 * Returns the result of the V1 delegate's three-argument {@code listObjects}, forwarding
 * {@code bucket}, {@code prefix} and {@code pageToken} unchanged.
 *
 * @deprecated use {@link #listBlobs(String, String, String, BlobListOption...)}.
 */
@Deprecated
public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
throws IOException {
return delegate.listObjects(bucket, prefix, pageToken);
}

/**
 * Returns the result of the V1 delegate's four-argument {@code listObjects}, forwarding
 * {@code bucket}, {@code prefix}, {@code pageToken} and {@code delimiter} unchanged.
 *
 * @deprecated use {@link #listBlobs(String, String, String, String, BlobListOption...)}.
 */
@Deprecated
public Objects listObjects(
String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter)
throws IOException {
return delegate.listObjects(bucket, prefix, pageToken, delimiter);
}

/**
 * Returns the V2 delegate's {@code listBlobs} result for the given bucket, prefix, page token
 * and options.
 *
 * <p>There is no V1 fallback for this method.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public Page<Blob> listBlobs(
    String bucket, String prefix, @Nullable String pageToken, BlobListOption... options)
    throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return v2.listBlobs(bucket, prefix, pageToken, options);
}

/**
 * Returns the V2 delegate's {@code listBlobs} result for the given bucket, prefix, page token,
 * delimiter and options.
 *
 * <p>There is no V1 fallback for this method.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public Page<Blob> listBlobs(
    String bucket,
    String prefix,
    @Nullable String pageToken,
    @Nullable String delimiter,
    BlobListOption... options)
    throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return v2.listBlobs(bucket, prefix, pageToken, delimiter, options);
}

@VisibleForTesting
List<Long> fileSizes(List<GcsPath> paths) throws IOException {
return delegate.fileSizes(paths);
Expand Down Expand Up @@ -254,29 +346,69 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO
}

/**
 * Runs {@code verifyBucketAccessible} for {@code path} on the V2 delegate when one was created,
 * and on the V1 delegate otherwise. Exactly one delegate is invoked per call.
 */
public void verifyBucketAccessible(GcsPath path) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    delegate.verifyBucketAccessible(path);
  } else {
    v2.verifyBucketAccessible(path);
  }
}

/**
 * Returns the {@code bucketAccessible} answer for {@code path} from the V2 delegate when one was
 * created, and from the V1 delegate otherwise.
 */
public boolean bucketAccessible(GcsPath path) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  return v2 != null ? v2.bucketAccessible(path) : delegate.bucketAccessible(path);
}

/**
 * Returns the owner value for the bucket at {@code path}.
 *
 * <p>When the V2 delegate was created the value comes from its {@code bucketProject} method;
 * otherwise it comes from the V1 delegate's {@code bucketOwner} method.
 */
public long bucketOwner(GcsPath path) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  return v2 != null ? v2.bucketProject(path) : delegate.bucketOwner(path);
}

/**
 * Forwards {@code projectId} and {@code bucket} to the V1 delegate's {@code createBucket}. The V2
 * delegate is never consulted here.
 *
 * @deprecated use {@link #createBucket(BucketInfo)}.
 */
@Deprecated
public void createBucket(String projectId, Bucket bucket) throws IOException {
delegate.createBucket(projectId, bucket);
}

/**
 * Forwards {@code bucketInfo} to the V2 delegate's {@code createBucket}.
 *
 * <p>There is no V1 fallback for this overload.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public void createBucket(BucketInfo bucketInfo) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  v2.createBucket(bucketInfo);
}

/**
 * Returns the result of the V1 delegate's {@code getBucket} for {@code path}, which may be
 * {@code null}. The V2 delegate is never consulted here.
 *
 * @deprecated use {@link #getBucketWithOptions(GcsPath, BucketGetOption...)}.
 */
@Deprecated
public @Nullable Bucket getBucket(GcsPath path) throws IOException {
return delegate.getBucket(path);
}

/**
 * Returns the V2 delegate's {@code getBucket} result for {@code path} and {@code options}, which
 * may be {@code null}.
 *
 * <p>There is no V1 fallback for this method.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public com.google.cloud.storage.@Nullable Bucket getBucketWithOptions(
    GcsPath path, BucketGetOption... options) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return v2.getBucket(path, options);
}

/**
 * Forwards {@code bucket} to the V1 delegate's {@code removeBucket}. The V2 delegate is never
 * consulted here.
 *
 * @deprecated use {@link #removeBucket(BucketInfo)}.
 */
@Deprecated
public void removeBucket(Bucket bucket) throws IOException {
delegate.removeBucket(bucket);
}

/**
 * Forwards {@code bucketInfo} to the V2 delegate's {@code removeBucket}.
 *
 * <p>There is no V1 fallback for this overload.
 *
 * @throws IOException if the V2 delegate was not created, or if the delegate call fails
 */
public void removeBucket(BucketInfo bucketInfo) throws IOException {
  final GcsUtilV2 v2 = delegateV2;
  if (v2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  v2.removeBucket(bucketInfo);
}

@VisibleForTesting
boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.bucketAccessible(path, backoff, sleeper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
Expand Down Expand Up @@ -165,9 +164,6 @@ public GcsUtilV1 create(PipelineOptions options) {
/** Maximum number of items to retrieve per Objects.List request. */
private static final long MAX_LIST_ITEMS_PER_CALL = 1024;

/** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");

/** Maximum number of requests permitted in a GCS batch request. */
private static final int MAX_REQUESTS_PER_BATCH = 100;
/** Default maximum number of requests permitted in a GCS batch request where data is copied. */
Expand Down Expand Up @@ -224,18 +220,6 @@ public boolean shouldRetry(IOException e) {

@VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;

/**
 * Returns the prefix portion of the glob that doesn't contain wildcards.
 *
 * @param globExp glob expression; it must match {@code GLOB_PREFIX}, i.e. contain at least one
 *     of {@code [}, {@code *} or {@code ?}
 * @return the text captured by the {@code PREFIX} group, which precedes the first wildcard
 */
public static String getNonWildcardPrefix(String globExp) {
  Matcher m = GLOB_PREFIX.matcher(globExp);
  // Hand the template and its argument to checkArgument so the message is only built when the
  // check fails, instead of calling String.format on every invocation.
  // NOTE(review): assumes checkArgument is Guava's Preconditions.checkArgument — confirm import.
  checkArgument(m.matches(), "Glob expression: [%s] is not expandable.", globExp);
  return m.group("PREFIX");
}

/**
 * Returns true if the given {@code spec} contains wildcard.
 *
 * <p>Only the object portion of {@code spec} (its {@code getObject()} value) is tested, and it
 * must match {@code GLOB_PREFIX} in full.
 */
public static boolean isWildcard(GcsPath spec) {
return GLOB_PREFIX.matcher(spec.getObject()).matches();
}

@VisibleForTesting
GcsUtilV1(
Storage storageClient,
Expand Down Expand Up @@ -333,9 +317,9 @@ protected void setBatchRequestSupplier(Supplier<BatchInterface> supplier) {
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
Pattern p = null;
String prefix = null;
if (isWildcard(gcsPattern)) {
if (GcsPath.isWildcard(gcsPattern)) {
// Part before the first wildcard character.
prefix = getNonWildcardPrefix(gcsPattern.getObject());
prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject());
p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
} else {
// Not a wildcard.
Expand Down
Loading
Loading