From 0910f0f1418700c1dfebf4faba064ce98231e5da Mon Sep 17 00:00:00 2001 From: kodadasiofficial Date: Thu, 25 Jul 2024 16:34:25 +0300 Subject: [PATCH 1/4] implementation of saving graph in json files --- airdrop_analysis/data_handler/graph_builder.py | 16 ++++++++++------ .../models/table_models/graph_record.py | 11 +++++------ .../query_handlers/chain_query_controller.py | 5 +++++ .../query_handlers/pw_query_handler.py | 14 ++++++++++++-- airdrop_analysis/main.py | 2 +- airdrop_analysis/utils/custom_keys.py | 1 + airdrop_analysis/utils/path_provider.py | 14 +++++++++++++- 7 files changed, 47 insertions(+), 16 deletions(-) diff --git a/airdrop_analysis/data_handler/graph_builder.py b/airdrop_analysis/data_handler/graph_builder.py index dd96c28..047cfd2 100644 --- a/airdrop_analysis/data_handler/graph_builder.py +++ b/airdrop_analysis/data_handler/graph_builder.py @@ -236,7 +236,7 @@ def build_graph(self, params: GraphQueryParameters) -> Graph: def build_graph_from_distributor( self, - params: GraphQueryParameters, + params: GraphQueryParameters, ) -> Graph: hist_p = self.__get_transactions_query_params( params.center_addresses[0], @@ -249,18 +249,22 @@ def build_graph_from_distributor( g.add_node(center) for dex_address in self.__dex_addresses: g.delete_node(dex_address) - self.__save_graph(g,params) + result_dict = self.__get_result_dict(g,params) + self.__save_graph(result_dict,params) return g def build_graph_json(self, params : GraphQueryParameters): graph = self.build_graph_from_distributor(params) - return self.__save_graph(graph, params) + result_dict = self.__get_result_dict(graph,params) + return self.__dict_to_json(result_dict) - def __save_graph(self, graph : Graph , params : GraphQueryParameters ,): + def __save_graph(self, result_dict : dict, params: GraphQueryParameters): + return self.__controller.save_graph_record(params.user_id,result_dict) + + def __get_result_dict(self, graph: Graph , params: GraphQueryParameters): result_dict = graph.get_graph_dict() result_dict[ck.PARAMETERS] = self.__dict_to_json(params.to_dict()) - self.__controller.save_graph_record(params.user_id,result_dict) - return self.__dict_to_json(result_dict) + return result_dict def __dict_to_json(self,data): dict_string = json.dumps(data) diff --git a/airdrop_analysis/data_handler/models/table_models/graph_record.py b/airdrop_analysis/data_handler/models/table_models/graph_record.py index f70ebbc..10c790a 100644 --- a/airdrop_analysis/data_handler/models/table_models/graph_record.py +++ b/airdrop_analysis/data_handler/models/table_models/graph_record.py @@ -1,6 +1,5 @@ from peewee import CharField, DateTimeField, IntegerField, FloatField import json -from datetime import datetime from data_handler.models.table_models.base_model import BaseModel from utils.custom_keys import CustomKeys as ck @@ -8,18 +7,18 @@ class Graph_Record(BaseModel): user_id = CharField() time_stamp = DateTimeField() - graph = CharField() + graph_path = CharField() - def create_from(user_id : str , graph : dict): + def create_from(user_id : str , graph_path : str , time_stamp : str): return Graph_Record.create( user_id = user_id, - time_stamp = datetime.now().strftime(ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS), - graph = json.dumps(graph) + time_stamp = time_stamp, + graph_path = graph_path ) def to_dict(data): return { "user_id": data.user_id, "time_stamp": str(data.time_stamp), - "graph": json.loads(data.graph) + "graph": data.graph_path } \ No newline at end of file diff --git a/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py b/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py index 5a24f65..cdde0ae 100644 --- a/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py +++ b/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py @@ -29,11 +29,16 @@ def get_graph_records(self,user_id : str): record_list = [] for graph in graph_records: record_dict = Graph_Record.to_dict(graph) + record_dict["graph"] = self.__read_graph_from_json(record_dict["graph"]) record_string = json.dumps(record_dict) record_json = json.loads(record_string) record_list.append(record_json) return record_list + def __read_graph_from_json(self,graph_path : str): + with open(graph_path,"r") as file: + return json.loads(file.read()) + def __query_wallet_token_transfers( self, params: TokenTransfersQueryParameters, diff --git a/airdrop_analysis/data_handler/query_handlers/pw_query_handler.py b/airdrop_analysis/data_handler/query_handlers/pw_query_handler.py index fe29b1b..26b390c 100644 --- a/airdrop_analysis/data_handler/query_handlers/pw_query_handler.py +++ b/airdrop_analysis/data_handler/query_handlers/pw_query_handler.py @@ -1,3 +1,5 @@ +from datetime import datetime +import json from peewee import DoesNotExist from data_handler.models.table_models.base_model import db @@ -6,9 +8,12 @@ from data_handler.models.table_models.graph_record import Graph_Record from data_handler.models.base_models.transaction_history \ import TransactionHistory +from utils.path_provider import PathProvider +from utils.custom_keys import CustomKeys as ck class PWQueryHandler(object): def __init__(self): + self.__path_provider = PathProvider() db.create_tables([Address_Record, Token_Transfer, Graph_Record], safe=True) def create_wallet_token_transfers(self, chain: str, transfers: list): @@ -52,8 +57,13 @@ def update_address_record(self, address, data: dict[str, any]): .where(Address_Record.address == address) address_record.execute() def create_graph_record(self, user_id : str, graph: dict): - return Graph_Record.create_from(user_id,graph) - + time_now = datetime.now().strftime(ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS) + graph_path = self.__path_provider.get_graph_json_path(user_id,time_now) + with open(graph_path,"w") as file: + graph_string = json.dumps(graph) + file.write(graph_string) + return Graph_Record.create_from(user_id,graph_path,time_now) + def get_graph_records(self,user_id : str): return Graph_Record.select().where(Graph_Record.user_id==user_id) \ No newline at end of file diff --git a/airdrop_analysis/main.py b/airdrop_analysis/main.py index 000b5cc..09ad2b4 100644 --- a/airdrop_analysis/main.py +++ b/airdrop_analysis/main.py @@ -208,7 +208,7 @@ def json_distribution_graph( graph = AirdropAnalyzer().get_distribution_graph_json(param) return JSONResponse(content=graph, status_code=200) -@app.get("/graph_records/") +@app.get("/graph_records_for_fast_api/") def get_graph_records( user_id : str ) -> JSONResponse: diff --git a/airdrop_analysis/utils/custom_keys.py b/airdrop_analysis/utils/custom_keys.py index 0544d9f..70840f3 100644 --- a/airdrop_analysis/utils/custom_keys.py +++ b/airdrop_analysis/utils/custom_keys.py @@ -10,6 +10,7 @@ class CustomKeys: CLAIMERS_PATH = 'claimers_path' TABLES_FILE_PATH = 'tables_file_path' DEX_ADDRESSES_PATH = 'dex_addresses_path' + GRAPH_JSONS_FOLDER_PATH = 'graph_jsons_folder_path' # API keys MORALIS = 'moralis' diff --git a/airdrop_analysis/utils/path_provider.py b/airdrop_analysis/utils/path_provider.py index 9b2f3d0..61f4bf0 100644 --- a/airdrop_analysis/utils/path_provider.py +++ b/airdrop_analysis/utils/path_provider.py @@ -59,4 +59,16 @@ def get_claimer_lists_json_path(self) -> str: def get_claimer_lists(self) -> dict: with open(self.get_claimer_lists_json_path(), 'r') as file: - return json.loads(file.read()) \ No newline at end of file + return json.loads(file.read()) + + def get_graph_json_path(self,user_id : str,time_stamp : str): + folder_path = self[ck.GRAPH_JSONS_FOLDER_PATH].replace('/', self.__sep) + arranged_time_stamp = time_stamp.replace(":" , "-") + if not os.path.exists(f"{folder_path}{user_id}"): + os.makedirs(f"{folder_path}{user_id}") + return self.__sep.join( + [ + f"{folder_path}{user_id}", + arranged_time_stamp + ".json" + ] + ) \ No newline at end of file From 81257950ee58d84f959df9e12b0401f1dd485172 Mon Sep 17 00:00:00 2001 From: kodadasiofficial Date: Thu, 25 Jul 2024 17:27:33 +0300 Subject: [PATCH 2/4] hierarchy value added to graph json --- airdrop_analysis/airdrop_analyzer.py | 14 ++---------- .../data_handler/graph_builder.py | 22 +++++++++++++++++-- .../data_handler/models/graph_models/graph.py | 11 ++++++++-- .../query_handlers/chain_query_controller.py | 2 +- airdrop_analysis/utils/custom_keys.py | 1 + 5 files changed, 33 insertions(+), 17 deletions(-) diff --git a/airdrop_analysis/airdrop_analyzer.py b/airdrop_analysis/airdrop_analyzer.py index a9dd94d..4060fdd 100644 --- a/airdrop_analysis/airdrop_analyzer.py +++ b/airdrop_analysis/airdrop_analyzer.py @@ -64,25 +64,15 @@ def get_distribution_graph_json(self, param: GraphQueryParameters): def get_graph_records_from_user_id(self,user_id: str): return self.__controller.get_graph_records(user_id) - - def __get_communities_from_partition(self, partition: dict) -> dict: - communities: dict[str, list] = {} - for nodeID, communityID in partition.items(): - if communityID not in communities: - communities[communityID] = [] - communities[communityID].append(nodeID) - return communities def get_communities(self, param: GraphQueryParameters) -> dict: - graph = self.__builder.build_graph_from_distributor(param) - partition, _ = self.__nx_builder.get_louvain_partition(graph) - return self.__get_communities_from_partition(partition) + return self.__builder.get_communities(param) def get_graph_summary(self, param: GraphQueryParameters) -> dict: graph = self.__builder.build_graph_from_distributor(param) analysis = self.__analyzer.analyze(graph) if param.partition: partition, _ = self.__nx_builder.get_louvain_partition(graph) - communities = self.__get_communities_from_partition(partition) + communities = self.__builder.get_communities_from_partition(partition) analysis['communities'] = len(communities) return analysis \ No newline at end of file diff --git a/airdrop_analysis/data_handler/graph_builder.py b/airdrop_analysis/data_handler/graph_builder.py index 047cfd2..9a0cb18 100644 --- a/airdrop_analysis/data_handler/graph_builder.py +++ b/airdrop_analysis/data_handler/graph_builder.py @@ -2,6 +2,7 @@ from typing import List, Dict import pandas as pd +from data_handler.networkx_builder import NetworkXBuilder from data_handler.query_handlers.chain_query_controller import \ ChainQueryController from data_handler.models.base_models.transaction_history \ @@ -24,6 +25,7 @@ def __init__(self, api_keys_path: str, dex_addresses_path: str): self.__graph = Graph() self.__current_query_params = None self.__current_hirerarchy_stack = [] + self.__nx_builder = NetworkXBuilder() def __get_dex_addresses_from_csv(self,dex_addresses_path): dex_info = pd.read_csv(dex_addresses_path) @@ -262,11 +264,27 @@ def __save_graph(self, result_dict : dict, params: GraphQueryParameters): return self.__controller.save_graph_record(params.user_id,result_dict) def __get_result_dict(self, graph: Graph , params: GraphQueryParameters): - result_dict = graph.get_graph_dict() + communities = {} + if params.partition: + partition, _ = self.__nx_builder.get_louvain_partition(graph) + communities = self.get_communities_from_partition(partition) + result_dict = graph.get_graph_dict(communities) result_dict[ck.PARAMETERS] = self.__dict_to_json(params.to_dict()) return result_dict def __dict_to_json(self,data): dict_string = json.dumps(data) return json.loads(dict_string) - \ No newline at end of file + + def get_communities_from_partition(self, partition: dict) -> dict: + communities: dict[str, list] = {} + for nodeID, communityID in partition.items(): + if communityID not in communities: + communities[communityID] = [] + communities[communityID].append(nodeID) + return communities + + def get_communities(self, param: GraphQueryParameters) -> dict: + graph = self.build_graph_from_distributor(param) + partition, _ = self.__nx_builder.get_louvain_partition(graph) + return self.get_communities_from_partition(partition) \ No newline at end of file diff --git a/airdrop_analysis/data_handler/models/graph_models/graph.py b/airdrop_analysis/data_handler/models/graph_models/graph.py index a56ae7b..e6bbb42 100644 --- a/airdrop_analysis/data_handler/models/graph_models/graph.py +++ b/airdrop_analysis/data_handler/models/graph_models/graph.py @@ -1,6 +1,7 @@ from pydantic import BaseModel from typing import Dict, List, Union +from data_handler.models.base_models.query_parameters import GraphQueryParameters from data_handler.models.graph_models.node import Node from data_handler.models.graph_models.edge import Edge from utils.custom_keys import CustomKeys as ck @@ -108,12 +109,18 @@ def get_most_productive_parent(self, hirerarchy: int) -> Node: most_productive = node return most_productive - def get_graph_dict(self): + def get_graph_dict(self, communities : dict = {}): graph_dict = {} graph_dict[ck.NODES] = [] graph_dict[ck.LINKS] = [] for node in self.__nodes.values(): - node_dict = {"id" : node.id} + hierarchy = node.hierarchy + label = node.label + for key,community in communities.items(): + if label in community: + hierarchy = key + break + node_dict = {"id" : node.id , "hierarchy" : hierarchy} graph_dict[ck.NODES].append(node_dict) for edge in self.__edges.values(): edge_dict = {"source" : edge.source.id , "target" : edge.destination.id} diff --git a/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py b/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py index cdde0ae..3fdf491 100644 --- a/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py +++ b/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py @@ -29,7 +29,7 @@ def get_graph_records(self,user_id : str): record_list = [] for graph in graph_records: record_dict = Graph_Record.to_dict(graph) - record_dict["graph"] = self.__read_graph_from_json(record_dict["graph"]) + record_dict[ck.GRAPH] = self.__read_graph_from_json(record_dict[ck.GRAPH]) record_string = json.dumps(record_dict) record_json = json.loads(record_string) record_list.append(record_json) diff --git a/airdrop_analysis/utils/custom_keys.py b/airdrop_analysis/utils/custom_keys.py index 70840f3..8e74cf5 100644 --- a/airdrop_analysis/utils/custom_keys.py +++ b/airdrop_analysis/utils/custom_keys.py @@ -34,6 +34,7 @@ class CustomKeys: ASC = 'ASC' DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ' DATETIME_FORMAT_FOR_QUERIED_TRANSFERS = '%Y-%m-%dT%H:%M:%S.%fZ' + GRAPH = 'graph' # Wallet stats NFTS = 'nfts' From c2d1b0e967bdd9f3e7f43ddecb126c870b0e8c1b Mon Sep 17 00:00:00 2001 From: kodadasiofficial Date: Thu, 25 Jul 2024 23:01:52 +0300 Subject: [PATCH 3/4] average_times list implemented --- .../models/base_models/transaction_time.py | 5 +++++ .../query_handlers/moralis_query_handler.py | 11 ++++++++++- airdrop_analysis/run_tests.py | 6 +++--- .../tests/query_tests/chain_query_controller_test.py | 12 +++++++++--- 4 files changed, 27 insertions(+), 7 deletions(-) create mode 100644 airdrop_analysis/data_handler/models/base_models/transaction_time.py diff --git a/airdrop_analysis/data_handler/models/base_models/transaction_time.py b/airdrop_analysis/data_handler/models/base_models/transaction_time.py new file mode 100644 index 0000000..75bad81 --- /dev/null +++ b/airdrop_analysis/data_handler/models/base_models/transaction_time.py @@ -0,0 +1,5 @@ +class TransactionTime(): + average_time = [] + + def __init__(self , last_transaction_count): + self.last_transaction_count = last_transaction_count \ No newline at end of file diff --git a/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py b/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py index 6268823..8679b55 100644 --- a/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py +++ b/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py @@ -2,7 +2,9 @@ import moralis import os from requests_html import HTMLSession +from datetime import datetime +from data_handler.models.base_models.transaction_time import TransactionTime from data_handler.models.base_models.query_parameters \ import TransactionsQueryParameters , TokenTransfersQueryParameters from data_handler.models.base_models.query_parameters \ @@ -37,12 +39,14 @@ def __query_wallet_transactions( event: str, ): address = params.address + transaction_time = TransactionTime(0) transactions = [] try: + start_time = datetime.now() while params.cursor is not None: tnxs, cursor = self.__query_wallet_transactions_page( params, query, - ) + ) transactions.extend(tnxs) params.cursor = cursor cnt = len(transactions) @@ -54,6 +58,11 @@ def __query_wallet_transactions( else: s += ' Done.' + ' ' * 25 print(s, end='\r') + end_time = datetime.now() + difference = (end_time - start_time).total_seconds() + transaction_time.last_transaction_count = cnt + if cnt != 0: + TransactionTime.average_time.append(difference / cnt) except Exception as e: if 'Reason: Internal Server Error' in str(e): print(f'Internal Server Error querying {event}s for {address}') diff --git a/airdrop_analysis/run_tests.py b/airdrop_analysis/run_tests.py index 44e9c92..072c25f 100644 --- a/airdrop_analysis/run_tests.py +++ b/airdrop_analysis/run_tests.py @@ -7,10 +7,10 @@ def main(): - # test = ChainQueryControllerTest() - # test.run_tests() - test = GraphBuilderTest() + test = ChainQueryControllerTest() test.run_tests() + # test = GraphBuilderTest() + # test.run_tests() if __name__ == '__main__': main() \ No newline at end of file diff --git a/airdrop_analysis/tests/query_tests/chain_query_controller_test.py b/airdrop_analysis/tests/query_tests/chain_query_controller_test.py index b850edb..bb7d1c8 100644 --- a/airdrop_analysis/tests/query_tests/chain_query_controller_test.py +++ b/airdrop_analysis/tests/query_tests/chain_query_controller_test.py @@ -10,6 +10,7 @@ from utils.path_provider import PathProvider from utils.custom_keys import CustomKeys as ck from data_handler.models.base_models.query_parameters import * +from data_handler.models.base_models.transaction_time import TransactionTime class ChainQueryControllerTest(): @@ -54,20 +55,25 @@ def __test_query_wallet_token_transfers_with_contracts(self): def __test_query_wallet_token_transfers(self): contract_addresses=['0x4ed4e862860bed51a9570b96d89af5e1b0efefed'] - addresses = self.__claimers[ck.WALLET_ADDRESS].to_list()[:8] + addresses = self.__claimers[ck.WALLET_ADDRESS].to_list()[150:200] for address in addresses: params = TokenTransfersQueryParameters( address=address, chain='base', cached_first=True, - from_date='2022-01-01T00:00:00Z', + from_date='2024-01-01T00:00:00Z', to_date='2024-07-15T00:00:00Z', contract_addresses=contract_addresses, ) history, _ = self.__controller.get_wallet_token_transfer_history( params, ) - print(history.get_transaction_count()) + total_time = 0 + length = len(TransactionTime.average_time) + for i in TransactionTime.average_time: + total_time += i + print(total_time/length) + # print(history.get_transaction_count()) def __test_query_total_token_transfer_count(self): contract_addresses=['0x4ed4e862860bed51a9570b96d89af5e1b0efefed'] From e9ab0a2e86f1b52ca4cc29690ec32b60f886616b Mon Sep 17 00:00:00 2001 From: kodadasiofficial Date: Wed, 28 Aug 2024 18:42:57 +0300 Subject: [PATCH 4/4] odbc database implementation added for azure --- airdrop_analysis/airdrop_analyzer.py | 16 +- .../data_handler/graph_builder.py | 6 +- .../models/base_models/transaction_history.py | 2 +- .../data_handler/models/graph_models/edge.py | 3 +- .../data_handler/models/graph_models/graph.py | 16 +- .../models/odbc_models/address_record.py | 72 ++++++++ .../models/odbc_models/graph_record.py | 30 +++ .../models/odbc_models/token_transfer.py | 27 +++ .../models/table_models/graph_record.py | 11 +- .../query_handlers/chain_query_controller.py | 16 +- .../query_handlers/moralis_query_handler.py | 1 + .../query_handlers/odbc_query_handlers.py | 173 ++++++++++++++++++ airdrop_analysis/run_tests.py | 6 +- .../chain_query_controller_test.py | 3 +- .../tests/query_tests/graph_builder_test.py | 21 ++- airdrop_analysis/utils/custom_keys.py | 20 ++ airdrop_analysis/utils/path_provider.py | 8 +- 17 files changed, 393 insertions(+), 38 deletions(-) create mode 100644 airdrop_analysis/data_handler/models/odbc_models/address_record.py create mode 100644 airdrop_analysis/data_handler/models/odbc_models/graph_record.py create mode 100644 airdrop_analysis/data_handler/models/odbc_models/token_transfer.py create mode 100644 airdrop_analysis/data_handler/query_handlers/odbc_query_handlers.py diff --git a/airdrop_analysis/airdrop_analyzer.py b/airdrop_analysis/airdrop_analyzer.py index 4060fdd..71e2c80 100644 --- a/airdrop_analysis/airdrop_analyzer.py +++ b/airdrop_analysis/airdrop_analyzer.py @@ -64,15 +64,25 @@ def get_distribution_graph_json(self, param: GraphQueryParameters): def get_graph_records_from_user_id(self,user_id: str): return self.__controller.get_graph_records(user_id) - + + def __get_communities_from_partition(self, partition: dict) -> dict: + communities: dict[str, list] = {} + for nodeID, communityID in partition.items(): + if communityID not in communities: + communities[communityID] = [] + communities[communityID].append(nodeID) + return communities + def get_communities(self, param: GraphQueryParameters) -> dict: - return self.__builder.get_communities(param) + graph = self.__builder.build_graph_from_distributor(param) + partition, _ = self.__nx_builder.get_louvain_partition(graph) + return self.__get_communities_from_partition(partition) def get_graph_summary(self, param: GraphQueryParameters) -> dict: graph = self.__builder.build_graph_from_distributor(param) analysis = self.__analyzer.analyze(graph) if param.partition: partition, _ = self.__nx_builder.get_louvain_partition(graph) - communities = self.__builder.get_communities_from_partition(partition) + communities = self.__get_communities_from_partition(partition) analysis['communities'] = len(communities) return analysis \ No newline at end of file diff --git a/airdrop_analysis/data_handler/graph_builder.py b/airdrop_analysis/data_handler/graph_builder.py index 9a0cb18..5717405 100644 --- a/airdrop_analysis/data_handler/graph_builder.py +++ b/airdrop_analysis/data_handler/graph_builder.py @@ -9,7 +9,7 @@ import TransactionHistory from data_handler.models.base_models.query_parameters import \ TokenTransfersQueryParameters, GraphQueryParameters -from data_handler.models.table_models.token_transfer import Token_Transfer +from data_handler.models.odbc_models.token_transfer import Token_Transfer from data_handler.models.graph_models.node import Node from data_handler.models.graph_models.edge import Edge from data_handler.models.graph_models.graph import Graph @@ -44,7 +44,7 @@ def __get_transactions_query_params( from_date=params.from_date, to_date=params.to_date, order=params.edge_order, - limit=params.edge_limit if params.edge_limit > 0 else 300, + limit=params.edge_limit if params.edge_limit > 0 else 100, ) def __get_parent_addresses(self, sender_addresses: List[str]) -> List[str]: @@ -268,7 +268,7 @@ def __get_result_dict(self, graph: Graph , params: GraphQueryParameters): if params.partition: partition, _ = self.__nx_builder.get_louvain_partition(graph) communities = self.get_communities_from_partition(partition) - result_dict = graph.get_graph_dict(communities) + result_dict = graph.get_graph_dict() result_dict[ck.PARAMETERS] = self.__dict_to_json(params.to_dict()) return result_dict diff --git a/airdrop_analysis/data_handler/models/base_models/transaction_history.py b/airdrop_analysis/data_handler/models/base_models/transaction_history.py index 1f50c64..775aa42 100644 --- a/airdrop_analysis/data_handler/models/base_models/transaction_history.py +++ b/airdrop_analysis/data_handler/models/base_models/transaction_history.py @@ -1,7 +1,7 @@ from pydantic import BaseModel from typing import Optional, List -from data_handler.models.table_models.token_transfer import Token_Transfer +from data_handler.models.odbc_models.token_transfer import Token_Transfer from utils.custom_keys import CustomKeys as ck diff --git a/airdrop_analysis/data_handler/models/graph_models/edge.py b/airdrop_analysis/data_handler/models/graph_models/edge.py index 803bec5..41d31eb 100644 --- a/airdrop_analysis/data_handler/models/graph_models/edge.py +++ b/airdrop_analysis/data_handler/models/graph_models/edge.py @@ -1,5 +1,6 @@ from pydantic import BaseModel from typing import TYPE_CHECKING +from datetime import datetime if TYPE_CHECKING: from data_handler.models.graph_models.node import Node @@ -10,7 +11,7 @@ class Edge(BaseModel): destination: 'Node' edge_type: str edge_value: float - edge_timestamp: str + edge_timestamp: datetime def __init__( self, diff --git a/airdrop_analysis/data_handler/models/graph_models/graph.py b/airdrop_analysis/data_handler/models/graph_models/graph.py index e6bbb42..3dbf524 100644 --- a/airdrop_analysis/data_handler/models/graph_models/graph.py +++ b/airdrop_analysis/data_handler/models/graph_models/graph.py @@ -1,7 +1,6 @@ from pydantic import BaseModel from typing import Dict, List, Union -from data_handler.models.base_models.query_parameters import GraphQueryParameters from data_handler.models.graph_models.node import Node from data_handler.models.graph_models.edge import Edge from utils.custom_keys import CustomKeys as ck @@ -109,24 +108,23 @@ def get_most_productive_parent(self, hirerarchy: int) -> Node: most_productive = node return most_productive - def get_graph_dict(self, communities : dict = {}): + def get_graph_dict(self): graph_dict = {} graph_dict[ck.NODES] = [] graph_dict[ck.LINKS] = [] for node in self.__nodes.values(): - hierarchy = node.hierarchy - label = node.label - for key,community in communities.items(): - if label in community: - hierarchy = key - break - node_dict = {"id" : node.id , "hierarchy" : hierarchy} + node_dict = {"id" : node.id} graph_dict[ck.NODES].append(node_dict) for edge in self.__edges.values(): edge_dict = {"source" : edge.source.id , "target" : edge.destination.id} graph_dict[ck.LINKS].append(edge_dict) return graph_dict + def get_neighbors(self, node: Node) -> List[Node]: + inc_neighbors = [edge.source for edge in node.incoming_edges] + out_neighbors = [edge.destination for edge in node.outgoing_edges] + return inc_neighbors + out_neighbors + def __contains__(self, node: Union[Node, str]) -> bool: if isinstance(node, str): return node in self.__nodes diff --git a/airdrop_analysis/data_handler/models/odbc_models/address_record.py b/airdrop_analysis/data_handler/models/odbc_models/address_record.py new file mode 100644 index 0000000..dad47bd --- /dev/null +++ b/airdrop_analysis/data_handler/models/odbc_models/address_record.py @@ -0,0 +1,72 @@ +from utils.custom_keys import CustomKeys as ck +from data_handler.models.base_models.transaction_history import TransactionHistory + + +class Address_Record: + def __init__(self, address, from_date, to_date, balance, total_transaction_count, incoming_transaction_count, outgoing_transaction_count, first_incoming_transaction_date, first_incoming_transaction_source, last_outgoing_transaction_date, last_outgoing_transaction_destination, chain, contract_address, last_cursor): + self.address = address + self.from_date = from_date + self.to_date = to_date + self.balance = balance + self.total_transaction_count = total_transaction_count + self.incoming_transaction_count = incoming_transaction_count + self.outgoing_transaction_count = outgoing_transaction_count + self.first_incoming_transaction_date = first_incoming_transaction_date + self.first_incoming_transaction_source = first_incoming_transaction_source + self.last_outgoing_transaction_date = last_outgoing_transaction_date + self.last_outgoing_transaction_destination = last_outgoing_transaction_destination + self.chain = chain + self.contract_address = contract_address + self.last_cursor = last_cursor + + @staticmethod + def create_from_dict(data: dict): + return Address_Record( + address=data[ck.ADDRESS], + from_date=data[ck.FROM_DATE], + to_date=data[ck.TO_DATE], + balance=data[ck.BALANCE], + total_transaction_count=data[ck.TOTAL_TRANSACTION_COUNT], + incoming_transaction_count=data[ck.INCOMING_TRANSACTION_COUNT], + outgoing_transaction_count=data[ck.OUTGOING_TRANSACTION_COUNT], + first_incoming_transaction_date=data[ck.FIRST_INCOMING_TRANSACTION_DATE], + first_incoming_transaction_source=data[ck.FIRST_INCOMING_TRANSACTION_SOURCE], + last_outgoing_transaction_date=data[ck.LAST_OUTGOING_TRANSACTION_DATE], + last_outgoing_transaction_destination=data[ck.LAST_OUTGOING_TRANSACTION_DESTINATION], + chain=data[ck.CHAIN], + contract_address=data[ck.CONTRACT_ADDRESS], + last_cursor=data[ck.LAST_CURSOR], + ) + + @staticmethod + def create_from_history(history: TransactionHistory): + ins = history.get_received_transactions() + outs = history.get_sent_transactions() + first_date, last_date = '', '' + first_source, last_destination = '', '' + if ins: + first_date = ins[0].block_timestamp + first_source = ins[0].from_address + if outs: + last_date = outs[-1].block_timestamp + last_destination = outs[-1].to_address + balance = sum([t.value for t in ins]) - sum([t.value for t in outs]) + contract_address = '' + if history.contract_addresses: + contract_address = history.contract_addresses[0] + return Address_Record( + address=history.address, + from_date=history.from_date, + to_date=history.to_date, + balance=balance, + total_transaction_count=history.get_transaction_count(), + incoming_transaction_count=len(ins), + outgoing_transaction_count=len(outs), + first_incoming_transaction_date=first_date, + first_incoming_transaction_source=first_source, + last_outgoing_transaction_date=last_date, + last_outgoing_transaction_destination=last_destination, + chain=history.chain, + contract_address=contract_address, + last_cursor=history.last_cursor, + ) \ No newline at end of file diff --git a/airdrop_analysis/data_handler/models/odbc_models/graph_record.py b/airdrop_analysis/data_handler/models/odbc_models/graph_record.py new file mode 100644 index 0000000..902e882 --- /dev/null +++ b/airdrop_analysis/data_handler/models/odbc_models/graph_record.py @@ -0,0 +1,30 @@ +from utils.custom_keys import CustomKeys as ck + +class Graph_Record: + def __init__(self, user_id, time_stamp, graph_path): + self.user_id = user_id + self.time_stamp = time_stamp + self.graph_path = graph_path + + @staticmethod + def create_from_dict(data: dict): + return Graph_Record( + user_id=data[ck.USER_ID], + time_stamp=data[ck.TIME_STAMP], + graph_path=data[ck.GRAPH_PATH], + ) + + @staticmethod + def create_from(user_id : str , graph_path : str , time_stamp : str): + return Graph_Record( + user_id = user_id, + time_stamp = time_stamp, + graph_path = graph_path + ) + + def to_dict(self): + return { + ck.USER_ID: self.user_id, + ck.TIME_STAMP: str(self.time_stamp), + ck.GRAPH_PATH: self.graph_path + } \ No newline at end of file diff --git a/airdrop_analysis/data_handler/models/odbc_models/token_transfer.py b/airdrop_analysis/data_handler/models/odbc_models/token_transfer.py new file mode 100644 index 0000000..d6be1a5 --- /dev/null +++ b/airdrop_analysis/data_handler/models/odbc_models/token_transfer.py @@ -0,0 +1,27 @@ +from utils.custom_keys import CustomKeys as ck + +class Token_Transfer: + def __init__(self, from_address, to_address, value, block_timestamp, block_hash, transaction_hash, token_name, contract_address, chain): + self.from_address = from_address + self.to_address = to_address + self.value = value + self.block_timestamp = block_timestamp + self.block_hash = block_hash + self.transaction_hash = transaction_hash + self.token_name = token_name + self.contract_address = contract_address + self.chain = chain + + @staticmethod + def create_from_dict(chain: str, response: dict): + return Token_Transfer( + from_address=response[ck.FROM_ADDRESS], + to_address=response[ck.TO_ADDRESS], + value=float(response[ck.VALUE]) / 10**18, + block_timestamp=response[ck.BLOCK_TIMESTAMP], + block_hash=response[ck.BLOCK_HASH], + transaction_hash=response[ck.TRANSACTION_HASH], + token_name=response[ck.TOKEN_NAME], + contract_address=response[ck.ADDRESS], + chain=chain, + ) \ No newline at end of file diff --git a/airdrop_analysis/data_handler/models/table_models/graph_record.py b/airdrop_analysis/data_handler/models/table_models/graph_record.py index 10c790a..f70ebbc 100644 --- a/airdrop_analysis/data_handler/models/table_models/graph_record.py +++ b/airdrop_analysis/data_handler/models/table_models/graph_record.py @@ -1,5 +1,6 @@ from peewee import CharField, DateTimeField, IntegerField, FloatField import json +from datetime import datetime from data_handler.models.table_models.base_model import BaseModel from utils.custom_keys import CustomKeys as ck @@ -7,18 +8,18 @@ class Graph_Record(BaseModel): user_id = CharField() time_stamp = DateTimeField() - graph_path = CharField() + graph = CharField() - def create_from(user_id : str , graph_path : str , time_stamp : str): + def create_from(user_id : str , graph : dict): return Graph_Record.create( user_id = user_id, - time_stamp = time_stamp, - graph_path = graph_path + time_stamp = datetime.now().strftime(ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS), + graph = json.dumps(graph) ) def to_dict(data): return { "user_id": data.user_id, "time_stamp": str(data.time_stamp), - "graph": data.graph_path + "graph": json.loads(data.graph) } \ No newline at end of file diff --git a/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py b/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py index 3fdf491..d2db27a 100644 --- a/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py +++ b/airdrop_analysis/data_handler/query_handlers/chain_query_controller.py @@ -2,12 +2,12 @@ from datetime import datetime import json -from data_handler.models.table_models.graph_record import Graph_Record +from data_handler.models.odbc_models.graph_record import Graph_Record from data_handler.query_handlers.moralis_query_handler \ import MoralisQueryHandler -from data_handler.query_handlers.pw_query_handler import PWQueryHandler +from data_handler.query_handlers.odbc_query_handlers import ODBCQueryHandler from data_handler.models.base_models.query_parameters import * -from data_handler.models.table_models.address_record import Address_Record +from data_handler.models.odbc_models.address_record import Address_Record from data_handler.models.base_models.transaction_history \ import TransactionHistory from utils.custom_keys import CustomKeys as ck @@ -16,7 +16,7 @@ class ChainQueryController(): def __init__(self, api_keys_path: str): self.__moralis_handler = MoralisQueryHandler(api_keys_path) - self.__database_handler = PWQueryHandler() + self.__database_handler = ODBCQueryHandler() def get_address_record(self, address) -> Address_Record: return self.__database_handler.get_address_record(address) @@ -52,15 +52,11 @@ def __query_wallet_token_transfers( ) return transfers, cursor - def compare_dates(self,date1_str: str,date2_str: str): + def compare_dates(self,date1_str: str,date2: datetime): try: date1 = datetime.strptime(date1_str,ck.DATETIME_FORMAT) except ValueError: date1 = datetime.strptime(date1_str,ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS) - try: - date2 = datetime.strptime(date2_str,ck.DATETIME_FORMAT) - except ValueError: - date2 = datetime.strptime(date2_str,ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS) return (date1-date2).total_seconds() def query_not_overlapped_transfers( @@ -104,7 +100,7 @@ def get_wallet_token_transfer_history( get_token_transfers_by_address(params.address)) transfers.sort( key=lambda transfer: - datetime.strptime(transfer.block_timestamp,ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS) + transfer.block_timestamp ) if params.order == ck.DESC: transfers.reverse() diff --git a/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py b/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py index 8679b55..b515857 100644 --- a/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py +++ b/airdrop_analysis/data_handler/query_handlers/moralis_query_handler.py @@ -65,6 +65,7 @@ def __query_wallet_transactions( TransactionTime.average_time.append(difference / cnt) except Exception as e: if 'Reason: Internal Server Error' in str(e): + print(e) print(f'Internal Server Error querying {event}s for {address}') else: print(e) diff --git a/airdrop_analysis/data_handler/query_handlers/odbc_query_handlers.py b/airdrop_analysis/data_handler/query_handlers/odbc_query_handlers.py new file mode 100644 index 0000000..5cfc6cc --- /dev/null +++ b/airdrop_analysis/data_handler/query_handlers/odbc_query_handlers.py @@ -0,0 +1,173 @@ +import pyodbc +from datetime import datetime +import json +import os + +from utils.path_provider import PathProvider +from data_handler.models.odbc_models.address_record import Address_Record +from data_handler.models.odbc_models.token_transfer import Token_Transfer +from data_handler.models.odbc_models.graph_record import Graph_Record +from data_handler.models.base_models.transaction_history import TransactionHistory +from utils.custom_keys import CustomKeys as ck + +class ODBCQueryHandler: + def __init__(self): + self.connection_string = os.getenv(ck.ODBC_CONNECTION_STRING) + self.__path_provider = PathProvider() + + def get_connection(self): + return pyodbc.connect(self.connection_string) + + def create_wallet_token_transfers(self, chain: str, transfers: list): + transfer_models = [ + Token_Transfer.create_from_dict(chain, t) for t in transfers + ] + self.insert_token_transfers(transfer_models) + return transfer_models + + def insert_token_transfers(self, transfers): + conn = self.get_connection() + cursor = conn.cursor() + for transfer in transfers: + cursor.execute(""" + INSERT INTO token_transfer (from_address, to_address, value, + block_timestamp, block_hash, transaction_hash, token_name, + contract_address, chain) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + transfer.from_address, transfer.to_address, transfer.value, transfer.block_timestamp, + transfer.block_hash, transfer.transaction_hash, transfer.token_name, + transfer.contract_address, transfer.chain + )) + conn.commit() + cursor.close() + conn.close() + + def get_token_transfers_by_address(self, address: str): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(""" + SELECT * FROM token_transfer WHERE from_address = ? OR to_address = ? + """, (address, address)) + rows = cursor.fetchall() + cursor.close() + conn.close() + return [Token_Transfer(*(row[1:])) for row in rows] + + def get_wallet_token_transfers_from_address(self, address: str): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(""" + SELECT * FROM token_transfer WHERE from_address = ? + """, (address,)) + rows = cursor.fetchall() + cursor.close() + conn.close() + return [Token_Transfer(*(row[1:])) for row in rows] + + def get_wallet_token_transfers_to_address(self, address: str): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(""" + SELECT * FROM token_transfer WHERE to_address = ? + """, (address,)) + rows = cursor.fetchall() + cursor.close() + conn.close() + return [Token_Transfer(*(row[1:])) for row in rows] + + def create_wallet_record(self, history: TransactionHistory): + address = self.get_address_record(history.address) + if address is not None: + return address + address_record = Address_Record.create_from_history(history) + self.insert_address_record(address_record) + return address_record + + + def insert_address_record(self, address_record: Address_Record): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO address_record (address, from_date, + to_date, balance, total_transaction_count, + incoming_transaction_count, + outgoing_transaction_count, + first_incoming_transaction_date, + first_incoming_transaction_source, + last_outgoing_transaction_date, + last_outgoing_transaction_destination, + chain, contract_address, last_cursor) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + address_record.address, address_record.from_date, + address_record.to_date, address_record.balance, + address_record.total_transaction_count, + address_record.incoming_transaction_count, + address_record.outgoing_transaction_count, + address_record.first_incoming_transaction_date, + address_record.first_incoming_transaction_source, + address_record.last_outgoing_transaction_date, + address_record.last_outgoing_transaction_destination, + address_record.chain, address_record.contract_address, + address_record.last_cursor + )) + conn.commit() + cursor.close() + conn.close() + + def get_address_record(self, address: str): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(f""" + SELECT * FROM address_record WHERE address = ? + """, (address,)) + row = cursor.fetchone() + cursor.close() + conn.close() + return Address_Record(*(row[1:])) if row else None + + def update_address_record(self, address, data: dict[str, any]): + conn = self.get_connection() + cursor = conn.cursor() + for key,value in data.items(): + cursor.execute(""" + UPDATE address_record SET ? = ? WHERE address = ? + """, (key, value, address)) + conn.commit() + cursor.close() + conn.close() + + def create_graph_record(self, user_id : str, graph: dict): + time_now = datetime.now() + graph_path = self.__path_provider.get_graph_json_path(user_id,time_now) + with open(graph_path,"w") as file: + graph_string = json.dumps(graph) + file.write(graph_string) + graph_record = Graph_Record.create_from(user_id,graph_path,time_now) + self.insert_graph_record(graph_record) + return graph_record + + def insert_graph_record(self, graph_record : Graph_Record): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO cached_graph_record (user_id, time_stamp, graph_path) + VALUES (?, ?, ?) + """, ( + graph_record.user_id, graph_record.time_stamp, graph_record.graph_path + )) + conn.commit() + cursor.close() + conn.close() + + def get_graph_records(self, node_id: str): + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute(""" + SELECT * FROM cached_graph_record WHERE user_id = ? + """, (node_id,)) + rows = cursor.fetchall() + cursor.close() + conn.close() + return [Graph_Record(*(row[1:])) for row in rows] \ No newline at end of file diff --git a/airdrop_analysis/run_tests.py b/airdrop_analysis/run_tests.py index 072c25f..44e9c92 100644 --- a/airdrop_analysis/run_tests.py +++ b/airdrop_analysis/run_tests.py @@ -7,10 +7,10 @@ def main(): - test = ChainQueryControllerTest() - test.run_tests() - # test = GraphBuilderTest() + # test = ChainQueryControllerTest() # test.run_tests() + test = GraphBuilderTest() + test.run_tests() if __name__ == '__main__': main() \ No newline at end of file diff --git a/airdrop_analysis/tests/query_tests/chain_query_controller_test.py b/airdrop_analysis/tests/query_tests/chain_query_controller_test.py index bb7d1c8..25daf81 100644 --- a/airdrop_analysis/tests/query_tests/chain_query_controller_test.py +++ b/airdrop_analysis/tests/query_tests/chain_query_controller_test.py @@ -72,7 +72,8 @@ def __test_query_wallet_token_transfers(self): length = len(TransactionTime.average_time) for i in TransactionTime.average_time: total_time += i - print(total_time/length) + if length > 0: + print(total_time/length) # print(history.get_transaction_count()) def __test_query_total_token_transfer_count(self): diff --git a/airdrop_analysis/tests/query_tests/graph_builder_test.py b/airdrop_analysis/tests/query_tests/graph_builder_test.py index ad27736..f4c6af0 100644 --- a/airdrop_analysis/tests/query_tests/graph_builder_test.py +++ b/airdrop_analysis/tests/query_tests/graph_builder_test.py @@ -12,6 +12,7 @@ GraphQueryParameters from data_handler.models.graph_models.graph import Graph from utils.path_provider import PathProvider +from airdrop_analyzer import AirdropAnalyzer class GraphBuilderTest(): @@ -23,6 +24,7 @@ def __init__(self): self.__path_provider.get_dex_addresses_path() ) self.__nx_builder = NetworkXBuilder() + self.__analyzer = AirdropAnalyzer() self.__claimers = pd.read_csv( self.__path_provider[ck.CLAIMERS_PATH], ) @@ -79,9 +81,26 @@ def __test_visualizing_distribution_graph(self): ) g = self.__builder.build_graph_from_distributor(param) return self.__show_graph(g, with_partition=param.partition) + + def __test_getting_communities(self): + param = GraphQueryParameters( + center_addresses=['0xa2a5c549a454a1631ff226e0cf8dc4af03a61a75'], + chain='base', + contract_addresses=['0x4ed4e862860bed51a9570b96d89af5e1b0efefed'], + from_date='2023-12-01T00:00:00Z', + to_date='2024-06-01T00:00:00Z', + parent_depth=1, + child_depth=1, + edge_limit=-1, + edge_order=ck.DESC, + partition=True, + ) + communities = self.__analyzer.get_communities(param) + return communities def run_tests(self): # self.__test_building_graph_with_limit_one() - self.__test_visualizing_distribution_graph() + # self.__test_visualizing_distribution_graph() + self.__test_getting_communities() return diff --git a/airdrop_analysis/utils/custom_keys.py b/airdrop_analysis/utils/custom_keys.py index 8e74cf5..4f49dc0 100644 --- a/airdrop_analysis/utils/custom_keys.py +++ b/airdrop_analysis/utils/custom_keys.py @@ -85,6 +85,26 @@ class CustomKeys: VERIFIED_CONTRACT = 'verified_contract' ASC = 'ASC' DESC = 'DESC' + + # Graph Record + USER_ID = 'user_id' + TIME_STAMP = 'time_stamp' + GRAPH_PATH = 'graph_path' + + # Address Record + BALANCE = 'balance' + TOTAL_TRANSACTION_COUNT = 'total_transaction_count' + INCOMING_TRANSACTION_COUNT = 'incoming_transaction_count' + OUTGOING_TRANSACTION_COUNT = 'outgoing_transaction_count' + FIRST_INCOMING_TRANSACTION_DATE = 'first_incoming_transaction_date' + FIRST_INCOMING_TRANSACTION_SOURCE = 'first_incoming_transaction_source' + LAST_OUTGOING_TRANSACTION_DATE = 'last_outgoing_transaction_date' + LAST_OUTGOING_TRANSACTION_DESTINATION = 'last_outgoing_transaction_destination' + CONTRACT_ADDRESS = 'contract_address' + LAST_CURSOR = 'last_cursor' + + # Azure Database + ODBC_CONNECTION_STRING = 'odbc_connection_string' # DynamoDB ITEM = 'Item' diff --git a/airdrop_analysis/utils/path_provider.py b/airdrop_analysis/utils/path_provider.py index 61f4bf0..94b5773 100644 --- a/airdrop_analysis/utils/path_provider.py +++ b/airdrop_analysis/utils/path_provider.py @@ -1,5 +1,6 @@ import json import os +from datetime import datetime from utils.custom_keys import CustomKeys as ck @@ -21,6 +22,7 @@ def __read_paths(self, paths_json_path: str): paths_json_path = self.__sep.join( [self.__path_prefix, paths_json_path], ) + print(self.__path_prefix) with open(paths_json_path, 'r') as file: self.__paths = json.loads(file.read()) return self.__paths @@ -61,8 +63,12 @@ def get_claimer_lists(self) -> dict: with open(self.get_claimer_lists_json_path(), 'r') as file: return json.loads(file.read()) - def get_graph_json_path(self,user_id : str,time_stamp : str): + def get_graph_json_path(self,user_id : str,time_stamp_date : datetime): folder_path = self[ck.GRAPH_JSONS_FOLDER_PATH].replace('/', self.__sep) + try: + time_stamp = time_stamp_date.strftime(ck.DATETIME_FORMAT) + except Exception: + time_stamp = time_stamp_date.strftime(ck.DATETIME_FORMAT_FOR_QUERIED_TRANSFERS) arranged_time_stamp = time_stamp.replace(":" , "-") if not os.path.exists(f"{folder_path}{user_id}"): os.makedirs(f"{folder_path}{user_id}")