diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1b345a8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,45 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Debug unit tests in library 'janus'", + "type": "lldb", + "request": "launch", + "cargo": { + "args": [ + "test" + ] + } + }, + { + "name": "Debug executable 'janus'", + "type": "lldb", + "request": "launch", + "cargo": { + "args": [ + "run", + "--bin=janus", + ] + }, + "args": [ + "server", + "-c", + "tmp.toml" + ] + }, + { + "name": "Debug unit tests in executable 'janus'", + "type": "lldb", + "request": "launch", + "cargo": { + "args": [ + "test", + "--bin=janus" + ] + } + } + ] +} diff --git a/Cargo.lock b/Cargo.lock index a0dcda4..125e137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,6 +1467,7 @@ dependencies = [ "hmac", "jsonwebtoken", "mimalloc", + "percent-encoding", "rand 0.8.5", "reqwest", "sentry", diff --git a/Cargo.toml b/Cargo.toml index 00da353..e379dde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,3 +68,4 @@ rand = "0.8" jsonwebtoken = "9.3" sha2 = "0.10" hmac = "0.12" +percent-encoding = "2.3.2" diff --git a/demo.jsonl b/demo.jsonl new file mode 100644 index 0000000..60009bf --- /dev/null +++ b/demo.jsonl @@ -0,0 +1,6 @@ +{"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0001","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:PutObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:prts-static/images/test.png","time":"2024-01-13T16:04:46.149Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:PutObject","eventTime":"2024-01-13T16:04:46.000Z","requestParameters":{"sourceIPAddress":"192.168.1.100"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"ABC123"},"oss":{"bucket":{"name":"prts-static","arn":"acs:oss:cn-hangzhou:1234567:prts-static","ownerIdentity":"1234567"},"object":{"key":"images/test.png","eTag":"d41d8cd98f00b204e9800998ecf8427e","deltaSize":12345},"ossSchemaVersion":"1.0"}}} +{"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0002","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:CompleteMultipartUpload","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:ak-media/videos/large.mp4","time":"2024-01-13T17:10:30.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:CompleteMultipartUpload","eventTime":"2024-01-13T17:10:30.000Z","requestParameters":{"sourceIPAddress":"192.168.1.101"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"DEF456"},"oss":{"bucket":{"name":"ak-media","arn":"acs:oss:cn-hangzhou:1234567:ak-media","ownerIdentity":"1234567"},"object":{"key":"videos/large.mp4","eTag":"9b2cf83a0e5e3e5c4e5f5f5f5f5f5f5f","deltaSize":104857600},"ossSchemaVersion":"1.0"}}} +{"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0003","source":"acs.oss","specversion":"1.0","type":"oss:ObjectRemoved:DeleteObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:prts-static/old/deprecated.js","time":"2024-01-13T18:20:15.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectRemoved:DeleteObject","eventTime":"2024-01-13T18:20:15.000Z","requestParameters":{"sourceIPAddress":"192.168.1.102"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"GHI789"},"oss":{"bucket":{"name":"prts-static","arn":"acs:oss:cn-hangzhou:1234567:prts-static","ownerIdentity":"1234567"},"object":{"key":"old/deprecated.js","eTag":"a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6","deltaSize":-5678},"ossSchemaVersion":"1.0"}}} +{"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0004","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:PostObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:ak-media/uploads/document.pdf","time":"2024-01-13T19:30:45.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:PostObject","eventTime":"2024-01-13T19:30:45.000Z","requestParameters":{"sourceIPAddress":"192.168.1.103"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"JKL012"},"oss":{"bucket":{"name":"ak-media","arn":"acs:oss:cn-hangzhou:1234567:ak-media","ownerIdentity":"1234567"},"object":{"key":"uploads/document.pdf","eTag":"1a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6","deltaSize":20480},"ossSchemaVersion":"1.0"}}} +{"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0005","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:CopyObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:prts-static/backup/copy.txt","time":"2024-01-13T20:15:20.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:CopyObject","eventTime":"2024-01-13T20:15:20.000Z","requestParameters":{"sourceIPAddress":"192.168.1.104"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"MNO345"},"oss":{"bucket":{"name":"prts-static","arn":"acs:oss:cn-hangzhou:1234567:prts-static","ownerIdentity":"1234567"},"object":{"key":"backup/copy.txt","eTag":"f1e2d3c4b5a6978869504132231455","deltaSize":1024},"ossSchemaVersion":"1.0"}}} +{"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0006","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:AppendObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:ak-media/logs/app.log","time":"2024-01-13T21:05:10.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:AppendObject","eventTime":"2024-01-13T21:05:10.000Z","requestParameters":{"sourceIPAddress":"192.168.1.105"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"PQR678"},"oss":{"bucket":{"name":"ak-media","arn":"acs:oss:cn-hangzhou:1234567:ak-media","ownerIdentity":"1234567"},"object":{"key":"logs/app.log","eTag":"abcdef0123456789abcdef0123456789","deltaSize":512},"ossSchemaVersion":"1.0"}}} diff --git a/example.toml b/example.toml index 315a0b3..6b30111 100644 --- a/example.toml +++ b/example.toml @@ -35,6 +35,12 @@ bili_jct = "your_bilibili_bili_jct" access_key_id = "your_aliyun_access_key_id" access_key_secret = "your_aliyun_access_key_secret" +# Bucket to URL template mapping +# The {object_key} placeholder will be replaced with the actual object key (URL-encoded) +[aliyun.bucket_url_map] +prts-static = "https://static.prts.wiki/{object_key}" +ak-media = "https://media.prts.wiki/{object_key}" + # JWT Configuration (required for API authentication) # Generate ES256 key pair (compatible with jsonwebtoken): # openssl ecparam -genkey -name prime256v1 -noout -out private.pem diff --git a/src/aliyun/signature.rs b/src/aliyun/signature.rs index 67e0c41..4b230cb 100644 --- a/src/aliyun/signature.rs +++ b/src/aliyun/signature.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; use chrono::Utc; +use percent_encoding::{NON_ALPHANUMERIC, percent_encode}; use rand::RngCore; use sha2::{Digest, Sha256}; use std::collections::BTreeMap; @@ -54,7 +55,13 @@ impl AliyunSigner { fn build_canonical_query_string(params: &BTreeMap) -> String { params .iter() - .map(|(k, v)| format!("{}={}", percent_encode(k), percent_encode(v))) + .map(|(k, v)| { + format!( + "{}={}", + percent_encode(k.as_bytes(), NON_ALPHANUMERIC), + percent_encode(v.as_bytes(), NON_ALPHANUMERIC) + ) + }) .collect::>() .join("&") } @@ -77,7 +84,7 @@ impl AliyunSigner { out.push_str( &trimmed .split('/') - .map(percent_encode) + .map(|segment| percent_encode(segment.as_bytes(), NON_ALPHANUMERIC).to_string()) .collect::>() .join("/"), ); @@ -213,21 +220,6 @@ impl AliyunSigner { } } -/// Percent encode a string according to RFC 3986 -/// -/// This encodes all characters except: A-Z, a-z, 0-9, -, _, ., ~ -fn percent_encode(input: &str) -> String { - input - .bytes() - .map(|byte| match byte { - b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { - (byte as char).to_string() - } - _ => format!("%{:02X}", byte), - }) - .collect() -} - fn sha256_hex(input: &[u8]) -> String { let mut hasher = Sha256::new(); hasher.update(input); diff --git a/src/auth.rs b/src/auth.rs index 6d3db5a..033e15c 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -5,7 +5,10 @@ use axum::{ }; use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode}; use serde::{Deserialize, Serialize}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::{ + collections::HashSet, + time::{SystemTime, UNIX_EPOCH}, +}; use crate::error::{AppError, AppResult}; use crate::state::AppState; @@ -13,7 +16,7 @@ use crate::state::AppState; /// JWT Claims structure using standard registered claims #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Claims { - /// Subject (user identifier) + /// Subject (user identifier) - optional since EventBridge tokens may not include it pub sub: String, /// Issued at (as Unix timestamp) pub iat: u64, @@ -55,6 +58,7 @@ pub fn verify_token( let decoding_key = DecodingKey::from_ec_pem(public_key_pem.as_bytes())?; let mut validation = Validation::new(Algorithm::ES256); validation.validate_exp = false; // No expiration validation + validation.required_spec_claims = HashSet::new(); // don't validate “exp”, “nbf”, “aud”, “iss”, “sub” let token_data = decode::(token, &decoding_key, &validation)?; Ok(token_data.claims) diff --git a/src/config.rs b/src/config.rs index c597f2e..9b30389 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_variant::to_variant_name; -use std::{fs, path::Path}; +use std::{collections::HashMap, fs, path::Path}; use thiserror::Error; use tracing::info; @@ -118,6 +118,10 @@ pub struct AliyunConfig { pub access_key_id: String, /// Aliyun Access Key Secret pub access_key_secret: String, + /// Bucket name to URL template mapping + /// The URL template can contain {object_key} placeholder which will be replaced with the actual object key + #[serde(default)] + pub bucket_url_map: HashMap, } /// Server configuration for application use diff --git a/src/error.rs b/src/error.rs index 49d2a46..bf05c75 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,6 +5,7 @@ use axum::{ }; use serde_json::json; use thiserror::Error; +use tracing::error; /// Application-level errors for HTTP handlers #[derive(Error, Debug)] @@ -35,7 +36,11 @@ impl IntoResponse for AppError { let status = self.status_code(); // Log the detailed error with full context chain - tracing::error!("Handler error: {:?}", self); + error!( + error = ?self, + status_code = %status, + "Handler error" + ); let body = json!({ "code": 1, diff --git a/src/routes/aliyun_handlers.rs b/src/routes/aliyun_handlers.rs index 24595d0..fd65609 100644 --- a/src/routes/aliyun_handlers.rs +++ b/src/routes/aliyun_handlers.rs @@ -1,13 +1,17 @@ -use axum::{Json, extract::State}; +use axum::{Json, extract::State, http::HeaderMap}; +use percent_encoding::{NON_ALPHANUMERIC, percent_encode}; use serde::{Deserialize, Serialize}; +use tracing::info; use utoipa::ToSchema; -use crate::aliyun::{ - AliyunCdnClient, DescribeRefreshTasksRequest, DescribeRefreshTasksResponse, - RefreshObjectCachesRequest, RefreshObjectCachesResponse, -}; -use crate::error::AppResult; use crate::state::AppState; +use crate::{ + aliyun::{ + AliyunCdnClient, DescribeRefreshTasksRequest, DescribeRefreshTasksResponse, + RefreshObjectCachesRequest, RefreshObjectCachesResponse, + }, + error::{AppError, AppResult}, +}; /// Request payload for describe refresh tasks endpoint #[derive(ToSchema, Serialize, Deserialize, Debug)] @@ -141,3 +145,166 @@ pub async fn refresh_object_caches( Ok(Json(response)) } + +/// OSS bucket information in event data +#[derive(ToSchema, Serialize, Deserialize, Debug)] +pub struct OssBucket { + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub arn: Option, + #[serde(rename = "ownerIdentity", skip_serializing_if = "Option::is_none")] + pub owner_identity: Option, +} + +/// OSS object information in event data +#[derive(ToSchema, Serialize, Deserialize, Debug)] +pub struct OssObject { + pub key: String, + #[serde(rename = "eTag", skip_serializing_if = "Option::is_none")] + pub etag: Option, + #[serde(rename = "deltaSize", skip_serializing_if = "Option::is_none")] + pub delta_size: Option, +} + +/// OSS-specific data in event +#[derive(ToSchema, Serialize, Deserialize, Debug)] +pub struct OssData { + pub bucket: OssBucket, + pub object: OssObject, + #[serde(rename = "ossSchemaVersion", skip_serializing_if = "Option::is_none")] + pub oss_schema_version: Option, +} + +/// Complete event data structure from OSS +#[derive(ToSchema, Serialize, Deserialize, Debug)] +pub struct OssEventData { + #[serde(skip_serializing_if = "Option::is_none")] + pub region: Option, + #[serde(rename = "eventVersion", skip_serializing_if = "Option::is_none")] + pub event_version: Option, + #[serde(rename = "eventSource", skip_serializing_if = "Option::is_none")] + pub event_source: Option, + #[serde(rename = "eventName", skip_serializing_if = "Option::is_none")] + pub event_name: Option, + #[serde(rename = "eventTime", skip_serializing_if = "Option::is_none")] + pub event_time: Option, + pub oss: OssData, +} + +/// EventBridge OSS event payload +#[derive(ToSchema, Serialize, Deserialize, Debug)] +pub struct OssEventPayload { + pub id: String, + pub source: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub specversion: Option, + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub event_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub datacontenttype: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub subject: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub time: Option, + pub data: OssEventData, +} + +/// Response for OSS event handler +#[derive(ToSchema, Serialize, Deserialize, Debug)] +pub struct OssEventResponse { + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option, +} + +/// Handle Aliyun EventBridge OSS events +#[utoipa::path( + post, + tag = "aliyun", + path = "/aliyun/events", + request_body = OssEventPayload, + responses( + (status = OK, description = "Successfully processed OSS event and triggered CDN refresh", body = OssEventResponse), + (status = UNAUTHORIZED, description = "Missing or invalid x-eventbridge-signature-token"), + (status = BAD_REQUEST, description = "Invalid request or unsupported bucket"), + (status = INTERNAL_SERVER_ERROR, description = "Internal server error") + ), + security( + ("eventbridge_token" = []) + ) +)] +pub async fn handle_oss_events( + State(state): State, + headers: HeaderMap, + Json(raw_payload): Json, +) -> AppResult> { + let token = headers + .get("x-eventbridge-signature-token") + .ok_or_else(|| { + AppError::Unauthorized(anyhow::anyhow!( + "Missing x-eventbridge-signature-token header" + )) + })? + .to_str() + .map_err(|_| { + AppError::Unauthorized(anyhow::anyhow!( + "Invalid x-eventbridge-signature-token header format" + )) + })? + .trim(); + + crate::auth::verify_token(token, &state.jwt_config.public_key).map_err(|err| { + AppError::Unauthorized(anyhow::anyhow!( + "JWT verification failed (x-eventbridge-signature-token): {err}" + )) + })?; + + // Parse the raw JSON into OssEventPayload + let payload: OssEventPayload = serde_json::from_value(raw_payload).map_err(|err| { + AppError::BadRequest(anyhow::anyhow!( + "Failed to parse OSS event payload: {}", + err + )) + })?; + + info!( + event = ?payload, + "Received OSS event" + ); + + let bucket_name = &payload.data.oss.bucket.name; + let object_key = &payload.data.oss.object.key; + + // Get URL template from bucket map + let url_template = state + .aliyun_config + .bucket_url_map + .get(bucket_name) + .ok_or_else(|| { + AppError::BadRequest(anyhow::anyhow!("Unsupported bucket: {}", bucket_name)) + })?; + + // Build the full URL by replacing {object_key} with the actual encoded object key + let encoded_object_key = percent_encode(object_key.as_bytes(), NON_ALPHANUMERIC).to_string(); + let object_url = url_template.replace("{object_key}", &encoded_object_key); + + // Create CDN client + let client = AliyunCdnClient::new(&state.aliyun_config, state.http_client.clone()); + + // Refresh the object cache + let request = RefreshObjectCachesRequest { + object_path: object_url.clone(), + object_type: Some("File".to_string()), + force: Some(false), + }; + + let response = client.refresh_object_caches(&request).await?; + + Ok(Json(OssEventResponse { + message: format!( + "CDN refresh triggered for {} in bucket {}", + object_key, bucket_name + ), + task_id: Some(response.refresh_task_id), + })) +} diff --git a/src/routes/bilibili_handlers.rs b/src/routes/bilibili_handlers.rs index 3471248..e4cbf4e 100644 --- a/src/routes/bilibili_handlers.rs +++ b/src/routes/bilibili_handlers.rs @@ -7,7 +7,7 @@ use rand::Rng; use reqwest::multipart::{Form, Part}; use serde::{Deserialize, Serialize}; use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::info; +use tracing::{info, warn}; use utoipa::ToSchema; use crate::error::{AppError, AppResult}; @@ -167,7 +167,10 @@ async fn handle_create_dynamic_response( let body = resp.text().await.context("Read response failed")?; - info!("Create dynamic response: {}", body); + info!( + response_body = %body, + "Create dynamic response received" + ); let r: BilibiliCreateResponse = serde_json::from_str(&body).context("Parse create dynamic response failed")?; @@ -306,7 +309,10 @@ pub async fn create_dynamic( break; } Err(e) => { - info!("Error reading multipart field: {}", e); + warn!( + error = %e, + "Error reading multipart field" + ); break; } } @@ -323,7 +329,7 @@ pub async fn create_dynamic( // If files are present, upload them first if !files.is_empty() { - info!("Uploading {} files", files.len()); + info!(file_count = files.len(), "Uploading files"); let mut pics: Vec = Vec::new(); for (file_data, file_name, content_type) in files { diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 17c1391..fdedcaa 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -20,6 +20,12 @@ use utoipa_scalar::{Scalar, Servable}; bilibili_handlers::DynamicResponse, aliyun_handlers::DescribeRefreshTasksPayload, aliyun_handlers::RefreshObjectCachesPayload, + aliyun_handlers::OssEventPayload, + aliyun_handlers::OssEventResponse, + aliyun_handlers::OssEventData, + aliyun_handlers::OssData, + aliyun_handlers::OssBucket, + aliyun_handlers::OssObject, crate::aliyun::DescribeRefreshTasksResponse, crate::aliyun::RefreshObjectCachesResponse, crate::aliyun::cdn::TasksContainer, @@ -43,28 +49,51 @@ impl utoipa::Modify for SecurityAddon { .bearer_format("JWT") .build(), ), - ) + ); + components.add_security_scheme( + "eventbridge_token", + utoipa::openapi::security::SecurityScheme::ApiKey( + utoipa::openapi::security::ApiKey::Header( + utoipa::openapi::security::ApiKeyValue::new( + "x-eventbridge-signature-token", + ), + ), + ), + ); } } } pub fn build_router(state: AppState) -> Router { - let (api_routes, mut openapi) = OpenApiRouter::with_openapi(ApiDoc::openapi()) + // Routes without JWT auth (public + custom auth) + let (public_routes, openapi_public) = OpenApiRouter::with_openapi(ApiDoc::openapi()) // Health endpoints (no auth required) .routes(routes!(misc_handlers::ping)) .routes(routes!(misc_handlers::health)) - // Apply JWT authentication for subsequent routes - .route_layer(middleware::from_fn_with_state( - state.clone(), - jwt_auth_middleware, - )) + // Aliyun EventBridge endpoint with custom JWT auth via `x-eventbridge-signature-token` header + .routes(routes!(aliyun_handlers::handle_oss_events)) + .split_for_parts(); + + // Routes protected by Authorization header JWT + let (protected_routes, openapi_protected) = OpenApiRouter::new() // Bilibili routes (protected by JWT auth) .routes(routes!(bilibili_handlers::create_dynamic)) // Aliyun routes (protected by JWT auth) .routes(routes!(aliyun_handlers::describe_refresh_tasks)) .routes(routes!(aliyun_handlers::refresh_object_caches)) + .route_layer(middleware::from_fn_with_state( + state.clone(), + jwt_auth_middleware, + )) .split_for_parts(); + // Merge OpenAPI specs + let mut openapi = openapi_public; + openapi.merge(openapi_protected); + + // Merge route handlers + let api_routes = public_routes.merge(protected_routes); + openapi.paths.paths = openapi .paths .paths