diff --git a/b3.py b/b3.py index 20e7f77..03367db 100644 --- a/b3.py +++ b/b3.py @@ -2,6 +2,12 @@ from string import ascii_letters import sys from random import Random +# from kafka import KafkaProducer +from datagen import Datagen +import json +from datetime import datetime +import time +# import itertools """ B3: Standing Query Stress Test @@ -27,13 +33,24 @@ kafka_topic = "hosts-proto" kafka_reset = "earliest" -group_id = f"b3-{Random().randint(1, 10*1000)}" +group_id = f"b3-{int(time.time() * 1000)}" + +# namespace_name = "default" host_ingest_query = ( - """WITH locIdFrom($props.customer_id, 'host', $props.customer_id, $props.entity_id) AS hId """ + + """WITH idFrom($props.customer_id, 'host', $props.customer_id, $props.entity_id) AS hId """ + """MATCH (n) WHERE id(n) = hId """ + """SET n = $props, n:host """ ) +# num_ingest = { +# "type": "NumberIteratorIngest", +# "format": { +# "type": "CypherLine", +# "query": "MATCH (n) WHERE id(n) = id(gen.node.from(toInteger($that))) SET n:Number, n.i=toInteger($that)", +# }, +# "maximumPerSecond": 1, +# "ingestLimit": 1000, +# } ingest_streams = { "hosts": { "name": "hosts", @@ -45,7 +62,7 @@ "group_id": group_id }, } -wait_between_sqs_sec = 30 +wait_between_sqs_sec = 10 r = Random(datagen_seed) @@ -54,7 +71,7 @@ def query(pattern: str): return { "match": f"MATCH (n) WHERE n.hostname =~ '^{pattern}.*' RETURN DISTINCT id(n)", "action": (f"""MATCH (n) WHERE id(n) = $sqMatch.data.id """ + - f"""MATCH (m) WHERE id(m) = locIdFrom(n.customer_id, n.customer_id, '{pattern}') """ + + f"""MATCH (m) WHERE id(m) = idFrom(n.customer_id, n.customer_id, '{pattern}') """ + f"""CREATE (n)-[:{pattern}]->(m) """ + f"""SET m.name = "{pattern} BAZ", m:bar""") } @@ -62,21 +79,176 @@ def query(pattern: str): def register_query_for_pattern(pattern: str) -> bool: return register_standing_queries({ - f"{pattern}": query(pattern) + f"{pattern}": query(pattern), }) +def nextJson(dgen) -> bytes: + return json.dumps(dgen.next()).encode('utf-8') + + +WAIT_TIME_AFTER_REMOVING_QUERIES = 5 * 60 +CONTROL_RUN_TIME = 10 * 60 +WAIT_TIME_AFTER_ADDING_QUERY = 60 +SINGLE_QUERY_RUNTIME = 10 * 60 +MULT_QUERY_NUM = 300 +WAIT_TIME_AFTER_ADDING_INGEST = 5 * 60 +MULTI_QUERY_WAIT_TIME_AFTER = 3 * 60 +QUERY_THEN_INGEST_WAIT = 5 * 60 +QUERY_THEN_INGEST_RUNTIME = 10 * 60 + +STRESS_TEST_CONSECUTIVE_FAILURES = 3 +STRESS_TEST_WAIT_TIME = .5 + +def nowStr(): + return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") +def printNow(s): + print(nowStr(), s) + +def mkPattern(): + return ''.join(r.choice(ascii_letters) for _ in range(12)) + +def resetTests(): + printNow("Resetting tests") + deleteAllIngest() + removeAllStandingQueries() + + sleep(WAIT_TIME_AFTER_REMOVING_QUERIES) + # printNow("Tests successfully reset") + + # delete_namespace(namespace_name) + # create_namespace(namespace_name) + # printNow("Namespace cleared") + +def runControl(): + printNow("starting control test") + startIngests(ingest_streams) + sleep(CONTROL_RUN_TIME) + printNow("control test finished") + +def runNQueries(n, waitTime): + for i in range(n): + start = time.time() + queryAccepted = register_query_for_pattern(mkPattern()) + printNow("Adding query took " + str(time.time() - start) + " seconds") + if not queryAccepted: + printNow("query was not accepted") + sleep(waitTime) + +def runSingleQueryTest(): + printNow("Starting single query test") + startIngests(ingest_streams) + sleep(WAIT_TIME_AFTER_ADDING_INGEST) + runNQueries(1, 0) + sleep(SINGLE_QUERY_RUNTIME) + printNow("Single query test done") + +def 
runMultiQueryTest():
+    printNow("Starting multi query test")
+    startIngests(ingest_streams)
+    sleep(WAIT_TIME_AFTER_ADDING_INGEST)
+    runNQueries(MULT_QUERY_NUM, WAIT_TIME_AFTER_ADDING_QUERY)
+    sleep(MULTI_QUERY_WAIT_TIME_AFTER)
+    printNow("Multi query test done")
+
+def runQueriesThenIngest():
+    printNow("Starting queries-then-ingest test: registering queries before starting ingest")
+    runNQueries(MULT_QUERY_NUM, 1)
+    sleep(QUERY_THEN_INGEST_WAIT)
+    printNow("Queries added. Starting Ingest")
+    startIngests(ingest_streams)
+    sleep(QUERY_THEN_INGEST_RUNTIME)
+    printNow("Query then ingest test done")
+
+
 def run_b3():
