From 816a055fd6f04084aac63b64803aeb1425b2dce5 Mon Sep 17 00:00:00 2001 From: Bharath Vedartham Date: Fri, 12 Dec 2025 20:39:29 +0530 Subject: [PATCH 1/4] add client versions notebook --- README.md | 4 +- _quarto.yml | 8 +- index.qmd | 4 + notebooks/04-client-versions.qmd | 205 +++++++++++++++++++++++++++ queries/__init__.py | 3 + queries/client_versions.py | 52 +++++++ scripts/fetch_data.py | 2 + scripts/generate_historical_index.py | 1 + scripts/prepare_publish.py | 1 + scripts/render_historical.py | 1 + 10 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 notebooks/04-client-versions.qmd create mode 100644 queries/client_versions.py diff --git a/README.md b/README.md index 8de06b3..33b3ac9 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ quarto preview | [01-blob-inclusion](notebooks/01-blob-inclusion.qmd) | Blob inclusion patterns per block and epoch | | [02-blob-flow](notebooks/02-blob-flow.qmd) | Blob flow across validators, builders, and relays | | [03-column-propagation](notebooks/03-column-propagation.qmd)| Column propagation timing across 128 data columns | +| [04-client-versions](notebooks/04-client-versions.qmd) | Consensus client version distribution | ## Architecture @@ -41,7 +42,8 @@ quarto preview ├── queries/ # Query layer (fetch + write to Parquet) │ ├── blob_inclusion.py # fetch_blobs_per_slot(), fetch_blocks_blob_epoch(), ... │ ├── blob_flow.py # fetch_proposer_blobs() -│ └── column_propagation.py # fetch_col_first_seen() +│ ├── column_propagation.py # fetch_col_first_seen() +│ └── client_versions.py # fetch_client_versions() ├── scripts/ │ ├── fetch_data.py # CLI for data fetching │ ├── generate_archive.py # Generates archive.qmd for site diff --git a/_quarto.yml b/_quarto.yml index bcf684a..e9d79af 100644 --- a/_quarto.yml +++ b/_quarto.yml @@ -15,7 +15,7 @@ website: contents: - text: Introduction href: index.qmd - - section: '2025-12-09' + - section: '2025-12-11' contents: - text: Blob inclusion href: notebooks/01-blob-inclusion.qmd @@ -23,10 +23,8 @@ website: href: notebooks/02-blob-flow.qmd - text: Column propagation href: notebooks/03-column-propagation.qmd - - section: Historical - contents: - - text: '2025-12-08' - href: 20251208/index.qmd + - text: Client versions + href: notebooks/04-client-versions.qmd format: html: theme: diff --git a/index.qmd b/index.qmd index cb90a50..dae7e54 100644 --- a/index.qmd +++ b/index.qmd @@ -12,6 +12,10 @@ A collection of notebooks analyzing P2P dynamics in Ethereum networks. Currently - [Blob flow](notebooks/02-blob-flow.qmd): Flow diagrams tracing blob packing per entities, builders, and relays. - [Column propagation](notebooks/03-column-propagation.qmd): Column propagation timing across 128 data columns subnets. +**Network** + +- [Client versions](notebooks/04-client-versions.qmd): Consensus client version distribution across the network. + ## Generation **Frequency: daily at 1am UTC, archiving the last 30 days.** diff --git a/notebooks/04-client-versions.qmd b/notebooks/04-client-versions.qmd new file mode 100644 index 0000000..39598bf --- /dev/null +++ b/notebooks/04-client-versions.qmd @@ -0,0 +1,205 @@ +--- +title: "Client Versions" +--- + +Analysis of consensus client versions connected to xatu nodes on Ethereum mainnet. + +```{python} +#| tags: [parameters] +target_date = None # Set via papermill, or auto-detect from manifest +``` + +```{python} +import pandas as pd +import numpy as np +import plotly.express as px +import plotly.graph_objects as go +from plotly.subplots import make_subplots + +from loaders import load_parquet +``` + +```{python} +# Load client versions data +df = load_parquet("client_versions", target_date) + +# Fill missing values +df["remote_agent_implementation"] = df["remote_agent_implementation"].fillna("unknown") +df["remote_agent_version"] = df["remote_agent_version"].fillna("unknown") + +print(f"Total connections: {len(df):,}") +print(f"Unique peer IDs: {df['remote_peer_id'].nunique():,}") +print(f"Unique client implementations: {df['remote_agent_implementation'].nunique()}") +``` + +## Client Implementation Distribution + +Distribution of consensus client implementations observed across all connections. This shows the diversity of the Ethereum validator client ecosystem. + +```{python} +# Known consensus clients to track individually +KNOWN_CLIENTS = {"lighthouse", "teku", "nimbus", "erigon", "grandine", "lodestar", "prysm"} + +# Filter out unknown implementations for cleaner visualization +df_known = df[df["remote_agent_implementation"] != "unknown"].copy() + +# Map non-standard clients to "Others" +df_known["client"] = df_known["remote_agent_implementation"].apply( + lambda x: x if x.lower() in KNOWN_CLIENTS else "Others" +) + +# Count by implementation +impl_counts = df_known.groupby("client").size().reset_index(name="count") +impl_counts = impl_counts.sort_values("count", ascending=False) + +fig = px.pie( + impl_counts, + values="count", + names="client", + title="Client Implementation Distribution", + color_discrete_sequence=px.colors.qualitative.Set2, +) +fig.update_traces(textposition="inside", textinfo="percent+label") +fig.update_layout(height=500) +fig.show() +``` + +## Connections by Client Implementation + +Bar chart showing the number of connections per client implementation. + +```{python} +fig = px.bar( + impl_counts, + x="client", + y="count", + title="Connections by Client Implementation", + labels={"client": "Client", "count": "Connections"}, + color="client", + color_discrete_sequence=px.colors.qualitative.Set2, +) +fig.update_layout( + showlegend=False, + height=500, + xaxis_tickangle=-45, +) +fig.show() +``` + +## Client Versions Over Time + +Stacked area chart showing how client connections are distributed across implementations over time. + +```{python} +# Group by hour and implementation +df_known["hour"] = pd.to_datetime(df_known["event_date_time"]).dt.floor("h") +hourly_impl = df_known.groupby(["hour", "client"]).size().reset_index(name="count") + +# Pivot for stacked area +hourly_pivot = hourly_impl.pivot(index="hour", columns="client", values="count").fillna(0) + +fig = go.Figure() +for col in hourly_pivot.columns: + fig.add_trace(go.Scatter( + x=hourly_pivot.index, + y=hourly_pivot[col], + mode="lines", + stackgroup="one", + name=col, + )) + +fig.update_layout( + title="Client Implementation Connections Over Time", + xaxis_title="Time", + yaxis_title="Connections", + height=500, + legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), +) +fig.show() +``` + +## Version Distribution by Client + +Detailed breakdown of version distribution for each major client implementation. + +```{python} +# Get known clients (exclude "Others" for version breakdown) +top_clients = [c for c in impl_counts["client"].tolist() if c != "Others"] + +for client in top_clients: + df_client = df_known[df_known["client"] == client] + + if len(df_client) == 0: + continue + + # Count by version + version_counts = df_client.groupby("remote_agent_version").size().reset_index(name="count") + version_counts = version_counts.sort_values("count", ascending=False).head(15) + + fig = px.bar( + version_counts, + x="remote_agent_version", + y="count", + title=f"{client.capitalize()} Version Distribution", + labels={"remote_agent_version": "Version", "count": "Connections"}, + color_discrete_sequence=[px.colors.qualitative.Set2[top_clients.index(client) % len(px.colors.qualitative.Set2)]], + ) + fig.update_layout( + height=400, + xaxis_tickangle=-45, + ) + fig.show() +``` + +## Geographic Distribution + +Distribution of connections by country for known client implementations. + +```{python} +# Filter for known countries +df_geo = df_known[df_known["node_country"].notna() & (df_known["node_country"] != "")] + +if len(df_geo) > 0: + country_counts = df_geo.groupby("node_country").size().reset_index(name="count") + country_counts = country_counts.sort_values("count", ascending=False).head(20) + + fig = px.bar( + country_counts, + x="node_country", + y="count", + title="Connections by Country (Top 20)", + labels={"node_country": "Country", "count": "Connections"}, + color_discrete_sequence=["#636EFA"], + ) + fig.update_layout( + height=500, + xaxis_tickangle=-45, + ) + fig.show() +else: + print("No geographic data available") +``` + +## Summary Statistics + +```{python} +# Summary table +summary_data = [] + +for client in df_known["client"].unique(): + df_client = df_known[df_known["client"] == client] + summary_data.append({ + "Client": client, + "Connections": len(df_client), + "Unique Peers": df_client["remote_peer_id"].nunique(), + "Versions": df_client["remote_agent_version"].nunique(), + "Top Version": df_client["remote_agent_version"].mode().iloc[0] if len(df_client) > 0 else "N/A", + }) + +summary_df = pd.DataFrame(summary_data) +summary_df = summary_df.sort_values("Connections", ascending=False) +summary_df["Connections"] = summary_df["Connections"].apply(lambda x: f"{x:,}") +summary_df["Unique Peers"] = summary_df["Unique Peers"].apply(lambda x: f"{x:,}") +summary_df +``` + diff --git a/queries/__init__.py b/queries/__init__.py index a767c6d..1b7d99e 100644 --- a/queries/__init__.py +++ b/queries/__init__.py @@ -12,6 +12,7 @@ ) from queries.blob_flow import fetch_proposer_blobs from queries.column_propagation import fetch_col_first_seen, NUM_COLUMNS +from queries.client_versions import fetch_client_versions __all__ = [ # Blob inclusion @@ -24,4 +25,6 @@ # Column propagation "fetch_col_first_seen", "NUM_COLUMNS", + # Client versions + "fetch_client_versions", ] diff --git a/queries/client_versions.py b/queries/client_versions.py new file mode 100644 index 0000000..d5b3a35 --- /dev/null +++ b/queries/client_versions.py @@ -0,0 +1,52 @@ +""" +Fetch functions for client versions analysis. + +Each function executes SQL and writes directly to Parquet. +""" + +from pathlib import Path + + +def _get_date_filter(target_date: str, column: str = "event_date_time") -> str: + """Generate SQL date filter for a specific date.""" + return f"{column} >= '{target_date}' AND {column} < '{target_date}'::date + INTERVAL 1 DAY" + + +def fetch_client_versions( + client, + target_date: str, + output_path: Path, + network: str = "mainnet", +) -> int: + """Fetch client versions data from libp2p_connected and write to Parquet. + + This captures the clients and versions connected to xatu nodes. + + Returns row count. + """ + date_filter = _get_date_filter(target_date) + + query = f""" +SELECT + meta_client_name AS node_name, + meta_client_geo_country AS node_country, + meta_client_version AS node_client_version, + meta_client_implementation AS node_client_implementation, + meta_client_id AS node_client_id, + remote_agent_implementation, + remote_agent_version, + remote_agent_version_major, + remote_agent_version_minor, + remote_agent_version_patch, + event_date_time, + remote_peer_id_unique_key AS remote_peer_id +FROM default.libp2p_connected FINAL +WHERE {date_filter} + AND meta_network_name = '{network}' +""" + + df = client.query_df(query) + output_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(output_path, index=False) + return len(df) + diff --git a/scripts/fetch_data.py b/scripts/fetch_data.py index 06e0855..714ff22 100644 --- a/scripts/fetch_data.py +++ b/scripts/fetch_data.py @@ -29,6 +29,7 @@ fetch_slot_in_epoch, fetch_proposer_blobs, fetch_col_first_seen, + fetch_client_versions, ) # List of (name, fetcher) tuples @@ -39,6 +40,7 @@ ("slot_in_epoch", fetch_slot_in_epoch), ("proposer_blobs", fetch_proposer_blobs), ("col_first_seen", fetch_col_first_seen), + ("client_versions", fetch_client_versions), ] diff --git a/scripts/generate_historical_index.py b/scripts/generate_historical_index.py index d537037..a133987 100644 --- a/scripts/generate_historical_index.py +++ b/scripts/generate_historical_index.py @@ -29,6 +29,7 @@ def generate_index(date: str, output_dir: Path):
  • Blob Inclusion
  • Blob Flow
  • Column Propagation
  • +
  • Client Versions
  • diff --git a/scripts/prepare_publish.py b/scripts/prepare_publish.py index 63fa426..b429c2e 100644 --- a/scripts/prepare_publish.py +++ b/scripts/prepare_publish.py @@ -20,6 +20,7 @@ ("01-blob-inclusion", "Blob inclusion"), ("02-blob-flow", "Blob flow"), ("03-column-propagation", "Column propagation"), + ("04-client-versions", "Client versions"), ] DATA_ROOT = Path("notebooks/data") diff --git a/scripts/render_historical.py b/scripts/render_historical.py index fdfc178..bc26257 100644 --- a/scripts/render_historical.py +++ b/scripts/render_historical.py @@ -14,6 +14,7 @@ "01-blob-inclusion", "02-blob-flow", "03-column-propagation", + "04-client-versions", ] DATA_ROOT = Path("notebooks/data") From ca0548e0a7fce5cc2ade7da65bf56b0bf69035fd Mon Sep 17 00:00:00 2001 From: Bharath Vedartham Date: Fri, 12 Dec 2025 20:46:52 +0530 Subject: [PATCH 2/4] remove country distributions --- notebooks/04-client-versions.qmd | 31 +------------------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/notebooks/04-client-versions.qmd b/notebooks/04-client-versions.qmd index 39598bf..364760f 100644 --- a/notebooks/04-client-versions.qmd +++ b/notebooks/04-client-versions.qmd @@ -2,7 +2,7 @@ title: "Client Versions" --- -Analysis of consensus client versions connected to xatu nodes on Ethereum mainnet. +Analysis of consensus client versions connected to Xatu nodes on Ethereum mainnet. ```{python} #| tags: [parameters] @@ -151,35 +151,6 @@ for client in top_clients: fig.show() ``` -## Geographic Distribution - -Distribution of connections by country for known client implementations. - -```{python} -# Filter for known countries -df_geo = df_known[df_known["node_country"].notna() & (df_known["node_country"] != "")] - -if len(df_geo) > 0: - country_counts = df_geo.groupby("node_country").size().reset_index(name="count") - country_counts = country_counts.sort_values("count", ascending=False).head(20) - - fig = px.bar( - country_counts, - x="node_country", - y="count", - title="Connections by Country (Top 20)", - labels={"node_country": "Country", "count": "Connections"}, - color_discrete_sequence=["#636EFA"], - ) - fig.update_layout( - height=500, - xaxis_tickangle=-45, - ) - fig.show() -else: - print("No geographic data available") -``` - ## Summary Statistics ```{python} From 27c5cc2f7bd2e6b500b51c52030d689dae54e153 Mon Sep 17 00:00:00 2001 From: raulk Date: Tue, 23 Dec 2025 17:20:19 +0000 Subject: [PATCH 3/4] Refactor client-versions: push aggregation to ClickHouse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split single raw-data query into three server-side aggregated queries: - client_hourly: hourly connection counts by client (for time series) - client_version_dist: top 15 versions per client (for version charts) - client_summary: unique peers, version count, top version per client Benefits: - Reduces data transfer from millions of rows to ~300 aggregated rows - Moves client categorization (CASE statement) to SQL - Uses ClickHouse's uniqExact() and argMax() for accurate stats - Simplifies notebook code by removing in-memory aggregation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- notebooks/04-client-versions.ipynb | 108 ++++++++++--------------- pipeline.yaml | 24 ++++-- queries/__init__.py | 10 ++- queries/client_versions.py | 122 +++++++++++++++++++++++++---- 4 files changed, 174 insertions(+), 90 deletions(-) diff --git a/notebooks/04-client-versions.ipynb b/notebooks/04-client-versions.ipynb index 30fa584..a2c4165 100644 --- a/notebooks/04-client-versions.ipynb +++ b/notebooks/04-client-versions.ipynb @@ -23,10 +23,8 @@ "metadata": {}, "source": [ "import pandas as pd\n", - "import numpy as np\n", "import plotly.express as px\n", "import plotly.graph_objects as go\n", - "from plotly.subplots import make_subplots\n", "\n", "from loaders import load_parquet" ], @@ -37,16 +35,24 @@ "cell_type": "code", "metadata": {}, "source": [ - "# Load client versions data\n", - "df = load_parquet(\"client_versions\", target_date)\n", - "\n", - "# Fill missing values\n", - "df[\"remote_agent_implementation\"] = df[\"remote_agent_implementation\"].fillna(\"unknown\")\n", - "df[\"remote_agent_version\"] = df[\"remote_agent_version\"].fillna(\"unknown\")\n", - "\n", - "print(f\"Total connections: {len(df):,}\")\n", - "print(f\"Unique peer IDs: {df['remote_peer_id'].nunique():,}\")\n", - "print(f\"Unique client implementations: {df['remote_agent_implementation'].nunique()}\")" + "# Load pre-aggregated data\n", + "df_hourly = load_parquet(\"client_hourly\", target_date)\n", + "df_versions = load_parquet(\"client_version_dist\", target_date)\n", + "df_summary = load_parquet(\"client_summary\", target_date)\n", + "\n", + "# Filter out unknown clients for visualizations\n", + "df_hourly_known = df_hourly[df_hourly[\"client\"] != \"unknown\"]\n", + "df_versions_known = df_versions[df_versions[\"client\"] != \"unknown\"]\n", + "df_summary_known = df_summary[df_summary[\"client\"] != \"unknown\"]\n", + "\n", + "# Compute totals from hourly data\n", + "total_connections = df_hourly[\"connections\"].sum()\n", + "impl_counts = df_hourly_known.groupby(\"client\")[\"connections\"].sum().reset_index()\n", + "impl_counts = impl_counts.sort_values(\"connections\", ascending=False)\n", + "\n", + "print(f\"Total connections: {total_connections:,}\")\n", + "print(f\"Unique clients: {df_summary['client'].nunique()}\")\n", + "print(f\"Total unique peers: {df_summary['unique_peers'].sum():,}\")" ], "execution_count": null, "outputs": [] @@ -64,24 +70,9 @@ "cell_type": "code", "metadata": {}, "source": [ - "# Known consensus clients to track individually\n", - "KNOWN_CLIENTS = {\"lighthouse\", \"teku\", \"nimbus\", \"erigon\", \"grandine\", \"lodestar\", \"prysm\"}\n", - "\n", - "# Filter out unknown implementations for cleaner visualization\n", - "df_known = df[df[\"remote_agent_implementation\"] != \"unknown\"].copy()\n", - "\n", - "# Map non-standard clients to \"Others\"\n", - "df_known[\"client\"] = df_known[\"remote_agent_implementation\"].apply(\n", - " lambda x: x if x.lower() in KNOWN_CLIENTS else \"Others\"\n", - ")\n", - "\n", - "# Count by implementation\n", - "impl_counts = df_known.groupby(\"client\").size().reset_index(name=\"count\")\n", - "impl_counts = impl_counts.sort_values(\"count\", ascending=False)\n", - "\n", "fig = px.pie(\n", " impl_counts,\n", - " values=\"count\",\n", + " values=\"connections\",\n", " names=\"client\",\n", " title=\"Client Implementation Distribution\",\n", " color_discrete_sequence=px.colors.qualitative.Set2,\n", @@ -109,9 +100,9 @@ "fig = px.bar(\n", " impl_counts,\n", " x=\"client\",\n", - " y=\"count\",\n", + " y=\"connections\",\n", " title=\"Connections by Client Implementation\",\n", - " labels={\"client\": \"Client\", \"count\": \"Connections\"},\n", + " labels={\"client\": \"Client\", \"connections\": \"Connections\"},\n", " color=\"client\",\n", " color_discrete_sequence=px.colors.qualitative.Set2,\n", ")\n", @@ -138,12 +129,8 @@ "cell_type": "code", "metadata": {}, "source": [ - "# Group by hour and implementation\n", - "df_known[\"hour\"] = pd.to_datetime(df_known[\"event_date_time\"]).dt.floor(\"h\")\n", - "hourly_impl = df_known.groupby([\"hour\", \"client\"]).size().reset_index(name=\"count\")\n", - "\n", - "# Pivot for stacked area\n", - "hourly_pivot = hourly_impl.pivot(index=\"hour\", columns=\"client\", values=\"count\").fillna(0)\n", + "# Pivot hourly data for stacked area chart\n", + "hourly_pivot = df_hourly_known.pivot(index=\"hour\", columns=\"client\", values=\"connections\").fillna(0)\n", "\n", "fig = go.Figure()\n", "for col in hourly_pivot.columns:\n", @@ -173,33 +160,29 @@ "source": [ "## Version Distribution by Client\n", "\n", - "Detailed breakdown of version distribution for each major client implementation." + "Detailed breakdown of version distribution for each major client implementation (top 15 versions per client)." ] }, { "cell_type": "code", "metadata": {}, "source": [ - "# Get known clients (exclude \"Others\" for version breakdown)\n", - "top_clients = [c for c in impl_counts[\"client\"].tolist() if c != \"Others\"]\n", + "# Get clients sorted by total connections (exclude \"Others\" and \"unknown\")\n", + "top_clients = impl_counts[~impl_counts[\"client\"].isin([\"Others\", \"unknown\"])][\"client\"].tolist()\n", "\n", - "for client in top_clients:\n", - " df_client = df_known[df_known[\"client\"] == client]\n", + "for i, client in enumerate(top_clients):\n", + " df_client = df_versions_known[df_versions_known[\"client\"] == client]\n", " \n", " if len(df_client) == 0:\n", " continue\n", " \n", - " # Count by version\n", - " version_counts = df_client.groupby(\"remote_agent_version\").size().reset_index(name=\"count\")\n", - " version_counts = version_counts.sort_values(\"count\", ascending=False).head(15)\n", - " \n", " fig = px.bar(\n", - " version_counts,\n", - " x=\"remote_agent_version\",\n", - " y=\"count\",\n", + " df_client,\n", + " x=\"version\",\n", + " y=\"connections\",\n", " title=f\"{client.capitalize()} Version Distribution\",\n", - " labels={\"remote_agent_version\": \"Version\", \"count\": \"Connections\"},\n", - " color_discrete_sequence=[px.colors.qualitative.Set2[top_clients.index(client) % len(px.colors.qualitative.Set2)]],\n", + " labels={\"version\": \"Version\", \"connections\": \"Connections\"},\n", + " color_discrete_sequence=[px.colors.qualitative.Set2[i % len(px.colors.qualitative.Set2)]],\n", " )\n", " fig.update_layout(\n", " height=400,\n", @@ -221,24 +204,13 @@ "cell_type": "code", "metadata": {}, "source": [ - "# Summary table\n", - "summary_data = []\n", - "\n", - "for client in df_known[\"client\"].unique():\n", - " df_client = df_known[df_known[\"client\"] == client]\n", - " summary_data.append({\n", - " \"Client\": client,\n", - " \"Connections\": len(df_client),\n", - " \"Unique Peers\": df_client[\"remote_peer_id\"].nunique(),\n", - " \"Versions\": df_client[\"remote_agent_version\"].nunique(),\n", - " \"Top Version\": df_client[\"remote_agent_version\"].mode().iloc[0] if len(df_client) > 0 else \"N/A\",\n", - " })\n", - "\n", - "summary_df = pd.DataFrame(summary_data)\n", - "summary_df = summary_df.sort_values(\"Connections\", ascending=False)\n", - "summary_df[\"Connections\"] = summary_df[\"Connections\"].apply(lambda x: f\"{x:,}\")\n", - "summary_df[\"Unique Peers\"] = summary_df[\"Unique Peers\"].apply(lambda x: f\"{x:,}\")\n", - "summary_df" + "# Format summary table\n", + "summary_display = df_summary_known[[\"client\", \"connections\", \"unique_peers\", \"version_count\", \"top_version\"]].copy()\n", + "summary_display.columns = [\"Client\", \"Connections\", \"Unique Peers\", \"Versions\", \"Top Version\"]\n", + "summary_display = summary_display.sort_values(\"Connections\", ascending=False)\n", + "summary_display[\"Connections\"] = summary_display[\"Connections\"].apply(lambda x: f\"{x:,}\")\n", + "summary_display[\"Unique Peers\"] = summary_display[\"Unique Peers\"].apply(lambda x: f\"{x:,}\")\n", + "summary_display" ], "execution_count": null, "outputs": [] diff --git a/pipeline.yaml b/pipeline.yaml index c50e9d1..d2873eb 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -70,11 +70,23 @@ queries: description: Column first seen timing across 128 subnets output_file: col_first_seen.parquet - client_versions: + client_hourly: module: queries.client_versions - function: fetch_client_versions - description: Consensus client versions from libp2p connections - output_file: client_versions.parquet + function: fetch_client_hourly + description: Hourly connection counts by client implementation + output_file: client_hourly.parquet + + client_version_dist: + module: queries.client_versions + function: fetch_client_version_dist + description: Version distribution per client (top 15) + output_file: client_version_dist.parquet + + client_summary: + module: queries.client_versions + function: fetch_client_summary + description: Summary statistics per client with unique peers + output_file: client_summary.parquet # ============================================ # Notebook Registry @@ -132,7 +144,9 @@ notebooks: source: notebooks/04-client-versions.ipynb schedule: daily queries: - - client_versions + - client_hourly + - client_version_dist + - client_summary parameters: - name: target_date type: date diff --git a/queries/__init__.py b/queries/__init__.py index 1b7d99e..7a8a5ca 100644 --- a/queries/__init__.py +++ b/queries/__init__.py @@ -12,7 +12,11 @@ ) from queries.blob_flow import fetch_proposer_blobs from queries.column_propagation import fetch_col_first_seen, NUM_COLUMNS -from queries.client_versions import fetch_client_versions +from queries.client_versions import ( + fetch_client_hourly, + fetch_client_version_dist, + fetch_client_summary, +) __all__ = [ # Blob inclusion @@ -26,5 +30,7 @@ "fetch_col_first_seen", "NUM_COLUMNS", # Client versions - "fetch_client_versions", + "fetch_client_hourly", + "fetch_client_version_dist", + "fetch_client_summary", ] diff --git a/queries/client_versions.py b/queries/client_versions.py index d5b3a35..557107a 100644 --- a/queries/client_versions.py +++ b/queries/client_versions.py @@ -2,25 +2,38 @@ Fetch functions for client versions analysis. Each function executes SQL and writes directly to Parquet. +Aggregation is pushed to ClickHouse for efficiency. """ from pathlib import Path +# Known consensus clients to track individually (others grouped as "Others") +KNOWN_CLIENTS_SQL = """ +CASE + WHEN lower(remote_agent_implementation) IN ( + 'lighthouse', 'teku', 'nimbus', 'erigon', 'grandine', 'lodestar', 'prysm' + ) THEN remote_agent_implementation + WHEN remote_agent_implementation IS NULL OR remote_agent_implementation = '' + THEN 'unknown' + ELSE 'Others' +END +""".strip() + def _get_date_filter(target_date: str, column: str = "event_date_time") -> str: """Generate SQL date filter for a specific date.""" return f"{column} >= '{target_date}' AND {column} < '{target_date}'::date + INTERVAL 1 DAY" -def fetch_client_versions( +def fetch_client_hourly( client, target_date: str, output_path: Path, network: str = "mainnet", ) -> int: - """Fetch client versions data from libp2p_connected and write to Parquet. + """Fetch hourly connection counts by client implementation. - This captures the clients and versions connected to xatu nodes. + Used for time series visualization and deriving total counts. Returns row count. """ @@ -28,21 +41,14 @@ def fetch_client_versions( query = f""" SELECT - meta_client_name AS node_name, - meta_client_geo_country AS node_country, - meta_client_version AS node_client_version, - meta_client_implementation AS node_client_implementation, - meta_client_id AS node_client_id, - remote_agent_implementation, - remote_agent_version, - remote_agent_version_major, - remote_agent_version_minor, - remote_agent_version_patch, - event_date_time, - remote_peer_id_unique_key AS remote_peer_id + toStartOfHour(event_date_time) AS hour, + {KNOWN_CLIENTS_SQL} AS client, + count() AS connections FROM default.libp2p_connected FINAL WHERE {date_filter} AND meta_network_name = '{network}' +GROUP BY hour, client +ORDER BY hour, client """ df = client.query_df(query) @@ -50,3 +56,89 @@ def fetch_client_versions( df.to_parquet(output_path, index=False) return len(df) + +def fetch_client_version_dist( + client, + target_date: str, + output_path: Path, + network: str = "mainnet", +) -> int: + """Fetch version distribution per client. + + Returns top versions per client with connection counts. + Used for version distribution bar charts. + + Returns row count. + """ + date_filter = _get_date_filter(target_date) + + query = f""" +SELECT + client, + version, + connections +FROM ( + SELECT + {KNOWN_CLIENTS_SQL} AS client, + coalesce(remote_agent_version, 'unknown') AS version, + count() AS connections, + row_number() OVER (PARTITION BY client ORDER BY count() DESC) AS rn + FROM default.libp2p_connected FINAL + WHERE {date_filter} + AND meta_network_name = '{network}' + GROUP BY client, version +) +WHERE rn <= 15 +ORDER BY client, connections DESC +""" + + df = client.query_df(query) + output_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(output_path, index=False) + return len(df) + + +def fetch_client_summary( + client, + target_date: str, + output_path: Path, + network: str = "mainnet", +) -> int: + """Fetch summary statistics per client. + + Includes unique peer counts (properly deduplicated per client). + Used for summary table. + + Returns row count. + """ + date_filter = _get_date_filter(target_date) + + query = f""" +SELECT + {KNOWN_CLIENTS_SQL} AS client, + count() AS connections, + uniqExact(remote_peer_id_unique_key) AS unique_peers, + count(DISTINCT coalesce(remote_agent_version, 'unknown')) AS version_count, + argMax(coalesce(remote_agent_version, 'unknown'), cnt) AS top_version +FROM ( + SELECT + remote_agent_implementation, + remote_agent_version, + remote_peer_id_unique_key, + count() OVER ( + PARTITION BY + {KNOWN_CLIENTS_SQL}, + coalesce(remote_agent_version, 'unknown') + ) AS cnt + FROM default.libp2p_connected FINAL + WHERE {date_filter} + AND meta_network_name = '{network}' +) +GROUP BY client +ORDER BY connections DESC +""" + + df = client.query_df(query) + output_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(output_path, index=False) + return len(df) From 1b3978b3c1ff9fb040e3c90e2b93385cfee8f20a Mon Sep 17 00:00:00 2001 From: raulk Date: Tue, 23 Dec 2025 22:44:50 +0000 Subject: [PATCH 4/4] adjust query. --- queries/client_versions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/queries/client_versions.py b/queries/client_versions.py index 557107a..78511a3 100644 --- a/queries/client_versions.py +++ b/queries/client_versions.py @@ -13,7 +13,9 @@ WHEN lower(remote_agent_implementation) IN ( 'lighthouse', 'teku', 'nimbus', 'erigon', 'grandine', 'lodestar', 'prysm' ) THEN remote_agent_implementation - WHEN remote_agent_implementation IS NULL OR remote_agent_implementation = '' + WHEN remote_agent_implementation IS NULL + OR remote_agent_implementation = '' + OR remote_agent_implementation = 'unknown' THEN 'unknown' ELSE 'Others' END