Skip to content

Commit 905c5cd

Browse files
committed
Video shorts dagster pipeline
1 parent f875036 commit 905c5cd

File tree

13 files changed

+921
-3
lines changed

13 files changed

+921
-3
lines changed

dg_projects/canvas/canvas/definitions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
default_io_manager,
2121
)
2222
from ol_orchestrate.lib.utils import authenticate_vault, s3_uploads_bucket
23+
from ol_orchestrate.resources.api_client_factory import ApiClientFactory
2324

2425
from canvas.assets.canvas import (
2526
canvas_course_ids,
2627
course_content_metadata,
2728
export_course_content,
2829
)
29-
from canvas.resources.api_client_factory import ApiClientFactory
3030
from canvas.sensors.canvas import canvas_google_sheet_course_id_sensor
3131

3232
# Initialize vault with resilient loading

dg_projects/learning_resources/Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ FROM python:3.13-slim-bookworm
3232
# It is important to use the image that matches the builder, as the path to the
3333
# Python executable must be the same.
3434

35+
# Install ffmpeg for video processing (needed by yt-dlp)
36+
RUN apt-get update && \
37+
apt-get install -y ffmpeg && \
38+
apt-get clean && \
39+
rm -rf /var/lib/apt/lists/*
40+
3541
# Copy the application from the builder
3642
COPY --from=builder /app/dg_projects/learning_resources /app
3743

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
"""Assets for processing YouTube Shorts videos."""
2+
3+
import hashlib
4+
import json
5+
import tempfile
6+
from pathlib import Path
7+
from typing import Any
8+
9+
import httpx
10+
import yt_dlp
11+
from dagster import (
12+
AssetExecutionContext,
13+
AssetIn,
14+
AssetKey,
15+
AssetMaterialization,
16+
AssetOut,
17+
Backoff,
18+
DagsterEventType,
19+
DataVersion,
20+
DynamicPartitionsDefinition,
21+
EventRecordsFilter,
22+
Jitter,
23+
Output,
24+
RetryPolicy,
25+
asset,
26+
multi_asset,
27+
)
28+
29+
from learning_resources.lib.youtube import get_highest_quality_thumbnail
30+
31+
# Dynamic partitions for video IDs
32+
youtube_video_ids = DynamicPartitionsDefinition(name="youtube_video_ids")
33+
34+
35+
def get_latest_materialization(
36+
context: AssetExecutionContext,
37+
asset_key: AssetKey,
38+
partition_key: str | None = None,
39+
) -> AssetMaterialization:
40+
"""
41+
Retrieve the latest materialization for a given asset and partition.
42+
43+
Args:
44+
context: Asset execution context
45+
asset_key: The asset key to query
46+
partition_key: Optional partition key to filter by
47+
48+
Returns:
49+
AssetMaterialization object containing metadata from the latest materialization
50+
51+
Raises:
52+
ValueError: If no materialization is found
53+
"""
54+
materialization_records = context.instance.get_event_records(
55+
EventRecordsFilter(
56+
event_type=DagsterEventType.ASSET_MATERIALIZATION,
57+
asset_key=asset_key,
58+
asset_partitions=[partition_key] if partition_key else None,
59+
),
60+
limit=1,
61+
)
62+
63+
if not materialization_records:
64+
partition_msg = f" with partition: {partition_key}" if partition_key else ""
65+
msg = f"No materialization found for asset {asset_key}{partition_msg}"
66+
raise ValueError(msg)
67+
68+
return materialization_records[0].asset_materialization
69+
70+
71+
@multi_asset(
72+
code_version="youtube_shorts_v1",
73+
group_name="youtube_shorts",
74+
required_resource_keys={"youtube_api"},
75+
partitions_def=youtube_video_ids,
76+
outs={
77+
"video_content": AssetOut(
78+
io_manager_key="yt_s3file_io_manager",
79+
key=AssetKey(["youtube_shorts", "video_content"]),
80+
),
81+
"video_thumbnail": AssetOut(
82+
io_manager_key="yt_s3file_io_manager",
83+
key=AssetKey(["youtube_shorts", "video_thumbnail"]),
84+
),
85+
"video_metadata": AssetOut(
86+
io_manager_key="yt_s3file_io_manager",
87+
key=AssetKey(["youtube_shorts", "video_metadata"]),
88+
),
89+
},
90+
)
91+
def download_youtube_video_assets(context: AssetExecutionContext):
92+
"""
93+
Download video content, thumbnail, and metadata for a YouTube video.
94+
95+
This multi-asset downloads:
96+
1. Video content using yt-dlp
97+
2. Highest quality thumbnail
98+
3. Full video metadata from YouTube API
99+
100+
Outputs are uploaded to S3 using S3FileObjectIOManager.
101+
"""
102+
video_id = context.partition_key
103+
context.log.info("Processing video ID: %s", video_id)
104+
105+
# Fetch video metadata from YouTube API
106+
videos = context.resources.youtube_api.client.get_videos([video_id])
107+
if not videos:
108+
msg = f"No video found for ID: {video_id}"
109+
raise ValueError(msg)
110+
111+
video_metadata = videos[0]
112+
video_title = video_metadata["snippet"]["title"]
113+
video_published_at = video_metadata["snippet"]["publishedAt"]
114+
115+
# Create data version from video ID, title, and publish date
116+
# This detects changes in metadata (e.g., title edits) and triggers re-processing
117+
# Hash ensures consistent 16-char version string for file paths
118+
version_string = f"{video_id}|{video_title}|{video_published_at}"
119+
data_version = hashlib.sha256(version_string.encode()).hexdigest()[:16]
120+
context.log.info("Data version for %s: %s", video_id, data_version)
121+
122+
# Download video using yt-dlp
123+
context.log.info("Downloading video: %s", video_title)
124+
with tempfile.TemporaryDirectory() as temp_dir:
125+
video_output_template = str(Path(temp_dir) / f"{video_id}.%(ext)s")
126+
127+
ydl_opts = {
128+
"format": "best[ext=mp4]",
129+
"outtmpl": video_output_template,
130+
"quiet": True,
131+
"no_warnings": True,
132+
}
133+
134+
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
135+
info = ydl.extract_info(
136+
f"https://www.youtube.com/watch?v={video_id}", download=True
137+
)
138+
video_ext = info.get("ext", "mp4")
139+
video_file = Path(temp_dir) / f"{video_id}.{video_ext}"
140+
141+
if not video_file.exists():
142+
msg = f"Failed to download video: {video_id}"
143+
raise RuntimeError(msg)
144+
145+
# S3 path: youtube_shorts/{video_id}/{video_id}.mp4
146+
video_s3_path = f"{video_id}/{video_id}.{video_ext}"
147+
148+
context.log.info("Video downloaded: %s -> %s", video_file, video_s3_path)
149+
150+
# Download thumbnail
151+
thumbnail_info = get_highest_quality_thumbnail(video_metadata)
152+
thumbnail_url = thumbnail_info.get("url")
153+
154+
if not thumbnail_url:
155+
msg = f"No thumbnail found for video: {video_id}"
156+
raise ValueError(msg)
157+
158+
context.log.info("Downloading thumbnail from: %s", thumbnail_url)
159+
thumbnail_response = httpx.get(thumbnail_url)
160+
thumbnail_response.raise_for_status()
161+
162+
# Always use .jpg extension for consistency
163+
thumbnail_ext = "jpg"
164+
thumbnail_file = Path(temp_dir) / f"{video_id}.{thumbnail_ext}"
165+
thumbnail_file.write_bytes(thumbnail_response.content)
166+
167+
# S3 path: youtube_shorts/{video_id}/{video_id}.jpg
168+
thumbnail_s3_path = f"{video_id}/{video_id}.{thumbnail_ext}"
169+
170+
context.log.info("Thumbnail saved: %s -> %s", thumbnail_file, thumbnail_s3_path)
171+
172+
# Save metadata as JSON
173+
metadata_file = Path(temp_dir) / f"{video_id}.json"
174+
with metadata_file.open("w") as f:
175+
json.dump(video_metadata, f, indent=2)
176+
177+
# S3 path: youtube_shorts/{video_id}/{video_id}.json
178+
metadata_s3_path = f"{video_id}/{video_id}.json"
179+
180+
context.log.info("Metadata saved: %s -> %s", metadata_file, metadata_s3_path)
181+
182+
# Yield outputs with data version for change detection
183+
yield Output(
184+
value=(video_file, video_s3_path),
185+
output_name="video_content",
186+
data_version=DataVersion(data_version),
187+
metadata={
188+
"video_id": video_id,
189+
"video_title": video_title,
190+
"publish_date": video_published_at,
191+
"data_version": data_version,
192+
"file_size_bytes": video_file.stat().st_size,
193+
"format": video_ext,
194+
"s3_path": video_s3_path,
195+
},
196+
)
197+
198+
yield Output(
199+
value=(thumbnail_file, thumbnail_s3_path),
200+
output_name="video_thumbnail",
201+
data_version=DataVersion(data_version),
202+
metadata={
203+
"video_id": video_id,
204+
"video_title": video_title,
205+
"data_version": data_version,
206+
"thumbnail_width": thumbnail_info.get("width"),
207+
"thumbnail_height": thumbnail_info.get("height"),
208+
"s3_path": thumbnail_s3_path,
209+
},
210+
)
211+
212+
yield Output(
213+
value=(metadata_file, metadata_s3_path),
214+
output_name="video_metadata",
215+
data_version=DataVersion(data_version),
216+
metadata={
217+
"video_id": video_id,
218+
"video_title": video_title,
219+
"publish_date": video_published_at,
220+
"data_version": data_version,
221+
"s3_path": metadata_s3_path,
222+
"youtube_metadata": video_metadata,
223+
},
224+
)
225+
226+
227+
@asset(
228+
code_version="youtube_shorts_webhook_v1",
229+
key=AssetKey(["youtube_shorts", "video_webhook"]),
230+
group_name="youtube_shorts",
231+
description="Send webhook to Learn API after YouTube video processing.",
232+
partitions_def=youtube_video_ids,
233+
ins={
234+
"video_content": AssetIn(key=AssetKey(["youtube_shorts", "video_content"])),
235+
"video_thumbnail": AssetIn(key=AssetKey(["youtube_shorts", "video_thumbnail"])),
236+
"video_metadata": AssetIn(key=AssetKey(["youtube_shorts", "video_metadata"])),
237+
},
238+
required_resource_keys={"learn_api"},
239+
retry_policy=RetryPolicy(
240+
max_retries=3,
241+
delay=2.0,
242+
backoff=Backoff.EXPONENTIAL,
243+
jitter=Jitter.PLUS_MINUS,
244+
),
245+
)
246+
def youtube_video_webhook(
247+
context: AssetExecutionContext,
248+
video_content: Any,
249+
video_thumbnail: Any,
250+
video_metadata: Any,
251+
) -> dict[str, Any]:
252+
"""
253+
Send webhook notification to Learn API after video assets are ready.
254+
255+
This asset depends on all three video assets (content, thumbnail, metadata)
256+
and only executes after they complete successfully. It sends the YouTube
257+
metadata JSON to the Learn API webhook endpoint.
258+
"""
259+
video_id = context.partition_key
260+
context.log.info("Sending webhook for video ID: %s", video_id)
261+
262+
# Retrieve metadata from upstream asset materialization instead of reading from S3
263+
metadata_asset_key = AssetKey(["youtube_shorts", "video_metadata"])
264+
materialization = get_latest_materialization(
265+
context, metadata_asset_key, partition_key=video_id
266+
)
267+
268+
# Extract metadata dict from materialization metadata
269+
metadata_value = materialization.metadata.get("youtube_metadata")
270+
if metadata_value is None:
271+
msg = f"Metadata field not found in materialization for video: {video_id}"
272+
raise ValueError(msg)
273+
274+
# metadata_value is a MetadataValue, extract the actual dict value
275+
metadata_content = metadata_value.value
276+
277+
# Convert S3 paths to strings for webhook payload
278+
video_content_path = str(video_content)
279+
video_thumbnail_path = str(video_thumbnail)
280+
video_metadata_path = str(video_metadata)
281+
282+
# Extract key from full S3 path (remove s3://bucket/ prefix)
283+
# e.g., "s3://bucket/youtube_shorts/ID/ID.mp4" -> "youtube_shorts/ID/ID.mp4"
284+
content_key = "/".join(video_content_path.split("/")[3:])
285+
thumbnail_key = "/".join(video_thumbnail_path.split("/")[3:])
286+
metadata_key = "/".join(video_metadata_path.split("/")[3:])
287+
288+
# Construct webhook payload with YouTube metadata
289+
webhook_data = {
290+
"video_id": video_id,
291+
"s3_paths": {
292+
"content": content_key,
293+
"thumbnail": thumbnail_key,
294+
"metadata": metadata_key,
295+
},
296+
"youtube_metadata": metadata_content,
297+
"source": "youtube_shorts",
298+
}
299+
300+
context.log.info("Webhook payload for %s: %s", video_id, webhook_data)
301+
302+
try:
303+
# Send webhook to Learn API using HMAC-signed request
304+
response_data = context.resources.learn_api.client.notify_video_shorts(
305+
webhook_data
306+
)
307+
308+
context.log.info(
309+
"Webhook sent successfully for video_id=%s, response=%s",
310+
video_id,
311+
response_data,
312+
)
313+
314+
except httpx.HTTPStatusError as error:
315+
error_message = (
316+
f"Webhook failed for video_id={video_id} "
317+
f"with status code {error.response.status_code}: {error}"
318+
)
319+
context.log.exception(error_message)
320+
raise RuntimeError(error_message) from error
321+
322+
else:
323+
return {
324+
"video_id": video_id,
325+
"webhook_status": "success",
326+
"response_data": response_data,
327+
}

0 commit comments

Comments
 (0)