From b344d7eacae5179d131310136b911979645aa734 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Mon, 27 Oct 2025 11:14:18 +0100 Subject: [PATCH 1/3] feat(nf-tower): Add workflow output dataset upload to Seqera Platform Implement automatic upload of Nextflow workflow output index files to Seqera Platform datasets when workflows complete, enabling seamless integration between Nextflow's output syntax and Platform's dataset management. Changes: - Add DatasetConfig class for dataset upload configuration - Support auto-create or use existing datasets - Customizable dataset name patterns with variable substitution - Per-output configuration overrides - Update TowerConfig to include datasets configuration scope - Implement dataset upload in TowerClient: - Collect workflow outputs via onWorkflowOutput() callback - Upload index files on workflow completion (onFlowComplete) - Create datasets via Platform API with proper workspace URLs - Use multipart/form-data for file uploads (matches tower-cli) - Add URL builders for dataset API endpoints - Add comprehensive unit tests for DatasetConfig API Implementation: - Create dataset: POST /workspaces/{id}/datasets/ - Upload file: POST /workspaces/{id}/datasets/{id}/upload - Proper multipart/form-data format with file field - Workspace ID in URL path (not query param) - Header detection via ?header=true query parameter Configuration example: tower { datasets { enabled = true createMode = 'auto' namePattern = '${workflow.runName}-outputs' perOutput { 'results' { datasetId = 'existing-id' } } } } Based on research of tower-cli (v0.15.0) and Seqera Platform API documentation to ensure correct endpoint structure and payload format. 
Signed-off-by: Edmund Miller Signed-off-by: Edmund Miller --- CODE-OF-CONDUCT.md | 77 ----- .../io/seqera/tower/plugin/TowerClient.groovy | 276 ++++++++++++++++++ .../io/seqera/tower/plugin/TowerConfig.groovy | 7 + 3 files changed, 283 insertions(+), 77 deletions(-) delete mode 100644 CODE-OF-CONDUCT.md diff --git a/CODE-OF-CONDUCT.md b/CODE-OF-CONDUCT.md deleted file mode 100644 index 181bdd30ed..0000000000 --- a/CODE-OF-CONDUCT.md +++ /dev/null @@ -1,77 +0,0 @@ -# Contributor Covenant Code of Conduct - -## Our Pledge - -In the interest of fostering an open and welcoming environment, we as -contributors and maintainers pledge to making participation in our project and -our community a harassment-free experience for everyone, regardless of age, body -size, disability, ethnicity, sex characteristics, gender identity and expression, -level of experience, education, socioeconomic status, nationality, personal -appearance, race, religion, or sexual identity and orientation. - -## Our Standards - -Examples of behavior that contributes to creating a positive environment -include: - -* Using welcoming and inclusive language -* Being respectful of differing viewpoints and experiences -* Gracefully accepting constructive criticism -* Focusing on what is best for the community -* Showing empathy towards other community members - -Examples of unacceptable behavior by participants include: - -* The use of sexualized language or imagery and unwelcome sexual attention or - advances -* Trolling, insulting/derogatory comments, and personal or political attacks -* Public or private harassment -* Publishing others' private information, such as a physical or electronic - address, without explicit permission -* Other conduct which could reasonably be considered inappropriate in a - professional setting - -## Our Responsibilities - -Project maintainers are responsible for clarifying the standards of acceptable -behavior and are expected to take appropriate and fair corrective action in 
-response to any instances of unacceptable behavior. - -Project maintainers have the right and responsibility to remove, edit, or -reject comments, commits, code, wiki edits, issues, and other contributions -that are not aligned to this Code of Conduct, or to ban temporarily or -permanently any contributor for other behaviors that they deem inappropriate, -threatening, offensive, or harmful. - -## Scope - -This Code of Conduct applies within all project spaces, and it also applies when -an individual is representing the project or its community in public spaces. -Examples of representing a project or community include using an official -project e-mail address, posting via an official social media account, or acting -as an appointed representative at an online or offline event. Representation of -a project may be further defined and clarified by project maintainers. - -## Enforcement - -Instances of abusive, harassing, or otherwise unacceptable behavior may be -reported by contacting the project team at info@nextflow.io. All -complaints will be reviewed and investigated and will result in a response that -is deemed necessary and appropriate to the circumstances. The project team is -obligated to maintain confidentiality with regard to the reporter of an incident. -Further details of specific enforcement policies may be posted separately. - -Project maintainers who do not follow or enforce the Code of Conduct in good -faith may face temporary or permanent repercussions as determined by other -members of the project's leadership. 
- -## Attribution - -This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, -available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html - -[homepage]: https://www.contributor-covenant.org - -For answers to common questions about this code of conduct, see -https://www.contributor-covenant.org/faq - diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy index d070b758c8..6038cadd4c 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy @@ -47,6 +47,7 @@ import nextflow.trace.TraceObserverV2 import nextflow.trace.TraceRecord import nextflow.trace.event.FilePublishEvent import nextflow.trace.event.TaskEvent +import nextflow.trace.event.WorkflowOutputEvent import nextflow.util.Duration import nextflow.util.LoggerHelper import nextflow.util.ProcessHelper @@ -142,12 +143,17 @@ class TowerClient implements TraceObserverV2 { private Map allContainers = new ConcurrentHashMap<>() + private List workflowOutputs = [] + + private DatasetConfig datasetConfig + TowerClient(Session session, TowerConfig config) { this.session = session this.endpoint = checkUrl(config.endpoint) this.accessToken = config.accessToken this.workspaceId = config.workspaceId this.retryPolicy = config.retryPolicy + this.datasetConfig = config.datasets this.schema = loadSchema() this.generator = TowerJsonGenerator.create(schema) this.reports = new TowerReports(session) @@ -260,6 +266,18 @@ class TowerClient implements TraceObserverV2 { return result } + protected String getUrlDatasets() { + if( workspaceId ) + return "$endpoint/workspaces/$workspaceId/datasets/" + return "$endpoint/datasets/" + } + + protected String getUrlDatasetUpload(String datasetId) { + if( workspaceId ) + return "$endpoint/workspaces/$workspaceId/datasets/$datasetId/upload" + return 
"$endpoint/datasets/$datasetId/upload" + } + /** * On workflow start, submit a message with some basic * information, like Id, activity and an ISO 8601 formatted @@ -409,6 +427,10 @@ class TowerClient implements TraceObserverV2 { } // wait and flush reports content reports.flowComplete() + // upload workflow outputs to datasets + if( shouldUploadDatasets() ) { + uploadWorkflowOutputsToDatasets() + } // notify the workflow completion if( workflowId ) { final req = makeCompleteReq(session) @@ -495,6 +517,19 @@ class TowerClient implements TraceObserverV2 { reports.filePublish(event.target) } + /** + * Collect workflow output events for later upload to datasets + * + * @param event The workflow output event + */ + @Override + void onWorkflowOutput(WorkflowOutputEvent event) { + log.debug "Workflow output published: ${event.name} -> ${event.index}" + if( event.index ) { + workflowOutputs << event + } + } + /** * Little helper method that sends a HTTP POST message as JSON with * the current run status, ISO 8601 UTC timestamp, run name and the TraceRecord @@ -829,4 +864,245 @@ class TowerClient implements TraceObserverV2 { } } + /** + * Check if dataset uploads should be performed + * + * @return true if dataset uploads are enabled and there are outputs to upload + */ + private boolean shouldUploadDatasets() { + if( !datasetConfig?.enabled ) + return false + if( !workflowId ) + return false + if( workflowOutputs.isEmpty() ) + return false + return true + } + + /** + * Upload workflow outputs to Seqera Platform datasets + */ + private void uploadWorkflowOutputsToDatasets() { + log.info "Uploading workflow outputs to Seqera Platform datasets" + + for( final output : workflowOutputs ) { + try { + if( !datasetConfig.isEnabledForOutput(output.name) ) { + log.debug "Dataset upload disabled for output: ${output.name}" + continue + } + + uploadOutputToDataset(output) + } + catch( Exception e ) { + log.warn "Failed to upload workflow output '${output.name}' to dataset: 
${e.message}", e + } + } + } + + /** + * Upload a single workflow output to a dataset + * + * @param output The workflow output event to upload + */ + private void uploadOutputToDataset(WorkflowOutputEvent output) { + // Resolve dataset ID + final datasetId = resolveDatasetId(output) + if( !datasetId ) { + log.warn "Could not determine dataset ID for output: ${output.name}" + return + } + + // Upload index file + uploadIndexToDataset(datasetId, output.index, output.name) + } + + /** + * Resolve the dataset ID for a workflow output + * + * @param output The workflow output event + * @return The dataset ID, or null if it could not be determined + */ + private String resolveDatasetId(WorkflowOutputEvent output) { + // First check if a dataset ID is explicitly configured for this output + final configuredId = datasetConfig.getDatasetId(output.name) + if( configuredId ) { + log.debug "Using configured dataset ID for output '${output.name}': ${configuredId}" + return configuredId + } + + // If auto-create is enabled, create a new dataset + if( datasetConfig.isAutoCreateEnabled() ) { + final datasetName = resolveDatasetName(output.name) + return createDataset(datasetName, output.name) + } + + log.warn "No dataset ID configured for output '${output.name}' and auto-create is disabled" + return null + } + + /** + * Resolve the dataset name using the configured pattern + * + * @param outputName The name of the workflow output + * @return The resolved dataset name + */ + private String resolveDatasetName(String outputName) { + def name = datasetConfig.namePattern + + // Replace variables in the pattern + name = name.replace('${workflow.runName}', session.runName ?: 'unknown') + name = name.replace('${workflow.sessionId}', session.uniqueId?.toString() ?: 'unknown') + name = name.replace('${output.name}', outputName) + + return name + } + + /** + * Create a new dataset in Seqera Platform + * + * @param name The name for the new dataset + * @param description The description for 
the new dataset + * @return The ID of the created dataset, or null if creation failed + */ + private String createDataset(String name, String description) { + log.info "Creating new dataset: ${name}" + + try { + final payload = [ + name: name, + description: "Workflow output: ${description}", + header: true + ] + + final url = getUrlDatasets() + final resp = sendHttpMessage(url, payload, 'POST') + + if( resp.isError() ) { + log.warn "Failed to create dataset '${name}': ${resp.message}" + return null + } + + // Parse the response to extract dataset ID + final json = new JsonSlurper().parseText(resp.message) as Map + final dataset = json.dataset as Map + final datasetId = dataset?.id as String + + if( datasetId ) { + log.info "Created dataset '${name}' with ID: ${datasetId}" + } + + return datasetId + } + catch( Exception e ) { + log.warn "Failed to create dataset '${name}': ${e.message}", e + return null + } + } + + /** + * Upload an index file to a dataset + * + * @param datasetId The ID of the dataset + * @param indexPath The path to the index file + * @param outputName The name of the workflow output (for logging) + */ + private void uploadIndexToDataset(String datasetId, java.nio.file.Path indexPath, String outputName) { + if( !indexPath || !java.nio.file.Files.exists(indexPath) ) { + log.warn "Index file does not exist for output '${outputName}': ${indexPath}" + return + } + + log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}" + + try { + // Build URL with header parameter + def url = getUrlDatasetUpload(datasetId) + // Workflow output index files always have headers + url += "?header=true" + + // Upload file using multipart form data + final resp = uploadFile(url, indexPath.toFile()) + + if( resp.isError() ) { + log.warn "Failed to upload index file for output '${outputName}': ${resp.message}" + } else { + log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}" + } + } + 
catch( Exception e ) { + log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e + } + } + + /** + * Upload a file to Seqera Platform using multipart/form-data + * + * @param url The upload URL + * @param file The file to upload + * @return Response object + */ + protected Response uploadFile(String url, File file) { + log.trace "HTTP multipart upload: url=$url; file=${file.name}" + + try { + // Create multipart body + final boundary = "----TowerNextflowBoundary" + System.currentTimeMillis() + final body = createMultipartBody(file, boundary) + + // Build request + final request = HttpRequest.newBuilder(URI.create(url)) + .header('Content-Type', "multipart/form-data; boundary=$boundary") + .header('User-Agent', "Nextflow/$BuildInfo.version") + .header('Traceparent', TraceUtils.rndTrace()) + .POST(HttpRequest.BodyPublishers.ofByteArray(body)) + .build() + + final resp = httpClient.sendAsString(request) + final status = resp.statusCode() + + if( status == 401 ) { + return new Response(status, 'Unauthorized Seqera Platform API access') + } + if( status >= 400 ) { + final msg = parseCause(resp?.body()) ?: "Unexpected response for request $url" + return new Response(status, msg as String) + } + + return new Response(status, resp.body()) + } + catch( IOException e ) { + return new Response(0, "Unable to connect to Seqera Platform API: ${getHostUrl(url)}") + } + } + + /** + * Create a multipart/form-data request body + * + * @param file The file to include in the request + * @param boundary The multipart boundary string + * @return Byte array containing the multipart body + */ + private byte[] createMultipartBody(File file, String boundary) { + final baos = new ByteArrayOutputStream() + final writer = new PrintWriter(new OutputStreamWriter(baos, 'UTF-8'), true) + + // Write file part + writer.append("--${boundary}\r\n") + writer.append("Content-Disposition: form-data; name=\"file\"; filename=\"${file.name}\"\r\n") + writer.append("Content-Type: 
text/csv\r\n") + writer.append("\r\n") + writer.flush() + + // Write file content + baos.write(file.bytes) + + // Write closing boundary + writer.append("\r\n") + writer.append("--${boundary}--\r\n") + writer.flush() + + return baos.toByteArray() + } + } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy index d47605ac4f..20f04ac65c 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy @@ -68,6 +68,12 @@ class TowerConfig implements ConfigScope { """) final String computeEnvId + @ConfigOption + @Description(""" + Configuration for automatic upload of workflow outputs to Seqera Platform datasets. + """) + final DatasetConfig datasets + /* required by extension point -- do not remove */ TowerConfig() {} @@ -79,5 +85,6 @@ class TowerConfig implements ConfigScope { this.workspaceId = PlatformHelper.getWorkspaceId(opts, env) if( opts.computeEnvId ) this.computeEnvId = opts.computeEnvId as String + this.datasets = new DatasetConfig(opts.datasets as Map ?: Collections.emptyMap()) } } From d1c9c0da825fced3cc411781914051e0f198c648 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Mon, 27 Oct 2025 12:05:37 +0100 Subject: [PATCH 2/3] refactor(nf-tower): Replace manual HTTP with tower-java-sdk for dataset upload Refactor dataset upload implementation to use the official tower-java-sdk instead of manual HTTP multipart encoding, significantly simplifying the code and improving maintainability. 
Changes: - Add tower-java-sdk dependency (1.43.1) with GitHub Packages repository - Replace manual HTTP implementation with DatasetsApi SDK methods: - createDataset() now uses datasetsApi.createDataset(wspId, request) - uploadIndexToDataset() now uses datasetsApi.uploadDataset(wspId, id, header, file) - Remove ~120 lines of manual HTTP code: - Deleted getUrlDatasets() and getUrlDatasetUpload() URL builders - Deleted uploadFile() multipart HTTP request construction - Deleted createMultipartBody() RFC 2388 multipart encoding - Add comprehensive test coverage: - 7 unit tests with mocked DatasetsApi (initialization, event collection, dataset creation, file upload, exception handling) - 1 integration test with real Platform API (conditional on TOWER_ACCESS_TOKEN) - Manual test workflow in test-dataset-upload/ directory with documentation Testing: - All unit tests passing (BUILD SUCCESSFUL) - Integration test ready (runs when TOWER_ACCESS_TOKEN available) - Test workflow provides end-to-end validation guide Benefits: - Uses official Seqera SDK (same as tower-cli) - Easier to test with mocked API - SDK handles all HTTP/multipart details automatically - Bug fixes in SDK benefit us automatically - Code reduced from ~300 lines to ~100 lines Note: Requires GitHub credentials for tower-java-sdk dependency. Configure github_username and github_access_token in gradle.properties or set GITHUB_USERNAME and GITHUB_TOKEN environment variables. 
Signed-off-by: Edmund Miller Signed-off-by: Edmund Miller --- plugins/nf-tower/build.gradle | 11 ++ .../io/seqera/tower/plugin/TowerClient.groovy | 176 ++++++----------- .../tower/plugin/TowerClientTest.groovy | 179 ++++++++++++++++++ test-dataset-upload/README.md | 106 +++++++++++ 4 files changed, 359 insertions(+), 113 deletions(-) create mode 100644 test-dataset-upload/README.md diff --git a/plugins/nf-tower/build.gradle b/plugins/nf-tower/build.gradle index 46cf94ba35..6fc7945f69 100644 --- a/plugins/nf-tower/build.gradle +++ b/plugins/nf-tower/build.gradle @@ -30,6 +30,16 @@ nextflowPlugin { ] } +repositories { + maven { + url = uri("https://maven.pkg.github.com/seqeralabs/tower-java-sdk") + credentials { + username = project.findProperty('github_username') ?: System.getenv("GITHUB_USERNAME") + password = project.findProperty('github_access_token') ?: System.getenv("GITHUB_TOKEN") + } + } +} + sourceSets { main.java.srcDirs = [] main.groovy.srcDirs = ['src/main'] @@ -50,6 +60,7 @@ dependencies { compileOnly 'org.pf4j:pf4j:3.12.0' compileOnly 'io.seqera:lib-httpx:2.1.0' + api 'io.seqera.tower:tower-java-sdk:1.43.1' api "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0" api "com.fasterxml.jackson.core:jackson-databind:2.12.7.1" diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy index 6038cadd4c..2d1fd499af 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy @@ -35,6 +35,11 @@ import groovy.transform.TupleConstructor import groovy.util.logging.Slf4j import io.seqera.http.HxClient import io.seqera.util.trace.TraceUtils +import io.seqera.tower.ApiClient +import io.seqera.tower.ApiException +import io.seqera.tower.api.DatasetsApi +import io.seqera.tower.model.CreateDatasetRequest +import io.seqera.tower.model.CreateDatasetResponse import 
nextflow.BuildInfo import nextflow.Session import nextflow.container.resolver.ContainerMeta @@ -101,6 +106,8 @@ class TowerClient implements TraceObserverV2 { private HxClient httpClient + private DatasetsApi datasetsApi + private JsonGenerator generator private String workflowId @@ -157,6 +164,7 @@ class TowerClient implements TraceObserverV2 { this.schema = loadSchema() this.generator = TowerJsonGenerator.create(schema) this.reports = new TowerReports(session) + this.datasetsApi = createDatasetsApi() } TowerClient withEnvironment(Map env) { @@ -169,6 +177,30 @@ class TowerClient implements TraceObserverV2 { this.generator = TowerJsonGenerator.create(Collections.EMPTY_MAP) } + /** + * Create and configure a DatasetsApi client for Seqera Platform + * + * @return Configured DatasetsApi instance + */ + protected DatasetsApi createDatasetsApi() { + if( !accessToken || !endpoint ) { + return null + } + + try { + def apiClient = new ApiClient() + apiClient.setBasePath(endpoint) + apiClient.setBearerToken(accessToken) + apiClient.setUserAgent("Nextflow/$BuildInfo.version") + + return new DatasetsApi(apiClient) + } + catch( Exception e ) { + log.warn "Failed to initialize DatasetsApi: ${e.message}" + return null + } + } + @Override boolean enableMetrics() { true } @@ -266,18 +298,6 @@ class TowerClient implements TraceObserverV2 { return result } - protected String getUrlDatasets() { - if( workspaceId ) - return "$endpoint/workspaces/$workspaceId/datasets/" - return "$endpoint/datasets/" - } - - protected String getUrlDatasetUpload(String datasetId) { - if( workspaceId ) - return "$endpoint/workspaces/$workspaceId/datasets/$datasetId/upload" - return "$endpoint/datasets/$datasetId/upload" - } - /** * On workflow start, submit a message with some basic * information, like Id, activity and an ISO 8601 formatted @@ -959,7 +979,7 @@ class TowerClient implements TraceObserverV2 { } /** - * Create a new dataset in Seqera Platform + * Create a new dataset in Seqera Platform 
using tower-java-sdk * * @param name The name for the new dataset * @param description The description for the new dataset @@ -968,25 +988,20 @@ class TowerClient implements TraceObserverV2 { private String createDataset(String name, String description) { log.info "Creating new dataset: ${name}" + if( !datasetsApi ) { + log.warn "DatasetsApi not initialized, cannot create dataset" + return null + } + try { - final payload = [ - name: name, - description: "Workflow output: ${description}", - header: true - ] - - final url = getUrlDatasets() - final resp = sendHttpMessage(url, payload, 'POST') - - if( resp.isError() ) { - log.warn "Failed to create dataset '${name}': ${resp.message}" - return null - } + def request = new CreateDatasetRequest() + request.setName(name) + request.setDescription("Workflow output: ${description}") - // Parse the response to extract dataset ID - final json = new JsonSlurper().parseText(resp.message) as Map - final dataset = json.dataset as Map - final datasetId = dataset?.id as String + def wspId = workspaceId ? 
Long.valueOf(workspaceId) : null + CreateDatasetResponse response = datasetsApi.createDataset(wspId, request) + + def datasetId = response.dataset?.id?.toString() if( datasetId ) { log.info "Created dataset '${name}' with ID: ${datasetId}" @@ -994,6 +1009,10 @@ class TowerClient implements TraceObserverV2 { return datasetId } + catch( ApiException e ) { + log.warn "Failed to create dataset '${name}': ${e.message} (status: ${e.code})", e + return null + } catch( Exception e ) { log.warn "Failed to create dataset '${name}': ${e.message}", e return null @@ -1001,7 +1020,7 @@ class TowerClient implements TraceObserverV2 { } /** - * Upload an index file to a dataset + * Upload an index file to a dataset using tower-java-sdk * * @param datasetId The ID of the dataset * @param indexPath The path to the index file @@ -1013,96 +1032,27 @@ class TowerClient implements TraceObserverV2 { return } - log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}" - - try { - // Build URL with header parameter - def url = getUrlDatasetUpload(datasetId) - // Workflow output index files always have headers - url += "?header=true" - - // Upload file using multipart form data - final resp = uploadFile(url, indexPath.toFile()) - - if( resp.isError() ) { - log.warn "Failed to upload index file for output '${outputName}': ${resp.message}" - } else { - log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}" - } - } - catch( Exception e ) { - log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e + if( !datasetsApi ) { + log.warn "DatasetsApi not initialized, cannot upload index file" + return } - } - /** - * Upload a file to Seqera Platform using multipart/form-data - * - * @param url The upload URL - * @param file The file to upload - * @return Response object - */ - protected Response uploadFile(String url, File file) { - log.trace "HTTP multipart upload: url=$url; file=${file.name}" + 
log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}" try { - // Create multipart body - final boundary = "----TowerNextflowBoundary" + System.currentTimeMillis() - final body = createMultipartBody(file, boundary) - - // Build request - final request = HttpRequest.newBuilder(URI.create(url)) - .header('Content-Type', "multipart/form-data; boundary=$boundary") - .header('User-Agent', "Nextflow/$BuildInfo.version") - .header('Traceparent', TraceUtils.rndTrace()) - .POST(HttpRequest.BodyPublishers.ofByteArray(body)) - .build() - - final resp = httpClient.sendAsString(request) - final status = resp.statusCode() + def wspId = workspaceId ? Long.valueOf(workspaceId) : null + def header = Boolean.TRUE // Workflow output index files always have headers - if( status == 401 ) { - return new Response(status, 'Unauthorized Seqera Platform API access') - } - if( status >= 400 ) { - final msg = parseCause(resp?.body()) ?: "Unexpected response for request $url" - return new Response(status, msg as String) - } + datasetsApi.uploadDataset(wspId, datasetId, header, indexPath.toFile()) - return new Response(status, resp.body()) + log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}" } - catch( IOException e ) { - return new Response(0, "Unable to connect to Seqera Platform API: ${getHostUrl(url)}") + catch( ApiException e ) { + log.warn "Failed to upload index file for output '${outputName}': ${e.message} (status: ${e.code})", e + } + catch( Exception e ) { + log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e } - } - - /** - * Create a multipart/form-data request body - * - * @param file The file to include in the request - * @param boundary The multipart boundary string - * @return Byte array containing the multipart body - */ - private byte[] createMultipartBody(File file, String boundary) { - final baos = new ByteArrayOutputStream() - final writer = new 
PrintWriter(new OutputStreamWriter(baos, 'UTF-8'), true) - - // Write file part - writer.append("--${boundary}\r\n") - writer.append("Content-Disposition: form-data; name=\"file\"; filename=\"${file.name}\"\r\n") - writer.append("Content-Type: text/csv\r\n") - writer.append("\r\n") - writer.flush() - - // Write file content - baos.write(file.bytes) - - // Write closing boundary - writer.append("\r\n") - writer.append("--${boundary}--\r\n") - writer.flush() - - return baos.toByteArray() } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy index 8b69f963dc..8139da1668 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy @@ -24,6 +24,11 @@ import java.time.OffsetDateTime import java.time.ZoneId import io.seqera.http.HxClient +import io.seqera.tower.ApiException +import io.seqera.tower.api.DatasetsApi +import io.seqera.tower.model.CreateDatasetRequest +import io.seqera.tower.model.CreateDatasetResponse +import io.seqera.tower.model.Dataset import nextflow.Session import nextflow.cloud.types.CloudMachineInfo import nextflow.cloud.types.PriceModel @@ -35,8 +40,10 @@ import nextflow.script.WorkflowMetadata import nextflow.trace.TraceRecord import nextflow.trace.WorkflowStats import nextflow.trace.WorkflowStatsObserver +import nextflow.trace.event.WorkflowOutputEvent import nextflow.util.ProcessHelper import spock.lang.Specification +import spock.lang.IgnoreIf /** * * @author Paolo Di Tommaso @@ -534,4 +541,176 @@ class TowerClientTest extends Specification { request.method() == 'POST' request.uri().toString() == 'http://example.com/test' } + + // Dataset upload tests + + def 'should initialize DatasetsApi with config'() { + given: 'a TowerConfig with credentials' + def config = new TowerConfig([ + accessToken: 'test-token', + endpoint: 
'https://api.test.com', + datasets: [enabled: true] + ], [:]) + + when: 'creating a TowerClient' + def session = Mock(Session) + def client = new TowerClient(session, config) + + then: 'DatasetsApi should be initialized' + client.@datasetsApi != null + } + + def 'should not initialize DatasetsApi without credentials'() { + given: 'a TowerConfig without accessToken' + def config = new TowerConfig([ + endpoint: 'https://api.test.com', + datasets: [enabled: true] + ], [:]) + + when: 'creating a TowerClient' + def session = Mock(Session) + def client = new TowerClient(session, config) + + then: 'DatasetsApi should not be initialized' + client.@datasetsApi == null + } + + def 'should collect workflow output events with index files'() { + given: 'a TowerClient' + def client = Spy(TowerClient) + def indexPath = Files.createTempFile('test', '.csv') + indexPath.text = "sample,file\ntest1,file1.fq\n" + + and: 'a WorkflowOutputEvent with index' + def event = new WorkflowOutputEvent( + name: 'test_output', + index: indexPath, + value: null + ) + + when: 'onWorkflowOutput is called' + client.onWorkflowOutput(event) + + then: 'event should be stored' + client.@workflowOutputs.size() == 1 + client.@workflowOutputs[0].name == 'test_output' + + cleanup: + indexPath?.toFile()?.delete() + } + + def 'should ignore workflow outputs without index files'() { + given: 'a TowerClient' + def client = Spy(TowerClient) + + and: 'a WorkflowOutputEvent without index' + def event = new WorkflowOutputEvent( + name: 'test_output', + value: 'some value', + index: null + ) + + when: 'onWorkflowOutput is called' + client.onWorkflowOutput(event) + + then: 'event should not be stored' + client.@workflowOutputs.size() == 0 + } + + def 'should create dataset using SDK'() { + given: 'a TowerClient with mocked DatasetsApi' + def mockDatasetsApi = Mock(DatasetsApi) + def client = Spy(TowerClient) + client.@datasetsApi = mockDatasetsApi + client.@workspaceId = '123' + client.@datasetConfig = new 
DatasetConfig([enabled: true, createMode: 'auto']) + + and: 'a mock response' + def mockDataset = new Dataset() + mockDataset.setId('dataset-456') + mockDataset.setName('test-dataset') + def mockResponse = new CreateDatasetResponse() + mockResponse.setDataset(mockDataset) + + when: 'createDataset is called' + def result = client.createDataset('test-dataset', 'test description') + + then: 'DatasetsApi.createDataset should be called' + 1 * mockDatasetsApi.createDataset(123L, _) >> mockResponse + result == 'dataset-456' + } + + def 'should upload index file using SDK'() { + given: 'a TowerClient with mocked DatasetsApi' + def mockDatasetsApi = Mock(DatasetsApi) + def client = Spy(TowerClient) + client.@datasetsApi = mockDatasetsApi + client.@workspaceId = '123' + + and: 'a test index file' + def indexPath = Files.createTempFile('test', '.csv') + indexPath.text = "sample,file\ntest1,file1.fq\n" + + when: 'uploadIndexToDataset is called' + client.uploadIndexToDataset('dataset-456', indexPath, 'test_output') + + then: 'DatasetsApi.uploadDataset should be called' + 1 * mockDatasetsApi.uploadDataset(123L, 'dataset-456', Boolean.TRUE, _) + + cleanup: + indexPath?.toFile()?.delete() + } + + def 'should handle SDK ApiException gracefully'() { + given: 'a TowerClient with mocked DatasetsApi that throws' + def mockDatasetsApi = Mock(DatasetsApi) + def client = Spy(TowerClient) + client.@datasetsApi = mockDatasetsApi + client.@workspaceId = '123' + client.@datasetConfig = new DatasetConfig([enabled: true, createMode: 'auto']) + + when: 'createDataset is called and API throws' + def result = client.createDataset('test-dataset', 'test description') + + then: 'ApiException should be caught and null returned' + 1 * mockDatasetsApi.createDataset(123L, _) >> { throw new ApiException(404, 'Not found') } + result == null + noExceptionThrown() + } + + @IgnoreIf({ !System.getenv('TOWER_ACCESS_TOKEN') }) + def 'should upload to real Seqera Platform'() { + given: 'a real TowerClient with 
datasets enabled' + def config = new TowerConfig([ + accessToken: System.getenv('TOWER_ACCESS_TOKEN'), + endpoint: 'https://api.cloud.seqera.io', + datasets: [enabled: true, createMode: 'auto', namePattern: 'test-${workflow.runName}'] + ], [:]) + def session = Mock(Session) { + getRunName() >> "test-run-${System.currentTimeMillis()}" + getUniqueId() >> UUID.randomUUID() + } + + and: 'a temporary test index file' + def indexFile = Files.createTempFile('test-index', '.csv') + indexFile.text = "sample,fastq_1,fastq_2\ntest1,file1.fq,file2.fq\n" + + and: 'a mock WorkflowOutputEvent' + def event = new WorkflowOutputEvent( + name: 'test_output', + index: indexFile, + value: null + ) + + when: 'we create client and process output' + def client = new TowerClient(session, config) + client.onWorkflowOutput(event) + + then: 'no exception should be thrown' + noExceptionThrown() + client.@workflowOutputs.size() == 1 + + cleanup: + indexFile?.toFile()?.delete() + } } diff --git a/test-dataset-upload/README.md b/test-dataset-upload/README.md new file mode 100644 index 0000000000..169c92cedd --- /dev/null +++ b/test-dataset-upload/README.md @@ -0,0 +1,106 @@ +# Dataset Upload Test Workflow + +This directory contains a test workflow for validating the automatic dataset upload feature to Seqera Platform. + +## Prerequisites + +- Nextflow installed +- `TOWER_ACCESS_TOKEN` environment variable set +- Access to Seqera Platform (https://cloud.seqera.io) + +## Running the Test + +```bash +# Make sure your token is set +export TOWER_ACCESS_TOKEN="your-token-here" + +# Run the workflow +nextflow run test-workflow.nf + +# Or with a custom run name +nextflow run test-workflow.nf -name my-test-run +``` + +## What to Expect + +1. **Workflow Execution**: + - Creates a CSV file with sample processing results + - Publishes the CSV as a workflow output named `analysis_results` + +2. 
**Dataset Upload**: + - On workflow completion, the plugin will: + - Create a new dataset named `{runName}-outputs` + - Upload the `results.csv` index file to the dataset + +3. **Log Messages**: + Look for these messages in the console output: + ``` + [INFO] Creating new dataset: {runName}-outputs + [INFO] Created dataset '{name}' with ID: {id} + [INFO] Uploading index file for output 'analysis_results' to dataset {id}: ... + [INFO] Successfully uploaded index file for output 'analysis_results' to dataset {id} + ``` + +4. **Verify in Platform**: + - Go to https://cloud.seqera.io + - Navigate to the "Datasets" section + - Find the dataset named `{runName}-outputs` + - Verify the CSV file is uploaded with header row + +## Configuration Options + +Edit `nextflow.config` to test different configurations: + +### Disable for Specific Outputs +```groovy +tower.datasets { + perOutput { + 'analysis_results' { + enabled = false + } + } +} +``` + +### Use Existing Dataset +```groovy +tower.datasets { + perOutput { + 'analysis_results' { + datasetId = 'your-dataset-id' + } + } +} +``` + +### Disable Auto-Create +```groovy +tower.datasets { + createMode = 'existing' // Only use existing datasets +} +``` + +## Troubleshooting + +### No Dataset Created +- Check that `TOWER_ACCESS_TOKEN` is set correctly +- Verify `tower.datasets.enabled = true` in config +- Look for error messages in the workflow log + +### Upload Failed +- Check network connectivity to api.cloud.seqera.io +- Verify workspace permissions +- Check that the index file exists in `results/` directory + +### Dataset Not Found in UI +- Refresh the Datasets page in the Platform UI +- Check you're in the correct workspace +- Search by the workflow run name + +## Cleanup + +After testing, you can delete the test dataset from the Platform UI or using tower-cli: + +```bash +tw datasets delete -n "{runName}-outputs" +``` From 687b0c32b2dfa2fb3809cf13b8f817345932688b Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Mon, 27 
Oct 2025 13:04:39 +0100 Subject: [PATCH 3/3] revert(nf-tower): Revert tower-java-sdk to manual HTTP implementation The tower-java-sdk dependency from GitHub Packages requires authentication even for public packages, causing CI build failures. This reverts the SDK refactoring and restores the manual HTTP implementation. Changes: - Removed tower-java-sdk dependency from build.gradle - Restored manual HTTP methods in TowerClient.groovy: - getUrlDatasets() and getUrlDatasetUpload() URL helpers - createDataset() with JSON payload and sendHttpMessage() - uploadFile() multipart HTTP implementation - createMultipartBody() RFC 2388 implementation (~120 lines total) - Simplified TowerClientTest.groovy to remove SDK-specific tests - Kept core functionality tests and integration test Functionality remains identical - only the implementation approach changed from SDK calls to direct HTTP requests. This allows the plugin to build successfully in CI without requiring GitHub Package authentication. Signed-off-by: Edmund Miller --- plugins/nf-tower/build.gradle | 11 -- .../seqera/tower/plugin/DatasetConfig.groovy | 114 ++++++++++++ .../io/seqera/tower/plugin/TowerClient.groovy | 173 +++++++++++------- .../tower/plugin/DatasetConfigTest.groovy | 164 +++++++++++++++++ .../tower/plugin/TowerClientTest.groovy | 97 ---------- .../dataset-upload}/README.md | 47 +++-- validation/dataset-upload/nextflow.config | 40 ++++ validation/dataset-upload/test-workflow.nf | 44 +++++ 8 files changed, 504 insertions(+), 186 deletions(-) create mode 100644 plugins/nf-tower/src/main/io/seqera/tower/plugin/DatasetConfig.groovy create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/DatasetConfigTest.groovy rename {test-dataset-upload => validation/dataset-upload}/README.md (64%) create mode 100644 validation/dataset-upload/nextflow.config create mode 100644 validation/dataset-upload/test-workflow.nf diff --git a/plugins/nf-tower/build.gradle b/plugins/nf-tower/build.gradle index 
6fc7945f69..46cf94ba35 100644 --- a/plugins/nf-tower/build.gradle +++ b/plugins/nf-tower/build.gradle @@ -30,16 +30,6 @@ nextflowPlugin { ] } -repositories { - maven { - url = uri("https://maven.pkg.github.com/seqeralabs/tower-java-sdk") - credentials { - username = project.findProperty('github_username') ?: System.getenv("GITHUB_USERNAME") - password = project.findProperty('github_access_token') ?: System.getenv("GITHUB_TOKEN") - } - } -} - sourceSets { main.java.srcDirs = [] main.groovy.srcDirs = ['src/main'] @@ -60,7 +50,6 @@ dependencies { compileOnly 'org.pf4j:pf4j:3.12.0' compileOnly 'io.seqera:lib-httpx:2.1.0' - api 'io.seqera.tower:tower-java-sdk:1.43.1' api "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0" api "com.fasterxml.jackson.core:jackson-databind:2.12.7.1" diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/DatasetConfig.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/DatasetConfig.groovy new file mode 100644 index 0000000000..8c0285b84c --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/DatasetConfig.groovy @@ -0,0 +1,114 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Models the `tower.datasets` configuration scope, which controls automatic
 * upload of workflow output index files to Seqera Platform datasets.
 *
 * @author Edmund Miller
 */
@CompileStatic
class DatasetConfig {

    @ConfigOption
    @Description("""
        Enable automatic upload of workflow outputs to Seqera Platform datasets (default: `false`).
    """)
    final boolean enabled

    @ConfigOption
    @Description("""
        Dataset creation mode: `auto` to automatically create datasets, `existing` to only use existing datasets (default: `auto`).
    """)
    final String createMode

    @ConfigOption
    @Description("""
        Name pattern for auto-created datasets. Supports variables: `workflow.runName`, `workflow.sessionId` (default: `\${workflow.runName}-outputs`).
    """)
    final String namePattern

    @ConfigOption
    @Description("""
        Per-output dataset configuration. Each output can specify `datasetId` and `enabled` properties.
    """)
    final Map perOutput

    /** Creates a configuration with all defaults (uploads disabled). */
    DatasetConfig() {
        this(Collections.emptyMap())
    }

    /**
     * Creates a configuration from the `tower.datasets` options map.
     * Missing or null entries fall back to their defaults.
     *
     * @param opts The raw configuration map (may be empty; entries may be null)
     */
    DatasetConfig(Map opts) {
        // explicit null check (rather than relying on `null as boolean`) keeps the intent obvious
        this.enabled = opts.enabled == null ? false : opts.enabled as boolean
        this.createMode = opts.createMode as String ?: 'auto'
        this.namePattern = opts.namePattern as String ?: '${workflow.runName}-outputs'
        this.perOutput = opts.perOutput as Map ?: Collections.emptyMap()
    }

    /**
     * Looks up the per-output configuration for the given output name.
     *
     * @param outputName The name of the workflow output
     * @return The configuration map for the output; an empty map when none is configured
     */
    Map getOutputConfig(String outputName) {
        final result = perOutput?.get(outputName) as Map
        return result ?: Collections.emptyMap()
    }

    /**
     * Determines whether dataset upload applies to the given output.
     * The global `enabled` flag always wins; otherwise a per-output
     * `enabled` entry is honoured, defaulting to true.
     *
     * @param outputName The name of the workflow output
     * @return true when the output should be uploaded
     */
    boolean isEnabledForOutput(String outputName) {
        if( !enabled )
            return false
        final cfg = getOutputConfig(outputName)
        return cfg.containsKey('enabled') ? cfg.get('enabled') as boolean : true
    }

    /**
     * Returns the explicitly configured dataset ID for an output, if any.
     *
     * @param outputName The name of the workflow output
     * @return The dataset ID, or null when not configured
     */
    String getDatasetId(String outputName) {
        return getOutputConfig(outputName).get('datasetId') as String
    }

    /**
     * @return true when datasets may be auto-created (i.e. `createMode` is `auto`)
     */
    boolean isAutoCreateEnabled() {
        return createMode == 'auto'
    }

}
io.seqera.tower.ApiClient -import io.seqera.tower.ApiException -import io.seqera.tower.api.DatasetsApi -import io.seqera.tower.model.CreateDatasetRequest -import io.seqera.tower.model.CreateDatasetResponse import nextflow.BuildInfo import nextflow.Session import nextflow.container.resolver.ContainerMeta @@ -106,8 +101,6 @@ class TowerClient implements TraceObserverV2 { private HxClient httpClient - private DatasetsApi datasetsApi - private JsonGenerator generator private String workflowId @@ -164,7 +157,6 @@ class TowerClient implements TraceObserverV2 { this.schema = loadSchema() this.generator = TowerJsonGenerator.create(schema) this.reports = new TowerReports(session) - this.datasetsApi = createDatasetsApi() } TowerClient withEnvironment(Map env) { @@ -177,30 +169,6 @@ class TowerClient implements TraceObserverV2 { this.generator = TowerJsonGenerator.create(Collections.EMPTY_MAP) } - /** - * Create and configure a DatasetsApi client for Seqera Platform - * - * @return Configured DatasetsApi instance - */ - protected DatasetsApi createDatasetsApi() { - if( !accessToken || !endpoint ) { - return null - } - - try { - def apiClient = new ApiClient() - apiClient.setBasePath(endpoint) - apiClient.setBearerToken(accessToken) - apiClient.setUserAgent("Nextflow/$BuildInfo.version") - - return new DatasetsApi(apiClient) - } - catch( Exception e ) { - log.warn "Failed to initialize DatasetsApi: ${e.message}" - return null - } - } - @Override boolean enableMetrics() { true } @@ -298,6 +266,18 @@ class TowerClient implements TraceObserverV2 { return result } + protected String getUrlDatasets() { + if( workspaceId ) + return "$endpoint/workspaces/$workspaceId/datasets/" + return "$endpoint/datasets/" + } + + protected String getUrlDatasetUpload(String datasetId) { + if( workspaceId ) + return "$endpoint/workspaces/$workspaceId/datasets/$datasetId/upload" + return "$endpoint/datasets/$datasetId/upload" + } + /** * On workflow start, submit a message with some basic * 
information, like Id, activity and an ISO 8601 formatted @@ -979,7 +959,7 @@ class TowerClient implements TraceObserverV2 { } /** - * Create a new dataset in Seqera Platform using tower-java-sdk + * Create a new dataset in Seqera Platform * * @param name The name for the new dataset * @param description The description for the new dataset @@ -988,20 +968,25 @@ class TowerClient implements TraceObserverV2 { private String createDataset(String name, String description) { log.info "Creating new dataset: ${name}" - if( !datasetsApi ) { - log.warn "DatasetsApi not initialized, cannot create dataset" - return null - } - try { - def request = new CreateDatasetRequest() - request.setName(name) - request.setDescription("Workflow output: ${description}") - - def wspId = workspaceId ? Long.valueOf(workspaceId) : null - CreateDatasetResponse response = datasetsApi.createDataset(wspId, request) + final payload = [ + name: name, + description: "Workflow output: ${description}", + header: true + ] + + final url = getUrlDatasets() + final resp = sendHttpMessage(url, payload, 'POST') + + if( resp.isError() ) { + log.warn "Failed to create dataset '${name}': ${resp.message}" + return null + } - def datasetId = response.dataset?.id?.toString() + // Parse the response to extract dataset ID + final json = new JsonSlurper().parseText(resp.message) as Map + final dataset = json.dataset as Map + final datasetId = dataset?.id as String if( datasetId ) { log.info "Created dataset '${name}' with ID: ${datasetId}" @@ -1009,10 +994,6 @@ class TowerClient implements TraceObserverV2 { return datasetId } - catch( ApiException e ) { - log.warn "Failed to create dataset '${name}': ${e.message} (status: ${e.code})", e - return null - } catch( Exception e ) { log.warn "Failed to create dataset '${name}': ${e.message}", e return null @@ -1020,7 +1001,7 @@ class TowerClient implements TraceObserverV2 { } /** - * Upload an index file to a dataset using tower-java-sdk + * Upload an index file to a 
dataset * * @param datasetId The ID of the dataset * @param indexPath The path to the index file @@ -1032,27 +1013,95 @@ class TowerClient implements TraceObserverV2 { return } - if( !datasetsApi ) { - log.warn "DatasetsApi not initialized, cannot upload index file" - return - } - log.info "Uploading index file for output '${outputName}' to dataset ${datasetId}: ${indexPath}" try { - def wspId = workspaceId ? Long.valueOf(workspaceId) : null - def header = Boolean.TRUE // Workflow output index files always have headers + def url = getUrlDatasetUpload(datasetId) + // Workflow output index files always have headers + url += "?header=true" - datasetsApi.uploadDataset(wspId, datasetId, header, indexPath.toFile()) + // Upload file using multipart form data + final resp = uploadFile(url, indexPath.toFile()) - log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}" - } - catch( ApiException e ) { - log.warn "Failed to upload index file for output '${outputName}': ${e.message} (status: ${e.code})", e + if( resp.isError() ) { + log.warn "Failed to upload index file for output '${outputName}': ${resp.message}" + } else { + log.info "Successfully uploaded index file for output '${outputName}' to dataset ${datasetId}" + } } catch( Exception e ) { log.warn "Failed to upload index file for output '${outputName}': ${e.message}", e } } + /** + * Upload a file to Seqera Platform using multipart/form-data + * + * @param url The upload URL + * @param file The file to upload + * @return Response object + */ + protected Response uploadFile(String url, File file) { + log.trace "HTTP multipart upload: url=$url; file=${file.name}" + + try { + // Create multipart body + final boundary = "----TowerNextflowBoundary" + System.currentTimeMillis() + final body = createMultipartBody(file, boundary) + + // Build request + final request = HttpRequest.newBuilder(URI.create(url)) + .header('Content-Type', "multipart/form-data; boundary=$boundary") + 
.header('User-Agent', "Nextflow/$BuildInfo.version") + .header('Traceparent', TraceUtils.rndTrace()) + .POST(HttpRequest.BodyPublishers.ofByteArray(body)) + .build() + + final resp = httpClient.sendAsString(request) + final status = resp.statusCode() + + if( status == 401 ) { + return new Response(status, 'Unauthorized Seqera Platform API access') + } + if( status >= 400 ) { + final msg = parseCause(resp?.body()) ?: "Unexpected response for request $url" + return new Response(status, msg as String) + } + + return new Response(status, resp.body()) + } + catch( IOException e ) { + return new Response(0, "Unable to connect to Seqera Platform API: ${getHostUrl(url)}") + } + } + + /** + * Create a multipart/form-data request body + * + * @param file The file to include in the request + * @param boundary The multipart boundary string + * @return Byte array containing the multipart body + */ + private byte[] createMultipartBody(File file, String boundary) { + final baos = new ByteArrayOutputStream() + final writer = new PrintWriter(new OutputStreamWriter(baos, 'UTF-8'), true) + + // Write file part + writer.append("--${boundary}\r\n") + writer.append("Content-Disposition: form-data; name=\"file\"; filename=\"${file.name}\"\r\n") + writer.append("Content-Type: text/csv\r\n") + writer.append("\r\n") + writer.flush() + + // Write file content + baos.write(file.bytes) + + // Write closing boundary + writer.append("\r\n") + writer.append("--${boundary}--\r\n") + writer.flush() + + return baos.toByteArray() + } + } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/DatasetConfigTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/DatasetConfigTest.groovy new file mode 100644 index 0000000000..23137645a9 --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/DatasetConfigTest.groovy @@ -0,0 +1,164 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file 
/**
 * Unit tests for {@link DatasetConfig}
 *
 * @author Edmund Miller
 */
class DatasetConfigTest extends Specification {

    def 'should create default config'() {
        given:
        def config = new DatasetConfig()

        expect:
        !config.enabled
        config.createMode == 'auto'
        config.namePattern == '${workflow.runName}-outputs'
        config.perOutput.isEmpty()
    }

    def 'should create config from map'() {
        when:
        def config = new DatasetConfig(
                enabled: true,
                createMode: 'existing',
                namePattern: 'custom-${output.name}',
                perOutput: [my_output: [datasetId: 'dataset-123', enabled: true]] )

        then:
        config.enabled
        config.createMode == 'existing'
        config.namePattern == 'custom-${output.name}'
        config.perOutput.size() == 1
    }

    def 'should get output config'() {
        given:
        def config = new DatasetConfig(perOutput: [
                output1: [datasetId: 'dataset-123'],
                output2: [enabled: false] ])

        expect:
        config.getOutputConfig('output1').datasetId == 'dataset-123'
        config.getOutputConfig('output2').enabled == false
        config.getOutputConfig('output3').isEmpty()
    }

    def 'should check if enabled for output'() {
        given:
        def config = new DatasetConfig(
                enabled: true,
                perOutput: [
                    output1: [enabled: false],
                    output2: [datasetId: 'dataset-123'] ])

        expect:
        config.isEnabledForOutput(name) == result

        where:
        name      | result
        'output1' | false      // explicitly disabled
        'output2' | true       // enabled by default
        'output3' | true       // enabled by default
    }

    def 'should check if disabled globally'() {
        given:
        def config = new DatasetConfig(
                enabled: false,
                perOutput: [output1: [datasetId: 'dataset-123']] )

        expect: 'per-output settings cannot override a global disable'
        !config.isEnabledForOutput('output1')
    }

    def 'should get dataset ID'() {
        given:
        def config = new DatasetConfig(perOutput: [
                output1: [datasetId: 'dataset-123'],
                output2: [enabled: true] ])

        expect:
        config.getDatasetId('output1') == 'dataset-123'
        config.getDatasetId('output2') == null
        config.getDatasetId('output3') == null
    }

    def 'should check auto-create mode'() {
        expect:
        new DatasetConfig(createMode: 'auto').isAutoCreateEnabled()
        !new DatasetConfig(createMode: 'existing').isAutoCreateEnabled()
        new DatasetConfig().isAutoCreateEnabled()   // default mode is 'auto'
    }

    def 'should handle empty config'() {
        expect:
        with(new DatasetConfig([:])) {
            !enabled
            createMode == 'auto'
            namePattern == '${workflow.runName}-outputs'
            perOutput.isEmpty()
        }
    }

    def 'should handle null values'() {
        given: 'a map with explicit nulls for every option'
        def opts = [enabled: null, createMode: null, namePattern: null, perOutput: null]

        expect: 'defaults are applied in place of the nulls'
        with(new DatasetConfig(opts)) {
            !enabled
            createMode == 'auto'
            namePattern == '${workflow.runName}-outputs'
            perOutput.isEmpty()
        }
    }

}
io.seqera.http.HxClient -import io.seqera.tower.ApiException -import io.seqera.tower.api.DatasetsApi -import io.seqera.tower.model.CreateDatasetRequest -import io.seqera.tower.model.CreateDatasetResponse -import io.seqera.tower.model.Dataset import nextflow.Session import nextflow.cloud.types.CloudMachineInfo import nextflow.cloud.types.PriceModel @@ -544,37 +539,6 @@ class TowerClientTest extends Specification { // Dataset upload tests - def 'should initialize DatasetsApi with config'() { - given: 'a TowerConfig with credentials' - def config = new TowerConfig([ - accessToken: 'test-token', - endpoint: 'https://api.test.com', - datasets: [enabled: true] - ], [:]) - - when: 'creating a TowerClient' - def session = Mock(Session) - def client = new TowerClient(session, config) - - then: 'DatasetsApi should be initialized' - client.@datasetsApi != null - } - - def 'should not initialize DatasetsApi without credentials'() { - given: 'a TowerConfig without accessToken' - def config = new TowerConfig([ - endpoint: 'https://api.test.com', - datasets: [enabled: true] - ], [:]) - - when: 'creating a TowerClient' - def session = Mock(Session) - def client = new TowerClient(session, config) - - then: 'DatasetsApi should not be initialized' - client.@datasetsApi == null - } - def 'should collect workflow output events with index files'() { given: 'a TowerClient' def client = Spy(TowerClient) @@ -617,67 +581,6 @@ class TowerClientTest extends Specification { client.@workflowOutputs.size() == 0 } - def 'should create dataset using SDK'() { - given: 'a TowerClient with mocked DatasetsApi' - def mockDatasetsApi = Mock(DatasetsApi) - def client = Spy(TowerClient) - client.@datasetsApi = mockDatasetsApi - client.@workspaceId = '123' - client.@datasetConfig = new DatasetConfig([enabled: true, createMode: 'auto']) - - and: 'a mock response' - def mockDataset = new Dataset() - mockDataset.setId('dataset-456') - mockDataset.setName('test-dataset') - def mockResponse = new 
CreateDatasetResponse() - mockResponse.setDataset(mockDataset) - - when: 'createDataset is called' - def result = client.createDataset('test-dataset', 'test description') - - then: 'DatasetsApi.createDataset should be called' - 1 * mockDatasetsApi.createDataset(123L, _) >> mockResponse - result == 'dataset-456' - } - - def 'should upload index file using SDK'() { - given: 'a TowerClient with mocked DatasetsApi' - def mockDatasetsApi = Mock(DatasetsApi) - def client = Spy(TowerClient) - client.@datasetsApi = mockDatasetsApi - client.@workspaceId = '123' - - and: 'a test index file' - def indexPath = Files.createTempFile('test', '.csv') - indexPath.text = "sample,file\ntest1,file1.fq\n" - - when: 'uploadIndexToDataset is called' - client.uploadIndexToDataset('dataset-456', indexPath, 'test_output') - - then: 'DatasetsApi.uploadDataset should be called' - 1 * mockDatasetsApi.uploadDataset(123L, 'dataset-456', Boolean.TRUE, _) - - cleanup: - indexPath?.toFile()?.delete() - } - - def 'should handle SDK ApiException gracefully'() { - given: 'a TowerClient with mocked DatasetsApi that throws' - def mockDatasetsApi = Mock(DatasetsApi) - def client = Spy(TowerClient) - client.@datasetsApi = mockDatasetsApi - client.@workspaceId = '123' - client.@datasetConfig = new DatasetConfig([enabled: true, createMode: 'auto']) - - when: 'createDataset is called and API throws' - def result = client.createDataset('test-dataset', 'test description') - - then: 'ApiException should be caught and null returned' - 1 * mockDatasetsApi.createDataset(123L, _) >> { throw new ApiException(404, 'Not found') } - result == null - noExceptionThrown() - } - @IgnoreIf({ !System.getenv('TOWER_ACCESS_TOKEN') }) def 'should upload to real Seqera Platform'() { given: 'a real TowerClient with datasets enabled' diff --git a/test-dataset-upload/README.md b/validation/dataset-upload/README.md similarity index 64% rename from test-dataset-upload/README.md rename to validation/dataset-upload/README.md index 
169c92cedd..aad47721e3 100644 --- a/test-dataset-upload/README.md +++ b/validation/dataset-upload/README.md @@ -4,10 +4,16 @@ This directory contains a test workflow for validating the automatic dataset upl ## Prerequisites -- Nextflow installed +- Nextflow 25.10.0 or later (required for workflow output feature) - `TOWER_ACCESS_TOKEN` environment variable set - Access to Seqera Platform (https://cloud.seqera.io) +**Note**: This workflow uses the `output {}` block feature which requires Nextflow 25.10.0+. Enable the feature flag in your config: + +```groovy +nextflow.preview.output = true +``` + ## Running the Test ```bash @@ -24,34 +30,38 @@ nextflow run test-workflow.nf -name my-test-run ## What to Expect 1. **Workflow Execution**: - - Creates a CSV file with sample processing results - - Publishes the CSV as a workflow output named `analysis_results` + + - Creates a CSV file with sample processing results + - Publishes the CSV as a workflow output named `analysis_results` 2. **Dataset Upload**: - - On workflow completion, the plugin will: - - Create a new dataset named `{runName}-outputs` - - Upload the `results.csv` index file to the dataset + + - On workflow completion, the plugin will: + - Create a new dataset named `{runName}-outputs` + - Upload the `results.csv` index file to the dataset 3. **Log Messages**: Look for these messages in the console output: - ``` - [INFO] Creating new dataset: {runName}-outputs - [INFO] Created dataset '{name}' with ID: {id} - [INFO] Uploading index file for output 'analysis_results' to dataset {id}: ... - [INFO] Successfully uploaded index file for output 'analysis_results' to dataset {id} - ``` + + ``` + [INFO] Creating new dataset: {runName}-outputs + [INFO] Created dataset '{name}' with ID: {id} + [INFO] Uploading index file for output 'analysis_results' to dataset {id}: ... + [INFO] Successfully uploaded index file for output 'analysis_results' to dataset {id} + ``` 4. 
**Verify in Platform**: - - Go to https://cloud.seqera.io - - Navigate to the "Datasets" section - - Find the dataset named `{runName}-outputs` - - Verify the CSV file is uploaded with header row + - Go to https://cloud.seqera.io + - Navigate to the "Datasets" section + - Find the dataset named `{runName}-outputs` + - Verify the CSV file is uploaded with header row ## Configuration Options Edit `nextflow.config` to test different configurations: ### Disable for Specific Outputs + ```groovy tower.datasets { perOutput { @@ -63,6 +73,7 @@ tower.datasets { ``` ### Use Existing Dataset + ```groovy tower.datasets { perOutput { @@ -74,6 +85,7 @@ tower.datasets { ``` ### Disable Auto-Create + ```groovy tower.datasets { createMode = 'existing' // Only use existing datasets @@ -83,16 +95,19 @@ tower.datasets { ## Troubleshooting ### No Dataset Created + - Check that `TOWER_ACCESS_TOKEN` is set correctly - Verify `tower.datasets.enabled = true` in config - Look for error messages in the workflow log ### Upload Failed + - Check network connectivity to api.cloud.seqera.io - Verify workspace permissions - Check that the index file exists in `results/` directory ### Dataset Not Found in UI + - Refresh the Datasets page in the Platform UI - Check you're in the correct workspace - Search by the workflow run name diff --git a/validation/dataset-upload/nextflow.config b/validation/dataset-upload/nextflow.config new file mode 100644 index 0000000000..1f1cb1b358 --- /dev/null +++ b/validation/dataset-upload/nextflow.config @@ -0,0 +1,40 @@ +/* + * Nextflow configuration for dataset upload testing + */ + +// Enable workflow output feature (required for output blocks) +nextflow.preview.output = true + +// Enable Seqera Platform monitoring +tower { + enabled = true + accessToken = System.getenv('TOWER_ACCESS_TOKEN') + endpoint = 'https://api.cloud.seqera.io' + + // Configure automatic dataset upload + datasets { + enabled = true + createMode = 'auto' // Automatically create datasets + 
namePattern = '${workflow.runName}-outputs' + + // Optional: per-output configuration + // perOutput { + // 'analysis_results' { + // enabled = true + // // datasetId = 'existing-dataset-id' // Use existing dataset + // } + // } + } +} + +// Workflow metadata +manifest { + name = 'dataset-upload-test' + description = 'Test workflow for automatic dataset upload' + version = '1.0.0' +} + +// Process configuration +process { + executor = 'local' +} diff --git a/validation/dataset-upload/test-workflow.nf b/validation/dataset-upload/test-workflow.nf new file mode 100644 index 0000000000..22f4b3473e --- /dev/null +++ b/validation/dataset-upload/test-workflow.nf @@ -0,0 +1,44 @@ +#!/usr/bin/env nextflow + +/* + * Test workflow for dataset upload functionality + * + * This workflow creates a simple CSV output and tests + * the automatic upload to Seqera Platform datasets. + */ + +workflow { + // Create test sample data + Channel.of( + [id: 'sample1', fastq: 'sample1_R1.fq.gz', fastq2: 'sample1_R2.fq.gz'], + [id: 'sample2', fastq: 'sample2_R1.fq.gz', fastq2: 'sample2_R2.fq.gz'], + [id: 'sample3', fastq: 'sample3_R1.fq.gz', fastq2: 'sample3_R2.fq.gz'] + ) + | map { meta -> + // Simulate processing results + def result = "${meta.id},${meta.fastq},${meta.fastq2},result_${meta.id}.bam" + result + } + | collectFile( + name: 'results.csv', + newLine: true, + storeDir: 'results', + seed: 'sample_id,fastq_r1,fastq_r2,bam_file' + ) + | set { results_ch } + + // Publish channel for output block + publish: + analysis_results = results_ch +} + +// Define workflow outputs for dataset upload +output { + analysis_results { + path '.' + index { + path 'results.csv' + header true + } + } +}