Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions crates/node/primitives/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use calimero_network_primitives::specialized_node_invite::SpecializedNodeType;
use crate::messages::{
NodeMessage, RegisterPendingSpecializedNodeInvite, RemovePendingSpecializedNodeInvite,
};
use crate::sync::BroadcastMessage;
use crate::sync::{BroadcastMessage, StateDeltaPayload};

mod alias;
mod application;
Expand Down Expand Up @@ -121,9 +121,9 @@ impl NodeClient {
let shared_key = SharedKey::from_sk(sender_key);
let nonce = rand::thread_rng().gen();

let encrypted = shared_key
.encrypt(artifact, nonce)
.ok_or_eyre("failed to encrypt artifact")?;
// Bundle artifact and events together before encryption
let delta_payload = StateDeltaPayload::new(artifact, events);
let encrypted = delta_payload.encrypt(&shared_key, nonce)?;

let payload = BroadcastMessage::StateDelta {
context_id: context.id,
Expand All @@ -132,9 +132,8 @@ impl NodeClient {
parent_ids,
hlc,
root_hash: context.root_hash,
artifact: encrypted.into(),
nonce,
events: events.map(Cow::from),
payload: encrypted.into(),
};

let payload = borsh::to_vec(&payload)?;
Expand Down
44 changes: 39 additions & 5 deletions crates/node/primitives/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::borrow::Cow;

use borsh::{BorshDeserialize, BorshSerialize};
use calimero_crypto::Nonce;
use calimero_crypto::{Nonce, SharedKey};
use calimero_network_primitives::specialized_node_invite::SpecializedNodeType;
use calimero_primitives::blobs::BlobId;
use calimero_primitives::context::ContextId;
Expand All @@ -28,12 +28,12 @@ pub enum BroadcastMessage<'a> {
hlc: calimero_storage::logical_clock::HybridTimestamp,

root_hash: Hash, // todo! shouldn't be cleartext
artifact: Cow<'a, [u8]>,
nonce: Nonce,

/// Execution events that were emitted during the state change.
/// This field is encrypted along with the artifact.
events: Option<Cow<'a, [u8]>>,
/// Encrypted and borsh-serialized `StateDeltaPayload`.
/// The `StateDeltaPayload` contains state delta artifact and
/// execution events that were emitted during the state change.
payload: Cow<'a, [u8]>,
},

/// Hash heartbeat for divergence detection
Expand Down Expand Up @@ -135,3 +135,37 @@ pub enum MessagePayload<'a> {
signature: [u8; 64],
},
}

// Encapsulated structure containing artifact and events for `StateDelta`.
// This is required to ensure that both artifact and events are encrypted and tied together.
#[derive(Debug, BorshSerialize, BorshDeserialize)]
pub struct StateDeltaPayload {
/// The state delta artifact.
pub artifact: Vec<u8>,
/// Execution events associated with the delta and artifact.
pub events: Option<Vec<u8>>,
}

