-
Notifications
You must be signed in to change notification settings - Fork 0
Add EventBridge webhook endpoint for OSS events with configurable CDN refresh #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1595d04
28c4f3c
06452a7
cf1b76a
5e3be2d
61c5e20
c2f789e
fb9b43b
f264f0c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| { | ||
| // Use IntelliSense to learn about possible attributes. | ||
| // Hover to view descriptions of existing attributes. | ||
| // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 | ||
| "version": "0.2.0", | ||
| "configurations": [ | ||
| { | ||
| "name": "Debug unit tests in library 'janus'", | ||
| "type": "lldb", | ||
| "request": "launch", | ||
| "cargo": { | ||
| "args": [ | ||
| "test" | ||
| ] | ||
| } | ||
| }, | ||
| { | ||
| "name": "Debug executable 'janus'", | ||
| "type": "lldb", | ||
| "request": "launch", | ||
| "cargo": { | ||
| "args": [ | ||
| "run", | ||
| "--bin=janus", | ||
| ] | ||
| }, | ||
| "args": [ | ||
| "server", | ||
| "-c", | ||
| "tmp.toml" | ||
| ] | ||
| }, | ||
| { | ||
| "name": "Debug unit tests in executable 'janus'", | ||
| "type": "lldb", | ||
| "request": "launch", | ||
| "cargo": { | ||
| "args": [ | ||
| "test", | ||
| "--bin=janus" | ||
| ] | ||
| } | ||
| } | ||
| ] | ||
| } |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,3 +68,4 @@ rand = "0.8" | |
| jsonwebtoken = "9.3" | ||
| sha2 = "0.10" | ||
| hmac = "0.12" | ||
| percent-encoding = "2.3.2" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| {"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0001","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:PutObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:prts-static/images/test.png","time":"2024-01-13T16:04:46.149Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:PutObject","eventTime":"2024-01-13T16:04:46.000Z","requestParameters":{"sourceIPAddress":"192.168.1.100"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"ABC123"},"oss":{"bucket":{"name":"prts-static","arn":"acs:oss:cn-hangzhou:1234567:prts-static","ownerIdentity":"1234567"},"object":{"key":"images/test.png","eTag":"d41d8cd98f00b204e9800998ecf8427e","deltaSize":12345},"ossSchemaVersion":"1.0"}}} | ||
| {"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0002","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:CompleteMultipartUpload","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:ak-media/videos/large.mp4","time":"2024-01-13T17:10:30.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:CompleteMultipartUpload","eventTime":"2024-01-13T17:10:30.000Z","requestParameters":{"sourceIPAddress":"192.168.1.101"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"DEF456"},"oss":{"bucket":{"name":"ak-media","arn":"acs:oss:cn-hangzhou:1234567:ak-media","ownerIdentity":"1234567"},"object":{"key":"videos/large.mp4","eTag":"9b2cf83a0e5e3e5c4e5f5f5f5f5f5f5f","deltaSize":104857600},"ossSchemaVersion":"1.0"}}} | ||
| {"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0003","source":"acs.oss","specversion":"1.0","type":"oss:ObjectRemoved:DeleteObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:prts-static/old/deprecated.js","time":"2024-01-13T18:20:15.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectRemoved:DeleteObject","eventTime":"2024-01-13T18:20:15.000Z","requestParameters":{"sourceIPAddress":"192.168.1.102"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"GHI789"},"oss":{"bucket":{"name":"prts-static","arn":"acs:oss:cn-hangzhou:1234567:prts-static","ownerIdentity":"1234567"},"object":{"key":"old/deprecated.js","eTag":"a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6","deltaSize":-5678},"ossSchemaVersion":"1.0"}}} | ||
| {"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0004","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:PostObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:ak-media/uploads/document.pdf","time":"2024-01-13T19:30:45.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:PostObject","eventTime":"2024-01-13T19:30:45.000Z","requestParameters":{"sourceIPAddress":"192.168.1.103"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"JKL012"},"oss":{"bucket":{"name":"ak-media","arn":"acs:oss:cn-hangzhou:1234567:ak-media","ownerIdentity":"1234567"},"object":{"key":"uploads/document.pdf","eTag":"1a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6","deltaSize":20480},"ossSchemaVersion":"1.0"}}} | ||
| {"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0005","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:CopyObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:prts-static/backup/copy.txt","time":"2024-01-13T20:15:20.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:CopyObject","eventTime":"2024-01-13T20:15:20.000Z","requestParameters":{"sourceIPAddress":"192.168.1.104"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"MNO345"},"oss":{"bucket":{"name":"prts-static","arn":"acs:oss:cn-hangzhou:1234567:prts-static","ownerIdentity":"1234567"},"object":{"key":"backup/copy.txt","eTag":"f1e2d3c4b5a6978869504132231455","deltaSize":1024},"ossSchemaVersion":"1.0"}}} | ||
| {"id":"7adc8c1a-645d-4476-bdef-5d6fb57f0006","source":"acs.oss","specversion":"1.0","type":"oss:ObjectCreated:AppendObject","datacontenttype":"application/json","subject":"acs:oss:cn-hangzhou:1234567:ak-media/logs/app.log","time":"2024-01-13T21:05:10.000Z","aliyuneventbusname":"default","aliyunregionid":"cn-hangzhou","data":{"region":"cn-hangzhou","eventVersion":"1.0","eventSource":"acs:oss","eventName":"ObjectCreated:AppendObject","eventTime":"2024-01-13T21:05:10.000Z","requestParameters":{"sourceIPAddress":"192.168.1.105"},"userIdentity":{"principalId":"12345678"},"responseElements":{"requestId":"PQR678"},"oss":{"bucket":{"name":"ak-media","arn":"acs:oss:cn-hangzhou:1234567:ak-media","ownerIdentity":"1234567"},"object":{"key":"logs/app.log","eTag":"abcdef0123456789abcdef0123456789","deltaSize":512},"ossSchemaVersion":"1.0"}}} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,17 @@ | ||
| use axum::{Json, extract::State}; | ||
| use axum::{Json, extract::State, http::HeaderMap}; | ||
| use percent_encoding::{NON_ALPHANUMERIC, percent_encode}; | ||
| use serde::{Deserialize, Serialize}; | ||
| use tracing::info; | ||
| use utoipa::ToSchema; | ||
|
|
||
| use crate::aliyun::{ | ||
| AliyunCdnClient, DescribeRefreshTasksRequest, DescribeRefreshTasksResponse, | ||
| RefreshObjectCachesRequest, RefreshObjectCachesResponse, | ||
| }; | ||
| use crate::error::AppResult; | ||
| use crate::state::AppState; | ||
| use crate::{ | ||
| aliyun::{ | ||
| AliyunCdnClient, DescribeRefreshTasksRequest, DescribeRefreshTasksResponse, | ||
| RefreshObjectCachesRequest, RefreshObjectCachesResponse, | ||
| }, | ||
| error::{AppError, AppResult}, | ||
| }; | ||
|
|
||
| /// Request payload for describe refresh tasks endpoint | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
|
|
@@ -141,3 +145,166 @@ pub async fn refresh_object_caches( | |
|
|
||
| Ok(Json(response)) | ||
| } | ||
|
|
||
| /// OSS bucket information in event data | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
| pub struct OssBucket { | ||
| pub name: String, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub arn: Option<String>, | ||
| #[serde(rename = "ownerIdentity", skip_serializing_if = "Option::is_none")] | ||
| pub owner_identity: Option<String>, | ||
| } | ||
|
|
||
| /// OSS object information in event data | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
| pub struct OssObject { | ||
| pub key: String, | ||
| #[serde(rename = "eTag", skip_serializing_if = "Option::is_none")] | ||
| pub etag: Option<String>, | ||
| #[serde(rename = "deltaSize", skip_serializing_if = "Option::is_none")] | ||
| pub delta_size: Option<i64>, | ||
| } | ||
|
|
||
| /// OSS-specific data in event | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
| pub struct OssData { | ||
| pub bucket: OssBucket, | ||
| pub object: OssObject, | ||
| #[serde(rename = "ossSchemaVersion", skip_serializing_if = "Option::is_none")] | ||
| pub oss_schema_version: Option<String>, | ||
| } | ||
|
|
||
| /// Complete event data structure from OSS | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
| pub struct OssEventData { | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub region: Option<String>, | ||
| #[serde(rename = "eventVersion", skip_serializing_if = "Option::is_none")] | ||
| pub event_version: Option<String>, | ||
| #[serde(rename = "eventSource", skip_serializing_if = "Option::is_none")] | ||
| pub event_source: Option<String>, | ||
| #[serde(rename = "eventName", skip_serializing_if = "Option::is_none")] | ||
| pub event_name: Option<String>, | ||
| #[serde(rename = "eventTime", skip_serializing_if = "Option::is_none")] | ||
| pub event_time: Option<String>, | ||
| pub oss: OssData, | ||
| } | ||
|
|
||
| /// EventBridge OSS event payload | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
| pub struct OssEventPayload { | ||
| pub id: String, | ||
| pub source: String, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub specversion: Option<String>, | ||
| #[serde(rename = "type", skip_serializing_if = "Option::is_none")] | ||
| pub event_type: Option<String>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub datacontenttype: Option<String>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub subject: Option<String>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub time: Option<String>, | ||
| pub data: OssEventData, | ||
| } | ||
|
|
||
| /// Response for OSS event handler | ||
| #[derive(ToSchema, Serialize, Deserialize, Debug)] | ||
| pub struct OssEventResponse { | ||
| pub message: String, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub task_id: Option<String>, | ||
| } | ||
|
|
||
| /// Handle Aliyun EventBridge OSS events | ||
| #[utoipa::path( | ||
| post, | ||
| tag = "aliyun", | ||
| path = "/aliyun/events", | ||
| request_body = OssEventPayload, | ||
| responses( | ||
| (status = OK, description = "Successfully processed OSS event and triggered CDN refresh", body = OssEventResponse), | ||
| (status = UNAUTHORIZED, description = "Missing or invalid x-eventbridge-signature-token"), | ||
| (status = BAD_REQUEST, description = "Invalid request or unsupported bucket"), | ||
| (status = INTERNAL_SERVER_ERROR, description = "Internal server error") | ||
| ), | ||
| security( | ||
| ("eventbridge_token" = []) | ||
| ) | ||
| )] | ||
|
Comment on lines
+221
to
+235
|
||
| pub async fn handle_oss_events( | ||
| State(state): State<AppState>, | ||
| headers: HeaderMap, | ||
| Json(raw_payload): Json<serde_json::Value>, | ||
| ) -> AppResult<Json<OssEventResponse>> { | ||
| let token = headers | ||
| .get("x-eventbridge-signature-token") | ||
| .ok_or_else(|| { | ||
| AppError::Unauthorized(anyhow::anyhow!( | ||
| "Missing x-eventbridge-signature-token header" | ||
| )) | ||
| })? | ||
| .to_str() | ||
| .map_err(|_| { | ||
| AppError::Unauthorized(anyhow::anyhow!( | ||
| "Invalid x-eventbridge-signature-token header format" | ||
| )) | ||
| })? | ||
| .trim(); | ||
|
|
||
| crate::auth::verify_token(token, &state.jwt_config.public_key).map_err(|err| { | ||
| AppError::Unauthorized(anyhow::anyhow!( | ||
| "JWT verification failed (x-eventbridge-signature-token): {err}" | ||
| )) | ||
| })?; | ||
|
|
||
| // Parse the raw JSON into OssEventPayload | ||
| let payload: OssEventPayload = serde_json::from_value(raw_payload).map_err(|err| { | ||
daflyinbed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| AppError::BadRequest(anyhow::anyhow!( | ||
| "Failed to parse OSS event payload: {}", | ||
| err | ||
| )) | ||
| })?; | ||
|
|
||
| info!( | ||
| event = ?payload, | ||
| "Received OSS event" | ||
| ); | ||
|
|
||
| let bucket_name = &payload.data.oss.bucket.name; | ||
| let object_key = &payload.data.oss.object.key; | ||
|
|
||
| // Get URL template from bucket map | ||
| let url_template = state | ||
| .aliyun_config | ||
| .bucket_url_map | ||
| .get(bucket_name) | ||
| .ok_or_else(|| { | ||
| AppError::BadRequest(anyhow::anyhow!("Unsupported bucket: {}", bucket_name)) | ||
| })?; | ||
|
|
||
| // Build the full URL by replacing {object_key} with the actual encoded object key | ||
| let encoded_object_key = percent_encode(object_key.as_bytes(), NON_ALPHANUMERIC).to_string(); | ||
| let object_url = url_template.replace("{object_key}", &encoded_object_key); | ||
|
|
||
| // Create CDN client | ||
| let client = AliyunCdnClient::new(&state.aliyun_config, state.http_client.clone()); | ||
|
|
||
| // Refresh the object cache | ||
| let request = RefreshObjectCachesRequest { | ||
| object_path: object_url.clone(), | ||
| object_type: Some("File".to_string()), | ||
| force: Some(false), | ||
| }; | ||
|
|
||
| let response = client.refresh_object_caches(&request).await?; | ||
|
|
||
| Ok(Json(OssEventResponse { | ||
| message: format!( | ||
| "CDN refresh triggered for {} in bucket {}", | ||
| object_key, bucket_name | ||
| ), | ||
| task_id: Some(response.refresh_task_id), | ||
| })) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.