diff --git a/README.md b/README.md
index d15fa19..9ca25f0 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,17 @@
 # Open Data Stack Projects
-This repo contains different example project related to the open data stack. So far it contains these two examples:
-- [Airbyte Monitoring with dbt and Metabase](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase)
-- [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster)
-- Future potential use-cases: [Series: Building Airbyte’s Data Stack](https://airbyte.com/blog/building-airbytes-data-stack)
+The idea is to have an extendable repository for reference projects with the [Open Data Stack](https://glossary.airbyte.com/term/open-data-stack/). It should be possible to add different BI tools or another orchestrator.
-The [Open Data Stack](https://glossary.airbyte.com/term/open-data-stack/) is a better term for the [Modern Data Stack](https://glossary.airbyte.com/term/modern-data-stack/) but focuses on solutions built on open-source and open standards covering the Data Engineering Lifecycle. The open data stack is maintained by everyone using it. Companies can reuse existing battle-tested solutions and build on them instead of reinventing the wheel by re-implementing critical components for each component of the data stack.
+So far, this repo contains these two projects:
+- Open Data Stack with Airbyte, dbt, Dagster, Metabase: [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster)
+  - The project scrapes the [Awesome Data Engineering List](https://github.com/igorbarinov/awesome-data-engineering) links with Beautiful Soup and ingests the stars from each GitHub repository into a Postgres database, visualizing the trends for each in Metabase.
+  - You can also check why each of the tools has been chosen in [The Open Data Stack Distilled into Four Core Tools](https://airbyte.com/blog/modern-open-data-stack-four-core-tools)
+- Open Data Stack with dbt and Metabase: [Use-Case: Airbyte Monitoring](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase)
+  - In this project, you learn how to implement an Airbyte monitoring dashboard with dbt and Metabase on a locally deployed Airbyte instance. It gives you an operational view of your currently running data ingestions and a high-level overview of what happened in the past.
-*Open* is so important and often overlooked because it’s what makes the #opendatastack more embeddable with tools from the open data stack such as Airbyte, dbt, Dagster, Superset, and so forth. Letting you integrate them into your services, unlike closed-source services.
+Future potential use cases are listed in an initial [series](https://airbyte.com/blog/building-airbytes-data-stack) on how we built the data stack at Airbyte.
+
+---
+The Open Data Stack is a "better" term for the [Modern Data Stack](https://glossary.airbyte.com/term/modern-data-stack/) that focuses on solutions built on open-source and open standards covering the [Data Engineering Lifecycle](https://glossary.airbyte.com/term/data-engineering-lifecycle/). Companies can reuse existing battle-tested solutions and build on them instead of reinventing the wheel by re-implementing critical components of the data stack.
+
+*Open* is so important and often overlooked because it makes the data stack embeddable with tools such as Airbyte, dbt, Dagster, Superset, Rill, and many more. Unlike closed-source services, it lets you integrate them into your own services or products.
diff --git a/dagster/readme.md b/dagster/readme.md
index 3389517..e1ac277 100644
--- a/dagster/readme.md
+++ b/dagster/readme.md
@@ -7,6 +7,7 @@ This use case is documented step by step on [Configure Airbyte Connections with
 setup virtual env, dependencies:
 ```bash
 cd stargazer
+python3 -m pip install --upgrade pip # otherwise pip might have problems installing duckdb/dagster
 pip install -e ".[dev]"
 ```
diff --git a/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py b/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py
index c4aeeee..c60e667 100644
--- a/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py
+++ b/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py
@@ -1,4 +1,4 @@
-from dagster import asset
+from dagster import asset, op
 from dagster_airbyte import (
     AirbyteManagedElementReconciler,
     airbyte_resource,
@@ -6,14 +6,23 @@
     AirbyteSyncMode,
     load_assets_from_connections,
 )
-from dagster_airbyte.managed.generated.sources import GithubSource
+from dagster_airbyte.managed.generated.sources import (
+    FakerSource,
+    GithubSource,
+    RssSource,
+)
 from dagster_airbyte.managed.generated.destinations import (
     LocalJsonDestination,
     PostgresDestination,
+    DuckdbDestination,
 )
 from typing import List
 from dagster_dbt import load_assets_from_dbt_project
+import requests
+import base64
+import re
+from urllib.parse import urlparse
 from bs4 import BeautifulSoup
 import os
@@ -21,6 +30,7 @@
 import asyncio
 import aiohttp
+from sqlalchemy import schema

 from ..utils.constants import DBT_PROJECT_DIR
@@ -29,6 +39,75 @@
 )
 POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "please-set-your-token")
+GNEW_API_KEY = os.environ.get("GNEW_API_KEY", "please-set-your-token")
+NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "please-set-your-token")
+
+# RSS feeds of data engineering blogs
+rss_list = [
+    "https://www.theseattledataguy.com/feed/",
+    "https://www.dataschool.io/rss/",
+    # "https://roundup.getdbt.com/feed",
+    # "http://feeds.feedburner.com/JamesSerra",
+    # "https://pedram.substack.com/feed",
+    # "http://www.gregreda.com/feeds/all.atom.xml",
+    # "https://jpmonteiro.substack.com/feed",
+    # "https://www.confessionsofadataguy.com/feed/",
+    # "https://petrjanda.substack.com/feed",
+    # "https://sarahsnewsletter.substack.com/feed",
+    # "https://newsletter.pragmaticengineer.com/feed",
+    # "https://stkbailey.substack.com/feed",
+    # "https://benn.substack.com/feed",
+    # "https://technically.substack.com/feed",
+    # "https://sspaeti.com/index.xml",
+    # "https://www.startdataengineering.com/index.xml",
+    # "https://seattledataguy.substack.com/feed",
+    # "https://dataproducts.substack.com/feed",
+    # "https://blog.sqlauthority.com/feed/",
+    # "https://www.dataengineeringweekly.com/feed",
+    # "https://medium.com/feed/@maximebeauchemin",
+    # "https://www.data-is-plural.com/feed.xml",
+    # "https://davidsj.substack.com/feed",
+    # "https://kentgraziano.com/feed/",
+    # "https://fromanengineersight.substack.com/feed",
+    # "https://groupby1.substack.com/feed",
+    # "http://www.cathrinewilhelmsen.net/feed/",
+    # "https://www.eckerson.com/feed/",
+    # "https://joereis.substack.com/feed",
+    # "https://medium.com/feed/@laurengreerbalik",
+    # "https://www.winwithdata.io/feed",
+    # "https://letters.moderndatastack.xyz/rss/",
+    # "https://chollinger.com/blog/index.xml",
+    # "https://mehdio.substack.com/feed",
+    # "https://moderndata101.substack.com/feed",
+    # "http://www.hansmichiels.com/feed/",
+    # "https://agilebi.guru/feed/",
+    # "https://medium.com/feed/@karimjdda",
+    # "https://www.kahandatasolutions.com/blog.rss",
+    # "https://semanticinsight.wordpress.com/feed/",
+    # "http://www.tableausoftware.com/taxonomy/term/1920/feed",
+    # "https://dataespresso.substack.com/feed",
+    # "https://jonathanneo.substack.com/feed",
+    # "https://vickiboykis.com/index.xml",
+    # "https://ownyourdata.ai/wp/feed/",
+]
+
+
+def extract_domain(url: str) -> str:
+    parsed_url = urlparse(url)
+    # remove special characters that are not allowed in dagster asset keys
+    cleaned_string = re.sub(
+        r"[^A-Za-z0-9_]", "", parsed_url.netloc.replace("www.", "").replace(".", "_")
+    )
+
+    return cleaned_string
+
+
+# pylint: redefined-builtin
+from typing import List, Optional, Union
+import dagster._check as check
+from dagster._annotations import public
+from dagster_airbyte.managed.types import GeneratedAirbyteSource
+

 airbyte_instance = airbyte_resource.configured(
     {
@@ -77,7 +156,6 @@ def get_awesome_repo_list() -> str:
     url = "https://github.com/igorbarinov/awesome-data-engineering"
     html = requests.get(url)
     soup = BeautifulSoup(html.text, "html.parser")
-    # parse all links into a list starting with github.com
     links = [
         link.get("href")
         for link in soup.find_all("a")
@@ -93,66 +171,182 @@ def get_awesome_repo_list() -> str:
     links = [link[:-1] if link.endswith("/") else link for link in links]
     # remove repos without organization
     links = [link for link in links if len(link.split("/")) == 5]
-    # check if links are still existing in parallel to save time
     existings_links = asyncio.run(check_websites_exists(links))
-    # remove `https://github.com/` from links
     links = [link.replace("https://github.com/", "") for link in existings_links]
     # due to timeout limits while airbyte is checking each repo, I limited it here to make this demo work for you
-    links = links[0:10]
+    links = links[0:2]
     # return links as a string with blank space as separator
     return " ".join(links)

+# test_data = FakerSource(name="test_data", count=100)
+
 gh_awesome_de_list_source = GithubSource(
     name="gh_awesome_de_list",
     credentials=GithubSource.PATCredentials(AIRBYTE_PERSONAL_GITHUB_TOKEN),
     start_date="2020-01-01T00:00:00Z",
-    repository=get_awesome_repo_list(),  # "prometheus/haproxy_exporter",
+    # repository=get_awesome_repo_list(),  # "prometheus/haproxy_exporter",
+    repository="prometheus/haproxy_exporter",
     page_size_for_large_streams=100,
 )

-postgres_destination = PostgresDestination(
-    name="postgres",
-    host="localhost",
-    port=5432,
-    database="postgres",
-    schema="public",
-    username="postgres",
-    password=POSTGRES_PASSWORD,
-    ssl_mode=PostgresDestination.Disable(),
+# postgres_destination = PostgresDestination(
+#     name="postgres",
+#     host="localhost",
+#     port=5432,
+#     database="postgres",
+#     schema="public",
+#     username="postgres",
+#     password=POSTGRES_PASSWORD,
+#     ssl_mode=PostgresDestination.Disable(),
+# )
+
+
+# stargazer_connection = AirbyteConnection(
+#     name="fetch_stargazer",
+#     source=gh_awesome_de_list_source,
+#     destination=postgres_destination,
+#     stream_config={"stargazers": AirbyteSyncMode.incremental_append()},
+#     normalize_data=False,
+# )
+
+
+# test_connection = AirbyteConnection(
+#     name="fetch_test_data_duckdb",
+#     source=test_data,
+#     destination=duckdb_destination,
+#     stream_config={
+#         "_airbyte_raw_users": AirbyteSyncMode.full_refresh_overwrite(),
+#         "_airbyte_raw_products": AirbyteSyncMode.full_refresh_overwrite(),
+#     },
+#     normalize_data=False,  # True,
+# )
+
+json_awesome_de_list = LocalJsonDestination(
+    name="json_awesome_de_list",
+    destination_path="/local/ods/json_awesome_de_list/",
+)
+
+duckdb_destination = DuckdbDestination(
+    name="duckdb",
+    path="/local/ods/stage.db",
 )

 stargazer_connection = AirbyteConnection(
-    name="fetch_stargazer",
+    name="fetch_stargazer_duckdb",
     source=gh_awesome_de_list_source,
-    destination=postgres_destination,
-    stream_config={"stargazers": AirbyteSyncMode.incremental_append_dedup()},
-    normalize_data=True,
+    destination=json_awesome_de_list,  # duckdb_destination,
+    stream_config={"stargazers": AirbyteSyncMode.incremental_append()},
+    normalize_data=False,
 )
+
+# ###############################
+# # simple pattern analysis
+# ###############################
+
+
+repo_connection = AirbyteConnection(
+    name="fetch_stargazer_duckdb",
+    source=gh_awesome_de_list_source,
+    destination=duckdb_destination,
+    stream_config={
+        "repositories": AirbyteSyncMode.incremental_append(),
+        "stargazers": AirbyteSyncMode.incremental_append(),
+    },
+    normalize_data=False,  # True,
+)
+
+
+@op
+def fetch_readme(repos: list) -> dict:
+
+    GITHUB_API_TOKEN = os.getenv("GITHUB_API_TOKEN")
+    headers = {
+        "Accept": "application/vnd.github+json",
+        "Authorization": "token {}".format(GITHUB_API_TOKEN),
+    }
+
+    matches_dict = {}
+
+    for repo in repos:
+
+        url = "https://api.github.com/repos/{}/readme".format(repo)
+
+        response = requests.get(url, headers=headers)
+        content = response.json()["content"]
+
+        # Decode base64 content
+        readme_content = base64.b64decode(content).decode("utf-8")
+
+        # Extract links and names using regex
+        pattern = r"\* \[(.+?)\]\((.+?)\)"
+        matches = re.findall(pattern, readme_content)
+
+        for match in matches:
+            name, link = match
+            matches_dict[name] = link
+
+            print(f"Name: {name}\nLink: {link}\n")
+
+    return matches_dict
+
+
+def create_rss_airbyte_connection(name: str, rss_url: str):
+    rss_source = RssSource(name=f"rss_{name}", url=rss_url)
+    json_destination = LocalJsonDestination(
+        name=f"json_{name}",
+        destination_path=f"/local/ods/json_export/{name}.jsonl",
+    )
+
+    connection = AirbyteConnection(
+        name=f"fetch_{name}",
+        source=rss_source,
+        destination=json_destination,  # duckdb_destination: would need to handle the JSONL format, for now it will throw an "Invalid JSON" error
+        stream_config={"items": AirbyteSyncMode.full_refresh_overwrite()},
+        normalize_data=False,
+        prefix=f"{name}_",
+    )
+    return connection
+
+
+rss_connections = []
+for rss_url in rss_list:
+    readable_name = extract_domain(rss_url)
+    connection = create_rss_airbyte_connection(readable_name, rss_url)
+    rss_connections.append(connection)
+
+# @asset(
+#     description="The metabase dashboard where the stargazers are visualized",
+#     metadata={"dashboard_url": "http://localhost:3000/dashboard/1-airbyte-sync-status"},
+# )
+# def metabase_dashboard(mart_gh_cumulative):
+#     return "test"
+
+
+# ##################################
+# Dagster Reconciler
+# ##################################
+
 airbyte_reconciler = AirbyteManagedElementReconciler(
     airbyte=airbyte_instance,
-    connections=[stargazer_connection],
+    connections=rss_connections + [stargazer_connection],
+    delete_unmentioned_resources=True,
 )

 # load airbyte connection from above pythonic definitions
 airbyte_assets = load_assets_from_connections(
     airbyte=airbyte_instance,
-    connections=[stargazer_connection],
-    key_prefix=["postgres"],
+    connections=[stargazer_connection] + rss_connections,
+    key_prefix=["ods"],
 )

 # preparing assets based on existing dbt project
 dbt_assets = load_assets_from_dbt_project(
-    project_dir=DBT_PROJECT_DIR, io_manager_key="db_io_manager", key_prefix="postgres"
+    project_dir=DBT_PROJECT_DIR,
+    io_manager_key="db_io_manager",
+    key_prefix=["ods"],
 )
-
-
-# @asset(
-#     description="The metabase dashboard where the stargazers are visualized",
-#     metadata={"dashboard_url": "http://localhost:3000/dashboard/1-airbyte-sync-status"},
-# )
-# def metabase_dashboard(mart_gh_cumulative):
-#     return "test"
diff --git a/dagster/stargazer/assets_modern_data_stack/repository.py b/dagster/stargazer/assets_modern_data_stack/repository.py
index e146b3a..ef6825a 100644
--- a/dagster/stargazer/assets_modern_data_stack/repository.py
+++ b/dagster/stargazer/assets_modern_data_stack/repository.py
@@ -16,12 +16,13 @@ def assets_modern_data_stack():
     return [
         airbyte_assets,
-        with_resources(
-            dbt_assets,  # load_assets_from_package_module(assets),
-            resource_defs={
-                "dbt": dbt_cli_resource.configured(DBT_CONFIG),
-                "db_io_manager": db_io_manager.configured(POSTGRES_CONFIG),
-            },
-        ),
+        ## Disabled for now as we use DuckDB and Rill for transformations
+        # with_resources(
+        #     dbt_assets,
+        #     resource_defs={
+        #         "dbt": dbt_cli_resource.configured(DBT_CONFIG),
+        #         "db_io_manager": db_io_manager.configured(POSTGRES_CONFIG),
+        #     },
+        # ),
         # metabase_dashboard,
     ]
diff --git a/dagster/stargazer/setup.py b/dagster/stargazer/setup.py
index 188e40b..34d9bae 100644
--- a/dagster/stargazer/setup.py
+++ b/dagster/stargazer/setup.py
@@ -14,10 +14,11 @@
         "numpy",
         "scipy",
         "dbt-core",
         "dbt-postgres",
         "aiohttp",
         "requests",
         "beautifulsoup4",
+        "dbt-duckdb==1.4.0",
+        "duckdb==0.7.0",  # this must be aligned with the `destination-duckdb` version of airbyte, otherwise stage.db can't be opened!
     ],
     extras_require={"dev": ["dagit", "pytest", "black"]},
 )
diff --git a/dagster/stargazer/stage.db b/dagster/stargazer/stage.db
new file mode 100644
index 0000000..6b16da3
Binary files /dev/null and b/dagster/stargazer/stage.db differ
diff --git a/dagster/stargazer/transformation_dbt/config/profiles.yml b/dagster/stargazer/transformation_dbt/config/profiles.yml
index f9f6596..c7fed2a 100644
--- a/dagster/stargazer/transformation_dbt/config/profiles.yml
+++ b/dagster/stargazer/transformation_dbt/config/profiles.yml
@@ -2,11 +2,14 @@ stargazers:
   target: dev
   outputs:
     dev:
-      type: postgres
-      threads: 1
-      host: localhost
-      port: 5432
-      user: postgres
-      pass: password
-      dbname: postgres
-      schema: public
+      # type: postgres
+      # threads: 1
+      # host: localhost
+      # port: 5432
+      # user: postgres
+      # pass: password
+      # dbname: postgres
+      # schema: public
+      type: duckdb
+      path: /tmp/airbyte_local/ods/stage.db
+      schema: main
diff --git a/visualization/rill_data/.gitignore b/visualization/rill_data/.gitignore
new file mode 100644
index 0000000..7eeef89
--- /dev/null
+++ b/visualization/rill_data/.gitignore
@@ -0,0 +1,4 @@
+# Rill
+*.db
+*.db.wal
+data/
diff --git a/visualization/rill_data/dashboards/.gitkeep b/visualization/rill_data/dashboards/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/visualization/rill_data/dashboards/stargazer_dashboard.yaml b/visualization/rill_data/dashboards/stargazer_dashboard.yaml
new file mode 100644
index 0000000..6f07752
--- /dev/null
+++ b/visualization/rill_data/dashboards/stargazer_dashboard.yaml
@@ -0,0 +1,13 @@
+# Visit https://docs.rilldata.com/references/project-files to learn more about Rill project files.
+
+display_name: "stargazer_dashboard"
+model: "stargazer"
+default_time_range: ""
+smallest_time_grain: ""
+timeseries: "starred_at"
+measures:
+  - label: Total records
+    expression: count(*)
+    description: Total number of records present
+    format_preset: humanize
+dimensions: []
diff --git a/visualization/rill_data/models/.gitkeep b/visualization/rill_data/models/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/visualization/rill_data/models/model.sql b/visualization/rill_data/models/model.sql
new file mode 100644
index 0000000..9d83163
--- /dev/null
+++ b/visualization/rill_data/models/model.sql
@@ -0,0 +1 @@
+from information_schema.tables
\ No newline at end of file
diff --git a/visualization/rill_data/models/rss_de_feeds.sql b/visualization/rill_data/models/rss_de_feeds.sql
new file mode 100644
index 0000000..73ac3c1
--- /dev/null
+++ b/visualization/rill_data/models/rss_de_feeds.sql
@@ -0,0 +1 @@
+SELECT count(_airbyte_data.title) FROM read_json_auto('/tmp/airbyte_local/ods/json_export/*/*.jsonl')
\ No newline at end of file
diff --git a/visualization/rill_data/models/salary_de.sql b/visualization/rill_data/models/salary_de.sql
new file mode 100644
index 0000000..7d89fa7
--- /dev/null
+++ b/visualization/rill_data/models/salary_de.sql
@@ -0,0 +1 @@
+from '/Users/sspaeti/Documents/git/work/open-data-stack.git/simon/ods-in-a-box/datasets/DataEngineer.csv'
\ No newline at end of file
diff --git a/visualization/rill_data/models/stargazer.sql b/visualization/rill_data/models/stargazer.sql
new file mode 100644
index 0000000..856b7b1
--- /dev/null
+++ b/visualization/rill_data/models/stargazer.sql
@@ -0,0 +1,17 @@
+
+
+SELECT
+_airbyte_data.starred_at, 1 as count
+from read_json_auto('/tmp/airbyte_local/ods/awesome-list/_airbyte_raw_stargazers.jsonl')
+
+
+
+/*SELECT
+_airbyte_data.*
+
+from read_json_auto('/tmp/airbyte_local/ods/export_json/_airbyte_raw_users.jsonl')
+left outer join read_json_auto('/tmp/airbyte_local/ods/export_json/_airbyte_raw_users.jsonl')
+
+--from read_ndjson_objects('/tmp/airbyte_local/ods/export_json/_airbyte_raw_users.jsonl')
+*/
+
diff --git a/visualization/rill_data/readme.md b/visualization/rill_data/readme.md
index 3273b25..5155f05 100644
--- a/visualization/rill_data/readme.md
+++ b/visualization/rill_data/readme.md
@@ -2,6 +2,18 @@
 [Rill Data](https://www.rilldata.com/) is one of the latest BI tools, based on DuckDB. It's fully interactive and uses DuckDB as a caching layer.
+
+## Open Data Stack in a Box
+
+Start Rill with the project from this repo, pointing it at the database the Airbyte sources were loaded into:
+```sh
+rill start --db /tmp/airbyte_local/ods/stage.db --project .
+```
+
+✍ Note: Make sure the location matches the settings in `stargazer.py` in the dagster project - default settings will work
+
+
+## [Airbyte Monitoring](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase)
 Unfortunately the data cannot be read live from the airbyte postgres database, but you can copy the data as CSV with a simple command and read it with Rill. The commands below show you how to create a snapshot and visualize the data.
diff --git a/visualization/rill_data/rill.yaml b/visualization/rill_data/rill.yaml
new file mode 100644
index 0000000..09bdd50
--- /dev/null
+++ b/visualization/rill_data/rill.yaml
@@ -0,0 +1,4 @@
+compiler: rill-beta
+rill_version: 0.20.0
+
+name: rill_data
diff --git a/visualization/rill_data/sources/.gitkeep b/visualization/rill_data/sources/.gitkeep
new file mode 100644
index 0000000..e69de29
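As a side note on the patch above: the `extract_domain` helper added in `stargazer.py` sanitizes a feed URL into a string that Dagster accepts as an asset key. It depends only on the standard library, so its behavior can be checked standalone; this sketch repeats the same logic outside the Dagster project for quick verification:

```python
import re
from urllib.parse import urlparse


def extract_domain(url: str) -> str:
    """Turn a feed URL into a string that is safe to use as a Dagster asset key."""
    parsed_url = urlparse(url)
    # strip "www.", turn dots into underscores, then drop any remaining
    # character that is not allowed in an asset key
    cleaned_string = re.sub(
        r"[^A-Za-z0-9_]", "", parsed_url.netloc.replace("www.", "").replace(".", "_")
    )
    return cleaned_string


print(extract_domain("https://www.theseattledataguy.com/feed/"))  # theseattledataguy_com
print(extract_domain("https://sspaeti.com/index.xml"))  # sspaeti_com
```

Each connection name (`fetch_{name}`) and JSONL export path in `create_rss_airbyte_connection` is derived from this cleaned value, which is why the regex only keeps letters, digits, and underscores.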