Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.gax.paging.Page;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
Expand All @@ -44,6 +47,7 @@

public class GcsUtil {
@VisibleForTesting GcsUtilV1 delegate;
@VisibleForTesting @Nullable GcsUtilV2 delegateV2;

public static class GcsCountersOptions {
final GcsUtilV1.GcsCountersOptions delegate;
Expand Down Expand Up @@ -91,16 +95,17 @@ public GcsUtil create(PipelineOptions options) {
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null),
gcsOptions);
gcsOptions,
ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2"));
}
}

public static String getNonWildcardPrefix(String globExp) {
return GcsUtilV1.getNonWildcardPrefix(globExp);
return GcsPath.getNonWildcardPrefix(globExp);
}

public static boolean isWildcard(GcsPath spec) {
return GcsUtilV1.isWildcard(spec);
return GcsPath.isWildcard(spec);
}

@VisibleForTesting
Expand All @@ -125,6 +130,36 @@ public static boolean isWildcard(GcsPath spec) {
rewriteDataOpBatchLimit,
gcsCountersOptions.delegate,
gcsOptions);
this.delegateV2 = null;
}

@VisibleForTesting
GcsUtil(
Storage storageClient,
HttpRequestInitializer httpRequestInitializer,
ExecutorService executorService,
Boolean shouldUseGrpc,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions,
GcsOptions gcsOptions,
Boolean shouldUseV2) {
this.delegate =
new GcsUtilV1(
storageClient,
httpRequestInitializer,
executorService,
shouldUseGrpc,
credentials,
uploadBufferSizeBytes,
rewriteDataOpBatchLimit,
gcsCountersOptions.delegate,
gcsOptions);

if (shouldUseV2) {
this.delegateV2 = new GcsUtilV2(gcsOptions);
}
}

protected void setStorageClient(Storage storageClient) {
Expand All @@ -136,6 +171,7 @@ protected void setBatchRequestSupplier(Supplier<GcsUtilV1.BatchInterface> suppli
}

public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
if (delegateV2 != null) return delegateV2.expand(gcsPattern);
return delegate.expand(gcsPattern);
}

Expand All @@ -146,36 +182,63 @@ Integer getUploadBufferSizeBytes() {
}

public long fileSize(GcsPath path) throws IOException {
if (delegateV2 != null) return delegateV2.fileSize(path);
return delegate.fileSize(path);
}

/** @deprecated use {@link #getBlob(GcsPath)}. */
@Deprecated
public StorageObject getObject(GcsPath gcsPath) throws IOException {
return delegate.getObject(gcsPath);
}

/** @deprecated use {@link #getBlob(GcsPath)}. */
@Deprecated
@VisibleForTesting
StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.getObject(gcsPath, backoff, sleeper);
}

public Blob getBlob(GcsPath gcsPath) throws IOException {
if (delegateV2 != null) return delegateV2.getBlob(gcsPath);
throw new IOException("GcsUtil2 not initialized.");
}

public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException {
List<GcsUtilV1.StorageObjectOrIOException> legacy = delegate.getObjects(gcsPaths);
return legacy.stream()
.map(StorageObjectOrIOException::fromLegacy)
.collect(java.util.stream.Collectors.toList());
}

/** @deprecated use {@link #listBlobs(String, String, String)}. */
@Deprecated
public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
throws IOException {
return delegate.listObjects(bucket, prefix, pageToken);
}

/** @deprecated use {@link #listBlobs(String, String, String, String)}. */
@Deprecated
public Objects listObjects(
String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter)
throws IOException {
return delegate.listObjects(bucket, prefix, pageToken, delimiter);
}

public Page<Blob> listBlobs(String bucket, String prefix, @Nullable String pageToken)
throws IOException {
if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken);
throw new IOException("GcsUtil2 not initialized.");
}

public Page<Blob> listBlobs(
String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter)
throws IOException {
if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken, delimiter);
throw new IOException("GcsUtil2 not initialized.");
}