-    checkConfig()
+    performanceTests()
+
+def runStressTest(withIngest):
+    resetTests()
+    print("Running stress test", "with ingest" if withIngest else "without ingest")
+    nFailures = 0
+    nSuccesses = 0
+    if withIngest:
+        startIngests(ingest_streams)
+        sleep(10)
+    while True:
+        start = time.time()
+        queryAccepted = register_query_for_pattern(mkPattern())
+        printNow("Adding query took " + str(time.time() - start) + " seconds")
+        if not queryAccepted:
+            printNow("query was not accepted")
+            nFailures += 1
+        else:
+            nFailures = 0
+            nSuccesses += 1
+        sleep(STRESS_TEST_WAIT_TIME)
+        if nFailures > STRESS_TEST_CONSECUTIVE_FAILURES:
+            printNow("Exceeded maximum consecutive failures after " + str(nSuccesses) + " successful registrations, exiting")
+            break
+
+
+
+
+def performanceTests():
+    resetTests()
+    runControl()
+
+    resetTests()
+    runSingleQueryTest()
+
+    resetTests()
+    runControl()
+
+    resetTests()
+    runMultiQueryTest()
+
+    resetTests()
+    runControl()
+
+    resetTests()
+    runQueriesThenIngest()
+
+    resetTests()
+    runControl()
+
+    resetTests()
+    runControl()
+
+    resetTests()
+    runStressTest(True)
+
+    resetTests()
+    runControl()
+
+    resetTests()
+    runStressTest(False)
+
+    resetTests()
+    runControl()
+    """
+    #checkConfig()
+    removeAllStandingQueries()
+    deleteAllIngest()
+
+    # sleep(10)
     startIngests(ingest_streams)
     print("Waiting 1 minute to get a baseline 1-minute ingest rate")
     sleep(60)
-    queryAccepted = True
-    while queryAccepted:
+    total = 0
+    for i in range(1):
         pattern = ''.join(r.choice(ascii_letters) for _ in range(12))
         queryAccepted = register_query_for_pattern(pattern)
-        sleep(wait_between_sqs_sec)
-    print("Server rejected the most recent Standing Query")
+        if not queryAccepted:
+            total = i-1
+            break
+        # sleep(wait_between_sqs_sec)
+    print("Server rejected the most recent Standing Query", total)
+    # dgen = Datagen(datagen_seed, message_count)
+
+    # queryAccepted = True
+    # while queryAccepted:
+    """
 
 if __name__ == "__main__":
     if len(sys.argv) > 1 and (sys.argv[1] == "run" or sys.argv[1] == "b3"):
