From b2a27aa13034091c306c0d0b8d6aaffada7d08a9 Mon Sep 17 00:00:00 2001 From: sspaeti Date: Fri, 10 Feb 2023 11:55:06 +0100 Subject: [PATCH 1/5] adding duckdb --- dagster/stargazer/setup.py | 1 + .../transformation_dbt/config/profiles.yml | 19 +++++++++++-------- .../stargazer/transformation_dbt/profiles.yml | 19 +++++++++++-------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/dagster/stargazer/setup.py b/dagster/stargazer/setup.py index 188e40b..1302133 100644 --- a/dagster/stargazer/setup.py +++ b/dagster/stargazer/setup.py @@ -18,6 +18,7 @@ "aiohttp", "requests", "beautifulsoup4", + "dbt-duckdb", ], extras_require={"dev": ["dagit", "pytest", "black"]}, ) diff --git a/dagster/stargazer/transformation_dbt/config/profiles.yml b/dagster/stargazer/transformation_dbt/config/profiles.yml index f9f6596..87a783a 100644 --- a/dagster/stargazer/transformation_dbt/config/profiles.yml +++ b/dagster/stargazer/transformation_dbt/config/profiles.yml @@ -2,11 +2,14 @@ stargazers: target: dev outputs: dev: - type: postgres - threads: 1 - host: localhost - port: 5432 - user: postgres - pass: password - dbname: postgres - schema: public + # type: postgres + # threads: 1 + # host: localhost + # port: 5432 + # user: postgres + # pass: password + # dbname: postgres + # schema: public + type: duckdb + path: /tmp/airbyte_local/opendatastack-in-a-box.duckdb + schema: main diff --git a/dagster/stargazer/transformation_dbt/profiles.yml b/dagster/stargazer/transformation_dbt/profiles.yml index f9f6596..87a783a 100644 --- a/dagster/stargazer/transformation_dbt/profiles.yml +++ b/dagster/stargazer/transformation_dbt/profiles.yml @@ -2,11 +2,14 @@ stargazers: target: dev outputs: dev: - type: postgres - threads: 1 - host: localhost - port: 5432 - user: postgres - pass: password - dbname: postgres - schema: public + # type: postgres + # threads: 1 + # host: localhost + # port: 5432 + # user: postgres + # pass: password + # dbname: postgres + # schema: public + type: duckdb + path: /tmp/airbyte_local/opendatastack-in-a-box.duckdb + schema: main From a3d6edbcb5e3e95ca35abf9c0ae5a585e4eb4028 Mon Sep 17 00:00:00 2001 From: sspaeti Date: Thu, 9 Mar 2023 09:39:29 +0100 Subject: [PATCH 2/5] Adding more context --- README.md | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d15fa19..9ca25f0 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,17 @@ # Open Data Stack Projects -This repo contains different example project related to the open data stack. So far it contains these two examples: -- [Airbyte Monitoring with dbt and Metabase](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase) -- [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster) -- Future potential use-cases: [Series: Building Airbyte’s Data Stack](https://airbyte.com/blog/building-airbytes-data-stack) +The idea is to have an extendable repository for reference projects with the [Open Data Stack](https://glossary.airbyte.com/term/open-data-stack/). It should be possible to add different BI tools or another orchestrator. -The [Open Data Stack](https://glossary.airbyte.com/term/open-data-stack/) is a better term for the [Modern Data Stack](https://glossary.airbyte.com/term/modern-data-stack/) but focuses on solutions built on open-source and open standards covering the Data Engineering Lifecycle. The open data stack is maintained by everyone using it. Companies can reuse existing battle-tested solutions and build on them instead of reinventing the wheel by re-implementing critical components for each component of the data stack. +So far, this repo contains different these two projects: +- Open Data Stack with Airbyte, dbt, dagster, Metabase: [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster) + - The project scrapes the [Awesome Data Engineering List](https://github.com/igorbarinov/awesome-data-engineering) links with Beautiful Soup and ingested the stars from each GitHub repository to a Postgres database—seeing the trends for each in Metabase. + - You can also check why each of the tools has been chosen in [The Open Data Stack Distilled into Four Core Tools](https://airbyte.com/blog/modern-open-data-stack-four-core-tools) +- Open Data Stack with dbt and Metabase: [Use-Case: Airbyte Monitoring](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase) + - In this project, you learn how to implement an Airbyte monitoring dashboard with dbt and Metabase on a locally deployed Airbyte instance. It will allow you to get an operational view of your current running data ingestions and a high-level overview of what happened in the past. -*Open* is so important and often overlooked because it’s what makes the #opendatastack more embeddable with tools from the open data stack such as Airbyte, dbt, Dagster, Superset, and so forth. Letting you integrate them into your services, unlike closed-source services. +Future potential use cases have been listed in an inial [series](https://airbyte.com/blog/building-airbytes-data-stack) on how we built the data stack at Airbyte. + +--- +The Open Data Stack is a "better" term for the [Modern Data Stack](https://glossary.airbyte.com/term/modern-data-stack/) that focuses on solutions built on open-source and open standards covering the [Data Engineering Lifecycle](https://glossary.airbyte.com/term/data-engineering-lifecycle/). Companies can reuse existing battle-tested solutions and build on them instead of reinventing the wheel by re-implementing critical components for each component of the data stack. + +*Open* is so important and often overlooked because it embeds the data stack with tools from the open data stack, such as Airbyte, dbt, Dagster, Superset, Rill, and many more. Unlike closed-source services, it lets you integrate them into your services or products. From 2a6a874dccb8cb43d4a47d23bbdc0029e112e7e5 Mon Sep 17 00:00:00 2001 From: sspaeti Date: Wed, 15 Mar 2023 15:37:13 +0100 Subject: [PATCH 3/5] adding inital rill project --- visualization/rill_data/.gitignore | 4 ++++ visualization/rill_data/dashboards/.gitkeep | 0 visualization/rill_data/models/.gitkeep | 0 visualization/rill_data/models/model.sql | 5 +++++ visualization/rill_data/readme.md | 12 ++++++++++++ visualization/rill_data/rill.yaml | 4 ++++ visualization/rill_data/sources/.gitkeep | 0 7 files changed, 25 insertions(+) create mode 100644 visualization/rill_data/.gitignore create mode 100644 visualization/rill_data/dashboards/.gitkeep create mode 100644 visualization/rill_data/models/.gitkeep create mode 100644 visualization/rill_data/models/model.sql create mode 100644 visualization/rill_data/rill.yaml create mode 100644 visualization/rill_data/sources/.gitkeep diff --git a/visualization/rill_data/.gitignore b/visualization/rill_data/.gitignore new file mode 100644 index 0000000..7eeef89 --- /dev/null +++ b/visualization/rill_data/.gitignore @@ -0,0 +1,4 @@ +# Rill +*.db +*.db.wal +data/ diff --git a/visualization/rill_data/dashboards/.gitkeep b/visualization/rill_data/dashboards/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/visualization/rill_data/models/.gitkeep b/visualization/rill_data/models/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/visualization/rill_data/models/model.sql b/visualization/rill_data/models/model.sql new file mode 100644 index 0000000..3401469 --- /dev/null +++ b/visualization/rill_data/models/model.sql @@ -0,0 +1,5 @@ +select +_airbyte_data->>'user'->>'login', +_airbyte_data->>'repository', +* + from _airbyte_raw_stargazers; diff --git a/visualization/rill_data/readme.md b/visualization/rill_data/readme.md index 3273b25..5155f05 100644 --- a/visualization/rill_data/readme.md +++ b/visualization/rill_data/readme.md @@ -2,6 +2,18 @@ [Rill Data](https://www.rilldata.com/) one of the latest BI tools and based on DuckDB. It's fully interactive and using DuckDB as a caching layer. + +## Open Data Stack in a Box + +Starting rill with the project from this repo but the database where we loaded airbyte sources into it: +```sh +rill start --db /tmp/airbyte_local/ods/stage.db --project . +``` + +✍ Note: Make sure the the location matches the settings in `stargazer.py` in the dagster project - defaults settings will work + + +## [Airbyte Monitoring](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase) Unfortunately the data cannot be read live from airbyte postgres, but you can copy the data with a simple comand as CSV and read these with Rill. The commands below show you how you could create a snapshot and vizualize these data. diff --git a/visualization/rill_data/rill.yaml b/visualization/rill_data/rill.yaml new file mode 100644 index 0000000..09bdd50 --- /dev/null +++ b/visualization/rill_data/rill.yaml @@ -0,0 +1,4 @@ +compiler: rill-beta +rill_version: 0.20.0 + +name: rill_data diff --git a/visualization/rill_data/sources/.gitkeep b/visualization/rill_data/sources/.gitkeep new file mode 100644 index 0000000..e69de29 From 3d14a719866a39bfbb49a3bdb9e5d0cd9f85eebe Mon Sep 17 00:00:00 2001 From: sspaeti Date: Wed, 15 Mar 2023 15:40:08 +0100 Subject: [PATCH 4/5] adding duckdb --- dagster/readme.md | 1 + .../assets/stargazer.py | 62 ++++++++++++++----- dagster/stargazer/setup.py | 3 +- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/dagster/readme.md b/dagster/readme.md index 3389517..e1ac277 100644 --- a/dagster/readme.md +++ b/dagster/readme.md @@ -7,6 +7,7 @@ This use case is documented step by step on [Configure Airbyte Connections with setup virtual env, dependencies: ```bash cd stargazer +python3 -m pip install --upgrade pip #otherwise might have problem with duckdb/dagster pip install -e ".[dev]" ``` diff --git a/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py b/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py index c4aeeee..b8db69c 100644 --- a/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py +++ b/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py @@ -6,10 +6,11 @@ AirbyteSyncMode, load_assets_from_connections, ) -from dagster_airbyte.managed.generated.sources import GithubSource +from dagster_airbyte.managed.generated.sources import FakerSource, GithubSource from dagster_airbyte.managed.generated.destinations import ( LocalJsonDestination, PostgresDestination, + DuckdbDestination, ) from typing import List from dagster_dbt import load_assets_from_dbt_project @@ -21,6 +22,7 @@ import asyncio import aiohttp +from sqlalchemy import schema from ..utils.constants import DBT_PROJECT_DIR @@ -99,37 +101,65 @@ def get_awesome_repo_list() -> str: links = [link.replace("https://github.com/", "") for link in existings_links] # due to timeout limits while airbyte is checking each repo, I limited it here to make this demo work for you - links = links[0:10] + links = links[0:2] # return links as a string with blank space as separator return " ".join(links) +# test_data = FakerSource(name="test_data", count=100) + gh_awesome_de_list_source = GithubSource( name="gh_awesome_de_list", credentials=GithubSource.PATCredentials(AIRBYTE_PERSONAL_GITHUB_TOKEN), start_date="2020-01-01T00:00:00Z", - repository=get_awesome_repo_list(), # "prometheus/haproxy_exporter", + # repository=get_awesome_repo_list(), # "prometheus/haproxy_exporter", + repository="prometheus/haproxy_exporter", page_size_for_large_streams=100, ) -postgres_destination = PostgresDestination( - name="postgres", - host="localhost", - port=5432, - database="postgres", - schema="public", - username="postgres", - password=POSTGRES_PASSWORD, - ssl_mode=PostgresDestination.Disable(), +# postgres_destination = PostgresDestination( +# name="postgres", +# host="localhost", +# port=5432, +# database="postgres", +# schema="public", +# username="postgres", +# password=POSTGRES_PASSWORD, +# ssl_mode=PostgresDestination.Disable(), +# ) + +duckdb_destination = DuckdbDestination( + name="duckdb", + path="/local/ods/stage.db", ) +# stargazer_connection = AirbyteConnection( +# name="fetch_stargazer", +# source=gh_awesome_de_list_source, +# destination=postgres_destination, +# stream_config={"stargazers": AirbyteSyncMode.incremental_append()}, +# normalize_data=False, +# ) + + +# test_connection = AirbyteConnection( +# name="fetch_test_data_duckdb", +# source=test_data, +# destination=duckdb_destination, +# stream_config={ +# "_airbyte_raw_users": AirbyteSyncMode.full_refresh_overwrite(), +# "_airbyte_raw_products": AirbyteSyncMode.full_refresh_overwrite(), +# }, +# normalize_data=False, # True, +# ) + stargazer_connection = AirbyteConnection( - name="fetch_stargazer", + name="fetch_stargazer_duckdb", source=gh_awesome_de_list_source, - destination=postgres_destination, - stream_config={"stargazers": AirbyteSyncMode.incremental_append_dedup()}, - normalize_data=True, + destination=duckdb_destination, + stream_config={"stargazers": AirbyteSyncMode.incremental_append()}, + normalize_data=False, # True, ) airbyte_reconciler = AirbyteManagedElementReconciler( diff --git a/dagster/stargazer/setup.py b/dagster/stargazer/setup.py index 1302133..934715a 100644 --- a/dagster/stargazer/setup.py +++ b/dagster/stargazer/setup.py @@ -18,7 +18,8 @@ "aiohttp", "requests", "beautifulsoup4", - "dbt-duckdb", + "dbt-duckdb==1.4.0", + "duckdb==0.7.0", # this must be aligned with the `destination-duckdb` version of airbyte, otherwise stage.db can't be opened! ], extras_require={"dev": ["dagit", "pytest", "black"]}, ) From 79b0c05db784ddc5ebcfca10d6a25a8730ba0b00 Mon Sep 17 00:00:00 2001 From: sspaeti Date: Thu, 11 May 2023 10:41:53 +0200 Subject: [PATCH 5/5] update latest --- .../assets/stargazer.py | 208 ++++++++++++++++-- .../assets_modern_data_stack/repository.py | 15 +- dagster/stargazer/setup.py | 3 +- dagster/stargazer/stage.db | Bin 0 -> 1323008 bytes .../transformation_dbt/config/profiles.yml | 2 +- .../stargazer/transformation_dbt/profiles.yml | 19 +- .../dashboards/stargazer_dashboard.yaml | 13 ++ visualization/rill_data/models/model.sql | 6 +- .../rill_data/models/rss_de_feeds.sql | 1 + visualization/rill_data/models/salary_de.sql | 1 + visualization/rill_data/models/stargazer.sql | 17 ++ 11 files changed, 237 insertions(+), 48 deletions(-) create mode 100644 dagster/stargazer/stage.db create mode 100644 visualization/rill_data/dashboards/stargazer_dashboard.yaml create mode 100644 visualization/rill_data/models/rss_de_feeds.sql create mode 100644 visualization/rill_data/models/salary_de.sql create mode 100644 visualization/rill_data/models/stargazer.sql diff --git a/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py b/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py index b8db69c..c60e667 100644 --- a/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py +++ b/dagster/stargazer/assets_modern_data_stack/assets/stargazer.py @@ -1,4 +1,4 @@ -from dagster import asset +from dagster import asset, op from dagster_airbyte import ( AirbyteManagedElementReconciler, airbyte_resource, @@ -6,7 +6,11 @@ AirbyteSyncMode, load_assets_from_connections, ) -from dagster_airbyte.managed.generated.sources import FakerSource, GithubSource +from dagster_airbyte.managed.generated.sources import ( + FakerSource, + GithubSource, + RssSource, +) from dagster_airbyte.managed.generated.destinations import ( LocalJsonDestination, PostgresDestination, @@ -15,6 +19,10 @@ from typing import List from dagster_dbt import load_assets_from_dbt_project +import requests +import base64 +import re +from urllib.parse import urlparse from bs4 import BeautifulSoup import os @@ -31,6 +39,75 @@ ) POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "please-set-your-token") +GNEW_API_KEY = os.environ.get("GNEW_API_KEY", "please-set-your-token") +NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "please-set-your-token") + +# feed of DE +rss_list = [ + "https://www.theseattledataguy.com/feed/", + "https://www.dataschool.io/rss/", + # "ttps://roundup.getdbt.com/feed", + # "http://feeds.feedburner.com/JamesSerra", + # "https://pedram.substack.com/feed", + # "http://www.gregreda.com/feeds/all.atom.xml", + # "https://jpmonteiro.substack.com/feed", + # "https://www.confessionsofadataguy.com/feed/", + # "https://petrjanda.substack.com/feed", + # "https://sarahsnewsletter.substack.com/feed", + # "https://newsletter.pragmaticengineer.com/feed", + # "https://stkbailey.substack.com/feed", + # "https://benn.substack.com/feed", + # "https://technically.substack.com/feed", + # "https://sspaeti.com/index.xml", + # "https://www.startdataengineering.com/index.xml", + # "https://seattledataguy.substack.com/feed", + # "https://dataproducts.substack.com/feed", + # "https://blog.sqlauthority.com/feed/", + # "https://www.dataengineeringweekly.com/feed", + # "https://medium.com/feed/@maximebeauchemin", + # "https://www.data-is-plural.com/feed.xml", + # "https://davidsj.substack.com/feed", + # "https://kentgraziano.com/feed/", + # "https://fromanengineersight.substack.com/feed", + # "https://groupby1.substack.com/feed", + # "http://www.cathrinewilhelmsen.net/feed/", + # "https://www.eckerson.com/feed/", + # "https://joereis.substack.com/feed", + # "https://medium.com/feed/@laurengreerbalik", + # "https://www.winwithdata.io/feed", + # "https://letters.moderndatastack.xyz/rss/", + # "https://chollinger.com/blog/index.xml", + # "https://mehdio.substack.com/feed", + # "https://moderndata101.substack.com/feed", + # "http://www.hansmichiels.com/feed/", + # "https://agilebi.guru/feed/", + # "https://medium.com/feed/@karimjdda", + # "https://www.kahandatasolutions.com/blog.rss", + # "https://semanticinsight.wordpress.com/feed/", + # "http://www.tableausoftware.com/taxonomy/term/1920/feed", + # "https://dataespresso.substack.com/feed", + # "https://jonathanneo.substack.com/feed", + # "https://vickiboykis.com/index.xml", + # "https://ownyourdata.ai/wp/feed/", +] + + +def extract_domain(url: str) -> str: + parsed_url = urlparse(url) + # remove special character that are not allowed for dagster asset key + cleaned_string = re.sub( + r"[^A-Za-z0-9_]", "", parsed_url.netloc.replace("www.", "").replace(".", "_") + ) + + return cleaned_string + + +# pylint: redefined-builtin +from typing import List, Optional, Union +import dagster._check as check +from dagster._annotations import public +from dagster_airbyte.managed.types import GeneratedAirbyteSource + airbyte_instance = airbyte_resource.configured( { @@ -79,7 +156,6 @@ def get_awesome_repo_list() -> str: url = "https://github.com/igorbarinov/awesome-data-engineering" html = requests.get(url) soup = BeautifulSoup(html.text, "html.parser") - # parse all links into a list starting with github.com links = [ link.get("href") for link in soup.find_all("a") @@ -95,9 +171,7 @@ def get_awesome_repo_list() -> str: links = [link[:-1] if link.endswith("/") else link for link in links] # remove repos without organization links = [link for link in links if len(link.split("/")) == 5] - # check if links are still existing in parallel to save time existings_links = asyncio.run(check_websites_exists(links)) - # remove `https://github.com/` from links links = [link.replace("https://github.com/", "") for link in existings_links] # due to timeout limits while airbyte is checking each repo, I limited it here to make this demo work for you @@ -129,10 +203,6 @@ def get_awesome_repo_list() -> str: # ssl_mode=PostgresDestination.Disable(), # ) -duckdb_destination = DuckdbDestination( - name="duckdb", - path="/local/ods/stage.db", -) # stargazer_connection = AirbyteConnection( # name="fetch_stargazer", @@ -154,35 +224,129 @@ def get_awesome_repo_list() -> str: # normalize_data=False, # True, # ) +json_awesome_de_list = LocalJsonDestination( + name="json_awesome_de_list", + destination_path=f"/local/ods/json_awesome_de_list/", +) + +duckdb_destination = DuckdbDestination( + name="duckdb", + path="/local/ods/stage.db", +) + stargazer_connection = AirbyteConnection( name="fetch_stargazer_duckdb", source=gh_awesome_de_list_source, - destination=duckdb_destination, + destination=json_awesome_de_list, # duckdb_destination, stream_config={"stargazers": AirbyteSyncMode.incremental_append()}, + normalize_data=False, +) + + +# ############################### +# # simple pattern analysis +# ############################### + + +repo_connection = AirbyteConnection( + name="fetch_stargazer_duckdb", + source=gh_awesome_de_list_source, + destination=duckdb_destination, + stream_config={ + "repositories": AirbyteSyncMode.incremental_append(), + "stargazers": AirbyteSyncMode.incremental_append(), + }, normalize_data=False, # True, ) + +@op +def fetch_readme(repos: list) -> dict: + + GITHUB_API_TOKEN = os.getenv("GITHUB_API_TOKEN") + headers = { + "Accept": "application/vnd.github+json", + "Authorization": "token {}".format(GITHUB_API_TOKEN), + } + + matches_dict = {} + + for repo in repos: + + url = "https://api.github.com/repos/{}/readme".format(repo) + + response = requests.get(url, headers=headers) + content = response.json()["content"] + + # Decode base64 content + readme_content = base64.b64decode(content).decode("utf-8") + + # Extract links and names using regex + pattern = r"\* \[(.+?)\]\((.+?)\)" + matches = re.findall(pattern, readme_content) + + for match in matches: + name, link = match + matches_dict[name] = link + + print(f"Name: {name}\nLink: {link}\n") + + return matches_dict + + +def create_rss_airbyte_connection(name: str, rss_url: str): + rss_source = RssSource(name=f"rss_{name}", url=rss_url) + json_destination = LocalJsonDestination( + name=f"json_{name}", + destination_path=f"/local/ods/json_export/{name}.jsonl", + ) + + connection = AirbyteConnection( + name=f"fetch_{name}", + source=rss_source, + destination=json_destination, # duckdb_destination: Would need to handle JSONL format, for now it will through an "Invalid JSON" error + stream_config={"items": AirbyteSyncMode.full_refresh_overwrite()}, + normalize_data=False, + prefix=f"{name}_", + ) + return connection + + +rss_connections = [] +for i, rss_url in enumerate(rss_list): + readable_name = extract_domain(rss_url) + + i = create_rss_airbyte_connection(readable_name, rss_url) + rss_connections.append(i) + +# @asset( +# description="The metabase dashboard where the stargazers are visualized", +# metadata={"dashboard_url": "http://localhost:3000/dashboard/1-airbyte-sync-status"}, +# ) +# def metabase_dashboard(mart_gh_cumulative): +# return "test" + + +# ################################## +# Dagster Reconciler +# ################################## + airbyte_reconciler = AirbyteManagedElementReconciler( airbyte=airbyte_instance, - connections=[stargazer_connection], + connections=rss_connections + [stargazer_connection], + delete_unmentioned_resources=True, ) # load airbyte connection from above pythonic definitions airbyte_assets = load_assets_from_connections( airbyte=airbyte_instance, - connections=[stargazer_connection], - key_prefix=["postgres"], + connections=[stargazer_connection] + rss_connections, + key_prefix=["ods"], ) # preparing assets bassed on existing dbt project dbt_assets = load_assets_from_dbt_project( - project_dir=DBT_PROJECT_DIR, io_manager_key="db_io_manager", key_prefix="postgres" + project_dir=DBT_PROJECT_DIR, + io_manager_key="db_io_manager", + key_prefix=["ods"], ) - - -# @asset( -# description="The metabase dashboard where the stargazers are visualized", -# metadata={"dashboard_url": "http://localhost:3000/dashboard/1-airbyte-sync-status"}, -# ) -# def metabase_dashboard(mart_gh_cumulative): -# return "test" diff --git a/dagster/stargazer/assets_modern_data_stack/repository.py b/dagster/stargazer/assets_modern_data_stack/repository.py index e146b3a..ef6825a 100644 --- a/dagster/stargazer/assets_modern_data_stack/repository.py +++ b/dagster/stargazer/assets_modern_data_stack/repository.py @@ -16,12 +16,13 @@ def assets_modern_data_stack(): return [ airbyte_assets, - with_resources( - dbt_assets, # load_assets_from_package_module(assets), - resource_defs={ - "dbt": dbt_cli_resource.configured(DBT_CONFIG), - "db_io_manager": db_io_manager.configured(POSTGRES_CONFIG), - }, - ), + ## Disable for now as we use DuckDB and Rill for tranformations + # with_resources( + # dbt_assets, + # resource_defs={ + # "dbt": dbt_cli_resource.configured(DBT_CONFIG), + # "db_io_manager": db_io_manager.configured(POSTGRES_CONFIG), + # }, + # ), # metabase_dashboard, ] diff --git a/dagster/stargazer/setup.py b/dagster/stargazer/setup.py index 934715a..34d9bae 100644 --- a/dagster/stargazer/setup.py +++ b/dagster/stargazer/setup.py @@ -14,8 +14,7 @@ "numpy", "scipy", "dbt-core", - "dbt-postgres", - "aiohttp", + "dbt-postgres" "aiohttp", "requests", "beautifulsoup4", "dbt-duckdb==1.4.0", diff --git a/dagster/stargazer/stage.db b/dagster/stargazer/stage.db new file mode 100644 index 0000000000000000000000000000000000000000..6b16da3495669bceea3757e2aad16d188f33ecf4 GIT binary patch literal 1323008 zcmeI*!EYSZ831sP?F4LMRe?ejps+ziO*n9a3#TZtL?Ac?kv1H_Y`jCV$li4}>yStY z!T}@@mr_MYJy)vKUiu%jQY9`N+G9@~Xw@Ei;}3}YzTJ84jP1p~SZc`clXl*|H{W|R z?>7U|vnS)?uivix;di%xf9AsJmw!4nj@eYSs_zN{1PBlyK!5-N0t5&UAVA>m7x?tp z^Isjmdj6wg%TzY2nBLN^N`L?X0t5&UAV7cs0RjXF+coyLPYEJQUfI)Y4^}bX- zUbW>S|EwU_qhI9TsU9Y%_g!fZ+pGP{F-$b)q2hb5y;jbo@?iQz932YZ0YNOV(H`Y75 zWItW?Je0~o`Ocucc4gw|aUlYFuIlsO2Q?avxwX+4m54@T%pjuC7?&ZU(U^bq(`d{W zDSA9I>N7PpQ{=lR-vMz+^C{&!DTay2xl!&^kCDsRks`jw-FG;ZYu(F(_ORRUEnhDO z*HW1e_GDzHhQ;U@eO>LpR}PkA zw#ys6?$wQQx!bw%a@sLJfBLn>Q?D)2gTj!TrF~?`Caa$)w zX(wXTlP6!8FGlAVn+MY8rSyrI`#iRBiALE`iIh0c2cwc=_UqW@));Oy|N3Rk2~kGW zuT6B!*Di)#Yj=Aw{rn$L`YH$zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oU%|3%vE`pC9?hFMqSJS;btM?b4xY%0+H%5)`Rg%%;upoY>qy6$vlC zaqirfW>o?N2oNAZfB*pk1nw1qgK454i?s0gaF>*nc*{y; z%wkKPf+cY)~dR`{l%N9x2LEo4g~=M1PBlyK!5-N z0+R%+NG8c>CqRGz0RjXF5FkKc{{^f__TShM2oNAZfB*pk1PBn=O(5P=RgYY+zh8Gc zmM^d5tleIa6(j8Of~Q6jMc<&7NP^6`l5QHgVv_i-Yr- z)XQxz4t_317X%0pAV7cs0RjXF+!F$MaS#iP&$i4s@7wpa(_6n7A2}Z|pGTaOAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ;Qv$LM>qfShh>zAX>qlO153kho>PLR6>NB6pl|k7a zmYwB(ub$nijatv%SnuqT{dCpyP$~!IJA?AtmG2!rA1U<8wM%8EQ+DcS66Y8Z`@?=N zDk^TD2s%oK4uSG($v`Id_G#{8mB zL>9T-8Q1eR#Q#}U@`=PTs@wkI^mVoWUO8Bf*)DJNx>q;KDLxdy}sDG zuypp-3yZC@OJ^1@wsy>JonLCj6rZhzZJijUorpnCo_t}x7+u=fJdi#ycEsH0v5hM< z%8p9rcrYp{X1|VYZjIqa)30CEoDgM1{Zd57eCcBLYwd1tr~hF@KS6*10RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfWTcN@Z}%R{OQ=Qe)8+hD(2Fe zFCD5*zsRl4=@hA2%%;upoY>qy6$vlCaqirfW>o?N2oNAZfB*pk1nw1qgK454i?s0gaF>*nc*{y;%wkKPf+cY z)~dR`{l%N9x2LEo4g~=M1PBlyK!5-N0+R%+NG8c>CqRGz0RjXF5FkKc{{^f__TShM z2oNAZfB*pk1PBnABoHf-8;z&7eq5#2pW2!qw~gM3^*#GXuhsS5-AUHk2@oJafB*pk z1PBoLpFq4Tw;s7({~I6Ev0!;6XYKZq-grUpE-&eg-J{4y+gfiNJ^rRhd?Jcss))1M z^DBd+(;m)d+be_fnbpf}f4txHew(gF literal 0 HcmV?d00001 diff --git a/dagster/stargazer/transformation_dbt/config/profiles.yml b/dagster/stargazer/transformation_dbt/config/profiles.yml index 87a783a..c7fed2a 100644 --- a/dagster/stargazer/transformation_dbt/config/profiles.yml +++ b/dagster/stargazer/transformation_dbt/config/profiles.yml @@ -11,5 +11,5 @@ stargazers: # dbname: postgres # schema: public type: duckdb - path: /tmp/airbyte_local/opendatastack-in-a-box.duckdb + path: /tmp/airbyte_local/ods/stage.db schema: main diff --git a/dagster/stargazer/transformation_dbt/profiles.yml b/dagster/stargazer/transformation_dbt/profiles.yml index 87a783a..f9f6596 100644 --- a/dagster/stargazer/transformation_dbt/profiles.yml +++ b/dagster/stargazer/transformation_dbt/profiles.yml @@ -2,14 +2,11 @@ stargazers: target: dev outputs: dev: - # type: postgres - # threads: 1 - # host: localhost - # port: 5432 - # user: postgres - # pass: password - # dbname: postgres - # schema: public - type: duckdb - path: /tmp/airbyte_local/opendatastack-in-a-box.duckdb - schema: main + type: postgres + threads: 1 + host: localhost + port: 5432 + user: postgres + pass: password + dbname: postgres + schema: public diff --git a/visualization/rill_data/dashboards/stargazer_dashboard.yaml b/visualization/rill_data/dashboards/stargazer_dashboard.yaml new file mode 100644 index 0000000..6f07752 --- /dev/null +++ b/visualization/rill_data/dashboards/stargazer_dashboard.yaml @@ -0,0 +1,13 @@ +# Visit https://docs.rilldata.com/references/project-files to learn more about Rill project files. + +display_name: "stargazer_dashboard" +model: "stargazer" +default_time_range: "" +smallest_time_grain: "" +timeseries: "starred_at" +measures: + - label: Total records + expression: count(*) + description: Total number of records present + format_preset: humanize +dimensions: [] diff --git a/visualization/rill_data/models/model.sql b/visualization/rill_data/models/model.sql index 3401469..9d83163 100644 --- a/visualization/rill_data/models/model.sql +++ b/visualization/rill_data/models/model.sql @@ -1,5 +1 @@ -select -_airbyte_data->>'user'->>'login', -_airbyte_data->>'repository', -* - from _airbyte_raw_stargazers; +from information_schema.tables \ No newline at end of file diff --git a/visualization/rill_data/models/rss_de_feeds.sql b/visualization/rill_data/models/rss_de_feeds.sql new file mode 100644 index 0000000..73ac3c1 --- /dev/null +++ b/visualization/rill_data/models/rss_de_feeds.sql @@ -0,0 +1 @@ +SELECT count(_airbyte_data.title) FROM read_json_auto('/tmp/airbyte_local/ods/json_export/*/*.jsonl') \ No newline at end of file diff --git a/visualization/rill_data/models/salary_de.sql b/visualization/rill_data/models/salary_de.sql new file mode 100644 index 0000000..7d89fa7 --- /dev/null +++ b/visualization/rill_data/models/salary_de.sql @@ -0,0 +1 @@ +from '/Users/sspaeti/Documents/git/work/open-data-stack.git/simon/ods-in-a-box/datasets/DataEngineer.csv' \ No newline at end of file diff --git a/visualization/rill_data/models/stargazer.sql b/visualization/rill_data/models/stargazer.sql new file mode 100644 index 0000000..856b7b1 --- /dev/null +++ b/visualization/rill_data/models/stargazer.sql @@ -0,0 +1,17 @@ + + +SELECT +_airbyte_data.starred_at, 1 as count +from read_json_auto('/tmp/airbyte_local/ods/awesome-list/_airbyte_raw_stargazers.jsonl') + + + +/*SELECT +_airbyte_data.* + +from read_json_auto('/tmp/airbyte_local/ods/export_json/_airbyte_raw_users.jsonl') +left outer join read_json_auto('/tmp/airbyte_local/ods/export_json/_airbyte_raw_users.jsonl') + +--from read_ndjson_objects('/tmp/airbyte_local/ods/export_json/_airbyte_raw_users.jsonl') +*/ +