forked from hyperliquid-dex/hyperliquid-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevm_block_indexer.py
More file actions
196 lines (164 loc) · 8.17 KB
/
evm_block_indexer.py
File metadata and controls
196 lines (164 loc) · 8.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from typing import Any
import argparse
import json
import os
from datetime import datetime
import lz4.frame
import msgpack
def decompress_lz4(input_file, output_file):
    """Decompress an LZ4-frame compressed file and write the raw bytes out.

    Reads *input_file* fully into memory, decompresses it as a single LZ4
    frame, then writes the result to *output_file*.
    """
    with open(input_file, "rb") as src:
        payload = src.read()
    # Decompress before opening the destination so a corrupt input
    # never leaves a partial/empty output file behind.
    raw = lz4.frame.decompress(payload)
    with open(output_file, "wb") as dst:
        dst.write(raw)
class BytesEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``bytes`` values as 0x-prefixed hex strings."""

    def default(self, obj):
        # Only raw bytes get special handling; anything else is delegated
        # to the base class, which raises TypeError for unserializable types.
        if not isinstance(obj, bytes):
            return super().default(obj)
        return "0x" + obj.hex()
class EthBlockIndexer:
    """Decodes msgpack-dumped EVM ("Reth115") blocks into JSON-friendly dicts.

    The dump format wraps byte values in JS-style Buffer objects
    (``{"type": "Buffer", "data": [ints]}``); this class converts them to
    0x-prefixed hex strings or big-endian integers as appropriate.
    """

    def __init__(self):
        # Processed blocks, in the order their files were ingested.
        self.blocks = []

    def _convert_buffer(self, buffer_obj: Any) -> str:
        """Render a Buffer object as a 0x hex string; stringify anything else.

        Note: accepts Any, not just dict — non-Buffer values fall through
        to ``str()``.
        """
        if isinstance(buffer_obj, dict) and buffer_obj.get("type") == "Buffer":
            # Buffer "data" is a list of byte values (0-255).
            return "0x" + bytes(buffer_obj["data"]).hex()
        return str(buffer_obj)

    def _process_nested_buffers(self, data: Any) -> Any:
        """Recursively replace Buffer objects and raw bytes with hex strings."""
        if isinstance(data, dict):
            if data.get("type") == "Buffer":
                return self._convert_buffer(data)
            return {k: self._process_nested_buffers(v) for k, v in data.items()}
        if isinstance(data, list):
            return [self._process_nested_buffers(item) for item in data]
        if isinstance(data, bytes):
            return "0x" + data.hex()
        return data

    def _bytes_to_int(self, value: Any) -> int:
        """Decode a big-endian integer from a Buffer object or raw bytes.

        Returns 0 for anything else — including ``None`` — so absent
        numeric fields decode harmlessly to zero.
        """
        if isinstance(value, dict) and value.get("type") == "Buffer":
            return int.from_bytes(bytes(value["data"]), byteorder="big")
        if isinstance(value, bytes):
            return int.from_bytes(value, byteorder="big")
        return 0

    def _process_transaction(self, tx: dict[str, Any]) -> dict[str, Any]:
        """Flatten one wrapped transaction into a dict of decoded fields.

        Returns {} when the wrapper carries no transaction payload.
        """
        tx_data = tx.get("transaction")
        if not tx_data:
            return {}
        # The payload is keyed by its single type tag, e.g. 'Legacy' or 'Eip1559'.
        tx_type = next(iter(tx_data))
        tx_content = tx_data[tx_type]
        # _bytes_to_int(None) == 0, so plain .get() covers absent numeric fields.
        processed = {
            "type": tx_type,
            "chainId": self._bytes_to_int(tx_content.get("chainId")),
            "nonce": self._bytes_to_int(tx_content.get("nonce")),
            "gas": self._bytes_to_int(tx_content.get("gas")),
            "to": self._process_nested_buffers(tx_content.get("to")),
            "value": self._bytes_to_int(tx_content.get("value")),
            "input": self._process_nested_buffers(tx_content.get("input")),
            "signature": [self._process_nested_buffers(sig) for sig in tx.get("signature", [])],
        }
        if tx_type == "Legacy":
            processed["gasPrice"] = self._bytes_to_int(tx_content.get("gasPrice"))
        elif tx_type == "Eip1559":
            processed["maxFeePerGas"] = self._bytes_to_int(tx_content.get("maxFeePerGas"))
            processed["maxPriorityFeePerGas"] = self._bytes_to_int(tx_content.get("maxPriorityFeePerGas"))
            processed["accessList"] = self._process_nested_buffers(tx_content.get("accessList", []))
        return processed

    def _process_block(self, block_data: dict[str, Any]) -> dict[str, Any]:
        """Decode one raw block dump into a flat dict of header fields + transactions.

        Raises ValueError when *block_data* is not a dict with a "block" key.
        """
        if not isinstance(block_data, dict) or "block" not in block_data:
            raise ValueError("invalid block format")
        reth_block = block_data["block"]["Reth115"]
        # The dump nests the sealed header: outer carries the hash,
        # inner carries the consensus header fields.
        sealed_header = reth_block.get("header", {})
        header = sealed_header.get("header", {})
        processed_block = {
            "hash": self._process_nested_buffers(sealed_header.get("hash")),
            "parentHash": self._process_nested_buffers(header.get("parentHash")),
            "sha3Uncles": self._process_nested_buffers(header.get("sha3Uncles")),
            "miner": self._process_nested_buffers(header.get("miner")),
            "stateRoot": self._process_nested_buffers(header.get("stateRoot")),
            "transactionsRoot": self._process_nested_buffers(header.get("transactionsRoot")),
            "receiptsRoot": self._process_nested_buffers(header.get("receiptsRoot")),
            "number": self._bytes_to_int(header.get("number")),
            "gasLimit": self._bytes_to_int(header.get("gasLimit")),
            "gasUsed": self._bytes_to_int(header.get("gasUsed")),
            "timestamp": self._bytes_to_int(header.get("timestamp")),
            "extraData": self._process_nested_buffers(header.get("extraData")),
            "baseFeePerGas": self._bytes_to_int(header.get("baseFeePerGas")),
            "transactions": [
                self._process_transaction(tx) for tx in reth_block.get("body", {}).get("transactions", [])
            ],
        }
        if processed_block["timestamp"]:
            # NOTE(review): fromtimestamp() uses the local timezone — presumably
            # intentional for display; confirm if UTC is expected.
            processed_block["datetime"] = datetime.fromtimestamp(processed_block["timestamp"]).isoformat()
        else:
            processed_block["datetime"] = None
        return processed_block

    def process_msgpack_file(self, filename: str) -> None:
        """Load one msgpack file and append its processed block(s) to ``self.blocks``.

        A file may hold either a single block or a list of blocks.
        """
        with open(filename, "rb") as f:
            data = msgpack.load(f)
        entries = data if isinstance(data, list) else [data]
        self.blocks.extend(self._process_block(entry) for entry in entries)

    def save_to_json(self, output_filename: str) -> None:
        """Write all processed blocks plus summary counts to a JSON file."""
        with open(output_filename, "w") as f:
            json.dump(
                {
                    "blocks": self.blocks,
                    "totalBlocks": len(self.blocks),
                    "totalTransactions": sum(len(block["transactions"]) for block in self.blocks),
                },
                f,
                indent=2,
                cls=BytesEncoder,  # any stray bytes values serialize as 0x hex
            )

    def summarize_blocks(self) -> dict[str, Any]:
        """Return aggregate stats over the processed blocks.

        Returns an error dict when no blocks have been processed yet.
        """
        if not self.blocks:
            return {"error": "no blocks processed"}
        total_gas_used = sum(block["gasUsed"] for block in self.blocks)
        total_txs = sum(len(block["transactions"]) for block in self.blocks)
        return {
            "totalBlocks": len(self.blocks),
            "totalTransactions": total_txs,
            # self.blocks is non-empty here, so the division is safe.
            "averageGasUsed": total_gas_used / len(self.blocks),
            "blockNumbers": [block["number"] for block in self.blocks],
            "timeRange": {
                "first": next((b["datetime"] for b in self.blocks if b["datetime"]), None),
                "last": next((b["datetime"] for b in reversed(self.blocks) if b["datetime"]), None),
            },
        }
if __name__ == "__main__":
    # Download ethereum block files from s3://hl-[testnet|mainnet]-evm-blocks
    # and input them into the indexer.
    parser = argparse.ArgumentParser(description="index evm blocks")
    parser.add_argument("--data-dir", type=str, required=True)
    parser.add_argument("--start-height", type=int, required=True)
    parser.add_argument("--end-height", type=int, required=True)
    args = parser.parse_args()

    # Decompress every .rmp.lz4 in the requested (inclusive) height range up
    # front, so a missing file fails fast before any indexing work happens.
    mp_flns = []
    for height in range(args.start_height, args.end_height + 1):
        lz4_fln = f"{args.data_dir}/{height}.rmp.lz4"
        if not os.path.exists(lz4_fln):
            # FileNotFoundError (not a generic Exception) so callers/tools can
            # distinguish a missing download from other failures.
            raise FileNotFoundError(
                f"block with height {height} not found - download missing block file(s) using 'aws s3 cp s3://hl-[testnet | mainnet]-evm-blocks/<block_object_path> --request-payer requester'"
            )
        mp_fln = f"{args.data_dir}/{height}.rmp"
        decompress_lz4(lz4_fln, mp_fln)
        mp_flns.append(mp_fln)

    indexer = EthBlockIndexer()
    for mp_fln in mp_flns:
        indexer.process_msgpack_file(mp_fln)
    print(indexer.summarize_blocks())
    indexer.save_to_json(f"{args.data_dir}/processed_blocks.json")