19 changes: 13 additions & 6 deletions README.md
@@ -1,10 +1,17 @@
# Open Data Stack Projects

The idea is to have an extendable repository of reference projects built with the [Open Data Stack](https://glossary.airbyte.com/term/open-data-stack/). It should be possible to add different BI tools or another orchestrator.

So far, this repo contains these two projects:
- Open Data Stack with Airbyte, dbt, Dagster, and Metabase: [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster)
  - The project scrapes the links on the [Awesome Data Engineering List](https://github.com/igorbarinov/awesome-data-engineering) with Beautiful Soup and ingests the stars of each GitHub repository into a Postgres database, so you can see the trend for each tool in Metabase.
  - You can also read why each of the tools was chosen in [The Open Data Stack Distilled into Four Core Tools](https://airbyte.com/blog/modern-open-data-stack-four-core-tools).
- Open Data Stack with dbt and Metabase: [Use Case: Airbyte Monitoring](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase)
  - In this project, you learn how to implement an Airbyte monitoring dashboard with dbt and Metabase on a locally deployed Airbyte instance. It gives you an operational view of your currently running data ingestions and a high-level overview of what happened in the past.

Future potential use cases are listed in an initial [series](https://airbyte.com/blog/building-airbytes-data-stack) on how we built the data stack at Airbyte.

---

The Open Data Stack is a "better" term for the [Modern Data Stack](https://glossary.airbyte.com/term/modern-data-stack/) that focuses on solutions built on open source and open standards covering the [Data Engineering Lifecycle](https://glossary.airbyte.com/term/data-engineering-lifecycle/). Companies can reuse existing battle-tested solutions and build on them instead of reinventing the wheel by re-implementing critical components of the data stack.

*Open* is so important and often overlooked because it is what makes the stack embeddable: tools from the open data stack, such as Airbyte, dbt, Dagster, Superset, Rill, and many more, can be integrated into your own services or products, unlike closed-source services.
1 change: 1 addition & 0 deletions dagster/readme.md
@@ -7,6 +7,7 @@ This use case is documented step by step on [Configure Airbyte Connections with
Set up the virtual env and dependencies:
```bash
cd stargazer
python3 -m pip install --upgrade pip  # otherwise you might run into problems with duckdb/dagster
pip install -e ".[dev]"
```
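
Then start the Dagster UI to browse and materialize the assets (a minimal sketch; assuming a standard Dagster workspace setup, the exact command may differ for your Dagster version):

```bash
dagit
```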

258 changes: 226 additions & 32 deletions dagster/stargazer/assets_modern_data_stack/assets/stargazer.py
@@ -1,26 +1,36 @@
from dagster import asset, op
from dagster_airbyte import (
AirbyteManagedElementReconciler,
airbyte_resource,
AirbyteConnection,
AirbyteSyncMode,
load_assets_from_connections,
)
from dagster_airbyte.managed.generated.sources import (
FakerSource,
GithubSource,
RssSource,
)
from dagster_airbyte.managed.generated.destinations import (
LocalJsonDestination,
PostgresDestination,
DuckdbDestination,
)
from typing import List
from dagster_dbt import load_assets_from_dbt_project

import requests
import base64
import re
from urllib.parse import urlparse

from bs4 import BeautifulSoup
import os

import asyncio
import aiohttp
from sqlalchemy import schema
from ..utils.constants import DBT_PROJECT_DIR


@@ -29,6 +39,75 @@
)
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "please-set-your-token")

GNEW_API_KEY = os.environ.get("GNEW_API_KEY", "please-set-your-token")
NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "please-set-your-token")

# RSS feeds of data engineering blogs
rss_list = [
"https://www.theseattledataguy.com/feed/",
"https://www.dataschool.io/rss/",
# "ttps://roundup.getdbt.com/feed",
# "http://feeds.feedburner.com/JamesSerra",
# "https://pedram.substack.com/feed",
# "http://www.gregreda.com/feeds/all.atom.xml",
# "https://jpmonteiro.substack.com/feed",
# "https://www.confessionsofadataguy.com/feed/",
# "https://petrjanda.substack.com/feed",
# "https://sarahsnewsletter.substack.com/feed",
# "https://newsletter.pragmaticengineer.com/feed",
# "https://stkbailey.substack.com/feed",
# "https://benn.substack.com/feed",
# "https://technically.substack.com/feed",
# "https://sspaeti.com/index.xml",
# "https://www.startdataengineering.com/index.xml",
# "https://seattledataguy.substack.com/feed",
# "https://dataproducts.substack.com/feed",
# "https://blog.sqlauthority.com/feed/",
# "https://www.dataengineeringweekly.com/feed",
# "https://medium.com/feed/@maximebeauchemin",
# "https://www.data-is-plural.com/feed.xml",
# "https://davidsj.substack.com/feed",
# "https://kentgraziano.com/feed/",
# "https://fromanengineersight.substack.com/feed",
# "https://groupby1.substack.com/feed",
# "http://www.cathrinewilhelmsen.net/feed/",
# "https://www.eckerson.com/feed/",
# "https://joereis.substack.com/feed",
# "https://medium.com/feed/@laurengreerbalik",
# "https://www.winwithdata.io/feed",
# "https://letters.moderndatastack.xyz/rss/",
# "https://chollinger.com/blog/index.xml",
# "https://mehdio.substack.com/feed",
# "https://moderndata101.substack.com/feed",
# "http://www.hansmichiels.com/feed/",
# "https://agilebi.guru/feed/",
# "https://medium.com/feed/@karimjdda",
# "https://www.kahandatasolutions.com/blog.rss",
# "https://semanticinsight.wordpress.com/feed/",
# "http://www.tableausoftware.com/taxonomy/term/1920/feed",
# "https://dataespresso.substack.com/feed",
# "https://jonathanneo.substack.com/feed",
# "https://vickiboykis.com/index.xml",
# "https://ownyourdata.ai/wp/feed/",
]


def extract_domain(url: str) -> str:
parsed_url = urlparse(url)
    # remove special characters that are not allowed in Dagster asset keys
    cleaned_string = re.sub(
        r"[^A-Za-z0-9_]", "", parsed_url.netloc.replace("www.", "").replace(".", "_")
    )

return cleaned_string
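    # e.g. extract_domain("https://www.theseattledataguy.com/feed/") -> "theseattledataguy_com"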


# pylint: disable=redefined-builtin
from typing import List, Optional, Union
import dagster._check as check
from dagster._annotations import public
from dagster_airbyte.managed.types import GeneratedAirbyteSource


airbyte_instance = airbyte_resource.configured(
{
@@ -77,7 +156,6 @@ def get_awesome_repo_list() -> str:
url = "https://github.com/igorbarinov/awesome-data-engineering"
html = requests.get(url)
soup = BeautifulSoup(html.text, "html.parser")
links = [
link.get("href")
for link in soup.find_all("a")
@@ -93,66 +171,182 @@ def get_awesome_repo_list() -> str:
links = [link[:-1] if link.endswith("/") else link for link in links]
# remove repos without organization
links = [link for link in links if len(link.split("/")) == 5]
    # check in parallel that the links still exist, to save time
    existing_links = asyncio.run(check_websites_exists(links))
    # remove `https://github.com/` from links
    links = [link.replace("https://github.com/", "") for link in existing_links]

    # due to timeout limits while Airbyte checks each repo, the list is limited here to make this demo work for you
    links = links[0:2]

# return links as a string with blank space as separator
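    # e.g. "prometheus/haproxy_exporter some-org/some-repo" (hypothetical values)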
return " ".join(links)


# test_data = FakerSource(name="test_data", count=100)

gh_awesome_de_list_source = GithubSource(
name="gh_awesome_de_list",
credentials=GithubSource.PATCredentials(AIRBYTE_PERSONAL_GITHUB_TOKEN),
start_date="2020-01-01T00:00:00Z",
# repository=get_awesome_repo_list(), # "prometheus/haproxy_exporter",
repository="prometheus/haproxy_exporter",
page_size_for_large_streams=100,
)

# postgres_destination = PostgresDestination(
# name="postgres",
# host="localhost",
# port=5432,
# database="postgres",
# schema="public",
# username="postgres",
# password=POSTGRES_PASSWORD,
# ssl_mode=PostgresDestination.Disable(),
# )


# stargazer_connection = AirbyteConnection(
# name="fetch_stargazer",
# source=gh_awesome_de_list_source,
# destination=postgres_destination,
# stream_config={"stargazers": AirbyteSyncMode.incremental_append()},
# normalize_data=False,
# )


# test_connection = AirbyteConnection(
# name="fetch_test_data_duckdb",
# source=test_data,
# destination=duckdb_destination,
# stream_config={
# "_airbyte_raw_users": AirbyteSyncMode.full_refresh_overwrite(),
# "_airbyte_raw_products": AirbyteSyncMode.full_refresh_overwrite(),
# },
# normalize_data=False, # True,
# )

json_awesome_de_list = LocalJsonDestination(
name="json_awesome_de_list",
destination_path=f"/local/ods/json_awesome_de_list/",
)

duckdb_destination = DuckdbDestination(
name="duckdb",
path="/local/ods/stage.db",
)

stargazer_connection = AirbyteConnection(
    name="fetch_stargazer_duckdb",
    source=gh_awesome_de_list_source,
    destination=json_awesome_de_list,  # duckdb_destination,
    stream_config={"stargazers": AirbyteSyncMode.incremental_append()},
    normalize_data=False,
)


# ###############################
# # simple pattern analysis
# ###############################


repo_connection = AirbyteConnection(
name="fetch_stargazer_duckdb",
source=gh_awesome_de_list_source,
destination=duckdb_destination,
stream_config={
"repositories": AirbyteSyncMode.incremental_append(),
"stargazers": AirbyteSyncMode.incremental_append(),
},
normalize_data=False, # True,
)


@op
def fetch_readme(repos: list) -> dict:
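    """Fetch each repo's README via the GitHub API and extract the markdown links into a {name: link} dict."""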

GITHUB_API_TOKEN = os.getenv("GITHUB_API_TOKEN")
headers = {
"Accept": "application/vnd.github+json",
"Authorization": "token {}".format(GITHUB_API_TOKEN),
}

matches_dict = {}

for repo in repos:

url = "https://api.github.com/repos/{}/readme".format(repo)

response = requests.get(url, headers=headers)
content = response.json()["content"]

# Decode base64 content
readme_content = base64.b64decode(content).decode("utf-8")

# Extract links and names using regex
pattern = r"\* \[(.+?)\]\((.+?)\)"
matches = re.findall(pattern, readme_content)

for match in matches:
name, link = match
matches_dict[name] = link

print(f"Name: {name}\nLink: {link}\n")

return matches_dict
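
# hypothetical usage for local testing (Dagster ops support direct invocation):
# tool_links = fetch_readme(["igorbarinov/awesome-data-engineering"])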


def create_rss_airbyte_connection(name: str, rss_url: str):
rss_source = RssSource(name=f"rss_{name}", url=rss_url)
json_destination = LocalJsonDestination(
name=f"json_{name}",
destination_path=f"/local/ods/json_export/{name}.jsonl",
)

connection = AirbyteConnection(
name=f"fetch_{name}",
source=rss_source,
        destination=json_destination,  # duckdb_destination: would need to handle the JSONL format; for now it would throw an "Invalid JSON" error
stream_config={"items": AirbyteSyncMode.full_refresh_overwrite()},
normalize_data=False,
prefix=f"{name}_",
)
return connection


rss_connections = []
for rss_url in rss_list:
    readable_name = extract_domain(rss_url)
    connection = create_rss_airbyte_connection(readable_name, rss_url)
    rss_connections.append(connection)



# ##################################
# Dagster Reconciler
# ##################################

airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=rss_connections + [stargazer_connection],
delete_unmentioned_resources=True,
)
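
# Note: with delete_unmentioned_resources=True, connections that exist on the
# Airbyte instance but are not defined above will be deleted on reconciliation.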

# load Airbyte assets from the Pythonic connection definitions above
airbyte_assets = load_assets_from_connections(
airbyte=airbyte_instance,
connections=[stargazer_connection] + rss_connections,
key_prefix=["ods"],
)

# prepare assets based on the existing dbt project
dbt_assets = load_assets_from_dbt_project(
project_dir=DBT_PROJECT_DIR,
io_manager_key="db_io_manager",
key_prefix=["ods"],
)


# @asset(
# description="The metabase dashboard where the stargazers are visualized",
# metadata={"dashboard_url": "http://localhost:3000/dashboard/1-airbyte-sync-status"},
# )
# def metabase_dashboard(mart_gh_cumulative):
# return "test"
15 changes: 8 additions & 7 deletions dagster/stargazer/assets_modern_data_stack/repository.py
@@ -16,12 +16,13 @@
def assets_modern_data_stack():
return [
airbyte_assets,
with_resources(
dbt_assets, # load_assets_from_package_module(assets),
resource_defs={
"dbt": dbt_cli_resource.configured(DBT_CONFIG),
"db_io_manager": db_io_manager.configured(POSTGRES_CONFIG),
},
),
        # Disabled for now, as we use DuckDB and Rill for transformations
# with_resources(
# dbt_assets,
# resource_defs={
# "dbt": dbt_cli_resource.configured(DBT_CONFIG),
# "db_io_manager": db_io_manager.configured(POSTGRES_CONFIG),
# },
# ),
# metabase_dashboard,
]