diff --git a/notebooks/12-slot-classifier.ipynb b/notebooks/12-slot-classifier.ipynb new file mode 100644 index 0000000..a8636f7 --- /dev/null +++ b/notebooks/12-slot-classifier.ipynb @@ -0,0 +1,331 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "Analysis of Ethereum CL's Attestation Inclusion metrics using the networking events as reference. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": { + "jupyter": { + "is_executing": true + } + }, + "outputs": [], + "source": [ + "import polars as pl\n", + "from loaders import load_parquet\n", + "from IPython.display import display\n", + "\n", + "import plotly.express as px\n", + "import plotly.graph_objects as go\n", + "from plotly.subplots import make_subplots\n", + "\n", + "from queries.slot_tags import TAG_ORDERS, TAG_LABELS, TAG_GROUPS, short_label\n", + "\n", + "# Global Variables\n", + "target_date = None # Use this as a default for the automation and the rendering of the page" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "# read the parquet file\n", + "df = pl.from_pandas(load_parquet(\"slot_tags\", target_date=target_date))" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "# Tag Distribution Overview — blocks, attestations, aggregations" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "## Tag Distribution Overview\n", + "\n", + "Percentage of slots with each tag value, broken down per dimension and grouped by category (blocks, attestations, aggregations)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "total = len(df)\n", + "\n", + "def plot_tag_distribution(tag_cols: list[str], title: str) -> None:\n", + " n = len(tag_cols)\n", + " row_height = 300\n", + " fig = make_subplots(\n", + " rows=n,\n", + " cols=1,\n", + " subplot_titles=[TAG_LABELS[c] for c in tag_cols],\n", + " vertical_spacing=80 / (row_height * n), # fixed 80px gap between subplots\n", + " )\n", + " for i, col in enumerate(tag_cols, 1):\n", + " present = set(df[col].unique().to_list())\n", + " order = [v for v in TAG_ORDERS[col] if v in present]\n", + " counts = (\n", + " df\n", + " .group_by(col)\n", + " .agg(count=pl.len())\n", + " .with_columns(pct=(pl.col(\"count\") * 100 / total))\n", + " .with_columns(pl.col(col).cast(pl.Enum(order)))\n", + " .sort(col)\n", + " )\n", + " pct_values = counts[\"pct\"].to_list()\n", + " fig.add_trace(\n", + " go.Bar(\n", + " x=[short_label(v) for v in counts[col].to_list()],\n", + " y=pct_values,\n", + " text=[f\"{v:.1f}%\" for v in pct_values],\n", + " textposition=\"outside\",\n", + " showlegend=False,\n", + " marker_color=\"#6366f1\",\n", + " ),\n", + " row=i,\n", + " col=1,\n", + " )\n", + " # Extend y-axis range so outside labels stay inside the plot area,\n", + " # preventing the hover tooltip from disappearing when the cursor is over them.\n", + " fig.update_yaxes(title_text=\"% slots\", range=[0, max(pct_values) * 1.25], row=i, col=1)\n", + " fig.update_layout(\n", + " title=title,\n", + " height=row_height * n,\n", + " width=1000,\n", + " margin=dict(t=80, b=40),\n", + " hovermode=\"x\",\n", + " )\n", + " fig.show()\n", + "\n", + "\n", + "for group_name, cols in TAG_GROUPS.items():\n", + " plot_tag_distribution(cols, f\"Slot Tag Distributions — {group_name}\")" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Cross-Dimension Co-occurrence Heatmaps\n", + "\n", + "For each pair of 
dimensions, cells show the percentage of slots in row-category X that also have column-category Y. Rows sum to 100%." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "def cross_tab_heatmap(df: pl.DataFrame, col_x: str, col_y: str) -> go.Figure:\n", + " \"\"\"Row-normalized cross-tab: for each value of col_x, % breakdown by col_y.\"\"\"\n", + " present_x = set(df[col_x].unique().to_list())\n", + " present_y = set(df[col_y].unique().to_list())\n", + " order_x = [v for v in TAG_ORDERS[col_x] if v in present_x]\n", + " order_y = [v for v in TAG_ORDERS[col_y] if v in present_y]\n", + "\n", + " ct = df.group_by([col_x, col_y]).agg(count=pl.len())\n", + " totals = ct.group_by(col_x).agg(total=pl.col(\"count\").sum())\n", + " ct = ct.join(totals, on=col_x).with_columns(pct=(pl.col(\"count\") * 100 / pl.col(\"total\")))\n", + "\n", + " matrix = []\n", + " for x in order_x:\n", + " row = []\n", + " for y in order_y:\n", + " cell = ct.filter((pl.col(col_x) == x) & (pl.col(col_y) == y))\n", + " row.append(round(cell[\"pct\"][0], 1) if len(cell) > 0 else 0.0)\n", + " matrix.append(row)\n", + "\n", + " fig = go.Figure(go.Heatmap(\n", + " z=matrix,\n", + " x=[short_label(v) for v in order_y],\n", + " y=[short_label(v) for v in order_x],\n", + " colorscale=\"Blues\",\n", + " zmin=0,\n", + " zmax=100,\n", + " text=[[f\"{v:.1f}%\" for v in row] for row in matrix],\n", + " texttemplate=\"%{text}\",\n", + " hoverongaps=False,\n", + " colorbar=dict(title=\"% of row\"),\n", + " ))\n", + " fig.update_layout(\n", + " title=f\"{TAG_LABELS[col_x]} vs {TAG_LABELS[col_y]}\",\n", + " xaxis_title=TAG_LABELS[col_y],\n", + " yaxis_title=TAG_LABELS[col_x],\n", + " width=900,\n", + " height=350,\n", + " margin=dict(l=200, b=140),\n", + " )\n", + " return fig\n", + "\n", + "\n", + "for col_x, col_y in [\n", + " (\"block_proposal_tag\", \"block_p50_spread_tag\"),\n", + " (\"block_size_tag\", \"block_p50_spread_tag\"),\n", 
+ " (\"blob_count_tag\", \"block_p50_spread_tag\"),\n", + " (\"block_proposal_tag\", \"block_p50_arrival_tag\"),\n", + "]:\n", + " cross_tab_heatmap(df, col_x, col_y).show()\n", + "\n", + "for col_x, col_y in [\n", + " (\"block_proposal_tag\", \"col_first_seen_p50_tag\"),\n", + " (\"blob_count_tag\", \"col_first_seen_p50_tag\"),\n", + " (\"col_first_seen_p50_tag\", \"col_spread_p50_tag\"),\n", + "]:\n", + " cross_tab_heatmap(df, col_x, col_y).show()\n", + "\n", + "for col_x, col_y in [\n", + " (\"block_proposal_tag\", \"att_first_seen_p50_tag\"),\n", + " (\"att_first_seen_p50_tag\", \"att_spread_p50_tag\"),\n", + " (\"att_first_seen_p50_tag\", \"att_inclusion_p50_tag\"),\n", + " (\"att_spread_p50_tag\", \"att_inclusion_p50_tag\"),\n", + "]:\n", + " cross_tab_heatmap(df, col_x, col_y).show()\n", + "\n", + "for col_x, col_y in [\n", + " (\"block_proposal_tag\", \"agg_first_seen_p50_tag\"),\n", + " (\"agg_first_seen_p50_tag\", \"agg_spread_p50_tag\"),\n", + " (\"att_first_seen_p50_tag\", \"agg_first_seen_p50_tag\"),\n", + "]:\n", + " cross_tab_heatmap(df, col_x, col_y).show()" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Slot Tag Flow (Sankey)\n", + "\n", + "Shows how slots flow across dimensions: **Blob Count → Block Size → Proposal Timing → Broadcast Speed**. Width of each band is proportional to the number of slots." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "def build_sankey(df: pl.DataFrame, cols: list[str], title: str = \"\") -> go.Figure:\n", + " node_labels: list[str] = []\n", + " node_index: dict[tuple[str, str], int] = {}\n", + "\n", + " for col in cols:\n", + " present = set(df[col].unique().to_list())\n", + " for val in TAG_ORDERS[col]:\n", + " if val in present:\n", + " node_index[(col, val)] = len(node_labels)\n", + " node_labels.append(f\"{TAG_LABELS[col]}: {short_label(val)}\")\n", + "\n", + " sources, targets, values = [], [], []\n", + "\n", + " for i in range(len(cols) - 1):\n", + " col_a, col_b = cols[i], cols[i + 1]\n", + " flow = df.group_by([col_a, col_b]).agg(count=pl.len())\n", + " for row in flow.iter_rows(named=True):\n", + " a = node_index.get((col_a, row[col_a]))\n", + " b = node_index.get((col_b, row[col_b]))\n", + " if a is not None and b is not None:\n", + " sources.append(a)\n", + " targets.append(b)\n", + " values.append(row[\"count\"])\n", + "\n", + " fig = go.Figure(go.Sankey(\n", + " arrangement=\"snap\",\n", + " node=dict(\n", + " label=node_labels,\n", + " pad=20,\n", + " thickness=18,\n", + " color=\"#6366f1\",\n", + " ),\n", + " link=dict(\n", + " source=sources,\n", + " target=targets,\n", + " value=values,\n", + " color=\"rgba(99,102,241,0.25)\",\n", + " ),\n", + " ))\n", + " fig.update_layout(\n", + " title=title,\n", + " width=1200,\n", + " height=700,\n", + " )\n", + " return fig\n", + "\n", + "\n", + "build_sankey(\n", + " df,\n", + " [\"block_proposal_tag\", \"block_size_tag\", \"blob_count_tag\", \"block_p50_arrival_tag\", \"block_p50_spread_tag\"],\n", + " title=\"Slot Tag Flow: Blob Count → Block Size → Proposal Timing → Arrival → Broadcast Speed\",\n", + ").show()\n", + "\n", + "build_sankey(\n", + " df,\n", + " [\"block_proposal_tag\", \"blob_count_tag\", \"col_first_seen_p50_tag\", \"col_spread_p50_tag\"],\n", + " title=\"Data Column Tag Flow 
(P50): Block Proposal → Blob Count → First Seen → Spread\",\n", + ").show()\n", + "\n", + "build_sankey(\n", + " df,\n", + " [\"block_proposal_tag\", \"att_first_seen_p50_tag\", \"att_spread_p50_tag\", \"att_inclusion_p50_tag\"],\n", + " title=\"Attestation Tag Flow (P50): Block Proposal → First Seen → Spread → Inclusion Delay\",\n", + ").show()\n", + "\n", + "build_sankey(\n", + " df,\n", + " [\"block_proposal_tag\", \"agg_first_seen_p50_tag\", \"agg_spread_p50_tag\"],\n", + " title=\"Aggregation Tag Flow (P50): Block Proposal → First Seen → Spread\",\n", + ").show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pipeline.yaml b/pipeline.yaml index 128ee29..e3cf00a 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -112,12 +112,11 @@ queries: description: Block propagation by geographic region from Sentries output_file: block_propagation_by_region.parquet - block_propagation_by_region_contributoor: - module: queries.block_propagation_contributoor - function: fetch_block_propagation_by_region_contributoor - database: contributoor - description: Block propagation by geographic region from Contributoor nodes - output_file: block_propagation_by_region_contributoor.parquet + slot_tags: + module: queries.slot_tagger + function: fetch_slot_tags + description: Classify each slot for what happened based on the events + output_file: slot_tags.parquet # ============================================ # Notebook Registry @@ -241,22 +240,19 @@ notebooks: required: true order: 8 - - id: block-propagation-size - title: Block propagation - description: Block propagation timing by size with 
corrected MEV timing that isolates network latency from block building + - id: slot-tags + title: Slot types + description: Visualize and aggregate metrics based on what happened on each slot icon: Gauge - source: notebooks/09-block-propagation-size.ipynb + source: notebooks/12-slot-classifier.ipynb schedule: daily queries: - - block_propagation_by_size - - block_production_timeline - - block_propagation_by_region - - block_propagation_by_region_contributoor + - slot_tags parameters: - name: target_date type: date required: true - order: 9 + order: 12 # Schedule options: hourly, daily, weekly, manual # - hourly: Runs every hour, accumulating data throughout the day diff --git a/pyproject.toml b/pyproject.toml index 628256f..99dafab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,8 +18,15 @@ dependencies = [ "boto3>=1.35.0", "scipy>=1.16.3", "statsmodels>=0.14.6", + "polars>=1.38.1", ] +[tool.uv] +package = true + +[tool.setuptools.packages.find] +include = ["queries*"] + [dependency-groups] dev = [ "ipykernel>=7.1.0", diff --git a/queries/__init__.py b/queries/__init__.py index b0173be..77b860b 100644 --- a/queries/__init__.py +++ b/queries/__init__.py @@ -12,6 +12,12 @@ ) from queries.blob_flow import fetch_blob_flow from queries.column_propagation import fetch_col_first_seen, NUM_COLUMNS +from queries.slot_tagger import ( + fetch_slot_tags, + TAG_ORDERS, + TAG_GROUPS, + short_label, +) __all__ = [ # Blob inclusion @@ -23,5 +29,10 @@ "fetch_blob_flow", # Column propagation "fetch_col_first_seen", + # Slot tagger + "fetch_slot_tags", "NUM_COLUMNS", + "TAG_ORDERS", + "TAG_GROUPS", + "short_label", ] diff --git a/queries/slot_tagger.py b/queries/slot_tagger.py new file mode 100644 index 0000000..b34f1cf --- /dev/null +++ b/queries/slot_tagger.py @@ -0,0 +1,600 @@ +""" +All the logic, queries, and final output for the slot tagger logic + +Classify all the slots in the given range of dates to output a relevant map of: +- timestamp, slot -> slot_defining_tags 
+""" + +import pandas as pd +import clickhouse_connect +from typing import List, Protocol +import concurrent.futures + +from queries.slot_tags import * + + +def _get_date_filter(target_date: str, column: str = "slot_start_date_time") -> str: + """Generate SQL date filter for a specific date.""" + return f"{column} >= '{target_date}' AND {column} < '{target_date}'::date + INTERVAL 1 DAY" + + +def _manual_date_filter(target_date: str, base_h: int, h_interval: int, column: str = "slot_start_date_time") -> str: + """Generate a SQL date filter for a single hour window within a date.""" + end_h = base_h + h_interval + start = f"{target_date} {base_h:02d}:00:00" + if end_h >= 24: + return f"{column} >= '{start}' AND {column} < '{target_date}'::date + INTERVAL 1 DAY" + return f"{column} >= '{start}' AND {column} < '{target_date} {end_h:02d}:00:00'" + + +class SlotTaggerRule(Protocol): + """ + Interface for each of the slot-tagging rules. + Given a clichouse connection, and a start_date/end_date, return allways: slot -> tag mapping dataframe + """ + def execute( + self, client: clickhouse_connect.driver.Client, network: str, target_date: str, + ) -> (pd.DataFrame, str): + """Runs a ClickHouse query and returns a DataFrame with ['slot', 'tag'] columns.""" + pass + + +class BlockProposalAndDistributionRule: + @staticmethod + def execute( + client: clickhouse_connect.driver.Client, network: str, target_date: str, + ) -> (pd.DataFrame, str): + date_filter = _get_date_filter(target_date) + + query = f""" + WITH + block_arrivals as ( + SELECT + slot, + min(propagation_slot_start_diff) as block_proposal, + quantiles(0.50)(propagation_slot_start_diff)[1] as block_arrival_p50, + quantiles(0.50)(propagation_slot_start_diff)[1] - min(propagation_slot_start_diff) as block_spread_p50 + FROM beacon_api_eth_v1_events_block + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot + ), + block_sizes AS ( + SELECT + slot, + CASE + WHEN min(message_size) IS NULL THEN 
'{MISSED_SLOT}' + WHEN min(message_size) < (50 * 1014) THEN '{BLOCK_SIZE_SMALL}' + WHEN min(message_size) <= (150 * 1014) THEN '{BLOCK_SIZE_AVG}' + WHEN min(message_size) <= (500 * 1024) THEN '{BLOCK_SIZE_LARGE}' + ELSE '{BLOCK_SIZE_EXTRA_LARGE}' + END AS block_size_tag + FROM libp2p_gossipsub_beacon_block + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot + ), + blobs AS ( + SELECT + slot, + count(DISTINCT blob_index) AS blob_count + FROM canonical_beacon_blob_sidecar + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot + + ) + SELECT + bs.slot as slot, + bs.block_size_tag as block_size_tag, + CASE + WHEN ba.block_proposal IS NULL THEN '{MISSED_SLOT}' + WHEN ba.block_proposal < 1000 THEN '{EARLY_PROPOSAL}' + WHEN ba.block_proposal <= 2000 THEN '{NEUTRAL_PROPOSAL}' + WHEN ba.block_proposal <= 4000 THEN '{AGGRESSIVE_PROPOSAL}' + ELSE '{LATE_PROPOSAL}' + END AS block_proposal_tag, + CASE + WHEN ba.block_arrival_p50 IS NULL THEN '{MISSED_SLOT}' + WHEN ba.block_arrival_p50 < 2000 THEN '{BLOCK_ARRIVAL_BELOW_2S}' + WHEN ba.block_arrival_p50 < 3000 THEN '{BLOCK_ARRIVAL_BELOW_3S}' + WHEN ba.block_arrival_p50 <= 4000 THEN '{BLOCK_ARRIVAL_BELOW_4S}' + ELSE '{BLOCK_ARRIVAL_OVER_4S}' + END AS block_p50_arrival_tag, + CASE + WHEN ba.block_spread_p50 IS NULL THEN '{MISSED_SLOT}' + WHEN ba.block_spread_p50 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN ba.block_spread_p50 < 500 THEN '{PROPAGATION_BELOW_500MS}' + WHEN ba.block_spread_p50 <= 750 THEN '{PROPAGATION_BELOW_750MS}' + WHEN ba.block_spread_p50 <= 1000 THEN '{PROPAGATION_BELOW_1S}' + WHEN ba.block_spread_p50 <= 1500 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN ba.block_spread_p50 <= 2000 THEN '{PROPAGATION_BELOW_2S}' + WHEN ba.block_spread_p50 <= 5000 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS block_p50_spread_tag, + CASE + WHEN bl.blob_count IS NULL THEN '{MISSED_SLOT}' + WHEN bl.blob_count = 0 THEN '{NO_BLOBS}' + WHEN bl.blob_count < 9 THEN 
'{BLOCK_BELOW_BLOB_TARGET_PROPOSAL}' + WHEN bl.blob_count <= 9 THEN '{BLOCK_ON_BLOB_TARGET_PROPOSAL}' + WHEN bl.blob_count <= 20 THEN '{BLOCK_ABOVE_BLOB_TARGET_PROPOSAL}' + ELSE '{BLOCK_MAX_BLOB_PROPOSAL}' + END AS blob_count_tag + FROM block_sizes bs + LEFT JOIN block_arrivals ba ON bs.slot == ba.slot + LEFT JOIN blobs bl ON bs.slot == bl.slot + ORDER BY bs.slot ASC + """ + return client.query_df(query), "Block arrivals" + + +class AttestationArrivalRule: + @staticmethod + def execute( + client: clickhouse_connect.driver.Client, network: str, target_date: str, + ) -> (pd.DataFrame, str): + # Run the full aggregation query in 2-hour batches to avoid ClickHouse timeouts. + batch_dfs = [] + interval = 4 + for base_h in range(0, 24, interval): + date_filter = _manual_date_filter(target_date, base_h=base_h, h_interval=interval) + query = f""" + WITH + per_attestation AS ( + SELECT + slot, + attesting_validator_index AS val_idx, + min(event_date_time) - min(slot_start_date_time) AS first_seen, + quantiles(0.50)(event_date_time)[1] - min(event_date_time) AS spread_p50 + FROM beacon_api_eth_v1_events_attestation + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot, val_idx + ), + per_attestation_inclusion AS ( + SELECT + slot, + arrayJoin(validators) AS val_idx, + min(block_slot - slot) AS inclusion_delay + FROM canonical_beacon_elaborated_attestation + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot, val_idx + ), + slot_stats AS ( + SELECT + a.slot, + quantiles(0.50)(a.first_seen)[1] AS att_first_seen_p50, + quantiles(0.90)(a.first_seen)[1] AS att_first_seen_p90, + quantiles(0.95)(a.first_seen)[1] AS att_first_seen_p95, + quantiles(0.99)(a.first_seen)[1] AS att_first_seen_p99, + quantiles(0.50)(a.spread_p50)[1] AS att_spread_p50, + quantiles(0.90)(a.spread_p50)[1] AS att_spread_p90, + quantiles(0.95)(a.spread_p50)[1] AS att_spread_p95, + quantiles(0.99)(a.spread_p50)[1] AS att_spread_p99, + 
quantiles(0.50)(ai.inclusion_delay)[1] AS att_inclusion_p50, + quantiles(0.90)(ai.inclusion_delay)[1] AS att_inclusion_p90, + quantiles(0.95)(ai.inclusion_delay)[1] AS att_inclusion_p95, + quantiles(0.99)(ai.inclusion_delay)[1] AS att_inclusion_p99 + FROM per_attestation a + LEFT JOIN per_attestation_inclusion ai ON (a.slot = ai.slot AND a.val_idx = ai.val_idx) + GROUP BY a.slot + ) + SELECT + slot, + CASE + WHEN att_first_seen_p50 IS NULL THEN '{ATT_FIRST_SEEN_MISSED}' + WHEN att_first_seen_p50 < 4 THEN '{ATT_ARRIVAL_BEFORE_4S}' + WHEN att_first_seen_p50 < 5 THEN '{ATT_ARRIVAL_AT_4S}' + WHEN att_first_seen_p50 < 8 THEN '{ATT_ARRIVAL_OVER_4S}' + ELSE '{ATT_ARRIVAL_LATE}' + END AS att_first_seen_p50_tag, + CASE + WHEN att_first_seen_p90 IS NULL THEN '{ATT_FIRST_SEEN_MISSED}' + WHEN att_first_seen_p90 < 4 THEN '{ATT_ARRIVAL_BEFORE_4S}' + WHEN att_first_seen_p90 < 5 THEN '{ATT_ARRIVAL_AT_4S}' + WHEN att_first_seen_p90 < 8 THEN '{ATT_ARRIVAL_OVER_4S}' + ELSE '{ATT_ARRIVAL_LATE}' + END AS att_first_seen_p90_tag, + CASE + WHEN att_first_seen_p95 IS NULL THEN '{ATT_FIRST_SEEN_MISSED}' + WHEN att_first_seen_p95 < 4 THEN '{ATT_ARRIVAL_BEFORE_4S}' + WHEN att_first_seen_p95 < 5 THEN '{ATT_ARRIVAL_AT_4S}' + WHEN att_first_seen_p95 < 8 THEN '{ATT_ARRIVAL_OVER_4S}' + ELSE '{ATT_ARRIVAL_LATE}' + END AS att_first_seen_p95_tag, + CASE + WHEN att_first_seen_p99 IS NULL THEN '{ATT_FIRST_SEEN_MISSED}' + WHEN att_first_seen_p99 < 4 THEN '{ATT_ARRIVAL_BEFORE_4S}' + WHEN att_first_seen_p99 < 5 THEN '{ATT_ARRIVAL_AT_4S}' + WHEN att_first_seen_p99 < 8 THEN '{ATT_ARRIVAL_OVER_4S}' + ELSE '{ATT_ARRIVAL_LATE}' + END AS att_first_seen_p99_tag, + CASE + WHEN att_spread_p50 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p50 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p50 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN att_spread_p50 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN att_spread_p50 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN att_spread_p50 <= 1.5 THEN 
'{PROPAGATION_BELOW_1_5S}' + WHEN att_spread_p50 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN att_spread_p50 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS att_spread_p50_tag, + CASE + WHEN att_spread_p90 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p90 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p90 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN att_spread_p90 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN att_spread_p90 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN att_spread_p90 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN att_spread_p90 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN att_spread_p90 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS att_spread_p90_tag, + CASE + WHEN att_spread_p95 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p95 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p95 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN att_spread_p95 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN att_spread_p95 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN att_spread_p95 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN att_spread_p95 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN att_spread_p95 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS att_spread_p95_tag, + CASE + WHEN att_spread_p99 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p99 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN att_spread_p99 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN att_spread_p99 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN att_spread_p99 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN att_spread_p99 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN att_spread_p99 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN att_spread_p99 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS att_spread_p99_tag, + CASE + WHEN att_inclusion_p50 IS NULL THEN '{MISSED_SLOT}' + WHEN att_inclusion_p50 <= 1 THEN '{ATT_INCLUSION_1}' + WHEN att_inclusion_p50 
<= 2 THEN '{ATT_INCLUSION_2}' + WHEN att_inclusion_p50 <= 5 THEN '{ATT_INCLUSION_BELOW_5}' + ELSE '{ATT_INCLUSION_OVER_5}' + END AS att_inclusion_p50_tag, + CASE + WHEN att_inclusion_p90 IS NULL THEN '{MISSED_SLOT}' + WHEN att_inclusion_p90 <= 1 THEN '{ATT_INCLUSION_1}' + WHEN att_inclusion_p90 <= 2 THEN '{ATT_INCLUSION_2}' + WHEN att_inclusion_p90 <= 5 THEN '{ATT_INCLUSION_BELOW_5}' + ELSE '{ATT_INCLUSION_OVER_5}' + END AS att_inclusion_p90_tag, + CASE + WHEN att_inclusion_p95 IS NULL THEN '{MISSED_SLOT}' + WHEN att_inclusion_p95 <= 1 THEN '{ATT_INCLUSION_1}' + WHEN att_inclusion_p95 <= 2 THEN '{ATT_INCLUSION_2}' + WHEN att_inclusion_p95 <= 5 THEN '{ATT_INCLUSION_BELOW_5}' + ELSE '{ATT_INCLUSION_OVER_5}' + END AS att_inclusion_p95_tag, + CASE + WHEN att_inclusion_p99 IS NULL THEN '{MISSED_SLOT}' + WHEN att_inclusion_p99 <= 1 THEN '{ATT_INCLUSION_1}' + WHEN att_inclusion_p99 <= 2 THEN '{ATT_INCLUSION_2}' + WHEN att_inclusion_p99 <= 5 THEN '{ATT_INCLUSION_BELOW_5}' + ELSE '{ATT_INCLUSION_OVER_5}' + END AS att_inclusion_p99_tag + FROM slot_stats + ORDER BY slot ASC + """ + batch = client.query_df(query) + print(f"\t-> Attestation arrivals (hours {base_h:02d}-{base_h+2:02d}): {len(batch)} slots") + batch_dfs.append(batch) + + combined = pd.concat(batch_dfs, ignore_index=True) + return combined.sort_values("slot").reset_index(drop=True), "Attestation arrivals" + + +class AggregationBroadcastRule: + @staticmethod + def execute( + client: clickhouse_connect.driver.Client, network: str, target_date: str, + ) -> (pd.DataFrame, str): + date_filter = _get_date_filter(target_date) + + query = f""" + WITH + per_aggregate AS ( + SELECT + slot, + committee_index, + aggregator_index, + min(event_date_time) - min(slot_start_date_time) AS first_seen, + quantiles(0.50)(event_date_time)[1] - min(event_date_time) AS spread_p50 + FROM libp2p_gossipsub_aggregate_and_proof + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot, committee_index, aggregator_index 
+ ), + slot_stats AS ( + SELECT + slot, + quantiles(0.50)(first_seen)[1] AS agg_first_seen_p50, + quantiles(0.90)(first_seen)[1] AS agg_first_seen_p90, + quantiles(0.95)(first_seen)[1] AS agg_first_seen_p95, + quantiles(0.99)(first_seen)[1] AS agg_first_seen_p99, + quantiles(0.50)(spread_p50)[1] AS agg_spread_p50, + quantiles(0.90)(spread_p50)[1] AS agg_spread_p90, + quantiles(0.95)(spread_p50)[1] AS agg_spread_p95, + quantiles(0.99)(spread_p50)[1] AS agg_spread_p99 + FROM per_aggregate + GROUP BY slot + ) + SELECT + slot, + CASE + WHEN agg_first_seen_p50 IS NULL THEN '{AGG_FIRST_SEEN_MISSED}' + WHEN agg_first_seen_p50 < 8 THEN '{AGG_ARRIVAL_BEFORE_8S}' + WHEN agg_first_seen_p50 < 9 THEN '{AGG_ARRIVAL_AT_8S}' + WHEN agg_first_seen_p50 < 12 THEN '{AGG_ARRIVAL_BEFORE_12S}' + ELSE '{AGG_ARRIVAL_LATE}' + END AS agg_first_seen_p50_tag, + CASE + WHEN agg_first_seen_p90 IS NULL THEN '{AGG_FIRST_SEEN_MISSED}' + WHEN agg_first_seen_p90 < 8 THEN '{AGG_ARRIVAL_BEFORE_8S}' + WHEN agg_first_seen_p90 < 9 THEN '{AGG_ARRIVAL_AT_8S}' + WHEN agg_first_seen_p90 < 12 THEN '{AGG_ARRIVAL_BEFORE_12S}' + ELSE '{AGG_ARRIVAL_LATE}' + END AS agg_first_seen_p90_tag, + CASE + WHEN agg_first_seen_p95 IS NULL THEN '{AGG_FIRST_SEEN_MISSED}' + WHEN agg_first_seen_p95 < 8 THEN '{AGG_ARRIVAL_BEFORE_8S}' + WHEN agg_first_seen_p95 < 9 THEN '{AGG_ARRIVAL_AT_8S}' + WHEN agg_first_seen_p95 < 12 THEN '{AGG_ARRIVAL_BEFORE_12S}' + ELSE '{AGG_ARRIVAL_LATE}' + END AS agg_first_seen_p95_tag, + CASE + WHEN agg_first_seen_p99 IS NULL THEN '{AGG_FIRST_SEEN_MISSED}' + WHEN agg_first_seen_p99 < 8 THEN '{AGG_ARRIVAL_BEFORE_8S}' + WHEN agg_first_seen_p99 < 9 THEN '{AGG_ARRIVAL_AT_8S}' + WHEN agg_first_seen_p99 < 12 THEN '{AGG_ARRIVAL_BEFORE_12S}' + ELSE '{AGG_ARRIVAL_LATE}' + END AS agg_first_seen_p99_tag, + CASE + WHEN agg_spread_p50 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p50 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p50 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN agg_spread_p50 <= 
0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN agg_spread_p50 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN agg_spread_p50 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN agg_spread_p50 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN agg_spread_p50 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS agg_spread_p50_tag, + CASE + WHEN agg_spread_p90 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p90 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p90 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN agg_spread_p90 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN agg_spread_p90 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN agg_spread_p90 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN agg_spread_p90 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN agg_spread_p90 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS agg_spread_p90_tag, + CASE + WHEN agg_spread_p95 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p95 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p95 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN agg_spread_p95 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN agg_spread_p95 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN agg_spread_p95 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN agg_spread_p95 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN agg_spread_p95 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS agg_spread_p95_tag, + CASE + WHEN agg_spread_p99 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p99 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN agg_spread_p99 < 0.5 THEN '{PROPAGATION_BELOW_500MS}' + WHEN agg_spread_p99 <= 0.75 THEN '{PROPAGATION_BELOW_750MS}' + WHEN agg_spread_p99 <= 1.0 THEN '{PROPAGATION_BELOW_1S}' + WHEN agg_spread_p99 <= 1.5 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN agg_spread_p99 <= 2.0 THEN '{PROPAGATION_BELOW_2S}' + WHEN agg_spread_p99 <= 5.0 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS agg_spread_p99_tag + FROM 
slot_stats + ORDER BY slot ASC + """ + return client.query_df(query), "Aggregation arrivals" + + +class DataColumnBroadcastRule: + @staticmethod + def execute( + client: clickhouse_connect.driver.Client, network: str, target_date: str, + ) -> (pd.DataFrame, str): + date_filter = _get_date_filter(target_date, column="slot_start_date_time") + + query = f""" + WITH + per_column AS ( + SELECT + slot, + column_index, + min(propagation_slot_start_diff) AS first_seen, + quantiles(0.50)(propagation_slot_start_diff)[1] - min(propagation_slot_start_diff) AS spread_p50 + FROM beacon_api_eth_v1_events_data_column_sidecar + PREWHERE {date_filter} + WHERE meta_network_name = '{network}' + GROUP BY slot, column_index + ), + slot_stats AS ( + SELECT + slot, + min(first_seen) as first_col_proposal, + max(first_seen) as last_col_proposal, + quantiles(0.50)(first_seen)[1] AS col_first_seen_p50, + quantiles(0.90)(first_seen)[1] AS col_first_seen_p90, + quantiles(0.95)(first_seen)[1] AS col_first_seen_p95, + quantiles(0.99)(first_seen)[1] AS col_first_seen_p99, + quantiles(0.50)(spread_p50)[1] AS col_spread_p50, + quantiles(0.90)(spread_p50)[1] AS col_spread_p90, + quantiles(0.95)(spread_p50)[1] AS col_spread_p95, + quantiles(0.99)(spread_p50)[1] AS col_spread_p99 + FROM per_column + GROUP BY slot + ) + SELECT + slot, + CASE + WHEN first_col_proposal IS NULL THEN '{NO_BLOBS}' + WHEN first_col_proposal < 1000 THEN '{EARLY_PROPOSAL}' + WHEN first_col_proposal <= 2000 THEN '{NEUTRAL_PROPOSAL}' + WHEN first_col_proposal <= 4000 THEN '{AGGRESSIVE_PROPOSAL}' + ELSE '{LATE_PROPOSAL}' + END AS first_col_proposal_tag, + CASE + WHEN last_col_proposal IS NULL THEN '{NO_BLOBS}' + WHEN last_col_proposal < 1000 THEN '{EARLY_PROPOSAL}' + WHEN last_col_proposal <= 2000 THEN '{NEUTRAL_PROPOSAL}' + WHEN last_col_proposal <= 4000 THEN '{AGGRESSIVE_PROPOSAL}' + ELSE '{LATE_PROPOSAL}' + END AS last_col_proposal_tag, + CASE + WHEN col_first_seen_p50 IS NULL THEN '{COL_FIRST_SEEN_MISSED}' + WHEN 
col_first_seen_p50 < 4000 THEN '{COL_ARRIVAL_BEFORE_4S}' + WHEN col_first_seen_p50 < 5000 THEN '{COL_ARRIVAL_AT_4S}' + WHEN col_first_seen_p50 < 8000 THEN '{COL_ARRIVAL_OVER_4S}' + ELSE '{COL_ARRIVAL_LATE}' + END AS col_first_seen_p50_tag, + CASE + WHEN col_first_seen_p90 IS NULL THEN '{COL_FIRST_SEEN_MISSED}' + WHEN col_first_seen_p90 < 4000 THEN '{COL_ARRIVAL_BEFORE_4S}' + WHEN col_first_seen_p90 < 5000 THEN '{COL_ARRIVAL_AT_4S}' + WHEN col_first_seen_p90 < 8000 THEN '{COL_ARRIVAL_OVER_4S}' + ELSE '{COL_ARRIVAL_LATE}' + END AS col_first_seen_p90_tag, + CASE + WHEN col_first_seen_p95 IS NULL THEN '{COL_FIRST_SEEN_MISSED}' + WHEN col_first_seen_p95 < 4000 THEN '{COL_ARRIVAL_BEFORE_4S}' + WHEN col_first_seen_p95 < 5000 THEN '{COL_ARRIVAL_AT_4S}' + WHEN col_first_seen_p95 < 8000 THEN '{COL_ARRIVAL_OVER_4S}' + ELSE '{COL_ARRIVAL_LATE}' + END AS col_first_seen_p95_tag, + CASE + WHEN col_first_seen_p99 IS NULL THEN '{COL_FIRST_SEEN_MISSED}' + WHEN col_first_seen_p99 < 4000 THEN '{COL_ARRIVAL_BEFORE_4S}' + WHEN col_first_seen_p99 < 5000 THEN '{COL_ARRIVAL_AT_4S}' + WHEN col_first_seen_p99 < 8000 THEN '{COL_ARRIVAL_OVER_4S}' + ELSE '{COL_ARRIVAL_LATE}' + END AS col_first_seen_p99_tag, + CASE + WHEN col_spread_p50 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p50 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p50 < 500 THEN '{PROPAGATION_BELOW_500MS}' + WHEN col_spread_p50 <= 750 THEN '{PROPAGATION_BELOW_750MS}' + WHEN col_spread_p50 <= 1000 THEN '{PROPAGATION_BELOW_1S}' + WHEN col_spread_p50 <= 1500 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN col_spread_p50 <= 2000 THEN '{PROPAGATION_BELOW_2S}' + WHEN col_spread_p50 <= 5000 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS col_spread_p50_tag, + CASE + WHEN col_spread_p90 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p90 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p90 < 500 THEN '{PROPAGATION_BELOW_500MS}' + WHEN col_spread_p90 <= 750 THEN 
'{PROPAGATION_BELOW_750MS}' + WHEN col_spread_p90 <= 1000 THEN '{PROPAGATION_BELOW_1S}' + WHEN col_spread_p90 <= 1500 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN col_spread_p90 <= 2000 THEN '{PROPAGATION_BELOW_2S}' + WHEN col_spread_p90 <= 5000 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS col_spread_p90_tag, + CASE + WHEN col_spread_p95 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p95 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p95 < 500 THEN '{PROPAGATION_BELOW_500MS}' + WHEN col_spread_p95 <= 750 THEN '{PROPAGATION_BELOW_750MS}' + WHEN col_spread_p95 <= 1000 THEN '{PROPAGATION_BELOW_1S}' + WHEN col_spread_p95 <= 1500 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN col_spread_p95 <= 2000 THEN '{PROPAGATION_BELOW_2S}' + WHEN col_spread_p95 <= 5000 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS col_spread_p95_tag, + CASE + WHEN col_spread_p99 IS NULL THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p99 = 0 THEN '{PROPAGATION_NOT_TRACKED}' + WHEN col_spread_p99 < 500 THEN '{PROPAGATION_BELOW_500MS}' + WHEN col_spread_p99 <= 750 THEN '{PROPAGATION_BELOW_750MS}' + WHEN col_spread_p99 <= 1000 THEN '{PROPAGATION_BELOW_1S}' + WHEN col_spread_p99 <= 1500 THEN '{PROPAGATION_BELOW_1_5S}' + WHEN col_spread_p99 <= 2000 THEN '{PROPAGATION_BELOW_2S}' + WHEN col_spread_p99 <= 5000 THEN '{PROPAGATION_BELOW_5S}' + ELSE '{PROPAGATION_OVER_5S}' + END AS col_spread_p99_tag + FROM slot_stats + ORDER BY slot ASC + """ + return client.query_df(query), "Data column arrivals" + + +class SlotTagger: + def __init__(self, clickhouse_client): + self.client = clickhouse_client + self.rules: List[SlotTaggerRule] = [ + BlockProposalAndDistributionRule(), + DataColumnBroadcastRule(), + AttestationArrivalRule(), + AggregationBroadcastRule(), + ] + + def add_rule(self, rule: SlotTaggerRule): + self.rules.append(rule) + + def run(self, network: str, target_date: str) -> pd.DataFrame: + dataframes = [] + + with 
concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(rule.execute, self.client, network, target_date) + for rule in self.rules + ] + + for future in concurrent.futures.as_completed(futures): + try: + df, name = future.result() + print(f" -> query for {name} done: {len(df)} rows") + if len(df) > 0: + dataframes.append(df) + except Exception as e: + import traceback + print(f"Error executing a tagging rule: {e}") + traceback.print_exc() + + if len(dataframes) == 0: + return pd.DataFrame(columns=["slot"]) + + final_df = pd.DataFrame() + for i, df in enumerate(dataframes): + if i == 0: + final_df = df + else: + final_df = pd.merge(final_df, df, on="slot", how="left") + + # Slots with no blobs have no data-column sidecar events, so all col_* tag + # columns are None after the left merge. Fill them with NO_BLOBS. + col_tag_cols = [c for c in final_df.columns if ("_col_" in c) or (c.startswith("col_") and c.endswith("_tag"))] + if col_tag_cols: + final_df[col_tag_cols] = final_df[col_tag_cols].fillna(NO_BLOBS) + + return final_df.sort_values("slot").reset_index(drop=True) + + +def fetch_slot_tags( + client, + target_date: str, + network: str = "mainnet", +) -> tuple: + tagger = SlotTagger(client) + df = tagger.run(network, target_date) + return df, "tags" diff --git a/queries/slot_tags.py b/queries/slot_tags.py new file mode 100644 index 0000000..d82314a --- /dev/null +++ b/queries/slot_tags.py @@ -0,0 +1,251 @@ +# Tags that could be applied to a slot +""" +Some of these tags or subtags can't be combined: i.e., the proposal of the block could only be one of the items listed below. +""" + +MISSED_SLOT = "missed_slot" +NO_BLOBS = "no_blobs" + +# --- 1. Block-related --- +# --- 1.1. Block Proposer --- +BLOCK_STAKING_ENTITY_PROPOSER = "block_staking_entity_proposer" +BLOCK_HOME_STAKING_PROPOSER = "block_home_staking_proposer" + +# --- 1.2. 
Proposal of the block --- +EARLY_PROPOSAL = "early_proposal" +NEUTRAL_PROPOSAL = "neutral_proposal" +AGGRESSIVE_PROPOSAL = "aggressive_proposal" +LATE_PROPOSAL = "late_proposal" + +# --- 1.3. Builder of the block --- +# Who is the builder of the block +BLOCK_LOCAL_BUILDER_PROPOSAL = "block_local_builder_proposal" +BLOCK_MEV_BUILDER_PROPOSAL = "block_mev_builder_proposal" +BLOCK_BUILDER_TAG = "block_builder_tag" +BLOCK_MEV_RELAY_TAG = "block_mev_relay_tag" + +# --- 1.4. Blobs per block +BLOCK_BELOW_BLOB_TARGET_PROPOSAL = "block_below_blob_target_proposal" +BLOCK_ON_BLOB_TARGET_PROPOSAL = "block_on_blob_target_proposal" +BLOCK_ABOVE_BLOB_TARGET_PROPOSAL = "block_above_blob_target_proposal" +BLOCK_MAX_BLOB_PROPOSAL = "block_max_blob_proposal" + +# --- 1.5 Block arrival (shared across all percentiles; column name gives context) +BLOCK_ARRIVAL_BELOW_2S = "block_arrival_below_2s" +BLOCK_ARRIVAL_BELOW_3S = "block_arrival_below_3s" +BLOCK_ARRIVAL_BELOW_4S = "block_arrival_below_4s" +BLOCK_ARRIVAL_OVER_4S = "block_arrival_over_4s" + +# --- 1.6 Propagation spread (shared across block, attestation, and aggregation) +# The column name in the DataFrame identifies which metric this applies to. 
+PROPAGATION_NOT_TRACKED = "propagation_not_tracked" +PROPAGATION_BELOW_500MS = "propagation_below_500ms" +PROPAGATION_BELOW_750MS = "propagation_below_750ms" +PROPAGATION_BELOW_1S = "propagation_below_1s" +PROPAGATION_BELOW_1_5S = "propagation_below_1_5s" +PROPAGATION_BELOW_2S = "propagation_below_2s" +PROPAGATION_BELOW_5S = "propagation_below_5s" +PROPAGATION_OVER_5S = "propagation_over_5s" + +# --- 1.7 Block size +BLOCK_SIZE_SMALL = "small_block_size" +BLOCK_SIZE_AVG = "avg_block_size" +BLOCK_SIZE_LARGE = "large_block_size" +BLOCK_SIZE_EXTRA_LARGE = "extra_large_block_size" + + +# --- 1.8 Data Column arrival (shared across all percentiles; column name gives context) +COL_FIRST_SEEN_MISSED = "col_first_seen_missed" +COL_ARRIVAL_BEFORE_4S = "col_arrival_before_4s" +COL_ARRIVAL_AT_4S = "col_arrival_at_4s" +COL_ARRIVAL_OVER_4S = "col_arrival_over_4s" +COL_ARRIVAL_LATE = "col_arrival_late" + +# --- 1.9 Single Attestation arrivals +ATT_FIRST_SEEN_MISSED = "att_first_seen_missed" +ATT_ARRIVAL_BEFORE_4S = "att_arrival_before_4s" +ATT_ARRIVAL_AT_4S = "att_arrival_at_4s" +ATT_ARRIVAL_OVER_4S = "att_arrival_over_4s" +ATT_ARRIVAL_LATE = "att_arrival_late" + +# inclusion_delay distribution across attestations per slot (slots). +ATT_INCLUSION_1 = "att_inclusion_1" +ATT_INCLUSION_2 = "att_inclusion_2" +ATT_INCLUSION_BELOW_5 = "att_inclusion_below_5" +ATT_INCLUSION_OVER_5 = "att_inclusion_over_5" + +# --- 1.10 Attestation aggregation arrivals +AGG_FIRST_SEEN_MISSED = "agg_first_seen_missed" +AGG_ARRIVAL_BEFORE_8S = "agg_arrival_before_8s" +AGG_ARRIVAL_AT_8S = "agg_arrival_at_8s" +AGG_ARRIVAL_BEFORE_12S = "agg_arrival_before_12s" +AGG_ARRIVAL_LATE = "agg_arrival_late" + + +# --- Visualization helpers --- + +# Shared ordering list for all spread/broadcast tag columns (block, col, att, agg). +# Column name in the DataFrame provides the context; the value set is identical. 
+_PROPAGATION_SPREAD_ORDER: list[str] = [ + PROPAGATION_NOT_TRACKED, + PROPAGATION_BELOW_500MS, + PROPAGATION_BELOW_750MS, + PROPAGATION_BELOW_1S, + PROPAGATION_BELOW_1_5S, + PROPAGATION_BELOW_2S, + PROPAGATION_BELOW_5S, + PROPAGATION_OVER_5S, +] + +TAG_ORDERS: dict[str, list[str]] = { + "block_size_tag": [ + BLOCK_SIZE_SMALL, + BLOCK_SIZE_AVG, + BLOCK_SIZE_LARGE, + BLOCK_SIZE_EXTRA_LARGE, + ], + "block_proposal_tag": [EARLY_PROPOSAL, NEUTRAL_PROPOSAL, AGGRESSIVE_PROPOSAL,LATE_PROPOSAL], + "block_p50_arrival_tag": [BLOCK_ARRIVAL_BELOW_2S, BLOCK_ARRIVAL_BELOW_3S, BLOCK_ARRIVAL_BELOW_4S, BLOCK_ARRIVAL_OVER_4S], + "block_p50_spread_tag": _PROPAGATION_SPREAD_ORDER, + "blob_count_tag": [ + NO_BLOBS, + BLOCK_BELOW_BLOB_TARGET_PROPOSAL, + BLOCK_ON_BLOB_TARGET_PROPOSAL, + BLOCK_ABOVE_BLOB_TARGET_PROPOSAL, + BLOCK_MAX_BLOB_PROPOSAL, + ], + "first_col_proposal_tag": [NO_BLOBS, EARLY_PROPOSAL, NEUTRAL_PROPOSAL, AGGRESSIVE_PROPOSAL, LATE_PROPOSAL], + "last_col_proposal_tag": [NO_BLOBS, EARLY_PROPOSAL, NEUTRAL_PROPOSAL, AGGRESSIVE_PROPOSAL, LATE_PROPOSAL], + "col_first_seen_p50_tag": [NO_BLOBS, COL_FIRST_SEEN_MISSED, COL_ARRIVAL_BEFORE_4S, COL_ARRIVAL_AT_4S, COL_ARRIVAL_OVER_4S, COL_ARRIVAL_LATE], + "col_first_seen_p90_tag": [NO_BLOBS, COL_FIRST_SEEN_MISSED, COL_ARRIVAL_BEFORE_4S, COL_ARRIVAL_AT_4S, COL_ARRIVAL_OVER_4S, COL_ARRIVAL_LATE], + "col_first_seen_p95_tag": [NO_BLOBS, COL_FIRST_SEEN_MISSED, COL_ARRIVAL_BEFORE_4S, COL_ARRIVAL_AT_4S, COL_ARRIVAL_OVER_4S, COL_ARRIVAL_LATE], + "col_first_seen_p99_tag": [NO_BLOBS, COL_FIRST_SEEN_MISSED, COL_ARRIVAL_BEFORE_4S, COL_ARRIVAL_AT_4S, COL_ARRIVAL_OVER_4S, COL_ARRIVAL_LATE], + + "col_spread_p50_tag": [NO_BLOBS, *_PROPAGATION_SPREAD_ORDER], + "col_spread_p90_tag": [NO_BLOBS, *_PROPAGATION_SPREAD_ORDER], + "col_spread_p95_tag": [NO_BLOBS, *_PROPAGATION_SPREAD_ORDER], + "col_spread_p99_tag": [NO_BLOBS, *_PROPAGATION_SPREAD_ORDER], + + "att_first_seen_p50_tag": [ATT_FIRST_SEEN_MISSED, ATT_ARRIVAL_BEFORE_4S, ATT_ARRIVAL_AT_4S, 
ATT_ARRIVAL_OVER_4S, ATT_ARRIVAL_LATE], + "att_first_seen_p90_tag": [ATT_FIRST_SEEN_MISSED, ATT_ARRIVAL_BEFORE_4S, ATT_ARRIVAL_AT_4S, ATT_ARRIVAL_OVER_4S, ATT_ARRIVAL_LATE], + "att_first_seen_p95_tag": [ATT_FIRST_SEEN_MISSED, ATT_ARRIVAL_BEFORE_4S, ATT_ARRIVAL_AT_4S, ATT_ARRIVAL_OVER_4S, ATT_ARRIVAL_LATE], + "att_first_seen_p99_tag": [ATT_FIRST_SEEN_MISSED, ATT_ARRIVAL_BEFORE_4S, ATT_ARRIVAL_AT_4S, ATT_ARRIVAL_OVER_4S, ATT_ARRIVAL_LATE], + + "att_spread_p50_tag": _PROPAGATION_SPREAD_ORDER, + "att_spread_p90_tag": _PROPAGATION_SPREAD_ORDER, + "att_spread_p95_tag": _PROPAGATION_SPREAD_ORDER, + "att_spread_p99_tag": _PROPAGATION_SPREAD_ORDER, + + "att_inclusion_p50_tag": [ATT_INCLUSION_1, ATT_INCLUSION_2, ATT_INCLUSION_BELOW_5, ATT_INCLUSION_OVER_5], + "att_inclusion_p90_tag": [ATT_INCLUSION_1, ATT_INCLUSION_2, ATT_INCLUSION_BELOW_5, ATT_INCLUSION_OVER_5], + "att_inclusion_p95_tag": [ATT_INCLUSION_1, ATT_INCLUSION_2, ATT_INCLUSION_BELOW_5, ATT_INCLUSION_OVER_5], + "att_inclusion_p99_tag": [ATT_INCLUSION_1, ATT_INCLUSION_2, ATT_INCLUSION_BELOW_5, ATT_INCLUSION_OVER_5], + + "agg_first_seen_p50_tag": [AGG_FIRST_SEEN_MISSED, AGG_ARRIVAL_BEFORE_8S, AGG_ARRIVAL_AT_8S, AGG_ARRIVAL_BEFORE_12S, AGG_ARRIVAL_LATE], + "agg_first_seen_p90_tag": [AGG_FIRST_SEEN_MISSED, AGG_ARRIVAL_BEFORE_8S, AGG_ARRIVAL_AT_8S, AGG_ARRIVAL_BEFORE_12S, AGG_ARRIVAL_LATE], + "agg_first_seen_p95_tag": [AGG_FIRST_SEEN_MISSED, AGG_ARRIVAL_BEFORE_8S, AGG_ARRIVAL_AT_8S, AGG_ARRIVAL_BEFORE_12S, AGG_ARRIVAL_LATE], + "agg_first_seen_p99_tag": [AGG_FIRST_SEEN_MISSED, AGG_ARRIVAL_BEFORE_8S, AGG_ARRIVAL_AT_8S, AGG_ARRIVAL_BEFORE_12S, AGG_ARRIVAL_LATE], + + "agg_spread_p50_tag": _PROPAGATION_SPREAD_ORDER, + "agg_spread_p90_tag": _PROPAGATION_SPREAD_ORDER, + "agg_spread_p95_tag": _PROPAGATION_SPREAD_ORDER, + "agg_spread_p99_tag": _PROPAGATION_SPREAD_ORDER, +} + +TAG_GROUPS: dict[str, list[str]] = { + "Blocks": [ + "blob_count_tag", + "block_size_tag", + "block_proposal_tag", + "block_p50_arrival_tag", + 
"block_p50_spread_tag", + ], + "Data Columns": [ + "first_col_proposal_tag", + "last_col_proposal_tag", + "col_first_seen_p50_tag", + "col_first_seen_p90_tag", + "col_first_seen_p95_tag", + "col_first_seen_p99_tag", + "col_spread_p50_tag", + "col_spread_p90_tag", + "col_spread_p95_tag", + "col_spread_p99_tag", + ], + "Attestations": [ + "att_first_seen_p50_tag", + "att_first_seen_p90_tag", + "att_first_seen_p95_tag", + "att_first_seen_p99_tag", + "att_spread_p50_tag", + "att_spread_p90_tag", + "att_spread_p95_tag", + "att_spread_p99_tag", + "att_inclusion_p50_tag", + "att_inclusion_p90_tag", + "att_inclusion_p95_tag", + "att_inclusion_p99_tag", + ], + "Aggregations": [ + "agg_first_seen_p50_tag", + "agg_first_seen_p90_tag", + "agg_first_seen_p95_tag", + "agg_first_seen_p99_tag", + "agg_spread_p50_tag", + "agg_spread_p90_tag", + "agg_spread_p95_tag", + "agg_spread_p99_tag", + ], +} + +TAG_LABELS: dict[str, str] = { + "block_size_tag": "Block Size", + "block_proposal_tag": "Block Proposal Timing", + "block_p50_arrival_tag": "Block P50 Arrival", + "block_p50_spread_tag": "Block P50 Broadcast", + "blob_count_tag": "Blob Count", + "att_first_seen_p50_tag": "Att First Seen P50", + "att_first_seen_p90_tag": "Att First Seen P90", + "att_first_seen_p95_tag": "Att First Seen P95", + "att_first_seen_p99_tag": "Att First Seen P99", + "att_spread_p50_tag": "Att Spread P50", + "att_spread_p90_tag": "Att Spread P90", + "att_spread_p95_tag": "Att Spread P95", + "att_spread_p99_tag": "Att Spread P99", + "att_inclusion_p50_tag": "Att Inclusion Delay P50", + "att_inclusion_p90_tag": "Att Inclusion Delay P90", + "att_inclusion_p95_tag": "Att Inclusion Delay P95", + "att_inclusion_p99_tag": "Att Inclusion Delay P99", + "agg_first_seen_p50_tag": "Agg First Seen P50", + "agg_first_seen_p90_tag": "Agg First Seen P90", + "agg_first_seen_p95_tag": "Agg First Seen P95", + "agg_first_seen_p99_tag": "Agg First Seen P99", + "agg_spread_p50_tag": "Agg Spread P50", + "agg_spread_p90_tag": "Agg 
Spread P90", + "agg_spread_p95_tag": "Agg Spread P95", + "agg_spread_p99_tag": "Agg Spread P99", + "first_col_proposal_tag": "First Col Proposal", + "last_col_proposal_tag": "Last Col Proposal", + "col_first_seen_p50_tag": "Col First Seen P50", + "col_first_seen_p90_tag": "Col First Seen P90", + "col_first_seen_p95_tag": "Col First Seen P95", + "col_first_seen_p99_tag": "Col First Seen P99", + "col_spread_p50_tag": "Col Spread P50", + "col_spread_p90_tag": "Col Spread P90", + "col_spread_p95_tag": "Col Spread P95", + "col_spread_p99_tag": "Col Spread P99", +} + + +def short_label(val: str | None) -> str: + """Strip common tag prefixes for compact axis labels.""" + if val is None: + return "n/a" + return ( + val + .replace("block_p50_propagation_", "") + .replace("block_", "") + .replace("_proposal", "") + .replace("_tag", "") + .replace("_", " ") + ) diff --git a/uv.lock b/uv.lock index e01f550..6b10c04 100644 --- a/uv.lock +++ b/uv.lock @@ -1202,7 +1202,7 @@ wheels = [ [[package]] name = "peerdas" version = "0.1.0" -source = { virtual = "." } +source = { editable = "." 
} dependencies = [ { name = "altair" }, { name = "boto3" }, @@ -1213,6 +1213,7 @@ dependencies = [ { name = "pandas" }, { name = "papermill" }, { name = "plotly" }, + { name = "polars" }, { name = "pyarrow" }, { name = "python-dotenv" }, { name = "pyyaml" }, @@ -1239,6 +1240,7 @@ requires-dist = [ { name = "pandas", specifier = ">=2.0" }, { name = "papermill", specifier = ">=2.6.0" }, { name = "plotly", specifier = ">=5.0" }, + { name = "polars", specifier = ">=1.38.1" }, { name = "pyarrow", specifier = ">=22.0.0" }, { name = "python-dotenv", specifier = ">=1.0" }, { name = "pyyaml", specifier = ">=6.0.3" }, @@ -1288,6 +1290,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e7/c3/3031c931098de393393e1f93a38dc9ed6805d86bb801acc3cf2d5bd1e6b7/plotly-6.5.0-py3-none-any.whl", hash = "sha256:5ac851e100367735250206788a2b1325412aa4a4917a4fe3e6f0bc5aa6f3d90a", size = 9893174, upload-time = "2025-11-17T18:39:20.351Z" }, ] +[[package]] +name = "polars" +version = "1.38.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "polars-runtime-32" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/5e/208a24471a433bcd0e9a6889ac49025fd4daad2815c8220c5bd2576e5f1b/polars-1.38.1.tar.gz", hash = "sha256:803a2be5344ef880ad625addfb8f641995cfd777413b08a10de0897345778239", size = 717667, upload-time = "2026-02-06T18:13:23.013Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0a/49/737c1a6273c585719858261753da0b688454d1b634438ccba8a9c4eb5aab/polars-1.38.1-py3-none-any.whl", hash = "sha256:a29479c48fed4984d88b656486d221f638cba45d3e961631a50ee5fdde38cb2c", size = 810368, upload-time = "2026-02-06T18:11:55.819Z" }, +] + +[[package]] +name = "polars-runtime-32" +version = "1.38.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/07/4b/04d6b3fb7cf336fbe12fbc4b43f36d1783e11bb0f2b1e3980ec44878df06/polars_runtime_32-1.38.1.tar.gz", hash = 
"sha256:04f20ed1f5c58771f34296a27029dc755a9e4b1390caeaef8f317e06fdfce2ec", size = 2812631, upload-time = "2026-02-06T18:13:25.206Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ae/a2/a00defbddadd8cf1042f52380dcba6b6592b03bac8e3b34c436b62d12d3b/polars_runtime_32-1.38.1-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:18154e96044724a0ac38ce155cf63aa03c02dd70500efbbf1a61b08cadd269ef", size = 44108001, upload-time = "2026-02-06T18:11:58.127Z" }, + { url = "https://files.pythonhosted.org/packages/a7/fb/599ff3709e6a303024efd7edfd08cf8de55c6ac39527d8f41cbc4399385f/polars_runtime_32-1.38.1-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:c49acac34cc4049ed188f1eb67d6ff3971a39b4af7f7b734b367119970f313ac", size = 40230140, upload-time = "2026-02-06T18:12:01.181Z" }, + { url = "https://files.pythonhosted.org/packages/dc/8c/3ac18d6f89dc05fe2c7c0ee1dc5b81f77a5c85ad59898232c2500fe2ebbf/polars_runtime_32-1.38.1-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fef2ef2626a954e010e006cc8e4de467ecf32d08008f130cea1c78911f545323", size = 41994039, upload-time = "2026-02-06T18:12:04.332Z" }, + { url = "https://files.pythonhosted.org/packages/f2/5a/61d60ec5cc0ab37cbd5a699edb2f9af2875b7fdfdfb2a4608ca3cc5f0448/polars_runtime_32-1.38.1-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e8a5f7a8125e2d50e2e060296551c929aec09be23a9edcb2b12ca923f555a5ba", size = 45755804, upload-time = "2026-02-06T18:12:07.846Z" }, + { url = "https://files.pythonhosted.org/packages/91/54/02cd4074c98c361ccd3fec3bcb0bd68dbc639c0550c42a4436b0ff0f3ccf/polars_runtime_32-1.38.1-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:10d19cd9863e129273b18b7fcaab625b5c8143c2d22b3e549067b78efa32e4fa", size = 42159605, upload-time = "2026-02-06T18:12:10.919Z" }, + { url = "https://files.pythonhosted.org/packages/8e/f3/b2a5e720cc56eaa38b4518e63aa577b4bbd60e8b05a00fe43ca051be5879/polars_runtime_32-1.38.1-cp310-abi3-musllinux_1_2_x86_64.whl", hash = 
"sha256:61e8d73c614b46a00d2f853625a7569a2e4a0999333e876354ac81d1bf1bb5e2", size = 45336615, upload-time = "2026-02-06T18:12:14.074Z" }, + { url = "https://files.pythonhosted.org/packages/f1/8d/ee2e4b7de948090cfb3df37d401c521233daf97bfc54ddec5d61d1d31618/polars_runtime_32-1.38.1-cp310-abi3-win_amd64.whl", hash = "sha256:08c2b3b93509c1141ac97891294ff5c5b0c548a373f583eaaea873a4bf506437", size = 45680732, upload-time = "2026-02-06T18:12:19.097Z" }, + { url = "https://files.pythonhosted.org/packages/bf/18/72c216f4ab0c82b907009668f79183ae029116ff0dd245d56ef58aac48e7/polars_runtime_32-1.38.1-cp310-abi3-win_arm64.whl", hash = "sha256:6d07d0cc832bfe4fb54b6e04218c2c27afcfa6b9498f9f6bbf262a00d58cc7c4", size = 41639413, upload-time = "2026-02-06T18:12:22.044Z" }, +] + [[package]] name = "prometheus-client" version = "0.23.1"