diff --git a/graph.py b/graph.py
new file mode 100644
index 0000000000..ffb278f741
--- /dev/null
+++ b/graph.py
@@ -0,0 +1,82 @@
+from __future__ import annotations
+
+import matplotlib.cm as mplcm
+import matplotlib.colors as colors
+import matplotlib.pyplot as plt
+import numpy as np
+
+threads = [1, 2, 4, 8, 16, 32, 64, 128, 256]
+
+data = {
+    "local_main": {
+        "avg": [[0.01], [0.03], [0.05], [0.09], [0.21], [0.38], [0.95], [4.05], [11.11]],
+        "p50": [[0.01], [0.02], [0.03], [0.05], [0.12], [0.25], [0.67], [3.22], [9.8]],
+        "p90": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]],
+        "p99": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]],
+        "p100": [[0.04], [0.21], [0.3], [0.58], [1.22], [2.59], [7.25], [17.03], [25.38]],
+    },
+    "local_original_1.5": {
+        "avg": [[0.01], [0.02], [0.04], [0.09], [0.2], [0.35], [0.65], [1.14], [2.21]],
+        "p50": [[0.01], [0.01], [0.01], [0.02], [0.06], [0.09], [0.2], [0.45], [1.91]],
+        "p90": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]],
+        "p99": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]],
+        "p100": [[0.04], [0.62], [1.11], [3.33], [4.71], [6.07], [5.64], [6.05], [6.76]],
+    },
+    "local_original_2": {
+        "avg": [[0.01], [0.02], [0.04], [0.09], [0.18], [0.39], [0.63], [1.23], [2.28]],
+        "p50": [[0.01], [0.01], [0.01], [0.02], [0.02], [0.06], [0.17], [0.41], [1.9]],
+        "p90": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]],
+        "p99": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]],
+        "p100": [[0.04], [1.3], [1.54], [3.07], [3.72], [5.55], [5.44], [6.42], [7.06]],
+    },
+    "local_server_algo": {
+        "avg": [[0.01], [0.02], [0.05], [0.1], [0.19], [0.36], [0.73], [1.23], [2.19]],
+        "p50": [[0.01], [0.01], [0.01], [0.01], [0.03], [0.09], [0.19], [0.59], [2.04]],
+        "p90": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]],
+        "p99": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]],
+        "p100": [[0.09], [0.65], [1.35], [2.87], [3.31], [4.4], [6.55], [5.84], [6.88]],
+    },
+}
+
+
+metrics = ["avg", "p90", "p100"]
+metric_titles = {
+    "avg": "Average Latency",
+    "p50": "p50 Latency",
+    "p90": "p90 Latency",
+    "p99": "p99 Latency",
+    "p100": "p100 (Max) Latency",
+}
+
+plt.figure(figsize=(16, 4 * len(metrics)))
+NUM_COLORS = len(data.keys()) + 1
+cm = plt.get_cmap("gist_rainbow")
+cNorm = colors.Normalize(vmin=0, vmax=NUM_COLORS - 1)
+scalarMap = mplcm.ScalarMappable(norm=cNorm, cmap=cm)
+for i, metric in enumerate(metrics, 1):
+    if metric in ["avg"]:
+        ax = plt.subplot(len(metrics), 2, i)
+    else:
+        ax = plt.subplot(len(metrics), 2, (i - 1) * (2) + 1)
+    ax.set_prop_cycle(color=[scalarMap.to_rgba(i) for i in range(NUM_COLORS)])
+    order = []
+    for label, vals in data.items():
+        if metric not in vals:
+            continue
+        arr = np.concatenate(np.around(np.array(vals[metric]), decimals=2))
+        order.append(plt.plot(threads, arr, "o-", label=label))
+
+    plt.title(metric_titles[metric])
+    plt.xscale("log", base=2)
+    plt.xlabel("Threads")
+    plt.ylabel("Seconds")
+    plt.xticks(threads, threads)
+    plt.grid(True, which="both", axis="x", linestyle="--", alpha=0.5)
+    plt.axhline(y=0, color="gray", linestyle="-")
+    if metric != "p90":
+        plt.legend().set_visible(False)
+    else:
+        plt.legend(loc=(1.01, 0.5), fontsize=8)
+
+plt.tight_layout()
+plt.show()
diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py
index 4bb927d995..7041bde5ab 100644
--- a/pymongo/asynchronous/client_session.py
+++ b/pymongo/asynchronous/client_session.py
@@ -473,8 +473,13 @@ def _max_time_expired_error(exc: PyMongoError) -> bool:
 # This limit is non-configurable and was chosen to be twice the 60 second
 # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter.
 _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120
-_BACKOFF_MAX = 1
-_BACKOFF_INITIAL = 0.050  # 50ms initial backoff
+_BACKOFF_MAX = 0.500  # 500ms max backoff
+_BACKOFF_INITIAL = 0.001  # 1ms initial backoff
+
+
+def _set_backoff_initial(seconds: float) -> None:
+    global _BACKOFF_INITIAL  # noqa: PLW0603
+    _BACKOFF_INITIAL = seconds
 
 
 def _within_time_limit(start_time: float) -> bool:
@@ -482,6 +487,11 @@ def _within_time_limit(start_time: float) -> bool:
     return time.monotonic() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT
 
 
+def _would_exceed_time_limit(start_time: float, backoff: float) -> bool:
+    """Would sleeping for the backoff exceed the with_transaction retry time limit?"""
+    return time.monotonic() + backoff - start_time >= _WITH_TRANSACTION_RETRY_TIME_LIMIT
+
+
 _T = TypeVar("_T")
 
 if TYPE_CHECKING:
@@ -705,10 +715,14 @@ async def callback(session, custom_arg, custom_kwarg=None):
         """
         start_time = time.monotonic()
         retry = 0
+        last_error: Optional[BaseException] = None
         while True:
             if retry:
                 # Implement exponential backoff on retry.
                 jitter = random.random()  # noqa: S311
-                backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX)
+                backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
+                if _would_exceed_time_limit(start_time, backoff):
+                    assert last_error is not None
+                    raise last_error
                 await asyncio.sleep(backoff)
             retry += 1
             await self.start_transaction(
@@ -718,6 +732,7 @@ async def callback(session, custom_arg, custom_kwarg=None):
                 ret = await callback(self)
             # Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
             except BaseException as exc:
+                last_error = exc
                 if self.in_transaction:
                     await self.abort_transaction()
                 if (
diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py
index a8f03fac74..b803544ae9 100644
--- a/pymongo/synchronous/client_session.py
+++ b/pymongo/synchronous/client_session.py
@@ -471,8 +471,13 @@ def _max_time_expired_error(exc: PyMongoError) -> bool:
 # This limit is non-configurable and was chosen to be twice the 60 second
 # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter.
 _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120
-_BACKOFF_MAX = 1
-_BACKOFF_INITIAL = 0.050  # 50ms initial backoff
+_BACKOFF_MAX = 0.500  # 500ms max backoff
+_BACKOFF_INITIAL = 0.001  # 1ms initial backoff
+
+
+def _set_backoff_initial(seconds: float) -> None:
+    global _BACKOFF_INITIAL  # noqa: PLW0603
+    _BACKOFF_INITIAL = seconds
 
 
 def _within_time_limit(start_time: float) -> bool:
@@ -480,6 +485,11 @@ def _within_time_limit(start_time: float) -> bool:
     return time.monotonic() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT
 
 
+def _would_exceed_time_limit(start_time: float, backoff: float) -> bool:
+    """Would sleeping for the backoff exceed the with_transaction retry time limit?"""
+    return time.monotonic() + backoff - start_time >= _WITH_TRANSACTION_RETRY_TIME_LIMIT
+
+
 _T = TypeVar("_T")
 
 if TYPE_CHECKING:
@@ -703,10 +713,14 @@ def callback(session, custom_arg, custom_kwarg=None):
         """
         start_time = time.monotonic()
         retry = 0
+        last_error: Optional[BaseException] = None
         while True:
             if retry:
                 # Implement exponential backoff on retry.
                 jitter = random.random()  # noqa: S311
-                backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX)
+                backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
+                if _would_exceed_time_limit(start_time, backoff):
+                    assert last_error is not None
+                    raise last_error
                 time.sleep(backoff)
             retry += 1
             self.start_transaction(read_concern, write_concern, read_preference, max_commit_time_ms)
@@ -714,6 +728,7 @@ def callback(session, custom_arg, custom_kwarg=None):
                 ret = callback(self)
             # Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
             except BaseException as exc:
+                last_error = exc
                 if self.in_transaction:
                     self.abort_transaction()
                 if (
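Reviewer note (not part of the patch): the change above replaces the 50ms-initial, 1s-cap doubling schedule with a 1ms-initial, 500ms-cap schedule that grows by 1.25x per retry, and gives up early (re-raising the last error) once sleeping would push past the 120s limit. A standalone sketch of the worst-case sleep per retry, with the jitter factor pinned to 1:

    # Worst-case (jitter == 1) backoff under the new schedule; the constants
    # mirror _BACKOFF_INITIAL and _BACKOFF_MAX in client_session.py.
    BACKOFF_INITIAL = 0.001  # 1 ms
    BACKOFF_MAX = 0.500  # 500 ms


    def worst_case_backoff(retry: int) -> float:
        return min(BACKOFF_INITIAL * (1.25**retry), BACKOFF_MAX)


    # The 500 ms cap kicks in around retry 28 (0.001 * 1.25**28 > 0.5), so the
    # first 30 retries sleep at most ~3.56 s in total -- the figure the tests
    # below assert against.
    print(sum(worst_case_backoff(r) for r in range(1, 31)))  # 3.5629515313825695

Because the jitter is uniform on [0, 1), the expected total is roughly half the worst case, which is why test_transaction_backoff_is_random only asserts an upper bound.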
diff --git a/summarize.py b/summarize.py
new file mode 100644
index 0000000000..bcac39512a
--- /dev/null
+++ b/summarize.py
@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+import csv
+import pprint
+import re
+from collections import defaultdict
+
+
+def testing_n_threads(f_in, data):
+    threads = re.match(r"Testing (?P<n_threads>.*) threads", f_in.readline()).group("n_threads")
+    seconds = re.match(
+        r"All threads completed after (?P<n_seconds>.*) seconds", f_in.readline()
+    ).group("n_seconds")
+    tries = re.match(r"Total number of retry attempts: (?P<n_tries>.*)", f_in.readline()).group(
+        "n_tries"
+    )
+    data[f"{threads}_sec"].append(float(seconds))
+    data[f"{threads}_try"].append(int(tries))
+    return data
+
+
+def read_table(f_in, data):
+    # Initialize the CSV reader with the pipe '|' as the delimiter
+    reader = csv.reader(f_in, delimiter="|")
+    next(reader)  # skip header
+
+    for row in reader:
+        if "threads " in row:
+            continue
+        row = [col.strip() for col in row]  # noqa:PLW2901
+        if row == []:
+            continue
+        # Convert numbers to appropriate types (int for threads, float for statistics)
+        threads = int(row[0])
+        avg, p50, p90, p99, p100 = map(float, row[1:])
+        # Append the parsed row to the list
+        data[f"{threads}_avg"].append(avg)
+        data[f"{threads}_p50"].append(p50)
+        data[f"{threads}_p90"].append(p90)
+        data[f"{threads}_p99"].append(p99)
+        data[f"{threads}_p100"].append(p100)
+    return data
+
+
+path = "/Users/iris.ho/Github/backpressure/final"
+files = ["main", "local_original_1.5", "local_original_2", "local_server_algo"]
+print_data = {}
+pp = pprint.PrettyPrinter(width=80)
+THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256]
+for f in files:
+    data = defaultdict(list)
+    with open(f"{path}/{f}.txt") as f_in:
+        for _ in THREADS:
+            data = testing_n_threads(f_in, data)
+            f_in.readline()
+        f_in.readline()
+        data = read_table(f_in, data)
+    print_data[f] = {
+        "avg": [data[f"{thread}_avg"] for thread in THREADS],
+        "p50": [data[f"{thread}_p50"] for thread in THREADS],
+        "p90": [data[f"{thread}_p90"] for thread in THREADS],
+        "p99": [data[f"{thread}_p99"] for thread in THREADS],
+        "p100": [data[f"{thread}_p100"] for thread in THREADS],
+    }
+print(print_data)  # noqa: T201
diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py
index 5c2a4f6fae..150cdde2f8 100644
--- a/test/asynchronous/test_transactions.py
+++ b/test/asynchronous/test_transactions.py
@@ -15,7 +15,9 @@
 """Execute Transactions Spec tests."""
 from __future__ import annotations
 
+import random
 import sys
+import time
 from io import BytesIO
 
 from test.asynchronous.utils_spec_runner import AsyncSpecRunner
@@ -36,7 +38,11 @@
 from bson.raw_bson import RawBSONDocument
 from pymongo import WriteConcern, _csot
 from pymongo.asynchronous import client_session
-from pymongo.asynchronous.client_session import TransactionOptions
+from pymongo.asynchronous.client_session import (
+    _BACKOFF_MAX,
+    TransactionOptions,
+    _set_backoff_initial,
+)
 from pymongo.asynchronous.command_cursor import AsyncCommandCursor
 from pymongo.asynchronous.cursor import AsyncCursor
 from pymongo.asynchronous.helpers import anext
@@ -602,6 +608,81 @@ async def callback(session):
             await s.with_transaction(callback)
         self.assertFalse(s.in_transaction)
 
+    @async_client_context.require_test_commands
+    @async_client_context.require_transactions
+    async def test_transaction_backoff_is_random(self):
+        client = async_client_context.client
+        coll = client[self.db.name].test
+        # set a fail point to force commitTransaction failures and exercise backoff
+        await self.set_fail_point(
+            {
+                "configureFailPoint": "failCommand",
+                "mode": {
+                    "times": 30
+                },  # high enough that the cumulative backoff delay is noticeable
+                "data": {
+                    "failCommands": ["commitTransaction"],
+                    "errorCode": 24,
+                },
+            }
+        )
+        self.addAsyncCleanup(
+            self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}
+        )
+
+        start = time.monotonic()
+
+        async def callback(session):
+            await coll.insert_one({}, session=session)
+
+        async with self.client.start_session() as s:
+            await s.with_transaction(callback)
+
+        end = time.monotonic()
+        self.assertLess(end - start, 5)  # worst-case (jitter == 1) sum of backoffs is ~3.56s
+
+    @async_client_context.require_test_commands
+    @async_client_context.require_transactions
+    async def test_transaction_backoff(self):
+        client = async_client_context.client
+        coll = client[self.db.name].test
+        # patch random to make the jitter deterministic
+        _original_random_random = random.random
+
+        def always_one():
+            return 1
+
+        random.random = always_one
+        # restore random.random even if an assertion below fails
+        self.addCleanup(setattr, random, "random", _original_random_random)
+        # set a fail point to force commitTransaction failures and exercise backoff
+        await self.set_fail_point(
+            {
+                "configureFailPoint": "failCommand",
+                "mode": {
+                    "times": 30
+                },  # high enough that the cumulative backoff delay is noticeable
+                "data": {
+                    "failCommands": ["commitTransaction"],
+                    "errorCode": 24,
+                },
+            }
+        )
+        self.addAsyncCleanup(
+            self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}
+        )
+
+        start = time.monotonic()
+
+        async def callback(session):
+            await coll.insert_one({}, session=session)
+
+        async with self.client.start_session() as s:
+            await s.with_transaction(callback)
+
+        end = time.monotonic()
+        self.assertGreaterEqual(end - start, 3.5)  # sum of backoffs is 3.5629515313825695
+
 
 class TestOptionsInsideTransactionProse(AsyncTransactionsBase):
     @async_client_context.require_transactions
diff --git a/test/test_transactions.py b/test/test_transactions.py
index f4578deddb..a0f8f143ec 100644
--- a/test/test_transactions.py
+++ b/test/test_transactions.py
@@ -15,7 +15,9 @@
 """Execute Transactions Spec tests."""
 from __future__ import annotations
 
+import random
 import sys
+import time
 from io import BytesIO
 
 from test.utils_spec_runner import SpecRunner
@@ -47,7 +49,11 @@
 from pymongo.read_concern import ReadConcern
 from pymongo.read_preferences import ReadPreference
 from pymongo.synchronous import client_session
-from pymongo.synchronous.client_session import TransactionOptions
+from pymongo.synchronous.client_session import (
+    _BACKOFF_MAX,
+    TransactionOptions,
+    _set_backoff_initial,
+)
 from pymongo.synchronous.command_cursor import CommandCursor
 from pymongo.synchronous.cursor import Cursor
 from pymongo.synchronous.helpers import next
@@ -590,6 +596,77 @@ def callback(session):
             s.with_transaction(callback)
         self.assertFalse(s.in_transaction)
 
+    @client_context.require_test_commands
+    @client_context.require_transactions
+    def test_transaction_backoff_is_random(self):
+        client = client_context.client
+        coll = client[self.db.name].test
+        # set a fail point to force commitTransaction failures and exercise backoff
+        self.set_fail_point(
+            {
+                "configureFailPoint": "failCommand",
+                "mode": {
+                    "times": 30
+                },  # high enough that the cumulative backoff delay is noticeable
+                "data": {
+                    "failCommands": ["commitTransaction"],
+                    "errorCode": 24,
+                },
+            }
+        )
+        self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"})
+
+        start = time.monotonic()
+
+        def callback(session):
+            coll.insert_one({}, session=session)
+
+        with self.client.start_session() as s:
+            s.with_transaction(callback)
+
+        end = time.monotonic()
+        self.assertLess(end - start, 5)  # worst-case (jitter == 1) sum of backoffs is ~3.56s
+
+    @client_context.require_test_commands
+    @client_context.require_transactions
+    def test_transaction_backoff(self):
+        client = client_context.client
+        coll = client[self.db.name].test
+        # patch random to make the jitter deterministic
+        _original_random_random = random.random
+
+        def always_one():
+            return 1
+
+        random.random = always_one
+        # restore random.random even if an assertion below fails
+        self.addCleanup(setattr, random, "random", _original_random_random)
+        # set a fail point to force commitTransaction failures and exercise backoff
+        self.set_fail_point(
+            {
+                "configureFailPoint": "failCommand",
+                "mode": {
+                    "times": 30
+                },  # high enough that the cumulative backoff delay is noticeable
+                "data": {
+                    "failCommands": ["commitTransaction"],
+                    "errorCode": 24,
+                },
+            }
+        )
+        self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"})
+
+        start = time.monotonic()
+
+        def callback(session):
+            coll.insert_one({}, session=session)
+
+        with self.client.start_session() as s:
+            s.with_transaction(callback)
+
+        end = time.monotonic()
+        self.assertGreaterEqual(end - start, 3.5)  # sum of backoffs is 3.5629515313825695
+
 
 class TestOptionsInsideTransactionProse(TransactionsBase):
     @client_context.require_transactions
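Reviewer note: both test files import _BACKOFF_MAX and _set_backoff_initial, yet neither name is used in the hunks shown here, so they may serve code outside this diff. A sketch of how the _set_backoff_initial knob could keep a retry-heavy test fast (a hypothetical test, not part of the patch):

    # Hypothetical test using the knob added by this patch; 0.001 restores
    # the default that client_session.py ships with.
    from pymongo.synchronous.client_session import _set_backoff_initial


    class TestFastBackoff(TransactionsBase):  # hypothetical
        def test_retries_quickly(self):
            _set_backoff_initial(0.0001)  # 0.1 ms: 30 retries add negligible time
            self.addCleanup(_set_backoff_initial, 0.001)  # restore the default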
diff --git a/withTransaction.py b/withTransaction.py
new file mode 100644
index 0000000000..a24f950c0c
--- /dev/null
+++ b/withTransaction.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+import os
+import time
+from concurrent.futures import ThreadPoolExecutor
+
+from pymongo import MongoClient
+
+
+class RunOrderTransaction:
+    def __init__(self, client):
+        super(RunOrderTransaction, self).__init__()  # noqa:UP008
+        self.retry_attempts = -1
+        self.time = 0
+        self.client = client
+
+    def run(self):
+        start = time.time()
+        with self.client.start_session() as session:
+            try:
+                session.with_transaction(self.callback)
+            finally:
+                self.time = time.time() - start
+        return self
+
+    def callback(self, session):
+        self.retry_attempts += 1
+        return callback(session, self.client)
+
+
+def callback(session, client):
+    order_id = client.test.orders1.insert_one({"sku": "foo", "qty": 1}, session=session).inserted_id
+    res = client.test.inventory1.update_one(
+        {"sku": "foo", "qty": {"$gte": 1}}, {"$inc": {"qty": -1}}, session=session
+    )
+    if not res.modified_count:
+        raise TypeError("Insufficient inventory count")
+
+    return order_id
+
+
+def run(num_threads: int, local: bool, f):
+    if local:
+        client = MongoClient()
+    else:
+        client = MongoClient(os.getenv("ATLAS_URI"))
+    try:
+        client.drop_database("test")
+    except Exception:  # noqa: S110
+        # fails on atlas?
+        pass
+    db = client.test
+    db.drop_collection("orders1")
+    db.create_collection("orders1")
+    db.drop_collection("inventory1")
+    inventory = db.create_collection("inventory1")
+    inventory.insert_one({"sku": "foo", "qty": 1000000})
+
+    f.write("Testing %s threads\n" % num_threads)
+    start = time.time()
+    N_TXNS = 512
+    results = []
+    ops = [RunOrderTransaction(client) for _ in range(N_TXNS)]
+    with ThreadPoolExecutor(max_workers=num_threads) as exc:
+        futures = [exc.submit(op.run) for op in ops]
+        for future in futures:
+            result = future.result()
+            results.append(result)
+
+    end = time.time()
+    total_time = end - start
+    total_attempts = sum(r.retry_attempts for r in results)
+
+    f.write("All threads completed after %s seconds\n" % (end - start))
+    f.write(f"Total number of retry attempts: {total_attempts}\n")
+    client.close()
+
+    latencies = sorted(r.time for r in results)
+    avg_latency = sum(latencies) / N_TXNS
+    p50 = latencies[int(N_TXNS * 0.5)]
+    p90 = latencies[int(N_TXNS * 0.9)]
+    p99 = latencies[int(N_TXNS * 0.99)]
+    p100 = latencies[int(N_TXNS * 1.0) - 1]
+    # print(f'avg latency: {avg_latency:.2f}s p50: {p50:.2f}s p90: {p90:.2f}s p99: {p99:.2f}s p100: {p100:.2f}s')
+    return total_time, total_attempts, avg_latency, p50, p90, p99, p100
+
+
+def main(f, local=True):
+    NUM_THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256]
+    data = {}
+    for num in NUM_THREADS:
+        times, attempts, avg_latency, p50, p90, p99, p100 = run(num, local, f)
+        data[num] = {
+            "avg": avg_latency,
+            "p50": p50,
+            "p90": p90,
+            "p99": p99,
+            "p100": p100,
+        }
+        f.write("\n")
+        time.sleep(10)
+    f.write("\nthreads | avg | p50 | p90 | p99 | p100\n")
+    for num in NUM_THREADS:
+        f.write(
+            f"{num:7} | {data[num]['avg']:5.2f} | {data[num]['p50']:5.2f} | {data[num]['p90']:5.2f} | {data[num]['p99']:5.2f} | {data[num]['p100']:5.2f}\n"
+        )
+
+
+if __name__ == "__main__":
+    with open("/Users/iris.ho/Github/backpressure/final/local_original_1.5.txt", "w") as f:
+        main(f, local=True)
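Reviewer note: the three benchmark artifacts in this patch form a pipeline. withTransaction.py writes one log per configuration, summarize.py parses those logs into per-metric lists, and graph.py plots hard-coded copies of the parsed output (summarize.py's file list uses "main" where graph.py's dict uses "local_main", so the paste involves renaming keys). A sketch of driving the pipeline end to end, assuming the author's local paths:

    # Hypothetical driver; the scripts hard-code their input/output paths,
    # so adjust those before running.
    import runpy

    # 1. Run the benchmark; withTransaction.py writes its log itself.
    runpy.run_path("withTransaction.py", run_name="__main__")
    # 2. Parse the logs; summarize.py prints a dict shaped like graph.py's `data`.
    runpy.run_path("summarize.py", run_name="__main__")
    # 3. After pasting the printed dict into graph.py's `data`, render the plots.
    runpy.run_path("graph.py", run_name="__main__")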