diff --git a/cassandra_make_tables_commands.txt b/cassandra_make_tables_commands.txt
new file mode 100644
index 0000000..9e7f4d3
--- /dev/null
+++ b/cassandra_make_tables_commands.txt
@@ -0,0 +1,82 @@
+CREATE KEYSPACE quine WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = false;
+CREATE TABLE quine.journals (
+    quine_id blob,
+    timestamp bigint,
+    data blob,
+    PRIMARY KEY (quine_id, timestamp)
+) WITH CLUSTERING ORDER BY (timestamp ASC)
+    AND compaction = {'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'}
+    AND default_time_to_live = 30
+    AND gc_grace_seconds = 120;
+CREATE TABLE quine.meta_data (
+    key text PRIMARY KEY,
+    value blob
+);
+CREATE TABLE quine.snapshots (
+    quine_id blob,
+    timestamp bigint,
+    multipart_index int,
+    data blob,
+    multipart_count int,
+    PRIMARY KEY (quine_id, timestamp, 
multipart_index) +) WITH CLUSTERING ORDER BY (timestamp DESC, multipart_index ASC) + AND compaction ={'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'} + AND default_time_to_live = 30 + AND gc_grace_seconds = 120; +CREATE TABLE quine.standing_queries ( + query_id uuid PRIMARY KEY, + queries blob +); +CREATE TABLE quine.standing_query_states ( + quine_id blob, + standing_query_id uuid, + standing_query_part_id uuid, + data blob, + PRIMARY KEY (quine_id, standing_query_id, standing_query_part_id) +) WITH CLUSTERING ORDER BY (standing_query_id ASC, standing_query_part_id ASC) + AND default_time_to_live = 30; +CREATE TABLE quine.domain_graph_nodes ( + dgn_id bigint PRIMARY KEY, + data blob +) WITH additional_write_policy = '99p' + AND bloom_filter_fp_chance = 0.01 + AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} + AND cdc = false + AND comment = '' + AND compaction = {'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'} + AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} + AND memtable = 'default' + AND crc_check_chance = 1.0 + AND default_time_to_live = 30 + AND extensions = {} + AND gc_grace_seconds = 864000 + AND max_index_interval = 2048 + AND memtable_flush_period_in_ms = 0 + AND min_index_interval = 128 + AND read_repair = 'BLOCKING' + AND speculative_retry = '99p'; + +CREATE TABLE quine.domain_index_events ( + quine_id blob, + timestamp bigint, + data blob, + dgn_id bigint, + PRIMARY KEY (quine_id, timestamp) +) WITH CLUSTERING ORDER BY (timestamp ASC) + AND additional_write_policy = '99p' + AND bloom_filter_fp_chance = 0.01 + AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} + AND cdc = false + AND comment = '' + AND compaction = {'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'} + AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} + AND memtable = 'default' + AND crc_check_chance = 1.0 + AND default_time_to_live = 30 + AND extensions = {} + AND gc_grace_seconds = 864000 + AND max_index_interval = 2048 + AND memtable_flush_period_in_ms = 0 + AND min_index_interval = 128 + AND read_repair = 'BLOCKING' + AND speculative_retry = '99p'; \ No newline at end of file diff --git a/produce_messages.py b/produce_messages.py index a7dfe1c..ec97044 100644 --- a/produce_messages.py +++ b/produce_messages.py @@ -5,6 +5,7 @@ from google.protobuf import json_format from model.host_pb2 import Host from utils.prodenv import kafka_servers, datagen_seed, message_count +from kafka.admin import KafkaAdminClient, ConfigResource, NewTopic """ Produces Host messages (JSON or protobuf) to the specified kafka topic at the @@ -31,13 +32,20 @@ def nextProto() -> bytes: datum = dgen.next() msg = Host() json_format.ParseDict(datum, msg) - return msg.SerializeToString() + return (msg.SerializeToString(), datum["customer_id"]) + +admin_client = KafkaAdminClient() +# topic_list = [] +# topic_list.append(ConfigResource('TOPIC', topic, {"retention.ms":"1000", "retention.bytes":"10000000","segment.bytes":"1000000"})) +# admin_client.alter_configs(topic_list) + +admin_client.create_topics([NewTopic(topic, 8, 1, topic_configs={'retention.ms': '-1'})]) for i in range(message_count): - 
payload = nextProto() if proto else nextJson() - f = producer.send(topic, value=payload) + (payload, partitionKey) = nextProto() if proto else nextJson() + f = producer.send(topic, key=partitionKey.encode('utf-8'), value=payload) if i % 10000 == 0: - print(f"{i}/{message_count}") + print(f"{i}/{message_count}", flush=True) # print(f.get()) # print(payload) diff --git a/quine.conf b/quine.conf new file mode 100644 index 0000000..fd52b11 --- /dev/null +++ b/quine.conf @@ -0,0 +1,72 @@ +quine.cluster { + target-size = 2 + cluster-join = { + type = static-seed-addresses + seed-addresses = [ + {address = "172.31.24.210", port = 25520}, + {address = "172.31.17.64", port = 25520}, + {address = "172.31.22.20", port = 25520} + ] + } +} + +quine.store { + # store data in an Apache Cassandra instance + type = cassandra + help-make-quine-better = false + + # "host:port" strings at which Cassandra nodes can be accessed from + # the application + endpoints = [ + "172.31.26.114:9042" + ] + + # the keyspace to use + keyspace = quine + + # whether the application should create the keyspace if it does not + # yet exist + should-create-keyspace = false + + # whether the application should create tables in the keyspace if + # they do not yet exist + should-create-tables = false + + # how many copies of each datum the Cassandra cluster should retain + replication-factor = 1 + + # how many hosts must agree on a datum for Quine to consider that + # datum written/read + write-consistency = LOCAL_QUORUM + read-consistency = LOCAL_QUORUM + + # passed through to Cassandra + local-datacenter = "datacenter1" + + # how long to wait before considering a write operation failed + write-timeout = "10s" + + # how long to wait before considering a read operation failed + read-timeout = "10s" + + # if set, the number of nodes for which to optimize node creation + # latency + # bloom-filter-size = + +} +persistence { + journal-enabled=false + snapshot-singleton=true +} + +quine.api-2-enabled = true +quine.metrics-reporters = [ + { + type = influxdb + period = 1 + database = metrics + scheme = http + host = "172.31.26.114" + port = 8086 + } +] diff --git a/stress_testing.py b/stress_testing.py new file mode 100644 index 0000000..ed9e61d --- /dev/null +++ b/stress_testing.py @@ -0,0 +1,314 @@ +from utils.prodenv import * +from string import ascii_letters +import sys +from random import Random +# from kafka import KafkaProducer +from datagen import Datagen +import json +from datetime import datetime +import time +# import itertools + +""" +To set up the environment: + +#install +kafka +influxdb (v1) +java +docker +pipenv +script + +To run the tests Each in their own terminal (tmux recommended) + +#kafka (python host) +docker-compose up + + +#cassandra (python host) +docker run -it --rm -p9042:9042 --name cassandra cassandra + + +#before every run +dcqlsh + drop keyspace quine; + +#quine for profiler +cd quine +java -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9000 \ + -Dcom.sun.management.jmxremote.rmi.port=9000 -Dcom.sun.management.jmxremote.ssl=false \ + -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=localhost \ + -Dquine.help-make-quine-better=false -Dconfig.file=[the quine.conf path here] \ + -Dquine.id.type=uuid \ + -Dquine.id.partitioned=true \ + -jar quine-enterprise-assembly-1.6.4-66-g4ffd0df06.jar + +@quine without profiler +java -Dquine.help-make-quine-better=false \ + -Dconfig.file=quine.conf -Xmx4012m -Xms4012m \ + -Dquine.id.type=uuid \ + -Dquine.id.partitioned=true 
\
+    -jar quine-enterprise-assembly-1.6.4-66-g4ffd0df06.jar
+
+
+#produce messages (python host)
+cd [cluster-testing-directory]
+pipenv shell
+python produce_messages.py hosts-proto proto
+
+#python tests
+cd quine/cluster-test-scripts/
+pipenv shell
+script
+python stress_testing.py run
+"""
+
+
+kafka_topic = "hosts-proto"
+kafka_reset = "earliest"
+group_id = f"b3-{int(time.time() * 1000)}"
+
+# namespace_name = "default"
+
+host_ingest_query = (
+    """WITH locIdFrom(kafkaHash($props.customer_id), 'host', $props.customer_id, $props.entity_id) AS hId """ +
+    """MATCH (n) WHERE id(n) = hId """ +
+    """SET n = $props, n:host """
+)
+# num_ingest = {
+#     "type": "NumberIteratorIngest",
+#     "format": {
+#         "type": "CypherLine",
+#         "query": "MATCH (n) WHERE id(n) = id(gen.node.from(toInteger($that))) SET n:Number, n.i=toInteger($that)",
+#     },
+#     "maximumPerSecond": 1,
+#     "ingestLimit": 1000,
+# }
+ingest_streams = {
+    "hosts": {
+        "name": "hosts",
+        "topic": kafka_topic,
+        "query": host_ingest_query,
+        "type": "Host",
+        "kafka_reset": kafka_reset,
+        "format": "PROTO",
+        "group_id": group_id
+    },
+}
+wait_between_sqs_sec = 10
+
+r = Random(datagen_seed)
+
+
+def query(pattern: str):
+    return {
+        "match": f"MATCH (n) WHERE n.hostname =~ '^{pattern}.*' RETURN DISTINCT id(n)",
+        "action": (f"""MATCH (n) WHERE id(n) = $sqMatch.data.id """ +
+                   f"""MATCH (m) WHERE id(m) = idFrom(n.customer_id, n.customer_id, '{pattern}') """ +
+                   f"""CREATE (n)-[:{pattern}]->(m) """ +
+                   f"""SET m.name = "{pattern} BAZ", m:bar""")
+    }
+
+
+def register_query_for_pattern(pattern: str) -> bool:
+    return register_standing_queries({
+        f"{pattern}": query(pattern),
+    })
+
+
+def nextJson(dgen) -> bytes:
+    return json.dumps(dgen.next()).encode('utf-8')
+
+
+WAIT_TIME_AFTER_REMOVING_QUERIES = 0 * 60
+CONTROL_RUN_TIME = 10 * 60
+WAIT_TIME_AFTER_ADDING_QUERY = 10
+SINGLE_QUERY_RUNTIME = 10 * 60
+MULT_QUERY_NUM = 1000
+WAIT_TIME_AFTER_ADDING_INGEST = 10 #0 * 60
+MULTI_QUERY_WAIT_TIME_AFTER = 3 * 60
+QUERY_THEN_INGEST_WAIT = 5 * 60
+QUERY_THEN_INGEST_RUNTIME = 10 * 60
+
+STRESS_TEST_CONSECUTIVE_FAILURES = 3
+STRESS_TEST_WAIT_TIME = .5
+
+def nowStr():
+    return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
+def printNow(s):
+    print(nowStr(), s)
+
+def mkPattern():
+    return ''.join(r.choice(ascii_letters) for _ in range(12))
+
+def resetTests():
+    printNow("Resetting tests")
+    deleteAllIngest()
+    removeAllStandingQueries()
+
+    sleep(WAIT_TIME_AFTER_REMOVING_QUERIES)
+    # printNow("Tests successfully reset")
+
+    # delete_namespace(namespace_name)
+    # create_namespace(namespace_name)
+    # printNow("Namespace cleared")
+
+def runControl():
+    printNow("Starting control test")
+    startIngests(ingest_streams)
+    sleep(CONTROL_RUN_TIME)
+    printNow("Control test finished")
+
+def runNQueries(n, waitTime):
+    for i in range(n):
+        t = time.time()
+        # if i == 2 or i == 30:
+        #     input("Do the heap dump")
+        liveness_check()
+        start = time.time()
+        queryAccepted = register_query_for_pattern(mkPattern())
+        printNow("Adding query " + str(i+1) + " took " + str(time.time() - start) + " seconds")
+        if not queryAccepted:
+            printNow("query was not accepted")
+        sleep(max(0, waitTime - (time.time() - t)))
+
+def runSingleQueryTest():
+    printNow("Starting single query test")
+    startIngests(ingest_streams)
+    sleep(WAIT_TIME_AFTER_ADDING_INGEST)
+    runNQueries(1, 0)
+    sleep(SINGLE_QUERY_RUNTIME)
+    printNow("Single query test done")
+
+def runMultiQueryTest():
+    printNow("Starting multi query test")
+    startIngests(ingest_streams)
+    sleep(WAIT_TIME_AFTER_ADDING_INGEST)
+    runNQueries(MULT_QUERY_NUM, WAIT_TIME_AFTER_ADDING_QUERY)
+    sleep(MULTI_QUERY_WAIT_TIME_AFTER)
+    printNow("Multi query test done")
+
+def runQueriesThenIngest():
+    printNow("Starting queries-then-ingest test: registering queries before starting ingest")
+    runNQueries(MULT_QUERY_NUM, 1)
+    sleep(QUERY_THEN_INGEST_WAIT)
+    printNow("Queries added. Starting Ingest")
+    startIngests(ingest_streams)
+    sleep(QUERY_THEN_INGEST_RUNTIME)
+    printNow("Query then ingest test done")
+
+
+def run_b3():
+    performanceTests()
+
+def runStressTest(withIngest):
+    resetTests()
+    print("Running stress test", "with ingest" if withIngest else "without ingest")
+    nFailures = 0
+    nSuccesses = 0
+    if withIngest:
+        startIngests(ingest_streams)
+        sleep(10)
+    while True:
+        start = time.time()
+        queryAccepted = register_query_for_pattern(mkPattern())
+        printNow("Adding query took " + str(time.time() - start) + " seconds")
+        if not queryAccepted:
+            printNow("query was not accepted")
+            nFailures += 1
+        else:
+            nFailures = 0
+            nSuccesses += 1
+        sleep(STRESS_TEST_WAIT_TIME)
+        if nFailures > STRESS_TEST_CONSECUTIVE_FAILURES:
+            printNow("Exceeded maximum consecutive failures after " + str(nSuccesses) + " successful registrations, exiting")
+            break
+
+
+
+tests = {
+    "control": runControl,
+    "singleQuery": runSingleQueryTest,
+    "multiQuery": runMultiQueryTest,
+    "queriesThenIngest": runQueriesThenIngest,
+    "stressTestWithIngest": lambda: runStressTest(True),
+    "stressTestNoIngest": lambda: runStressTest(False),
+}
+def runTest(testName):
+    resetTests()
+    if testName in tests:
+        print("Running test:", testName)
+        tests[testName]()
+    else:
+        print("Invalid test:", testName)
+
+
+def performanceTests():
+    # resetTests()
+    # runControl()
+
+    # resetTests()
+    # runSingleQueryTest()
+
+    # resetTests()
+    # runControl()
+
+    resetTests()
+    runMultiQueryTest()
+
+    # resetTests()
+    # runControl()
+
+    # resetTests()
+    # runQueriesThenIngest()
+
+    # resetTests()
+    # runControl()
+
+    # resetTests()
+    # runControl()
+
+    # resetTests()
+    # runStressTest(True)
+
+    # resetTests()
+    # runControl()
+
+    # resetTests()
+    # runStressTest(False)
+
+    # resetTests()
+    # runControl()
+    """
+    #checkConfig()
+    removeAllStandingQueries()
+    deleteAllIngest()
+
+    # sleep(10)
+    startIngests(ingest_streams)
+    print("Waiting 1 minute to get a baseline 1-minute ingest rate")
+    sleep(60)
+    total = 0
+    for i in range(1):
+        pattern = ''.join(r.choice(ascii_letters) for _ in range(12))
+        queryAccepted = register_query_for_pattern(pattern)
+        if not queryAccepted:
+            total = i-1
+            break
+        # sleep(wait_between_sqs_sec)
+    print("Server rejected the most recent Standing Query", total)
+    # dgen = Datagen(datagen_seed, message_count)
+
+    # queryAccepted = True
+    # while queryAccepted:
+    """
+
+if __name__ == "__main__":
+    if len(sys.argv) > 1:
+        if (sys.argv[1] == "run" or sys.argv[1] == "b3"):
+            run_b3()
+        elif sys.argv[1] in tests:
+            runTest(sys.argv[1])
+        else:
+            print("Invalid test name:", sys.argv[1])
diff --git a/utils/prodenv.py b/utils/prodenv.py
index 9bf080f..ad92809 100644
--- a/utils/prodenv.py
+++ b/utils/prodenv.py
@@ -3,8 +3,8 @@
 import requests
 from typing import *
 import sys
-from time import sleep
-
+from time import sleep, time
+import json
 """
 Env: Utilities
 ===========================
