From 9d80e97faf2d3e861cc9c36a291334e9fff5d5b3 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 28 Oct 2025 17:35:23 +0100 Subject: [PATCH 1/2] Reuse TowerClient http client [e2e prod] Signed-off-by: jorgee --- .../tower/plugin/TowerFusionToken.groovy | 97 ++----------- .../exchange/GetLicenseTokenRequest.groovy | 12 ++ .../tower/plugin/TowerFusionEnvTest.groovy | 135 ++++++++++++++---- 3 files changed, 135 insertions(+), 109 deletions(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionToken.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionToken.groovy index dd93200883..7154cb3664 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionToken.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionToken.groovy @@ -1,11 +1,8 @@ package io.seqera.tower.plugin -import java.net.http.HttpClient -import java.net.http.HttpRequest import java.time.Duration import java.time.Instant import java.time.temporal.ChronoUnit -import java.util.concurrent.Executors import com.google.common.cache.Cache import com.google.common.cache.CacheBuilder @@ -13,13 +10,10 @@ import com.google.common.util.concurrent.UncheckedExecutionException import com.google.gson.JsonSyntaxException import groovy.transform.CompileStatic import groovy.util.logging.Slf4j -import io.seqera.http.HxClient -import io.seqera.http.HxConfig import io.seqera.tower.plugin.exception.BadResponseException import io.seqera.tower.plugin.exception.UnauthorizedException import io.seqera.tower.plugin.exchange.GetLicenseTokenRequest import io.seqera.tower.plugin.exchange.GetLicenseTokenResponse -import io.seqera.util.trace.TraceUtils import nextflow.SysEnv import nextflow.exception.AbortOperationException import nextflow.exception.ReportWarningException @@ -28,8 +22,6 @@ import nextflow.fusion.FusionToken import nextflow.platform.PlatformHelper import nextflow.plugin.Priority import nextflow.serde.gson.GsonEncoder -import nextflow.util.RetryConfig -import nextflow.util.Threads import org.pf4j.Extension /** * Environment provider for Platform-specific environment variables. @@ -45,14 +37,8 @@ class TowerFusionToken implements FusionToken { // The path relative to the Platform endpoint where license-scoped JWT tokens are obtained private static final String LICENSE_TOKEN_PATH = 'license/token/' - // Server errors that should trigger a retry - private static final Set SERVER_ERRORS = Set.of(408, 429, 500, 502, 503, 504) - - // Default connection timeout for HTTP requests - private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS) - - // The HttpClient instance used to send requests - private HxClient httpClient + // The TowerClient instance used to send requests + private TowerClient client // Time-to-live for cached tokens private Duration tokenTTL = Duration.of(1, ChronoUnit.HOURS) @@ -84,7 +70,7 @@ class TowerFusionToken implements FusionToken { this.refreshToken = PlatformHelper.getRefreshToken(config, env) this.workflowId = env.get('TOWER_WORKFLOW_ID') this.workspaceId = PlatformHelper.getWorkspaceId(config, env) - this.httpClient = newDefaultHttpClient(accessToken, refreshToken) + this.client = TowerFactory.client() } protected void validateConfig() { @@ -168,51 +154,6 @@ class TowerFusionToken implements FusionToken { * Helper methods *************************************************************************/ - /** - * Create a new HttpClient instance with default settings - * @return The new HttpClient instance - */ - private HxClient newDefaultHttpClient(String accessToken, String refreshToken) { - final refreshUrl = refreshToken ? "$endpoint/oauth/access_token" : null - // the client config - final config = HxConfig.newBuilder() - .bearerToken(accessToken) - .refreshToken(refreshToken) - .refreshTokenUrl(refreshUrl) - .refreshCookiePolicy(CookiePolicy.ACCEPT_ALL) - .retryStatusCodes(SERVER_ERRORS) - .retryConfig(RetryConfig.config()) - .build() - // the client builder - final builder = HxClient.newBuilder() - .version(HttpClient.Version.HTTP_1_1) - .connectTimeout(DEFAULT_CONNECTION_TIMEOUT) - .config(config) - // use virtual threads executor if enabled - if ( Threads.useVirtual() ) { - builder.executor(Executors.newVirtualThreadPerTaskExecutor()) - } - // build and return the new client - return builder.build() - } - - /** - * Create a {@link HttpRequest} representing a {@link GetLicenseTokenRequest} object - * - * @param req The LicenseTokenRequest object - * @return The resulting HttpRequest object - */ - private HttpRequest makeHttpRequest(GetLicenseTokenRequest req) { - final gson = new GsonEncoder() {} - final body = HttpRequest.BodyPublishers.ofString( gson.encode(req) ) - return HttpRequest.newBuilder() - .uri(URI.create("${endpoint}/${LICENSE_TOKEN_PATH}").normalize()) - .header('Content-Type', 'application/json') - .header('Traceparent', TraceUtils.rndTrace()) - .POST(body) - .build() - } - /** * Parse a JSON string into a {@link GetLicenseTokenResponse} object * @@ -233,28 +174,20 @@ class TowerFusionToken implements FusionToken { * @return The LicenseTokenResponse object */ private GetLicenseTokenResponse sendRequest(GetLicenseTokenRequest request) { - final httpReq = makeHttpRequest(request) - try { - final resp = httpClient.sendAsString(httpReq) - if( log.isTraceEnabled() || resp.statusCode()!=200 ) - log.debug "Fusion license request ${httpReq} ${request}; status=${resp.statusCode()}; body: ${resp.body()}" - else - log.debug "Fusion license request ${httpReq}; status=${resp.statusCode()}" - // check ok response - if( resp.statusCode() == 200 ) { - final ret = parseLicenseTokenResponse(resp.body()) - return ret - } - // check for unauthorized error - if( resp.statusCode() == 401 ) { - throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a Seqera Platform valid access token") - } - // unpexted error - throw new BadResponseException("Invalid response: ${httpReq.method()} ${httpReq.uri()} [${resp.statusCode()}] ${resp.body()}") + final url = "${client.getEndpoint()}/${LICENSE_TOKEN_PATH}" + final resp = client.sendHttpMessage(url, request.toMap()) + + if( resp.code == 200 ) { + final ret = parseLicenseTokenResponse(resp.message) + return ret } - catch (IOException e) { - throw new IllegalStateException("Unable to send request to '${httpReq.uri()}' : ${e.message}") + + if( resp.code == 401 ) { + throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a Seqera Platform valid access token") } + + throw new BadResponseException("Invalid response: ${url} [${resp.code}] ${resp.message} -- ${resp.cause}") + } } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/GetLicenseTokenRequest.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/GetLicenseTokenRequest.groovy index afdcb5b614..27668a6c6b 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/GetLicenseTokenRequest.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/GetLicenseTokenRequest.groovy @@ -29,4 +29,16 @@ class GetLicenseTokenRequest { * The Platform workspace ID associated with this request */ String workspaceId + + /** + * @return a Map representation of the request + */ + Map toMap() { + final map = new HashMap() + map.product = this.product + map.version = this.version + map.workflowId = this.workflowId + map.workspaceId = this.workspaceId + return map + } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy index 1f338647b4..37fc483fa8 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy @@ -13,6 +13,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException import nextflow.Global import nextflow.Session import nextflow.SysEnv +import nextflow.script.WorkflowMetadata import nextflow.serde.gson.InstantAdapter import spock.lang.Shared import spock.lang.Specification @@ -262,23 +263,43 @@ class TowerFusionEnvTest extends Specification { def 'should get a license token with config'() { given: def config = [ + enabled : true, endpoint : wireMockServer.baseUrl(), - accessToken: 'abc123', + accessToken: 'eyJ0aWQiOiAxMTkxN30uNWQ5MGFmYWU2YjhhNmFmY2FlNjVkMTQ4ZDFhM2ZlNzlmMmNjN2I4Mw==', workspaceId: '67890' ] + def session = Mock(Session) + def meta = new WorkflowMetadata( + session: session, + projectName: 'the-project-name', + repository: 'git://repo.com/foo') + session.getConfig() >> [ tower: config ] + session.getUniqueId() >> UUID.randomUUID() + session.getWorkflowMetadata() >> meta def PRODUCT = 'some-product' def VERSION = 'some-version' and: - Global.session = Mock(Session) {getConfig() >> [ tower: config ] } - and: + Global.session = session def provider = new TowerFusionToken() + and: 'a mock endpoint at flow create' + wireMockServer.stubFor( + WireMock.post(urlEqualTo("/trace/create?workspaceId=${config.workspaceId}")) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withBody('{"message": "", "workflowId": "1234"}') + ) + ) + and: + def client = TowerFactory.client(session, SysEnv.get()) + client.onFlowCreate(session) and: 'a mock endpoint returning a valid token' final now = Instant.now() final expirationDate = toJson(now.plus(1, ChronoUnit.DAYS)) wireMockServer.stubFor( WireMock.post(urlEqualTo("/license/token/")) - .withHeader('Authorization', equalTo('Bearer abc123')) + .withHeader('Authorization', equalTo("Bearer ${config.accessToken}")) .withRequestBody(matchingJsonPath('$.product', equalTo("some-product"))) .withRequestBody(matchingJsonPath('$.version', equalTo("some-version"))) .withRequestBody(matchingJsonPath('$.workspaceId', equalTo("67890"))) @@ -298,33 +319,54 @@ class TowerFusionEnvTest extends Specification { and: 'the request is correct' wireMockServer.verify(1, WireMock.postRequestedFor(WireMock.urlEqualTo("/license/token/")) - .withHeader('Authorization', WireMock.equalTo('Bearer abc123'))) + .withHeader('Authorization', WireMock.equalTo("Bearer ${config.accessToken}"))) } def 'should get a license token with environment'() { given: + def accessToken = 'eyJ0aWQiOiAxMTkxN30uNWQ5MGFmYWU2YjhhNmFmY2FlNjVkMTQ4ZDFhM2ZlNzlmMmNjN2I4Mw==' + def workspaceId = '67890' SysEnv.push([ TOWER_WORKFLOW_ID: '12345', - TOWER_ACCESS_TOKEN: 'abc123', - TOWER_WORKSPACE_ID: '67890', + TOWER_ACCESS_TOKEN: accessToken, + TOWER_WORKSPACE_ID: workspaceId, TOWER_API_ENDPOINT: wireMockServer.baseUrl() ]) + def session = Mock(Session) + def meta = new WorkflowMetadata( + session: session, + projectName: 'the-project-name', + repository: 'git://repo.com/foo') + session.getConfig() >> [:] + session.getUniqueId() >> UUID.randomUUID() + session.getWorkflowMetadata() >> meta def PRODUCT = 'some-product' def VERSION = 'some-version' and: - Global.session = Mock(Session) {getConfig() >> [:] } - and: + Global.session = session def provider = new TowerFusionToken() + and: 'a mock endpoint at flow create' + wireMockServer.stubFor( + WireMock.post(urlEqualTo("/trace/create?workspaceId=${workspaceId}")) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withBody('{"message": "", "workflowId": "1234"}') + ) + ) + and: + def client = TowerFactory.client(session, SysEnv.get()) + client.onFlowCreate(session) and: 'a mock endpoint returning a valid token' final now = Instant.now() final expirationDate = toJson(now.plus(1, ChronoUnit.DAYS)) wireMockServer.stubFor( WireMock.post(urlEqualTo("/license/token/")) - .withHeader('Authorization', equalTo('Bearer abc123')) + .withHeader('Authorization', equalTo("Bearer $accessToken")) .withRequestBody(matchingJsonPath('$.product', equalTo("some-product"))) .withRequestBody(matchingJsonPath('$.version', equalTo("some-version"))) - .withRequestBody(matchingJsonPath('$.workspaceId', equalTo("67890"))) + .withRequestBody(matchingJsonPath('$.workspaceId', equalTo("${workspaceId}"))) .willReturn( WireMock.aResponse() .withStatus(200) @@ -341,7 +383,7 @@ class TowerFusionEnvTest extends Specification { and: 'the request is correct' wireMockServer.verify(1, WireMock.postRequestedFor(WireMock.urlEqualTo("/license/token/")) - .withHeader('Authorization', WireMock.equalTo('Bearer abc123'))) + .withHeader('Authorization', WireMock.equalTo("Bearer ${accessToken}"))) cleanup: SysEnv.pop() @@ -349,19 +391,40 @@ class TowerFusionEnvTest extends Specification { def 'should refresh the auth token on 401 and retry the request'() { given: + def accessToken = 'eyJ0aWQiOiAxMTkxN30uNWQ5MGFmYWU2YjhhNmFmY2FlNjVkMTQ4ZDFhM2ZlNzlmMmNjN2I4Mw==' + def workspaceId = '67890' SysEnv.push([ TOWER_WORKFLOW_ID: '12345', - TOWER_ACCESS_TOKEN: 'abc-token', + TOWER_ACCESS_TOKEN: accessToken, TOWER_REFRESH_TOKEN: 'xyz-refresh', - TOWER_WORKSPACE_ID: '67890', + TOWER_WORKSPACE_ID: workspaceId, TOWER_API_ENDPOINT: wireMockServer.baseUrl() ]) + def session = Mock(Session) + def meta = new WorkflowMetadata( + session: session, + projectName: 'the-project-name', + repository: 'git://repo.com/foo') + session.getConfig() >> [:] + session.getUniqueId() >> UUID.randomUUID() + session.getWorkflowMetadata() >> meta def PRODUCT = 'some-product' def VERSION = 'some-version' and: - Global.session = Mock(Session) { getConfig() >> [:] } - and: + Global.session = session def provider = new TowerFusionToken() + and: 'a mock endpoint at flow create' + wireMockServer.stubFor( + WireMock.post(urlEqualTo("/trace/create?workspaceId=${workspaceId}")) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withBody('{"message": "", "workflowId": "1234"}') + ) + ) + and: + def client = TowerFactory.client(session, SysEnv.get()) + client.onFlowCreate(session) and: 'prepare stubs' @@ -371,7 +434,7 @@ class TowerFusionEnvTest extends Specification { // 1️⃣ First attempt: /license/token/ fails with 401 wireMockServer.stubFor( WireMock.post(urlEqualTo("/license/token/")) - .withHeader('Authorization', equalTo('Bearer abc-token')) + .withHeader('Authorization', equalTo("Bearer $accessToken")) .inScenario("Refresh flow") .whenScenarioStateIs(Scenario.STARTED) .willReturn(WireMock.aResponse().withStatus(401)) @@ -425,21 +488,39 @@ class TowerFusionEnvTest extends Specification { def 'should throw UnauthorizedException if getting a token fails with 401'() { given: 'a TowerFusionEnv provider' - Global.session = Mock(Session) { - config >> [ - tower: [ - endpoint : wireMockServer.baseUrl(), - accessToken: 'abc123' - ] - ] - } + def config = [ + enabled : true, + endpoint : wireMockServer.baseUrl(), + accessToken: 'eyJ0aWQiOiAxMTkxN30uNWQ5MGFmYWU2YjhhNmFmY2FlNjVkMTQ4ZDFhM2ZlNzlmMmNjN2I4Mw==', + workspaceId: '67890' + ] + def session = Mock(Session) + def meta = new WorkflowMetadata( + session: session, + projectName: 'the-project-name', + repository: 'git://repo.com/foo') + session.getConfig() >> [ tower: config ] + session.getUniqueId() >> UUID.randomUUID() + session.getWorkflowMetadata() >> meta and: + Global.session = session def provider = new TowerFusionToken() - + and: 'a mock endpoint at flow create' + wireMockServer.stubFor( + WireMock.post(urlEqualTo("/trace/create?workspaceId=${config.workspaceId}")) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withBody('{"message": "", "workflowId": "1234"}') + ) + ) + and: + def client = TowerFactory.client(session, SysEnv.get()) + client.onFlowCreate(session) and: 'a mock endpoint returning an error' wireMockServer.stubFor( WireMock.post(WireMock.urlEqualTo("/license/token/")) - .withHeader('Authorization', WireMock.equalTo('Bearer abc123')) + .withHeader('Authorization', WireMock.equalTo("Bearer ${config.accessToken}")) .willReturn( WireMock.aResponse() .withStatus(401) From fa81a2a49e354f6371faa9baa380384d7085cdc4 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Tue, 4 Nov 2025 11:29:45 +0100 Subject: [PATCH 2/2] [e2e prod] test Signed-off-by: Paolo Di Tommaso