Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package io.seqera.tower.plugin

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.Executors

import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.util.concurrent.UncheckedExecutionException
import com.google.gson.JsonSyntaxException
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.http.HxClient
import io.seqera.http.HxConfig
import io.seqera.tower.plugin.exception.BadResponseException
import io.seqera.tower.plugin.exception.UnauthorizedException
import io.seqera.tower.plugin.exchange.GetLicenseTokenRequest
import io.seqera.tower.plugin.exchange.GetLicenseTokenResponse
import io.seqera.util.trace.TraceUtils
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.exception.ReportWarningException
Expand All @@ -28,8 +22,6 @@ import nextflow.fusion.FusionToken
import nextflow.platform.PlatformHelper
import nextflow.plugin.Priority
import nextflow.serde.gson.GsonEncoder
import nextflow.util.RetryConfig
import nextflow.util.Threads
import org.pf4j.Extension
/**
* Environment provider for Platform-specific environment variables.
Expand All @@ -45,14 +37,8 @@ class TowerFusionToken implements FusionToken {
// The path relative to the Platform endpoint where license-scoped JWT tokens are obtained
private static final String LICENSE_TOKEN_PATH = 'license/token/'

// Server errors that should trigger a retry
private static final Set<Integer> SERVER_ERRORS = Set.of(408, 429, 500, 502, 503, 504)

// Default connection timeout for HTTP requests
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS)

// The HttpClient instance used to send requests
private HxClient httpClient
// The TowerClient instance used to send requests
private TowerClient client

// Time-to-live for cached tokens
private Duration tokenTTL = Duration.of(1, ChronoUnit.HOURS)
Expand Down Expand Up @@ -84,7 +70,7 @@ class TowerFusionToken implements FusionToken {
this.refreshToken = PlatformHelper.getRefreshToken(config, env)
this.workflowId = env.get('TOWER_WORKFLOW_ID')
this.workspaceId = PlatformHelper.getWorkspaceId(config, env)
this.httpClient = newDefaultHttpClient(accessToken, refreshToken)
this.client = TowerFactory.client()
}

protected void validateConfig() {
Expand Down Expand Up @@ -168,51 +154,6 @@ class TowerFusionToken implements FusionToken {
* Helper methods
*************************************************************************/

/**
* Create a new HttpClient instance with default settings
* @return The new HttpClient instance
*/
private HxClient newDefaultHttpClient(String accessToken, String refreshToken) {
final refreshUrl = refreshToken ? "$endpoint/oauth/access_token" : null
// the client config
final config = HxConfig.newBuilder()
.bearerToken(accessToken)
.refreshToken(refreshToken)
.refreshTokenUrl(refreshUrl)
.refreshCookiePolicy(CookiePolicy.ACCEPT_ALL)
.retryStatusCodes(SERVER_ERRORS)
.retryConfig(RetryConfig.config())
.build()
// the client builder
final builder = HxClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(DEFAULT_CONNECTION_TIMEOUT)
.config(config)
// use virtual threads executor if enabled
if ( Threads.useVirtual() ) {
builder.executor(Executors.newVirtualThreadPerTaskExecutor())
}
// build and return the new client
return builder.build()
}

/**
* Create a {@link HttpRequest} representing a {@link GetLicenseTokenRequest} object
*
* @param req The LicenseTokenRequest object
* @return The resulting HttpRequest object
*/
private HttpRequest makeHttpRequest(GetLicenseTokenRequest req) {
final gson = new GsonEncoder<GetLicenseTokenRequest>() {}
final body = HttpRequest.BodyPublishers.ofString( gson.encode(req) )
return HttpRequest.newBuilder()
.uri(URI.create("${endpoint}/${LICENSE_TOKEN_PATH}").normalize())
.header('Content-Type', 'application/json')
.header('Traceparent', TraceUtils.rndTrace())
.POST(body)
.build()
}

/**
* Parse a JSON string into a {@link GetLicenseTokenResponse} object
*
Expand All @@ -233,28 +174,20 @@ class TowerFusionToken implements FusionToken {
* @return The LicenseTokenResponse object
*/
private GetLicenseTokenResponse sendRequest(GetLicenseTokenRequest request) {
final httpReq = makeHttpRequest(request)
try {
final resp = httpClient.sendAsString(httpReq)
if( log.isTraceEnabled() || resp.statusCode()!=200 )
log.debug "Fusion license request ${httpReq} ${request}; status=${resp.statusCode()}; body: ${resp.body()}"
else
log.debug "Fusion license request ${httpReq}; status=${resp.statusCode()}"
// check ok response
if( resp.statusCode() == 200 ) {
final ret = parseLicenseTokenResponse(resp.body())
return ret
}
// check for unauthorized error
if( resp.statusCode() == 401 ) {
throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a Seqera Platform valid access token")
}
// unpexted error
throw new BadResponseException("Invalid response: ${httpReq.method()} ${httpReq.uri()} [${resp.statusCode()}] ${resp.body()}")
final url = "${client.getEndpoint()}/${LICENSE_TOKEN_PATH}"
final resp = client.sendHttpMessage(url, request.toMap())

if( resp.code == 200 ) {
final ret = parseLicenseTokenResponse(resp.message)
return ret
}
catch (IOException e) {
throw new IllegalStateException("Unable to send request to '${httpReq.uri()}' : ${e.message}")

if( resp.code == 401 ) {
throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a Seqera Platform valid access token")
}

throw new BadResponseException("Invalid response: ${url} [${resp.code}] ${resp.message} -- ${resp.cause}")

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,16 @@ class GetLicenseTokenRequest {
* The Platform workspace ID associated with this request
*/
String workspaceId

/**
* @return a Map representation of the request
*/
Map<String, String> toMap() {
final map = new HashMap<String, String>()
map.product = this.product
map.version = this.version
map.workflowId = this.workflowId
map.workspaceId = this.workspaceId
return map
}
}
Loading
Loading