diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 220c08c6a7f3..6ef2dda5fd8a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -20,11 +20,14 @@ import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; +import com.google.api.gax.paging.Page; import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BucketInfo; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.nio.channels.SeekableByteChannel; @@ -44,6 +47,7 @@ public class GcsUtil { @VisibleForTesting GcsUtilV1 delegate; + @VisibleForTesting @Nullable GcsUtilV2 delegateV2; public static class GcsCountersOptions { final GcsUtilV1.GcsCountersOptions delegate; @@ -91,16 +95,17 @@ public GcsUtil create(PipelineOptions options) { gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() : null), - gcsOptions); + gcsOptions, + ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2")); } } public static String getNonWildcardPrefix(String globExp) { - return GcsUtilV1.getNonWildcardPrefix(globExp); + return GcsPath.getNonWildcardPrefix(globExp); } public static boolean isWildcard(GcsPath spec) { - return GcsUtilV1.isWildcard(spec); + return GcsPath.isWildcard(spec); } @VisibleForTesting @@ -125,6 +130,36 @@ public static boolean isWildcard(GcsPath spec) { rewriteDataOpBatchLimit, gcsCountersOptions.delegate, gcsOptions); + this.delegateV2 = null; + } + + @VisibleForTesting + GcsUtil( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + Boolean shouldUseGrpc, + Credentials credentials, + @Nullable Integer uploadBufferSizeBytes, + @Nullable Integer rewriteDataOpBatchLimit, + GcsCountersOptions gcsCountersOptions, + GcsOptions gcsOptions, + Boolean shouldUseV2) { + this.delegate = + new GcsUtilV1( + storageClient, + httpRequestInitializer, + executorService, + shouldUseGrpc, + credentials, + uploadBufferSizeBytes, + rewriteDataOpBatchLimit, + gcsCountersOptions.delegate, + gcsOptions); + + if (shouldUseV2) { + this.delegateV2 = new GcsUtilV2(gcsOptions); + } } protected void setStorageClient(Storage storageClient) { @@ -136,6 +171,7 @@ protected void setBatchRequestSupplier(Supplier suppli } public List expand(GcsPath gcsPattern) throws IOException { + if (delegateV2 != null) return delegateV2.expand(gcsPattern); return delegate.expand(gcsPattern); } @@ -146,18 +182,28 @@ Integer getUploadBufferSizeBytes() { } public long fileSize(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.fileSize(path); return delegate.fileSize(path); } + /** @deprecated use {@link #getBlob(GcsPath)}. */ + @Deprecated public StorageObject getObject(GcsPath gcsPath) throws IOException { return delegate.getObject(gcsPath); } + /** @deprecated use {@link #getBlob(GcsPath)}. */ + @Deprecated @VisibleForTesting StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { return delegate.getObject(gcsPath, backoff, sleeper); } + public Blob getBlob(GcsPath gcsPath) throws IOException { + if (delegateV2 != null) return delegateV2.getBlob(gcsPath); + throw new IOException("GcsUtil2 not initialized."); + } + public List getObjects(List gcsPaths) throws IOException { List legacy = delegate.getObjects(gcsPaths); return legacy.stream() @@ -165,17 +211,34 @@ public List getObjects(List gcsPaths) throw .collect(java.util.stream.Collectors.toList()); } + /** @deprecated use {@link #listBlobs(String, String, String)}. */ + @Deprecated public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) throws IOException { return delegate.listObjects(bucket, prefix, pageToken); } + /** @deprecated use {@link #listBlobs(String, String, String, String)}. */ + @Deprecated public Objects listObjects( String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) throws IOException { return delegate.listObjects(bucket, prefix, pageToken, delimiter); } + public Page listBlobs(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken); + throw new IOException("GcsUtil2 not initialized."); + } + + public Page listBlobs( + String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter) + throws IOException { + if (delegateV2 != null) return delegateV2.listBlobs(bucket, prefix, pageToken, delimiter); + throw new IOException("GcsUtil2 not initialized."); + } + @VisibleForTesting List fileSizes(List paths) throws IOException { return delegate.fileSizes(paths); @@ -254,29 +317,62 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO } public void verifyBucketAccessible(GcsPath path) throws IOException { + if (delegateV2 != null) { + delegateV2.verifyBucketAccessible(path); + return; + } delegate.verifyBucketAccessible(path); } public boolean bucketAccessible(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.bucketAccessible(path); return delegate.bucketAccessible(path); } public long bucketOwner(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.bucketProject(path); return delegate.bucketOwner(path); } + /** @deprecated use {@link #createBucket(BucketInfo)}. */ + @Deprecated public void createBucket(String projectId, Bucket bucket) throws IOException { delegate.createBucket(projectId, bucket); } + public void createBucket(BucketInfo bucketInfo) throws IOException { + if (delegateV2 != null) { + delegateV2.createBucket(bucketInfo); + } else { + throw new IOException("GcsUtil2 not initialized."); + } + } + + /** @deprecated use {@link #getBucketV2(GcsPath)}. */ + @Deprecated public @Nullable Bucket getBucket(GcsPath path) throws IOException { return delegate.getBucket(path); } + public com.google.cloud.storage.@Nullable Bucket getBucketV2(GcsPath path) throws IOException { + if (delegateV2 != null) return delegateV2.getBucket(path); + throw new IOException("GcsUtil2 not initialized."); + } + + /** @deprecated use {@link #removeBucket(BucketInfo)}. */ + @Deprecated public void removeBucket(Bucket bucket) throws IOException { delegate.removeBucket(bucket); } + public void removeBucket(BucketInfo bucketInfo) throws IOException { + if (delegateV2 != null) { + delegateV2.removeBucket(bucketInfo); + } else { + throw new IOException("GcsUtil2 not initialized."); + } + } + @VisibleForTesting boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { return delegate.bucketAccessible(path, backoff, sleeper); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java index c44eb36c2636..a245f7d3f3f1 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java @@ -76,7 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; @@ -105,7 +104,7 @@ /** Provides operations on GCS. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // For Creating AccessDeniedException with null. }) class GcsUtilV1 { @@ -165,9 +164,6 @@ public GcsUtilV1 create(PipelineOptions options) { /** Maximum number of items to retrieve per Objects.List request. */ private static final long MAX_LIST_ITEMS_PER_CALL = 1024; - /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ - private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); - /** Maximum number of requests permitted in a GCS batch request. */ private static final int MAX_REQUESTS_PER_BATCH = 100; /** Default maximum number of requests permitted in a GCS batch request where data is copied. */ @@ -224,18 +220,6 @@ public boolean shouldRetry(IOException e) { @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed; - /** Returns the prefix portion of the glob that doesn't contain wildcards. */ - public static String getNonWildcardPrefix(String globExp) { - Matcher m = GLOB_PREFIX.matcher(globExp); - checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); - return m.group("PREFIX"); - } - - /** Returns true if the given {@code spec} contains wildcard. */ - public static boolean isWildcard(GcsPath spec) { - return GLOB_PREFIX.matcher(spec.getObject()).matches(); - } - @VisibleForTesting GcsUtilV1( Storage storageClient, @@ -333,9 +317,9 @@ protected void setBatchRequestSupplier(Supplier supplier) { public List expand(GcsPath gcsPattern) throws IOException { Pattern p = null; String prefix = null; - if (isWildcard(gcsPattern)) { + if (GcsPath.isWildcard(gcsPattern)) { // Part before the first wildcard character. - prefix = getNonWildcardPrefix(gcsPattern.getObject()); + prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); } else { // Not a wildcard. diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java new file mode 100644 index 000000000000..c3244b087192 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.util; + +import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp; + +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobField; +import com.google.cloud.storage.Storage.BlobGetOption; +import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BucketField; +import com.google.cloud.storage.Storage.BucketGetOption; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; + +class GcsUtilV2 { + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(GcsUtilV2.class); + + public static class GcsUtilFactory implements DefaultValueFactory { + @Override + public GcsUtilV2 create(PipelineOptions options) { + // GcsOptions gcsOptions = options.as(GcsOptions.class); + // Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); + return new GcsUtilV2(options); + } + } + + private Storage storage; + + /** Maximum number of items to retrieve per Objects.List request. */ + private static final long MAX_LIST_BLOBS_PER_CALL = 1024; + + GcsUtilV2(PipelineOptions options) { + String projectId = options.as(GcpOptions.class).getProject(); + storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + } + + @SuppressWarnings({ + "nullness" // For Creating AccessDeniedException and FileAlreadyExistsException with null. + }) + private IOException translateStorageException( + String bucketName, @Nullable String blobName, StorageException e) { + String path = "gs://" + bucketName + (blobName == null ? "" : "/" + blobName); + + switch (e.getCode()) { + case 403: + return new AccessDeniedException(path, null, e.getMessage()); + case 409: + return new FileAlreadyExistsException(path, null, e.getMessage()); + default: + return new IOException(e); + } + } + + public Blob getBlob(GcsPath gcsPath, BlobGetOption... blobGetOptions) throws IOException { + try { + Blob blob = storage.get(gcsPath.getBucket(), gcsPath.getObject(), blobGetOptions); + if (blob == null) { + throw new FileNotFoundException( + String.format("The specified file does not exist: %s", gcsPath.toString())); + } + return blob; + } catch (StorageException e) { + throw translateStorageException(gcsPath.getBucket(), gcsPath.getObject(), e); + } + } + + public long fileSize(GcsPath gcsPath) throws IOException { + return getBlob(gcsPath, BlobGetOption.fields(BlobField.SIZE)).getSize(); + } + + /** Lists {@link Blob}s given the {@code bucket}, {@code prefix}, {@code pageToken}. */ + public Page listBlobs( + String bucket, + String prefix, + @Nullable String pageToken, + @Nullable String delimiter, + BlobListOption... extraOptions) + throws IOException { + List options = new ArrayList<>(); + options.add(BlobListOption.pageSize(MAX_LIST_BLOBS_PER_CALL)); + if (pageToken != null) { + options.add(BlobListOption.pageToken(pageToken)); + } + if (prefix != null) { + options.add(BlobListOption.prefix(prefix)); + } + if (delimiter != null) { + options.add(BlobListOption.delimiter(delimiter)); + } + if (extraOptions != null && extraOptions.length > 0) { + for (BlobListOption option : extraOptions) { + options.add(option); + } + } + + try { + return storage.list(bucket, options.toArray(new BlobListOption[0])); + } catch (StorageException e) { + throw translateStorageException(bucket, prefix, e); + } + } + + public Page listBlobs( + String bucket, String prefix, @Nullable String pageToken, BlobListOption... extraOptions) + throws IOException { + return listBlobs(bucket, prefix, pageToken, null, extraOptions); + } + + /** + * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in + * the result. For patterns that only match a single object, we ensure that the object exists. + */ + public List expand(GcsPath gcsPattern) throws IOException { + // Handle Non-Wildcard Path + if (!GcsPath.isWildcard(gcsPattern)) { + try { + // Use a get request to fetch the metadata of the object, and ignore the return value. + // The request has strong global consistency. + getBlob(gcsPattern, BlobGetOption.fields(BlobField.NAME)); + return ImmutableList.of(gcsPattern); + } catch (FileNotFoundException e) { + // If the path was not found, return an empty list. + return ImmutableList.of(); + } + } + + // Handle Wildcard Path + // TODO: check out BlobListOption.matchGlob() for a similar function. + String prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject()); + Pattern p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); + + LOG.debug( + "matching files in bucket {}, prefix {} against pattern {}", + gcsPattern.getBucket(), + prefix, + p.toString()); + + List results = new ArrayList<>(); + Page blobs = + listBlobs( + gcsPattern.getBucket(), + prefix, + null, + BlobListOption.fields(BlobField.NAME, BlobField.BUCKET)); + // Iterate through all elements page by page (lazily) + for (Blob b : blobs.iterateAll()) { + String name = b.getName(); + // Filter objects based on the regex. Skip directories, which end with a slash. + if (p.matcher(name).matches() && !name.endsWith("/")) { + LOG.debug("Matched object: {}", name); + results.add(GcsPath.fromComponents(b.getBucket(), b.getName())); + } + } + return results; + } + + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ + public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException { + String bucketName = path.getBucket(); + try { + Bucket bucket = storage.get(bucketName, options); + if (bucket == null) { + throw new FileNotFoundException( + String.format("The specified bucket does not exist: gs://%s", bucketName)); + } + return bucket; + } catch (StorageException e) { + throw translateStorageException(bucketName, null, e); + } + } + + /** Returns whether the GCS bucket exists and is accessible. */ + public boolean bucketAccessible(GcsPath path) { + try { + // Fetch only the name field to minimize data transfer + getBucket(path, BucketGetOption.fields(BucketField.NAME)); + return true; + } catch (IOException e) { + return false; + } + } + + /** + * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws + * exception if the bucket is inaccessible due to permissions or does not exist. + */ + public void verifyBucketAccessible(GcsPath path) throws IOException { + // Fetch only the name field to minimize data transfer + getBucket(path, BucketGetOption.fields(BucketField.NAME)); + } + + /** + * Returns the project number of the project which owns this bucket. If the bucket exists, it must + * be accessible otherwise the permissions exception will be propagated. If the bucket does not + * exist, an exception will be thrown. + */ + public long bucketProject(GcsPath path) throws IOException { + Bucket bucket = getBucket(path, BucketGetOption.fields(BucketField.PROJECT)); + return bucket.getProject().longValue(); + } + + public void createBucket(BucketInfo bucketInfo) throws IOException { + try { + storage.create(bucketInfo); + } catch (StorageException e) { + throw translateStorageException(bucketInfo.getName(), null, e); + } + } + + public void removeBucket(BucketInfo bucketInfo) throws IOException { + Bucket bucket = + getBucket( + GcsPath.fromComponents(bucketInfo.getName(), null), + BucketGetOption.fields(BucketField.NAME)); + + try { + bucket.delete(); + } catch (StorageException e) { + throw translateStorageException(bucketInfo.getName(), null, e); + } + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java index 29c312d7a5e7..745ff4d36302 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java @@ -129,6 +129,9 @@ public static GcsPath fromUri(String uri) { /** Pattern that is used to validate a GCS bucket name. */ private static final Pattern GCS_BUCKET_NAME = Pattern.compile("[a-z0-9][-_a-z0-9.]+[a-z0-9]"); + /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ + private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); + /** Creates a GcsPath from a OnePlatform resource name in string form. */ public static GcsPath fromResourceName(String name) { Matcher m = GCS_RESOURCE_NAME.matcher(name); @@ -605,4 +608,16 @@ private String bucketAndObject() { return bucket + "/" + object; } } + + /** Returns the prefix portion of the glob before the first wildcard character. */ + public static String getNonWildcardPrefix(String globExp) { + Matcher m = GLOB_PREFIX.matcher(globExp); + checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); + return m.group("PREFIX"); + } + + /** Returns true if the given {@code spec} contains wildcard. */ + public static boolean isWildcard(GcsPath spec) { + return GLOB_PREFIX.matcher(spec.getObject()).matches(); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index b6c92ab9369d..9830491d2237 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -17,33 +17,38 @@ */ package org.apache.beam.sdk.extensions.gcp.util; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; -import com.google.protobuf.ByteString; +import com.google.api.gax.paging.Page; +import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BucketInfo; +import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; +import java.util.Arrays; import java.util.Collections; -import java.util.Date; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.testing.UsesKms; -import org.apache.beam.sdk.util.ByteStringOutputStream; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; /** * Integration tests for {@link GcsUtil}. These tests are designed to run against production Google @@ -52,91 +57,305 @@ *

This is a runnerless integration test, even though the Beam IT framework assumes one. Thus, * this test should only be run against single runner (such as DirectRunner). */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) @Category(UsesKms.class) public class GcsUtilIT { - /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ - @Test - public void testRewriteMultiPart() throws IOException { - TestPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); - // Using a KMS key is necessary to trigger multi-part rewrites (bucket is created - // with a bucket default key). - assertNotNull(options.getTempRoot()); - options.setTempLocation( - FileSystems.matchNewDirectory(options.getTempRoot(), "testRewriteMultiPart").toString()); + + @Parameters(name = "{0}") + public static Iterable data() { + return Arrays.asList("use_gcsutil_v1", "use_gcsutil_v2"); + } + + @Parameter public String experiment; + + private TestPipelineOptions options; + private GcsUtil gcsUtil; + + @Before + public void setUp() { + options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + + // set the experimental flag. + ExperimentalOptions experimentalOptions = options.as(ExperimentalOptions.class); + experimentalOptions.setExperiments(Collections.singletonList(experiment)); GcsOptions gcsOptions = options.as(GcsOptions.class); - GcsUtil gcsUtil = gcsOptions.getGcsUtil(); - String srcFilename = "gs://dataflow-samples/wikipedia_edits/wiki_data-000000000000.json"; - String dstFilename = - gcsOptions.getGcpTempLocation() - + String.format( - "/GcsUtilIT-%tF-% gcsUtil.getBlob(nonExistentPath)); + // For V2, we are returning AccessDeniedException (a subclass of IOException) for forbidden + // paths. + assertThrows(AccessDeniedException.class, () -> gcsUtil.getBlob(forbiddenPath)); + } else { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(nonExistentPath)); + assertThrows(IOException.class, () -> gcsUtil.getObject(forbiddenPath)); + } + } - // Read the test file back and verify - assertEquals(testContent, readGcsTextFile(gcsUtil, filename)); + @Test + public void testListObjectsOrListBlobs() throws IOException { + final String bucket = "apache-beam-samples"; + final String prefix = "shakespeare/kingrichard"; - gcsUtil.remove(Collections.singletonList(filename)); + List names; + if (experiment.equals("use_gcsutil_v2")) { + Page blobs = gcsUtil.listBlobs(bucket, prefix, null); + names = blobs.streamAll().map(blob -> blob.getName()).collect(Collectors.toList()); + } else { + Objects objs = gcsUtil.listObjects(bucket, prefix, null); + names = objs.getItems().stream().map(obj -> obj.getName()).collect(Collectors.toList()); + } + assertEquals( + Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names); + + final String randomPrefix = "my-random-prefix/random"; + if (experiment.equals("use_gcsutil_v2")) { + Page blobs = gcsUtil.listBlobs(bucket, randomPrefix, null); + assertEquals(0, blobs.streamAll().count()); + } else { + Objects objs = gcsUtil.listObjects(bucket, randomPrefix, null); + assertEquals(null, objs.getItems()); + } + } + + @Test + public void testExpand() throws IOException { + final GcsPath existingPattern = + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii*.txt"); + List paths = gcsUtil.expand(existingPattern); + + assertEquals( + Arrays.asList( + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt")), + paths); + + final GcsPath nonExistentPattern1 = + GcsPath.fromUri("gs://apache-beam-samples/my_random_folder/random*.txt"); + assertTrue(gcsUtil.expand(nonExistentPattern1).isEmpty()); + + final GcsPath nonExistentPattern2 = + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/king*.csv"); + assertTrue(gcsUtil.expand(nonExistentPattern2).isEmpty()); } - void writeGcsTextFile(GcsUtil gcsUtil, String filename, String content) throws IOException { - GcsPath gcsPath = GcsPath.fromUri(filename); - try (WritableByteChannel channel = - gcsUtil.create( - gcsPath, CreateOptions.builder().setContentType("text/plain;charset=utf-8").build())) { - channel.write(ByteString.copyFromUtf8(content).asReadOnlyByteBuffer()); + @Test + public void testGetBucketOrGetBucketV2() throws IOException { + final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); + + String bucket; + if (experiment.equals("use_gcsutil_v2")) { + bucket = gcsUtil.getBucketV2(existingPath).getName(); + } else { + bucket = gcsUtil.getBucket(existingPath).getName(); + } + assertEquals("apache-beam-samples", bucket); + + final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); + + if (experiment.equals("use_gcsutil_v2")) { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucketV2(nonExistentPath)); + assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucketV2(forbiddenPath)); + } else { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(nonExistentPath)); + assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucket(forbiddenPath)); } } - String readGcsTextFile(GcsUtil gcsUtil, String filename) throws IOException { - GcsPath gcsPath = GcsPath.fromUri(filename); - try (ByteStringOutputStream output = new ByteStringOutputStream()) { - try (ReadableByteChannel channel = gcsUtil.open(gcsPath)) { - ByteBuffer bb = ByteBuffer.allocate(16); - while (channel.read(bb) != -1) { - output.write(bb.array(), 0, bb.capacity() - bb.remaining()); - bb.clear(); + @Test + public void testBucketAccessible() throws IOException { + final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); + final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); + + assertEquals(true, gcsUtil.bucketAccessible(existingPath)); + assertEquals(false, gcsUtil.bucketAccessible(nonExistentPath)); + assertEquals(false, gcsUtil.bucketAccessible(forbiddenPath)); + } + + @Test + public void testBucketOwner() throws IOException { + final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples"); + final long expectedProjectNumber = 844138762903L; // apache-beam-testing + assertEquals(expectedProjectNumber, gcsUtil.bucketOwner(existingPath)); + + final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345"); + final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket"); + assertThrows(FileNotFoundException.class, () -> gcsUtil.bucketOwner(nonExistentPath)); + assertThrows(AccessDeniedException.class, () -> gcsUtil.bucketOwner(forbiddenPath)); + } + + @Test + public void testCreateAndRemoveBucket() throws IOException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-test-bucket-12345"); + + if (experiment.equals("use_gcsutil_v2")) { + BucketInfo bucketInfo = BucketInfo.of(gcsPath.getBucket()); + try { + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.createBucket(bucketInfo); + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket already exists during creation + assertThrows(FileAlreadyExistsException.class, () -> gcsUtil.createBucket(bucketInfo)); + + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.removeBucket(bucketInfo); + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket does not exist during removal + assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucketInfo)); + } finally { + // clean up and ignore errors no matter what + try { + gcsUtil.removeBucket(bucketInfo); + } catch (IOException e) { + } + } + } else { + Bucket bucket = new Bucket().setName(gcsPath.getBucket()); + GcsOptions gcsOptions = options.as(GcsOptions.class); + String projectId = gcsOptions.getProject(); + try { + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.createBucket(projectId, bucket); + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket already exists during creation + assertThrows( + FileAlreadyExistsException.class, () -> gcsUtil.createBucket(projectId, bucket)); + + assertTrue(gcsUtil.bucketAccessible(gcsPath)); + gcsUtil.removeBucket(bucket); + assertFalse(gcsUtil.bucketAccessible(gcsPath)); + + // raise exception when the bucket does not exist during removal + assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucket)); + } finally { + // clean up and ignore errors no matter what + try { + gcsUtil.removeBucket(bucket); + } catch (IOException e) { } } - return output.toByteString().toStringUtf8(); } } + + // /** Tests a rewrite operation that requires multiple API calls (using a continuation token). */ + // @Test + // public void testRewriteMultiPart() throws IOException { + // TestPipelineOptions options = + // TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + // // Using a KMS key is necessary to trigger multi-part rewrites (bucket is created + // // with a bucket default key). + // assertNotNull(options.getTempRoot()); + // options.setTempLocation( + // FileSystems.matchNewDirectory(options.getTempRoot(), "testRewriteMultiPart").toString()); + + // GcsOptions gcsOptions = options.as(GcsOptions.class); + // GcsUtil gcsUtil = gcsOptions.getGcsUtil(); + // String srcFilename = "gs://dataflow-samples/wikipedia_edits/wiki_data-000000000000.json"; + // String dstFilename = + // gcsOptions.getGcpTempLocation() + // + String.format( + // "/GcsUtilIT-%tF-%