diff --git a/cwms-data-api/build.gradle b/cwms-data-api/build.gradle
index c2bc13bf1..0e2bbac37 100644
--- a/cwms-data-api/build.gradle
+++ b/cwms-data-api/build.gradle
@@ -151,6 +151,8 @@ dependencies {
implementation(libs.bundles.overrides)
testImplementation(libs.bundles.java.parser)
+ implementation(libs.togglz.core)
+ implementation(libs.minio)
}
task extractWebJars(type: Copy) {
@@ -245,7 +247,7 @@ task run(type: JavaExec) {
}
task integrationTests(type: Test) {
- dependsOn test
+// dependsOn test
dependsOn generateConfig
dependsOn war
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java
index 76d668c01..fb66e766c 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java
@@ -29,16 +29,18 @@
import com.codahale.metrics.Timer;
import cwms.cda.api.errors.CdaError;
import cwms.cda.data.dao.BlobDao;
+import cwms.cda.data.dao.StreamConsumer;
+import io.javalin.core.util.Header;
import io.javalin.http.Context;
import io.javalin.http.Handler;
import io.javalin.plugin.openapi.annotations.OpenApi;
import io.javalin.plugin.openapi.annotations.OpenApiContent;
import io.javalin.plugin.openapi.annotations.OpenApiParam;
import io.javalin.plugin.openapi.annotations.OpenApiResponse;
+import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;
import javax.servlet.http.HttpServletResponse;
-import java.io.InputStream;
import static com.codahale.metrics.MetricRegistry.name;
import static cwms.cda.api.Controllers.*;
@@ -84,26 +86,39 @@ private Timer.Context markAndTime(String subject) {
)},
tags = {BinaryTimeSeriesController.TAG}
)
- public void handle(Context ctx) {
+ public void handle(@NotNull Context ctx) {
//Implementation will change with new CWMS schema
//https://www.hec.usace.army.mil/confluence/display/CWMS/2024-02-29+Task2A+Text-ts+and+Binary-ts+Design
try (Timer.Context ignored = markAndTime(GET_ALL)) {
String binaryId = requiredParam(ctx, BLOB_ID);
String officeId = requiredParam(ctx, OFFICE);
DSLContext dsl = getDslContext(ctx);
+
+ ctx.header(Header.ACCEPT_RANGES, "bytes");
+
+ final Long offset;
+ final Long end ;
+ long[] ranges = RangeParser.parseFirstRange(ctx.header(io.javalin.core.util.Header.RANGE));
+ if (ranges != null) {
+ offset = ranges[0];
+ end = ranges[1];
+ } else {
+ offset = null;
+ end = null;
+ }
+
BlobDao blobDao = new BlobDao(dsl);
- blobDao.getBlob(binaryId, officeId, (blob, mediaType) -> {
- if (blob == null) {
+ StreamConsumer streamConsumer = (is, isPosition, mediaType, totalLength) -> {
+ if (is == null) {
ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find "
+ "blob based on given parameters"));
} else {
- long size = blob.length();
- requestResultSize.update(size);
- try (InputStream is = blob.getBinaryStream()) {
- RangeRequestUtil.seekableStream(ctx, is, mediaType, size);
- }
+ requestResultSize.update(totalLength);
+ RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength);
}
- });
+ };
+
+ blobDao.getBlob(binaryId, officeId, streamConsumer, offset, end);
}
}
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java
index f203713b5..e675731dc 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java
@@ -8,8 +8,7 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import cwms.cda.api.errors.CdaError;
-import cwms.cda.data.dao.BlobDao;
-import cwms.cda.data.dao.JooqDao;
+import cwms.cda.data.dao.*;
import cwms.cda.data.dto.Blob;
import cwms.cda.data.dto.Blobs;
import cwms.cda.data.dto.CwmsDTOPaginated;
@@ -26,13 +25,16 @@
import io.javalin.plugin.openapi.annotations.OpenApiParam;
import io.javalin.plugin.openapi.annotations.OpenApiRequestBody;
import io.javalin.plugin.openapi.annotations.OpenApiResponse;
-import java.io.InputStream;
+
import java.util.Optional;
import javax.servlet.http.HttpServletResponse;
import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;
+import org.togglz.core.context.FeatureContext;
+import cwms.cda.features.CdaFeatures;
+import org.togglz.core.manager.FeatureManager;
/**
@@ -62,33 +64,58 @@ protected DSLContext getDslContext(Context ctx) {
return JooqDao.getDslContext(ctx);
}
+ private BlobAccess chooseBlobAccess(DSLContext dsl) {
+ boolean useObjectStore = isObjectStorageEnabled();
+ try {
+ // Prefer Togglz if available
+ FeatureManager featureManager = FeatureContext.getFeatureManager();
+ useObjectStore = featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS);
+ } catch (Throwable ignore) {
+ // fall back to system/env property check
+ }
+ if (useObjectStore) {
+ ObjectStorageConfig cfg = ObjectStorageConfig.fromSystem();
+ return new ObjectStorageBlobDao(cfg);
+ }
+ return new BlobDao(dsl);
+ }
+
+ private boolean isObjectStorageEnabled() {
+ // System properties first, then env. Accept FEATURE=true
+ String key = String.valueOf(CdaFeatures.USE_OBJECT_STORAGE_BLOBS);
+ String v = System.getProperty(key);
+ if (v == null) v = System.getProperty(key);
+ if (v == null) v = System.getenv(key);
+ return v != null && ("true".equalsIgnoreCase(v) || "1".equals(v));
+ }
+
@OpenApi(
- queryParams = {
- @OpenApiParam(name = OFFICE,
- description = "Specifies the owning office. If this field is not "
- + "specified, matching information from all offices shall be "
- + "returned."),
- @OpenApiParam(name = PAGE,
- description = "This end point can return a lot of data, this "
- + "identifies where in the request you are. This is an opaque"
- + " value, and can be obtained from the 'next-page' value in "
- + "the response."),
- @OpenApiParam(name = PAGE_SIZE,
- type = Integer.class,
- description = "How many entries per page returned. Default "
- + DEFAULT_PAGE_SIZE + "."),
- @OpenApiParam(name = LIKE,
- description = "Posix regular expression "
- + "describing the blob id's you want")
- },
- responses = {@OpenApiResponse(status = STATUS_200,
- description = "A list of blobs.",
- content = {
- @OpenApiContent(type = Formats.JSON, from = Blobs.class),
- @OpenApiContent(type = Formats.JSONV2, from = Blobs.class),
- })
- },
- tags = {TAG}
+ queryParams = {
+ @OpenApiParam(name = OFFICE,
+ description = "Specifies the owning office. If this field is not "
+ + "specified, matching information from all offices shall be "
+ + "returned."),
+ @OpenApiParam(name = PAGE,
+ description = "This end point can return a lot of data, this "
+ + "identifies where in the request you are. This is an opaque"
+ + " value, and can be obtained from the 'next-page' value in "
+ + "the response."),
+ @OpenApiParam(name = PAGE_SIZE,
+ type = Integer.class,
+ description = "How many entries per page returned. Default "
+ + DEFAULT_PAGE_SIZE + "."),
+ @OpenApiParam(name = LIKE,
+ description = "Posix regular expression "
+ + "describing the blob id's you want")
+ },
+ responses = {@OpenApiResponse(status = STATUS_200,
+ description = "A list of blobs.",
+ content = {
+ @OpenApiContent(type = Formats.JSON, from = Blobs.class),
+ @OpenApiContent(type = Formats.JSONV2, from = Blobs.class),
+ })
+ },
+ tags = {TAG}
)
@Override
public void getAll(@NotNull Context ctx) {
@@ -115,7 +142,7 @@ public void getAll(@NotNull Context ctx) {
String formatHeader = ctx.header(Header.ACCEPT);
ContentType contentType = Formats.parseHeader(formatHeader, Blobs.class);
- BlobDao dao = new BlobDao(dsl);
+ BlobAccess dao = chooseBlobAccess(dsl);
Blobs blobs = dao.getBlobs(cursor, pageSize, office, like);
String result = Formats.format(contentType, blobs);
@@ -130,20 +157,20 @@ public void getAll(@NotNull Context ctx) {
description = "Returns the binary value of the requested blob as a seekable stream with the "
+ "appropriate media type.",
queryParams = {
- @OpenApiParam(name = OFFICE, description = "Specifies the owning office."),
- @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter "
- + "is ignored and the value of the query parameter is used. "
- + "Note: this query parameter is necessary for id's that contain '/' or other special "
- + "characters. This is due to limitations in path pattern matching. "
- + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. "
- + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."),
+ @OpenApiParam(name = OFFICE, description = "Specifies the owning office."),
+ @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter "
+ + "is ignored and the value of the query parameter is used. "
+ + "Note: this query parameter is necessary for id's that contain '/' or other special "
+ + "characters. This is due to limitations in path pattern matching. "
+ + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. "
+ + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."),
},
responses = {
- @OpenApiResponse(status = STATUS_200,
- description = "Returns requested blob.",
- content = {
- @OpenApiContent(type = "application/octet-stream")
- })
+ @OpenApiResponse(status = STATUS_200,
+ description = "Returns requested blob.",
+ content = {
+ @OpenApiContent(type = "application/octet-stream")
+ })
},
tags = {TAG}
)
@@ -156,27 +183,40 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) {
blobId = idQueryParam;
}
DSLContext dsl = getDslContext(ctx);
- BlobDao dao = new BlobDao(dsl);
+
+ BlobAccess dao = chooseBlobAccess(dsl);
String officeQP = ctx.queryParam(OFFICE);
Optional<String> office = Optional.ofNullable(officeQP);
- BlobDao.BlobConsumer tripleConsumer = (blob, mediaType) -> {
- if (blob == null) {
+ final Long offset;
+ final Long end;
+ long[] ranges = RangeParser.parseFirstRange(ctx.header(io.javalin.core.util.Header.RANGE));
+ if (ranges != null) {
+ offset = ranges[0];
+ end = ranges[1];
+ } else {
+ offset = null;
+ end = null;
+ }
+
+ ctx.header(Header.ACCEPT_RANGES, "bytes");
+
+ StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> {
+ if (is == null) {
ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find "
+ "blob based on given parameters"));
} else {
- long size = blob.length();
- requestResultSize.update(size);
- try (InputStream is = blob.getBinaryStream()) { // is OracleBlobInputStream
- RangeRequestUtil.seekableStream(ctx, is, mediaType, size);
- }
+ requestResultSize.update(totalLength);
+ // is OracleBlobInputStream or something from MinIO
+ RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength);
}
};
+
if (office.isPresent()) {
- dao.getBlob(blobId, office.get(), tripleConsumer);
+ dao.getBlob(blobId, office.get(), consumer, offset, end);
} else {
- dao.getBlob(blobId, tripleConsumer);
+ dao.getBlob(blobId, null, consumer, offset, end);
}
}
}
@@ -185,13 +225,13 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) {
@OpenApi(
description = "Create new Blob",
requestBody = @OpenApiRequestBody(
- content = {
- @OpenApiContent(from = Blob.class, type = Formats.JSONV2)
- },
- required = true),
+ content = {
+ @OpenApiContent(from = Blob.class, type = Formats.JSONV2)
+ },
+ required = true),
queryParams = {
- @OpenApiParam(name = FAIL_IF_EXISTS, type = Boolean.class,
- description = "Create will fail if provided ID already exists. Default: true")
+ @OpenApiParam(name = FAIL_IF_EXISTS, type = Boolean.class,
+ description = "Create will fail if provided ID already exists. Default: true")
},
method = HttpMethod.POST,
tags = {TAG}
@@ -204,7 +244,7 @@ public void create(@NotNull Context ctx) {
boolean failIfExists = ctx.queryParamAsClass(FAIL_IF_EXISTS, Boolean.class).getOrDefault(true);
ContentType contentType = Formats.parseHeader(formatHeader, Blob.class);
Blob blob = Formats.parseContent(contentType, ctx.bodyAsInputStream(), Blob.class);
- BlobDao dao = new BlobDao(dsl);
+ BlobAccess dao = chooseBlobAccess(dsl);
dao.create(blob, failIfExists, false);
ctx.status(HttpCode.CREATED);
}
@@ -213,21 +253,21 @@ public void create(@NotNull Context ctx) {
@OpenApi(
description = "Update an existing Blob",
pathParams = {
- @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"),
+ @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be updated"),
},
requestBody = @OpenApiRequestBody(
- content = {
- @OpenApiContent(from = Blob.class, type = Formats.JSONV2),
- @OpenApiContent(from = Blob.class, type = Formats.JSON)
- },
- required = true),
+ content = {
+ @OpenApiContent(from = Blob.class, type = Formats.JSONV2),
+ @OpenApiContent(from = Blob.class, type = Formats.JSON)
+ },
+ required = true),
queryParams = {
- @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter "
- + "is ignored and the value of the query parameter is used. "
- + "Note: this query parameter is necessary for id's that contain '/' or other special "
- + "characters. This is due to limitations in path pattern matching. "
- + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. "
- + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."),
+ @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter "
+ + "is ignored and the value of the query parameter is used. "
+ + "Note: this query parameter is necessary for id's that contain '/' or other special "
+ + "characters. This is due to limitations in path pattern matching. "
+ + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. "
+ + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."),
},
method = HttpMethod.PATCH,
tags = {TAG}
@@ -260,7 +300,13 @@ public void update(@NotNull Context ctx, @NotNull String blobId) {
+ "updating a blob");
}
- BlobDao dao = new BlobDao(dsl);
+ if (!blob.getId().equals(blobId)) {
+ throw new FormattingException("The blob id parameter does not match the blob id in the body. " +
+ "The blob end-point does not support renaming blobs. " +
+ "Create a new blob with the new id and delete the old one.");
+ }
+
+ BlobAccess dao = chooseBlobAccess(dsl);
dao.update(blob, false);
ctx.status(HttpServletResponse.SC_OK);
}
@@ -269,17 +315,17 @@ public void update(@NotNull Context ctx, @NotNull String blobId) {
@OpenApi(
description = "Deletes requested blob",
pathParams = {
- @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"),
+ @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"),
},
queryParams = {
- @OpenApiParam(name = OFFICE, required = true, description = "Specifies the "
- + "owning office of the blob to be deleted"),
- @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter "
- + "is ignored and the value of the query parameter is used. "
- + "Note: this query parameter is necessary for id's that contain '/' or other special "
- + "characters. This is due to limitations in path pattern matching. "
- + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. "
- + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."),
+ @OpenApiParam(name = OFFICE, required = true, description = "Specifies the "
+ + "owning office of the blob to be deleted"),
+ @OpenApiParam(name = BLOB_ID, description = "If this _query_ parameter is provided the id _path_ parameter "
+ + "is ignored and the value of the query parameter is used. "
+ + "Note: this query parameter is necessary for id's that contain '/' or other special "
+ + "characters. This is due to limitations in path pattern matching. "
+ + "We will likely add support for encoding the ID in the path in the future. For now use the id field for those IDs. "
+ + "Client libraries should detect slashes and choose the appropriate field. \"ignored\" is suggested for the path endpoint."),
},
method = HttpMethod.DELETE,
tags = {TAG}
@@ -293,7 +339,7 @@ public void delete(@NotNull Context ctx, @NotNull String blobId) {
}
DSLContext dsl = getDslContext(ctx);
String office = requiredParam(ctx, OFFICE);
- BlobDao dao = new BlobDao(dsl);
+ BlobAccess dao = chooseBlobAccess(dsl);
dao.delete(office, blobId);
ctx.status(HttpServletResponse.SC_NO_CONTENT);
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java
index 62e5ed6a2..89d858abe 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java
@@ -27,6 +27,7 @@
import cwms.cda.api.errors.CdaError;
import cwms.cda.data.dao.ClobDao;
import cwms.cda.data.dao.JooqDao;
+import cwms.cda.data.dao.StreamConsumer;
import cwms.cda.data.dto.Clob;
import cwms.cda.data.dto.Clobs;
import cwms.cda.data.dto.CwmsDTOPaginated;
@@ -43,7 +44,7 @@
import io.javalin.plugin.openapi.annotations.OpenApiParam;
import io.javalin.plugin.openapi.annotations.OpenApiRequestBody;
import io.javalin.plugin.openapi.annotations.OpenApiResponse;
-import java.io.InputStream;
+
import java.util.Objects;
import java.util.Optional;
import javax.servlet.http.HttpServletResponse;
@@ -187,16 +188,16 @@ public void getOne(@NotNull Context ctx, @NotNull String clobId) {
if (TEXT_PLAIN.equals(formatHeader)) {
// useful cmd: curl -X 'GET' 'http://localhost:7000/cwms-data/clobs/encoded?office=SPK&id=%2FTIME%20SERIES%20TEXT%2F6261044'
// -H 'accept: text/plain' --header "Range: bytes=20000-40000"
- dao.getClob(clobId, office, c -> {
- if (c == null) {
- ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find "
- + "clob based on given parameters"));
- } else {
- try (InputStream is = c.getAsciiStream()) {
- RangeRequestUtil.seekableStream(ctx, is, TEXT_PLAIN, c.length());
- }
- }
- });
+
+ ctx.header(Header.ACCEPT_RANGES, "bytes");
+
+ StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> {
+ requestResultSize.update(totalLength);
+ RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength);
+ };
+
+ dao.getClob(clobId, office, consumer);
+
} else {
Optional optAc = dao.getByUniqueName(clobId, office);
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java
index 0fcd431c7..d41274d87 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java
@@ -29,16 +29,18 @@
import com.codahale.metrics.Timer;
import cwms.cda.api.errors.CdaError;
import cwms.cda.data.dao.ForecastInstanceDao;
+import cwms.cda.data.dao.StreamConsumer;
import cwms.cda.helpers.DateUtils;
+import io.javalin.core.util.Header;
import io.javalin.http.Context;
import io.javalin.http.Handler;
import io.javalin.plugin.openapi.annotations.OpenApi;
import io.javalin.plugin.openapi.annotations.OpenApiContent;
import io.javalin.plugin.openapi.annotations.OpenApiParam;
import io.javalin.plugin.openapi.annotations.OpenApiResponse;
+import org.jetbrains.annotations.NotNull;
import javax.servlet.http.HttpServletResponse;
-import java.io.InputStream;
import java.time.Instant;
import static com.codahale.metrics.MetricRegistry.name;
@@ -92,7 +94,7 @@ private Timer.Context markAndTime(String subject) {
},
tags = {ForecastSpecController.TAG}
)
- public void handle(Context ctx) {
+ public void handle(@NotNull Context ctx) {
String specId = requiredParam(ctx, NAME);
String office = requiredParam(ctx, OFFICE);
String designator = ctx.queryParamAsClass(DESIGNATOR, String.class).allowNullable().get();
@@ -102,18 +104,17 @@ public void handle(Context ctx) {
Instant issueInstant = DateUtils.parseUserDate(issueDate, "UTC").toInstant();
try (Timer.Context ignored = markAndTime(GET_ALL)) {
ForecastInstanceDao dao = new ForecastInstanceDao(getDslContext(ctx));
- dao.getFileBlob(office, specId, designator, forecastInstant, issueInstant, (blob, mediaType) -> {
- if (blob == null) {
+ StreamConsumer streamConsumer = (is, isPosition, mediaType, totalLength) -> {
+ if (is == null) {
ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find "
+ "blob based on given parameters"));
} else {
- long size = blob.length();
- requestResultSize.update(size);
- try (InputStream is = blob.getBinaryStream()) {
- RangeRequestUtil.seekableStream(ctx, is, mediaType, size);
- }
+ requestResultSize.update(totalLength);
+ ctx.header(Header.ACCEPT_RANGES, "bytes");
+ RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength);
}
- });
+ };
+ dao.getFileBlob(office, specId, designator, forecastInstant, issueInstant, streamConsumer);
}
}
}
\ No newline at end of file
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java
new file mode 100644
index 000000000..ccfb4e204
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java
@@ -0,0 +1,128 @@
+package cwms.cda.api;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.*;
+import java.util.regex.*;
+
+/**
+ * Utility class for parsing HTTP Range headers.
+ * These typically look like: bytes=100-1234
+ * or: bytes=100- this is common to resume a download
+ * or: bytes=0- equivalent to a regular request for the whole file
+ * but by returning 206 we show that we support range requests
+ * Note that multiple ranges can be requested at once such
+ * as: bytes=500-600,700-999 The server response identifies a separator and then puts the separator between chunks
+ * bytes=0-0,-1 is also legal; it's just the first and the last byte
+ * or: bytes=500-600,601-999 legal but what is the point?
+ * or: bytes=500-700,601-999 legal, notice they overlap.
+ *
+ *
+ */
+public class RangeParser {
+
+ private static final Pattern RANGE_PATTERN = Pattern.compile("(\\d*)-(\\d*)");
+
+ /**
+ * Return a list of two element long[] containing byte ranges parsed from the HTTP Range header.
+ * If the end of a range is not specified ( e.g. bytes=100- ) then a -1 is returned in the second position
+ * If the range only includes a negative byte (e.g. bytes=-50) then -1 is returned as the start of the range
+ * and -1*end is returned as the end of the range. bytes=-50 will result in [-1,50]
+ *
+ * @param header the HTTP Range header; this should start with "bytes=". If it is null or empty an empty list is returned
+ * @return a list of long[2] holding the ranges
+ */
+ public static List<long[]> parse(String header) {
+ if (header == null || header.isEmpty() ) {
+ return Collections.emptyList();
+ } else if ( !header.startsWith("bytes=")){
+ throw new IllegalArgumentException("Invalid Range header: " + header);
+ }
+
+ String rangePart = header.substring(6);
+ List<long[]> retval = parseRanges(rangePart);
+ if( retval.isEmpty() ){
+ throw new IllegalArgumentException("Invalid Range header: " + header);
+ }
+ return retval;
+ }
+
+ public static long[] parseFirstRange(String header) {
+ if(header != null) {
+ List<long[]> ranges = RangeParser.parse(header);
+ if (!ranges.isEmpty()) {
+ return ranges.get(0);
+ }
+ }
+ return null;
+ }
+
+ public static @NotNull List<long[]> parseRanges(String rangePart) {
+ if( rangePart == null || rangePart.isEmpty() ){
+ throw new IllegalArgumentException("Invalid range specified: " + rangePart);
+ }
+ String[] parts = rangePart.split(",");
+ List<long[]> ranges = new ArrayList<>();
+
+ for (String part : parts) {
+ Matcher m = RANGE_PATTERN.matcher(part.trim());
+ if (m.matches()) {
+ String start = m.group(1);
+ String end = m.group(2);
+
+ long s = start.isEmpty() ? -1 : Long.parseLong(start);
+ long e = end.isEmpty() ? -1 : Long.parseLong(end);
+
+ ranges.add(new long[]{s, e});
+ }
+ }
+ return ranges;
+ }
+
+ /**
+ * The parse() method in this class can return -1 for unspecified values or when suffix ranges are supplied.
+ * This method interprets the negative values in regard to the totalSize and returns inclusive indices of the
+ * requested range.
+ * @param inputs the array of start and end byte positions
+ * @param totalBytes the total number of bytes in the file
+ * @return a long array with the start and end byte positions, these are inclusive. [0,0] means return the first byte
+ */
+ public static long[] interpret(long[] inputs, long totalBytes){
+ if(inputs == null){
+ throw new IllegalArgumentException("null range array provided");
+ } else if( inputs.length != 2 ){
+ throw new IllegalArgumentException("Invalid number of inputs: " + Arrays.toString(inputs));
+ }
+
+ long start = inputs[0];
+ long end = inputs[1];
+
+ if(start == -1L){
+ // its a suffix request.
+ start = totalBytes - end;
+ end = totalBytes - 1;
+ } else {
+ if (start < 0 ) {
+ throw new IllegalArgumentException("Invalid range specified: " + Arrays.toString(inputs));
+ }
+
+ if(end == -1L){
+ end = totalBytes - 1;
+ }
+
+ if(end < start){
+ throw new IllegalArgumentException("Invalid range specified: " + Arrays.toString(inputs));
+ }
+
+ if(start > totalBytes - 1){
+ throw new IllegalArgumentException("Can't satisfy range request: " + Arrays.toString(inputs) + " Range starts beyond end of file.");
+ }
+
+ end = Math.min(end, totalBytes - 1);
+ }
+
+ return new long[]{start, end};
+ }
+
+
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java
index c84c74afa..97cdbc6f0 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java
@@ -1,61 +1,78 @@
package cwms.cda.api;
+import com.google.common.flogger.FluentLogger;
import io.javalin.core.util.Header;
import io.javalin.http.Context;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Arrays;
+import java.io.Reader;
import java.util.List;
+import org.apache.commons.io.IOUtils;
public class RangeRequestUtil {
+ static FluentLogger logger = FluentLogger.forEnclosingClass();
private RangeRequestUtil() {
// utility class
}
+
+ public static void seekableStream(Context ctx, InputStream is, String mediaType, long totalBytes) throws IOException {
+ seekableStream(ctx, is, 0, mediaType, totalBytes);
+ }
+
/**
- * Javalin has a method very similar to this in its Context class. The issue is that Javalin decided to
- * take the InputStream, wrap it in a CompletedFuture and then process the request asynchronously. This
- * causes problems when the InputStream is tied to a database connection that gets closed before the
- * async processing happens. This method doesn't do the async thing but tries to support the rest.
- * @param ctx
- * @param is
- * @param mediaType
- * @param totalBytes
- * @throws IOException
+ * This method copies data from InputStream is and into the Context response OutputStream.
+ * If the request include a Range request header than the response will be a partial response for the first range.
+ * Javalin has a similar method in its Context class that handles the streaming asynchronously. The Javalin
+ * method caused problems when our database connections were closed before the streaming completes.
+ * Both SQL Blobs and S3 streams are capable of retrieving Streams at a user-controlled offset. Those methods
+ * should be used and the offset passed in the isPosition parameter.
+ * If additional skipping needs to be done, this method will call InputStream.skip() which may have some
+ * implications for specific implementations but works efficiently for others.
+ * @param ctx the Javalin context
+ * @param is the input stream
+ * @param isPostion the current position in the input stream.
+ * @param mediaType the content type
+ * @param totalBytes the total number of bytes in the input stream
+ * @throws IOException if either of the streams throw an IOException
*/
- public static void seekableStream(Context ctx, InputStream is, String mediaType, long totalBytes) throws IOException {
- long from = 0;
- long to = totalBytes - 1;
+ public static void seekableStream(Context ctx, InputStream is, long isPostion, String mediaType, long totalBytes) throws IOException {
+
if (ctx.header(Header.RANGE) == null) {
+ // Not a range request.
ctx.res.setContentType(mediaType);
+
+ if(isPostion > 0){
+ throw new IllegalArgumentException("Input stream position must be 0 for non-range requests");
+ }
+
// Javalin's version of this method doesn't set the content-length
// Not setting the content-length makes the servlet container use Transfer-Encoding=chunked.
// Chunked is a worse experience overall, seems like we should just set the length if we know it.
- writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1));
+ ctx.header(Header.CONTENT_LENGTH, String.valueOf(totalBytes));
+
+ IOUtils.copyLarge(is, (OutputStream) ctx.res.getOutputStream(), 0, totalBytes);
} else {
- int chunkSize = 128000;
String rangeHeader = ctx.header(Header.RANGE);
- String[] eqSplit = rangeHeader.split("=", 2);
- String[] dashSplit = eqSplit[1].split("-", -1); // keep empty trailing part
-
- List<String> requestedRange = Arrays.stream(dashSplit)
- .filter(s -> !s.isEmpty())
- .collect(java.util.stream.Collectors.toList());
-
- from = Long.parseLong(requestedRange.get(0));
-
- if (from + chunkSize > totalBytes) {
- // chunk bigger than file, write all
- to = totalBytes - 1;
- } else if (requestedRange.size() == 2) {
- // chunk smaller than file, to/from specified
- to = Long.parseLong(requestedRange.get(1));
- } else {
- // chunk smaller than file, to/from not specified
- to = from + chunkSize - 1;
+
+ List<long[]> ranges = RangeParser.parse(rangeHeader);
+
+ long[] requestedRange = ranges.get(0);
+ if( ranges.size() > 1 ){
+ // we support range requests but do not currently support multiple ranges.
+ // Range requests are optional so we have choices about what to do if multiple ranges are requested:
+ // We could return 416 and hope the client figures out to only send one range
+ // We could service the first range with 206 and ignore the other ranges
+ // We could ignore the range request entirely and return the full body with 200
+ // We could implement support for multiple ranges
+ logger.atInfo().log("Multiple ranges requested, using first and ignoring additional ranges");
}
+ requestedRange = RangeParser.interpret(requestedRange, totalBytes);
+
+ long from = requestedRange[0];
+ long to = requestedRange[1];
ctx.status(206);
@@ -64,33 +81,98 @@ public static void seekableStream(Context ctx, InputStream is, String mediaType,
ctx.res.setContentType(mediaType);
ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes)));
- writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1));
+
+ if(isPostion < from){
+ skip(is, from-isPostion);
+ }
+ long len = Math.min(to, totalBytes - 1) - from + 1;
+
+ // If the inputOffset to IOUtils.copyLarge is not 0 then IOUtils will do its own skipping. For reasons
+ // that IOUtils explains (quirks of certain streams) it does its skipping via read(). Using read() has
+ // performance implications because all the skipped data still gets retrieved and copied to memory. In our
+ // use-case the data comes from a database blob/clob/s3. Copying (potentially) gigabytes of data we
+ // don't need across the network is not ideal. We've tried to address this performance impact in two
+ // ways. 1) We allow callers to pass an input stream that is already positioned so that skipping isn't
+ // needed. 2) If skipping is needed we call InputStream.skip() directly (this is efficient for the
+ // stream from Oracle Blobs).
+
+ // We do our own skipping and then have IOUtils copy.
+ IOUtils.copyLarge(is, (OutputStream) ctx.res.getOutputStream(), 0, len);
}
}
+ /**
+ * Similar to seekableStream but for Reader. For some reason java.sql.Clob does not specify a method to get a
+ * Stream that is already positioned, but there is a method to position a Reader.
+ * @param ctx the request context; the Range header is read from it and the response is written to it
+ * @param reader the character source to copy from
+ * @param isPostion the position the reader has already been advanced to (0 if unpositioned)
+ * @param mediaType the content type to set on the response
+ * @param totalBytes the total length of the underlying data
+ * @throws IOException if copying to the response writer fails
+ */
+ public static void seekableReader(Context ctx, Reader reader, long isPostion, String mediaType, long totalBytes) throws IOException {
+
+ if (ctx.header(Header.RANGE) == null) {
+ // Not a range request.
+ ctx.res.setContentType(mediaType);
+
+ if(isPostion > 0){
+ throw new IllegalArgumentException("Input stream position must be 0 for non-range requests");
+ }
+
+ ctx.header(Header.CONTENT_LENGTH, String.valueOf(totalBytes));
+
+ IOUtils.copyLarge(reader, ctx.res.getWriter(), 0, totalBytes);
+ } else {
+ String rangeHeader = ctx.header(Header.RANGE);
+
+ List ranges = RangeParser.parse(rangeHeader);
+
+ long[] requestedRange = ranges.get(0);
+ if( ranges.size() > 1 ){
+ // we support range requests but we do not currently support multiple ranges.
+ // Range requests are optional, so we have choices about what to do if multiple ranges are requested:
+ // We could return 416 and hope the client figures out to only send one range
+ // We could service the first range with 206 and ignore the other ranges
+ // We could ignore the range request entirely and return the full body with 200
+ // We could implement support for multiple ranges
+ logger.atInfo().log("Multiple ranges requested, using first and ignoring additional ranges");
+ }
+ requestedRange = RangeParser.interpret(requestedRange, totalBytes);
+
+ long from = requestedRange[0];
+ long to = requestedRange[1];
+
+ ctx.status(206);
+
+ ctx.header(Header.ACCEPT_RANGES, "bytes");
+ ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes);
+
+ ctx.res.setContentType(mediaType);
+ ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes)));
+
+ if(isPostion < from){
+ skip(reader, from-isPostion);
+ }
+ long len = Math.min(to, totalBytes - 1) - from + 1;
- public static void writeRange(OutputStream out, InputStream in, long from, long to) throws IOException {
- writeRange(out, in, from, to, new byte[8192]);
+ IOUtils.copyLarge(reader, ctx.res.getWriter(), 0, len);
+ }
}
- public static void writeRange(OutputStream out, InputStream is, long from, long to, byte[] buffer) throws IOException {
- long toSkip = from;
+ private static void skip(InputStream is, long toSkip) throws IOException {
while (toSkip > 0) {
long skipped = is.skip(toSkip);
toSkip -= skipped;
}
+ }
- long bytesLeft = to - from + 1;
- while (bytesLeft != 0L) {
- int maxRead = (int) Math.min(buffer.length, bytesLeft);
- int read = is.read(buffer, 0, maxRead);
- if (read == -1) {
- break;
- }
- out.write(buffer, 0, read);
- bytesLeft -= read;
+ private static void skip(Reader reader, long toSkip) throws IOException {
+ while (toSkip > 0) {
+ long skipped = reader.skip(toSkip);
+ toSkip -= skipped;
}
-
}
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java
index 72857b8bd..7702b4006 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java
@@ -27,8 +27,9 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
-import cwms.cda.api.errors.CdaError;
import cwms.cda.data.dao.ClobDao;
+import cwms.cda.data.dao.StreamConsumer;
+import io.javalin.core.util.Header;
import io.javalin.http.Context;
import io.javalin.http.Handler;
import io.javalin.plugin.openapi.annotations.OpenApi;
@@ -37,9 +38,6 @@
import io.javalin.plugin.openapi.annotations.OpenApiResponse;
import org.jooq.DSLContext;
-import javax.servlet.http.HttpServletResponse;
-import java.io.InputStream;
-
import static com.codahale.metrics.MetricRegistry.name;
import static cwms.cda.api.Controllers.*;
import static cwms.cda.data.dao.JooqDao.getDslContext;
@@ -92,18 +90,14 @@ public void handle(Context ctx) {
try (Timer.Context ignored = markAndTime(GET_ALL)) {
DSLContext dsl = getDslContext(ctx);
ClobDao clobDao = new ClobDao(dsl);
- clobDao.getClob(textId, officeId, clob -> {
- if (clob == null) {
- ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find "
- + "clob based on given parameters"));
- } else {
- long size = clob.length();
- requestResultSize.update(size);
- try(InputStream is = clob.getAsciiStream()){
- RangeRequestUtil.seekableStream(ctx, is, TEXT_PLAIN, size);
- }
- }
- });
+
+ StreamConsumer consumer = (is, isPosition, mediaType, totalLength) -> {
+ requestResultSize.update(totalLength);
+ ctx.header(Header.ACCEPT_RANGES, "bytes");
+ RangeRequestUtil.seekableStream(ctx, is, isPosition, mediaType, totalLength);
+ };
+ clobDao.getClob(textId, officeId, consumer);
+
}
}
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java
new file mode 100644
index 000000000..0616a6dfa
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobAccess.java
@@ -0,0 +1,22 @@
+package cwms.cda.data.dao;
+
+import cwms.cda.data.dto.Blob;
+import cwms.cda.data.dto.Blobs;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Optional;
+
+public interface BlobAccess {
+ @NotNull Blobs getBlobs(@Nullable String cursor, int pageSize, @Nullable String officeId, @Nullable String like);
+
+ Optional getByUniqueName(String id, String office);
+
+ void getBlob(String id, String office, StreamConsumer consumer, @Nullable Long offset, @Nullable Long end);
+
+ void create(Blob blob, boolean failIfExists, boolean ignoreNulls);
+
+ void update(Blob blob, boolean ignoreNulls);
+
+ void delete(String office, String id);
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java
index bdac0a54f..d0875e4f5 100644
--- a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java
@@ -1,5 +1,7 @@
package cwms.cda.data.dao;
+import com.google.common.flogger.FluentLogger;
+import cwms.cda.api.RangeParser;
import cwms.cda.api.errors.NotFoundException;
import cwms.cda.data.dto.Blob;
import cwms.cda.data.dto.Blobs;
@@ -14,6 +16,7 @@
import org.jooq.ResultQuery;
import org.jooq.SelectLimitPercentStep;
import org.jooq.Table;
+
import usace.cwms.db.jooq.codegen.packages.CWMS_TEXT_PACKAGE;
import usace.cwms.db.jooq.codegen.tables.AV_CWMS_MEDIA_TYPE;
import usace.cwms.db.jooq.codegen.tables.AV_OFFICE;
@@ -34,8 +37,8 @@
import static org.jooq.impl.DSL.table;
import static org.jooq.impl.DSL.upper;
-public class BlobDao extends JooqDao {
-
+public class BlobDao extends JooqDao implements BlobAccess {
+ static FluentLogger logger = FluentLogger.forEnclosingClass();
public static final String ID = "ID";
public static final String DESCRIPTION = "DESCRIPTION";
public static final String OFFICE_CODE = "OFFICE_CODE";
@@ -85,7 +88,9 @@ public Optional getByUniqueName(String id, String limitToOffice) {
return Optional.ofNullable(retVal);
}
- public void getBlob(String id, String office, BlobConsumer consumer) {
+ @Override
+ public void getBlob(String id, String office, StreamConsumer consumer, @Nullable Long offset, @Nullable Long end) {
+
// Not using jOOQ here because we want the java.sql.Blob and not an automatic field binding. We want
// blob so that we can pull out a stream to the data and pass that to javalin.
// If the request included Content-Ranges Javalin can have the stream skip to the correct
@@ -97,54 +102,97 @@ public void getBlob(String id, String office, BlobConsumer consumer) {
//
connection(dsl, connection -> {
- try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_WITH_OFFICE)) {
- preparedStatement.setString(1, office);
- preparedStatement.setString(2, id);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (resultSet.next()) {
- String mediaType = resultSet.getString("MEDIA_TYPE_ID");
- java.sql.Blob blob = resultSet.getBlob("VALUE");
- try {
- consumer.accept(blob, mediaType);
- } finally {
- if (blob != null) {
- blob.free();
- }
- }
- } else {
- throw new NotFoundException("Unable to find blob with id " + id + " in office " + office);
- }
+ if(office == null ){
+ try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_QUERY)) {
+ preparedStatement.setString(1, id);
+ executeAndHandle(consumer, offset, end, preparedStatement, "Unable to find blob with id " + id);
+ }
+ } else {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_WITH_OFFICE)) {
+ preparedStatement.setString(1, office);
+ preparedStatement.setString(2, id);
+
+ executeAndHandle(consumer, offset, end, preparedStatement, "Unable to find blob with id " + id + " in office " + office);
}
}
});
}
- public void getBlob(String id, BlobConsumer consumer) {
-
- connection(dsl, connection -> {
- try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_QUERY)) {
- preparedStatement.setString(1, id);
+ private static void executeAndHandle(StreamConsumer consumer, @Nullable Long offset, @Nullable Long end, PreparedStatement preparedStatement, String message) throws SQLException, IOException {
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (resultSet.next()) {
+ handleResultSet(consumer, offset, end, resultSet);
+ } else {
+ throw new NotFoundException(message);
+ }
+ }
+ }
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (resultSet.next()) {
- String mediaType = resultSet.getString("MEDIA_TYPE_ID");
- java.sql.Blob blob = resultSet.getBlob("VALUE");
- try {
- consumer.accept(blob, mediaType);
- } finally {
- if (blob != null) {
- blob.free();
- }
- }
- } else {
- throw new NotFoundException("Unable to find blob with id " + id);
- }
+ /**
+ * Streams the blob from the current row of the result set to the consumer, honoring the optional range.
+ * @param consumer receives the positioned stream, its starting offset, the media type, and the total length
+ * @param offset where to start reading. 0 is first byte
+ * @param end position of last byte to include. inclusive. 0 would mean only return the first byte.
+ * @param resultSet a result set already positioned on the row holding the MEDIA_TYPE_ID and VALUE columns
+ * @throws SQLException if reading the blob fails
+ * @throws IOException if the consumer fails while processing the stream
+ */
+ private static void handleResultSet(StreamConsumer consumer, @Nullable Long offset, @Nullable Long end, ResultSet resultSet) throws SQLException, IOException {
+ String mediaType = resultSet.getString("MEDIA_TYPE_ID");
+ java.sql.Blob blob = resultSet.getBlob("VALUE");
+ try {
+ long totalLength = blob.length();
+ if (offset != null) {
+ long pos = offset + 1; // For getBinaryStream the first byte is at 1.
+ long length = getLength(offset, end, totalLength);
+
+ logger.atFine().log("Reading blob at pos %s, length %s, totalLength %s", pos, length, totalLength);
+ try (InputStream stream = blob.getBinaryStream(pos, length)) {
+ consumer.accept(stream, offset, mediaType, totalLength);
+ } catch (SQLException e) {
+ logger.atWarning().withCause(e).log("Error reading blob at offset %s, length %s, totalLength %s", offset, length, totalLength);
+ throw e;
+ }
+ } else {
+ try (InputStream stream = blob.getBinaryStream()) {
+ consumer.accept(stream, 0, mediaType, totalLength);
}
}
- });
+ } finally {
+ blob.free();
+ }
}
+ /**
+ *
+ * @param offset the index of the first byte to read. Like HTTP range requests, 0 is the first byte; -1 is the last byte.
+ * @param end the index of the last byte to read, inclusive. null reads until the end of the blob. -1 is also the last byte.
+ *
+ * @param totalLength the total length of the blob
+ * @return the length of the range to read
+ */
+ static long getLength(@NotNull Long offset, @Nullable Long end, long totalLength) {
+
+ long length;
+ if(end != null){
+ // The length we are getting passed in is from range-request and could be negative to indicate suffix
+ long[] startEnd = RangeParser.interpret(new long[]{offset, end}, totalLength);
+ if(startEnd != null){
+ offset = startEnd[0];
+ end = startEnd[1];
+ }
+
+ length = end - offset + 1;
+ } else {
+ // if it's not set, just assume we are reading until the end of the blob.
+ // Consumer can always close stream early.
+ length = totalLength - offset;
+ }
+ return length;
+ }
+
+
public List getAll(String officeId, String like) {
String queryStr = "SELECT AT_BLOB.ID, AT_BLOB.DESCRIPTION, CWMS_MEDIA_TYPE.MEDIA_TYPE_ID, CWMS_OFFICE.OFFICE_ID\n"
+ " FROM CWMS_20.AT_BLOB \n"
@@ -179,6 +227,7 @@ public List getAll(String officeId, String like) {
* @param like filter blobs by a case-insensitive regex pattern on their IDs, can be null or empty
* @return a Blobs object containing the retrieved blobs and pagination information
*/
+ @Override
public @NotNull Blobs getBlobs(@Nullable String cursor, int pageSize, @Nullable String officeId, @Nullable String like) {
String cursorOffice = null;
@@ -249,6 +298,7 @@ public List getAll(String officeId, String like) {
return builder.build();
}
+ @Override
public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) {
String pFailIfExists = formatBool(failIfExists);
String pIgnoreNulls = formatBool(ignoreNulls);
@@ -265,6 +315,7 @@ public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) {
blob.getOfficeId()));
}
+ @Override
public void update(Blob blob, boolean ignoreNulls) {
String pFailIfExists = formatBool(false);
String pIgnoreNulls = formatBool(ignoreNulls);
@@ -288,6 +339,7 @@ public void update(Blob blob, boolean ignoreNulls) {
blob.getOfficeId()));
}
+ @Override
public void delete(String office, String id) {
if (!blobExists(office, id)) {
throw new NotFoundException("Unable to find blob with id " + id + " in office " + office);
@@ -336,8 +388,5 @@ public static byte[] readFully(@NotNull InputStream stream) throws IOException {
return output.toByteArray();
}
- @FunctionalInterface
- public interface BlobConsumer {
- void accept(java.sql.Blob blob, String mediaType) throws SQLException, IOException;
- }
+
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java
index c27b35348..3603b6593 100644
--- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java
@@ -22,6 +22,7 @@
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.Reader;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -221,20 +222,18 @@ public void update(Clob clob, boolean ignoreNulls) {
*
* @param clobId the id to search for
* @param officeId the office
- * @param clobConsumer a consumer that should be handed the input stream and the length of the stream.
+ * @param streamConsumer a consumer that should be handed the input stream and the length of the stream.
*/
- public void getClob(String clobId, String officeId, ClobConsumer clobConsumer) {
- // Not using jOOQ here because we want the java.sql.Clob and not an automatic field binding. We want
- // clob so that we can pull out a stream to the data and pass that to javalin.
- // If the request included Content-Ranges Javalin can have the stream skip to the correct
- // location, which will avoid reading unneeded data. Passing this stream right to the javalin
- // response should let CDA return a huge (2Gb) clob to the client without ever holding the entire String
- // in memory.
- // We can't use the stream once the connection we get from jooq is closed, so we have to pass in
- // what we want javalin to do with the stream as a consumer.
- //
-
+ public void getClob(String clobId, String officeId, StreamConsumer streamConsumer) {
dsl.connection(connection -> {
+ // Not using jOOQ here because we want the java.sql.Clob and not an automatic field binding. We want
+ // clob so that we can pull out a stream to the data and pass that to javalin.
+ // If the request included Content-Ranges Javalin can have the stream skip to the correct
+ // location, which will avoid reading unneeded data. Passing this stream right to the javalin
+ // response should let CDA return a huge (2Gb) clob to the client without ever holding the entire String
+ // in memory.
+ // We can't use the stream once the connection we get from jooq is closed, so we have to pass in
+ // what we want javalin to do with the stream as a consumer.
try (PreparedStatement preparedStatement = connection.prepareStatement(SELECT_CLOB_QUERY)) {
preparedStatement.setString(1, officeId);
preparedStatement.setString(2, clobId);
@@ -242,13 +241,12 @@ public void getClob(String clobId, String officeId, ClobConsumer clobConsumer) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
java.sql.Clob clob = resultSet.getClob("VALUE");
+ long length = clob.length();
- try {
- clobConsumer.accept(clob);
+ try (InputStream is = clob.getAsciiStream()){
+ streamConsumer.accept(is, 0, "text/plain", length);
} finally {
- if (clob != null) {
- clob.free();
- }
+ clob.free();
}
} else {
throw new NotFoundException("Unable to find clob with id " + clobId + " in office " + officeId);
@@ -270,8 +268,5 @@ public static String readFully(java.sql.Clob clob) throws IOException, SQLExcept
}
}
- @FunctionalInterface
- public interface ClobConsumer {
- void accept(java.sql.Clob blob) throws SQLException, IOException;
- }
+
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java
index 8b70042e8..610ecaedf 100644
--- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java
@@ -149,7 +149,7 @@ private static String mapToJson(Map metadata) {
private static Map mapFromJson(String forecastInfo) {
try {
- return JsonV2.buildObjectMapper().readValue(forecastInfo, new TypeReference