Skip to content
Merged
48 changes: 48 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
codecov:
require_ci_to_pass: true

coverage:
precision: 2
round: down
range: "70...100"

status:
project:
default:
target: auto
threshold: 1%
base: auto
branches:
- main
if_ci_failed: error
informational: false
only_pulls: false

patch:
default:
target: auto
threshold: 1%
base: auto
if_ci_failed: error
only_pulls: true

changes: false

comment:
layout: "diff, flags, files"
behavior: default
require_changes: false
require_base: false
require_head: true
hide_project_coverage: false

parsers:
gcov:
branch_detection:
conditional: true
loop: true
macro: false
method: false

github_checks:
annotations: true
16 changes: 12 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: checkout
uses: actions/checkout@v4
- name: toolchain
uses: dtolnay/rust-toolchain@master
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.toolchain }}
components: rustfmt
Expand All @@ -43,15 +43,23 @@ jobs:
- name: checkout
uses: actions/checkout@v4
- name: toolchain
uses: dtolnay/rust-toolchain@master
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.toolchain }}
components: clippy
- name: Clippy
run: |
cargo clippy \
--all-targets \
-- -D warnings
-- -D warnings \
-W clippy::pedantic \
-W clippy::nursery \
-W clippy::style \
-W clippy::complexity \
-W clippy::perf \
-W clippy::suspicious \
-W clippy::correctness

test:
name: test
runs-on: ubuntu-22.04
Expand All @@ -73,7 +81,7 @@ jobs:
- name: checkout
uses: actions/checkout@v4
- name: toolchain
uses: dtolnay/rust-toolchain@master
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.toolchain }}
- name: Unit tests
Expand Down
16 changes: 8 additions & 8 deletions atoma-bin/atoma_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use atoma_daemon::{
server::{run_server, DaemonState},
};
use atoma_state::{config::AtomaStateManagerConfig, AtomaState};
use atoma_sui::client::AtomaSuiClient;
use atoma_sui::client::Client;
use atoma_utils::spawn_with_shutdown;
use clap::Parser;
use sui_sdk::types::base_types::ObjectID;
Expand Down Expand Up @@ -39,13 +39,14 @@ struct DaemonArgs {
}

#[tokio::main]
#[allow(clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
setup_logging()?;
setup_logging();
let args = DaemonArgs::parse();
let daemon_config = AtomaDaemonConfig::from_file_path(args.config_path.clone());
let state_manager_config = AtomaStateManagerConfig::from_file_path(args.config_path.clone());
let client = Arc::new(RwLock::new(
AtomaSuiClient::new_from_config(args.config_path).await?,
Client::new_from_config(args.config_path).await?,
));

