Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/weak-mice-jam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Introduces bundling for SVR (disabled by default) #added
83 changes: 83 additions & 0 deletions .github/workflows/devenv-ocr2-chaos.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Nightly OCR2 chaos test against a local devenv environment.
name: OCR2 Chaos Test

on:
  schedule:
    - cron: "0 6 * * *" # Run daily at 6 AM
  workflow_dispatch:

# All `run` steps execute from the devenv module unless overridden per step.
defaults:
  run:
    working-directory: devenv

# NOTE(review): github.sha in the group means runs on the same ref but
# different commits never cancel each other — confirm this is intended.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.sha }}
  cancel-in-progress: true

jobs:
  chaos:
    permissions:
      id-token: write # required for the AWS OIDC credential exchange below
      contents: read
      pull-requests: write
    runs-on: ubuntu24.04-16cores-64GB # ghv-ignore!
    steps:
      - name: Checkout code
        uses: actions/checkout@v5
        with:
          fetch-depth: 0

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1

      - name: Install Just
        uses: extractions/setup-just@e33e0265a09d6d736e2ee1e0eb685ef1de4669ff # v3
        with:
          just-version: "1.40.0"

      - name: Configure AWS credentials using OIDC
        uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
        with:
          role-to-assume: ${{ secrets.AWS_OIDC_IAM_ROLE_SDLC_ECR_READONLY_ARN }}
          aws-region: us-west-2

      - name: Authenticate to ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@062b18b96a7aff071d4dc91bc00c4c1a7945b076 # v2.0.1

      - name: Set up Go
        uses: actions/setup-go@v6 # v6
        with:
          cache: true
          go-version-file: devenv/go.mod
          cache-dependency-path: devenv/go.sum

      - name: Download Go dependencies
        run: |
          go mod download

      # Pins the nightly chainlink image used by the test environment.
      - name: Set environment variables
        id: set-env
        run: |
          echo "CHAINLINK_IMAGE=${{ secrets.REGISTRY_SDLC }}/chainlink:nightly-$(date +%Y%m%d)-plugins" >> $GITHUB_ENV

      # Installs the `cl` CLI, then brings up the OCR2 environment with observability.
      - name: Run OCR2 environment
        env:
          FAKE_SERVER_IMAGE: ${{ secrets.FAKE_SERVER_IMAGE }}
        run: |
          cd cmd/cl && go install . && cd -
          cl u env.toml,products/ocr2/basic.toml && cl obs up -f

      - name: Run Chaos tests
        id: chaos_test
        working-directory: devenv/tests/ocr2
        run: |
          echo "Running tests for: $CHAINLINK_IMAGE, product: OCR2"
          go test -v -timeout 4h -run TestOCR2Chaos

      # NOTE(review): artifact name says "smoke" but this is the chaos job —
      # presumably copied from the smoke workflow; confirm before renaming.
      - name: Upload Logs
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: container-logs-smoke
          path: devenv/tests/ocr2/logs
          retention-days: 3
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
working-directory: devenv/tests/ocr2
run: |
echo "Running tests for: $CHAINLINK_IMAGE, product: OCR2"
go test -v -timeout 4h -run TestOCR2Load/clean
go test -v -timeout 4h -run TestOCR2Soak/clean

- name: Upload Logs
if: always()
Expand Down
4 changes: 0 additions & 4 deletions core/capabilities/ccip/configs/evm/contract_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ var DestReaderConfig = config.ChainReaderConfig{
},
},
Configs: map[string]*config.ChainReaderDefinition{
consts.MethodNameFeeQuoterGetStaticConfig: {
ChainSpecificName: mustGetMethodName("getStaticConfig", feeQuoterABI),
ReadType: config.Method,
},
consts.MethodNameFeeQuoterGetTokenPrices: {
ChainSpecificName: mustGetMethodName("getTokenPrices", feeQuoterABI),
ReadType: config.Method,
Expand Down
14 changes: 14 additions & 0 deletions core/capabilities/remote/messagecache/message_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ func (c *MessageCache[EventID, PeerID]) Delete(eventID EventID) {
delete(c.events, eventID)
}

// Peers returns a snapshot of peer IDs that have inserted a message for eventID.
// A nil map is returned when no messages have been recorded for eventID.
// Callers are responsible for synchronization, as with the cache's other methods.
func (c *MessageCache[EventID, PeerID]) Peers(eventID EventID) map[PeerID]bool {
	event, found := c.events[eventID]
	if !found {
		return nil
	}

	// Copy the keys so the caller holds an independent snapshot,
	// not a live view of the cache's internal state.
	snapshot := make(map[PeerID]bool, len(event.peerMsgs))
	for id := range event.peerMsgs {
		snapshot[id] = true
	}
	return snapshot
}

// Return the number of events deleted.
// Scans all keys, which might be slow for large caches.
func (c *MessageCache[EventID, PeerID]) DeleteOlderThan(cutoffTimestamp int64) int {
Expand Down
91 changes: 70 additions & 21 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (p *triggerPublisher) Start(ctx context.Context) error {
}

p.wg.Add(1)
go p.registrationCleanupLoop()
go p.cacheCleanupLoop()
p.wg.Add(1)
go p.batchingLoop()
p.lggr.Info("TriggerPublisher started")
Expand Down Expand Up @@ -287,7 +287,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
nowMs := time.Now().UnixMilli()
p.ackCache.Insert(key, sender, nowMs, msg.Payload)
minRequired := uint32(2*callerDon.F + 1)
ready, _ := p.ackCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), false)
ready, _ := p.ackCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.MessageExpiry.Milliseconds(), false)
if !ready {
p.lggr.Debugw("not ready to ACK trigger event yet", "triggerEventId", triggerEventID, "minRequired", minRequired)
return
Expand All @@ -298,22 +298,22 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Debugw("ACKing trigger event", "triggerEventId", triggerEventID)
err = cfg.underlying.AckEvent(ctx, p.capabilityID, triggerEventID, p.capMethodName)
if err != nil {
p.lggr.Errorf("failed to AckEvent on underlying trigger capability (eventID = %s, capabilityID: %s): %v",
triggerEventID, p.capabilityID, err)
p.lggr.Errorw("failed to AckEvent on underlying trigger capability",
"eventID", triggerEventID, "capabilityID", p.capabilityID, "err", err)
}
default:
p.lggr.Errorw("received message with unknown method",
"method", SanitizeLogString(msg.Method), "sender", sender)
}
}

