336 changes: 336 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["dfs/metaserver", "dfs/chunkserver", "dfs/client"]
members = ["dfs/metaserver", "dfs/chunkserver", "dfs/client", "dfs/s3_server"]
3 changes: 2 additions & 1 deletion Dockerfile
@@ -31,7 +31,8 @@ WORKDIR /app
COPY --from=builder /app/target/release/master /app/master
COPY --from=builder /app/target/release/chunkserver /app/chunkserver
COPY --from=builder /app/target/release/dfs_cli /app/dfs_cli
COPY --from=builder /app/target/release/config_server /app/config_server
COPY --from=builder /app/target/release/s3-server /app/s3-server


# Create storage directory
RUN mkdir -p /data
34 changes: 18 additions & 16 deletions TODO.md
@@ -153,7 +153,7 @@
---

### 19. S3 REST API Compatibility
**Status**: **Not Started**
**Status**: **In Progress**
**Priority**: Medium
**Effort**: Very Large

@@ -169,26 +169,26 @@
**Milestones**:

#### Milestone 1: Basic Operations & Spark Prerequisites
- [ ] Implement `s3_server` binary (using Axum)
- [ ] Implement Bucket operations (CreateBucket, DeleteBucket, ListBuckets, HeadBucket)
- [x] Implement `s3_server` binary (using Axum)
- [x] Implement Bucket operations (CreateBucket, DeleteBucket, ListBuckets, HeadBucket)
- *Note: Map Buckets to top-level directories*
- [ ] Implement Object operations (PutObject, GetObject, DeleteObject, HeadObject)
- [ ] Simple single-part upload/download
- [ ] Support for Directory Simulation (ListObjects with `prefix` and `delimiter`)
- [ ] Basic authentication (V4 Signature or dummy auth for dev)
- [x] Implement Object operations (PutObject, GetObject, DeleteObject, HeadObject)
- [x] Simple single-part upload/download
- [x] Support for Directory Simulation (ListObjects with `prefix` and `delimiter`)
- [x] Basic authentication (V4 Signature or dummy auth for dev)

#### Milestone 2: Multipart Upload & Rename Support (Crucial for Spark)
- [ ] Implement InitiateMultipartUpload
- [ ] Implement UploadPart (map to DFS blocks)
- [ ] Implement CompleteMultipartUpload
- [ ] Implement AbortMultipartUpload
- [ ] Implement CopyObject (required for `rename()` simulation)
- [x] Implement InitiateMultipartUpload
- [x] Implement UploadPart (map to DFS blocks)
- [x] Implement CompleteMultipartUpload
- [x] Implement AbortMultipartUpload
- [x] Implement CopyObject (required for `rename()` simulation)

#### Milestone 3: Advanced Features
- [ ] Support for Object Metadata (User-defined tags)
- [ ] Support for Range Requests (Partial content)
- [ ] Support for ListObjectsV2 (Pagination)
- [ ] Presigned URLs
- [x] Support for Object Metadata (User-defined tags)
- [x] Support for Range Requests (Partial content)
- [x] Support for ListObjectsV2 (Pagination)
- [x] Presigned URLs

---

@@ -249,6 +249,8 @@
- [ ] Log replication
- [ ] Vote counting
- [ ] Term updates
- [x] Develop integration test script (`test_scripts/s3_integration_test.py`) using AWS SDK
- [x] Verify full S3 compatibility with DataFrame APIs
- [ ] Add integration tests for network partitions
- [ ] Multi-node scenarios
- [ ] Network partition simulation
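The S3 milestones above hinge on mapping buckets to top-level DFS directories and simulating folders with `prefix`/`delimiter`. A minimal sketch of that mapping follows; the helper names are illustrative and not part of this PR:

```rust
// Hypothetical helpers illustrating the Milestone 1 mapping; the actual
// s3-server code may differ.
use std::collections::BTreeSet;

/// An S3 (bucket, key) pair becomes a DFS path under a top-level directory.
fn to_dfs_path(bucket: &str, key: &str) -> String {
    format!("/{}/{}", bucket, key)
}

/// ListObjects-style grouping: keys under `prefix` are returned as objects,
/// unless a `delimiter` follows the prefix, in which case they collapse
/// into a "common prefix" (a simulated sub-directory).
fn group_keys(keys: &[&str], prefix: &str, delimiter: char) -> (Vec<String>, Vec<String>) {
    let mut objects = Vec::new();
    let mut common = BTreeSet::new();
    for key in keys.iter().filter(|k| k.starts_with(prefix)) {
        let rest = &key[prefix.len()..];
        match rest.find(delimiter) {
            Some(i) => {
                // Keep everything up to and including the delimiter.
                common.insert(format!("{}{}", prefix, &rest[..=i]));
            }
            None => objects.push(key.to_string()),
        }
    }
    (objects, common.into_iter().collect())
}

fn main() {
    let keys = ["logs/2024/a.txt", "logs/2025/b.txt", "logs/top.txt"];
    let (objects, prefixes) = group_keys(&keys, "logs/", '/');
    assert_eq!(objects, vec!["logs/top.txt"]);
    assert_eq!(prefixes, vec!["logs/2024/", "logs/2025/"]);
    println!("{}", to_dfs_path("logs", "top.txt")); // "/logs/top.txt"
}
```

The `BTreeSet` keeps common prefixes deduplicated and sorted, which matches what ListObjects-style responses return.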
2 changes: 1 addition & 1 deletion dfs/client/src/bin/dfs_cli.rs
@@ -82,7 +82,7 @@ enum ClusterAction {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cli = Cli::parse();

let master_addrs: Vec<String> = cli
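The widened error type in `main` (and throughout the client below) is what lets these errors cross task boundaries: a plain `Box<dyn Error>` is neither `Send` nor `Sync`, so it cannot be held across an `.await` inside `tokio::spawn` or returned from an Axum handler. A small standalone illustration, not taken from this PR:

```rust
use std::error::Error;

// A plain `Box<dyn Error>` is neither Send nor Sync, so a future that
// holds one across an .await cannot be passed to tokio::spawn (or used
// in an Axum handler). Widening the type to `+ Send + Sync` fixes that.
async fn fallible() -> Result<(), Box<dyn Error + Send + Sync>> {
    Err("boom".into())
}

#[tokio::main]
async fn main() {
    // Compiles only because the boxed error is Send:
    let handle = tokio::spawn(async {
        if let Err(e) = fallible().await {
            eprintln!("task failed: {}", e);
        }
    });
    handle.await.unwrap();
}
```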
123 changes: 115 additions & 8 deletions dfs/client/src/mod.rs
@@ -7,8 +7,8 @@ pub mod sharding;
use crate::dfs::chunk_server_service_client::ChunkServerServiceClient;
use crate::dfs::master_service_client::MasterServiceClient;
use crate::dfs::{
AllocateBlockRequest, CreateFileRequest, GetFileInfoRequest, ListFilesRequest,
ReadBlockRequest, RenameRequest, WriteBlockRequest,
AllocateBlockRequest, CreateFileRequest, DeleteFileRequest, GetFileInfoRequest,
ListFilesRequest, ReadBlockRequest, RenameRequest, WriteBlockRequest,
};
use crate::sharding::ShardMap;
use std::collections::HashMap;
@@ -70,7 +70,10 @@ impl Client {
url.to_string()
}

pub async fn list_files(&self, path: &str) -> Result<Vec<String>, Box<dyn std::error::Error>> {
pub async fn list_files(
&self,
path: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
let (response, _) = self
.execute_rpc(Some(path), |mut client| {
let path = path.to_string();
@@ -84,11 +84,85 @@ impl Client {
Ok(response.into_inner().files)
}

pub async fn list_all_files(
&self,
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
let (shards, default_masters) = {
let map = self.shard_map.read().unwrap();
(map.get_all_shards(), self.master_addrs.clone())
};

let mut all_files = std::collections::HashSet::new();

if shards.is_empty() {
// No shards configured; treat the default masters as a single shard.
let (response, _) = self
.execute_rpc_internal(
&default_masters,
self.max_retries,
self.initial_backoff_ms,
|mut client| async move {
let request = tonic::Request::new(ListFilesRequest {
path: "/".to_string(),
});
client.list_files(request).await
},
)
.await?;
for f in response.into_inner().files {
all_files.insert(f);
}
} else {
// Query each shard
for shard_id in shards {
let peers = {
let map = self.shard_map.read().unwrap();
map.get_shard_peers(&shard_id).unwrap_or_default()
};

if peers.is_empty() {
continue;
}

// Query every shard; if any shard is unreachable, fail the whole
// listing rather than silently return partial results, so callers
// always see a consistent view.
let result = self
.execute_rpc_internal(
&peers,
self.max_retries,
self.initial_backoff_ms,
|mut client| async move {
let request = tonic::Request::new(ListFilesRequest {
path: "/".to_string(),
});
client.list_files(request).await
},
)
.await;

match result {
Ok((response, _)) => {
for f in response.into_inner().files {
all_files.insert(f);
}
}
Err(e) => {
eprintln!("Failed to list files from shard {}: {}", shard_id, e);
return Err(e);
}
}
}
}

Ok(all_files.into_iter().collect())
}

pub async fn create_file(
&self,
source: &Path,
dest: &str,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. Create file on Master
let (create_resp, success_addr) = self
.execute_rpc(Some(dest), |mut client| {
@@ -194,7 +271,7 @@ impl Client {
&self,
source: &str,
dest: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. Get file info from Master
let (info_resp, _) = self
.execute_rpc(Some(source), |mut client| {
@@ -253,11 +330,41 @@ impl Client {
Ok(())
}

pub async fn delete_file(
&self,
path: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (delete_resp, _) = self
.execute_rpc(Some(path), |mut client| {
let path = path.to_string();
async move {
let delete_req = tonic::Request::new(DeleteFileRequest { path });
let response = client.delete_file(delete_req).await?;
let inner = response.get_ref();
if !inner.success && inner.error_message == "Not Leader" {
return Err(tonic::Status::unavailable(format!(
"Not Leader|{}",
inner.leader_hint
)));
}
Ok(response)
}
})
.await?;
let delete_resp = delete_resp.into_inner();

if !delete_resp.success {
return Err(format!("Failed to delete file: {}", delete_resp.error_message).into());
}

Ok(())
}

pub async fn rename_file(
&self,
source: &str,
dest: &str,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Use source path for routing
let (rename_resp, _) = self
.execute_rpc(Some(source), |mut client| {
@@ -300,7 +407,7 @@ impl Client {
&self,
key: Option<&str>,
f: F,
) -> Result<(T, String), Box<dyn std::error::Error>>
) -> Result<(T, String), Box<dyn std::error::Error + Send + Sync>>
where
F: Fn(MasterServiceClient<Channel>) -> Fut,
Fut: std::future::Future<Output = Result<T, tonic::Status>>,
@@ -334,7 +441,7 @@ impl Client {
max_retries: usize,
initial_backoff_ms: u64,
f: F,
) -> Result<(T, String), Box<dyn std::error::Error>>
) -> Result<(T, String), Box<dyn std::error::Error + Send + Sync>>
where
F: Fn(MasterServiceClient<Channel>) -> Fut,
Fut: std::future::Future<Output = Result<T, tonic::Status>>,
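Taken together, the new client surface might be exercised like the sketch below. This is illustrative only: it assumes the in-repo `dfs_client` crate, and how a `Client` is constructed is not shown in this diff.

```rust
// Illustrative only: assumes the `dfs_client` crate from this repo;
// constructing a `Client` is not shown in this diff.
async fn demo(client: &dfs_client::Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Enumerate files across all shards (deduplicated by the client).
    for file in client.list_all_files().await? {
        println!("{}", file);
    }

    // Delete a file; execute_rpc retries against the current Raft
    // leader if the first master contacted is a follower.
    client.delete_file("/bucket/old-object").await?;
    Ok(())
}
```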
65 changes: 58 additions & 7 deletions dfs/metaserver/src/master.rs
@@ -3,13 +3,14 @@ use crate::dfs::{
AbortTransactionRequest, AbortTransactionResponse, AddRaftServerRequest, AddRaftServerResponse,
AllocateBlockRequest, AllocateBlockResponse, BlockInfo, ChunkServerCommand, ClusterMember,
CommitTransactionRequest, CommitTransactionResponse, CompleteFileRequest, CompleteFileResponse,
CreateFileRequest, CreateFileResponse, FileMetadata, GetBlockLocationsRequest,
GetBlockLocationsResponse, GetClusterInfoRequest, GetClusterInfoResponse, GetFileInfoRequest,
GetFileInfoResponse, GetSafeModeStatusRequest, GetSafeModeStatusResponse, HeartbeatRequest,
HeartbeatResponse, ListFilesRequest, ListFilesResponse, PrepareTransactionRequest,
PrepareTransactionResponse, RegisterChunkServerRequest, RegisterChunkServerResponse,
RemoveRaftServerRequest, RemoveRaftServerResponse, RenameRequest, RenameResponse,
SetSafeModeRequest, SetSafeModeResponse,
CreateFileRequest, CreateFileResponse, DeleteFileRequest, DeleteFileResponse, FileMetadata,
GetBlockLocationsRequest, GetBlockLocationsResponse, GetClusterInfoRequest,
GetClusterInfoResponse, GetFileInfoRequest, GetFileInfoResponse, GetSafeModeStatusRequest,
GetSafeModeStatusResponse, HeartbeatRequest, HeartbeatResponse, ListFilesRequest,
ListFilesResponse, PrepareTransactionRequest, PrepareTransactionResponse,
RegisterChunkServerRequest, RegisterChunkServerResponse, RemoveRaftServerRequest,
RemoveRaftServerResponse, RenameRequest, RenameResponse, SetSafeModeRequest,
SetSafeModeResponse,
};
use crate::simple_raft::{AppState, Command, Event, MasterCommand, MembershipCommand, Role};
use serde::{Deserialize, Serialize};
@@ -646,6 +647,56 @@ impl MasterService for MyMaster {
}
}

async fn delete_file(
&self,
request: Request<DeleteFileRequest>,
) -> Result<Response<DeleteFileResponse>, Status> {
let req = request.into_inner();
self.check_shard_ownership(&req.path)?;
self.check_safe_mode()?;

// Check if file exists
{
let state_lock = self.state.lock().unwrap();
if let AppState::Master(ref state) = *state_lock {
if !state.files.contains_key(&req.path) {
return Ok(Response::new(DeleteFileResponse {
success: false,
error_message: "File not found".to_string(),
leader_hint: "".to_string(),
}));
}
}
}

let (tx, rx) = tokio::sync::oneshot::channel();
if self
.raft_tx
.send(Event::ClientRequest {
command: Command::Master(MasterCommand::DeleteFile { path: req.path }),
reply_tx: tx,
})
.await
.is_err()
{
return Err(Status::internal("Raft channel closed"));
}

match rx.await {
Ok(Ok(())) => Ok(Response::new(DeleteFileResponse {
success: true,
error_message: "".to_string(),
leader_hint: "".to_string(),
})),
Ok(Err(leader_opt)) => Ok(Response::new(DeleteFileResponse {
success: false,
error_message: "Not Leader".to_string(),
leader_hint: leader_opt.unwrap_or_default(),
})),
Err(_) => Err(Status::internal("Raft response error")),
}
}

async fn allocate_block(
&self,
request: Request<AllocateBlockRequest>,
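The new `delete_file` handler reuses the PR's write-path pattern: enqueue a command plus a oneshot reply channel into the Raft event loop, then await either success or a leader hint. A stripped-down sketch of that pattern with simplified stand-in types (not the PR's actual definitions):

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified stand-ins for the PR's Command/Event types.
enum Command { DeleteFile { path: String } }
struct Event {
    command: Command,
    // Ok(()) on success, Err(leader_hint) when this node is not the leader.
    reply_tx: oneshot::Sender<Result<(), Option<String>>>,
}

async fn raft_loop(mut rx: mpsc::Receiver<Event>) {
    while let Some(event) = rx.recv().await {
        // A real node replicates the command first; here we just apply it.
        match event.command {
            Command::DeleteFile { path } => println!("applied delete of {}", path),
        }
        let _ = event.reply_tx.send(Ok(()));
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(raft_loop(rx));

    // Mirrors delete_file: enqueue the command, then await the oneshot reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Event {
        command: Command::DeleteFile { path: "/f".into() },
        reply_tx,
    })
    .await
    .expect("raft channel closed");
    assert!(reply_rx.await.expect("no reply").is_ok());
}
```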
9 changes: 9 additions & 0 deletions dfs/metaserver/src/simple_raft.rs
@@ -50,6 +50,10 @@ pub enum MasterCommand {
CreateFile {
path: String,
},
DeleteFile {
path: String,
},

AllocateBlock {
path: String,
block_id: String,
@@ -1184,6 +1188,11 @@ impl RaftNode {
},
);
}
MasterCommand::DeleteFile { path } => {
if master_state.files.remove(path).is_some() {
println!("Deleted file {}", path);
}
}
MasterCommand::AllocateBlock {
path,
block_id,
24 changes: 24 additions & 0 deletions dfs/s3_server/Cargo.toml
@@ -0,0 +1,24 @@
[package]
name = "s3-server"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.8", features = ["macros"] }
tokio = { version = "1.0", features = ["full"] }
tower = "0.5"
tower-http = { version = "0.6", features = ["trace", "cors", "normalize-path"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
quick-xml = { version = "0.38", features = ["serialize"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0"
dfs-client = { path = "../client" }
bytes = "1.0"
chrono = "0.4"
headers = "0.4"
uuid = { version = "1.0", features = ["v4"] }
md-5 = "0.10"
hex = "0.4"
tempfile = "3"
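For orientation, here is a minimal sketch of how these dependencies might be wired into an S3-style route table. The handlers and routes are assumptions for illustration, not the PR's actual `s3-server` code:

```rust
use axum::{extract::Path, routing::get, Router};
use bytes::Bytes;

// Hypothetical handlers; the real s3-server routes are not in this diff.
async fn list_buckets() -> String {
    // quick-xml would serialize a ListAllMyBucketsResult here.
    "<ListAllMyBucketsResult/>".to_string()
}

async fn get_object(Path((bucket, key)): Path<(String, String)>) -> String {
    format!("object {}/{}", bucket, key)
}

async fn put_object(Path((bucket, key)): Path<(String, String)>, body: Bytes) -> String {
    format!("stored {} bytes at {}/{}", body.len(), bucket, key)
}

#[tokio::main]
async fn main() {
    // Axum 0.8 path syntax: `{bucket}` captures one segment, `{*key}` the rest.
    let app = Router::new()
        .route("/", get(list_buckets))
        .route("/{bucket}/{*key}", get(get_object).put(put_object));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:9000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```

Note the Axum 0.8 path syntax: `{bucket}` and the `{*key}` wildcard replaced the `:param` / `*param` forms used in 0.7 and earlier.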