@@ -27,23 +27,22 @@
 
 
 # Hosts that form the cluster. EDIT THIS. 
-# THATDOT DEVELOPER -quine_hosts: List[str] = ["http://localhost:8080", "http://localhost:8081", "http://localhost:8082", "http://localhost:8083"] -quine_hosts_with_spares: List[str] = quine_hosts + \ - ["http://localhost:8084"] - -# if (len(quine_hosts) < 3): -# print("G2 expects a cluster of at least 3 hosts, please update prodenv.py") -# exit(-1) - -# Sometimes we just need an arbitrary (but consistent) host in the cluster to run an API call against -a_quine_host = quine_hosts[0] - +a_quine_host: str = "http://172.31.24.210:8080" + + # Hosts that form the cluster. EDIT THIS. +def get_quine_hosts(): + return [f"http://{host['address']}:8080" for i, host in requests.get(f"{a_quine_host}/api/v1/admin/status").json()["cluster"]["clusterMembers"].items()] +try: + quine_hosts: List[str] = get_quine_hosts() + a_quine_host = quine_hosts[0] +except Exception as e: + print(e) + # Number of partitions in the kafka topics -kafka_partitions = 32 +kafka_partitions = 8 # kafka broker string -kafka_servers = "broker0.kafka:9092" +kafka_servers = "172.31.26.114:9092" # how many ingest queries to execute simultaneously (per-host) ingest_parallelism = 32 @@ -169,26 +168,27 @@ def printIngestedCounts(which_ingest: str) -> None: print(f"totalCount={totalCnt}") -def listIngest() -> None: +def listIngest(namespace) -> None: for quine_host in quine_hosts_with_spares: stats = requests.get( - f"{quine_host}/api/v1/ingest").json() + f"{quine_host}/api/v1/ingest", + params = {"namespace" : namespace}).json() print("-------------------- " + quine_host + " --------------------") print(stats) print("-------------------------------------------------------------") -def ingestStats(quine_host: str, which_ingest: str): +def ingestStats(quine_host: str, which_ingest: str, namespace): stats = requests.get( - f"{quine_host}/api/v1/ingest/{which_ingest}").json()["stats"] + f"{quine_host}/api/v1/ingest/{which_ingest}", params = {"namespace" : namespace}).json()["stats"] return stats -def listAllIngests() -> None: +def listAllIngests(namespace) -> None: one_min_rate = 0 for quine_host in quine_hosts: - stats = requests.get(f"{quine_host}/api/v1/ingest").json() + stats = requests.get(f"{quine_host}/api/v1/ingest", params = {"namespace" : namespace}).json() # print(stats) for which_ingest in stats: if stats[which_ingest]["status"] == "FAILED": @@ -208,23 +208,21 @@ def listAllIngests() -> None: def deleteAllIngest() -> None: for quine_host in quine_hosts: - stats = requests.get(f"{quine_host}/api/v1/ingest").json() + resp = requests.get(f"{quine_host}/api/v1/ingest") + if not resp.ok: + raise Exception("failed to decode ingest list", resp.text) + stats = resp.json() # print(stats) for which_ingest in stats: - stats = requests.delete( - f"{quine_host}/api/v1/ingest/{which_ingest}").json() - print(stats) - + resp = requests.delete(f"{quine_host}/api/v1/ingest/{which_ingest}") + if not resp.ok: + raise Exception("failed to delete ingest, ", resp.text) def deleteIngest(which_ingest: str) -> None: for quine_host in quine_hosts: - try: - stats = requests.delete( - f"{quine_host}/api/v1/ingest/{which_ingest}").json() - print(stats) - except Exception as e: - pass - + resp = requests.delete(f"{quine_host}/api/v1/ingest/{which_ingest}") + if not resp.ok: + raise Exception("failed to delete ingest, ", resp.text) def addSampleQuery(): resp = requests.put(f"{a_quine_host}/api/v1/query-ui/sample-queries", json=[ @@ -251,22 +249,24 @@ def clusterHealthAPICall(): def removeAllStandingQueries(): - queries = requests.get( - 
f"{a_quine_host}/api/v1/query/standing").json() - + resp = requests.get(f"{a_quine_host}/api/v1/query/standing") + if not resp.ok: + raise Exception("Failed to get standing queries:", resp.text) + queries = resp.json() for query in queries: - resp = requests.delete( - f"{a_quine_host}/api/v1/query/standing/{query['name']}") - print(f"deleted SQ={query['name']}") - print(resp) - + try: + resp = requests.delete( + f"{a_quine_host}/api/v1/query/standing/{query['name']}") + print(f"deleted SQ={query['name']}") + print(resp) + except: + print("failed to decode standing query deletion response") def startIngests(streams): for stream in streams: for i, quine_host in enumerate(quine_hosts): print(f"stream={stream} host={quine_host}") sobj = streams[stream] - if sobj["format"] == "JSON": startIngestJSON(quine_host, partitions(i), sobj) else: @@ -287,7 +287,7 @@ def startIngestJSON(quine_host, partitions, sobj): "type": "CypherJson", "query": sobj["query"], "parameter": "props" - } + }, }) if resp.ok: print(f"Registered JSON ingest={sobj['name']} ec2 on {quine_host}") @@ -319,7 +319,7 @@ def startIngestPROTO(quine_host, partitions, sobj): "parameter": "props", "schemaUrl": protobuf_schema_url, "typeName": sobj["type"] - } + }, }) if resp.ok: print(f"Registered PROTO ingest={sobj['name']} ec2 on {quine_host}") @@ -328,6 +328,21 @@ def startIngestPROTO(quine_host, partitions, sobj): # sys.exit(1) + +def liveness_check(timeout=5) -> bool: + t = time() + resp = requests.get(f"{a_quine_host}/api/v1/admin/liveness", timeout=timeout) + diff = time() - t + ok = True + if resp.ok: + print(f"Liveness check took {diff} seconds on host {a_quine_host}") + else: + print( + f"Could not get liveness check from {a_quine_host}: " + resp.text) + ok = False + return ok + + def register_standing_queries(queries) -> bool: ok = True for qname, qparts in queries.items():