diff --git a/application/cex_processor.go b/application/cex_processor.go new file mode 100644 index 0000000..133eff5 --- /dev/null +++ b/application/cex_processor.go @@ -0,0 +1,67 @@ +package application + +import ( + "context" + + "github.com/0xAtelerix/sdk/gosdk" + "github.com/0xAtelerix/sdk/gosdk/apptypes" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/rs/zerolog/log" +) + +// CEXProcessor implements gosdk.CEXStreamProcessor for the application layer. +// It reads full order book data on demand via the CEXDataAccessor. +type CEXProcessor struct { + accessor gosdk.CEXDataAccessor +} + +// NewCEXProcessor creates a new CEX stream processor. +// Returns nil if accessor is nil (CEX data not available). +func NewCEXProcessor(accessor gosdk.CEXDataAccessor) *CEXProcessor { + if accessor == nil { + return nil + } + + return &CEXProcessor{accessor: accessor} +} + +// ProcessCEXStream reads full order book data for each ref and processes it. +// +//nolint:unparam // v1 stub: will return real data when KV storage is added +func (p *CEXProcessor) ProcessCEXStream( + ctx context.Context, + refs []apptypes.CEXOrderBookRef, + _ kv.RwTx, +) ([]apptypes.ExternalTransaction, error) { + for _, ref := range refs { + ob, err := p.accessor.ReadCEXOrderBook(ctx, ref.Exchange, ref.Symbol, ref.FetchedAt) + if err != nil { + log.Error().Err(err). + Str("exchange", ref.Exchange). + Str("symbol", ref.Symbol). + Int64("fetchedAt", ref.FetchedAt). + Msg("failed to read CEX order book") + + continue + } + + logEvt := log.Info(). + Str("exchange", ob.Exchange). + Str("symbol", ob.Symbol). + Int("bids", len(ob.Bids)). + Int("asks", len(ob.Asks)). + Int64("fetchedAt", ob.FetchedAt) + + if len(ob.Bids) > 0 { + logEvt = logEvt.Str("topBid", ob.Bids[0].Price).Str("topBidQty", ob.Bids[0].Quantity) + } + + if len(ob.Asks) > 0 { + logEvt = logEvt.Str("topAsk", ob.Asks[0].Price).Str("topAskQty", ob.Asks[0].Quantity) + } + + logEvt.Msg("processing CEX order book") + } + + return nil, nil +} diff --git a/cmd/main.go b/cmd/main.go index 22a1b29..f002a4e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -71,6 +71,7 @@ func Run(ctx context.Context, cfg *gosdk.InitConfig) error { appInit.Config, gosdk.NewDefaultBatchProcessor[application.Transaction[application.Receipt]]( application.NewExtBlockProcessor(appInit.Storage.Multichain()), + application.NewCEXProcessor(appInit.Storage.CEXData()), appInit.Storage.Multichain(), appInit.Storage.Subscriber(), ), diff --git a/go.mod b/go.mod index 74985e5..a399db0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/0xAtelerix/example go 1.25.0 require ( - github.com/0xAtelerix/sdk v0.1.7 + github.com/0xAtelerix/sdk v0.1.9-0.20260131114251-58825b72aca2 github.com/blocto/solana-go-sdk v1.30.0 github.com/ethereum/go-ethereum v1.16.3 github.com/holiman/uint256 v1.3.2 diff --git a/go.sum b/go.sum index 27d9637..c53a1a1 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -github.com/0xAtelerix/sdk v0.1.7 h1:8cEVHTs2vL4LYAuNKwSQcxqssztOfDETzYtl9gvGnak= -github.com/0xAtelerix/sdk v0.1.7/go.mod h1:tYa+zDjpx3DrDnTrqU2gASr8BmEoDzz6XrPuPkNcx64= +github.com/0xAtelerix/sdk v0.1.9-0.20260131114251-58825b72aca2 h1:oxZ7ZWbtNQQgwWe7OhaG4eWUG2E8zxnXweXkeJhZclE= +github.com/0xAtelerix/sdk v0.1.9-0.20260131114251-58825b72aca2/go.mod h1:tYa+zDjpx3DrDnTrqU2gASr8BmEoDzz6XrPuPkNcx64= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=