From fb7a6a00f120e86293958de7d4e45493ef4f005e Mon Sep 17 00:00:00 2001 From: Septen Date: Thu, 8 Jan 2026 03:12:21 +0000 Subject: [PATCH] Node: improve state delta payload handling. --- crates/node/primitives/src/client.rs | 11 +++--- crates/node/primitives/src/sync.rs | 44 ++++++++++++++++++--- crates/node/src/handlers/network_event.rs | 7 +--- crates/node/src/handlers/state_delta.rs | 48 +++++++++++++---------- crates/storage/src/delta.rs | 24 ++++++++++++ 5 files changed, 98 insertions(+), 36 deletions(-) diff --git a/crates/node/primitives/src/client.rs b/crates/node/primitives/src/client.rs index 55a0e6e4d..ab08a0683 100644 --- a/crates/node/primitives/src/client.rs +++ b/crates/node/primitives/src/client.rs @@ -25,7 +25,7 @@ use calimero_network_primitives::specialized_node_invite::SpecializedNodeType; use crate::messages::{ NodeMessage, RegisterPendingSpecializedNodeInvite, RemovePendingSpecializedNodeInvite, }; -use crate::sync::BroadcastMessage; +use crate::sync::{BroadcastMessage, StateDeltaPayload}; mod alias; mod application; @@ -121,9 +121,9 @@ impl NodeClient { let shared_key = SharedKey::from_sk(sender_key); let nonce = rand::thread_rng().gen(); - let encrypted = shared_key - .encrypt(artifact, nonce) - .ok_or_eyre("failed to encrypt artifact")?; + // Bundle artifact and events together before encryption + let delta_payload = StateDeltaPayload::new(artifact, events); + let encrypted = delta_payload.encrypt(&shared_key, nonce)?; let payload = BroadcastMessage::StateDelta { context_id: context.id, @@ -132,9 +132,8 @@ impl NodeClient { parent_ids, hlc, root_hash: context.root_hash, - artifact: encrypted.into(), nonce, - events: events.map(Cow::from), + payload: encrypted.into(), }; let payload = borsh::to_vec(&payload)?; diff --git a/crates/node/primitives/src/sync.rs b/crates/node/primitives/src/sync.rs index 3e09e0f51..2fdd5a1c9 100644 --- a/crates/node/primitives/src/sync.rs +++ b/crates/node/primitives/src/sync.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use borsh::{BorshDeserialize, BorshSerialize}; -use calimero_crypto::Nonce; +use calimero_crypto::{Nonce, SharedKey}; use calimero_network_primitives::specialized_node_invite::SpecializedNodeType; use calimero_primitives::blobs::BlobId; use calimero_primitives::context::ContextId; @@ -28,12 +28,12 @@ pub enum BroadcastMessage<'a> { hlc: calimero_storage::logical_clock::HybridTimestamp, root_hash: Hash, // todo! shouldn't be cleartext - artifact: Cow<'a, [u8]>, nonce: Nonce, - /// Execution events that were emitted during the state change. - /// This field is encrypted along with the artifact. - events: Option>, + /// Encrypted and borsh-serialized `StateDeltaPayload`. + /// The `StateDeltaPayload` contains state delta artifact and + /// execution events that were emitted during the state change. + payload: Cow<'a, [u8]>, }, /// Hash heartbeat for divergence detection @@ -135,3 +135,37 @@ pub enum MessagePayload<'a> { signature: [u8; 64], }, } + +// Encapsulated structure containing artifact and events for `StateDelta`. +// This is required to ensure that both artifact and events are encrypted and tied together. +#[derive(Debug, BorshSerialize, BorshDeserialize)] +pub struct StateDeltaPayload { + /// The state delta artifact. + pub artifact: Vec, + /// Execution events associated with the delta and artifact. + pub events: Option>, +} + +impl StateDeltaPayload { + /// Creates a new payload bundling together state delta artifact and events. + pub fn new(artifact: Vec, events: Option>) -> Self { + Self { artifact, events } + } + + /// Serializes and encrypts the payload using the provided key and nonce. + pub fn encrypt(&self, key: &SharedKey, nonce: Nonce) -> eyre::Result> { + let plaintext = borsh::to_vec(self)?; + key.encrypt(plaintext, nonce) + .ok_or_else(|| eyre::eyre!("failed to encrypt StateDeltaPayload")) + } + + /// Decrypts and deserializes the payload using the provided key and nonce. + pub fn decrypt(cipher_text: Vec, key: &SharedKey, nonce: Nonce) -> eyre::Result { + let plaintext = key + .decrypt(cipher_text, nonce) + .ok_or_else(|| eyre::eyre!("failed to decrypt StateDeltaPayload"))?; + let payload = borsh::from_slice(&plaintext)?; + + Ok(payload) + } +} diff --git a/crates/node/src/handlers/network_event.rs b/crates/node/src/handlers/network_event.rs index 140e63290..14af24dbe 100644 --- a/crates/node/src/handlers/network_event.rs +++ b/crates/node/src/handlers/network_event.rs @@ -86,16 +86,14 @@ impl Handler for NodeManager { parent_ids, hlc, root_hash, - artifact, nonce, - events, + payload, } => { info!( %context_id, %author_id, delta_id = ?delta_id, parent_count = parent_ids.len(), - has_events = events.is_some(), "Matched StateDelta message" ); @@ -119,9 +117,8 @@ impl Handler for NodeManager { parent_ids, hlc, root_hash, - artifact.into_owned(), nonce, - events.map(|e| e.into_owned()), + payload.into_owned(), ) .await { diff --git a/crates/node/src/handlers/state_delta.rs b/crates/node/src/handlers/state_delta.rs index 4145d66e1..4ef89c0e0 100644 --- a/crates/node/src/handlers/state_delta.rs +++ b/crates/node/src/handlers/state_delta.rs @@ -5,13 +5,14 @@ use calimero_context_primitives::client::ContextClient; use calimero_crypto::Nonce; use calimero_node_primitives::client::NodeClient; +use calimero_node_primitives::sync::StateDeltaPayload; use calimero_primitives::context::ContextId; use calimero_primitives::events::{ ContextEvent, ContextEventPayload, ExecutionEvent, NodeEvent, StateMutationPayload, }; use calimero_primitives::hash::Hash; use calimero_primitives::identity::{PrivateKey, PublicKey}; -use calimero_storage::action::Action; +use calimero_storage::{action::Action, delta::StorageDelta}; use eyre::{bail, OptionExt, Result}; use libp2p::PeerId; use tracing::{debug, info, warn}; @@ -44,9 +45,9 @@ pub async fn handle_state_delta( parent_ids: Vec<[u8; 32]>, hlc: calimero_storage::logical_clock::HybridTimestamp, root_hash: Hash, - artifact: Vec, nonce: Nonce, - events: Option>, + // state delta artifact and execution events bundled together (see `StateDeltaPayload`) + delta_payload: Vec, ) -> Result<()> { let Some(context) = node_clients.context.get_context(&context_id)? else { bail!("context '{}' not found", context_id); @@ -73,7 +74,20 @@ pub async fn handle_state_delta( ) .await?; - let actions = decrypt_delta_actions(artifact, nonce, sender_key)?; + // Decrypt the bundled state delta payload + let (artifact, events) = decrypt_delta_payload(sender_key, nonce, delta_payload)?; + + info!( + %context_id, + %author_id, + delta_id = ?delta_id, + has_events = events.is_some(), + artifact_len = artifact.len(), + "Processing state delta" + ); + + // Extract actions using the new StorageDelta helper + let actions = StorageDelta::extract_actions_from_artifact(&artifact)?; let delta = calimero_dag::CausalDelta { id: delta_id, @@ -256,23 +270,17 @@ struct DeltaStoreSetup { is_uninitialized: bool, } -fn decrypt_delta_actions( - artifact: Vec, - nonce: Nonce, +/// Helper that decrypts the state delta payload and returns the artifact and events. +fn decrypt_delta_payload( sender_key: PrivateKey, -) -> Result> { - let shared_key = calimero_crypto::SharedKey::from_sk(&sender_key); - let decrypted_artifact = shared_key - .decrypt(artifact, nonce) - .ok_or_eyre("failed to decrypt artifact")?; - - let storage_delta: calimero_storage::delta::StorageDelta = - borsh::from_slice(&decrypted_artifact)?; - - match storage_delta { - calimero_storage::delta::StorageDelta::Actions(actions) => Ok(actions), - _ => bail!("Expected Actions variant in state delta"), - } + nonce: Nonce, + encrypted_payload: Vec, +) -> Result<(Vec, Option>)> { + let shared_key = calimero_crypto::SharedKey::from_sk(&sender_key.into()); + let StateDeltaPayload { artifact, events } = + StateDeltaPayload::decrypt(encrypted_payload, &shared_key, nonce)?; + + Ok((artifact, events)) } async fn ensure_author_sender_key( diff --git a/crates/storage/src/delta.rs b/crates/storage/src/delta.rs index f59eb1f05..975b33085 100644 --- a/crates/storage/src/delta.rs +++ b/crates/storage/src/delta.rs @@ -138,6 +138,30 @@ pub enum StorageDelta { Comparisons(Vec), } +impl StorageDelta { + /// Deserializes a raw artifact byte slice into a vector of `Action`s. + /// + /// This is a helper function to facilitate the creation of `Actions` variant + /// assuming the unencrypted `artifact` contains a list of actions. + /// + /// # Args + /// * `artifact` - a raw artifact byte slice conaining an unencrypted list of actions. + /// + /// # Errors + /// Returns an error if: + /// * The artifact cannot be Borsh-deserialized. + /// * The artifact contains `Comparisons` instead of `Actions`. + pub fn extract_actions_from_artifact(artifact: &[u8]) -> eyre::Result> { + let delta: Self = borsh::from_slice(artifact)?; + match delta { + Self::Actions(actions) => Ok(actions), + Self::Comparisons(_) => { + eyre::bail!("Expected Actions variant in state delta, found Comparisons") + } + } + } +} + impl BorshDeserialize for StorageDelta { fn deserialize_reader(reader: &mut R) -> io::Result { let Ok(tag) = u8::deserialize_reader(reader) else {