Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ object BftScanConnection {
protected val initialScanConnections: Seq[SingleScanConnection]
protected val initialFailedConnections: Map[Uri, Throwable]
protected val connectionBuilder: Uri => Future[SingleScanConnection]
protected val refreshScanUrlsCallback: Option[Seq[(String, String)] => Future[Unit]]
protected val getScans: BftScanConnection => Future[Seq[DsoScan]]
val scansRefreshInterval: NonNegativeFiniteDuration
val retryProvider: RetryProvider
Expand Down Expand Up @@ -944,11 +945,16 @@ object BftScanConnection {

filteredScans = filterScans(scansInDsoRules)

dsoScanSeq: Seq[(String, String)] = filteredScans.map(scan =>
(scan.svName, scan.publicUrl.toString)
)

_ = refreshScanUrlsCallback.map(f => f(dsoScanSeq))

newState <- computeNewState(retriedCurrentState, filteredScans)
} yield {
currentScanConnectionsRef.set(newState)
logger.info(s"Updated scan list to $newState")

val connections = newState.scanConnections
validateState(newState)
connections
Expand Down Expand Up @@ -1064,6 +1070,7 @@ object BftScanConnection {
override val initialScanConnections: Seq[SingleScanConnection],
override val initialFailedConnections: Map[Uri, Throwable],
override val connectionBuilder: Uri => Future[SingleScanConnection],
protected val refreshScanUrlsCallback: Option[Seq[(String, String)] => Future[Unit]],
override val getScans: BftScanConnection => Future[Seq[DsoScan]],
override val scansRefreshInterval: NonNegativeFiniteDuration,
override val retryProvider: RetryProvider,
Expand All @@ -1090,6 +1097,7 @@ object BftScanConnection {
override val initialScanConnections: Seq[SingleScanConnection],
override val initialFailedConnections: Map[Uri, Throwable],
override val connectionBuilder: Uri => Future[SingleScanConnection],
protected val refreshScanUrlsCallback: Option[Seq[(String, String)] => Future[Unit]],
override val getScans: BftScanConnection => Future[Seq[DsoScan]],
override val scansRefreshInterval: NonNegativeFiniteDuration,
override val retryProvider: RetryProvider,
Expand Down Expand Up @@ -1172,22 +1180,33 @@ object BftScanConnection {
}

private def bootstrapWithSeedNodes(
seedUrls: NonEmptyList[Uri],
seedUris: NonEmptyList[Uri],
amuletRulesCacheTimeToLive: NonNegativeFiniteDuration,
spliceLedgerClient: SpliceLedgerClient,
scansRefreshInterval: NonNegativeFiniteDuration,
clock: Clock,
retryProvider: RetryProvider,
loggerFactory: NamedLoggerFactory,
builder: (Uri, NonNegativeFiniteDuration) => Future[SingleScanConnection],
lastPersistedScanUrlList: Option[Seq[DsoScan]],
refreshScanUrlsCallback: Option[Seq[(String, String)] => Future[Unit]],
)(implicit
ec: ExecutionContextExecutor,
tc: TraceContext,
mat: Materializer,
): Future[BftScanConnection] = {
val logger = loggerFactory.getTracedLogger(getClass)

val bootstrapUris: NonEmptyList[Uri] = lastPersistedScanUrlList match {
case Some(scans) if scans.nonEmpty => {
val urls = NonEmptyList.fromList(scans.map(_.publicUrl).toList)
urls.getOrElse { seedUris }
}
case _ => seedUris
}

for {
initialSeedConnections <- seedUrls.traverse(uri =>
initialSeedConnections <- bootstrapUris.traverse(uri =>
builder(uri, amuletRulesCacheTimeToLive).transformWith {
case Success(conn) => Future.successful(Right(conn))
case Failure(err) => Future.successful(Left(uri -> err))
Expand All @@ -1202,7 +1221,7 @@ object BftScanConnection {
Future.failed(
Status.UNAVAILABLE
.withDescription(
s"Failed to connect to any seed URLs for bootstrapping: ${seedUrls.toList}"
s"Failed to connect to any seed URLs for bootstrapping: ${bootstrapUris.toList}"
)
.asRuntimeException()
)
Expand All @@ -1211,6 +1230,7 @@ object BftScanConnection {
successfulSeedConnections,
failedSeeds.toMap,
uri => builder(uri, amuletRulesCacheTimeToLive),
refreshScanUrlsCallback,
Bft.getScansInDsoRules,
scansRefreshInterval,
retryProvider,
Expand Down Expand Up @@ -1238,6 +1258,8 @@ object BftScanConnection {
clock: Clock,
retryProvider: RetryProvider,
loggerFactory: NamedLoggerFactory,
refreshScanUrlsCallback: Option[Seq[(String, String)] => Future[Unit]],
lastPersistedScanUrlList: Future[Option[List[(String, String)]]],
)(implicit
ec: ExecutionContextExecutor,
tc: TraceContext,
Expand All @@ -1249,6 +1271,11 @@ object BftScanConnection {
val builder = buildScanConnection(upgradesConfig, clock, retryProvider, loggerFactory)
val logger = loggerFactory.getTracedLogger(getClass)

val lastPersistedDsoScansFuture: Future[Option[Seq[DsoScan]]] = lastPersistedScanUrlList.map {
rs =>
{ rs.map(list => list.map { case (url, svName) => DsoScan(Uri(url), svName) }) }
}

config match {
case BftScanClientConfig.TrustSingle(url, ttl) =>
for {
Expand All @@ -1269,6 +1296,7 @@ object BftScanConnection {
// In the future, add a new threshold for how many trusted seed-urls should be there.

for {
lastPersistedDsoScans <- lastPersistedDsoScansFuture
tempBftConnection <- bootstrapWithSeedNodes(
ts.seedUrls,
ts.amuletRulesCacheTimeToLive,
Expand All @@ -1278,6 +1306,8 @@ object BftScanConnection {
retryProvider,
loggerFactory,
builder,
lastPersistedDsoScans,
refreshScanUrlsCallback,
)

// Use the temporary connection to get a consensus on the full list of scans
Expand Down Expand Up @@ -1318,6 +1348,7 @@ object BftScanConnection {
connections,
failed.toMap,
uri => builder(uri, ts.amuletRulesCacheTimeToLive),
refreshScanUrlsCallback,
Bft.getScansInDsoRules,
ts.scansRefreshInterval,
retryProvider,
Expand Down Expand Up @@ -1354,7 +1385,7 @@ object BftScanConnection {

case bft @ BftScanClientConfig.Bft(_, _, _) =>
for {

lastPersistedDsoScans <- lastPersistedDsoScansFuture
bftConnection <- bootstrapWithSeedNodes(
bft.seedUrls,
bft.amuletRulesCacheTimeToLive,
Expand All @@ -1364,6 +1395,8 @@ object BftScanConnection {
retryProvider,
loggerFactory,
builder,
lastPersistedDsoScans,
refreshScanUrlsCallback,
)
_ <- retryProvider.waitUntil(
RetryFor.WaitingOnInitDependency,
Expand Down Expand Up @@ -1437,6 +1470,7 @@ object BftScanConnection {
connections,
failed.toMap,
uri => builder(uri, amuletRulesCacheTimeToLive),
None,
_ => Bft.getPeerScansFromStore(store, svName),
scansRefreshInterval,
retryProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object ScanAggregatesConnection {
clock,
retryProvider,
loggerFactory,
None,
Future.successful(None),
)
.map(bft => new ScanAggregatesConnection(bft, retryProvider, loggerFactory))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class BftScanConnectionTest
initialConnections,
initialFailedConnections,
connectionBuilder,
None,
Bft.getScansInDsoRules,
NonNegativeFiniteDuration.ofSeconds(refreshSeconds),
retryProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ import org.lfdecentralizedtrust.splice.validator.config.{
import org.lfdecentralizedtrust.splice.validator.domain.DomainConnector
import org.lfdecentralizedtrust.splice.validator.metrics.ValidatorAppMetrics
import org.lfdecentralizedtrust.splice.validator.migration.DomainMigrationDump
import org.lfdecentralizedtrust.splice.validator.store.ValidatorStore
import org.lfdecentralizedtrust.splice.validator.store.{
ScanUrlInternalConfig,
ValidatorConfigProvider,
ValidatorInternalStore,
ValidatorStore,
}
import org.lfdecentralizedtrust.splice.validator.util.ValidatorUtil
import org.lfdecentralizedtrust.splice.wallet.{ExternalPartyWalletManager, UserWalletManager}
import org.lfdecentralizedtrust.splice.wallet.admin.http.{
Expand Down Expand Up @@ -104,7 +109,6 @@ import org.apache.pekko.http.scaladsl.server.Directives.*
import org.apache.pekko.http.scaladsl.server.directives.BasicDirectives
import com.google.protobuf.ByteString
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -194,7 +198,15 @@ class ValidatorApp(
override def preInitializeAfterLedgerConnection(
connection: BaseLedgerConnection,
ledgerClient: SpliceLedgerClient,
)(implicit traceContext: TraceContext): scala.concurrent.Future[Option[CantonTimestamp]] =
)(implicit traceContext: TraceContext): scala.concurrent.Future[Option[CantonTimestamp]] = {

val internalStore = new ValidatorConfigProvider(
ValidatorInternalStore(
storage,
loggerFactory,
)
)

for {
initialSynchronizerTime <-
withParticipantAdminConnection { participantAdminConnection =>
Expand All @@ -207,6 +219,8 @@ class ValidatorApp(
clock,
retryProvider,
loggerFactory,
Some(persistScanUrlListBuilder(internalStore)),
getPersistedScanList(internalStore),
)
}
domainConnector = new DomainConnector(
Expand Down Expand Up @@ -432,6 +446,7 @@ class ValidatorApp(
} yield initialSynchronizerTime
}
} yield initialSynchronizerTime
}

private def readRestoreDump: Option[DomainMigrationDump] = config.restoreFromMigrationDump.map {
path =>
Expand Down Expand Up @@ -683,6 +698,36 @@ class ValidatorApp(
)
}

private def persistScanUrlListBuilder(
validatorConfigProvider: ValidatorConfigProvider
)(implicit traceContext: TraceContext): Seq[(String, String)] => Future[Unit] = {

(connections: Seq[(String, String)]) =>
{
val internalConfigs: Seq[ScanUrlInternalConfig] = connections.map { case (url, svName) =>
ScanUrlInternalConfig(
svName = svName,
url = url,
)
}
validatorConfigProvider.setScanUrlInternalConfig(internalConfigs)
}
}

private def getPersistedScanList(
validatorConfigProvider: ValidatorConfigProvider
)(implicit traceContext: TraceContext): Future[Option[List[(String, String)]]] = {

val optionTConfig = validatorConfigProvider.getScanUrlInternalConfig()

optionTConfig.map { internalConfigs =>
internalConfigs.map { internalConfig =>
(internalConfig.url, internalConfig.svName)
}.toList
}.value

}

override def initialize(
ledgerClient: SpliceLedgerClient,
validatorParty: PartyId,
Expand All @@ -707,6 +752,14 @@ class ValidatorApp(
config.participantIdentitiesBackup.map(_ -> clock),
loggerFactory,
)

internalStore = new ValidatorConfigProvider(
ValidatorInternalStore(
storage,
loggerFactory,
)
)

scanConnection <- appInitStep("Get scan connection") {
client.BftScanConnection(
ledgerClient,
Expand All @@ -715,6 +768,8 @@ class ValidatorApp(
clock,
retryProvider,
loggerFactory,
Some(persistScanUrlListBuilder(internalStore)),
getPersistedScanList(internalStore),
)
}

Expand Down Expand Up @@ -1199,6 +1254,7 @@ class ValidatorApp(
domainTimeAutomationService,
domainParamsAutomationService,
store,
internalStore,
automation,
walletManagerOpt,
timeouts,
Expand All @@ -1220,6 +1276,7 @@ object ValidatorApp {
domainTimeAutomationService: DomainTimeAutomationService,
domainParamsAutomationService: DomainParamsAutomationService,
store: ValidatorStore,
internalStore: ValidatorConfigProvider,
automation: ValidatorAutomationService,
walletManager: Option[UserWalletManager],
timeouts: ProcessingTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,38 @@
package org.lfdecentralizedtrust.splice.validator.store

import cats.data.OptionT
import com.digitalasset.canton.lifecycle.CloseContext
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory}
import com.digitalasset.canton.resource.{DbStorage, Storage}
import com.digitalasset.canton.tracing.TraceContext
import io.circe.{Decoder, Encoder}
import org.lfdecentralizedtrust.splice.validator.store.db.DbValidatorInternalStore

import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}

trait ValidatorInternalStore {
def setConfig[T: Encoder](key: String, value: T)(implicit tc: TraceContext): Future[Unit]

def getConfig[T: Decoder](key: String)(implicit tc: TraceContext): OptionT[Future, T]
}

object ValidatorInternalStore {

def apply(
storage: Storage,
loggerFactory: NamedLoggerFactory,
)(implicit
ec: ExecutionContext,
lc: ErrorLoggingContext,
cc: CloseContext,
): ValidatorInternalStore = {
storage match {
case storage: DbStorage =>
new DbValidatorInternalStore(
storage,
loggerFactory,
)
case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import org.lfdecentralizedtrust.splice.store.db.AcsJdbcTypes

class DbValidatorInternalStore(
storage: DbStorage,
implicit val loggingContext: ErrorLoggingContext,
implicit val closeContext: CloseContext,
protected val loggerFactory: NamedLoggerFactory,
)(implicit val ec: ExecutionContext)
extends ValidatorInternalStore
val loggerFactory: NamedLoggerFactory,
)(implicit
val ec: ExecutionContext,
val loggingContext: ErrorLoggingContext,
val closeContext: CloseContext,
) extends ValidatorInternalStore
with AcsJdbcTypes
with NamedLogging {

Expand Down Expand Up @@ -53,12 +54,19 @@ class DbValidatorInternalStore(
WHERE config_key = $key
""".as[Json].headOption

val jsonOptionF: OptionT[FutureUnlessShutdown, Json] =
val jsonOptionF: OptionT[FutureUnlessShutdown, Json] = {
storage.querySingle(queryAction, "get-validator-internal-config")
}

val typedOptionT: OptionT[FutureUnlessShutdown, T] = jsonOptionF.subflatMap { json =>
json.as[T] match {
case Right(typedValue) => Some(typedValue)
case Right(typedValue) => {
logger.debug(
s"retrieved validator config from database with key '$key' and value '${json.noSpaces}'"
)
Some(typedValue)
}

case Left(error) => {
logger.error(
s"Failed to decode config key '$key' to expected type T: ${error.getMessage}"
Expand Down
Loading
Loading