From 03948073e9bb820b36b6c83fa4eee772cb4c9e24 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 22 Dec 2025 17:01:52 -0800 Subject: [PATCH 01/11] Initial Object store changes. Still need to cleanup. --- cwms-data-api/build.gradle | 4 +- .../java/cwms/cda/api/BlobController.java | 56 ++- .../java/cwms/cda/data/dao/BlobAccess.java | 26 ++ .../main/java/cwms/cda/data/dao/BlobDao.java | 7 +- .../cda/data/dao/ObjectStorageBlobDao.java | 416 ++++++++++++++++++ .../cda/data/dao/ObjectStorageConfig.java | 64 +++ .../features/CdaFeatureManagerProvider.java | 34 ++ .../java/cwms/cda/features/CdaFeatures.java | 9 + .../BlobControllerObjectStorageTestIT.java | 68 +++ .../cwms/cda/api/BlobControllerTestIT.java | 10 +- .../CdaFeatureManagerProviderTest.java | 77 ++++ docker-compose.yml | 57 ++- gradle/libs.versions.toml | 4 + 13 files changed, 818 insertions(+), 14 deletions(-) create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageConfig.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/features/CdaFeatures.java create mode 100644 cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java create mode 100644 cwms-data-api/src/test/java/cwms/cda/features/CdaFeatureManagerProviderTest.java diff --git a/cwms-data-api/build.gradle b/cwms-data-api/build.gradle index c2bc13bf1..0e2bbac37 100644 --- a/cwms-data-api/build.gradle +++ b/cwms-data-api/build.gradle @@ -151,6 +151,8 @@ dependencies { implementation(libs.bundles.overrides) testImplementation(libs.bundles.java.parser) + implementation(libs.togglz.core) + implementation(libs.minio) } task extractWebJars(type: Copy) { @@ -245,7 +247,7 @@ task run(type: JavaExec) { } task integrationTests(type: Test) { - dependsOn test +// dependsOn test dependsOn generateConfig dependsOn war diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java index 2764fe1b5..44cf20104 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java @@ -9,6 +9,9 @@ import com.codahale.metrics.Timer; import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.BlobDao; +import cwms.cda.data.dao.BlobAccess; +import cwms.cda.data.dao.ObjectStorageBlobDao; +import cwms.cda.data.dao.ObjectStorageConfig; import cwms.cda.data.dao.JooqDao; import cwms.cda.data.dto.Blob; import cwms.cda.data.dto.Blobs; @@ -33,6 +36,9 @@ import org.jetbrains.annotations.NotNull; import org.jooq.DSLContext; +import org.togglz.core.context.FeatureContext; +import cwms.cda.features.CdaFeatures; +import org.togglz.core.manager.FeatureManager; /** @@ -62,6 +68,31 @@ protected DSLContext getDslContext(Context ctx) { return JooqDao.getDslContext(ctx); } + private BlobAccess chooseBlobAccess(DSLContext dsl) { + boolean useObjectStore = isObjectStorageEnabled(); + try { + // Prefer Togglz if available + FeatureManager featureManager = FeatureContext.getFeatureManager(); + useObjectStore = featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + } catch (Throwable ignore) { + // fall back to system/env property check + } + if (useObjectStore) { + ObjectStorageConfig cfg = 
ObjectStorageConfig.fromSystem(); + return new ObjectStorageBlobDao(cfg); + } + return new BlobDao(dsl); + } + + private boolean isObjectStorageEnabled() { + // System properties first, then env. Accept FEATURE=true + String key = String.valueOf(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + String v = System.getProperty(key); + if (v == null) v = System.getProperty(key); + if (v == null) v = System.getenv(key); + return v != null && ("true".equalsIgnoreCase(v) || "1".equals(v)); + } + @OpenApi( queryParams = { @OpenApiParam(name = OFFICE, @@ -115,7 +146,7 @@ public void getAll(@NotNull Context ctx) { String formatHeader = ctx.header(Header.ACCEPT); ContentType contentType = Formats.parseHeader(formatHeader, Blobs.class); - BlobDao dao = new BlobDao(dsl); + BlobAccess dao = chooseBlobAccess(dsl); Blobs blobs = dao.getBlobs(cursor, pageSize, office, like); String result = Formats.format(contentType, blobs); @@ -151,12 +182,13 @@ public void getAll(@NotNull Context ctx) { public void getOne(@NotNull Context ctx, @NotNull String blobId) { try (final Timer.Context ignored = markAndTime(GET_ONE)) { - String idQueryParam = ctx.queryParam(CLOB_ID); + String idQueryParam = ctx.queryParam(BLOB_ID); if (idQueryParam != null) { blobId = idQueryParam; } DSLContext dsl = getDslContext(ctx); - BlobDao dao = new BlobDao(dsl); + + BlobAccess dao = chooseBlobAccess(dsl); String officeQP = ctx.queryParam(OFFICE); Optional office = Optional.ofNullable(officeQP); @@ -204,7 +236,7 @@ public void create(@NotNull Context ctx) { boolean failIfExists = ctx.queryParamAsClass(FAIL_IF_EXISTS, Boolean.class).getOrDefault(true); ContentType contentType = Formats.parseHeader(formatHeader, Blob.class); Blob blob = Formats.parseContent(contentType, ctx.bodyAsInputStream(), Blob.class); - BlobDao dao = new BlobDao(dsl); + BlobAccess dao = chooseBlobAccess(dsl); dao.create(blob, failIfExists, false); ctx.status(HttpCode.CREATED); } @@ -213,7 +245,7 @@ public void create(@NotNull Context ctx) { @OpenApi( description = "Update an existing Blob", pathParams = { - @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"), + @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be updated"), }, requestBody = @OpenApiRequestBody( content = { @@ -235,7 +267,7 @@ public void create(@NotNull Context ctx) { @Override public void update(@NotNull Context ctx, @NotNull String blobId) { try (final Timer.Context ignored = markAndTime(UPDATE)) { - String idQueryParam = ctx.queryParam(CLOB_ID); + String idQueryParam = ctx.queryParam(BLOB_ID); if (idQueryParam != null) { blobId = idQueryParam; } @@ -260,7 +292,13 @@ public void update(@NotNull Context ctx, @NotNull String blobId) { + "updating a blob"); } - BlobDao dao = new BlobDao(dsl); + if(!blob.getId().equals(blobId)) { + throw new FormattingException("The blob id parameter does not match the blob id in the body. " + + "The blob end-point does not support renaming blobs. 
" + + "Create a new blob with the new id and delete the old one."); + } + + BlobAccess dao = chooseBlobAccess(dsl); dao.update(blob, false); ctx.status(HttpServletResponse.SC_OK); } @@ -287,13 +325,13 @@ public void update(@NotNull Context ctx, @NotNull String blobId) { @Override public void delete(@NotNull Context ctx, @NotNull String blobId) { try (Timer.Context ignored = markAndTime(DELETE)) { - String idQueryParam = ctx.queryParam(CLOB_ID); + String idQueryParam = ctx.queryParam(BLOB_ID); if (idQueryParam != null) { blobId = idQueryParam; } DSLContext dsl = getDslContext(ctx); String office = requiredParam(ctx, OFFICE); - BlobDao dao = new BlobDao(dsl); + BlobAccess dao = chooseBlobAccess(dsl); dao.delete(office, blobId); ctx.status(HttpServletResponse.SC_NO_CONTENT); } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java new file mode 100644 index 000000000..f4d50ace1 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java @@ -0,0 +1,26 @@ +package cwms.cda.data.dao; + +import cwms.cda.data.dto.Blob; +import cwms.cda.data.dto.Blobs; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Optional; + +public interface BlobAccess { + @NotNull Blobs getBlobs(@Nullable String cursor, int pageSize, @Nullable String officeId, @Nullable String like); + + Optional getByUniqueName(String id, String office); + + void getBlob(String id, String office, BlobDao.BlobConsumer consumer); + + default void getBlob(String id, BlobDao.BlobConsumer consumer) { + getBlob(id, null, consumer); + } + + void create(Blob blob, boolean failIfExists, boolean ignoreNulls); + + void update(Blob blob, boolean ignoreNulls); + + void delete(String office, String id); +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java index bdac0a54f..14737f053 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java @@ -34,7 +34,7 @@ import static org.jooq.impl.DSL.table; import static org.jooq.impl.DSL.upper; -public class BlobDao extends JooqDao { +public class BlobDao extends JooqDao implements BlobAccess { public static final String ID = "ID"; public static final String DESCRIPTION = "DESCRIPTION"; @@ -85,6 +85,7 @@ public Optional getByUniqueName(String id, String limitToOffice) { return Optional.ofNullable(retVal); } + @Override public void getBlob(String id, String office, BlobConsumer consumer) { // Not using jOOQ here because we want the java.sql.Blob and not an automatic field binding. We want // blob so that we can pull out a stream to the data and pass that to javalin. 
@@ -179,6 +180,7 @@ public List getAll(String officeId, String like) { * @param like filter blobs by a case-insensitive regex pattern on their IDs, can be null or empty * @return a Blobs object containing the retrieved blobs and pagination information */ + @Override public @NotNull Blobs getBlobs(@Nullable String cursor, int pageSize, @Nullable String officeId, @Nullable String like) { String cursorOffice = null; @@ -249,6 +251,7 @@ public List getAll(String officeId, String like) { return builder.build(); } + @Override public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) { String pFailIfExists = formatBool(failIfExists); String pIgnoreNulls = formatBool(ignoreNulls); @@ -265,6 +268,7 @@ public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) { blob.getOfficeId())); } + @Override public void update(Blob blob, boolean ignoreNulls) { String pFailIfExists = formatBool(false); String pIgnoreNulls = formatBool(ignoreNulls); @@ -288,6 +292,7 @@ public void update(Blob blob, boolean ignoreNulls) { blob.getOfficeId())); } + @Override public void delete(String office, String id) { if (!blobExists(office, id)) { throw new NotFoundException("Unable to find blob with id " + id + " in office " + office); diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java new file mode 100644 index 000000000..9a7e15395 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java @@ -0,0 +1,416 @@ +package cwms.cda.data.dao; + +import com.google.common.flogger.FluentLogger; +import cwms.cda.api.errors.AlreadyExists; +import cwms.cda.api.errors.FieldLengthExceededException; +import cwms.cda.api.errors.NotFoundException; +import cwms.cda.data.dto.Blob; +import cwms.cda.data.dto.Blobs; +import cwms.cda.data.dto.CwmsDTOPaginated; +import io.minio.*; +import io.minio.errors.*; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import javax.sql.rowset.serial.SerialBlob; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.regex.Pattern; + +import io.minio.messages.Item; + +/** + * Object Storage-backed implementation using MinIO Java client. keys like OFFICE/ID_UPPER. 
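 * Illustrative key examples (editor's addition, derived from the key()/normalizeId() helpers
 * defined later in this class; not an exhaustive specification):
 *   office "SWT", id "my file.txt"   ->  "SWT/MY_FILE.TXT"    (space mapped to '_', then upper-cased)
 *   office "SWT", id "a+b"           ->  "SWT/A%2BB"          (unsafe characters percent-encoded)
 *   office "SWT", id "reports/2024"  ->  "SWT/REPORTS/2024"   ('/' is preserved, nesting the key)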
+ */ +public class ObjectStorageBlobDao implements BlobAccess { + FluentLogger logger = FluentLogger.forEnclosingClass(); + + public static final int ID_LENGTH_LIMIT = 256; // This is to match pl/sql limit + private static final int MAX_KEY_LENGTH = 1024; + private final ObjectStorageConfig config; + private final MinioClient client; + + public ObjectStorageBlobDao(ObjectStorageConfig config) { + this.config = config; + this.client = buildClient(config); + } + + private static MinioClient buildClient(ObjectStorageConfig cfg) { + MinioClient.Builder b = MinioClient.builder(); + if (cfg.endpoint() != null && !cfg.endpoint().isEmpty()) { + b = b.endpoint(cfg.endpoint()); + } + if (cfg.accessKey() != null && cfg.secretKey() != null) { + b = b.credentials(cfg.accessKey(), cfg.secretKey()); + } + + return b.build(); + } + + @Override + public @NotNull Blobs getBlobs(@Nullable String cursor, int pageSize, @Nullable String officeId, @Nullable String like) { + String prefix = null; + if (officeId != null && !officeId.isEmpty()) { + prefix = officeId.toUpperCase(Locale.ROOT) + "/"; + } + + String startAfter = null; + + String cursorOffice = null; + String cursorId = null; + if (cursor != null && !cursor.isEmpty()) { + final String[] parts = CwmsDTOPaginated.decodeCursor(cursor, "||"); + + if (parts.length > 1) { + cursorOffice = Blobs.getOffice(cursor); + cursorId = Blobs.getId(cursor); + pageSize = Integer.parseInt(parts[2]); + } + + if (cursorOffice != null && cursorId != null) { + startAfter = key(cursorOffice, cursorId); + } + } + + + Pattern likePattern = null; + if (like != null && !like.isEmpty() && !".*".equals(like)) { + likePattern = Pattern.compile(like, Pattern.CASE_INSENSITIVE); + } + + List collected = new ArrayList<>(); + + ListObjectsArgs.Builder args = ListObjectsArgs.builder() + .bucket(requiredBucket()) + .recursive(true) + .maxKeys(pageSize); + if (prefix != null) args = args.prefix(prefix); + if (startAfter != null) args = args.startAfter(startAfter); + + for (Result res : client.listObjects(args.build())) { + try { + // item.key() like OFFICE/ID + Item item = res.get(); + String k = item.objectName(); + int slash = k.indexOf('/'); + if (slash <= 0 || slash >= k.length() - 1) continue; + String off = k.substring(0, slash); + String id = k.substring(slash + 1); + if (likePattern != null && !likePattern.matcher(id).find()) { + continue; + } + // fetch metadata for media type and optional description + try { + StatObjectResponse stat = client.statObject(StatObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build()); + String mediaType = stat.contentType(); + String desc = stat.userMetadata() != null ? stat.userMetadata().getOrDefault("description", null) : null; + collected.add(new Blob(off, id, desc, mediaType, null)); + if (collected.size() >= pageSize) break; + } catch (Exception e) { + // skip items that fail stat + } + } catch (Exception ignore) { + // skip this entry on error + } + } + + Blobs.Builder builder = new Blobs.Builder(cursor, pageSize, 0); + collected.forEach(builder::addBlob); + return builder.build(); + } + + @Override + public Optional getByUniqueName(String id, String office) { + String k = (office == null || office.isEmpty()) ? 
findFirstKeyById(id) : key(office, id); + if (k == null) { + return Optional.empty(); + } + String officeFromKey = officeFromKey(k); + String idFromKey = idFromKey(k); + try { + StatObjectResponse stat = client.statObject(StatObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build()); + String mediaType = stat.contentType(); + String desc = stat.userMetadata() != null ? stat.userMetadata().getOrDefault("description", null) : null; + return Optional.of(new Blob(officeFromKey, idFromKey, desc, mediaType, null)); + } catch (ErrorResponseException ere) { + if ("NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + return Optional.empty(); + } + throw new RuntimeException(ere); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void getBlob(String id, String office, BlobDao.BlobConsumer consumer) { + String k = (office == null || office.isEmpty()) ? findFirstKeyById(id) : key(office, id); + try { + if (k == null) { + try { + consumer.accept(null, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + return; + } + logger.atFine().log("Getting stat for %s", k); + // Stat first to get content type and size + StatObjectResponse stat = client.statObject(StatObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build()); + String mediaType = stat.contentType() != null ? stat.contentType() : "application/octet-stream"; + + try (InputStream is = client.getObject(GetObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build())) { + // Its too bad this has to readFully - future optimization can skip ahead + // b/c the consumer really just wants to get the stream out of the blob. + byte[] data = readFully(is); + SerialBlob blob = new SerialBlob(data); + consumer.accept(blob, mediaType); + } catch (Exception e) { + throw new RuntimeException(e); + } + } catch (ErrorResponseException ere) { + if ("NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + try { + // We could also just throw a NotFoundException. + // BlobController suggests consumer.accept(null, null); will handle things. + consumer.accept(null, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + return; + } + throw new RuntimeException(ere); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) { + String k = key(blob.getOfficeId(), blob.getId()); + if (failIfExists) { + try { + client.statObject(StatObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build()); + throw new AlreadyExists("Blob already exists: " + k, null); + } catch (ErrorResponseException ere) { + if (!"NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + throw new RuntimeException(ere); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // TODO: Figure out which of these can be something better. 
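        // A minimal sketch for the TODO above (not part of the patch): the nine checked exceptions
        // thrown by the MinIO client can be collapsed into a single multi-catch without changing
        // behaviour, e.g.
        //
        //     try {
        //         doPut(blob, k, ignoreNulls);
        //     } catch (ServerException | InsufficientDataException | ErrorResponseException
        //             | IOException | NoSuchAlgorithmException | InvalidKeyException
        //             | InvalidResponseException | XmlParserException | InternalException e) {
        //         throw new RuntimeException("Failed to store blob " + k, e);
        //     }
        //
        // A richer option would map ErrorResponseException codes onto CDA's own error types.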
+ try { + doPut(blob, k, ignoreNulls); + } catch (ServerException e) { + throw new RuntimeException(e); + } catch (InsufficientDataException e) { + throw new RuntimeException(e); + } catch (ErrorResponseException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } catch (InvalidKeyException e) { + throw new RuntimeException(e); + } catch (InvalidResponseException e) { + throw new RuntimeException(e); + } catch (XmlParserException e) { + throw new RuntimeException(e); + } catch (InternalException e) { + throw new RuntimeException(e); + } + } + + @Override + public void update(Blob blob, boolean ignoreNulls) { + String k = key(blob.getOfficeId(), blob.getId()); + // For updatemake sure it exists first + try { + client.statObject(StatObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build()); + } catch (ErrorResponseException ere) { + if ("NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + throw new NotFoundException("Unable to find blob with id " + blob.getId() + " in office " + blob.getOfficeId()); + } + throw new RuntimeException(ere); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try { + doPut(blob, k, ignoreNulls); + } catch (ServerException e) { + throw new RuntimeException(e); + } catch (InsufficientDataException e) { + throw new RuntimeException(e); + } catch (ErrorResponseException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } catch (InvalidKeyException e) { + throw new RuntimeException(e); + } catch (InvalidResponseException e) { + throw new RuntimeException(e); + } catch (XmlParserException e) { + throw new RuntimeException(e); + } catch (InternalException e) { + throw new RuntimeException(e); + } + } + + private void doPut(Blob blob, String k, boolean ignoreNulls) throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { + byte[] value = blob.getValue(); + if (value == null && ignoreNulls) { + return; + } + + if (value == null) { + value = new byte[0]; + } + + try (InputStream is = new ByteArrayInputStream(value)) { + PutObjectArgs.Builder builder = PutObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .stream(is, value.length, -1) + .contentType(blob.getMediaTypeId()); + + if (blob.getDescription() != null) { + builder.userMetadata(java.util.Collections.singletonMap("description", blob.getDescription())); + } + + client.putObject(builder.build()); + } + } + + @Override + public void delete(String office, String id) { + String k = key(office, id); + try { + client.removeObject(RemoveObjectArgs.builder() + .bucket(requiredBucket()) + .object(k) + .build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private String findFirstKeyById(String id) { + String targetSuffix = "/" + normalizeId(id).toUpperCase(Locale.ROOT); + try { + ListObjectsArgs args = ListObjectsArgs.builder() + .bucket(requiredBucket()) + .recursive(true) + .build(); + for (Result res : client.listObjects(args)) { + try { + Item item = res.get(); + String name = item.objectName(); + if (name.toUpperCase(Locale.ROOT).endsWith(targetSuffix)) { + return name; + } + } catch (Exception ignore) { + } + } + return null; + } catch (Exception e) { + 
throw new RuntimeException(e); + } + } + + private static String officeFromKey(String key) { + int slash = key.indexOf('/'); + return (slash > 0) ? key.substring(0, slash) : null; + } + + private static String idFromKey(String key) { + int slash = key.indexOf('/'); + return (slash >= 0 && slash < key.length() - 1) ? key.substring(slash + 1) : key; + } + + private String requiredBucket() { + String bucket = config.bucket(); + if (bucket == null || bucket.isEmpty()) { + throw new IllegalStateException("Object storage bucket is not configured (blob.store.bucket)"); + } + return bucket; + } + + private static String key(String office, String id) { + String off = office == null ? "" : office.toUpperCase(Locale.ROOT); + String nid = normalizeId(id).toUpperCase(Locale.ROOT); + String fullKey = off + "/" + nid; + if (fullKey.length() > MAX_KEY_LENGTH) { + throw new FieldLengthExceededException("Key", fullKey.length(), MAX_KEY_LENGTH, null, true); + } + return fullKey; + } + + private static String normalizeId(String id) { + if (id == null) return ""; + + if(id.length() > ID_LENGTH_LIMIT){ + throw new FieldLengthExceededException("ID", id.length(), ID_LENGTH_LIMIT, null, true); + } + // Replace spaces with underscore; leave common safe chars; percent-encode others + StringBuilder sb = new StringBuilder(); + for (char c : id.toCharArray()) { + if (Character.isLetterOrDigit(c) || c == '.' || c == '_' || c == '-' ) { + sb.append(c); + } else if (c == ' ') { + sb.append('_'); + } else if (c == '/') { + // keep slash because controller may pass IDs containing '/'; since we prefix with OFFICE/, this would nest more levels + sb.append('/'); + } else { + String hex = Integer.toHexString(c).toUpperCase(Locale.ROOT); + if (hex.length() == 1) hex = "0" + hex; + sb.append('%').append(hex); + } + } + return sb.toString(); + } + + private static byte[] readFully(InputStream is) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buf = new byte[8192]; + int r; + while ((r = is.read(buf)) != -1) { + baos.write(buf, 0, r); + } + return baos.toByteArray(); + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageConfig.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageConfig.java new file mode 100644 index 000000000..442c13514 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageConfig.java @@ -0,0 +1,64 @@ +package cwms.cda.data.dao; + +import java.util.Optional; + +public class ObjectStorageConfig { + + private final String bucket; + + private final String endpoint; + + private final String accessKey; + private final String secretKey; + + public ObjectStorageConfig(String bucket, String endpoint, + String accessKey, String secretKey) { + + this.bucket = bucket; + + this.endpoint = endpoint; + + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + public static ObjectStorageConfig fromSystem() { + + String bucket = get("blob.store.bucket").orElse(null); + + String endpoint = get("blob.store.endpoint").orElse(null); + + String accessKey = get("blob.store.accessKey").orElse(null); + String secretKey = get("blob.store.secretKey").orElse(null); + return new ObjectStorageConfig(bucket, endpoint, accessKey, secretKey); + } + + private static Optional get(String key) { + String sys = System.getProperty(key); + if (sys != null && !sys.isEmpty()) return Optional.of(sys); + String env = System.getenv(toEnvKey(key)); + if (env != null && !env.isEmpty()) return Optional.of(env); + return Optional.empty(); + } + + 
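    // Editor's note, not part of the patch: get() resolves a key from a JVM system property first
    // and then from an environment variable with dots mapped to underscores and upper-cased, so
    // "blob.store.bucket" is satisfied by either of:
    //
    //     java -Dblob.store.bucket=cwms-test ...      (system property wins when both are set)
    //     BLOB_STORE_BUCKET=cwms-test java ...        (environment fallback)
    //
    // Note that camelCase is not split: "blob.store.accessKey" maps to BLOB_STORE_ACCESSKEY.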
private static String toEnvKey(String key) { + return key.toUpperCase().replace('.', '_'); + } + + + public String bucket() { + return bucket; + } + + public String endpoint() { + return endpoint; + } + + public String accessKey() { + return accessKey; + } + + public String secretKey() { + return secretKey; + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java b/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java new file mode 100644 index 000000000..284a63a7f --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java @@ -0,0 +1,34 @@ +package cwms.cda.features; + +import org.togglz.core.manager.FeatureManager; +import org.togglz.core.manager.FeatureManagerBuilder; +import org.togglz.core.repository.file.FileBasedStateRepository; +import java.io.File; +import org.togglz.core.spi.FeatureManagerProvider; + +public class CdaFeatureManagerProvider implements FeatureManagerProvider { + public static final String DEFAULT_PROPERTIES_FILE = "features.properties"; + public static final String PROPERTIES_FILE = "properties.file"; + private volatile FeatureManager manager; + + @Override + public int priority() { + return 10; + } + + @Override + public FeatureManager getFeatureManager() { + if (manager == null) { + synchronized (this) { + if (manager == null) { + String file = System.getProperty(PROPERTIES_FILE, DEFAULT_PROPERTIES_FILE); + manager = new FeatureManagerBuilder() + .featureEnum(CdaFeatures.class) + .stateRepository(new FileBasedStateRepository(new File(file))) + .build(); + } + } + } + return manager; + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatures.java b/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatures.java new file mode 100644 index 000000000..fa2a662ef --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatures.java @@ -0,0 +1,9 @@ +package cwms.cda.features; + +import org.togglz.core.Feature; +import org.togglz.core.annotation.Label; + +public enum CdaFeatures implements Feature { + @Label("Use object-storage backed Blob DAO in BlobController") + USE_OBJECT_STORAGE_BLOBS +} diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java new file mode 100644 index 000000000..854bbdf15 --- /dev/null +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java @@ -0,0 +1,68 @@ +package cwms.cda.api; + +import cwms.cda.features.CdaFeatures; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.Extension; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.togglz.core.context.FeatureContext; +import org.togglz.core.manager.FeatureManager; + +@Tag("integration") +@ExtendWith(BlobControllerObjectStorageTestIT.FeatureEnableExtension.class) +public class BlobControllerObjectStorageTestIT extends BlobControllerTestIT{ + + static boolean wasActive; + + + // I need this to happen before super.BeforeAll is run so that the create will create into Object-store version +// @BeforeAll +// public static void setup() throws Exception { +// setObjectStoreProperties(); +// +// // now call the method that the super calls. 
+// createExistingBlob(); +// } + + private static void setObjectStoreProperties() { + FeatureManager featureManager = FeatureContext.getFeatureManager(); + wasActive=featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + featureManager.enable(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + + // TODO: Need to figure out a cleaner way to do this + System.setProperty("blob.store.endpoint", "http://127.0.0.1:9000"); + System.setProperty("blob.store.bucket", "cwms-test"); + System.setProperty("blob.store.accessKey", "cda_user"); + System.setProperty("blob.store.secretKey", "cda_password"); + } + + @AfterAll + public static void teardown() { + FeatureManager featureManager = FeatureContext.getFeatureManager(); + if(wasActive){ + featureManager.enable(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + } else { + featureManager.disable(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); + } + + } + + @Override + @Test + void test_create_getOne() + { + super.test_create_getOne(); + } + + static class FeatureEnableExtension implements Extension, BeforeAllCallback { + + @Override + public void beforeAll(ExtensionContext context) { + setObjectStoreProperties(); + } + } +} diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java index 446e9e9bd..82c44eb5a 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java @@ -18,6 +18,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.time.Duration; import static io.restassured.RestAssured.given; @@ -36,6 +37,11 @@ public class BlobControllerTestIT extends DataApiTestIT { private static final String EXISTING_BLOB_VALUE = "test value"; @BeforeAll + public static void setup() throws Exception { + createExistingBlob(); + } + + static void createExistingBlob() throws Exception { String origDesc = "test description"; @@ -67,7 +73,7 @@ static void createExistingBlob() throws Exception @Test void test_getOne_not_found() throws UnsupportedEncodingException { String blobId = "TEST"; - String urlencoded = URLEncoder.encode(blobId, "UTF-8"); + String urlencoded = URLEncoder.encode(blobId, StandardCharsets.UTF_8); given() .log().ifValidationFails(LogDetail.ALL,true) @@ -406,7 +412,7 @@ void test_pagination_works() { nextPage = pageN.path("next-page"); int pageTotal = pageN.path("blobs.size()"); - assertTrue(pageTotal <= pageSize, "Expected the page to return no more than the configured page size"); + assertTrue(pageTotal <= pageSize, "Expected the page to return no more than the configured page size. 
Expected " + pageTotal + "<=" + pageSize); totalRetrieved += pageTotal; } while( nextPage != null ); diff --git a/cwms-data-api/src/test/java/cwms/cda/features/CdaFeatureManagerProviderTest.java b/cwms-data-api/src/test/java/cwms/cda/features/CdaFeatureManagerProviderTest.java new file mode 100644 index 000000000..7a2f68a2a --- /dev/null +++ b/cwms-data-api/src/test/java/cwms/cda/features/CdaFeatureManagerProviderTest.java @@ -0,0 +1,77 @@ +package cwms.cda.features; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.togglz.core.manager.FeatureManager; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; + +import static org.junit.jupiter.api.Assertions.*; + +class CdaFeatureManagerProviderTest { + + private String originalPropertiesFile; + + @BeforeEach + void setUp() { + originalPropertiesFile = System.getProperty(CdaFeatureManagerProvider.PROPERTIES_FILE); + } + + @AfterEach + void tearDown() { + if (originalPropertiesFile != null) { + System.setProperty(CdaFeatureManagerProvider.PROPERTIES_FILE, originalPropertiesFile); + } else { + System.clearProperty(CdaFeatureManagerProvider.PROPERTIES_FILE); + } + } + + @Test + void testPriority() { + CdaFeatureManagerProvider provider = new CdaFeatureManagerProvider(); + assertEquals(10, provider.priority()); + } + + @Test + void testGetFeatureManager() { + CdaFeatureManagerProvider provider = new CdaFeatureManagerProvider(); + FeatureManager manager = provider.getFeatureManager(); + assertNotNull(manager); + assertSame(manager, provider.getFeatureManager(), "Should return the same instance"); + } + + @Test + void testUseObjectStorageBlobsFeature() throws IOException { + File tempFile = Files.createTempFile("features", ".properties").toFile(); + tempFile.deleteOnExit(); + + try (FileWriter writer = new FileWriter(tempFile)) { + writer.write(CdaFeatures.USE_OBJECT_STORAGE_BLOBS.name() + " = true"); + } + + System.setProperty(CdaFeatureManagerProvider.PROPERTIES_FILE, tempFile.getAbsolutePath()); + + CdaFeatureManagerProvider provider = new CdaFeatureManagerProvider(); + FeatureManager manager = provider.getFeatureManager(); + + assertTrue(manager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS)); + } + + @Test + void testFeatureDisabledByDefault() throws IOException { + File tempFile = Files.createTempFile("features_disabled", ".properties").toFile(); + tempFile.deleteOnExit(); + + // Empty file should mean features are disabled by default + System.setProperty(CdaFeatureManagerProvider.PROPERTIES_FILE, tempFile.getAbsolutePath()); + + CdaFeatureManagerProvider provider = new CdaFeatureManagerProvider(); + FeatureManager manager = provider.getFeatureManager(); + + assertFalse(manager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS)); + } +} diff --git a/docker-compose.yml b/docker-compose.yml index 1254b4dd1..1ef5cb2d0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,11 @@ volumes: oracle_data_1: auth_data: + minio_data: + driver: local + minio_config: + driver: local + services: db: image: ghcr.io/hydrologicengineeringcenter/cwms-database/cwms/database-ready-ora-23.5:latest-dev @@ -50,6 +55,8 @@ services: condition: service_completed_successfully traefik: condition: service_healthy + minio: + condition: service_healthy image: cwms-rest-api:local-dev build: target: api @@ -77,6 +84,11 @@ services: - cwms.dataapi.access.openid.altAuthUrl=http://localhost:${APP_PORT:-8081} - 
cwms.dataapi.access.openid.useAltWellKnown=true - cwms.dataapi.access.openid.issuer=http://localhost:${APP_PORT:-8081}/auth/realms/cwms + - blob.store.endpoint=http://minio:9000 + - blob.store.region=docker + - blob.store.bucket=cwms-test + - blob.store.accessKey=cda_user + - blob.store.secretKey=cda_password expose: - 7000 - 5005 @@ -133,7 +145,7 @@ services: expose: - "8080" volumes: - - "/var/run/docker.sock:/var/run/docker.sock:ro" + - /var/run/docker.sock:/var/run/docker.sock:ro healthcheck: test: traefik healthcheck --ping command: @@ -147,3 +159,46 @@ services: - "traefik.enable=true" - "traefik.http.routers.traefik.rule=PathPrefix(`/traefik`)" - "traefik.http.routers.traefik.service=api@internal" + + minio: + container_name: minio_server + image: minio/minio:RELEASE.2025-04-22T22-12-26Z + volumes: + - minio_data:/data + - minio_config:/root/.minio + ports: + - '${FORWARD_MINIO_API_PORT0:-9000}:9000' + - '${FORWARD_MINIO_PORT:-9001}:9001' + - '${FORWARD_MINIO_API_PORT2:-9002}:9002' + environment: + MINIO_ROOT_USER: minio_admin + MINIO_ROOT_PASSWORD: saersdbewadfqewrbwreq12rfgweqrffw52354ec@%fwewEFFWSE + command: server /data --console-address ":9001" + healthcheck: + test: [ "CMD", "mc", "ready", "local" ] + interval: 5s + timeout: 5s + retries: 5 + restart: unless-stopped + deploy: + resources: + reservations: + cpus: "0.5" + memory: 1G + limits: + cpus: "1.0" + memory: 2G + + minio-setup: + image: minio/mc:latest + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set myminio http://minio:9000 minio_admin saersdbewadfqewrbwreq12rfgweqrffw52354ec@%fwewEFFWSE; + /usr/bin/mc admin user add myminio cda_user cda_password; + /usr/bin/mc mb --ignore-existing myminio/cwms-test; + /usr/bin/mc admin policy attach myminio readwrite --user cda_user; + exit 0; + " diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1e49a183e..9ff8d2679 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -36,6 +36,8 @@ freemarker = "2.3.32" auto-service = "1.1.1" openapi-validation = "2.44.9" javaparser = "3.26.2" +togglz = "3.3.3" +minio = "8.6.0" #Overrides classgraph = { strictly = '4.8.176' } @@ -79,7 +81,9 @@ jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-dat jackson-dataformat-xml = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-xml", version.ref = "jackson" } jackson-datatype-jdk8 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jdk8", version.ref = "jackson" } +togglz-core = { module = "org.togglz:togglz-core", version.ref = "togglz" } +minio = { module = "io.minio:minio", version.ref = "minio"} #compile compileOnly javaee-web-api = { module = "javax:javaee-web-api", version.ref = "java-ee" } From d02ccfe9c78cb835d7d32330136ce12773a6bdc7 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Tue, 23 Dec 2025 16:44:22 -0800 Subject: [PATCH 02/11] Added methods and util classes to clarify and test http range request parsing. 
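Before the diff itself, a minimal sketch (not part of the patch) of how a client exercises the range support this commit adds. The URL, office, and blob id are placeholders and assume a locally running CDA instance.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class RangeRequestClientSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:7000/cwms-data/blobs/MY_BLOB?office=SPK")) // placeholder
                .header("Range", "bytes=0-99")                 // ask for the first 100 bytes
                .build();
        HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
        // A server honouring the range replies 206 with a Content-Range such as "bytes 0-99/<total>".
        System.out.println(response.statusCode());
        System.out.println(response.headers().firstValue("Content-Range").orElse("(none)"));
        System.out.println(response.body().length);
    }
}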
--- .../main/java/cwms/cda/api/RangeParser.java | 107 +++++++++++++++++ .../java/cwms/cda/api/RangeRequestUtil.java | 101 ++++++++-------- .../java/cwms/cda/api/RangeParserTest.java | 110 ++++++++++++++++++ 3 files changed, 269 insertions(+), 49 deletions(-) create mode 100644 cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java create mode 100644 cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java new file mode 100644 index 000000000..a2e84ecfa --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java @@ -0,0 +1,107 @@ +package cwms.cda.api; + +import org.jspecify.annotations.NonNull; + +import java.util.*; +import java.util.regex.*; + +/** + * Utility class for parsing HTTP Range headers. + * These typically look like: bytes=100-1234 + * or: bytes=100- this is common to resume a download + * or: bytes=0- equivalent to a regular request for the whole file + * but by returning 206 we show that we support range requests + * Note that multiple ranges can be requested at once such + * as: bytes=500-600,700-999 Server responds identifies separator and then puts separator between chunks + * bytes=0-0,-1 also legal its just the first and the last byte + * or: bytes=500-600,601-999 legal but what is the point? + * or: bytes=500-700,601-999 legal, notice they overlap. + * + * + */ +public class RangeParser { + + private static final Pattern RANGE_PATTERN = Pattern.compile("(\\d*)-(\\d*)"); + + /** + * Return a list of two element long[] containing byte ranges parsed from the HTTP Range header. + * If the end of a range is not specified ( e.g. bytes=100- ) then a -1 is returned in the second position + * If the range only includes a negative byte (e.g bytes=-50) then -1 is returned as the start of the range + * and -1*end is returned as the end of the range. bytes=-50 will result in [-1,50] + * + * @param header the HTTP Range header + * @return a list of byte ranges + */ + public static List parse(String header) { + if (header == null || header.isEmpty() ) { + return Collections.emptyList(); + } else if ( !header.startsWith("bytes=")){ + throw new IllegalArgumentException("Invalid Range header: " + header); + } + + String rangePart = header.substring(6); + List retval = parseRanges(rangePart); + if( retval.isEmpty() ){ + throw new IllegalArgumentException("Invalid Range header: " + header); + } + return retval; + } + + public static @NonNull List parseRanges(String rangePart) { + if( rangePart == null || rangePart.isEmpty() ){ + throw new IllegalArgumentException("Invalid range specified: " + rangePart); + } + String[] parts = rangePart.split(","); + List ranges = new ArrayList<>(); + + for (String part : parts) { + Matcher m = RANGE_PATTERN.matcher(part.trim()); + if (m.matches()) { + String start = m.group(1); + String end = m.group(2); + + long s = start.isEmpty() ? -1 : Long.parseLong(start); + long e = end.isEmpty() ? -1 : Long.parseLong(end); + + ranges.add(new long[]{s, e}); + } + } + return ranges; + } + + /** + * The parse() method in this class can return -1 for unspecified values or when suffix ranges are supplied. + * This method interprets the negative values in regard to the totalSize and returns inclusive indices of the + * requested range. 
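     * Worked examples (editor's addition): with totalBytes = 100, an input of {-1, 50}, i.e. a
     * suffix request for the last 50 bytes, becomes {50, 99}; {8, 200} is clamped to {8, 99};
     * and {0, 0} stays {0, 0}, meaning just the first byte.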
+ * @param inputs the array of start and end byte positions + * @param totalBytes the total number of bytes in the file + * @return a long array with the start and end byte positions, these are inclusive. [0,0] means return the first byte + */ + public static long[] interpret(long[] inputs, long totalBytes){ + if(inputs == null){ + throw new IllegalArgumentException("null range array provided"); + } else if( inputs.length != 2 ){ + throw new IllegalArgumentException("Invalid number of inputs: " + Arrays.toString(inputs)); + } + + long start = inputs[0]; + long end = inputs[1]; + + if(start == -1L){ + // its a suffix request. + start = totalBytes - end; + end = totalBytes - 1; + } else { + if (start < 0 || end < start) { + throw new IllegalArgumentException("Invalid range specified: " + Arrays.toString(inputs)); + } + + start = Math.min(start, totalBytes - 1); + end = Math.min(end, totalBytes - 1); + } + + return new long[]{start, end}; + } + + +} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java index c84c74afa..0f3fefb9b 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java @@ -1,14 +1,16 @@ package cwms.cda.api; +import com.google.common.flogger.FluentLogger; import io.javalin.core.util.Header; import io.javalin.http.Context; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; import java.util.List; +import org.apache.commons.io.IOUtils; public class RangeRequestUtil { + static FluentLogger logger = FluentLogger.forEnclosingClass(); private RangeRequestUtil() { // utility class @@ -19,78 +21,79 @@ private RangeRequestUtil() { * take the InputStream, wrap it in a CompletedFuture and then process the request asynchronously. This * causes problems when the InputStream is tied to a database connection that gets closed before the * async processing happens. This method doesn't do the async thing but tries to support the rest. - * @param ctx - * @param is - * @param mediaType - * @param totalBytes - * @throws IOException + * @param ctx the Javalin context + * @param is the input stream + * @param mediaType the content type + * @param totalBytes the total number of bytes in the input stream + * @throws IOException if either of the streams throw an IOException */ public static void seekableStream(Context ctx, InputStream is, String mediaType, long totalBytes) throws IOException { - long from = 0; - long to = totalBytes - 1; + if (ctx.header(Header.RANGE) == null) { + // Not a range request. ctx.res.setContentType(mediaType); + // Javalin's version of this method doesn't set the content-length // Not setting the content-length makes the servlet container use Transfer-Encoding=chunked. // Chunked is a worse experience overall, seems like we should just set the length if we know it. 
- writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1)); + ctx.header(Header.CONTENT_LENGTH, String.valueOf(totalBytes)); + + IOUtils.copyLarge(is, (OutputStream) ctx.res.getOutputStream(), 0, totalBytes); } else { - int chunkSize = 128000; String rangeHeader = ctx.header(Header.RANGE); - String[] eqSplit = rangeHeader.split("=", 2); - String[] dashSplit = eqSplit[1].split("-", -1); // keep empty trailing part - - List requestedRange = Arrays.stream(dashSplit) - .filter(s -> !s.isEmpty()) - .collect(java.util.stream.Collectors.toList()); - - from = Long.parseLong(requestedRange.get(0)); - if (from + chunkSize > totalBytes) { - // chunk bigger than file, write all - to = totalBytes - 1; - } else if (requestedRange.size() == 2) { - // chunk smaller than file, to/from specified - to = Long.parseLong(requestedRange.get(1)); + List ranges = RangeParser.parse(rangeHeader); + + long[] requestedRange = ranges.get(0); + if( ranges.size() > 1 ){ + // we support range requests but we not currently supporting multiple ranges. + // Range request are optional so we have choices what to do if multiple ranges are requested: + // We could return 416 and hope the client figures out to only send one range + // We could service the first range with 206 and ignore the other ranges + // We could ignore the range request entirely and return the full body with 200 + // We could implement support for multiple ranges + logger.atInfo().log("Multiple ranges requested, using first and ignoring additional ranges"); } else { - // chunk smaller than file, to/from not specified - to = from + chunkSize - 1; - } + requestedRange = RangeParser.interpret(requestedRange, totalBytes); - ctx.status(206); + long from = requestedRange[0]; + long to = requestedRange[1]; - ctx.header(Header.ACCEPT_RANGES, "bytes"); - ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes); + ctx.status(206); - ctx.res.setContentType(mediaType); - ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes))); - writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1)); + ctx.header(Header.ACCEPT_RANGES, "bytes"); + ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes); + + ctx.res.setContentType(mediaType); + ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes))); + writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1)); + } } } - + /** + * Writes a range of bytes from the input stream to the output stream. + * @param out the output stream to write to. + * @param in the input stream to read from. It is assumed that this stream is open and positioned at 0. + * @param from the starting byte position to read from (inclusive) + * @param to the ending byte position to read to (inclusive) + * @throws IOException if either of the streams throw an IOException + */ public static void writeRange(OutputStream out, InputStream in, long from, long to) throws IOException { - writeRange(out, in, from, to, new byte[8192]); + skip(in, from); + long len = to - from + 1; + + // If the inputOffset to IOUtils.copyLarge is not 0 then IOUtils will do its own skipping. For reasons + // that IOUtils explains (quirks of certain streams) it does its skipping via read(). Using read() has performance + // implications b/c all the skipped data gets copied to memory. We do our own skipping and then have IOUtils copy. 
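        // Editor's illustration, not part of the patch, of the skip-then-bounded-copy pattern,
        // shown against an in-memory stream. With data = {0,1,...,9}, from = 3 and to = 6:
        //
        //     InputStream in = new ByteArrayInputStream(new byte[]{0,1,2,3,4,5,6,7,8,9});
        //     ByteArrayOutputStream out = new ByteArrayOutputStream();
        //     in.skip(3);                          // position the stream at 'from'
        //     IOUtils.copyLarge(in, out, 0, 4);    // copy exactly to - from + 1 bytes
        //     // out now holds {3, 4, 5, 6}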
+ IOUtils.copyLarge(in, out, 0, len); } - public static void writeRange(OutputStream out, InputStream is, long from, long to, byte[] buffer) throws IOException { - long toSkip = from; + private static void skip(InputStream is, long toSkip) throws IOException { while (toSkip > 0) { long skipped = is.skip(toSkip); toSkip -= skipped; } - - long bytesLeft = to - from + 1; - while (bytesLeft != 0L) { - int maxRead = (int) Math.min(buffer.length, bytesLeft); - int read = is.read(buffer, 0, maxRead); - if (read == -1) { - break; - } - out.write(buffer, 0, read); - bytesLeft -= read; - } - } } diff --git a/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java b/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java new file mode 100644 index 000000000..75ca185b8 --- /dev/null +++ b/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java @@ -0,0 +1,110 @@ +package cwms.cda.api; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class RangeParserTest { + + @Test + void testResume() { + List ranges = RangeParser.parse("bytes=100-"); + assertNotNull(ranges); + assertEquals(1, ranges.size()); + assertArrayEquals(new long[]{100L, -1L}, ranges.get(0)); + } + + @Test + void testFirstK() { + List ranges = RangeParser.parse("bytes=0-1000"); + assertNotNull(ranges); + assertEquals(1, ranges.size()); + assertArrayEquals(new long[]{0L, 1000L}, ranges.get(0)); + } + + @Test + void testFirstOpen() { + List ranges = RangeParser.parse("bytes=0-"); + assertNotNull(ranges); + assertEquals(1, ranges.size()); + assertArrayEquals(new long[]{0L, -1L}, ranges.get(0)); + } + + @Test + void testSuffixOpen() { + List ranges = RangeParser.parse("bytes=-50"); + assertNotNull(ranges); + assertEquals(1, ranges.size()); + assertArrayEquals(new long[]{-1L, 50L}, ranges.get(0)); + } + + + @Test + void testTwoPart() { + List ranges = RangeParser.parse("bytes=0-10,99-100"); + assertNotNull(ranges); + assertEquals(2, ranges.size()); + assertArrayEquals(new long[]{0L, 10L}, ranges.get(0)); + assertArrayEquals(new long[]{99L, 100L}, ranges.get(1)); + } + + + @Test + void testMultiParse() { + List ranges = RangeParser.parse("bytes=0-99,200-299,-50"); + assertNotNull(ranges); + assertEquals(3, ranges.size()); + assertArrayEquals(new long[]{0L, 99L}, ranges.get(0)); + assertArrayEquals(new long[]{200L, 299L}, ranges.get(1)); + assertArrayEquals(new long[]{-1L, 50L}, ranges.get(2)); + } + + + @Test + void testTwoWeird() { + List ranges = RangeParser.parse("bytes=0-0,-1"); + assertNotNull(ranges); + assertEquals(2, ranges.size()); + assertArrayEquals(new long[]{0L, 0L}, ranges.get(0)); + assertArrayEquals(new long[]{-1L, 1L}, ranges.get(1)); + } + + @Test + void testNotBytes() { + assertThrows(IllegalArgumentException.class, () -> RangeParser.parse("bits=0-10")); + } + + + @Test + void testSuffixDoubleNeg() { + assertThrows(IllegalArgumentException.class, () -> RangeParser.parse("bytes=--64")); + } + + + @Test + void testSuffixClosed() { + assertThrows(IllegalArgumentException.class, () -> + RangeParser.parse("bytes=-50-100")); + } + + + @Test + void testSuffixDoubleClosed() { + assertThrows(IllegalArgumentException.class, () -> RangeParser.parse("bytes=-50--100")); + } + + @Test + void testInterpret(){ + + assertArrayEquals(new long[]{0L, 10L}, RangeParser.interpret(new long[]{0L, 10L}, 100)); + assertArrayEquals(new long[]{0L, 0L}, RangeParser.interpret(new long[]{0L, 0L}, 100)); + assertArrayEquals(new long[]{8L, 12L}, RangeParser.interpret(new 
long[]{8L, 12L}, 100)); + assertArrayEquals(new long[]{8L, 99L}, RangeParser.interpret(new long[]{8L, 100L}, 100)); + assertArrayEquals(new long[]{8L, 99L}, RangeParser.interpret(new long[]{8L, 200L}, 100)); + + } + +// probably invalid assertArrayEquals(new long[]{8L, 100L}, RangeParser.interpret(new long[]{100L, 200L}, 100)); +} From ca9f0380b91f6a0052cb6df98d9f7663478cb91b Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Fri, 2 Jan 2026 16:46:59 -0800 Subject: [PATCH 03/11] Changed range-requests to use more efficient methods. --- .../api/BinaryTimeSeriesValueController.java | 32 +- .../java/cwms/cda/api/BlobController.java | 168 ++++----- .../cwms/cda/api/ForecastFileController.java | 19 +- .../main/java/cwms/cda/api/RangeParser.java | 33 +- .../java/cwms/cda/api/RangeRequestUtil.java | 137 ++++++-- .../java/cwms/cda/data/dao/BlobAccess.java | 6 +- .../main/java/cwms/cda/data/dao/BlobDao.java | 130 ++++--- .../cda/data/dao/ForecastInstanceDao.java | 11 +- .../cda/data/dao/ObjectStorageBlobDao.java | 330 ++++++++++-------- .../cwms/cda/data/dao/StreamConsumer.java | 10 + .../BlobControllerObjectStorageTestIT.java | 32 +- .../cwms/cda/api/BlobControllerTestIT.java | 42 +++ .../java/cwms/cda/api/RangeParserTest.java | 21 +- .../java/cwms/cda/data/dao/BlobDaoTest.java | 33 ++ 14 files changed, 635 insertions(+), 369 deletions(-) create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java create mode 100644 cwms-data-api/src/test/java/cwms/cda/data/dao/BlobDaoTest.java diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java index 76d668c01..0fb4c5471 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java @@ -29,16 +29,17 @@ import com.codahale.metrics.Timer; import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.BlobDao; +import cwms.cda.data.dao.StreamConsumer; import io.javalin.http.Context; import io.javalin.http.Handler; import io.javalin.plugin.openapi.annotations.OpenApi; import io.javalin.plugin.openapi.annotations.OpenApiContent; import io.javalin.plugin.openapi.annotations.OpenApiParam; import io.javalin.plugin.openapi.annotations.OpenApiResponse; +import org.jetbrains.annotations.NotNull; import org.jooq.DSLContext; import javax.servlet.http.HttpServletResponse; -import java.io.InputStream; import static com.codahale.metrics.MetricRegistry.name; import static cwms.cda.api.Controllers.*; @@ -84,26 +85,37 @@ private Timer.Context markAndTime(String subject) { )}, tags = {BinaryTimeSeriesController.TAG} ) - public void handle(Context ctx) { + public void handle(@NotNull Context ctx) { //Implementation will change with new CWMS schema //https://www.hec.usace.army.mil/confluence/display/CWMS/2024-02-29+Task2A+Text-ts+and+Binary-ts+Design try (Timer.Context ignored = markAndTime(GET_ALL)) { String binaryId = requiredParam(ctx, BLOB_ID); String officeId = requiredParam(ctx, OFFICE); DSLContext dsl = getDslContext(ctx); + + final Long offset; + final Long end ; + long[] ranges = RangeParser.parseFirstRange(ctx.header(io.javalin.core.util.Header.RANGE)); + if (ranges != null) { + offset = ranges[0]; + end = ranges[1]; + } else { + offset = null; + end = null; + } + BlobDao blobDao = new BlobDao(dsl); - blobDao.getBlob(binaryId, officeId, (blob, mediaType) -> { - if (blob == null) 
{ + StreamConsumer streamConsumer = (is, isPosition, mediaType, totalLength) -> { + if (is == null) { ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " + "blob based on given parameters")); } else { - long size = blob.length(); - requestResultSize.update(size); - try (InputStream is = blob.getBinaryStream()) { - RangeRequestUtil.seekableStream(ctx, is, mediaType, size); - } + requestResultSize.update(totalLength); + RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength); } - }); + }; + + blobDao.getBlob(binaryId, officeId, streamConsumer, offset, end); } } } diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java index 44cf20104..a4b301b18 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java @@ -8,11 +8,7 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import cwms.cda.api.errors.CdaError; -import cwms.cda.data.dao.BlobDao; -import cwms.cda.data.dao.BlobAccess; -import cwms.cda.data.dao.ObjectStorageBlobDao; -import cwms.cda.data.dao.ObjectStorageConfig; -import cwms.cda.data.dao.JooqDao; +import cwms.cda.data.dao.*; import cwms.cda.data.dto.Blob; import cwms.cda.data.dto.Blobs; import cwms.cda.data.dto.CwmsDTOPaginated; @@ -29,7 +25,7 @@ import io.javalin.plugin.openapi.annotations.OpenApiParam; import io.javalin.plugin.openapi.annotations.OpenApiRequestBody; import io.javalin.plugin.openapi.annotations.OpenApiResponse; -import java.io.InputStream; + import java.util.Optional; import javax.servlet.http.HttpServletResponse; @@ -94,32 +90,32 @@ private boolean isObjectStorageEnabled() { } @OpenApi( - queryParams = { - @OpenApiParam(name = OFFICE, - description = "Specifies the owning office. If this field is not " - + "specified, matching information from all offices shall be " - + "returned."), - @OpenApiParam(name = PAGE, - description = "This end point can return a lot of data, this " - + "identifies where in the request you are. This is an opaque" - + " value, and can be obtained from the 'next-page' value in " - + "the response."), - @OpenApiParam(name = PAGE_SIZE, - type = Integer.class, - description = "How many entries per page returned. Default " - + DEFAULT_PAGE_SIZE + "."), - @OpenApiParam(name = LIKE, - description = "Posix regular expression " - + "describing the blob id's you want") - }, - responses = {@OpenApiResponse(status = STATUS_200, - description = "A list of blobs.", - content = { - @OpenApiContent(type = Formats.JSON, from = Blobs.class), - @OpenApiContent(type = Formats.JSONV2, from = Blobs.class), - }) - }, - tags = {TAG} + queryParams = { + @OpenApiParam(name = OFFICE, + description = "Specifies the owning office. If this field is not " + + "specified, matching information from all offices shall be " + + "returned."), + @OpenApiParam(name = PAGE, + description = "This end point can return a lot of data, this " + + "identifies where in the request you are. This is an opaque" + + " value, and can be obtained from the 'next-page' value in " + + "the response."), + @OpenApiParam(name = PAGE_SIZE, + type = Integer.class, + description = "How many entries per page returned. 
Default " + + DEFAULT_PAGE_SIZE + "."), + @OpenApiParam(name = LIKE, + description = "Posix regular expression " + + "describing the blob id's you want") + }, + responses = {@OpenApiResponse(status = STATUS_200, + description = "A list of blobs.", + content = { + @OpenApiContent(type = Formats.JSON, from = Blobs.class), + @OpenApiContent(type = Formats.JSONV2, from = Blobs.class), + }) + }, + tags = {TAG} ) @Override public void getAll(@NotNull Context ctx) { @@ -161,20 +157,20 @@ public void getAll(@NotNull Context ctx) { description = "Returns the binary value of the requested blob as a seekable stream with the " + "appropriate media type.", queryParams = { - @OpenApiParam(name = OFFICE, description = "Specifies the owning office."), - @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter " - + "is ignored and the value of the query parameter is used. " - + "Note: this query parameter is necessary for id's that contain '/' or other special " - + "characters. This is due to limitations in path pattern matching. " - + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. " - + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."), + @OpenApiParam(name = OFFICE, description = "Specifies the owning office."), + @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter " + + "is ignored and the value of the query parameter is used. " + + "Note: this query parameter is necessary for id's that contain '/' or other special " + + "characters. This is due to limitations in path pattern matching. " + + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. " + + "Client libraries should detect slashes and choose the appropriate field. 
\"ignored\" is suggested for the path endpoint."), }, responses = { - @OpenApiResponse(status = STATUS_200, - description = "Returns requested blob.", - content = { - @OpenApiContent(type = "application/octet-stream") - }) + @OpenApiResponse(status = STATUS_200, + description = "Returns requested blob.", + content = { + @OpenApiContent(type = "application/octet-stream") + }) }, tags = {TAG} ) @@ -192,23 +188,33 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) { String officeQP = ctx.queryParam(OFFICE); Optional office = Optional.ofNullable(officeQP); - BlobDao.BlobConsumer tripleConsumer = (blob, mediaType) -> { - if (blob == null) { + final Long offset; + final Long end; + long[] ranges = RangeParser.parseFirstRange(ctx.header(io.javalin.core.util.Header.RANGE)); + if (ranges != null) { + offset = ranges[0]; + end = ranges[1]; + } else { + offset = null; + end = null; + } + + StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> { + if (is == null) { ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " + "blob based on given parameters")); } else { - long size = blob.length(); - requestResultSize.update(size); - try (InputStream is = blob.getBinaryStream()) { // is OracleBlobInputStream - RangeRequestUtil.seekableStream(ctx, is, mediaType, size); - } + requestResultSize.update(totalLength); + // is OracleBlobInputStream or something from MinIO + RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength); } }; + if (office.isPresent()) { - dao.getBlob(blobId, office.get(), tripleConsumer); + dao.getBlob(blobId, office.get(), consumer, offset, end); } else { - dao.getBlob(blobId, tripleConsumer); + dao.getBlob(blobId, null, consumer, offset, end); } } } @@ -217,13 +223,13 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) { @OpenApi( description = "Create new Blob", requestBody = @OpenApiRequestBody( - content = { - @OpenApiContent(from = Blob.class, type = Formats.JSONV2) - }, - required = true), + content = { + @OpenApiContent(from = Blob.class, type = Formats.JSONV2) + }, + required = true), queryParams = { - @OpenApiParam(name = FAIL_IF_EXISTS, type = Boolean.class, - description = "Create will fail if provided ID already exists. Default: true") + @OpenApiParam(name = FAIL_IF_EXISTS, type = Boolean.class, + description = "Create will fail if provided ID already exists. Default: true") }, method = HttpMethod.POST, tags = {TAG} @@ -245,21 +251,21 @@ public void create(@NotNull Context ctx) { @OpenApi( description = "Update an existing Blob", pathParams = { - @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be updated"), + @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be updated"), }, requestBody = @OpenApiRequestBody( - content = { - @OpenApiContent(from = Blob.class, type = Formats.JSONV2), - @OpenApiContent(from = Blob.class, type = Formats.JSON) - }, - required = true), + content = { + @OpenApiContent(from = Blob.class, type = Formats.JSONV2), + @OpenApiContent(from = Blob.class, type = Formats.JSON) + }, + required = true), queryParams = { - @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter " - + "is ignored and the value of the query parameter is used. " - + "Note: this query parameter is necessary for id's that contain '/' or other special " - + "characters. This is due to limitations in path pattern matching. " - + "We will likely add support for encoding the ID in the path in the future. 
For now use the id field for those IDs. " - + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."), + @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter " + + "is ignored and the value of the query parameter is used. " + + "Note: this query parameter is necessary for id's that contain '/' or other special " + + "characters. This is due to limitations in path pattern matching. " + + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. " + + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."), }, method = HttpMethod.PATCH, tags = {TAG} @@ -292,7 +298,7 @@ public void update(@NotNull Context ctx, @NotNull String blobId) { + "updating a blob"); } - if(!blob.getId().equals(blobId)) { + if (!blob.getId().equals(blobId)) { throw new FormattingException("The blob id parameter does not match the blob id in the body. " + "The blob end-point does not support renaming blobs. " + "Create a new blob with the new id and delete the old one."); @@ -307,17 +313,17 @@ public void update(@NotNull Context ctx, @NotNull String blobId) { @OpenApi( description = "Deletes requested blob", pathParams = { - @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"), + @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"), }, queryParams = { - @OpenApiParam(name = OFFICE, required = true, description = "Specifies the " - + "owning office of the blob to be deleted"), - @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter " - + "is ignored and the value of the query parameter is used. " - + "Note: this query parameter is necessary for id's that contain '/' or other special " - + "characters. This is due to limitations in path pattern matching. " - + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. " - + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."), + @OpenApiParam(name = OFFICE, required = true, description = "Specifies the " + + "owning office of the blob to be deleted"), + @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter " + + "is ignored and the value of the query parameter is used. " + + "Note: this query parameter is necessary for id's that contain '/' or other special " + + "characters. This is due to limitations in path pattern matching. " + + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. " + + "Client libraries should detect slashes and choose the appropriate field. 
\"ignored\" is suggested for the path endpoint."), }, method = HttpMethod.DELETE, tags = {TAG} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java index 0fcd431c7..8224ab925 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java @@ -29,6 +29,7 @@ import com.codahale.metrics.Timer; import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.ForecastInstanceDao; +import cwms.cda.data.dao.StreamConsumer; import cwms.cda.helpers.DateUtils; import io.javalin.http.Context; import io.javalin.http.Handler; @@ -36,9 +37,9 @@ import io.javalin.plugin.openapi.annotations.OpenApiContent; import io.javalin.plugin.openapi.annotations.OpenApiParam; import io.javalin.plugin.openapi.annotations.OpenApiResponse; +import org.jetbrains.annotations.NotNull; import javax.servlet.http.HttpServletResponse; -import java.io.InputStream; import java.time.Instant; import static com.codahale.metrics.MetricRegistry.name; @@ -92,7 +93,7 @@ private Timer.Context markAndTime(String subject) { }, tags = {ForecastSpecController.TAG} ) - public void handle(Context ctx) { + public void handle(@NotNull Context ctx) { String specId = requiredParam(ctx, NAME); String office = requiredParam(ctx, OFFICE); String designator = ctx.queryParamAsClass(DESIGNATOR, String.class).allowNullable().get(); @@ -102,18 +103,16 @@ public void handle(Context ctx) { Instant issueInstant = DateUtils.parseUserDate(issueDate, "UTC").toInstant(); try (Timer.Context ignored = markAndTime(GET_ALL)) { ForecastInstanceDao dao = new ForecastInstanceDao(getDslContext(ctx)); - dao.getFileBlob(office, specId, designator, forecastInstant, issueInstant, (blob, mediaType) -> { - if (blob == null) { + StreamConsumer streamConsumer = (is, isPosition, mediaType, totalLength) -> { + if (is == null) { ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " + "blob based on given parameters")); } else { - long size = blob.length(); - requestResultSize.update(size); - try (InputStream is = blob.getBinaryStream()) { - RangeRequestUtil.seekableStream(ctx, is, mediaType, size); - } + requestResultSize.update(totalLength); + RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength); } - }); + }; + dao.getFileBlob(office, specId, designator, forecastInstant, issueInstant, streamConsumer); } } } \ No newline at end of file diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java index a2e84ecfa..ccfb4e204 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java @@ -1,6 +1,6 @@ package cwms.cda.api; -import org.jspecify.annotations.NonNull; +import org.jetbrains.annotations.NotNull; import java.util.*; import java.util.regex.*; @@ -29,8 +29,8 @@ public class RangeParser { * If the range only includes a negative byte (e.g bytes=-50) then -1 is returned as the start of the range * and -1*end is returned as the end of the range. 
bytes=-50 will result in [-1,50]
 *
- * @param header the HTTP Range header
- * @return a list of byte ranges
+ * @param header the HTTP Range header; it should start with "bytes=". If it is null or empty an empty list is returned.
+ * @return a list of long[2] arrays holding the ranges
 */
 public static List parse(String header) {
 if (header == null || header.isEmpty() ) {
@@ -47,7 +47,17 @@ public static List parse(String header) {
 return retval;
 }
 
- public static @NonNull List parseRanges(String rangePart) {
+ public static long[] parseFirstRange(String header) {
+ if(header != null) {
+ List ranges = RangeParser.parse(header);
+ if (!ranges.isEmpty()) {
+ return ranges.get(0);
+ }
+ }
+ return null;
+ }
+
+ public static @NotNull List parseRanges(String rangePart) {
 if( rangePart == null || rangePart.isEmpty() ){
 throw new IllegalArgumentException("Invalid range specified: " + rangePart);
 }
@@ -92,11 +102,22 @@ public static long[] interpret(long[] inputs, long totalBytes){
 start = totalBytes - end;
 end = totalBytes - 1;
 } else {
- if (start < 0 || end < start) {
+ if (start < 0 ) {
 throw new IllegalArgumentException("Invalid range specified: " + Arrays.toString(inputs));
 }
- start = Math.min(start, totalBytes - 1);
+ if(end == -1L){
+ end = totalBytes - 1;
+ }
+
+ if(end < start){
+ throw new IllegalArgumentException("Invalid range specified: " + Arrays.toString(inputs));
+ }
+
+ if(start > totalBytes - 1){
+ throw new IllegalArgumentException("Can't satisfy range request: " + Arrays.toString(inputs) + " Range starts beyond end of file.");
+ }
+
 end = Math.min(end, totalBytes - 1);
 }
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java
index 0f3fefb9b..97cdbc6f0 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java
@@ -6,6 +6,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.Reader;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
@@ -16,23 +17,37 @@ private RangeRequestUtil() {
 // utility class
 }
+
+ public static void seekableStream(Context ctx, InputStream is, String mediaType, long totalBytes) throws IOException {
+ seekableStream(ctx, is, 0, mediaType, totalBytes);
+ }
+
 /**
- * Javalin has a method very similar to this in its Context class. The issue is that Javalin decided to
- * take the InputStream, wrap it in a CompletedFuture and then process the request asynchronously. This
- * causes problems when the InputStream is tied to a database connection that gets closed before the
- * async processing happens. This method doesn't do the async thing but tries to support the rest.
+ * This method copies data from the InputStream is into the Context response OutputStream.
+ * If the request includes a Range request header then the response will be a partial response for the first range.
+ * Javalin has a similar method in its Context class that handles the streaming asynchronously. The Javalin
+ * method caused problems when our database connections were closed before the streaming completed.
+ * Both SQL Blobs and S3 streams are capable of retrieving Streams at a user-controlled offset. Those methods
+ * should be used and the offset passed in the isPosition parameter.
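+ * For example, a DAO that has already positioned the stream at byte 500 of a 10,000 byte blob
+ * (illustrative numbers only) can pass isPosition=500 and totalBytes=10000; if the requested range
+ * starts at byte 500 nothing extra is skipped, and if it starts later only the difference is skipped.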
+ * If additional skipping needs to be done, this method will call InputStream.skip() which may have some + * implications for specific implementations but works efficiently for others. * @param ctx the Javalin context * @param is the input stream + * @param isPostion the current position in the input stream. * @param mediaType the content type * @param totalBytes the total number of bytes in the input stream * @throws IOException if either of the streams throw an IOException */ - public static void seekableStream(Context ctx, InputStream is, String mediaType, long totalBytes) throws IOException { + public static void seekableStream(Context ctx, InputStream is, long isPostion, String mediaType, long totalBytes) throws IOException { if (ctx.header(Header.RANGE) == null) { // Not a range request. ctx.res.setContentType(mediaType); + if(isPostion > 0){ + throw new IllegalArgumentException("Input stream position must be 0 for non-range requests"); + } + // Javalin's version of this method doesn't set the content-length // Not setting the content-length makes the servlet container use Transfer-Encoding=chunked. // Chunked is a worse experience overall, seems like we should just set the length if we know it. @@ -53,40 +68,97 @@ public static void seekableStream(Context ctx, InputStream is, String mediaType, // We could ignore the range request entirely and return the full body with 200 // We could implement support for multiple ranges logger.atInfo().log("Multiple ranges requested, using first and ignoring additional ranges"); - } else { - requestedRange = RangeParser.interpret(requestedRange, totalBytes); + } + requestedRange = RangeParser.interpret(requestedRange, totalBytes); + + long from = requestedRange[0]; + long to = requestedRange[1]; - long from = requestedRange[0]; - long to = requestedRange[1]; + ctx.status(206); - ctx.status(206); + ctx.header(Header.ACCEPT_RANGES, "bytes"); + ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes); - ctx.header(Header.ACCEPT_RANGES, "bytes"); - ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes); + ctx.res.setContentType(mediaType); + ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes))); - ctx.res.setContentType(mediaType); - ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes))); - writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1)); + if(isPostion < from){ + skip(is, from-isPostion); } + long len = Math.min(to, totalBytes - 1) - from + 1; + + // If the inputOffset to IOUtils.copyLarge is not 0 then IOUtils will do its own skipping. For reasons + // that IOUtils explains (quirks of certain streams) it does its skipping via read(). Using read() has + // performance implications b/c all the skipped data still gets retrieved and copied to memory. In our + // use-case the data comes from a database blob/clob/s3. Copying (potential) gigabytes of data we + // don't need across the network is not ideal. We've tried to address this performance impact in two + // ways. 1) We allow callers to pass an input stream that is already positioned so that skipping isn't + // needed. 2) If skipping is needed we call InputStream.skip() directly (this is efficient for the + // stream from Oracle Blobs). + + // We do our own skipping and then have IOUtils copy. + IOUtils.copyLarge(is, (OutputStream) ctx.res.getOutputStream(), 0, len); } } /** - * Writes a range of bytes from the input stream to the output stream. 
- * @param out the output stream to write to. - * @param in the input stream to read from. It is assumed that this stream is open and positioned at 0. - * @param from the starting byte position to read from (inclusive) - * @param to the ending byte position to read to (inclusive) - * @throws IOException if either of the streams throw an IOException + * Similar to seekableStream but for Reader. For some reason the java.sql.Clob does not specify a method to get a + * Stream that is already positioned but there is a method to position a Reader. + * @param ctx + * @param reader + * @param isPostion + * @param mediaType + * @param totalBytes + * @throws IOException */ - public static void writeRange(OutputStream out, InputStream in, long from, long to) throws IOException { - skip(in, from); - long len = to - from + 1; - - // If the inputOffset to IOUtils.copyLarge is not 0 then IOUtils will do its own skipping. For reasons - // that IOUtils explains (quirks of certain streams) it does its skipping via read(). Using read() has performance - // implications b/c all the skipped data gets copied to memory. We do our own skipping and then have IOUtils copy. - IOUtils.copyLarge(in, out, 0, len); + public static void seekableReader(Context ctx, Reader reader, long isPostion, String mediaType, long totalBytes) throws IOException { + + if (ctx.header(Header.RANGE) == null) { + // Not a range request. + ctx.res.setContentType(mediaType); + + if(isPostion > 0){ + throw new IllegalArgumentException("Input stream position must be 0 for non-range requests"); + } + + ctx.header(Header.CONTENT_LENGTH, String.valueOf(totalBytes)); + + IOUtils.copyLarge(reader, ctx.res.getWriter(), 0, totalBytes); + } else { + String rangeHeader = ctx.header(Header.RANGE); + + List ranges = RangeParser.parse(rangeHeader); + + long[] requestedRange = ranges.get(0); + if( ranges.size() > 1 ){ + // we support range requests but we not currently supporting multiple ranges. 
+ // Range request are optional so we have choices what to do if multiple ranges are requested: + // We could return 416 and hope the client figures out to only send one range + // We could service the first range with 206 and ignore the other ranges + // We could ignore the range request entirely and return the full body with 200 + // We could implement support for multiple ranges + logger.atInfo().log("Multiple ranges requested, using first and ignoring additional ranges"); + } + requestedRange = RangeParser.interpret(requestedRange, totalBytes); + + long from = requestedRange[0]; + long to = requestedRange[1]; + + ctx.status(206); + + ctx.header(Header.ACCEPT_RANGES, "bytes"); + ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes); + + ctx.res.setContentType(mediaType); + ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes))); + + if(isPostion < from){ + skip(reader, from-isPostion); + } + long len = Math.min(to, totalBytes - 1) - from + 1; + + IOUtils.copyLarge(reader, ctx.res.getWriter(), 0, len); + } } private static void skip(InputStream is, long toSkip) throws IOException { @@ -96,4 +168,11 @@ private static void skip(InputStream is, long toSkip) throws IOException { } } + private static void skip(Reader reader, long toSkip) throws IOException { + while (toSkip > 0) { + long skipped = reader.skip(toSkip); + toSkip -= skipped; + } + } + } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java index f4d50ace1..0616a6dfa 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java @@ -12,11 +12,7 @@ public interface BlobAccess { Optional getByUniqueName(String id, String office); - void getBlob(String id, String office, BlobDao.BlobConsumer consumer); - - default void getBlob(String id, BlobDao.BlobConsumer consumer) { - getBlob(id, null, consumer); - } + void getBlob(String id, String office, StreamConsumer consumer, @Nullable Long offset, @Nullable Long end); void create(Blob blob, boolean failIfExists, boolean ignoreNulls); diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java index 14737f053..d0875e4f5 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java @@ -1,5 +1,7 @@ package cwms.cda.data.dao; +import com.google.common.flogger.FluentLogger; +import cwms.cda.api.RangeParser; import cwms.cda.api.errors.NotFoundException; import cwms.cda.data.dto.Blob; import cwms.cda.data.dto.Blobs; @@ -14,6 +16,7 @@ import org.jooq.ResultQuery; import org.jooq.SelectLimitPercentStep; import org.jooq.Table; + import usace.cwms.db.jooq.codegen.packages.CWMS_TEXT_PACKAGE; import usace.cwms.db.jooq.codegen.tables.AV_CWMS_MEDIA_TYPE; import usace.cwms.db.jooq.codegen.tables.AV_OFFICE; @@ -35,7 +38,7 @@ import static org.jooq.impl.DSL.upper; public class BlobDao extends JooqDao implements BlobAccess { - + static FluentLogger logger = FluentLogger.forEnclosingClass(); public static final String ID = "ID"; public static final String DESCRIPTION = "DESCRIPTION"; public static final String OFFICE_CODE = "OFFICE_CODE"; @@ -86,7 +89,8 @@ public Optional getByUniqueName(String id, String limitToOffice) { } @Override - public void getBlob(String id, String office, BlobConsumer consumer) { + public void getBlob(String id, String 
office, StreamConsumer consumer, @Nullable Long offset, @Nullable Long end) { + // Not using jOOQ here because we want the java.sql.Blob and not an automatic field binding. We want // blob so that we can pull out a stream to the data and pass that to javalin. // If the request included Content-Ranges Javalin can have the stream skip to the correct @@ -98,54 +102,97 @@ public void getBlob(String id, String office, BlobConsumer consumer) { // connection(dsl, connection -> { - try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_WITH_OFFICE)) { - preparedStatement.setString(1, office); - preparedStatement.setString(2, id); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (resultSet.next()) { - String mediaType = resultSet.getString("MEDIA_TYPE_ID"); - java.sql.Blob blob = resultSet.getBlob("VALUE"); - try { - consumer.accept(blob, mediaType); - } finally { - if (blob != null) { - blob.free(); - } - } - } else { - throw new NotFoundException("Unable to find blob with id " + id + " in office " + office); - } + if(office == null ){ + try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_QUERY)) { + preparedStatement.setString(1, id); + executeAndHandle(consumer, offset, end, preparedStatement, "Unable to find blob with id " + id); + } + } else { + try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_WITH_OFFICE)) { + preparedStatement.setString(1, office); + preparedStatement.setString(2, id); + + executeAndHandle(consumer, offset, end, preparedStatement, "Unable to find blob with id " + id + " in office " + office); } } }); } - public void getBlob(String id, BlobConsumer consumer) { - - connection(dsl, connection -> { - try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_QUERY)) { - preparedStatement.setString(1, id); + private static void executeAndHandle(StreamConsumer consumer, @Nullable Long offset, @Nullable Long end, PreparedStatement preparedStatement, String message) throws SQLException, IOException { + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (resultSet.next()) { + handleResultSet(consumer, offset, end, resultSet); + } else { + throw new NotFoundException(message); + } + } + } - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (resultSet.next()) { - String mediaType = resultSet.getString("MEDIA_TYPE_ID"); - java.sql.Blob blob = resultSet.getBlob("VALUE"); - try { - consumer.accept(blob, mediaType); - } finally { - if (blob != null) { - blob.free(); - } - } - } else { - throw new NotFoundException("Unable to find blob with id " + id); - } + /** + * + * @param consumer + * @param offset where to start reading. 0 is first byte + * @param end position of last byte to include. inclusive. 0 would mean only return the first byte. + * @param resultSet + * @throws SQLException + * @throws IOException + */ + private static void handleResultSet(StreamConsumer consumer, @Nullable Long offset, @Nullable Long end, ResultSet resultSet) throws SQLException, IOException { + String mediaType = resultSet.getString("MEDIA_TYPE_ID"); + java.sql.Blob blob = resultSet.getBlob("VALUE"); + try { + long totalLength = blob.length(); + if (offset != null) { + long pos = offset + 1; // For getBinaryStream the first byte is at 1. 
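+ // Worked example (illustrative values): a request for "Range: bytes=3-7" on a 100 byte blob
+ // arrives here as offset=3 and end=7, so pos becomes 4 and the computed length is 5.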
+ long length = getLength(offset, end, totalLength); + + logger.atFine().log("Reading blob at pos %s, length %s, totalLength %s", pos, length, totalLength); + try (InputStream stream = blob.getBinaryStream(pos, length)) { + consumer.accept(stream, offset, mediaType, totalLength); + } catch (SQLException e) { + logger.atWarning().withCause(e).log("Error reading blob at offset %s, length %s, totalLength %s", offset, length, totalLength); + throw e; + } + } else { + try (InputStream stream = blob.getBinaryStream()) { + consumer.accept(stream, 0, mediaType, totalLength); } } - }); + } finally { + blob.free(); + } + } + + /** + * + * @param offset the index of the first byte to read. Like http range-requests 0 is first byte. -1 is last byte. + * @param end the index of the last byte to read, inclusive. null reads until the end of the blob. -1 is also last byte. + * + * @param totalLength the total length of the blob + * @return the length of the range to read + */ + static long getLength(@NotNull Long offset, @Nullable Long end, long totalLength) { + + long length; + if(end != null){ + // The length we are getting passed in is from range-request and could be negative to indicate suffix + long[] startEnd = RangeParser.interpret(new long[]{offset, end}, totalLength); + if(startEnd != null){ + offset = startEnd[0]; + end = startEnd[1]; + } + + length = end - offset + 1; + } else { + // if its not set just assume we are reading until the end of blob. + // Consumer can always close stream early. + length = totalLength - offset; + } + return length; } + public List getAll(String officeId, String like) { String queryStr = "SELECT AT_BLOB.ID, AT_BLOB.DESCRIPTION, CWMS_MEDIA_TYPE.MEDIA_TYPE_ID, CWMS_OFFICE.OFFICE_ID\n" + " FROM CWMS_20.AT_BLOB \n" @@ -341,8 +388,5 @@ public static byte[] readFully(@NotNull InputStream stream) throws IOException { return output.toByteArray(); } - @FunctionalInterface - public interface BlobConsumer { - void accept(java.sql.Blob blob, String mediaType) throws SQLException, IOException; - } + } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java index 8b70042e8..610ecaedf 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java @@ -149,7 +149,7 @@ private static String mapToJson(Map metadata) { private static Map mapFromJson(String forecastInfo) { try { - return JsonV2.buildObjectMapper().readValue(forecastInfo, new TypeReference>() { + return JsonV2.buildObjectMapper().readValue(forecastInfo, new TypeReference<>() { }); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Error serializing forecast info to JSON", e); @@ -314,7 +314,7 @@ public void delete(String office, String name, String designator, } public void getFileBlob(String office, String name, String designator, - Instant forecastDate, Instant issueDate, BlobDao.BlobConsumer consumer) { + Instant forecastDate, Instant issueDate, StreamConsumer consumer) { String query = FILE_QUERY + FILE_CONDITIONS; connection(dsl, c -> { @@ -337,8 +337,8 @@ public void getFileBlob(String office, String name, String designator, } Blob blob = (Blob) attributes[5]; - try { - consumer.accept(blob, mediaType); + try (InputStream is = blob.getBinaryStream()){ + consumer.accept(is, 0, mediaType, blob.length()); return; } finally { if (blob != null) { @@ -348,7 +348,8 @@ public void getFileBlob(String office, 
String name, String designator, } } } - consumer.accept(null, null); + // If we get here there was some problem finding the stream. + throw new NotFoundException("Forecast Instance file not found"); } } }); diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java index 9a7e15395..07518a7c8 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ObjectStorageBlobDao.java @@ -1,6 +1,7 @@ package cwms.cda.data.dao; import com.google.common.flogger.FluentLogger; +import cwms.cda.api.RangeParser; import cwms.cda.api.errors.AlreadyExists; import cwms.cda.api.errors.FieldLengthExceededException; import cwms.cda.api.errors.NotFoundException; @@ -12,14 +13,13 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import javax.sql.rowset.serial.SerialBlob; -import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -32,6 +32,8 @@ * Object Storage-backed implementation using MinIO Java client. keys like OFFICE/ID_UPPER. */ public class ObjectStorageBlobDao implements BlobAccess { + public static final String DESCRIPTION = "description"; + public static final String NO_SUCH_KEY = "NoSuchKey"; FluentLogger logger = FluentLogger.forEnclosingClass(); public static final int ID_LENGTH_LIMIT = 256; // This is to match pl/sql limit @@ -77,63 +79,104 @@ private static MinioClient buildClient(ObjectStorageConfig cfg) { } if (cursorOffice != null && cursorId != null) { - startAfter = key(cursorOffice, cursorId); + startAfter = buildName(cursorOffice, cursorId); } } - Pattern likePattern = null; if (like != null && !like.isEmpty() && !".*".equals(like)) { likePattern = Pattern.compile(like, Pattern.CASE_INSENSITIVE); } + List collected = getBlobs(pageSize, likePattern, prefix, startAfter); + + Blobs.Builder builder = new Blobs.Builder(cursor, pageSize, 0); + collected.forEach(builder::addBlob); + return builder.build(); + } + private @NotNull List getBlobs(int pageSize, @Nullable Pattern likePattern, String prefix, String startAfter) { List collected = new ArrayList<>(); ListObjectsArgs.Builder args = ListObjectsArgs.builder() .bucket(requiredBucket()) .recursive(true) .maxKeys(pageSize); - if (prefix != null) args = args.prefix(prefix); - if (startAfter != null) args = args.startAfter(startAfter); + if (prefix != null) { + args = args.prefix(prefix); + } + if (startAfter != null){ + args = args.startAfter(startAfter); + } for (Result res : client.listObjects(args.build())) { try { // item.key() like OFFICE/ID Item item = res.get(); - String k = item.objectName(); - int slash = k.indexOf('/'); - if (slash <= 0 || slash >= k.length() - 1) continue; - String off = k.substring(0, slash); - String id = k.substring(slash + 1); - if (likePattern != null && !likePattern.matcher(id).find()) { - continue; - } - // fetch metadata for media type and optional description - try { - StatObjectResponse stat = client.statObject(StatObjectArgs.builder() - .bucket(requiredBucket()) - .object(k) - .build()); - String mediaType = stat.contentType(); - String desc = stat.userMetadata() != null ? 
stat.userMetadata().getOrDefault("description", null) : null; - collected.add(new Blob(off, id, desc, mediaType, null)); - if (collected.size() >= pageSize) break; - } catch (Exception e) { - // skip items that fail stat + String name = item.objectName(); + if(nameMatches(name, likePattern)) { + try { + Blob blob = getBlob(name); + collected.add(blob); + if (collected.size() >= pageSize) { + break; + } + } catch (Exception e) { + // skip items that fail stat + } } - } catch (Exception ignore) { + } + catch (Exception ignore) { // skip this entry on error } } + return collected; + } - Blobs.Builder builder = new Blobs.Builder(cursor, pageSize, 0); - collected.forEach(builder::addBlob); - return builder.build(); + private @NotNull Blob getBlob(String name) throws ErrorResponseException, InsufficientDataException, InternalException, InvalidKeyException, InvalidResponseException, IOException, NoSuchAlgorithmException, ServerException, XmlParserException { + StatObjectResponse stat = client.statObject(StatObjectArgs.builder() + .bucket(requiredBucket()) + .object(name) + .build()); + String mediaType = stat.contentType(); + String desc = stat.userMetadata() != null ? stat.userMetadata().getOrDefault(DESCRIPTION, null) : null; + return new Blob(officeFromName(name), idFromName(name), desc, mediaType, null); } + public static String officeFromName(String k){ + String off = null; + int slash = k.indexOf('/'); + if (slash > 0 && slash < k.length() - 1) { + off = k.substring(0, slash); + } + return off; + } + + public static String idFromName(String k) { + String id = null; + int slash = k.indexOf('/'); + if (slash > 0 && slash < k.length() - 1) { + id = k.substring(slash + 1); + } + return id; + } + + public static boolean nameMatches(String name, Pattern likePattern) { + boolean nameMatches = false; + + int slash = name.indexOf('/'); + if (slash > 0 && slash < name.length() - 1) { + String id = name.substring(slash + 1); + if (likePattern == null || likePattern.matcher(id).find()) { + nameMatches = true; + } + } + return nameMatches; + } + + @Override public Optional getByUniqueName(String id, String office) { - String k = (office == null || office.isEmpty()) ? findFirstKeyById(id) : key(office, id); + String k = (office == null || office.isEmpty()) ? findFirstKeyById(id) : buildName(office, id); if (k == null) { return Optional.empty(); } @@ -145,152 +188,134 @@ public Optional getByUniqueName(String id, String office) { .object(k) .build()); String mediaType = stat.contentType(); - String desc = stat.userMetadata() != null ? stat.userMetadata().getOrDefault("description", null) : null; + String desc = stat.userMetadata() != null ? stat.userMetadata().getOrDefault(DESCRIPTION, null) : null; return Optional.of(new Blob(officeFromKey, idFromKey, desc, mediaType, null)); } catch (ErrorResponseException ere) { - if ("NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + if (NO_SUCH_KEY.equalsIgnoreCase(ere.errorResponse().code())) { return Optional.empty(); } throw new RuntimeException(ere); - } catch (Exception e) { + } catch (ServerException | InternalException | XmlParserException | InvalidResponseException | + InvalidKeyException | NoSuchAlgorithmException | IOException | InsufficientDataException e) { throw new RuntimeException(e); } } @Override - public void getBlob(String id, String office, BlobDao.BlobConsumer consumer) { - String k = (office == null || office.isEmpty()) ? 
findFirstKeyById(id) : key(office, id); + public void getBlob(String id, String office, StreamConsumer consumer, @Nullable Long offset, @Nullable Long end) { + String key = (office == null || office.isEmpty()) ? findFirstKeyById(id) : buildName(office, id); + if (key == null) { + throw new NotFoundException("Could not find blob with id:" + id + " in office:" + office); + } try { - if (k == null) { - try { - consumer.accept(null, null); - } catch (Exception e) { - throw new RuntimeException(e); - } - return; - } - logger.atFine().log("Getting stat for %s", k); - // Stat first to get content type and size + logger.atFine().log("Getting stat for %s", key); + StatObjectResponse stat = client.statObject(StatObjectArgs.builder() .bucket(requiredBucket()) - .object(k) + .object(key) .build()); String mediaType = stat.contentType() != null ? stat.contentType() : "application/octet-stream"; - - try (InputStream is = client.getObject(GetObjectArgs.builder() - .bucket(requiredBucket()) - .object(k) - .build())) { - // Its too bad this has to readFully - future optimization can skip ahead - // b/c the consumer really just wants to get the stream out of the blob. - byte[] data = readFully(is); - SerialBlob blob = new SerialBlob(data); - consumer.accept(blob, mediaType); - } catch (Exception e) { - throw new RuntimeException(e); - } + long totalLength = stat.size(); + + streamToConsumer(key, consumer, offset, end, mediaType, totalLength); } catch (ErrorResponseException ere) { - if ("NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { - try { - // We could also just throw a NotFoundException. - // BlobController suggests consumer.accept(null, null); will handle things. - consumer.accept(null, null); - } catch (Exception e) { - throw new RuntimeException(e); - } - return; + if (NO_SUCH_KEY.equalsIgnoreCase(ere.errorResponse().code())) { + throw new NotFoundException("Could not find blob with id:" + id + " in office:" + office); } throw new RuntimeException(ere); - } catch (Exception e) { + } catch (ServerException | InternalException | XmlParserException | InvalidResponseException | + InvalidKeyException | NoSuchAlgorithmException | IOException | InsufficientDataException | + SQLException e) { throw new RuntimeException(e); } } + private void streamToConsumer(String name, StreamConsumer consumer, @Nullable Long offset, @Nullable Long end, + String mediaType, long totalLength) throws SQLException, IOException { + + if(offset != null && end != null){ + long[] startEnd = RangeParser.interpret(new long[]{offset, end}, totalLength); + offset = startEnd[0]; + end = startEnd[1]; + } + + GetObjectArgs.Builder builder = GetObjectArgs.builder() + .bucket(requiredBucket()) + .object(name); + if(offset != null ) { + builder = builder.offset(offset); + } else { + offset = 0L; + } + + if(end != null && end > 0) { + long length = end - offset + 1; + builder = builder.length(length); + } + + try (InputStream is = client.getObject(builder.build())) { + consumer.accept(is, offset, mediaType, totalLength); + } catch (ServerException | InsufficientDataException e) { + throw new IOException(e); + } catch (InvalidKeyException e) { + throw new NotFoundException(e); + } catch (ErrorResponseException | NoSuchAlgorithmException | InvalidResponseException | XmlParserException | + InternalException e) { + throw new RuntimeException(e); + } + } + + @Override - public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) { - String k = key(blob.getOfficeId(), blob.getId()); + public void create(Blob blob, boolean 
failIfExists, boolean ignoreNulls) { + String name = buildName(blob.getOfficeId(), blob.getId()); if (failIfExists) { try { client.statObject(StatObjectArgs.builder() .bucket(requiredBucket()) - .object(k) + .object(name) .build()); - throw new AlreadyExists("Blob already exists: " + k, null); + throw new AlreadyExists("Blob already exists: " + name, null); } catch (ErrorResponseException ere) { - if (!"NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + if (!NO_SUCH_KEY.equalsIgnoreCase(ere.errorResponse().code())) { throw new RuntimeException(ere); } - } catch (Exception e) { + } catch (ServerException | InsufficientDataException | IOException | NoSuchAlgorithmException | + InternalException | XmlParserException | InvalidResponseException | InvalidKeyException e) { throw new RuntimeException(e); } } - - // TODO: Figure out which of these can be something better. + try { - doPut(blob, k, ignoreNulls); - } catch (ServerException e) { - throw new RuntimeException(e); - } catch (InsufficientDataException e) { - throw new RuntimeException(e); - } catch (ErrorResponseException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } catch (InvalidKeyException e) { - throw new RuntimeException(e); - } catch (InvalidResponseException e) { - throw new RuntimeException(e); - } catch (XmlParserException e) { - throw new RuntimeException(e); - } catch (InternalException e) { + doPut(blob, name, ignoreNulls); + } catch (ServerException | InsufficientDataException | ErrorResponseException | NoSuchAlgorithmException | + InvalidKeyException | InvalidResponseException | XmlParserException | InternalException | IOException e) { throw new RuntimeException(e); } } @Override public void update(Blob blob, boolean ignoreNulls) { - String k = key(blob.getOfficeId(), blob.getId()); - // For updatemake sure it exists first + String name = buildName(blob.getOfficeId(), blob.getId()); + // For update make sure it exists first try { client.statObject(StatObjectArgs.builder() .bucket(requiredBucket()) - .object(k) + .object(name) .build()); + doPut(blob, name, ignoreNulls); } catch (ErrorResponseException ere) { - if ("NoSuchKey".equalsIgnoreCase(ere.errorResponse().code())) { + if (NO_SUCH_KEY.equalsIgnoreCase(ere.errorResponse().code())) { throw new NotFoundException("Unable to find blob with id " + blob.getId() + " in office " + blob.getOfficeId()); } throw new RuntimeException(ere); - } catch (Exception e) { - throw new RuntimeException(e); - } - - try { - doPut(blob, k, ignoreNulls); - } catch (ServerException e) { - throw new RuntimeException(e); - } catch (InsufficientDataException e) { - throw new RuntimeException(e); - } catch (ErrorResponseException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } catch (InvalidKeyException e) { - throw new RuntimeException(e); - } catch (InvalidResponseException e) { - throw new RuntimeException(e); - } catch (XmlParserException e) { - throw new RuntimeException(e); - } catch (InternalException e) { + } catch (ServerException | IOException | InsufficientDataException | NoSuchAlgorithmException | + InvalidKeyException | InvalidResponseException | XmlParserException | InternalException e) { throw new RuntimeException(e); } } - private void doPut(Blob blob, String k, boolean ignoreNulls) throws ServerException, 
InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { + private void doPut(Blob blob, String name, boolean ignoreNulls) throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { byte[] value = blob.getValue(); if (value == null && ignoreNulls) { return; @@ -303,12 +328,12 @@ private void doPut(Blob blob, String k, boolean ignoreNulls) throws ServerExcept try (InputStream is = new ByteArrayInputStream(value)) { PutObjectArgs.Builder builder = PutObjectArgs.builder() .bucket(requiredBucket()) - .object(k) + .object(name) .stream(is, value.length, -1) .contentType(blob.getMediaTypeId()); if (blob.getDescription() != null) { - builder.userMetadata(java.util.Collections.singletonMap("description", blob.getDescription())); + builder.userMetadata(java.util.Collections.singletonMap(DESCRIPTION, blob.getDescription())); } client.putObject(builder.build()); @@ -317,38 +342,41 @@ private void doPut(Blob blob, String k, boolean ignoreNulls) throws ServerExcept @Override public void delete(String office, String id) { - String k = key(office, id); + String name = buildName(office, id); try { client.removeObject(RemoveObjectArgs.builder() .bucket(requiredBucket()) - .object(k) + .object(name) .build()); - } catch (Exception e) { + } catch (ServerException | XmlParserException | ErrorResponseException | InsufficientDataException | + IOException | NoSuchAlgorithmException | InvalidKeyException | InvalidResponseException | + InternalException e) { throw new RuntimeException(e); } } private String findFirstKeyById(String id) { String targetSuffix = "/" + normalizeId(id).toUpperCase(Locale.ROOT); - try { - ListObjectsArgs args = ListObjectsArgs.builder() - .bucket(requiredBucket()) - .recursive(true) - .build(); - for (Result res : client.listObjects(args)) { - try { - Item item = res.get(); - String name = item.objectName(); - if (name.toUpperCase(Locale.ROOT).endsWith(targetSuffix)) { - return name; - } - } catch (Exception ignore) { + + ListObjectsArgs args = ListObjectsArgs.builder() + .bucket(requiredBucket()) + .recursive(true) + .build(); + for (Result res : client.listObjects(args)) { + try { + Item item = res.get(); + String name = item.objectName(); + if (name.toUpperCase(Locale.ROOT).endsWith(targetSuffix)) { + return name; } + } catch (ErrorResponseException | InsufficientDataException | XmlParserException | ServerException | + NoSuchAlgorithmException | IOException | InvalidResponseException | InvalidKeyException | + InternalException e) { + throw new RuntimeException(e); } - return null; - } catch (Exception e) { - throw new RuntimeException(e); + } + return null; } private static String officeFromKey(String key) { @@ -369,7 +397,7 @@ private String requiredBucket() { return bucket; } - private static String key(String office, String id) { + private static String buildName(String office, String id) { String off = office == null ? 
"" : office.toUpperCase(Locale.ROOT); String nid = normalizeId(id).toUpperCase(Locale.ROOT); String fullKey = off + "/" + nid; @@ -404,13 +432,5 @@ private static String normalizeId(String id) { return sb.toString(); } - private static byte[] readFully(InputStream is) throws Exception { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buf = new byte[8192]; - int r; - while ((r = is.read(buf)) != -1) { - baos.write(buf, 0, r); - } - return baos.toByteArray(); - } + } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java new file mode 100644 index 000000000..8e337ab41 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java @@ -0,0 +1,10 @@ +package cwms.cda.data.dao; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; + +@FunctionalInterface +public interface StreamConsumer { + void accept(InputStream stream, long inputStreamPosition, String mediaType, long totalLength) throws SQLException, IOException; +} \ No newline at end of file diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java index 854bbdf15..70bb51909 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java @@ -3,7 +3,6 @@ import cwms.cda.features.CdaFeatures; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.Extension; @@ -15,19 +14,18 @@ @ExtendWith(BlobControllerObjectStorageTestIT.FeatureEnableExtension.class) public class BlobControllerObjectStorageTestIT extends BlobControllerTestIT{ - static boolean wasActive; + static class FeatureEnableExtension implements Extension, BeforeAllCallback { + @Override + public void beforeAll(ExtensionContext context) { + setObjectStoreProperties(); + } + } - // I need this to happen before super.BeforeAll is run so that the create will create into Object-store version -// @BeforeAll -// public static void setup() throws Exception { -// setObjectStoreProperties(); -// -// // now call the method that the super calls. 
-// createExistingBlob(); -// } + static boolean wasActive; private static void setObjectStoreProperties() { + // This is to get the feature enabled before we try and use the controller FeatureManager featureManager = FeatureContext.getFeatureManager(); wasActive=featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); featureManager.enable(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); @@ -51,18 +49,4 @@ public static void teardown() { } - @Override - @Test - void test_create_getOne() - { - super.test_create_getOne(); - } - - static class FeatureEnableExtension implements Extension, BeforeAllCallback { - - @Override - public void beforeAll(ExtensionContext context) { - setObjectStoreProperties(); - } - } } diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java index 82c44eb5a..952bab67d 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java @@ -129,6 +129,35 @@ void test_blob_get_one_default() void test_blob_range() { // We can now do Range requests! + // Our example blob above has a value "test value" + + // If we ask for byte 0 to the _end_ that is like a normal request for the whole file, we should get "test value" + given() + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .header("Range", " bytes=0-") + .when() + .get("/blobs/" + EXISTING_BLOB_ID) + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is(EXISTING_BLOB_VALUE)); + + // Our test value is 10 bytes so if we ask for 0-9 we should get the whole value + given() + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .header("Range", " bytes=0-9") + .when() + .get("/blobs/" + EXISTING_BLOB_ID) + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is(EXISTING_BLOB_VALUE)); + + // If we ask for byte 3 to the end we should get "t value" given() .log().ifValidationFails(LogDetail.ALL, true) .queryParam(Controllers.OFFICE, SPK) @@ -140,6 +169,19 @@ void test_blob_range() .assertThat() .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) .body( is("t value")); + + // If we ask for byte 3 to 7 we should get "t val" + given() + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .header("Range", " bytes=3-7") + .when() + .get("/blobs/" + EXISTING_BLOB_ID) + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is("t val")); } @Test diff --git a/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java b/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java index 75ca185b8..05452447a 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/RangeParserTest.java @@ -104,7 +104,26 @@ void testInterpret(){ assertArrayEquals(new long[]{8L, 99L}, RangeParser.interpret(new long[]{8L, 100L}, 100)); assertArrayEquals(new long[]{8L, 99L}, RangeParser.interpret(new long[]{8L, 200L}, 100)); + // typical resume bytes=10- + assertArrayEquals(new long[]{10L, 99L}, RangeParser.interpret(new long[]{10L, -1L}, 100)); + + // bytes=0-0 + assertArrayEquals(new long[]{0L, 0L}, RangeParser.interpret(new long[]{0L, 0L}, 100)); + // 
bytes=-1
+ assertArrayEquals(new long[]{99L, 99L}, RangeParser.interpret(new long[]{-1L, 1L}, 100));
+
+ // bytes=-50
+ assertArrayEquals(new long[]{50L, 99L}, RangeParser.interpret(new long[]{-1L, 50L}, 100));
+
+ }
+
+ @Test
+ void testInvalidInterp() {
+ // They requested 100-200 but our file is 100 bytes long (only 0-99).
+ assertThrows(IllegalArgumentException.class, () -> RangeParser.interpret(new long[]{100L, 200L}, 100));
+ assertThrows(IllegalArgumentException.class, () -> RangeParser.interpret(new long[]{200L, 100L}, 100));
+ assertThrows(IllegalArgumentException.class, () -> RangeParser.interpret(new long[]{100L, 100L}, 100));
+ }
 
-// probably invalid assertArrayEquals(new long[]{8L, 100L}, RangeParser.interpret(new long[]{100L, 200L}, 100));
 }
diff --git a/cwms-data-api/src/test/java/cwms/cda/data/dao/BlobDaoTest.java b/cwms-data-api/src/test/java/cwms/cda/data/dao/BlobDaoTest.java
new file mode 100644
index 000000000..59e5fc892
--- /dev/null
+++ b/cwms-data-api/src/test/java/cwms/cda/data/dao/BlobDaoTest.java
@@ -0,0 +1,33 @@
+package cwms.cda.data.dao;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+class BlobDaoTest {
+
+ @Test
+ void testGetLength() {
+ // Full range (end=null): offset=0, end=null, totalLength=10 -> length=10
+ assertEquals(10L, BlobDao.getLength(0L, null, 10L));
+
+ // end=-1 is treated as "read to the end of the blob", not as the bytes=-N suffix form.
+ assertEquals(10L, BlobDao.getLength(0L, -1L, 10L));
+
+ assertEquals(1L, BlobDao.getLength(0L, 0L, 10L));
+ assertEquals(2L, BlobDao.getLength(0L, 1L, 10L));
+
+ // Normal range: offset=0, end=9, totalLength=10 -> length=10
+ assertEquals(10L, BlobDao.getLength(0L, 9L, 10L));
+
+ // Sub range from middle: offset=5, end=9, totalLength=10 -> length=5
+ assertEquals(5L, BlobDao.getLength(5L, 9L, 10L));
+
+ // Offset from middle: offset=4, totalLength=10 -> length=6, whether end is null or end=9
+ assertEquals(6L, BlobDao.getLength(4L, null, 10L));
+ assertEquals(6L, BlobDao.getLength(4L, 9L, 10L));
+
+ assertEquals(1L, BlobDao.getLength(-1L, 1L, 10L));
+
+ }
+}
From de7aded5a393c30768ca68c46c38a6c889e4a4db Mon Sep 17 00:00:00 2001
From: rma-rripken <89810919+rma-rripken@users.noreply.github.com>
Date: Fri, 2 Jan 2026 17:46:01 -0800
Subject: [PATCH 04/11] Removing check for "chunked" from test and looking for Accept-Ranges and Content-Length headers instead.
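
The shape of the new header assertions, sketched in the same RestAssured style as the
existing blob IT (the SPK office and EXISTING_BLOB_ID fixture are reused purely for
illustration):

    given()
        .queryParam(Controllers.OFFICE, SPK)
        .header("Range", "bytes=0-")
    .when()
        .get("/blobs/" + EXISTING_BLOB_ID)
    .then()
        .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT))
        .header("Accept-Ranges", is("bytes"))
        .header("Content-Length", notNullValue());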
--- .../java/cwms/cda/api/BlobController.java | 2 + .../java/cwms/cda/api/ClobController.java | 23 +++--- .../main/java/cwms/cda/data/dao/ClobDao.java | 37 ++++----- .../cwms/cda/api/ClobControllerTestIT.java | 76 +++++++++++++++---- .../api/ForecastInstanceControllerTestIT.java | 6 +- .../api/TextTimeSeriesControllerTestIT.java | 3 +- 6 files changed, 99 insertions(+), 48 deletions(-) diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java index a4b301b18..e675731dc 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java @@ -200,6 +200,8 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) { end = null; } + ctx.header(Header.ACCEPT_RANGES, "bytes"); + StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> { if (is == null) { ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java index 62e5ed6a2..89d858abe 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java @@ -27,6 +27,7 @@ import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.ClobDao; import cwms.cda.data.dao.JooqDao; +import cwms.cda.data.dao.StreamConsumer; import cwms.cda.data.dto.Clob; import cwms.cda.data.dto.Clobs; import cwms.cda.data.dto.CwmsDTOPaginated; @@ -43,7 +44,7 @@ import io.javalin.plugin.openapi.annotations.OpenApiParam; import io.javalin.plugin.openapi.annotations.OpenApiRequestBody; import io.javalin.plugin.openapi.annotations.OpenApiResponse; -import java.io.InputStream; + import java.util.Objects; import java.util.Optional; import javax.servlet.http.HttpServletResponse; @@ -187,16 +188,16 @@ public void getOne(@NotNull Context ctx, @NotNull String clobId) { if (TEXT_PLAIN.equals(formatHeader)) { // useful cmd: curl -X 'GET' 'http://localhost:7000/cwms-data/clobs/encoded?office=SPK&id=%2FTIME%20SERIES%20TEXT%2F6261044' // -H 'accept: text/plain' --header "Range: bytes=20000-40000" - dao.getClob(clobId, office, c -> { - if (c == null) { - ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " - + "clob based on given parameters")); - } else { - try (InputStream is = c.getAsciiStream()) { - RangeRequestUtil.seekableStream(ctx, is, TEXT_PLAIN, c.length()); - } - } - }); + + ctx.header(Header.ACCEPT_RANGES, "bytes"); + + StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> { + requestResultSize.update(totalLength); + RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength); + }; + + dao.getClob(clobId, office, consumer); + } else { Optional optAc = dao.getByUniqueName(clobId, office); diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java index 9ee4fbc00..4a6bf188b 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java @@ -22,6 +22,7 @@ import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.Reader; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -220,20 +221,18 @@ public void update(Clob clob, boolean ignoreNulls) { * * @param clobId the id to search for * @param officeId the office - * 
@param clobConsumer a consumer that should be handed the input stream and the length of the stream. + * @param streamConsumer a consumer that should be handed the input stream and the length of the stream. */ - public void getClob(String clobId, String officeId, ClobConsumer clobConsumer) { - // Not using jOOQ here because we want the java.sql.Clob and not an automatic field binding. We want - // clob so that we can pull out a stream to the data and pass that to javalin. - // If the request included Content-Ranges Javalin can have the stream skip to the correct - // location, which will avoid reading unneeded data. Passing this stream right to the javalin - // response should let CDA return a huge (2Gb) clob to the client without ever holding the entire String - // in memory. - // We can't use the stream once the connection we get from jooq is closed, so we have to pass in - // what we want javalin to do with the stream as a consumer. - // - + public void getClob(String clobId, String officeId, StreamConsumer streamConsumer) { dsl.connection(connection -> { + // Not using jOOQ here because we want the java.sql.Clob and not an automatic field binding. We want + // clob so that we can pull out a stream to the data and pass that to javalin. + // If the request included Content-Ranges Javalin can have the stream skip to the correct + // location, which will avoid reading unneeded data. Passing this stream right to the javalin + // response should let CDA return a huge (2Gb) clob to the client without ever holding the entire String + // in memory. + // We can't use the stream once the connection we get from jooq is closed, so we have to pass in + // what we want javalin to do with the stream as a consumer. try (PreparedStatement preparedStatement = connection.prepareStatement(SELECT_CLOB_QUERY)) { preparedStatement.setString(1, officeId); preparedStatement.setString(2, clobId); @@ -241,13 +240,12 @@ public void getClob(String clobId, String officeId, ClobConsumer clobConsumer) { try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { java.sql.Clob clob = resultSet.getClob("VALUE"); + long length = clob.length(); - try { - clobConsumer.accept(clob); + try (InputStream is = clob.getAsciiStream()){ + streamConsumer.accept(is, 0, "text/plain", length); } finally { - if (clob != null) { - clob.free(); - } + clob.free(); } } else { throw new NotFoundException("Unable to find clob with id " + clobId + " in office " + officeId); @@ -269,8 +267,5 @@ public static String readFully(java.sql.Clob clob) throws IOException, SQLExcept } } - @FunctionalInterface - public interface ClobConsumer { - void accept(java.sql.Clob blob) throws SQLException, IOException; - } + } diff --git a/cwms-data-api/src/test/java/cwms/cda/api/ClobControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/ClobControllerTestIT.java index 654743c7d..f638d94e4 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/ClobControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/ClobControllerTestIT.java @@ -13,6 +13,7 @@ import io.restassured.filter.log.LogDetail; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import javax.servlet.http.HttpServletResponse; import org.apache.commons.lang3.RandomStringUtils; @@ -107,7 +108,7 @@ void test_failIfExists() throws Exception { @Test void test_getOne_notFound() throws UnsupportedEncodingException { String clobId = "TEST"; - String urlencoded = URLEncoder.encode(clobId, "UTF-8"); + 
String urlencoded = URLEncoder.encode(clobId, StandardCharsets.UTF_8); given() .log().ifValidationFails(LogDetail.ALL, true) @@ -150,19 +151,68 @@ of the object name (NOTE: good candidate for actually having a GUID or other "co void test_getOne_plainText_withRange() { // We can now do Range requests! + + // If we ask for byte 0 to the _end_ that is like a normal request for the whole file, we should get "test value" given() - .accept("text/plain") - .log().ifValidationFails(LogDetail.ALL, true) - .queryParam(Controllers.OFFICE, SPK) - .queryParam(Controllers.CLOB_ID, EXISTING_CLOB_ID) - .header("Range"," bytes=3-") - .when() - .get("/clobs/ignored") - .then() - .log().ifValidationFails(LogDetail.ALL, true) - .assertThat() - .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) - .body( is("t value")); + .accept("text/plain") + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .queryParam(Controllers.CLOB_ID, EXISTING_CLOB_ID) + .header("Range", " bytes=0-") + .when() + .get("/clobs/ignored") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is(EXISTING_CLOB_VALUE)); + + // Our test value is 10 bytes so if we ask for 0-9 we should get the whole value + given() + .accept("text/plain") + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .queryParam(Controllers.CLOB_ID, EXISTING_CLOB_ID) + .header("Range", " bytes=0-9") + .when() + .get("/clobs/ignored") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is(EXISTING_CLOB_VALUE)); + + // If we ask for byte 3 to the end we should get "t value" + given() + .accept("text/plain") + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .queryParam(Controllers.CLOB_ID, EXISTING_CLOB_ID) + .header("Range", " bytes=3-") + .when() + .get("/clobs/ignored") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is("t value")); + + // If we ask for byte 3 to 7 we should get "t val" + given() + .accept("text/plain") + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .queryParam(Controllers.CLOB_ID, EXISTING_CLOB_ID) + .header("Range", " bytes=3-7") + .when() + .get("/clobs/ignored") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) + .body( is("t val")); + + } @Test diff --git a/cwms-data-api/src/test/java/cwms/cda/api/ForecastInstanceControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/ForecastInstanceControllerTestIT.java index 52f00ca68..a9ffe7fb7 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/ForecastInstanceControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/ForecastInstanceControllerTestIT.java @@ -389,7 +389,8 @@ void test_large_file_download_null_designator(String format) throws IOException, .log().ifValidationFails(LogDetail.ALL, true) .assertThat() .statusCode(is(HttpServletResponse.SC_OK)) - .header("Transfer-Encoding", equalTo("chunked")) + .header("Accept-Ranges", equalTo("bytes")) + .header("Content-Length", not(isEmptyOrNullString())) .contentType(equalTo("text/plain")) .extract() .response() @@ -507,7 +508,8 @@ void test_large_file_download(String format) throws IOException, URISyntaxExcept 
.log().ifValidationFails(LogDetail.ALL, true) .assertThat() .statusCode(is(HttpServletResponse.SC_OK)) - .header("Transfer-Encoding", equalTo("chunked")) + .header("Accept-Ranges", equalTo("bytes")) + .header("Content-Length", not(isEmptyOrNullString())) .contentType(equalTo("text/plain")) .extract() .response() diff --git a/cwms-data-api/src/test/java/cwms/cda/api/TextTimeSeriesControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/TextTimeSeriesControllerTestIT.java index 358190eac..c2f631059 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/TextTimeSeriesControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/TextTimeSeriesControllerTestIT.java @@ -617,7 +617,8 @@ void test_large_data_url(String format) throws Exception { .log().ifValidationFails(LogDetail.ALL, true) .assertThat() .statusCode(is(HttpServletResponse.SC_OK)) - .header("Transfer-Encoding", equalTo("chunked")) + .header("Accept-Ranges", equalTo("bytes")) + .header("Content-Length", not(isEmptyOrNullString())) .contentType(equalTo("text/plain")) .extract() .response() From 841d9101b29a70dae60554261b40adaa55adb6a0 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Fri, 2 Jan 2026 18:06:54 -0800 Subject: [PATCH 05/11] Return Accept-Range so that browsers will know we support it. --- .../api/BinaryTimeSeriesValueController.java | 3 +++ .../cwms/cda/api/ForecastFileController.java | 2 ++ .../api/TextTimeSeriesValueController.java | 26 +++++++------------ .../api/BinaryTimeSeriesControllerTestIT.java | 2 +- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java index 0fb4c5471..fb66e766c 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java @@ -30,6 +30,7 @@ import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.BlobDao; import cwms.cda.data.dao.StreamConsumer; +import io.javalin.core.util.Header; import io.javalin.http.Context; import io.javalin.http.Handler; import io.javalin.plugin.openapi.annotations.OpenApi; @@ -93,6 +94,8 @@ public void handle(@NotNull Context ctx) { String officeId = requiredParam(ctx, OFFICE); DSLContext dsl = getDslContext(ctx); + ctx.header(Header.ACCEPT_RANGES, "bytes"); + final Long offset; final Long end ; long[] ranges = RangeParser.parseFirstRange(ctx.header(io.javalin.core.util.Header.RANGE)); diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java index 8224ab925..d41274d87 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java @@ -31,6 +31,7 @@ import cwms.cda.data.dao.ForecastInstanceDao; import cwms.cda.data.dao.StreamConsumer; import cwms.cda.helpers.DateUtils; +import io.javalin.core.util.Header; import io.javalin.http.Context; import io.javalin.http.Handler; import io.javalin.plugin.openapi.annotations.OpenApi; @@ -109,6 +110,7 @@ public void handle(@NotNull Context ctx) { + "blob based on given parameters")); } else { requestResultSize.update(totalLength); + ctx.header(Header.ACCEPT_RANGES, "bytes"); RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength); } }; diff --git 
a/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java index 72857b8bd..7702b4006 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java @@ -27,8 +27,9 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.ClobDao; +import cwms.cda.data.dao.StreamConsumer; +import io.javalin.core.util.Header; import io.javalin.http.Context; import io.javalin.http.Handler; import io.javalin.plugin.openapi.annotations.OpenApi; @@ -37,9 +38,6 @@ import io.javalin.plugin.openapi.annotations.OpenApiResponse; import org.jooq.DSLContext; -import javax.servlet.http.HttpServletResponse; -import java.io.InputStream; - import static com.codahale.metrics.MetricRegistry.name; import static cwms.cda.api.Controllers.*; import static cwms.cda.data.dao.JooqDao.getDslContext; @@ -92,18 +90,14 @@ public void handle(Context ctx) { try (Timer.Context ignored = markAndTime(GET_ALL)) { DSLContext dsl = getDslContext(ctx); ClobDao clobDao = new ClobDao(dsl); - clobDao.getClob(textId, officeId, clob -> { - if (clob == null) { - ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " - + "clob based on given parameters")); - } else { - long size = clob.length(); - requestResultSize.update(size); - try(InputStream is = clob.getAsciiStream()){ - RangeRequestUtil.seekableStream(ctx, is, TEXT_PLAIN, size); - } - } - }); + + StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> { + requestResultSize.update(totalLength); + ctx.header(Header.ACCEPT_RANGES, "bytes"); + RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength); + }; + clobDao.getClob(textId, officeId, consumer); + } } } diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BinaryTimeSeriesControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BinaryTimeSeriesControllerTestIT.java index ea7f521cf..b41c4198a 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BinaryTimeSeriesControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BinaryTimeSeriesControllerTestIT.java @@ -790,7 +790,7 @@ void test_large_data_url(String format) throws Exception { .log().ifValidationFails(LogDetail.ALL, true) .assertThat() .statusCode(is(HttpServletResponse.SC_OK)) - .header("Transfer-Encoding", equalTo("chunked")) +// .header("Transfer-Encoding", equalTo("chunked")) .contentType(equalTo("application/octet-stream")) .extract() .response() From 48b30a812cada18ecd45989d8e2a336bc4f65ef4 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 5 Jan 2026 10:41:17 -0800 Subject: [PATCH 06/11] Use AutoService to have CdaFeatureManagerProvider register as a FeatureManagerProvider. 
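
AutoService generates the META-INF/services/org.togglz.core.spi.FeatureManagerProvider
entry at compile time (assuming the auto-service annotation processor is on the
processor path), which is the same ServiceLoader registration Togglz consults when
FeatureContext resolves a FeatureManager. A quick, illustrative way to confirm the
provider is discoverable; the class below is hypothetical and not part of this change:

    import java.util.ServiceLoader;
    import org.togglz.core.spi.FeatureManagerProvider;

    public final class ProviderDiscoveryCheck {
        public static void main(String[] args) {
            // Lists every FeatureManagerProvider visible on the classpath; the CDA
            // provider should appear once the generated service file is packaged.
            ServiceLoader.load(FeatureManagerProvider.class)
                    .forEach(p -> System.out.println(p.getClass().getName()));
        }
    }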
--- .../main/java/cwms/cda/features/CdaFeatureManagerProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java b/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java index 284a63a7f..a952a77b8 100644 --- a/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java +++ b/cwms-data-api/src/main/java/cwms/cda/features/CdaFeatureManagerProvider.java @@ -1,11 +1,13 @@ package cwms.cda.features; +import com.google.auto.service.AutoService; import org.togglz.core.manager.FeatureManager; import org.togglz.core.manager.FeatureManagerBuilder; import org.togglz.core.repository.file.FileBasedStateRepository; import java.io.File; import org.togglz.core.spi.FeatureManagerProvider; +@AutoService(FeatureManagerProvider.class) public class CdaFeatureManagerProvider implements FeatureManagerProvider { public static final String DEFAULT_PROPERTIES_FILE = "features.properties"; public static final String PROPERTIES_FILE = "properties.file"; From 1b40360a2ed99030d11769de57f5d33fac79cc13 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 5 Jan 2026 12:20:26 -0800 Subject: [PATCH 07/11] Adding dependencies so that Minio-setup gets run once during docker-compose --- docker-compose.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 1ef5cb2d0..6534d504c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -57,6 +57,8 @@ services: condition: service_healthy minio: condition: service_healthy + minio-setup: + condition: service_completed_successfully image: cwms-rest-api:local-dev build: target: api @@ -191,6 +193,7 @@ services: minio-setup: image: minio/mc:latest + restart: "no" depends_on: minio: condition: service_healthy From a5216edb295b9238e8e68f4aece661535e199d74 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 5 Jan 2026 13:14:17 -0800 Subject: [PATCH 08/11] Adding dependencies so that Minio-setup gets run once during docker-compose --- .../BlobControllerObjectStorageTestIT.java | 58 +++++++++++++++++-- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java index 70bb51909..c55eb0992 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java @@ -7,6 +7,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; import org.togglz.core.context.FeatureContext; import org.togglz.core.manager.FeatureManager; @@ -14,28 +17,73 @@ @ExtendWith(BlobControllerObjectStorageTestIT.FeatureEnableExtension.class) public class BlobControllerObjectStorageTestIT extends BlobControllerTestIT{ + public static final String MINIO_ADMIN = "minio_admin"; + public static final String MINIO_ADMIN_SECRET = "saersdbewadfqewrbwreq12rfgweqrffw52354ec"; + public static final String IMAGE_NAME = "minio/minio:RELEASE.2025-04-22T22-12-26Z"; + public static final int PORT = 9000; + private static final 
GenericContainer MINIO_CONTAINER = new GenericContainer<>(DockerImageName.parse(IMAGE_NAME)) + .withExposedPorts(PORT) + .withEnv("MINIO_ROOT_USER", MINIO_ADMIN) + .withEnv("MINIO_ROOT_PASSWORD", MINIO_ADMIN_SECRET) + .withCommand("server /data") + .waitingFor(Wait.forHttp("/minio/health/live").forPort(PORT)); + public static final String BUCKET = "cwms-test"; + public static final String MINIO_USER = "cda_user"; + public static final String MINIO_USER_SECRET = "cda_password"; + public static final String CONTAINER_NAME = "myminio"; + + static class FeatureEnableExtension implements Extension, BeforeAllCallback { @Override public void beforeAll(ExtensionContext context) { + if (!MINIO_CONTAINER.isRunning()) { + MINIO_CONTAINER.start(); + setupMinioResources(); + } + + setObjectStoreProperties(); } } + private static void setupMinioResources() { + try { + String address = "http://" + MINIO_CONTAINER.getHost() + ":" + MINIO_CONTAINER.getMappedPort(PORT); + GenericContainer mc = new GenericContainer<>(DockerImageName.parse("minio/mc:latest")) + .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("/bin/sh", "-c")) + .withCommand( + String.format("mc alias set %s %s %s %s;" , CONTAINER_NAME, address, MINIO_ADMIN, MINIO_ADMIN_SECRET) + + String.format(" mc admin user add %s %s %s;", CONTAINER_NAME, MINIO_USER, MINIO_USER_SECRET) + + String.format(" mc mb --ignore-existing %s/%s;", CONTAINER_NAME, BUCKET) + + String.format(" mc admin policy attach %s readwrite --user %s;", CONTAINER_NAME, MINIO_USER) + ) + .withAccessToHost(true); + mc.start(); + // wait for setup to complete + while (mc.isRunning()) { Thread.sleep(100); } + } catch (Exception e) { + throw new RuntimeException("Failed to setup MinIO resources", e); + } + } + static boolean wasActive; private static void setObjectStoreProperties() { // This is to get the feature enabled before we try and use the controller FeatureManager featureManager = FeatureContext.getFeatureManager(); + wasActive=featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); featureManager.enable(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS); - // TODO: Need to figure out a cleaner way to do this - System.setProperty("blob.store.endpoint", "http://127.0.0.1:9000"); - System.setProperty("blob.store.bucket", "cwms-test"); - System.setProperty("blob.store.accessKey", "cda_user"); - System.setProperty("blob.store.secretKey", "cda_password"); + String host = MINIO_CONTAINER.getHost(); + Integer port = MINIO_CONTAINER.getMappedPort(PORT); + + System.setProperty("blob.store.endpoint", "http://" + host + ":" + port); + System.setProperty("blob.store.bucket", BUCKET); + System.setProperty("blob.store.accessKey", MINIO_USER); + System.setProperty("blob.store.secretKey", MINIO_USER_SECRET); } @AfterAll From 8059c90480452bce79d8efe4d1c0ff729ab0ee4e Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:12:10 -0800 Subject: [PATCH 09/11] Additional logging and checks to make sure minio setup has completed. 
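
The mc container is a one-shot sidecar, so the usual Testcontainers expectation of a
long-running service does not apply; the test has to watch for the container to exit
and then verify its exit code. An alternative worth noting (not what this change does)
is to let Testcontainers treat a clean exit as successful startup. A minimal sketch,
assuming the standard startup-check API and reusing the entrypoint/command handling
from the later patches; the helper class is hypothetical:

    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
    import org.testcontainers.utility.DockerImageName;

    final class OneShotMc {
        static GenericContainer<?> create(String setupScript) {
            // With a one-shot startup check, start() blocks until the container
            // exits and fails unless the exit code is 0.
            return new GenericContainer<>(DockerImageName.parse("minio/mc:latest"))
                    .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint(""))
                    .withCommand("/bin/sh", "-c", setupScript)
                    .withStartupCheckStrategy(new OneShotStartupCheckStrategy());
        }
    }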
--- .../BlobControllerObjectStorageTestIT.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java index c55eb0992..3ecec6f71 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java @@ -1,5 +1,6 @@ package cwms.cda.api; +import com.google.common.flogger.FluentLogger; import cwms.cda.features.CdaFeatures; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Tag; @@ -7,7 +8,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; import org.togglz.core.context.FeatureContext; @@ -16,20 +19,21 @@ @Tag("integration") @ExtendWith(BlobControllerObjectStorageTestIT.FeatureEnableExtension.class) public class BlobControllerObjectStorageTestIT extends BlobControllerTestIT{ - + FluentLogger logger = FluentLogger.forEnclosingClass(); public static final String MINIO_ADMIN = "minio_admin"; public static final String MINIO_ADMIN_SECRET = "saersdbewadfqewrbwreq12rfgweqrffw52354ec"; public static final String IMAGE_NAME = "minio/minio:RELEASE.2025-04-22T22-12-26Z"; public static final int PORT = 9000; + public static final String MINIO_USER = "cda_user"; + public static final String MINIO_USER_SECRET = "cda_password"; private static final GenericContainer MINIO_CONTAINER = new GenericContainer<>(DockerImageName.parse(IMAGE_NAME)) .withExposedPorts(PORT) .withEnv("MINIO_ROOT_USER", MINIO_ADMIN) .withEnv("MINIO_ROOT_PASSWORD", MINIO_ADMIN_SECRET) .withCommand("server /data") .waitingFor(Wait.forHttp("/minio/health/live").forPort(PORT)); + public static final String BUCKET = "cwms-test"; - public static final String MINIO_USER = "cda_user"; - public static final String MINIO_USER_SECRET = "cda_password"; public static final String CONTAINER_NAME = "myminio"; @@ -58,10 +62,24 @@ private static void setupMinioResources() { String.format(" mc mb --ignore-existing %s/%s;", CONTAINER_NAME, BUCKET) + String.format(" mc admin policy attach %s readwrite --user %s;", CONTAINER_NAME, MINIO_USER) ) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("minio-mc"))) .withAccessToHost(true); mc.start(); - // wait for setup to complete - while (mc.isRunning()) { Thread.sleep(100); } + + long startTime = System.currentTimeMillis(); + while (mc.isRunning() && (System.currentTimeMillis() - startTime) < 10000) { + Thread.sleep(100); + } + + if (mc.isRunning()) { + throw new RuntimeException("MinIO setup timed out after 10 seconds"); + } + + // Check if it exited successfully (0) + if (mc.getContainerInfo().getState().getExitCodeLong() != 0) { + throw new RuntimeException("MinIO setup failed with exit code: " + + mc.getContainerInfo().getState().getExitCodeLong()); + } } catch (Exception e) { throw new RuntimeException("Failed to setup MinIO resources", e); } From 88e6e0c032f465dd06c8fd8cb837f104f07ae380 Mon Sep 17 00:00:00 2001 From: rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:53:50 -0800 
Subject: [PATCH 10/11] Running setup script via withCommand to ensure the semicolons are interpreted by sh --- .../BlobControllerObjectStorageTestIT.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java index 3ecec6f71..faa134cbe 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java @@ -19,7 +19,7 @@ @Tag("integration") @ExtendWith(BlobControllerObjectStorageTestIT.FeatureEnableExtension.class) public class BlobControllerObjectStorageTestIT extends BlobControllerTestIT{ - FluentLogger logger = FluentLogger.forEnclosingClass(); + static FluentLogger logger = FluentLogger.forEnclosingClass(); public static final String MINIO_ADMIN = "minio_admin"; public static final String MINIO_ADMIN_SECRET = "saersdbewadfqewrbwreq12rfgweqrffw52354ec"; public static final String IMAGE_NAME = "minio/minio:RELEASE.2025-04-22T22-12-26Z"; @@ -46,35 +46,37 @@ public void beforeAll(ExtensionContext context) { setupMinioResources(); } - setObjectStoreProperties(); } } private static void setupMinioResources() { - try { + String address = "http://" + MINIO_CONTAINER.getHost() + ":" + MINIO_CONTAINER.getMappedPort(PORT); - GenericContainer mc = new GenericContainer<>(DockerImageName.parse("minio/mc:latest")) - .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("/bin/sh", "-c")) - .withCommand( - String.format("mc alias set %s %s %s %s;" , CONTAINER_NAME, address, MINIO_ADMIN, MINIO_ADMIN_SECRET) + - String.format(" mc admin user add %s %s %s;", CONTAINER_NAME, MINIO_USER, MINIO_USER_SECRET) + - String.format(" mc mb --ignore-existing %s/%s;", CONTAINER_NAME, BUCKET) + - String.format(" mc admin policy attach %s readwrite --user %s;", CONTAINER_NAME, MINIO_USER) - ) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("minio-mc"))) - .withAccessToHost(true); + String setupScript = String.format("mc alias set %s %s %s %s; ", CONTAINER_NAME, address, MINIO_ADMIN, MINIO_ADMIN_SECRET) + + String.format("mc admin user add %s %s %s; ", CONTAINER_NAME, MINIO_USER, MINIO_USER_SECRET) + + String.format("mc mb --ignore-existing %s/%s; ", CONTAINER_NAME, BUCKET) + + String.format("mc admin policy attach %s readwrite --user %s;", CONTAINER_NAME, MINIO_USER); + + try (GenericContainer mc = new GenericContainer<>(DockerImageName.parse("minio/mc:latest")) + // Use the string array version of withCommand to ensure /bin/sh -c gets exactly one string + .withCommand("/bin/sh", "-c", setupScript) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("minio-mc"))) + .withAccessToHost(true)) { mc.start(); long startTime = System.currentTimeMillis(); - while (mc.isRunning() && (System.currentTimeMillis() - startTime) < 10000) { - Thread.sleep(100); + while (mc.isRunning() && (System.currentTimeMillis() - startTime) < 20000) { + Thread.sleep(200); } if (mc.isRunning()) { - throw new RuntimeException("MinIO setup timed out after 10 seconds"); + mc.stop(); + throw new RuntimeException("MinIO setup timed out"); } + logger.atInfo().log(mc.getLogs()); + // Check if it exited successfully (0) if (mc.getContainerInfo().getState().getExitCodeLong() != 0) { throw new RuntimeException("MinIO setup failed with exit code: " From 222d8051b62b0712352eef5ba2ceae20e58b6e72 Mon Sep 17 00:00:00 2001 From: 
rma-rripken <89810919+rma-rripken@users.noreply.github.com> Date: Mon, 5 Jan 2026 18:10:41 -0800 Subject: [PATCH 11/11] Putting Minio and sidecar on same network --- .../BlobControllerObjectStorageTestIT.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java index faa134cbe..628249d52 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerObjectStorageTestIT.java @@ -10,8 +10,10 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; import org.togglz.core.context.FeatureContext; import org.togglz.core.manager.FeatureManager; @@ -21,12 +23,17 @@ public class BlobControllerObjectStorageTestIT extends BlobControllerTestIT{ static FluentLogger logger = FluentLogger.forEnclosingClass(); public static final String MINIO_ADMIN = "minio_admin"; - public static final String MINIO_ADMIN_SECRET = "saersdbewadfqewrbwreq12rfgweqrffw52354ec"; + public static final String MINIO_ADMIN_SECRET = "saersdbewadfqewrbwreq12rfgweqrffw52354ec@%fwewEFFWSE"; public static final String IMAGE_NAME = "minio/minio:RELEASE.2025-04-22T22-12-26Z"; public static final int PORT = 9000; public static final String MINIO_USER = "cda_user"; public static final String MINIO_USER_SECRET = "cda_password"; + + private static final Network NETWORK = Network.newNetwork(); + @Container private static final GenericContainer MINIO_CONTAINER = new GenericContainer<>(DockerImageName.parse(IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases("minio") .withExposedPorts(PORT) .withEnv("MINIO_ROOT_USER", MINIO_ADMIN) .withEnv("MINIO_ROOT_PASSWORD", MINIO_ADMIN_SECRET) @@ -51,16 +58,20 @@ public void beforeAll(ExtensionContext context) { } private static void setupMinioResources() { + String address = "http://minio:9000"; + + String cmd1 = String.format(" /usr/bin/mc alias set %s %s %s %s", CONTAINER_NAME, address, MINIO_ADMIN, MINIO_ADMIN_SECRET); + String cmd2 = String.format("/usr/bin/mc admin user add %s %s %s", CONTAINER_NAME, MINIO_USER, MINIO_USER_SECRET); + String cmd3 = String.format("/usr/bin/mc mb --ignore-existing %s/%s", CONTAINER_NAME, BUCKET); + String cmd4 = String.format("/usr/bin/mc admin policy attach %s readwrite --user %s", CONTAINER_NAME, MINIO_USER); - String address = "http://" + MINIO_CONTAINER.getHost() + ":" + MINIO_CONTAINER.getMappedPort(PORT); - String setupScript = String.format("mc alias set %s %s %s %s; ", CONTAINER_NAME, address, MINIO_ADMIN, MINIO_ADMIN_SECRET) + - String.format("mc admin user add %s %s %s; ", CONTAINER_NAME, MINIO_USER, MINIO_USER_SECRET) + - String.format("mc mb --ignore-existing %s/%s; ", CONTAINER_NAME, BUCKET) + - String.format("mc admin policy attach %s readwrite --user %s;", CONTAINER_NAME, MINIO_USER); + String setupScript = String.join("; ", cmd1, cmd2, cmd3, cmd4, "exit 0", " "); + logger.atInfo().log("Running setup script: %s", setupScript); try (GenericContainer mc = new 
GenericContainer<>(DockerImageName.parse("minio/mc:latest")) - // Use the string array version of withCommand to ensure /bin/sh -c gets exactly one string - .withCommand("/bin/sh", "-c", setupScript) + .withNetwork(NETWORK) + .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("")) // Clear the entrypoint to allow shell usage + .withCommand("/bin/sh", "-c", setupScript ) .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("minio-mc"))) .withAccessToHost(true)) { mc.start();