func (p *triggerPublisher) registrationCleanupLoop() {
func (p *triggerPublisher) cacheCleanupLoop() {
defer p.wg.Done()

// Get initial config for ticker setup
firstCfg := p.cfg.Load()
if firstCfg == nil {
p.lggr.Errorw("registrationCleanupLoop started but config not set")
p.lggr.Errorw("cacheCleanupLoop started but config not set")
return
}
cleanupInterval := firstCfg.remoteConfig.MessageExpiry
Expand Down Expand Up @@ -348,7 +348,13 @@ func (p *triggerPublisher) registrationCleanupLoop() {
p.messageCache.Delete(key)
}
}

deleted := p.ackCache.DeleteOlderThan(now - cfg.remoteConfig.MessageExpiry.Milliseconds())
p.mu.Unlock()

if deleted > 0 {
p.lggr.Debugw("cleaned expired AckCache entries", "deleted", deleted)
}
}
}
}
Expand Down Expand Up @@ -434,23 +440,66 @@ func (p *triggerPublisher) sendBatch(resp *batchedResponse) {
resp.workflowIDs = nil
resp.triggerIDs = nil
}
msg := &types.MessageBody{
CapabilityId: p.capabilityID,
CapabilityDonId: cfg.capDonInfo.ID,
CallerDonId: resp.callerDonID,
Method: types.MethodTriggerEvent,
Payload: resp.rawResponse,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
WorkflowIds: workflowBatch,
TriggerIds: triggerBatch,
TriggerEventId: resp.triggerEventID,
},
},
CapabilityMethod: p.capMethodName,

ackSnapshot := make(map[string]map[p2ptypes.PeerID]bool)
p.mu.RLock()
for _, triggerID := range triggerBatch {
key := ackKey{
callerDonID: resp.callerDonID,
triggerEventID: resp.triggerEventID,
triggerID: triggerID,
}
ackSnapshot[triggerID] = p.ackCache.Peers(key)
}
// NOTE: send to all nodes by default, introduce different strategies later (KS-76)
p.mu.RUnlock()

for _, peerID := range cfg.workflowDONs[resp.callerDonID].Members {
var missingTriggerIDs []string
var missingWorkflowIDs []string

// determine which triggerIDs / workflowIDs have not yet ACKd this trigger event
for i, triggerID := range triggerBatch {
peers := ackSnapshot[triggerID]
if peers == nil || !peers[peerID] {
missingTriggerIDs = append(missingTriggerIDs, triggerID)
missingWorkflowIDs = append(missingWorkflowIDs, workflowBatch[i])
}
}

if len(missingTriggerIDs) == 0 {
p.lggr.Debugw("skipping trigger event send; all triggerIDs already ACKed by peer",
"peerID", peerID,
"callerDonID", resp.callerDonID,
"triggerEventID", resp.triggerEventID,
"triggerIDs", triggerBatch,
)
continue
}

p.lggr.Debugw("sending trigger event to peer",
"peerID", peerID,
"callerDonID", resp.callerDonID,
"triggerEventID", resp.triggerEventID,
"workflowIDs", missingWorkflowIDs,
"triggerIDs", missingTriggerIDs,
)

msg := &types.MessageBody{
CapabilityId: p.capabilityID,
CapabilityDonId: cfg.capDonInfo.ID,
CallerDonId: resp.callerDonID,
Method: types.MethodTriggerEvent,
Payload: resp.rawResponse,
CapabilityMethod: p.capMethodName,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
WorkflowIds: missingWorkflowIDs,
TriggerIds: missingTriggerIDs,
TriggerEventId: resp.triggerEventID,
},
},
}

err := p.dispatcher.Send(peerID, msg)
if err != nil {
p.lggr.Errorw("failed to send trigger event", "peerID", peerID, "err", err)
Expand Down
Loading
Loading