info!(
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn main() -> Result<()> {

let ctrl_c = tokio::task::spawn(async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
result = tokio::signal::ctrl_c() => {
info!(
target = "atoma_daemon",
event = "atoma-daemon-stop",
Expand All @@ -95,10 +96,10 @@ async fn main() -> Result<()> {
shutdown_sender
.send(true)
.context("Failed to send shutdown signal")?;
Ok::<(), anyhow::Error>(())
result.map_err(anyhow::Error::from)
}
_ = shutdown_receiver.changed() => {
Ok::<(), anyhow::Error>(())
Ok(())
}
}
});
Expand All @@ -108,7 +109,7 @@ async fn main() -> Result<()> {
daemon_result
}

fn setup_logging() -> Result<()> {
fn setup_logging() {
let log_dir = Path::new(LOGS);
let file_appender = RollingFileAppender::new(Rotation::DAILY, log_dir, LOG_FILE);
let (non_blocking_appender, _guard) = non_blocking(file_appender);
Expand All @@ -133,5 +134,4 @@ fn setup_logging() -> Result<()> {
.with(console_layer)
.with(file_layer)
.init();
Ok(())
}
54 changes: 28 additions & 26 deletions atoma-bin/atoma_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use std::{
};

use anyhow::{Context, Result};
use atoma_confidential::AtomaConfidentialComputeService;
use atoma_confidential::AtomaConfidentialCompute;
use atoma_daemon::{AtomaDaemonConfig, DaemonState};
use atoma_service::{
config::AtomaServiceConfig,
proxy::{config::ProxyConfig, register_on_proxy},
server::AppState,
};
use atoma_state::{config::AtomaStateManagerConfig, AtomaState, AtomaStateManager};
use atoma_sui::{client::AtomaSuiClient, AtomaSuiConfig, SuiEventSubscriber};
use atoma_sui::{client::Client, config::Config, subscriber::Subscriber};
use atoma_utils::spawn_with_shutdown;
use clap::Parser;
use dotenv::dotenv;
Expand Down Expand Up @@ -66,9 +66,9 @@ struct Args {
/// This struct holds the configuration settings for various components
/// of the Atoma node, including the Sui, service, and state manager configurations.
#[derive(Debug)]
struct Config {
struct NodeConfig {
/// Configuration for the Sui component.
sui: AtomaSuiConfig,
sui: Config,

/// Configuration for the service component.
service: AtomaServiceConfig,
Expand All @@ -83,9 +83,9 @@ struct Config {
proxy: ProxyConfig,
}

impl Config {
async fn load(path: &str) -> Result<Self, ValidationErrors> {
let sui = AtomaSuiConfig::from_file_path(path);
impl NodeConfig {
fn load(path: &str) -> Result<Self, ValidationErrors> {
let sui = Config::from_file_path(path);
let service = AtomaServiceConfig::from_file_path(path);
let state = AtomaStateManagerConfig::from_file_path(path);
let daemon = AtomaDaemonConfig::from_file_path(path);
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Config {
/// async fn example() -> Result<()> {
/// let models = vec!["facebook/opt-125m".to_string()];
/// let revisions = vec!["main".to_string()];
///
///
/// let tokenizers = initialize_tokenizers(&models, &revisions).await?;
/// Ok(())
/// }
Expand Down Expand Up @@ -174,13 +174,15 @@ async fn initialize_tokenizers(
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
#[allow(clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
let _log_guards = setup_logging(LOGS).context("Failed to setup logging")?;

dotenv().ok();

let args = Args::parse();
let config = Config::load(&args.config_path).await?;
let config = NodeConfig::load(&args.config_path)?;

info!("Starting Atoma node service");

Expand All @@ -203,13 +205,13 @@ async fn main() -> Result<()> {
config.sui.max_concurrent_requests(),
)?;
let address = wallet_ctx.active_address()?;
let address_index = args.address_index.unwrap_or(
let address_index = args.address_index.unwrap_or_else(|| {
wallet_ctx
.get_addresses()
.iter()
.position(|a| a == &address)
.unwrap(),
);
.unwrap()
});

info!(
target = "atoma-node-service",
Expand All @@ -232,14 +234,14 @@ async fn main() -> Result<()> {
shutdown_sender.clone(),
);

let (subscriber_confidential_compute_sender, _subscriber_confidential_compute_receiver) =
let (subscriber_confidential_compute_sender, subscriber_confidential_compute_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (app_state_decryption_sender, _app_state_decryption_receiver) =
let (app_state_decryption_sender, app_state_decryption_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (app_state_encryption_sender, _app_state_encryption_receiver) =
let (app_state_encryption_sender, app_state_encryption_receiver) =
tokio::sync::mpsc::unbounded_channel();

for (_, node_small_id) in config.daemon.node_badges.iter() {
for (_, node_small_id) in &config.daemon.node_badges {
if let Err(e) =
register_on_proxy(&config.proxy, *node_small_id, &keystore, address_index).await
{
Expand All @@ -259,19 +261,19 @@ async fn main() -> Result<()> {
);

let client = Arc::new(RwLock::new(
AtomaSuiClient::new_from_config(args.config_path).await?,
Client::new_from_config(args.config_path).await?,
));

let (compute_shared_secret_sender, _compute_shared_secret_receiver) =
let (compute_shared_secret_sender, compute_shared_secret_receiver) =
tokio::sync::mpsc::unbounded_channel();

let confidential_compute_service_handle = spawn_with_shutdown(
AtomaConfidentialComputeService::start_confidential_compute_service(
AtomaConfidentialCompute::start_confidential_compute_service(
client.clone(),
_subscriber_confidential_compute_receiver,
_app_state_decryption_receiver,
_app_state_encryption_receiver,
_compute_shared_secret_receiver,
subscriber_confidential_compute_receiver,
app_state_decryption_receiver,
app_state_encryption_receiver,
compute_shared_secret_receiver,
shutdown_receiver.clone(),
),
shutdown_sender.clone(),
Expand All @@ -286,7 +288,7 @@ async fn main() -> Result<()> {
"Spawning subscriber service"
);

let subscriber = SuiEventSubscriber::new(
let subscriber = Subscriber::new(
config.sui,
event_subscriber_sender,
stack_retrieve_receiver,
Expand Down Expand Up @@ -320,8 +322,8 @@ async fn main() -> Result<()> {
shutdown_sender.clone(),
);

let hf_token = std::env::var(HF_TOKEN)
.context(format!("Variable {} not set in the .env file", HF_TOKEN))?;
let hf_token =
std::env::var(HF_TOKEN).context(format!("Variable {HF_TOKEN} not set in the .env file"))?;
let tokenizers =
initialize_tokenizers(&config.service.models, &config.service.revisions, hf_token).await?;

Expand Down
35 changes: 29 additions & 6 deletions atoma-confidential/src/key_management.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use atoma_utils::encryption::{
decrypt_ciphertext, encrypt_plaintext, EncryptionError, NONCE_BYTE_SIZE,
};
use atoma_utils::encryption::{decrypt_ciphertext, encrypt_plaintext, Error, NONCE_BYTE_SIZE};
use thiserror::Error;
use x25519_dalek::{PublicKey, SharedSecret, StaticSecret};

Expand All @@ -21,6 +19,15 @@ pub struct X25519KeyPairManager {

impl X25519KeyPairManager {
/// Constructor
///
/// # Returns
/// A new `X25519KeyPairManager` instance if successful
///
/// # Errors
/// Returns an error if:
/// - Failed to initialize the key store
/// - Failed to load existing keys
/// - Failed to generate initial keys
#[allow(clippy::new_without_default)]
pub fn new() -> Result<Self> {
let mut rng = rand::thread_rng();
Expand All @@ -36,6 +43,7 @@ impl X25519KeyPairManager {
/// - Identity verification
///
/// The public key will change when `rotate_keys()` is called.
#[must_use]
pub fn get_public_key(&self) -> PublicKey {
PublicKey::from(&self.secret_key)
}
Expand Down Expand Up @@ -72,6 +80,7 @@ impl X25519KeyPairManager {
///
/// # Returns
/// - `SharedSecret` - The shared secret
#[must_use]
pub fn compute_shared_secret(&self, public_key: &PublicKey) -> SharedSecret {
self.secret_key.diffie_hellman(public_key)
}
Expand All @@ -91,7 +100,12 @@ impl X25519KeyPairManager {
///
/// # Returns
/// * `Ok(Vec<u8>)` - The decrypted plaintext as a byte vector
/// * `Err(KeyManagementError)` - If decryption fails
///
/// # Errors
/// Returns a `KeyManagementError` if:
/// - Decryption fails due to invalid key material
/// - The ciphertext is malformed
/// - The authentication tag is invalid
///
/// # Example
/// ```rust,ignore
Expand Down Expand Up @@ -132,7 +146,12 @@ impl X25519KeyPairManager {
/// * `Ok((Vec<u8>, [u8; NONCE_BYTE_SIZE]))` - A tuple containing:
/// - The encrypted ciphertext as a byte vector
/// - A randomly generated nonce used in the encryption
/// * `Err(KeyManagementError)` - If encryption fails
///
/// # Errors
/// Returns a `KeyManagementError` if:
/// - Encryption fails due to invalid key material
/// - Random number generation fails
/// - The plaintext is too large
///
/// # Example
/// ```rust,ignore
Expand All @@ -156,10 +175,14 @@ impl X25519KeyPairManager {
}
}

/// Errors that can occur during key management operations
#[derive(Debug, Error)]
pub enum KeyManagementError {
/// Error during encryption/decryption operations
#[error("Encryption error: `{0}`")]
EncryptionError(#[from] EncryptionError),
EncryptionError(#[from] Error),

/// Error during file I/O operations
#[error("IO error: `{0}`")]
IoError(#[from] std::io::Error),
}
Loading