@VisibleForTesting
List<Long> fileSizes(List<GcsPath> paths) throws IOException {
return delegate.fileSizes(paths);
Expand Down Expand Up @@ -254,29 +317,62 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO
}

public void verifyBucketAccessible(GcsPath path) throws IOException {
if (delegateV2 != null) {
delegateV2.verifyBucketAccessible(path);
return;
}
delegate.verifyBucketAccessible(path);
}

public boolean bucketAccessible(GcsPath path) throws IOException {
if (delegateV2 != null) return delegateV2.bucketAccessible(path);
return delegate.bucketAccessible(path);
}

public long bucketOwner(GcsPath path) throws IOException {
if (delegateV2 != null) return delegateV2.bucketProject(path);
return delegate.bucketOwner(path);
}

/** @deprecated use {@link #createBucket(BucketInfo)}. */
@Deprecated
public void createBucket(String projectId, Bucket bucket) throws IOException {
delegate.createBucket(projectId, bucket);
}

public void createBucket(BucketInfo bucketInfo) throws IOException {
if (delegateV2 != null) {
delegateV2.createBucket(bucketInfo);
} else {
throw new IOException("GcsUtil2 not initialized.");
}
}

/** @deprecated use {@link #getBucketV2(GcsPath)}. */
@Deprecated
public @Nullable Bucket getBucket(GcsPath path) throws IOException {
return delegate.getBucket(path);
}

public com.google.cloud.storage.@Nullable Bucket getBucketV2(GcsPath path) throws IOException {
if (delegateV2 != null) return delegateV2.getBucket(path);
throw new IOException("GcsUtil2 not initialized.");
}

/** @deprecated use {@link #removeBucket(BucketInfo)}. */
@Deprecated
public void removeBucket(Bucket bucket) throws IOException {
delegate.removeBucket(bucket);
}

public void removeBucket(BucketInfo bucketInfo) throws IOException {
if (delegateV2 != null) {
delegateV2.removeBucket(bucketInfo);
} else {
throw new IOException("GcsUtil2 not initialized.");
}
}

@VisibleForTesting
boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.bucketAccessible(path, backoff, sleeper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
Expand Down Expand Up @@ -105,7 +104,7 @@

/** Provides operations on GCS. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
"nullness" // For Creating AccessDeniedException with null.
})
class GcsUtilV1 {

Expand Down Expand Up @@ -165,9 +164,6 @@ public GcsUtilV1 create(PipelineOptions options) {
/** Maximum number of items to retrieve per Objects.List request. */
private static final long MAX_LIST_ITEMS_PER_CALL = 1024;

/** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");

/** Maximum number of requests permitted in a GCS batch request. */
private static final int MAX_REQUESTS_PER_BATCH = 100;
/** Default maximum number of requests permitted in a GCS batch request where data is copied. */
Expand Down Expand Up @@ -224,18 +220,6 @@ public boolean shouldRetry(IOException e) {

@VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;

/** Returns the prefix portion of the glob that doesn't contain wildcards. */
public static String getNonWildcardPrefix(String globExp) {
Matcher m = GLOB_PREFIX.matcher(globExp);
checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp));
return m.group("PREFIX");
}

/** Returns true if the given {@code spec} contains wildcard. */
public static boolean isWildcard(GcsPath spec) {
return GLOB_PREFIX.matcher(spec.getObject()).matches();
}

@VisibleForTesting
GcsUtilV1(
Storage storageClient,
Expand Down Expand Up @@ -333,9 +317,9 @@ protected void setBatchRequestSupplier(Supplier<BatchInterface> supplier) {
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
Pattern p = null;
String prefix = null;
if (isWildcard(gcsPattern)) {
if (GcsPath.isWildcard(gcsPattern)) {
// Part before the first wildcard character.
prefix = getNonWildcardPrefix(gcsPattern.getObject());
prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject());
p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
} else {
// Not a wildcard.
Expand Down
Loading
Loading