diff --git a/src/lib.rs b/src/lib.rs
index 8fad81e..34c4036 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,7 +7,7 @@ use chrono::{DateTime, Utc};
 use clap::{Parser, Subcommand};
 use futures_util::StreamExt;
 use indicatif::{ProgressBar, ProgressStyle};
-use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
+use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
 use reqwest::{Body, Client, StatusCode};
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
@@ -39,6 +39,25 @@ pub const MAX_RETRIES: u32 = 5;
 pub const INITIAL_RETRY_DELAY_MS: u64 = 1000;
 pub const MAX_RETRY_DELAY_MS: u64 = 10000;
 
+// Define the query encoding set for URL parameters.
+// This encodes control characters, spaces, and characters that have special meaning in URLs.
+const QUERY_ENCODE_SET: &AsciiSet = &CONTROLS
+    .add(b' ')   // Space
+    .add(b'"')   // Quote
+    .add(b'#')   // Hash (fragment identifier)
+    .add(b'<')   // Less than
+    .add(b'>')   // Greater than
+    .add(b'?')   // Question mark (query separator)
+    .add(b'`')   // Backtick
+    .add(b'{')   // Left brace
+    .add(b'}')   // Right brace
+    .add(b'|')   // Pipe
+    .add(b'\\')  // Backslash
+    .add(b'^')   // Caret
+    .add(b'[')   // Left bracket
+    .add(b']')   // Right bracket
+    .add(b'%');  // Percent (to avoid double encoding)
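+// Example: utf8_percent_encode("my report #3.txt", QUERY_ENCODE_SET)
+// yields "my%20report%20%233.txt". Note that '&' and '=' are not in the set,
+// so they pass through unencoded.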
+
 // JWT Authentication structures
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct AuthTokens {
@@ -1648,12 +1667,17 @@ async fn upload_file_with_auth(
         .header("Content-Length", file_size)
         .header("Content-Type", "application/octet-stream");
 
-    // Add JWT auth header if available
+    // Add auth headers - JWT takes precedence
     if let Some(ref auth_tokens) = creds.auth_tokens {
         request = request.header(
             "Authorization",
             format!("Bearer {}", auth_tokens.access_token),
         );
+    } else {
+        // Legacy auth via headers
+        request = request
+            .header("X-User-Id", &creds.user_id)
+            .header("X-User-App-Key", &creds.user_app_key);
     }
 
     let resp = request.body(body).send().await?;
@@ -1903,7 +1927,7 @@ async fn priority_download_single_file(
     // Build URL without credentials (security fix)
     let url = format!(
         "{}/priorityDownload?file_name={}",
-        base_url, utf8_percent_encode(file_name_in_bucket, NON_ALPHANUMERIC)
+        base_url, utf8_percent_encode(file_name_in_bucket, QUERY_ENCODE_SET)
     );
 
     // Add legacy auth headers
@@ -1940,7 +1964,7 @@ async fn priority_download_single_file_with_auth(
     // Build URL without credentials (security fix)
     let url = format!(
         "{}/priorityDownload?file_name={}",
-        base_url, utf8_percent_encode(file_name_in_bucket, NON_ALPHANUMERIC)
+        base_url, utf8_percent_encode(file_name_in_bucket, QUERY_ENCODE_SET)
     );
 
     let mut request = client.get(&url);
@@ -4055,7 +4079,7 @@ pub async fn run_cli() -> Result<()> {
             let mut url = format!(
                 "{}/{}?file_name={}&epochs={}",
                 selected_endpoint, endpoint,
-                utf8_percent_encode(&file_name, NON_ALPHANUMERIC),
+                utf8_percent_encode(&file_name, QUERY_ENCODE_SET),
                 epochs_final
             );
             if let Some(tier_name) = tier {
@@ -5274,7 +5298,7 @@ pub async fn run_cli() -> Result<()> {
                 let mut url = format!("{}/{}?file_name={}",
                     selected_endpoint, endpoint,
-                    utf8_percent_encode(&rel_path, NON_ALPHANUMERIC));
+                    utf8_percent_encode(&rel_path, QUERY_ENCODE_SET));
                 if let Some(tier_name) = &tier_clone {
                     url = format!("{}&tier={}", url, tier_name);
                 }
@@ -5617,7 +5641,7 @@ pub async fn run_cli() -> Result<()> {
                 // Build URL without credentials (security fix)
                 let url = format!(
                     "{}/priorityUpload?file_name={}",
-                    selected_endpoint, utf8_percent_encode(&rel_path, NON_ALPHANUMERIC)
+                    selected_endpoint, utf8_percent_encode(&rel_path, QUERY_ENCODE_SET)
                 );
 
                 // Use retry wrapper for priority directory uploads
@@ -5882,7 +5906,7 @@ pub async fn run_cli() -> Result<()> {
             // Build URL without credentials (security fix)
             let url = format!(
                 "{}/priorityUpload?file_name={}&epochs={}",
-                base_url, utf8_percent_encode(&file_name, NON_ALPHANUMERIC), epochs_final
+                base_url, utf8_percent_encode(&file_name, QUERY_ENCODE_SET), epochs_final
             );
 
             // Use retry wrapper for priority single file upload
@@ -6183,10 +6207,13 @@ pub async fn run_cli() -> Result<()> {
             parallel,
         } => {
             // Load credentials
-            let creds = load_credentials_from_file(config_path)?.ok_or_else(|| {
+            let mut creds = load_credentials_from_file(config_path)?.ok_or_else(|| {
                 anyhow!("No credentials found. Please create a user or login first.")
             })?;
 
+            // Ensure we have a valid JWT token if one is available
+            ensure_valid_token(&client, base_url, &mut creds, config_path).await?;
+
             // Parse conflict strategy
             let conflict_strategy = sync::ConflictStrategy::from_str(&conflict)
                 .ok_or_else(|| anyhow!("Invalid conflict strategy: {}", conflict))?;
diff --git a/src/sync.rs b/src/sync.rs
index 76968f5..8c3b5a3 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -1,6 +1,6 @@
 use std::path::{Path, PathBuf};
 use std::collections::HashMap;
-use std::time::SystemTime;
+use std::time::{SystemTime, Duration, Instant};
 use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
 use serde::{Deserialize, Serialize};
 use anyhow::Result;
@@ -8,10 +8,68 @@ use reqwest::Client;
 use chrono::{DateTime, Utc};
 use tokio::fs;
 use tokio::io::AsyncReadExt;
+use tokio::sync::{mpsc, Mutex, Semaphore};
 use indicatif::{ProgressBar, ProgressStyle, MultiProgress};
 use blake3;
 
 use crate::{SavedCredentials, upload_file_with_auth, improved_download_file_with_auth};
+use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
+
+// Same encoding set as in lib.rs
+const QUERY_ENCODE_SET: &AsciiSet = &CONTROLS
+    .add(b' ')   // Space
+    .add(b'"')   // Quote
+    .add(b'#')   // Hash (fragment identifier)
+    .add(b'<')   // Less than
+    .add(b'>')   // Greater than
+    .add(b'?')   // Question mark (query separator)
+    .add(b'`')   // Backtick
+    .add(b'{')   // Left brace
+    .add(b'}')   // Right brace
+    .add(b'|')   // Pipe
+    .add(b'\\')  // Backslash
+    .add(b'^')   // Caret
+    .add(b'[')   // Left bracket
+    .add(b']')   // Right bracket
+    .add(b'%');  // Percent (to avoid double encoding)
+
+/// File queued for processing
+#[derive(Debug, Clone)]
+struct FileToProcess {
+    path: PathBuf,
+    relative_path: String,
+    size: u64,
+    modified: DateTime<Utc>,
+}
+
+/// File ready for upload
+#[derive(Debug, Clone)]
+struct FileToUpload {
+    local_path: PathBuf,
+    relative_path: String,
+    size: u64,
+    _hash: String,
+    _modified: DateTime<Utc>,
+}
+
+/// Progress tracking for streaming sync
+struct StreamingProgress {
+    // Counters
+    files_discovered: AtomicU64,
+    files_hashed: AtomicU64,
+    files_uploaded: AtomicU64,
+    bytes_discovered: AtomicU64,
+    bytes_hashed: AtomicU64,
+    bytes_uploaded: AtomicU64,
+
+    // Progress bars
+    progress_bar: ProgressBar,
+
+    // State management
+    partial_state: Arc<Mutex<HashMap<String, FileState>>>,
+    state_path: PathBuf,
+    last_save: Arc<Mutex<Instant>>,
+}
 
 /// Represents a file's sync state
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -130,6 +188,7 @@ fn format_file_size(size: u64) -> String {
 }
 
 /// Main sync context
+#[derive(Clone)]
 pub struct SyncContext {
     pub client: Client,
     pub base_url: String,
@@ -402,6 +461,420 @@ async fn calculate_file_hash(path: &Path) -> Result<String> {
     Ok(hasher.finalize().to_hex().to_string())
 }
 
+/// File scanner that discovers files and sends them for processing
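+/// (stage 1 of the scan -> hash -> upload pipeline). Sends block on a bounded
+/// channel, so scanning applies backpressure when hashing falls behind.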
+async fn scan_files_streaming(
+    base_path: &Path,
+    file_tx: mpsc::Sender<FileToProcess>,
+    existing_state: &HashMap<String, FileState>,
+    progress: Arc<StreamingProgress>,
+) -> Result<()> {
+    scan_files_recursive(base_path, base_path, &file_tx, existing_state, &progress).await?;
+    Ok(())
+}
+
+async fn scan_files_recursive(
+    base_path: &Path,
+    current_path: &Path,
+    file_tx: &mpsc::Sender<FileToProcess>,
+    existing_state: &HashMap<String, FileState>,
+    progress: &Arc<StreamingProgress>,
+) -> Result<()> {
+    let mut entries = fs::read_dir(current_path).await?;
+
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        let metadata = entry.metadata().await?;
+
+        if metadata.is_dir() {
+            // Recurse into subdirectory
+            Box::pin(scan_files_recursive(base_path, &path, file_tx, existing_state, progress)).await?;
+        } else if metadata.is_file() {
+            // Skip .pipe-sync files
+            if path.file_name()
+                .and_then(|n| n.to_str())
+                .map(|n| n.starts_with(".pipe-sync"))
+                .unwrap_or(false)
+            {
+                continue;
+            }
+
+            let file_size = metadata.len();
+            let modified = metadata.modified()?
+                .duration_since(SystemTime::UNIX_EPOCH)?
+                .as_secs();
+            let modified_dt = DateTime::from_timestamp(modified as i64, 0)
+                .unwrap_or_else(|| Utc::now());
+
+            // Get relative path
+            let relative_path = path.strip_prefix(base_path)?
+                .to_string_lossy()
+                .replace('\\', "/");
+
+            // Check if file needs processing
+            let needs_processing = if let Some(existing) = existing_state.get(&relative_path) {
+                // Check if file has changed
+                existing.size != file_size || existing.modified != modified_dt
+            } else {
+                // New file
+                true
+            };
+
+            if needs_processing {
+                // Send file for processing
+                let file = FileToProcess {
+                    path: path.clone(),
+                    relative_path,
+                    size: file_size,
+                    modified: modified_dt,
+                };
+
+                if file_tx.send(file).await.is_err() {
+                    // Channel closed, stop scanning
+                    break;
+                }
+
+                // Update progress
+                progress.files_discovered.fetch_add(1, Ordering::Relaxed);
+                progress.bytes_discovered.fetch_add(file_size, Ordering::Relaxed);
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Hash pipeline that calculates hashes and determines which files need upload
+async fn hash_pipeline(
+    mut file_rx: mpsc::Receiver<FileToProcess>,
+    upload_tx: mpsc::Sender<FileToUpload>,
+    progress: Arc<StreamingProgress>,
+    existing_state: &HashMap<String, FileState>,
+) -> Result<()> {
+    while let Some(file) = file_rx.recv().await {
+        // Update progress bar with current file being hashed
+        let _current_msg = format!("Hashing: {} ({})",
+            file.relative_path,
+            format_file_size(file.size)
+        );
+
+        // Calculate hash
+        let hash = match calculate_file_hash(&file.path).await {
+            Ok(h) => h,
+            Err(e) => {
+                eprintln!("Failed to hash {}: {}", file.relative_path, e);
+                continue;
+            }
+        };
+
+        // Check if file needs upload
+        let needs_upload = if let Some(existing) = existing_state.get(&file.relative_path) {
+            // File exists in state - check if it changed
+            existing.size != file.size ||
+            existing.modified != file.modified ||
+            existing.hash.as_ref() != Some(&hash)
+        } else {
+            // New file - always upload
+            true
+        };
+
+        // Update partial state
+        {
+            let mut state = progress.partial_state.lock().await;
+            state.insert(file.relative_path.clone(), FileState {
+                path: file.relative_path.clone(),
+                size: file.size,
+                modified: file.modified,
+                hash: Some(hash.clone()),
+                last_synced: None,
+                sync_version: 0,
+                remote_modified: None,
+            });
+        }
+
+        // Update progress
+        progress.files_hashed.fetch_add(1, Ordering::Relaxed);
+        progress.bytes_hashed.fetch_add(file.size, Ordering::Relaxed);
+
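+        // The scanner pre-filters on size and mtime; the hash comparison above
+        // is the final check before a file is queued for upload.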
+        if needs_upload {
+            let upload_file = FileToUpload {
+                local_path: file.path,
+                relative_path: file.relative_path,
+                size: file.size,
+                _hash: hash,
+                _modified: file.modified,
+            };
+
+            if upload_tx.send(upload_file).await.is_err() {
+                // Upload channel closed
+                break;
+            }
+        }
+
+        // Save state periodically
+        let should_save = {
+            let last_save = progress.last_save.lock().await;
+            last_save.elapsed() > Duration::from_secs(30)
+        };
+
+        if should_save {
+            save_partial_state(&progress).await?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Save partial state to disk
+async fn save_partial_state(progress: &Arc<StreamingProgress>) -> Result<()> {
+    let state = progress.partial_state.lock().await;
+    let sync_state = SyncState {
+        last_sync: None,
+        files: state.clone(),
+    };
+
+    // Save to partial state file
+    let partial_path = progress.state_path.with_extension("partial");
+    sync_state.save(&partial_path).await?;
+
+    // Update last save time
+    *progress.last_save.lock().await = Instant::now();
+
+    Ok(())
+}
+
+/// Upload pipeline with concurrent workers
+async fn upload_pipeline(
+    upload_rx: mpsc::Receiver<FileToUpload>,
+    ctx: &SyncContext,
+    progress: Arc<StreamingProgress>,
+    workers: usize,
+) -> Result<()> {
+    let semaphore = Arc::new(Semaphore::new(workers));
+    let upload_rx = Arc::new(Mutex::new(upload_rx));
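+    // All workers pull from the single receiver guarded by a mutex, so each
+    // queued file is claimed by exactly one worker.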
+
+    // Spawn upload workers
+    let mut handles = vec![];
+
+    for _worker_id in 0..workers {
+        let rx = upload_rx.clone();
+        let sem = semaphore.clone();
+        let progress = progress.clone();
+        let client = ctx.client.clone();
+        let base_url = ctx.base_url.clone();
+        let creds = ctx.creds.clone();
+        let remote_path = ctx.remote_path.clone();
+
+        let handle = tokio::spawn(async move {
+            loop {
+                // Get next file to upload
+                let file = {
+                    let mut rx_guard = rx.lock().await;
+                    rx_guard.recv().await
+                };
+
+                let Some(file) = file else {
+                    // Channel closed, worker done
+                    break;
+                };
+
+                // Acquire semaphore permit
+                let _permit = sem.acquire().await.unwrap();
+
+                // Update progress - for now just use the main progress display
+
+                // Build remote path
+                let remote_file_path = if remote_path.is_empty() {
+                    file.relative_path.clone()
+                } else {
+                    format!("{}/{}", remote_path, file.relative_path)
+                };
+
+                // Upload file - use priorityUpload endpoint for sync
+                let full_url = format!("{}/priorityUpload?file_name={}&tier=enterprise",
+                    base_url,
+                    utf8_percent_encode(&remote_file_path, QUERY_ENCODE_SET)
+                );
+                match upload_file_with_auth(
+                    &client,
+                    &file.local_path,
+                    &full_url,
+                    &remote_file_path,
+                    &creds,
+                ).await {
+                    Ok(_) => {
+                        // Update progress
+                        progress.files_uploaded.fetch_add(1, Ordering::Relaxed);
+                        progress.bytes_uploaded.fetch_add(file.size, Ordering::Relaxed);
+
+                        // Update partial state with upload success
+                        {
+                            let mut state = progress.partial_state.lock().await;
+                            if let Some(file_state) = state.get_mut(&file.relative_path) {
+                                file_state.last_synced = Some(Utc::now());
+                                file_state.sync_version += 1;
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        eprintln!("Failed to upload {}: {}", file.relative_path, e);
+                    }
+                }
+            }
+
+            Ok::<(), anyhow::Error>(())
+        });
+
+        handles.push(handle);
+    }
+
+    // Wait for all workers to complete
+    for handle in handles {
+        handle.await??;
+    }
+
+    Ok(())
+}
+
+/// Create streaming progress tracker
+fn create_streaming_progress(
+    state_path: PathBuf,
+    existing_state: HashMap<String, FileState>,
+) -> Arc<StreamingProgress> {
+    // Create a single progress bar
+    let progress_bar = ProgressBar::new_spinner();
+    progress_bar.set_style(
+        ProgressStyle::default_spinner()
+            .template("{spinner:.green} {msg}")
+            .unwrap()
+    );
+    progress_bar.enable_steady_tick(Duration::from_millis(100));
+
+    Arc::new(StreamingProgress {
+        files_discovered: AtomicU64::new(0),
+        files_hashed: AtomicU64::new(0),
+        files_uploaded: AtomicU64::new(0),
+        bytes_discovered: AtomicU64::new(0),
+        bytes_hashed: AtomicU64::new(0),
+        bytes_uploaded: AtomicU64::new(0),
+        progress_bar,
+        partial_state: Arc::new(Mutex::new(existing_state)),
+        state_path,
+        last_save: Arc::new(Mutex::new(Instant::now())),
+    })
+}
+
+/// Update overall progress display
+async fn update_progress_display(progress: &Arc<StreamingProgress>) {
+    let files_discovered = progress.files_discovered.load(Ordering::Relaxed);
+    let files_hashed = progress.files_hashed.load(Ordering::Relaxed);
+    let files_uploaded = progress.files_uploaded.load(Ordering::Relaxed);
+
+    let bytes_discovered = progress.bytes_discovered.load(Ordering::Relaxed);
+    let bytes_hashed = progress.bytes_hashed.load(Ordering::Relaxed);
+    let bytes_uploaded = progress.bytes_uploaded.load(Ordering::Relaxed);
+
+    let msg = format!(
+        "Files: {}/{} hashed, {} uploaded | Data: {}/{} hashed, {} uploaded",
+        files_hashed, files_discovered,
+        files_uploaded,
+        format_file_size(bytes_hashed), format_file_size(bytes_discovered),
+        format_file_size(bytes_uploaded)
+    );
+
+    progress.progress_bar.set_message(msg);
+}
+
+/// Execute sync operations using streaming pipeline
+async fn execute_streaming_sync(
+    ctx: &SyncContext,
+    _local_files: HashMap<String, FileState>,
+) -> Result<()> {
+    println!("\n🚀 Starting streaming sync...");
+
+    // Create channels for the pipeline
+    let (file_tx, file_rx) = mpsc::channel::<FileToProcess>(1000);
+    let (upload_tx, upload_rx) = mpsc::channel::<FileToUpload>(100);
+
+    // Create progress tracking
+    let progress = create_streaming_progress(
+        ctx.state.files.is_empty()
+            .then(|| ctx.local_path.join(".pipe-sync.partial"))
+            .unwrap_or_else(|| ctx.local_path.join(".pipe-sync")),
+        ctx.state.files.clone(),
+    );
+
+    // Spawn progress updater
+    let progress_updater = {
+        let progress = progress.clone();
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(Duration::from_millis(500));
+            loop {
+                interval.tick().await;
+                update_progress_display(&progress).await;
+            }
+        })
+    };
+
+    // Spawn file scanner
+    let scanner = {
+        let base_path = ctx.local_path.clone();
+        let existing_state = ctx.state.files.clone();
+        let progress = progress.clone();
+
+        tokio::spawn(async move {
+            scan_files_streaming(&base_path, file_tx, &existing_state, progress).await
+        })
+    };
+
+    // Spawn hash pipeline
+    let hasher = {
+        let existing_state = ctx.state.files.clone();
+        let progress = progress.clone();
+
+        tokio::spawn(async move {
+            hash_pipeline(file_rx, upload_tx, progress, &existing_state).await
+        })
+    };
+
+    // Spawn upload pipeline
+    let uploader = {
+        let ctx_clone = ctx.clone();
+        let progress = progress.clone();
+
+        tokio::spawn(async move {
+            upload_pipeline(upload_rx, &ctx_clone, progress, 3).await
+        })
+    };
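+
+    // Shutdown cascades through the channels: when the scanner finishes it
+    // drops file_tx, which ends the hasher, which in turn drops upload_tx and
+    // lets the upload workers drain and exit.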
+    // Wait for scanner to complete
+    scanner.await??;
+
+    // Wait for hasher to complete
+    hasher.await??;
+
+    // Wait for uploader to complete
+    uploader.await??;
+
+    // Stop progress updater
+    progress_updater.abort();
+
+    // Clear progress bar
+    progress.progress_bar.finish_and_clear();
+
+    // Final save of state
+    save_partial_state(&progress).await?;
+
+    // Show final stats
+    let files_uploaded = progress.files_uploaded.load(Ordering::Relaxed);
+    let bytes_uploaded = progress.bytes_uploaded.load(Ordering::Relaxed);
+
+    println!("\n✅ Streaming sync complete!");
+    println!("   Uploaded {} files ({})", files_uploaded, format_file_size(bytes_uploaded));
+
+    Ok(())
+}
+
 /// List remote files for a user
 pub async fn list_remote_files(
     client: &Client,
@@ -776,8 +1249,11 @@ async fn execute_upload(
     );
     pb.set_message(format!("⬆️  {}", local_path.display()));
 
-    // Upload the file
-    let full_url = format!("{}/uploadFile", ctx.base_url);
+    // Upload the file - use priorityUpload endpoint
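+    // NOTE: tier is hard-coded to "enterprise" here, matching the streaming
+    // upload workers in upload_pipeline.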
[y/N] "); - std::io::Write::flush(&mut std::io::stdout())?; - - let mut input = String::new(); - std::io::stdin().read_line(&mut input)?; - - if !input.trim().eq_ignore_ascii_case("y") { - println!("Sync cancelled."); - return Ok(()); - } + // For now, we only support upload sync + if !is_upload { + return Err(anyhow::anyhow!("Download sync not yet implemented")); } - // Execute operations + // Execute streaming sync if !dry_run { - execute_sync(&ctx, operations).await?; + // Use streaming sync for upload + execute_streaming_sync(&ctx, HashMap::new()).await?; - // Update sync state with current file states - state.last_sync = Some(Utc::now()); - - // Build new state from successfully synced files - let mut new_files = HashMap::new(); - - // Add all files that were successfully synced - for (path, local_state) in &local_files { - // Update with current state including sync time - let mut updated_state = local_state.clone(); - updated_state.last_synced = Some(Utc::now()); - updated_state.sync_version += 1; - new_files.insert(path.clone(), updated_state); - } - - // For downloads, update with remote file states - for (path, remote_state) in &remote_files { - if local_files.contains_key(path) { - // Already handled above - continue; - } - let mut updated_state = remote_state.clone(); - updated_state.last_synced = Some(Utc::now()); - updated_state.sync_version += 1; - new_files.insert(path.clone(), updated_state); + // Load the final state from partial state + let partial_state_path = local_path.join(".pipe-sync.partial"); + if partial_state_path.exists() { + // Move partial state to final state + let final_state = SyncState::load(&partial_state_path).await?; + state.files = final_state.files; + state.last_sync = Some(Utc::now()); + state.save(&state_path).await?; + + // Clean up partial state + let _ = fs::remove_file(&partial_state_path).await; } - state.files = new_files; - state.save(&state_path).await?; - println!("\nāœ… Sync completed successfully!"); println!("šŸ“ Sync state saved to: {}", state_path.display()); + } else { + println!("Dry run mode not yet implemented for streaming sync"); } Ok(())