impl StateDeltaPayload {
/// Creates a new payload bundling together state delta artifact and events.
pub fn new(artifact: Vec<u8>, events: Option<Vec<u8>>) -> Self {
Self { artifact, events }
}

/// Serializes and encrypts the payload using the provided key and nonce.
pub fn encrypt(&self, key: &SharedKey, nonce: Nonce) -> eyre::Result<Vec<u8>> {
let plaintext = borsh::to_vec(self)?;
key.encrypt(plaintext, nonce)
.ok_or_else(|| eyre::eyre!("failed to encrypt StateDeltaPayload"))
}

/// Decrypts and deserializes the payload using the provided key and nonce.
pub fn decrypt(cipher_text: Vec<u8>, key: &SharedKey, nonce: Nonce) -> eyre::Result<Self> {
let plaintext = key
.decrypt(cipher_text, nonce)
.ok_or_else(|| eyre::eyre!("failed to decrypt StateDeltaPayload"))?;
let payload = borsh::from_slice(&plaintext)?;

Ok(payload)
}
}
7 changes: 2 additions & 5 deletions crates/node/src/handlers/network_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,14 @@ impl Handler<NetworkEvent> for NodeManager {
parent_ids,
hlc,
root_hash,
artifact,
nonce,
events,
payload,
} => {
info!(
%context_id,
%author_id,
delta_id = ?delta_id,
parent_count = parent_ids.len(),
has_events = events.is_some(),
"Matched StateDelta message"
);

Expand All @@ -119,9 +117,8 @@ impl Handler<NetworkEvent> for NodeManager {
parent_ids,
hlc,
root_hash,
artifact.into_owned(),
nonce,
events.map(|e| e.into_owned()),
payload.into_owned(),
)
.await
{
Expand Down
48 changes: 28 additions & 20 deletions crates/node/src/handlers/state_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
use calimero_context_primitives::client::ContextClient;
use calimero_crypto::Nonce;
use calimero_node_primitives::client::NodeClient;
use calimero_node_primitives::sync::StateDeltaPayload;
use calimero_primitives::context::ContextId;
use calimero_primitives::events::{
ContextEvent, ContextEventPayload, ExecutionEvent, NodeEvent, StateMutationPayload,
};
use calimero_primitives::hash::Hash;
use calimero_primitives::identity::{PrivateKey, PublicKey};
use calimero_storage::action::Action;
use calimero_storage::{action::Action, delta::StorageDelta};
use eyre::{bail, OptionExt, Result};
use libp2p::PeerId;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -44,9 +45,9 @@ pub async fn handle_state_delta(
parent_ids: Vec<[u8; 32]>,
hlc: calimero_storage::logical_clock::HybridTimestamp,
root_hash: Hash,
artifact: Vec<u8>,
nonce: Nonce,
events: Option<Vec<u8>>,
// state delta artifact and execution events bundled together (see `StateDeltaPayload`)
delta_payload: Vec<u8>,
) -> Result<()> {
let Some(context) = node_clients.context.get_context(&context_id)? else {
bail!("context '{}' not found", context_id);
Expand All @@ -73,7 +74,20 @@ pub async fn handle_state_delta(
)
.await?;

let actions = decrypt_delta_actions(artifact, nonce, sender_key)?;
// Decrypt the bundled state delta payload
let (artifact, events) = decrypt_delta_payload(sender_key, nonce, delta_payload)?;

info!(
%context_id,
%author_id,
delta_id = ?delta_id,
has_events = events.is_some(),
artifact_len = artifact.len(),
"Processing state delta"
);

// Extract actions using the new StorageDelta helper
let actions = StorageDelta::extract_actions_from_artifact(&artifact)?;

let delta = calimero_dag::CausalDelta {
id: delta_id,
Expand Down Expand Up @@ -256,23 +270,17 @@ struct DeltaStoreSetup {
is_uninitialized: bool,
}

fn decrypt_delta_actions(
artifact: Vec<u8>,
nonce: Nonce,
/// Helper that decrypts the state delta payload and returns the artifact and events.
fn decrypt_delta_payload(
sender_key: PrivateKey,
) -> Result<Vec<Action>> {
let shared_key = calimero_crypto::SharedKey::from_sk(&sender_key);
let decrypted_artifact = shared_key
.decrypt(artifact, nonce)
.ok_or_eyre("failed to decrypt artifact")?;

let storage_delta: calimero_storage::delta::StorageDelta =
borsh::from_slice(&decrypted_artifact)?;

match storage_delta {
calimero_storage::delta::StorageDelta::Actions(actions) => Ok(actions),
_ => bail!("Expected Actions variant in state delta"),
}
nonce: Nonce,
encrypted_payload: Vec<u8>,
) -> Result<(Vec<u8>, Option<Vec<u8>>)> {
let shared_key = calimero_crypto::SharedKey::from_sk(&sender_key.into());
let StateDeltaPayload { artifact, events } =
StateDeltaPayload::decrypt(encrypted_payload, &shared_key, nonce)?;

Ok((artifact, events))
}

async fn ensure_author_sender_key(
Expand Down
24 changes: 24 additions & 0 deletions crates/storage/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ pub enum StorageDelta {
Comparisons(Vec<Comparison>),
}

impl StorageDelta {
/// Deserializes a raw artifact byte slice into a vector of `Action`s.
///
/// This is a helper function to facilitate the creation of `Actions` variant
/// assuming the unencrypted `artifact` contains a list of actions.
///
/// # Args
/// * `artifact` - a raw artifact byte slice conaining an unencrypted list of actions.
///
/// # Errors
/// Returns an error if:
/// * The artifact cannot be Borsh-deserialized.
/// * The artifact contains `Comparisons` instead of `Actions`.
pub fn extract_actions_from_artifact(artifact: &[u8]) -> eyre::Result<Vec<Action>> {
let delta: Self = borsh::from_slice(artifact)?;
match delta {
Self::Actions(actions) => Ok(actions),
Self::Comparisons(_) => {
eyre::bail!("Expected Actions variant in state delta, found Comparisons")
}
}
}
}

impl BorshDeserialize for StorageDelta {
fn deserialize_reader<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let Ok(tag) = u8::deserialize_reader(reader) else {
Expand Down
Loading