Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions application/cex_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package application

import (
"context"

"github.com/0xAtelerix/sdk/gosdk"
"github.com/0xAtelerix/sdk/gosdk/apptypes"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog/log"
)

// CEXProcessor implements gosdk.CEXStreamProcessor for the application layer.
// It reads full order book data on demand via the CEXDataAccessor.
type CEXProcessor struct {
accessor gosdk.CEXDataAccessor
}

// NewCEXProcessor creates a new CEX stream processor.
// Returns nil if accessor is nil (CEX data not available).
func NewCEXProcessor(accessor gosdk.CEXDataAccessor) *CEXProcessor {
if accessor == nil {
return nil
}

return &CEXProcessor{accessor: accessor}
}

// ProcessCEXStream reads full order book data for each ref and processes it.
//
//nolint:unparam // v1 stub: will return real data when KV storage is added
func (p *CEXProcessor) ProcessCEXStream(
ctx context.Context,
refs []apptypes.CEXOrderBookRef,
_ kv.RwTx,
) ([]apptypes.ExternalTransaction, error) {
for _, ref := range refs {
ob, err := p.accessor.ReadCEXOrderBook(ctx, ref.Exchange, ref.Symbol, ref.FetchedAt)
if err != nil {
log.Error().Err(err).
Str("exchange", ref.Exchange).
Str("symbol", ref.Symbol).
Int64("fetchedAt", ref.FetchedAt).
Msg("failed to read CEX order book")

continue
}

logEvt := log.Info().
Str("exchange", ob.Exchange).
Str("symbol", ob.Symbol).
Int("bids", len(ob.Bids)).
Int("asks", len(ob.Asks)).
Int64("fetchedAt", ob.FetchedAt)

if len(ob.Bids) > 0 {
logEvt = logEvt.Str("topBid", ob.Bids[0].Price).Str("topBidQty", ob.Bids[0].Quantity)
}

if len(ob.Asks) > 0 {
logEvt = logEvt.Str("topAsk", ob.Asks[0].Price).Str("topAskQty", ob.Asks[0].Quantity)
}

logEvt.Msg("processing CEX order book")
}

return nil, nil
}
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func Run(ctx context.Context, cfg *gosdk.InitConfig) error {
appInit.Config,
gosdk.NewDefaultBatchProcessor[application.Transaction[application.Receipt]](
application.NewExtBlockProcessor(appInit.Storage.Multichain()),
application.NewCEXProcessor(appInit.Storage.CEXData()),
appInit.Storage.Multichain(),
appInit.Storage.Subscriber(),
),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xAtelerix/example
go 1.25.0

require (
github.com/0xAtelerix/sdk v0.1.7
github.com/0xAtelerix/sdk v0.1.9-0.20260131114251-58825b72aca2
github.com/blocto/solana-go-sdk v1.30.0
github.com/ethereum/go-ethereum v1.16.3
github.com/holiman/uint256 v1.3.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/0xAtelerix/sdk v0.1.7 h1:8cEVHTs2vL4LYAuNKwSQcxqssztOfDETzYtl9gvGnak=
github.com/0xAtelerix/sdk v0.1.7/go.mod h1:tYa+zDjpx3DrDnTrqU2gASr8BmEoDzz6XrPuPkNcx64=
github.com/0xAtelerix/sdk v0.1.9-0.20260131114251-58825b72aca2 h1:oxZ7ZWbtNQQgwWe7OhaG4eWUG2E8zxnXweXkeJhZclE=
github.com/0xAtelerix/sdk v0.1.9-0.20260131114251-58825b72aca2/go.mod h1:tYa+zDjpx3DrDnTrqU2gASr8BmEoDzz6XrPuPkNcx64=
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
Expand Down