Youtube Shorts Dagster pipeline #1734
Conversation
| GitGuardian id | GitGuardian status | Secret | Commit | Filename |
|---|---|---|---|---|
| 9430286 | Triggered | Generic Password | f0445ad | docker-compose.yaml |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secret safely. Learn the best practices here.
- Revoke and rotate this secret.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflows, and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following these best practices for managing and storing secrets, including API keys and other credentials
- installing secret detection on pre-commit to catch secrets before they leave your machine and to ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
Pull Request Overview
This PR implements a Dagster pipeline for processing YouTube Shorts videos, extracting metadata, downloading video content and thumbnails, and uploading them to S3. The pipeline includes sensor-based monitoring of YouTube playlists to automatically detect and process new videos.
Key changes:
- Added YouTube Shorts processing pipeline with three assets: video metadata, content, and thumbnails
- Implemented sensor to monitor YouTube channels/playlists for new videos and trigger processing
- Added webhook notifications to MIT Learn API when videos are processed or deleted
Reviewed Changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| dg_projects/learning_resources/learning_resources/assets/youtube_shorts.py | Core asset definitions for downloading and uploading video metadata, content, and thumbnails to S3 |
| dg_projects/learning_resources/learning_resources/sensors/youtube_shorts.py | Sensor for monitoring YouTube channels and triggering video processing jobs |
| dg_projects/learning_resources/learning_resources/resources/youtube_client.py | YouTube API client resource with Vault integration |
| dg_projects/learning_resources/learning_resources/resources/youtube_config.py | Configuration provider for YouTube playlist monitoring |
| dg_projects/learning_resources/learning_resources/resources/api_client_factory.py | Factory for creating API clients with Vault credentials |
| dg_projects/learning_resources/learning_resources/definitions.py | Updated Dagster definitions with YouTube assets, jobs, and sensors |
| packages/ol-orchestrate-lib/src/ol_orchestrate/resources/learn_api.py | Added webhook notification methods for video processing events |
| dg_projects/learning_resources/pyproject.toml | Added dependencies for YouTube API, YAML parsing, and video downloading |
| dg_projects/learning_resources/Dockerfile | Added ffmpeg installation for video processing |
| docker-compose.yaml | Added environment variables for AWS credentials and YouTube API configuration |
```python
),
"yt_s3file_io_manager": S3FileObjectIOManager(
    bucket=os.environ.get("YOUTUBE_SHORTS_BUCKET"),
    path_prefix="youtube_shorts",
```
Should this be /frontend/static/youtube_shorts for MIT-Learn, to be publicly accessible and cached by Fastly?
Pull Request Overview
Copilot reviewed 11 out of 13 changed files in this pull request and generated 1 comment.
```python
description=(
    "Monitor YouTube playlists daily and process the 16 most recent videos."
),
minimum_interval_seconds=60,  # Check once per minute
```
Copilot AI · Oct 22, 2025
The comment states 'Check once per minute' but line 71 in the description says 'Monitor YouTube playlists daily'. This creates inconsistency between the intended behavior and the configured interval. Update the comment to clarify this is for testing purposes, or adjust the interval to match the daily monitoring intent (e.g., 86400 seconds).
Suggested change:
```diff
-minimum_interval_seconds=60,  # Check once per minute
+minimum_interval_seconds=86400,  # Check once per day
```
Stray commit of a change made for testing, fixed
```python
# This detects changes in metadata (e.g., title edits) and triggers re-processing
# Hash ensures consistent 16-char version string for file paths
version_string = f"{video_id}|{video_title}|{video_published_at}"
data_version = hashlib.sha256(version_string.encode()).hexdigest()[:16]
```
What's the motivation for truncating to 16 characters?
Was originally thinking of using the version in the S3 file path; at 16 characters it would make the path shorter and more readable while keeping collision risk low. But I ultimately ended up not including the version in the file path, so I will get rid of that truncation.
```python
materialization = get_latest_materialization(
    context, metadata_asset_key, partition_key=video_id
)
```
Rather than fetching the materialization, the metadata file object is passed as an input to this function automatically by Dagster, so you can just use `json.loads(video_metadata.read_text())` to get the actual contents.
Will do, that's actually how I originally had it, but changed it because I thought that function was doing a read from S3 (is it reading from a local file instead?), which seemed a bit wasteful since the data was already present locally.
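For reference, a rough sketch of what that suggestion could look like — the asset name, input parameter, and the `video_id` field in the metadata are assumptions for illustration, not the actual implementation:

```python
import json
from typing import Any

from dagster import AssetExecutionContext, asset


@asset
def video_webhook(context: AssetExecutionContext, video_metadata: Any) -> None:
    # video_metadata is the file object (a UPath) handed to this asset by the
    # IO manager; read it directly instead of querying materializations.
    metadata = json.loads(video_metadata.read_text())
    context.log.info(f"Loaded metadata for video {metadata.get('video_id')}")
```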
```python
metadata_content = metadata_value.value

# Convert S3 paths to strings for webhook payload
video_content_path = str(video_content)
```
The video_content object is a UPath object, so you should be able to just use the .path attribute (https://github.com/fsspec/universal_pathlib)
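A quick illustration of that suggestion, assuming the object is a `UPath` backed by S3 (the bucket and key are made up, and the exact string `.path` returns depends on the universal_pathlib version and filesystem):

```python
from upath import UPath

video_content = UPath("s3://example-bucket/youtube_shorts/abc123/abc123.mp4")

print(str(video_content))  # the full URI, protocol included
print(video_content.path)  # just the path portion, without the protocol
```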
Getting rid of this from the payload altogether since it's not going to be used on the other end.
```python
video_content: Any,
video_thumbnail: Any,
video_metadata: Any,
```
Suggested change:
```diff
-video_content: Any,
-video_thumbnail: Any,
-video_metadata: Any,
+video_content: UPath,
+video_thumbnail: UPath,
+video_metadata: UPath,
```
```python
def fetch_youtube_shorts_config(config_url: str) -> list[dict[str, Any]]:
    """
    Fetch YouTube shorts playlist configuration from GitHub.

    Args:
        config_url: URL to the YAML configuration file

    Returns:
        List of configuration dictionaries containing channel and playlist info
    """
    response = httpx.get(config_url)
    response.raise_for_status()
    return yaml.safe_load(response.text)
```
Rather than being a method, this may be better modeled as an asset itself to return the data directly.
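A minimal sketch of what that could look like, assuming a module-level config URL constant and an asset key similar to the `playlist_config_key` that appears later in the diff (names and URL are hypothetical):

```python
import httpx
import yaml
from dagster import asset

# Hypothetical constant; the real URL comes from configuration.
SHORTS_CONFIG_URL = "https://example.com/youtube/shorts.yaml"


@asset(key=["youtube_shorts", "playlist_config"], group_name="youtube_shorts")
def playlist_config() -> list[dict]:
    """Fetch the YouTube Shorts playlist configuration and return it directly."""
    response = httpx.get(SHORTS_CONFIG_URL)
    response.raise_for_status()
    return yaml.safe_load(response.text)
```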
I think this may complicate things because the sensor relies on this config and resulting list of video ids to determine what the run requests should be:

```python
run_requests = [
    RunRequest(
        asset_selection=asset_keys,
        partition_key=video_id,
    )
    for video_id in videos_to_process
]
```

How about just moving most of the code that calls this function and determines the value of videos_to_process into a separate helper function?
`result = get_videos_to_process(....)`, which calls `fetch_youtube_shorts_config()` and ultimately returns the list of video IDs (`videos_to_process`)?
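Something along these lines, as a rough sketch — the function and argument names are assumed, it relies on the module's existing `fetch_youtube_shorts_config()` and `extract_video_id_from_playlist_item()` helpers, and the "most recent N" selection is simplified:

```python
def get_videos_to_process(
    config_url: str,
    youtube_client,
    max_videos: int = 16,
) -> list[str]:
    """Return the video IDs the discovery sensor should request runs for."""
    config = fetch_youtube_shorts_config(config_url)

    playlist_ids = [
        playlist["id"]
        for channel_config in config
        for playlist in channel_config.get("playlists", [])
    ]

    all_video_ids: set[str] = set()
    for playlist_id in playlist_ids:
        items = youtube_client.get_playlist_items(playlist_id)
        all_video_ids.update(
            extract_video_id_from_playlist_item(item) for item in items
        )

    # Simplified cap; the real selection of "most recent" videos may differ.
    return sorted(all_video_ids)[:max_videos]
```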
It looks like there is an asset `external_playlist_config_key = AssetKey(["youtube_shorts", "external_playlist_config"])`, but it's materialized / triggered by the sensor only?
@blarghmatey ready for another look
```python
for channel_config in config:
    if playlists := channel_config.get("playlists"):
        playlist_ids.extend([p["id"] for p in playlists])

# Fetch all videos from all playlists
all_video_ids = set()
for playlist_id in playlist_ids:
    playlist_items = youtube_client.get_playlist_items(playlist_id)
    video_ids = [
        extract_video_id_from_playlist_item(item) for item in playlist_items
    ]
    all_video_ids.update(video_ids)
```
It's not a functional difference, but for the sake of visibility and full lineage tracking it might be useful to track the playlists and videos as external assets. That way they get included in the full lineage graph of these assets. The video metadata etc. can also be modeled against those external assets https://docs.dagster.io/guides/build/assets/external-assets
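A rough sketch of what that modeling might look like with `AssetSpec`, using keys similar to those that show up later in the diff (hypothetical, not the final implementation):

```python
from dagster import AssetKey, AssetSpec, Definitions

# External (unmaterialized) assets representing the upstream YouTube objects.
external_playlist = AssetSpec(
    key=AssetKey(["youtube_shorts", "external_playlist"]),
    group_name="youtube_shorts",
)
external_video = AssetSpec(
    key=AssetKey(["youtube_shorts", "external_video"]),
    deps=[external_playlist.key],
    group_name="youtube_shorts",
)

# Downstream software-defined assets can then declare deps=[external_video.key]
# so the playlists and videos appear in the lineage graph.
defs = Definitions(assets=[external_playlist, external_video])
```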
Added as external assets
```python
external_playlist_config_key = AssetKey(["youtube_shorts", "external_playlist_config"])
external_playlist_api_key = AssetKey(
    [
        "youtube_shorts",
        "external_playlist_api",
    ]
)
external_playlist_key = AssetKey(["youtube_shorts", "external_playlist"])
external_video_key = AssetKey(["youtube_shorts", "external_video"])
```
Minor thing: for clarity, should we move these external assets into an `external` group? Right now, they are all in the same group. Also, I think it might be clearer to have separate playlist and video external assets. Do we need the external_playlist_config and external_playlist_api assets in the graph?
Yes, it looks better now. One thing I noticed that I forgot to comment on is that there is no lineage between playlist_api and video_metadata.
```python
    bucket=s3_uploads_bucket(DAGSTER_ENV)["bucket"],
    path_prefix=s3_uploads_bucket(DAGSTER_ENV)["prefix"],
),
"yt_s3file_io_manager": S3FileObjectIOManager(
```
We probably should use the default IO manager to make it easier to test in a local environment. I had to modify this as I ran into an S3 permission issue:

```python
"yt_s3file_io_manager": default_file_object_io_manager(
    dagster_env=DAGSTER_ENV,
    bucket=os.environ.get(
        "YOUTUBE_SHORTS_BUCKET", f"ol-mitlearn-app-storage-{DAGSTER_ENV}"
    ),
    path_prefix=os.environ.get("LEARN_SHORTS_PREFIX", "shorts/"),
),
```
```python
def fetch_youtube_shorts_config(config_url: str) -> list[dict[str, Any]]:
    """
    Fetch YouTube shorts playlist configuration from GitHub.

    Args:
        config_url: URL to the YAML configuration file

    Returns:
        List of configuration dictionaries containing channel and playlist info
    """
    response = httpx.get(config_url)
    response.raise_for_status()
    return yaml.safe_load(response.text)
```
It looks like there is an asset `external_playlist_config_key = AssetKey(["youtube_shorts", "external_playlist_config"])`, but it's materialized / triggered by the sensor only?
rachellougee left a comment
Nice work! Functionality works well overall.
Yes, I'm not quite sure how else to do it. It's just a yaml file at https://raw.githubusercontent.com/mitodl/open-video-data/refs/heads/mitopen/youtube/shorts.yaml and is pretty static (it might change once or twice a year if ever). Is there a more dagster-ish way to do it, or should I just remove it as an external asset to avoid any confusion?
I moved code to retrieve the yaml config and youtube api results from the discovery sensor to the assets, but now it takes 2 runs of the discovery sensor to retrieve everything. On the first run it just materializes the config and api, on the 2nd run it materializes the video assets.
I did some more refactoring and reduced the number of external assets and changed the dependencies a bit; now it seems to be working as expected.
More refactoring - since the playlist config and api results are actually materialized, I removed their "external" status. And the discovery sensor does less work. Best tested locally by reducing the sensor's frequency to every 1-2 minutes.
rachellougee left a comment
Looks good. I ran another test - the youtube_shorts_api_schedule and youtube_shorts_discovery_sensor seem to work as expected. But the new video didn't trigger a run, which I've commented below. Otherwise, everything looks good to me.
```python
automation_condition=(
    upstream_or_code_changes()
    | AutomationCondition.on_missing()
    | AutomationCondition.on_cron("0 * * * *")  # Check hourly for metadata changes
```
Can you check if this is working as expected? There is one video ID partition added by the youtube_shorts_discovery_sensor, but I don't see that it got picked up by this automation condition.

My understanding is that AutomationCondition.on_cron will only trigger runs if the default_automation_condition_sensor is enabled and running.
Since youtube_shorts_discovery_sensor detects the new videos and adds new partitions, why not trigger RunRequest directly from the sensor instead of relying on a cron job to pick it up?
I assumed default_automation_condition_sensor is supposed to be running for the automation_condition parameters to be in effect? And ideally that should handle triggering materializations for new videos instead of the sensor?
I tested it this way:
- Changed max # videos to process to 8
- Changed frequency of `youtube_shorts_api_schedule` and `youtube_shorts_sensor` each to 60 seconds
- Enabled `default_automation_condition_sensor`, `youtube_shorts_api_schedule`, and `youtube_shorts_discovery_sensor`
- Waited a few minutes - eventually, 8 videos got fully processed/materialized
- Changed max # videos to process back to 12
- Restarted containers
- After another few minutes, 4 additional videos were processed, bringing the total to 12
If it is okay for the sensor to do more work and start run requests, then I can add that back.
You are right. The default_automation_condition_sensor needs to be enabled, which I hadn't done. In that case, the new video materialization should be triggered by the upstream_or_code_changes() condition, because newly_missing = AutomationCondition.newly_missing() should be evaluated as True for a new asset.
I initially got confused by the additional AutomationCondition.on_missing() and AutomationCondition.on_cron("0 * * * *") conditions, but the metadata changes should already be handled by upstream_or_code_changes() unless I am overlooking something.
Either way, if the new asset is already handled by the automation policy, we don't need to add it to the sensor. But in my opinion, it doesn't hurt to request it from the sensor, since it doesn't have to wait for the automation to pick it up. That said, it's up to you.
Just added run requests in the sensor
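For context, a hedged sketch of what requesting runs from the discovery sensor might look like, combining dynamic-partition registration with RunRequests. The partitions-definition name, the helper that builds the result, and the `videos_to_process` / `asset_keys` inputs are assumptions, not the actual implementation:

```python
from dagster import DynamicPartitionsDefinition, RunRequest, SensorResult

# Hypothetical partitions definition for discovered video IDs.
video_partitions = DynamicPartitionsDefinition(name="youtube_shorts_videos")


def build_discovery_result(context, videos_to_process, asset_keys) -> SensorResult:
    """Register newly discovered video IDs as partitions and request runs for them."""
    new_ids = [
        video_id
        for video_id in videos_to_process
        if not context.instance.has_dynamic_partition(video_partitions.name, video_id)
    ]
    return SensorResult(
        dynamic_partitions_requests=[video_partitions.build_add_request(new_ids)],
        run_requests=[
            RunRequest(asset_selection=asset_keys, partition_key=video_id)
            for video_id in videos_to_process
        ],
    )
```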
```python
@asset(
    key=playlist_config_key,
    group_name="youtube_shorts",
    automation_condition=upstream_or_code_changes(),
```
Since this asset defines an automation_condition, we should add code_version here. Otherwise, Dagster will generate a different code version on each run.
Added code_version, but got rid of the automation_condition here and for playlist_api, because those should be freshly retrieved on every youtube_shorts_api_schedule job run.
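A minimal illustration of pinning the code version, assuming the decorator arguments seen in the diff (the asset key, body, and version string are placeholders):

```python
from dagster import AssetKey, asset

playlist_config_key = AssetKey(["youtube_shorts", "playlist_config"])


@asset(
    key=playlist_config_key,
    group_name="youtube_shorts",
    # Pin an explicit code_version so Dagster does not derive a new code version
    # on every run; bump the string when the asset's logic actually changes.
    code_version="1",
)
def playlist_config() -> list[dict]:
    ...
```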
rachellougee left a comment
👍
```python
json.dump(processed_metadata, f, indent=2)

# S3 path: youtube_shorts/{video_id}/{video_id}.json
metadata_s3_path = f"{video_id}/{video_id}.json"
```
Just a question - Is it intended that we don't track the data version for each video on S3? If a video file changes, we only store the latest version?

What are the relevant tickets?
Part of https://github.com/mitodl/hq/issues/8880
Description (What does it do?)
Creates a Dagster pipeline for uploading YouTube Shorts videos, metadata, and thumbnails to S3, then sending a webhook to mit-learn.
Each of the above plus the webhook is modeled as an asset.
How can this be tested?
- Set the `minimum_interval_seconds` value for the `youtube_shorts_sensor` to something more frequent like `60` (every minute)
- `docker compose up --build`
- Enable the `youtube_shorts_discovery_sensor` and `youtube_shorts_version_sensor`
- The `video_content`, `video_thumbnail`, and `video_metadata` assets should all be created successfully, but the `video_webhook` assets will all fail because they're trying to reach a non-existent endpoint.
- Check that the files under the `youtube_shorts` prefix have the expected data.