From 82a5e44876c4474928c055bcbe2eb6616f122b7d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 17 Jan 2026 07:20:57 +0000 Subject: [PATCH 1/3] Initial plan From e9410c4623af502ef36a627c52103745982cb8d5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 17 Jan 2026 07:38:27 +0000 Subject: [PATCH 2/3] Add Aliyun CDN and OSS EventBridge integration - Add aliyun module with CDN API client and V3 signature - Add OSS EventBridge webhook handler for automatic CDN refresh - Add CDN API endpoints: describeRefreshTasks, refreshObjectCaches - Update configuration, state, auth, and routing for aliyun support - Add demo.jsonl with sample OSS events - Add .vscode/launch.json for debugging - All builds and clippy checks pass Co-authored-by: daflyinbed <21363956+daflyinbed@users.noreply.github.com> --- .vscode/launch.json | 51 ++++++ Cargo.lock | 12 ++ Cargo.toml | 5 + demo.jsonl | 3 + example.toml | 10 ++ src/aliyun/cdn.rs | 242 ++++++++++++++++++++++++++++ src/aliyun/mod.rs | 2 + src/aliyun/signature.rs | 112 +++++++++++++ src/auth.rs | 7 +- src/config.rs | 13 ++ src/lib.rs | 1 + src/routes/aliyun_handlers.rs | 287 ++++++++++++++++++++++++++++++++++ src/routes/mod.rs | 33 +++- src/state.rs | 4 +- src/tracing.rs | 2 +- 15 files changed, 777 insertions(+), 7 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 demo.jsonl create mode 100644 src/aliyun/cdn.rs create mode 100644 src/aliyun/mod.rs create mode 100644 src/aliyun/signature.rs create mode 100644 src/routes/aliyun_handlers.rs diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..849c938 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,51 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug janus server", + "cargo": { + "args": [ + "build", + "--bin=janus", + "--package=janus" + ], + "filter": { + "name": "janus", + "kind": "bin" + } + }, + "args": [ + "server", + "--config", + "config.toml" + ], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug generate-jwt", + "cargo": { + "args": [ + "build", + "--bin=janus", + "--package=janus" + ], + "filter": { + "name": "janus", + "kind": "bin" + } + }, + "args": [ + "generate-jwt", + "--config", + "config.toml", + "--subject", + "test_user" + ], + "cwd": "${workspaceFolder}" + } + ] +} diff --git a/Cargo.lock b/Cargo.lock index e401ff9..fc8cf67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1460,10 +1460,12 @@ dependencies = [ "anyhow", "async-trait", "axum", + "base64", "bytes", "chrono", "clap", "futures", + "hmac", "jsonwebtoken", "mimalloc", "rand 0.8.5", @@ -1472,6 +1474,7 @@ dependencies = [ "serde", "serde_json", "serde_variant", + "sha2", "sqlx", "thiserror", "tokio", @@ -1480,9 +1483,11 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "urlencoding", "utoipa", "utoipa-axum", "utoipa-scalar", + "uuid", ] [[package]] @@ -3604,6 +3609,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -3677,6 +3688,7 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ + "getrandom 0.3.4", "js-sys", "serde_core", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 17277bd..4626a29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,3 +66,8 @@ serde_variant = "0.1.3" reqwest = { version = "0.12.28", features = ["json", "multipart"] } rand = "0.8" jsonwebtoken = "9.3" +sha2 = "0.10" +hmac = "0.12" +base64 = "0.22" +urlencoding = "2.1" +uuid = { version = "1.0", features = ["v4"] } diff --git a/demo.jsonl b/demo.jsonl new file mode 100644 index 0000000..7c3e2dc --- /dev/null +++ b/demo.jsonl @@ -0,0 +1,3 @@ +{"datacontenttype":"application/json;charset=utf-8","aliyunaccountid":"164901546557****","data":{"region":"cn-beijing","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:PutObject","eventTime":"2021-08-13T06:45:43.000Z","requestParameters":{"sourceIPAddress":"118.31.XX.XX"},"userIdentity":{"principalId":"28815334868278****"},"responseElements":{"requestId":"61161517B258223732BC****"},"oss":{"bucket":{"name":"oss-source-bucket1-cn-beijing","arn":"acs:oss:cn-beijing:164901546557****:oss-source-bucket1-cn-beijing","ownerIdentity":"164901546557****"},"ossSchemaVersion":"1.0","object":{"size":9,"deltaSize":9,"eTag":"F0F18C2C66AE1DD512BDCD4366F7****","key":"objectname"}}},"subject":"acs:oss:cn-beijing:164901546557****:oss-source-bucket1-cn-beijing/1628837143916","aliyunoriginalaccountid":"164901546557****","source":"acs.oss","type":"oss:ObjectCreated:PutObject","aliyunpublishtime":"2021-08-13T06:45:43.986Z","specversion":"1.0","aliyuneventbusname":"default","id":"61161517B258223732BC****","time":"2021-08-13T06:45:43Z","aliyunregionid":"cn-beijing"} +{"datacontenttype":"application/json;charset=utf-8","aliyunaccountid":"164901546557****","data":{"region":"cn-beijing","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:CompleteMultipartUpload","eventTime":"2021-08-13T06:45:43.000Z","requestParameters":{"sourceIPAddress":"118.31.XX.XX"},"userIdentity":{"principalId":"28815334868278****"},"responseElements":{"requestId":"61161517B258223732BC****"},"oss":{"bucket":{"name":"oss-source-bucket1-cn-beijing","arn":"acs:oss:cn-beijing:164901546557****:oss-source-bucket1-cn-beijing","ownerIdentity":"164901546557****"},"ossSchemaVersion":"1.0","object":{"size":9,"deltaSize":9,"eTag":"F0F18C2C66AE1DD512BDCD4366F7****","key":"objectname"}}},"subject":"acs:oss:cn-beijing:164901546557****:oss-source-bucket1-cn-beijing/1628837143916","aliyunoriginalaccountid":"164901546557****","source":"acs.oss","type":"oss:ObjectCreated:CompleteMultipartUpload","aliyunpublishtime":"2021-08-13T06:45:43.986Z","specversion":"1.0","aliyuneventbusname":"default","id":"61161517B258223732BC****","time":"2021-08-13T06:45:43Z","aliyunregionid":"cn-beijing"} +{"datacontenttype":"application/json;charset=utf-8","aliyunaccountid":"164901546557****","data":{"region":"cn-beijing","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectRemoved:DeleteObject","eventTime":"2021-08-13T06:45:43.000Z","requestParameters":{"sourceIPAddress":"118.31.XX.XX"},"userIdentity":{"principalId":"28815334868278****"},"responseElements":{"requestId":"61161517B258223732BC****"},"oss":{"bucket":{"name":"oss-source-bucket1-cn-beijing","arn":"acs:oss:cn-beijing:164901546557****:oss-source-bucket1-cn-beijing","ownerIdentity":"164901546557****"},"ossSchemaVersion":"1.0","object":{"key":"test/file.txt"}}},"subject":"acs:oss:cn-beijing:164901546557****:oss-source-bucket1-cn-beijing/test/file.txt","aliyunoriginalaccountid":"164901546557****","source":"acs.oss","type":"oss:ObjectRemoved:DeleteObject","aliyunpublishtime":"2021-08-13T06:45:43.986Z","specversion":"1.0","aliyuneventbusname":"default","id":"61161517B258223732BC****","time":"2021-08-13T06:45:43Z","aliyunregionid":"cn-beijing"} diff --git a/example.toml b/example.toml index c7d0a74..05d5727 100644 --- a/example.toml +++ b/example.toml @@ -43,6 +43,16 @@ public_key = """-----BEGIN PUBLIC KEY----- YOUR_PUBLIC_KEY_HERE -----END PUBLIC KEY-----""" +# Aliyun Configuration (optional, for CDN and OSS event handling) +# [aliyun] +# access_key_id = "your_aliyun_access_key_id" +# access_key_secret = "your_aliyun_access_key_secret" +# # Mapping from OSS bucket names to CDN URL templates +# # The {object_key} placeholder will be replaced with the percent-encoded object key +# [aliyun.bucket_url_map] +# "my-oss-bucket" = "https://cdn.example.com/{object_key}" +# "another-bucket" = "https://static.example.com/{object_key}" + # [sentry] # dsn = "" # traces_sample_rate = 1.0 diff --git a/src/aliyun/cdn.rs b/src/aliyun/cdn.rs new file mode 100644 index 0000000..c544b60 --- /dev/null +++ b/src/aliyun/cdn.rs @@ -0,0 +1,242 @@ +use crate::aliyun::signature::AliyunSigner; +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +const CDN_ENDPOINT: &str = "https://cdn.aliyuncs.com"; +const API_VERSION: &str = "2018-05-10"; + +/// Aliyun CDN API client +pub struct AliyunCdnClient { + signer: AliyunSigner, + http_client: reqwest::Client, +} + +impl AliyunCdnClient { + /// Create a new CDN client + pub fn new( + access_key_id: String, + access_key_secret: String, + http_client: reqwest::Client, + ) -> Self { + Self { + signer: AliyunSigner::new(access_key_id, access_key_secret), + http_client, + } + } + + /// Query CDN refresh tasks + /// + /// API documentation: https://help.aliyun.com/zh/cdn/developer-reference/api-cdn-2018-05-10-describerefreshtasks + pub async fn describe_refresh_tasks( + &self, + request: &DescribeRefreshTasksRequest, + ) -> Result { + let mut params = BTreeMap::new(); + + // Required parameters + params.insert("Action".to_string(), "DescribeRefreshTasks".to_string()); + params.insert("Version".to_string(), API_VERSION.to_string()); + params.insert("Format".to_string(), "JSON".to_string()); + params.insert( + "AccessKeyId".to_string(), + self.signer.access_key_id().to_string(), + ); + params.insert("SignatureMethod".to_string(), "HMAC-SHA256".to_string()); + params.insert("SignatureVersion".to_string(), "1.0".to_string()); + params.insert( + "SignatureNonce".to_string(), + uuid::Uuid::new_v4().to_string(), + ); + params.insert( + "Timestamp".to_string(), + chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(), + ); + + // Optional parameters + if let Some(domain_name) = &request.domain_name { + params.insert("DomainName".to_string(), domain_name.clone()); + } + if let Some(task_id) = &request.task_id { + params.insert("TaskId".to_string(), task_id.clone()); + } + if let Some(object_path) = &request.object_path { + params.insert("ObjectPath".to_string(), object_path.clone()); + } + if let Some(page_number) = request.page_number { + params.insert("PageNumber".to_string(), page_number.to_string()); + } + if let Some(page_size) = request.page_size { + params.insert("PageSize".to_string(), page_size.to_string()); + } + if let Some(object_type) = &request.object_type { + params.insert("ObjectType".to_string(), object_type.clone()); + } + if let Some(status) = &request.status { + params.insert("Status".to_string(), status.clone()); + } + if let Some(start_time) = &request.start_time { + params.insert("StartTime".to_string(), start_time.clone()); + } + if let Some(end_time) = &request.end_time { + params.insert("EndTime".to_string(), end_time.clone()); + } + + // Generate signature + let (query_string, signature) = self.signer.sign("GET", "/", ¶ms); + + // Build final URL (parameters are already percent-encoded in query_string) + let url = format!( + "{}/?{}&Signature={}", + CDN_ENDPOINT, + query_string, + urlencoding::encode(&signature) + ); + + // Make request + let response = self + .http_client + .get(&url) + .send() + .await + .context("Failed to send request to Aliyun CDN API")?; + + let status = response.status(); + let body = response + .text() + .await + .context("Failed to read response body")?; + + if !status.is_success() { + anyhow::bail!("Aliyun CDN API error ({}): {}", status, body); + } + + serde_json::from_str(&body).context("Failed to parse Aliyun CDN API response") + } + + /// Refresh CDN object caches + /// + /// API documentation: https://help.aliyun.com/zh/cdn/developer-reference/api-cdn-2018-05-10-refreshobjectcaches + pub async fn refresh_object_caches( + &self, + request: &RefreshObjectCachesRequest, + ) -> Result { + let mut params = BTreeMap::new(); + + // Required parameters + params.insert("Action".to_string(), "RefreshObjectCaches".to_string()); + params.insert("Version".to_string(), API_VERSION.to_string()); + params.insert("Format".to_string(), "JSON".to_string()); + params.insert( + "AccessKeyId".to_string(), + self.signer.access_key_id().to_string(), + ); + params.insert("SignatureMethod".to_string(), "HMAC-SHA256".to_string()); + params.insert("SignatureVersion".to_string(), "1.0".to_string()); + params.insert( + "SignatureNonce".to_string(), + uuid::Uuid::new_v4().to_string(), + ); + params.insert( + "Timestamp".to_string(), + chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(), + ); + params.insert("ObjectPath".to_string(), request.object_path.clone()); + + // Optional parameters + if let Some(object_type) = &request.object_type { + params.insert("ObjectType".to_string(), object_type.clone()); + } + if let Some(area) = &request.area { + params.insert("Area".to_string(), area.clone()); + } + + // Generate signature + let (query_string, signature) = self.signer.sign("GET", "/", ¶ms); + + // Build final URL (parameters are already percent-encoded in query_string) + let url = format!( + "{}/?{}&Signature={}", + CDN_ENDPOINT, + query_string, + urlencoding::encode(&signature) + ); + + // Make request + let response = self + .http_client + .get(&url) + .send() + .await + .context("Failed to send request to Aliyun CDN API")?; + + let status = response.status(); + let body = response + .text() + .await + .context("Failed to read response body")?; + + if !status.is_success() { + anyhow::bail!("Aliyun CDN API error ({}): {}", status, body); + } + + serde_json::from_str(&body).context("Failed to parse Aliyun CDN API response") + } +} + +// Request/Response structures + +#[derive(Debug, Serialize, Deserialize)] +pub struct DescribeRefreshTasksRequest { + pub domain_name: Option, + pub task_id: Option, + pub object_path: Option, + pub page_number: Option, + pub page_size: Option, + pub object_type: Option, + pub status: Option, + pub start_time: Option, + pub end_time: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct DescribeRefreshTasksResponse { + pub request_id: String, + pub page_number: i64, + pub page_size: i64, + pub total_count: i64, + pub tasks: Tasks, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct Tasks { + pub c_d_n_task: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct CdnTask { + pub task_id: String, + pub object_path: String, + pub process: String, + pub status: String, + pub creation_time: String, + pub description: String, + pub object_type: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RefreshObjectCachesRequest { + pub object_path: String, + pub object_type: Option, + pub area: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct RefreshObjectCachesResponse { + pub request_id: String, + pub refresh_task_id: String, +} diff --git a/src/aliyun/mod.rs b/src/aliyun/mod.rs new file mode 100644 index 0000000..7b4e196 --- /dev/null +++ b/src/aliyun/mod.rs @@ -0,0 +1,2 @@ +pub mod cdn; +pub mod signature; diff --git a/src/aliyun/signature.rs b/src/aliyun/signature.rs new file mode 100644 index 0000000..b4046a4 --- /dev/null +++ b/src/aliyun/signature.rs @@ -0,0 +1,112 @@ +use base64::{Engine as _, engine::general_purpose}; +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use std::collections::BTreeMap; + +type HmacSha256 = Hmac; + +/// Aliyun API V3 signature generator +/// +/// Implements the signature algorithm for Aliyun API V3 as documented at: +/// https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature +pub struct AliyunSigner { + access_key_id: String, + access_key_secret: String, +} + +impl AliyunSigner { + /// Create a new AliyunSigner with access credentials + pub fn new(access_key_id: String, access_key_secret: String) -> Self { + Self { + access_key_id, + access_key_secret, + } + } + + /// Generate signature for API request + /// + /// # Arguments + /// * `method` - HTTP method (e.g., "GET", "POST") + /// * `path` - API path (e.g., "/") + /// * `params` - Query parameters as key-value pairs + /// + /// # Returns + /// Returns a tuple of (query_string, signature) where: + /// - query_string: URL-encoded query string with all parameters + /// - signature: Base64-encoded HMAC-SHA256 signature + pub fn sign( + &self, + method: &str, + path: &str, + params: &BTreeMap, + ) -> (String, String) { + // Build canonical query string from sorted parameters + let canonical_query = self.build_canonical_query(params); + + // Build string to sign + let string_to_sign = format!("{}\n{}\n{}", method, path, canonical_query); + + // Generate signature + let signature_key = format!("{}&", self.access_key_secret); + let mut mac = HmacSha256::new_from_slice(signature_key.as_bytes()) + .expect("HMAC can take key of any size"); + mac.update(string_to_sign.as_bytes()); + let signature_bytes = mac.finalize().into_bytes(); + let signature = general_purpose::STANDARD.encode(signature_bytes); + + (canonical_query, signature) + } + + /// Build canonical query string from parameters + /// Parameters are sorted by key (BTreeMap maintains order) and percent-encoded per RFC 3986 + fn build_canonical_query(&self, params: &BTreeMap) -> String { + params + .iter() + .map(|(k, v)| format!("{}={}", percent_encode(k), percent_encode(v))) + .collect::>() + .join("&") + } + + /// Get access key ID + pub fn access_key_id(&self) -> &str { + &self.access_key_id + } +} + +/// Percent-encode a string according to RFC 3986 +/// +/// This encodes all characters except: A-Z, a-z, 0-9, -, _, ., ~ +/// Space is encoded as %20 (not +) +fn percent_encode(s: &str) -> String { + urlencoding::encode(s).to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_percent_encode() { + assert_eq!(percent_encode("hello world"), "hello%20world"); + assert_eq!(percent_encode("test@example.com"), "test%40example.com"); + assert_eq!(percent_encode("a-b_c.d~e"), "a-b_c.d~e"); + } + + #[test] + fn test_signature_generation() { + let signer = AliyunSigner::new("test_key_id".to_string(), "test_key_secret".to_string()); + + let mut params = BTreeMap::new(); + params.insert("Action".to_string(), "DescribeRefreshTasks".to_string()); + params.insert("Version".to_string(), "2018-05-10".to_string()); + + let (query, signature) = signer.sign("GET", "/", ¶ms); + + // Verify query string is properly formatted + assert!(query.contains("Action=DescribeRefreshTasks")); + assert!(query.contains("Version=2018-05-10")); + + // Verify signature is base64 encoded + assert!(general_purpose::STANDARD.decode(&signature).is_ok()); + } +} diff --git a/src/auth.rs b/src/auth.rs index 6d3db5a..e142ebb 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -13,8 +13,9 @@ use crate::state::AppState; /// JWT Claims structure using standard registered claims #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Claims { - /// Subject (user identifier) - pub sub: String, + /// Subject (user identifier) - optional to support different token types + #[serde(skip_serializing_if = "Option::is_none")] + pub sub: Option, /// Issued at (as Unix timestamp) pub iat: u64, } @@ -28,7 +29,7 @@ impl Claims { .as_secs(); Self { - sub: subject, + sub: Some(subject), iat: now, } } diff --git a/src/config.rs b/src/config.rs index ba50be9..b2cfa3f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -111,6 +111,18 @@ pub struct JwtConfig { pub public_key: String, } +/// Aliyun configuration for CDN and OSS event handling +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AliyunConfig { + /// Aliyun Access Key ID + pub access_key_id: String, + /// Aliyun Access Key Secret + pub access_key_secret: String, + /// Mapping from OSS bucket names to URL templates + /// Example: "my-bucket" -> "https://cdn.example.com/{object_key}" + pub bucket_url_map: std::collections::HashMap, +} + /// Server configuration for application use #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ServerConfig { @@ -145,6 +157,7 @@ pub struct AppSettings { pub sentry: Option, pub bilibili: BilibiliConfig, pub jwt: JwtConfig, + pub aliyun: Option, } impl AppSettings { diff --git a/src/lib.rs b/src/lib.rs index ed65b3d..13c1fa9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +mod aliyun; pub mod app; pub mod auth; mod config; diff --git a/src/routes/aliyun_handlers.rs b/src/routes/aliyun_handlers.rs new file mode 100644 index 0000000..ea4a071 --- /dev/null +++ b/src/routes/aliyun_handlers.rs @@ -0,0 +1,287 @@ +use crate::aliyun::cdn::{ + AliyunCdnClient, DescribeRefreshTasksRequest, RefreshObjectCachesRequest, +}; +use crate::auth::verify_token; +use crate::error::{AppError, AppResult}; +use crate::state::AppState; +use axum::{Json, extract::State}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +/// Request body for describing refresh tasks +#[derive(Debug, Deserialize, ToSchema)] +pub struct DescribeRefreshTasksRequestBody { + #[serde(skip_serializing_if = "Option::is_none")] + pub domain_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub object_path: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub page_number: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub page_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub object_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub start_time: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub end_time: Option, +} + +/// Request body for refreshing object caches +#[derive(Debug, Deserialize, ToSchema)] +pub struct RefreshObjectCachesRequestBody { + pub object_path: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub object_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub area: Option, +} + +/// Generic success response +#[derive(Debug, Serialize, ToSchema)] +pub struct SuccessResponse { + pub code: i32, +} + +/// Describe CDN refresh tasks +#[utoipa::path( + post, + path = "/api/aliyun/describeRefreshTasks", + tag = "aliyun", + request_body = DescribeRefreshTasksRequestBody, + responses( + (status = 200, description = "Refresh tasks retrieved successfully", body = serde_json::Value), + (status = 401, description = "Unauthorized", body = SuccessResponse), + (status = 500, description = "Internal server error", body = SuccessResponse), + ), + security( + ("bearer_auth" = []) + ) +)] +pub async fn describe_refresh_tasks( + State(state): State, + Json(req): Json, +) -> AppResult> { + let aliyun_config = state.aliyun_config.ok_or_else(|| { + AppError::InternalError(anyhow::anyhow!("Aliyun configuration not found")) + })?; + + let client = AliyunCdnClient::new( + aliyun_config.access_key_id, + aliyun_config.access_key_secret, + state.http_client, + ); + + let request = DescribeRefreshTasksRequest { + domain_name: req.domain_name, + task_id: req.task_id, + object_path: req.object_path, + page_number: req.page_number, + page_size: req.page_size, + object_type: req.object_type, + status: req.status, + start_time: req.start_time, + end_time: req.end_time, + }; + + let response = client + .describe_refresh_tasks(&request) + .await + .map_err(AppError::InternalError)?; + + Ok(Json(serde_json::to_value(response)?)) +} + +/// Refresh CDN object caches +#[utoipa::path( + post, + path = "/api/aliyun/refreshObjectCaches", + tag = "aliyun", + request_body = RefreshObjectCachesRequestBody, + responses( + (status = 200, description = "Cache refresh initiated successfully", body = serde_json::Value), + (status = 401, description = "Unauthorized", body = SuccessResponse), + (status = 500, description = "Internal server error", body = SuccessResponse), + ), + security( + ("bearer_auth" = []) + ) +)] +pub async fn refresh_object_caches( + State(state): State, + Json(req): Json, +) -> AppResult> { + let aliyun_config = state.aliyun_config.ok_or_else(|| { + AppError::InternalError(anyhow::anyhow!("Aliyun configuration not found")) + })?; + + let client = AliyunCdnClient::new( + aliyun_config.access_key_id, + aliyun_config.access_key_secret, + state.http_client, + ); + + let request = RefreshObjectCachesRequest { + object_path: req.object_path, + object_type: req.object_type, + area: req.area, + }; + + let response = client + .refresh_object_caches(&request) + .await + .map_err(AppError::InternalError)?; + + Ok(Json(serde_json::to_value(response)?)) +} + +/// OSS EventBridge event structures +#[derive(Debug, Deserialize, ToSchema)] +pub struct OssEventPayload { + pub data: OssEventData, +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct OssEventData { + pub oss: OssData, +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct OssData { + pub bucket: OssBucket, + pub object: OssObject, +} + +#[derive(Debug, Deserialize, ToSchema)] +pub struct OssBucket { + pub name: String, +} + +#[derive(Debug, Deserialize, ToSchema)] +pub struct OssObject { + pub key: String, +} + +/// Handle OSS EventBridge events and trigger CDN refresh +#[utoipa::path( + post, + path = "/api/aliyun/events", + tag = "aliyun", + request_body = serde_json::Value, + responses( + (status = 200, description = "Event processed successfully", body = SuccessResponse), + (status = 401, description = "Unauthorized", body = SuccessResponse), + (status = 500, description = "Internal server error", body = SuccessResponse), + ), + security( + ("eventbridge_token" = []) + ) +)] +pub async fn handle_oss_events( + State(state): State, + req: axum::extract::Request, +) -> AppResult> { + // Extract and verify EventBridge signature token + let headers = req.headers(); + let token = headers + .get("x-eventbridge-signature-token") + .ok_or_else(|| { + AppError::Unauthorized(anyhow::anyhow!( + "Missing x-eventbridge-signature-token header" + )) + })? + .to_str() + .map_err(|_| { + AppError::Unauthorized(anyhow::anyhow!( + "Invalid x-eventbridge-signature-token format" + )) + })?; + + // Verify JWT token + verify_token(token, &state.jwt_config.public_key).map_err(|err| { + AppError::Unauthorized(anyhow::anyhow!( + "EventBridge token verification failed: {}", + err + )) + })?; + + // Read request body + let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) + .await + .map_err(|e| { + AppError::InternalError(anyhow::anyhow!("Failed to read request body: {}", e)) + })?; + + let body_str = String::from_utf8(body_bytes.to_vec()).map_err(|e| { + AppError::InternalError(anyhow::anyhow!("Invalid UTF-8 in request body: {}", e)) + })?; + + // Log raw payload for debugging + tracing::debug!("Received OSS event payload: {}", body_str); + + // Parse event + let event: OssEventPayload = serde_json::from_str(&body_str).map_err(|e| { + AppError::BadRequest(anyhow::anyhow!("Failed to parse event payload: {}", e)) + })?; + + let bucket_name = &event.data.oss.bucket.name; + let object_key = &event.data.oss.object.key; + + tracing::info!( + "Processing OSS event for bucket: {}, object: {}", + bucket_name, + object_key + ); + + // Get Aliyun config + let aliyun_config = state.aliyun_config.ok_or_else(|| { + AppError::InternalError(anyhow::anyhow!("Aliyun configuration not found")) + })?; + + // Look up URL template for bucket + let url_template = aliyun_config + .bucket_url_map + .get(bucket_name) + .ok_or_else(|| { + AppError::BadRequest(anyhow::anyhow!( + "No URL mapping found for bucket: {}", + bucket_name + )) + })?; + + // Build URL by replacing {object_key} placeholder with percent-encoded object key + let encoded_key = urlencoding::encode(object_key); + let cdn_url = url_template.replace("{object_key}", &encoded_key); + + tracing::info!("Triggering CDN refresh for URL: {}", cdn_url); + + // Create CDN client and refresh cache + let cdn_client = AliyunCdnClient::new( + aliyun_config.access_key_id.clone(), + aliyun_config.access_key_secret.clone(), + state.http_client.clone(), + ); + + let refresh_request = RefreshObjectCachesRequest { + object_path: cdn_url.clone(), + object_type: Some("File".to_string()), + area: None, + }; + + cdn_client + .refresh_object_caches(&refresh_request) + .await + .map_err(|e| { + AppError::InternalError(anyhow::anyhow!("Failed to refresh CDN cache: {}", e)) + })?; + + tracing::info!("Successfully triggered CDN refresh for: {}", cdn_url); + + Ok(Json(SuccessResponse { code: 0 })) +} diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 1bd9553..5fee30b 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,4 +1,5 @@ #![allow(clippy::needless_for_each)] +mod aliyun_handlers; mod bilibili_handlers; mod misc_handlers; use crate::{auth::jwt_auth_middleware, middleware::apply_axum_middleware, state::AppState}; @@ -12,9 +13,20 @@ use utoipa_scalar::{Scalar, Servable}; tags( (name = "health", description = "Health check endpoints"), (name = "bilibili", description = "Bilibili dynamic posting endpoints"), + (name = "aliyun", description = "Aliyun CDN and OSS event endpoints"), ), components( - schemas(bilibili_handlers::DynamicResponse) + schemas( + bilibili_handlers::DynamicResponse, + aliyun_handlers::DescribeRefreshTasksRequestBody, + aliyun_handlers::RefreshObjectCachesRequestBody, + aliyun_handlers::SuccessResponse, + aliyun_handlers::OssEventPayload, + aliyun_handlers::OssEventData, + aliyun_handlers::OssData, + aliyun_handlers::OssBucket, + aliyun_handlers::OssObject, + ) ), modifiers(&SecurityAddon) )] @@ -33,7 +45,19 @@ impl utoipa::Modify for SecurityAddon { .bearer_format("JWT") .build(), ), - ) + ); + components.add_security_scheme( + "eventbridge_token", + utoipa::openapi::security::SecurityScheme::Http( + utoipa::openapi::security::HttpBuilder::new() + .scheme(utoipa::openapi::security::HttpAuthScheme::Bearer) + .bearer_format("JWT") + .description(Some( + "JWT token from EventBridge via x-eventbridge-signature-token header", + )) + .build(), + ), + ); } } } @@ -50,6 +74,11 @@ pub fn build_router(state: AppState) -> Router { )) // Bilibili routes (protected by JWT auth) .routes(routes!(bilibili_handlers::create_dynamic)) + // Aliyun CDN routes (protected by JWT auth) + .routes(routes!(aliyun_handlers::describe_refresh_tasks)) + .routes(routes!(aliyun_handlers::refresh_object_caches)) + // Aliyun OSS event handler (requires custom auth header handling) + .routes(routes!(aliyun_handlers::handle_oss_events)) .split_for_parts(); openapi.paths.paths = openapi diff --git a/src/state.rs b/src/state.rs index 73bd475..e3463ef 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,5 +1,5 @@ use crate::{ - config::{AppSettings, BilibiliConfig, JwtConfig}, + config::{AliyunConfig, AppSettings, BilibiliConfig, JwtConfig}, repository::PostgresRepository, }; @@ -8,6 +8,7 @@ pub struct AppState { pub repository: PostgresRepository, pub bilibili_config: BilibiliConfig, pub jwt_config: JwtConfig, + pub aliyun_config: Option, pub http_client: reqwest::Client, } @@ -21,6 +22,7 @@ pub async fn init_state_with_pg(config: &AppSettings) -> AppState { repository: PostgresRepository { pool }, bilibili_config: config.bilibili.clone(), jwt_config: config.jwt.clone(), + aliyun_config: config.aliyun.clone(), http_client: reqwest::Client::new(), } } diff --git a/src/tracing.rs b/src/tracing.rs index 4765bfe..e8fd1db 100644 --- a/src/tracing.rs +++ b/src/tracing.rs @@ -12,7 +12,7 @@ use tracing_subscriber::{ use crate::config::{LogFormat, LogLevel, LoggerConfig, SentryConfig}; -const MODULE_WHITELIST: &[&str] = &["tower_http", "sqlx::query", "my_axum_template"]; +const MODULE_WHITELIST: &[&str] = &["tower_http", "sqlx::query", "janus"]; fn init_env_filter(override_filter: Option<&String>, level: &LogLevel) -> EnvFilter { EnvFilter::try_from_default_env() From 847cfcef8f38969f565bf8cb003fd47034a9fa05 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 17 Jan 2026 07:58:01 +0000 Subject: [PATCH 3/3] Remove CDN API endpoints, keep only /api/aliyun/events - Remove /api/aliyun/describeRefreshTasks endpoint - Remove /api/aliyun/refreshObjectCaches endpoint - Remove unused CDN request/response structures - Remove describe_refresh_tasks method from AliyunCdnClient - Update OpenAPI schemas to only include OSS event structures - Keep only the EventBridge webhook handler for OSS events Co-authored-by: daflyinbed <21363956+daflyinbed@users.noreply.github.com> --- src/aliyun/cdn.rs | 130 --------------------------------- src/routes/aliyun_handlers.rs | 132 +--------------------------------- src/routes/mod.rs | 7 +- 3 files changed, 4 insertions(+), 265 deletions(-) diff --git a/src/aliyun/cdn.rs b/src/aliyun/cdn.rs index c544b60..1e2266d 100644 --- a/src/aliyun/cdn.rs +++ b/src/aliyun/cdn.rs @@ -25,95 +25,6 @@ impl AliyunCdnClient { } } - /// Query CDN refresh tasks - /// - /// API documentation: https://help.aliyun.com/zh/cdn/developer-reference/api-cdn-2018-05-10-describerefreshtasks - pub async fn describe_refresh_tasks( - &self, - request: &DescribeRefreshTasksRequest, - ) -> Result { - let mut params = BTreeMap::new(); - - // Required parameters - params.insert("Action".to_string(), "DescribeRefreshTasks".to_string()); - params.insert("Version".to_string(), API_VERSION.to_string()); - params.insert("Format".to_string(), "JSON".to_string()); - params.insert( - "AccessKeyId".to_string(), - self.signer.access_key_id().to_string(), - ); - params.insert("SignatureMethod".to_string(), "HMAC-SHA256".to_string()); - params.insert("SignatureVersion".to_string(), "1.0".to_string()); - params.insert( - "SignatureNonce".to_string(), - uuid::Uuid::new_v4().to_string(), - ); - params.insert( - "Timestamp".to_string(), - chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(), - ); - - // Optional parameters - if let Some(domain_name) = &request.domain_name { - params.insert("DomainName".to_string(), domain_name.clone()); - } - if let Some(task_id) = &request.task_id { - params.insert("TaskId".to_string(), task_id.clone()); - } - if let Some(object_path) = &request.object_path { - params.insert("ObjectPath".to_string(), object_path.clone()); - } - if let Some(page_number) = request.page_number { - params.insert("PageNumber".to_string(), page_number.to_string()); - } - if let Some(page_size) = request.page_size { - params.insert("PageSize".to_string(), page_size.to_string()); - } - if let Some(object_type) = &request.object_type { - params.insert("ObjectType".to_string(), object_type.clone()); - } - if let Some(status) = &request.status { - params.insert("Status".to_string(), status.clone()); - } - if let Some(start_time) = &request.start_time { - params.insert("StartTime".to_string(), start_time.clone()); - } - if let Some(end_time) = &request.end_time { - params.insert("EndTime".to_string(), end_time.clone()); - } - - // Generate signature - let (query_string, signature) = self.signer.sign("GET", "/", ¶ms); - - // Build final URL (parameters are already percent-encoded in query_string) - let url = format!( - "{}/?{}&Signature={}", - CDN_ENDPOINT, - query_string, - urlencoding::encode(&signature) - ); - - // Make request - let response = self - .http_client - .get(&url) - .send() - .await - .context("Failed to send request to Aliyun CDN API")?; - - let status = response.status(); - let body = response - .text() - .await - .context("Failed to read response body")?; - - if !status.is_success() { - anyhow::bail!("Aliyun CDN API error ({}): {}", status, body); - } - - serde_json::from_str(&body).context("Failed to parse Aliyun CDN API response") - } - /// Refresh CDN object caches /// /// API documentation: https://help.aliyun.com/zh/cdn/developer-reference/api-cdn-2018-05-10-refreshobjectcaches @@ -186,47 +97,6 @@ impl AliyunCdnClient { // Request/Response structures -#[derive(Debug, Serialize, Deserialize)] -pub struct DescribeRefreshTasksRequest { - pub domain_name: Option, - pub task_id: Option, - pub object_path: Option, - pub page_number: Option, - pub page_size: Option, - pub object_type: Option, - pub status: Option, - pub start_time: Option, - pub end_time: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct DescribeRefreshTasksResponse { - pub request_id: String, - pub page_number: i64, - pub page_size: i64, - pub total_count: i64, - pub tasks: Tasks, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct Tasks { - pub c_d_n_task: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct CdnTask { - pub task_id: String, - pub object_path: String, - pub process: String, - pub status: String, - pub creation_time: String, - pub description: String, - pub object_type: String, -} - #[derive(Debug, Serialize, Deserialize)] pub struct RefreshObjectCachesRequest { pub object_path: String, diff --git a/src/routes/aliyun_handlers.rs b/src/routes/aliyun_handlers.rs index ea4a071..7406e7a 100644 --- a/src/routes/aliyun_handlers.rs +++ b/src/routes/aliyun_handlers.rs @@ -1,144 +1,18 @@ -use crate::aliyun::cdn::{ - AliyunCdnClient, DescribeRefreshTasksRequest, RefreshObjectCachesRequest, -}; +use crate::aliyun::cdn::{AliyunCdnClient, RefreshObjectCachesRequest}; use crate::auth::verify_token; use crate::error::{AppError, AppResult}; use crate::state::AppState; -use axum::{Json, extract::State}; +use axum::Json; +use axum::extract::State; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; -/// Request body for describing refresh tasks -#[derive(Debug, Deserialize, ToSchema)] -pub struct DescribeRefreshTasksRequestBody { - #[serde(skip_serializing_if = "Option::is_none")] - pub domain_name: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub task_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub object_path: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub page_number: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub page_size: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub object_type: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub status: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub start_time: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub end_time: Option, -} - -/// Request body for refreshing object caches -#[derive(Debug, Deserialize, ToSchema)] -pub struct RefreshObjectCachesRequestBody { - pub object_path: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub object_type: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub area: Option, -} - /// Generic success response #[derive(Debug, Serialize, ToSchema)] pub struct SuccessResponse { pub code: i32, } -/// Describe CDN refresh tasks -#[utoipa::path( - post, - path = "/api/aliyun/describeRefreshTasks", - tag = "aliyun", - request_body = DescribeRefreshTasksRequestBody, - responses( - (status = 200, description = "Refresh tasks retrieved successfully", body = serde_json::Value), - (status = 401, description = "Unauthorized", body = SuccessResponse), - (status = 500, description = "Internal server error", body = SuccessResponse), - ), - security( - ("bearer_auth" = []) - ) -)] -pub async fn describe_refresh_tasks( - State(state): State, - Json(req): Json, -) -> AppResult> { - let aliyun_config = state.aliyun_config.ok_or_else(|| { - AppError::InternalError(anyhow::anyhow!("Aliyun configuration not found")) - })?; - - let client = AliyunCdnClient::new( - aliyun_config.access_key_id, - aliyun_config.access_key_secret, - state.http_client, - ); - - let request = DescribeRefreshTasksRequest { - domain_name: req.domain_name, - task_id: req.task_id, - object_path: req.object_path, - page_number: req.page_number, - page_size: req.page_size, - object_type: req.object_type, - status: req.status, - start_time: req.start_time, - end_time: req.end_time, - }; - - let response = client - .describe_refresh_tasks(&request) - .await - .map_err(AppError::InternalError)?; - - Ok(Json(serde_json::to_value(response)?)) -} - -/// Refresh CDN object caches -#[utoipa::path( - post, - path = "/api/aliyun/refreshObjectCaches", - tag = "aliyun", - request_body = RefreshObjectCachesRequestBody, - responses( - (status = 200, description = "Cache refresh initiated successfully", body = serde_json::Value), - (status = 401, description = "Unauthorized", body = SuccessResponse), - (status = 500, description = "Internal server error", body = SuccessResponse), - ), - security( - ("bearer_auth" = []) - ) -)] -pub async fn refresh_object_caches( - State(state): State, - Json(req): Json, -) -> AppResult> { - let aliyun_config = state.aliyun_config.ok_or_else(|| { - AppError::InternalError(anyhow::anyhow!("Aliyun configuration not found")) - })?; - - let client = AliyunCdnClient::new( - aliyun_config.access_key_id, - aliyun_config.access_key_secret, - state.http_client, - ); - - let request = RefreshObjectCachesRequest { - object_path: req.object_path, - object_type: req.object_type, - area: req.area, - }; - - let response = client - .refresh_object_caches(&request) - .await - .map_err(AppError::InternalError)?; - - Ok(Json(serde_json::to_value(response)?)) -} - /// OSS EventBridge event structures #[derive(Debug, Deserialize, ToSchema)] pub struct OssEventPayload { diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 5fee30b..089cc01 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -13,13 +13,11 @@ use utoipa_scalar::{Scalar, Servable}; tags( (name = "health", description = "Health check endpoints"), (name = "bilibili", description = "Bilibili dynamic posting endpoints"), - (name = "aliyun", description = "Aliyun CDN and OSS event endpoints"), + (name = "aliyun", description = "Aliyun OSS event endpoints"), ), components( schemas( bilibili_handlers::DynamicResponse, - aliyun_handlers::DescribeRefreshTasksRequestBody, - aliyun_handlers::RefreshObjectCachesRequestBody, aliyun_handlers::SuccessResponse, aliyun_handlers::OssEventPayload, aliyun_handlers::OssEventData, @@ -74,9 +72,6 @@ pub fn build_router(state: AppState) -> Router { )) // Bilibili routes (protected by JWT auth) .routes(routes!(bilibili_handlers::create_dynamic)) - // Aliyun CDN routes (protected by JWT auth) - .routes(routes!(aliyun_handlers::describe_refresh_tasks)) - .routes(routes!(aliyun_handlers::refresh_object_caches)) // Aliyun OSS event handler (requires custom auth header handling) .routes(routes!(aliyun_handlers::handle_oss_events)) .split_for_parts();