192 changes: 182 additions & 10 deletions b3.py
@@ -2,6 +2,12 @@
from string import ascii_letters
import sys
from random import Random
# from kafka import KafkaProducer
from datagen import Datagen
import json
from datetime import datetime
import time
# import itertools

"""
B3: Standing Query Stress Test
@@ -27,13 +33,24 @@

kafka_topic = "hosts-proto"
kafka_reset = "earliest"
group_id = f"b3-{Random().randint(1, 10*1000)}"
group_id = f"b3-{int(time.time() * 1000)}"

# namespace_name = "default"

host_ingest_query = (
"""WITH locIdFrom($props.customer_id, 'host', $props.customer_id, $props.entity_id) AS hId """ +
"""WITH idFrom($props.customer_id, 'host', $props.customer_id, $props.entity_id) AS hId """ +
"""MATCH (n) WHERE id(n) = hId """ +
"""SET n = $props, n:host """
)
# num_ingest = {
# "type": "NumberIteratorIngest",
# "format": {
# "type": "CypherLine",
# "query": "MATCH (n) WHERE id(n) = id(gen.node.from(toInteger($that))) SET n:Number, n.i=toInteger($that)",
# },
# "maximumPerSecond": 1,
# "ingestLimit": 1000,
# }
ingest_streams = {
"hosts": {
"name": "hosts",
@@ -45,7 +62,7 @@
"group_id": group_id
},
}
wait_between_sqs_sec = 30
wait_between_sqs_sec = 10

r = Random(datagen_seed)

@@ -54,29 +71,184 @@ def query(pattern: str):
return {
"match": f"MATCH (n) WHERE n.hostname =~ '^{pattern}.*' RETURN DISTINCT id(n)",
"action": (f"""MATCH (n) WHERE id(n) = $sqMatch.data.id """ +
f"""MATCH (m) WHERE id(m) = locIdFrom(n.customer_id, n.customer_id, '{pattern}') """ +
f"""MATCH (m) WHERE id(m) = idFrom(n.customer_id, n.customer_id, '{pattern}') """ +
f"""CREATE (n)-[:{pattern}]->(m) """ +
f"""SET m.name = "{pattern} BAZ", m:bar""")
}


def register_query_for_pattern(pattern: str) -> bool:
return register_standing_queries({
f"{pattern}": query(pattern)
f"{pattern}": query(pattern),
})


def nextJson(dgen) -> bytes:
return json.dumps(dgen.next()).encode('utf-8')


WAIT_TIME_AFTER_REMOVING_QUERIES = 5 * 60
CONTROL_RUN_TIME = 10 * 60
WAIT_TIME_AFTER_ADDING_QUERY = 60
SINGLE_QUERY_RUNTIME = 10 * 60
MULT_QUERY_NUM = 300
WAIT_TIME_AFTER_ADDING_INGEST = 5 * 60
MULTI_QUERY_WAIT_TIME_AFTER = 3 * 60
QUERY_THEN_INGEST_WAIT = 5 * 60
QUERY_THEN_INGEST_RUNTIME = 10 * 60

STRESS_TEST_CONSECUTIVE_FAILURES = 3
STRESS_TEST_WAIT_TIME = .5

def nowStr():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
def printNow(s):
print(nowStr(), s)

def mkPattern():
return ''.join(r.choice(ascii_letters) for _ in range(12))

def resetTests():
printNow("Resetting tests")
deleteAllIngest()
removeAllStandingQueries()

sleep(WAIT_TIME_AFTER_REMOVING_QUERIES)
# printNow("Tests successfully reset")

# delete_namespace(namespace_name)
# create_namespace(namespace_name)
# printNow("Namespace cleared")

def runControl():
printNow("starting control test")
startIngests(ingest_streams)
sleep(CONTROL_RUN_TIME)
printNow("control test finished")

def runNQueries(n, waitTime):
for i in range(n):
start = time.time()
queryAccepted = register_query_for_pattern(mkPattern())
printNow("Adding query took " + str(time.time() - start) + " seconds")
if not queryAccepted:
printNow("query was not accepted")
sleep(waitTime)

def runSingleQueryTest():
printNow("Starting single query test")
startIngests(ingest_streams)
sleep(WAIT_TIME_AFTER_ADDING_INGEST)
runNQueries(1, 0)
sleep(SINGLE_QUERY_RUNTIME)
printNow("Single query test done")

def runMultiQueryTest():
printNow("Starting multi query test")
startIngests(ingest_streams)
sleep(WAIT_TIME_AFTER_ADDING_INGEST)
runNQueries(MULT_QUERY_NUM, WAIT_TIME_AFTER_ADDING_QUERY)
sleep(MULTI_QUERY_WAIT_TIME_AFTER)
printNow("Multi query test done")

def runQueriesThenIngest():
printNow("Starting query for adding ingest then adding query")
runNQueries(MULT_QUERY_NUM, 1)
sleep(QUERY_THEN_INGEST_WAIT)
printNow("Queries added. Starting Ingest")
startIngests(ingest_streams)
sleep(QUERY_THEN_INGEST_RUNTIME)
printNow("Query then ingest test done")


def run_b3():
checkConfig()
performanceTests()

def runStressTest(withIngest):
resetTests()
print("Running stress test", "with ingest" if withIngest else "without ingest")
nFailures = 0
nSuccees = 0
if withIngest:
startIngests(ingest_streams)
sleep(10)
while True:
start = time.time()
queryAccepted = register_query_for_pattern(mkPattern())
printNow("Adding query took " + str(time.time() - start) + " seconds")
if not queryAccepted:
printNow("query was not accepted")
nFailures += 1
else:
nFailures = 0
nSuccees += 1
sleep(STRESS_TEST_WAIT_TIME)
if(nFailures > STRESS_TEST_CONSECUTIVE_FAILURES):
printNow("Exceeded maximum consecutive failures at " + str(nSuccees) + " exiting")
break




def performanceTests():
resetTests()
runControl()

resetTests()
runSingleQueryTest()

resetTests()
runControl()

resetTests()
runMultiQueryTest()

resetTests()
runControl()

resetTests()
runQueriesThenIngest()

resetTests()
runControl()

resetTests()
runControl()

resetTests()
runStressTest(True)

resetTests()
runControl()

resetTests()
runStressTest(False)

resetTests()
runControl()
"""
#checkConfig()
removeAllStandingQueries()
deleteAllIngest()

# sleep(10)
startIngests(ingest_streams)
print("Waiting 1 minute to get a baseline 1-minute ingest rate")
sleep(60)
queryAccepted = True
while queryAccepted:
total = 0
for i in range(1):
pattern = ''.join(r.choice(ascii_letters) for _ in range(12))
queryAccepted = register_query_for_pattern(pattern)
sleep(wait_between_sqs_sec)
print("Server rejected the most recent Standing Query")
if not queryAccepted:
total = i-1
break
# sleep(wait_between_sqs_sec)
print("Server rejected the most recent Standing Query", total)
# dgen = Datagen(datagen_seed, message_count)

# queryAccepted = True
# while queryAccepted:
"""

if __name__ == "__main__":
if len(sys.argv) > 1 and (sys.argv[1] == "run" or sys.argv[1] == "b3"):
82 changes: 82 additions & 0 deletions cassandra_make_tables_commands.txt
@@ -0,0 +1,82 @@
CREATE KEYSPACE quine WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = false;
CREATE TABLE quine.journals (
quine_id blob,
timestamp bigint,
data blob,
PRIMARY KEY (quine_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp ASC)
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'}
AND default_time_to_live = 30
AND gc_grace_seconds = 120;
CREATE TABLE quine.meta_data (
key text PRIMARY KEY,
value blob
);
CREATE TABLE quine.snapshots (
quine_id blob,
timestamp bigint,
multipart_index int,
data blob,
multipart_count int,
PRIMARY KEY (quine_id, timestamp, multipart_index)
) WITH CLUSTERING ORDER BY (timestamp DESC, multipart_index ASC)
AND compaction ={'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'}
AND default_time_to_live = 30
AND gc_grace_seconds = 120;
CREATE TABLE quine.standing_queries (
query_id uuid PRIMARY KEY,
queries blob
);
CREATE TABLE quine.standing_query_states (
quine_id blob,
standing_query_id uuid,
standing_query_part_id uuid,
data blob,
PRIMARY KEY (quine_id, standing_query_id, standing_query_part_id)
) WITH CLUSTERING ORDER BY (standing_query_id ASC, standing_query_part_id ASC)
AND default_time_to_live = 30;
CREATE TABLE quine.domain_graph_nodes (
dgn_id bigint PRIMARY KEY,
data blob
) WITH additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'}
AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND memtable = 'default'
AND crc_check_chance = 1.0
AND default_time_to_live = 30
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';

CREATE TABLE quine.domain_index_events (
quine_id blob,
timestamp bigint,
data blob,
dgn_id bigint,
PRIMARY KEY (quine_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp ASC)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': '1'}
AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND memtable = 'default'
AND crc_check_chance = 1.0
AND default_time_to_live = 30
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
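
The schema above can be loaded with cqlsh or scripted. Below is a minimal, illustrative sketch (not part of this change) using the DataStax cassandra-driver; the contact point 127.0.0.1 and reading the statements from cassandra_make_tables_commands.txt are assumptions.

# Sketch: apply the CQL statements above with the DataStax cassandra-driver.
# Assumptions: a single local node at 127.0.0.1:9042 and this file on disk.
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])  # assumed contact point
session = cluster.connect()

with open("cassandra_make_tables_commands.txt") as f:
    cql = f.read()

# Statements end with ';' and none of the option maps contain semicolons,
# so a plain split is sufficient for this file.
for stmt in (s.strip() for s in cql.split(";")):
    if stmt:
        session.execute(stmt)

cluster.shutdown()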
16 changes: 12 additions & 4 deletions produce_messages.py
@@ -5,6 +5,7 @@
from google.protobuf import json_format
from model.host_pb2 import Host
from utils.prodenv import kafka_servers, datagen_seed, message_count
from kafka.admin import KafkaAdminClient, ConfigResource, NewTopic

"""
Produces Host messages (JSON or protobuf) to the specified kafka topic at the
@@ -31,13 +32,20 @@ def nextProto() -> bytes:
datum = dgen.next()
msg = Host()
json_format.ParseDict(datum, msg)
return msg.SerializeToString()
return (msg.SerializeToString(), datum["customer_id"])

admin_client = KafkaAdminClient()
# topic_list = []
# topic_list.append(ConfigResource('TOPIC', topic, {"retention.ms":"1000", "retention.bytes":"10000000","segment.bytes":"1000000"}))
# admin_client.alter_configs(topic_list)

admin_client.create_topics([NewTopic(topic, 8, 1, topic_configs={'retention.ms': '-1'})])

for i in range(message_count):
payload = nextProto() if proto else nextJson()
f = producer.send(topic, value=payload)
(payload, partitionKey) = nextProto() if proto else nextJson()
f = producer.send(topic, key=partitionKey.encode('utf-8'), value=payload)
if i % 10000 == 0:
print(f"{i}/{message_count}")
print(f"{i}/{message_count}", flush=True)
# print(f.get())
# print(payload)

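
As a quick end-to-end check of what produce_messages.py wrote, a small consumer can read a record back from the "hosts-proto" topic configured in b3.py. This is an illustrative sketch only, not part of this change: the broker address is assumed (the real value comes from utils.prodenv's kafka_servers), and it assumes the protobuf path (nextProto) was used so values parse as Host messages.

# Sketch: read one message back from the topic to confirm keying and payloads.
# Assumptions: kafka-python installed, broker at localhost:9092, protobuf values.
from kafka import KafkaConsumer
from model.host_pb2 import Host

consumer = KafkaConsumer(
    "hosts-proto",
    bootstrap_servers=["localhost:9092"],  # assumed; see utils.prodenv in the repo
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating after 5s without new records
)

for record in consumer:
    host = Host()
    host.ParseFromString(record.value)  # decode the protobuf payload
    print(record.partition, record.key, host)  # key is the customer_id bytes
    break  # one record is enough for a smoke test

consumer.close()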