diff --git a/foundation/src/main/resources/public b/foundation/src/main/resources/public index de2e459ee..a93f66d7b 160000 --- a/foundation/src/main/resources/public +++ b/foundation/src/main/resources/public @@ -1 +1 @@ -Subproject commit de2e459ee1133fc0338b5ee2fb9aded6bf432cdd +Subproject commit a93f66d7b0f10c37a6b25ee2befa888d84349e94 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt index d4a09f75e..722681676 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt @@ -30,91 +30,141 @@ import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler import java.math.BigInteger +import java.nio.ByteBuffer import java.time.Instant - +import java.util.concurrent.ConcurrentHashMap + +/** + * Solana chain-specific implementation using slotSubscribe for head detection. + * + * Uses lightweight slotSubscribe WebSocket subscription instead of expensive blockSubscribe: + * - ~50 bytes per notification vs ~1KB for blockSubscribe + * - Stable API (no special node flags required) + * - Universal provider support + * - Throttled getEpochInfo calls (every N slots) to get actual slot and block height + * - Synthetic hash based on slot for ForkChoice deduplication + */ object SolanaChainSpecific : AbstractChainSpecific() { private val log = LoggerFactory.getLogger(SolanaChainSpecific::class.java) + // Throttle: check actual block height every N slots + private const val HEIGHT_CHECK_INTERVAL = 5 + + // Cache per upstream for throttling + private val lastKnownHeights = ConcurrentHashMap() + private val lastCheckedSlots = ConcurrentHashMap() + override fun getLatestBlock(api: ChainReader, upstreamId: String): Mono { - return api.read(ChainRequest("getSlot", ListParams())).flatMap { - val slot = it.getResultAsProcessedString().toLong() - api.read( - ChainRequest( - "getBlocks", - ListParams( - slot - 10, - slot, - ), - ), - ).flatMap { - val response = Global.objectMapper.readValue(it.getResult(), LongArray::class.java) - if (response == null || response.isEmpty()) { - Mono.empty() - } else { - api.read( - ChainRequest( - "getBlock", - ListParams( - response.max(), - mapOf( - "commitment" to "confirmed", - "showRewards" to false, - "transactionDetails" to "none", - "maxSupportedTransactionVersion" to 0, - ), - ), - ), - ).map { - val raw = it.getResult() - val block = Global.objectMapper.readValue(it.getResult(), SolanaBlock::class.java) - makeBlock(raw, block, upstreamId, response.max()) - }.onErrorResume { - log.debug("error during getting last solana block - ${it.message}") - Mono.empty() + return api.read(ChainRequest("getEpochInfo", ListParams())) + .map { response -> + val epochInfo = Global.objectMapper.readValue( + response.getResult(), + SolanaEpochInfo::class.java, + ) + lastKnownHeights[upstreamId] = epochInfo.blockHeight + lastCheckedSlots[upstreamId] = epochInfo.absoluteSlot + makeBlockFromSlot(epochInfo.absoluteSlot, epochInfo.absoluteSlot - 1, epochInfo.blockHeight, upstreamId, ByteArray(0)) + } + .onErrorResume { error -> + log.debug("error during getting latest solana block - ${error.message}") + Mono.empty() + } + } + + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { + return try { + val notification = Global.objectMapper.readValue(data, SolanaSlotNotification::class.java) + val slot = notification.slot + + val lastChecked = lastCheckedSlots[upstreamId] ?: 0L + val lastHeight = lastKnownHeights[upstreamId] + val shouldCheckHeight = slot - lastChecked >= HEIGHT_CHECK_INTERVAL + + // Optimistic height estimation: assume 1:1 slot-to-block ratio + val estimatedHeight = if (lastHeight != null && lastChecked > 0) { + lastHeight + (slot - lastChecked) + } else { + null + } + + if (shouldCheckHeight || estimatedHeight == null) { + // Verify actual height using getEpochInfo (single call for both slot and height) + api.read(ChainRequest("getEpochInfo", ListParams())) + .map { response -> + val epochInfo = Global.objectMapper.readValue( + response.getResult(), + SolanaEpochInfo::class.java, + ) + val actualSlot = epochInfo.absoluteSlot + val actualHeight = epochInfo.blockHeight + + if (estimatedHeight != null && estimatedHeight != actualHeight) { + log.debug( + "Height drift detected for {}: estimated={}, actual={}, diff={}", + upstreamId, + estimatedHeight, + actualHeight, + estimatedHeight - actualHeight, + ) + } + + lastKnownHeights[upstreamId] = actualHeight + lastCheckedSlots[upstreamId] = actualSlot + makeBlockFromSlot(actualSlot, actualSlot - 1, actualHeight, upstreamId, data) } - } + .onErrorResume { error -> + log.warn("Failed to get block height, using estimated value: ${error.message}") + val height = estimatedHeight ?: lastHeight ?: slot + Mono.just(makeBlockFromSlot(slot, notification.parent, height, upstreamId, data)) + } + } else { + // Use optimistic estimated height + Mono.just(makeBlockFromSlot(slot, notification.parent, estimatedHeight, upstreamId, data)) } + } catch (e: Exception) { + log.error("Failed to parse slotSubscribe notification", e) + Mono.empty() } } - override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { - val res = Global.objectMapper.readValue(data, SolanaWrapper::class.java) - return Mono.just(makeBlock(data, res.value.block, upstreamId, res.context.slot)) + override fun listenNewHeadsRequest(): ChainRequest { + return ChainRequest("slotSubscribe", ListParams()) } - private fun makeBlock(raw: ByteArray, block: SolanaBlock, upstreamId: String, slot: Long): BlockContainer { + override fun unsubscribeNewHeadsRequest(subId: Any): ChainRequest { + return ChainRequest("slotUnsubscribe", ListParams(subId)) + } + + private fun makeBlockFromSlot(slot: Long, parentSlot: Long, height: Long, upstreamId: String, data: ByteArray): BlockContainer { + // Synthetic hash from slot for ForkChoice deduplication + val syntheticHash = BlockId.from( + ByteBuffer.allocate(32).putLong(slot).array(), + ) + // Synthetic parent hash from parent slot for chain tracking + val syntheticParentHash = BlockId.from( + ByteBuffer.allocate(32).putLong(parentSlot).array(), + ) + return BlockContainer( - height = block.height, - hash = BlockId.fromBase64(block.hash), + height = height, + hash = syntheticHash, difficulty = BigInteger.ZERO, - timestamp = Instant.ofEpochMilli(block.timestamp), + timestamp = Instant.now(), full = false, - json = raw, - parsed = block, + json = data, + parsed = null, transactions = emptyList(), upstreamId = upstreamId, - parentHash = BlockId.fromBase64(block.parent), + parentHash = syntheticParentHash, slot = slot, ) } - override fun listenNewHeadsRequest(): ChainRequest { - return ChainRequest( - "blockSubscribe", - ListParams( - "all", - mapOf( - "commitment" to "confirmed", - "showRewards" to false, - "transactionDetails" to "none", - ), - ), - ) - } - - override fun unsubscribeNewHeadsRequest(subId: Any): ChainRequest { - return ChainRequest("blockUnsubscribe", ListParams(subId)) + // For testing only - clear height cache + internal fun clearCache() { + lastKnownHeights.clear() + lastCheckedSlots.clear() } override fun upstreamValidators( @@ -165,26 +215,28 @@ object SolanaChainSpecific : AbstractChainSpecific() { } } +// slotSubscribe response format @JsonIgnoreProperties(ignoreUnknown = true) -data class SolanaWrapper( - @param:JsonProperty("context") var context: SolanaContext, - @param:JsonProperty("value") var value: SolanaResult, -) - -@JsonIgnoreProperties(ignoreUnknown = true) -data class SolanaContext( - @param:JsonProperty("slot") var slot: Long, +data class SolanaSlotNotification( + @param:JsonProperty("slot") val slot: Long, + @param:JsonProperty("parent") val parent: Long, + @param:JsonProperty("root") val root: Long, ) +// getEpochInfo response format @JsonIgnoreProperties(ignoreUnknown = true) -data class SolanaResult( - @param:JsonProperty("block") var block: SolanaBlock, +data class SolanaEpochInfo( + @param:JsonProperty("absoluteSlot") val absoluteSlot: Long, + @param:JsonProperty("blockHeight") val blockHeight: Long, + @param:JsonProperty("epoch") val epoch: Long, + @param:JsonProperty("slotIndex") val slotIndex: Long, + @param:JsonProperty("slotsInEpoch") val slotsInEpoch: Long, ) +// getBlock response format (used by SolanaLowerBoundSlotDetector) @JsonIgnoreProperties(ignoreUnknown = true) data class SolanaBlock( - @param:JsonProperty("blockHeight") var height: Long, - @param:JsonProperty("blockTime") var timestamp: Long, - @param:JsonProperty("blockhash") var hash: String, - @param:JsonProperty("previousBlockhash") var parent: String, + @param:JsonProperty("blockHeight") val height: Long, + @param:JsonProperty("blockhash") val hash: String, + @param:JsonProperty("blockTime") val time: Long, ) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt index f2eafb137..9636affa8 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt @@ -1,39 +1,216 @@ package io.emeraldpay.dshackle.upstream.solana +import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.reader.ChainReader -import org.assertj.core.api.Assertions +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.mockito.kotlin.any import org.mockito.kotlin.mock +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import reactor.core.publisher.Mono +import java.nio.ByteBuffer +import java.time.Instant +import java.time.temporal.ChronoUnit -val example = """{ - "context": { - "slot": 112301554 - }, - "value": { - "slot": 112301554, - "block": { - "previousBlockhash": "GJp125YAN4ufCSUvZJVdCyWQJ7RPWMmwxoyUQySydZA", - "blockhash": "6ojMHjctdqfB55JDpEpqfHnP96fiaHEcvzEQ2NNcxzHP", - "parentSlot": 112301553, - "blockTime": 1639926816, - "blockHeight": 101210751 - }, - "err": null - } - } -""".trimIndent() class SolanaChainSpecificTest { + @BeforeEach + fun setup() { + // Clear cache before each test + SolanaChainSpecific.clearCache() + } + + private fun epochInfoResponse(absoluteSlot: Long, blockHeight: Long): ChainResponse { + val json = """{"absoluteSlot": $absoluteSlot, "blockHeight": $blockHeight, "epoch": 100, "slotIndex": 1000, "slotsInEpoch": 432000}""" + return ChainResponse(json.toByteArray(), null) + } + + @Test + fun `parseBlock from slotSubscribe notification`() { + val reader = mock { + on { read(any()) }.thenReturn( + Mono.just(epochInfoResponse(112301554, 101210751)), + ) + } + + val beforeCall = Instant.now() + val json = """{"slot": 112301554, "parent": 112301553, "root": 112301500}""" + val result = SolanaChainSpecific.getFromHeader(json.toByteArray(), "upstream-1", reader).block()!! + val afterCall = Instant.now() + + // Uses actual data from getEpochInfo + assertThat(result.slot).isEqualTo(112301554) + assertThat(result.height).isEqualTo(101210751) + assertThat(result.upstreamId).isEqualTo("upstream-1") + + // Synthetic hash based on actualSlot from getEpochInfo + val expectedHash = BlockId.from(ByteBuffer.allocate(32).putLong(112301554).array()) + assertThat(result.hash).isEqualTo(expectedHash) + + // Synthetic parent hash based on actualSlot - 1 + val expectedParentHash = BlockId.from(ByteBuffer.allocate(32).putLong(112301553).array()) + assertThat(result.parentHash).isEqualTo(expectedParentHash) + + // Timestamp is synthetic (Instant.now() at call time) + assertThat(result.timestamp).isBetween(beforeCall, afterCall.plus(1, ChronoUnit.SECONDS)) + } + + @Test + fun `listenNewHeadsRequest returns slotSubscribe`() { + val request = SolanaChainSpecific.listenNewHeadsRequest() + + assertThat(request.method).isEqualTo("slotSubscribe") + } + + @Test + fun `unsubscribeNewHeadsRequest returns slotUnsubscribe`() { + val request = SolanaChainSpecific.unsubscribeNewHeadsRequest("sub-123") + + assertThat(request.method).isEqualTo("slotUnsubscribe") + } + + @Test + fun `throttle HTTP calls every 5 slots`() { + val reader = mock { + on { read(any()) }.thenReturn( + Mono.just(epochInfoResponse(100, 100000000)), + ) + } + + // First slot - should trigger HTTP call (no cache) + val slot1 = """{"slot": 100, "parent": 99, "root": 50}""" + SolanaChainSpecific.getFromHeader(slot1.toByteArray(), "upstream-1", reader).block() + + // Next 4 slots - should use cached height (within interval of 5) + for (i in 101..104) { + val slotN = """{"slot": $i, "parent": ${i - 1}, "root": 50}""" + SolanaChainSpecific.getFromHeader(slotN.toByteArray(), "upstream-1", reader).block() + } + + // Only 1 HTTP call so far + verify(reader, times(1)).read(any()) + + // Slot 105 - should trigger new HTTP call (interval reached) + val slot105 = """{"slot": 105, "parent": 104, "root": 50}""" + SolanaChainSpecific.getFromHeader(slot105.toByteArray(), "upstream-1", reader).block() + + // Now 2 HTTP calls + verify(reader, times(2)).read(any()) + } + + @Test + fun `synthetic hash is deterministic based on slot`() { + val slot = 12345L + val hash1 = BlockId.from(ByteBuffer.allocate(32).putLong(slot).array()) + val hash2 = BlockId.from(ByteBuffer.allocate(32).putLong(slot).array()) + + assertThat(hash1).isEqualTo(hash2) + + val differentSlot = 12346L + val hash3 = BlockId.from(ByteBuffer.allocate(32).putLong(differentSlot).array()) + + assertThat(hash1).isNotEqualTo(hash3) + } + @Test - fun parseBlock() { - val reader = mock {} + fun `SolanaSlotNotification parses correctly`() { + val json = """{"slot": 123456, "parent": 123455, "root": 123400}""" + val notification = Global.objectMapper.readValue(json, SolanaSlotNotification::class.java) + + assertThat(notification.slot).isEqualTo(123456) + assertThat(notification.parent).isEqualTo(123455) + assertThat(notification.root).isEqualTo(123400) + } + + @Test + fun `uses slot as height when cache is empty and no HTTP call`() { + val reader = mock { + on { read(any()) }.thenReturn(Mono.error(RuntimeException("Network error"))) + } + + // First slot with HTTP error - should fallback to slot as height + val json = """{"slot": 112301554, "parent": 112301553, "root": 112301500}""" + val result = SolanaChainSpecific.getFromHeader(json.toByteArray(), "upstream-1", reader).block()!! + + assertThat(result.slot).isEqualTo(112301554) + // When cache empty and HTTP fails, uses slot as height + assertThat(result.height).isEqualTo(112301554) + } + + @Test + fun `uses optimistic estimated height between throttle intervals`() { + val reader = mock { + on { read(any()) }.thenReturn( + Mono.just(epochInfoResponse(100, 100000000)), + ) + } + + // First call sets cache at slot 100 with height 100000000 + val slot1 = """{"slot": 100, "parent": 99, "root": 50}""" + val result1 = SolanaChainSpecific.getFromHeader(slot1.toByteArray(), "upstream-1", reader).block()!! + assertThat(result1.height).isEqualTo(100000000) + + // Second call uses optimistic estimated height: 100000000 + (101 - 100) = 100000001 + val slot2 = """{"slot": 101, "parent": 100, "root": 50}""" + val result2 = SolanaChainSpecific.getFromHeader(slot2.toByteArray(), "upstream-1", reader).block()!! + + assertThat(result2.slot).isEqualTo(101) + assertThat(result2.height).isEqualTo(100000001) // optimistic estimated height + + // Third call: 100000000 + (103 - 100) = 100000003 + val slot3 = """{"slot": 103, "parent": 102, "root": 50}""" + val result3 = SolanaChainSpecific.getFromHeader(slot3.toByteArray(), "upstream-1", reader).block()!! + + assertThat(result3.slot).isEqualTo(103) + assertThat(result3.height).isEqualTo(100000003) // optimistic estimated height + } + + @Test + fun `height estimation resets after RPC check`() { + val reader = mock { + on { read(any()) } + .thenReturn(Mono.just(epochInfoResponse(100, 100000000))) + .thenReturn(Mono.just(epochInfoResponse(105, 100000004))) + } + + // First call at slot 100 - sets cache with height 100000000 + val slot1 = """{"slot": 100, "parent": 99, "root": 50}""" + SolanaChainSpecific.getFromHeader(slot1.toByteArray(), "upstream-1", reader).block() + + // Slot 105 triggers RPC check - gets actual slot 105 and height 100000004 (1 slot was skipped) + val slot105 = """{"slot": 105, "parent": 104, "root": 50}""" + val result105 = SolanaChainSpecific.getFromHeader(slot105.toByteArray(), "upstream-1", reader).block()!! + assertThat(result105.slot).isEqualTo(105) + assertThat(result105.height).isEqualTo(100000004) + + // Slot 107 uses new baseline: 100000004 + (107 - 105) = 100000006 + val slot107 = """{"slot": 107, "parent": 106, "root": 50}""" + val result107 = SolanaChainSpecific.getFromHeader(slot107.toByteArray(), "upstream-1", reader).block()!! + assertThat(result107.height).isEqualTo(100000006) + } + + @Test + fun `uses estimated height on RPC error when estimation available`() { + val reader = mock { + on { read(any()) } + .thenReturn(Mono.just(epochInfoResponse(100, 100000000))) + .thenReturn(Mono.error(RuntimeException("Network error"))) + } + + // First call succeeds at slot 100 + val slot1 = """{"slot": 100, "parent": 99, "root": 50}""" + SolanaChainSpecific.getFromHeader(slot1.toByteArray(), "upstream-1", reader).block() - val result = SolanaChainSpecific.getFromHeader(example.toByteArray(), "1", reader).block()!! + // Slot 105 triggers RPC check but fails - should use estimated height: 100000000 + (105 - 100) = 100000005 + val slot105 = """{"slot": 105, "parent": 104, "root": 50}""" + val result = SolanaChainSpecific.getFromHeader(slot105.toByteArray(), "upstream-1", reader).block()!! - Assertions.assertThat(result.height).isEqualTo(101210751) - Assertions.assertThat(result.hash).isEqualTo(BlockId.fromBase64("6ojMHjctdqfB55JDpEpqfHnP96fiaHEcvzEQ2NNcxzHP")) - Assertions.assertThat(result.upstreamId).isEqualTo("1") - Assertions.assertThat(result.parentHash).isEqualTo(BlockId.fromBase64("GJp125YAN4ufCSUvZJVdCyWQJ7RPWMmwxoyUQySydZA")) + assertThat(result.slot).isEqualTo(105) + assertThat(result.height).isEqualTo(100000005) // estimated height used as fallback } }