Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion foundation/src/main/resources/public
Submodule public updated 1 files
+5 −0 chains.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,91 +30,141 @@ import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import java.math.BigInteger
import java.nio.ByteBuffer
import java.time.Instant

import java.util.concurrent.ConcurrentHashMap

/**
* Solana chain-specific implementation using slotSubscribe for head detection.
*
* Uses lightweight slotSubscribe WebSocket subscription instead of expensive blockSubscribe:
* - ~50 bytes per notification vs ~1KB for blockSubscribe
* - Stable API (no special node flags required)
* - Universal provider support
* - Throttled getEpochInfo calls (every N slots) to get actual slot and block height
* - Synthetic hash based on slot for ForkChoice deduplication
*/
object SolanaChainSpecific : AbstractChainSpecific() {

private val log = LoggerFactory.getLogger(SolanaChainSpecific::class.java)

// Throttle: check actual block height every N slots
private const val HEIGHT_CHECK_INTERVAL = 5

// Cache per upstream for throttling
private val lastKnownHeights = ConcurrentHashMap<String, Long>()
private val lastCheckedSlots = ConcurrentHashMap<String, Long>()

override fun getLatestBlock(api: ChainReader, upstreamId: String): Mono<BlockContainer> {
return api.read(ChainRequest("getSlot", ListParams())).flatMap {
val slot = it.getResultAsProcessedString().toLong()
api.read(
ChainRequest(
"getBlocks",
ListParams(
slot - 10,
slot,
),
),
).flatMap {
val response = Global.objectMapper.readValue(it.getResult(), LongArray::class.java)
if (response == null || response.isEmpty()) {
Mono.empty()
} else {
api.read(
ChainRequest(
"getBlock",
ListParams(
response.max(),
mapOf(
"commitment" to "confirmed",
"showRewards" to false,
"transactionDetails" to "none",
"maxSupportedTransactionVersion" to 0,
),
),
),
).map {
val raw = it.getResult()
val block = Global.objectMapper.readValue(it.getResult(), SolanaBlock::class.java)
makeBlock(raw, block, upstreamId, response.max())
}.onErrorResume {
log.debug("error during getting last solana block - ${it.message}")
Mono.empty()
return api.read(ChainRequest("getEpochInfo", ListParams()))
.map { response ->
val epochInfo = Global.objectMapper.readValue(
response.getResult(),
SolanaEpochInfo::class.java,
)
lastKnownHeights[upstreamId] = epochInfo.blockHeight
lastCheckedSlots[upstreamId] = epochInfo.absoluteSlot
makeBlockFromSlot(epochInfo.absoluteSlot, epochInfo.absoluteSlot - 1, epochInfo.blockHeight, upstreamId, ByteArray(0))
}
.onErrorResume { error ->
log.debug("error during getting latest solana block - ${error.message}")
Mono.empty()
}
}

override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono<BlockContainer> {
return try {
val notification = Global.objectMapper.readValue(data, SolanaSlotNotification::class.java)
val slot = notification.slot

val lastChecked = lastCheckedSlots[upstreamId] ?: 0L
val lastHeight = lastKnownHeights[upstreamId]
val shouldCheckHeight = slot - lastChecked >= HEIGHT_CHECK_INTERVAL

// Optimistic height estimation: assume 1:1 slot-to-block ratio
val estimatedHeight = if (lastHeight != null && lastChecked > 0) {
lastHeight + (slot - lastChecked)
} else {
null
}

if (shouldCheckHeight || estimatedHeight == null) {
// Verify actual height using getEpochInfo (single call for both slot and height)
api.read(ChainRequest("getEpochInfo", ListParams()))
.map { response ->
val epochInfo = Global.objectMapper.readValue(
response.getResult(),
SolanaEpochInfo::class.java,
)
val actualSlot = epochInfo.absoluteSlot
val actualHeight = epochInfo.blockHeight

if (estimatedHeight != null && estimatedHeight != actualHeight) {
log.debug(
"Height drift detected for {}: estimated={}, actual={}, diff={}",
upstreamId,
estimatedHeight,
actualHeight,
estimatedHeight - actualHeight,
)
}

lastKnownHeights[upstreamId] = actualHeight
lastCheckedSlots[upstreamId] = actualSlot
makeBlockFromSlot(actualSlot, actualSlot - 1, actualHeight, upstreamId, data)
}
}
.onErrorResume { error ->
log.warn("Failed to get block height, using estimated value: ${error.message}")
val height = estimatedHeight ?: lastHeight ?: slot
Mono.just(makeBlockFromSlot(slot, notification.parent, height, upstreamId, data))
}
} else {
// Use optimistic estimated height
Mono.just(makeBlockFromSlot(slot, notification.parent, estimatedHeight, upstreamId, data))
}
} catch (e: Exception) {
log.error("Failed to parse slotSubscribe notification", e)
Mono.empty()
}
}

override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono<BlockContainer> {
val res = Global.objectMapper.readValue(data, SolanaWrapper::class.java)
return Mono.just(makeBlock(data, res.value.block, upstreamId, res.context.slot))
override fun listenNewHeadsRequest(): ChainRequest {
return ChainRequest("slotSubscribe", ListParams())
}

private fun makeBlock(raw: ByteArray, block: SolanaBlock, upstreamId: String, slot: Long): BlockContainer {
override fun unsubscribeNewHeadsRequest(subId: Any): ChainRequest {
return ChainRequest("slotUnsubscribe", ListParams(subId))
}

private fun makeBlockFromSlot(slot: Long, parentSlot: Long, height: Long, upstreamId: String, data: ByteArray): BlockContainer {
// Synthetic hash from slot for ForkChoice deduplication
val syntheticHash = BlockId.from(
ByteBuffer.allocate(32).putLong(slot).array(),
)
// Synthetic parent hash from parent slot for chain tracking
val syntheticParentHash = BlockId.from(
ByteBuffer.allocate(32).putLong(parentSlot).array(),
)

return BlockContainer(
height = block.height,
hash = BlockId.fromBase64(block.hash),
height = height,
hash = syntheticHash,
difficulty = BigInteger.ZERO,
timestamp = Instant.ofEpochMilli(block.timestamp),
timestamp = Instant.now(),
full = false,
json = raw,
parsed = block,
json = data,
parsed = null,
transactions = emptyList(),
upstreamId = upstreamId,
parentHash = BlockId.fromBase64(block.parent),
parentHash = syntheticParentHash,
slot = slot,
)
}

override fun listenNewHeadsRequest(): ChainRequest {
return ChainRequest(
"blockSubscribe",
ListParams(
"all",
mapOf(
"commitment" to "confirmed",
"showRewards" to false,
"transactionDetails" to "none",
),
),
)
}

override fun unsubscribeNewHeadsRequest(subId: Any): ChainRequest {
return ChainRequest("blockUnsubscribe", ListParams(subId))
// For testing only - clear height cache
internal fun clearCache() {
lastKnownHeights.clear()
lastCheckedSlots.clear()
}

override fun upstreamValidators(
Expand Down Expand Up @@ -165,26 +215,28 @@ object SolanaChainSpecific : AbstractChainSpecific() {
}
}

// slotSubscribe response format
@JsonIgnoreProperties(ignoreUnknown = true)
data class SolanaWrapper(
@param:JsonProperty("context") var context: SolanaContext,
@param:JsonProperty("value") var value: SolanaResult,
)

@JsonIgnoreProperties(ignoreUnknown = true)
data class SolanaContext(
@param:JsonProperty("slot") var slot: Long,
data class SolanaSlotNotification(
@param:JsonProperty("slot") val slot: Long,
@param:JsonProperty("parent") val parent: Long,
@param:JsonProperty("root") val root: Long,
)

// getEpochInfo response format
@JsonIgnoreProperties(ignoreUnknown = true)
data class SolanaResult(
@param:JsonProperty("block") var block: SolanaBlock,
data class SolanaEpochInfo(
@param:JsonProperty("absoluteSlot") val absoluteSlot: Long,
@param:JsonProperty("blockHeight") val blockHeight: Long,
@param:JsonProperty("epoch") val epoch: Long,
@param:JsonProperty("slotIndex") val slotIndex: Long,
@param:JsonProperty("slotsInEpoch") val slotsInEpoch: Long,
)

// getBlock response format (used by SolanaLowerBoundSlotDetector)
@JsonIgnoreProperties(ignoreUnknown = true)
data class SolanaBlock(
@param:JsonProperty("blockHeight") var height: Long,
@param:JsonProperty("blockTime") var timestamp: Long,
@param:JsonProperty("blockhash") var hash: String,
@param:JsonProperty("previousBlockhash") var parent: String,
@param:JsonProperty("blockHeight") val height: Long,
@param:JsonProperty("blockhash") val hash: String,
@param:JsonProperty("blockTime") val time: Long,
)
Loading
Loading