Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from __future__ import annotations

import matplotlib.cm as mplcm
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numpy as np

# Thread counts used on the x-axis of every plot (powers of two, 1..256).
threads = [1, 2, 4, 8, 16, 32, 64, 128, 256]

# Benchmark results keyed by implementation variant.  For each variant, a
# latency metric name ("avg", "p50", "p90", "p99", "p100") maps to a list of
# single-element lists — one inner list per entry in `threads`, in the same
# order, values in seconds.  (In these runs the p99 rows are identical to the
# p90 rows.)
data = {
    "local_main": {
        "avg": [[0.01], [0.03], [0.05], [0.09], [0.21], [0.38], [0.95], [4.05], [11.11]],
        "p50": [[0.01], [0.02], [0.03], [0.05], [0.12], [0.25], [0.67], [3.22], [9.8]],
        "p90": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]],
        "p99": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]],
        "p100": [[0.04], [0.21], [0.3], [0.58], [1.22], [2.59], [7.25], [17.03], [25.38]],
    },
    "local_original_1.5": {
        "avg": [[0.01], [0.02], [0.04], [0.09], [0.2], [0.35], [0.65], [1.14], [2.21]],
        "p50": [[0.01], [0.01], [0.01], [0.02], [0.06], [0.09], [0.2], [0.45], [1.91]],
        "p90": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]],
        "p99": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]],
        "p100": [[0.04], [0.62], [1.11], [3.33], [4.71], [6.07], [5.64], [6.05], [6.76]],
    },
    "local_original_2": {
        "avg": [[0.01], [0.02], [0.04], [0.09], [0.18], [0.39], [0.63], [1.23], [2.28]],
        "p50": [[0.01], [0.01], [0.01], [0.02], [0.02], [0.06], [0.17], [0.41], [1.9]],
        "p90": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]],
        "p99": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]],
        "p100": [[0.04], [1.3], [1.54], [3.07], [3.72], [5.55], [5.44], [6.42], [7.06]],
    },
    "local_server_algo": {
        "avg": [[0.01], [0.02], [0.05], [0.1], [0.19], [0.36], [0.73], [1.23], [2.19]],
        "p50": [[0.01], [0.01], [0.01], [0.01], [0.03], [0.09], [0.19], [0.59], [2.04]],
        "p90": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]],
        "p99": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]],
        "p100": [[0.09], [0.65], [1.35], [2.87], [3.31], [4.4], [6.55], [5.84], [6.88]],
    },
}


# Metrics actually plotted.  p50 and p99 exist in `data` but are omitted
# (the p99 values duplicate p90 in this dataset).
metrics = ["avg", "p90", "p100"]
# Human-readable subplot titles for every known metric.
metric_titles = {
    "avg": "Average Latency",
    "p50": "p50 Latency",
    "p90": "p90 Latency",
    "p99": "p99 Latency",
    "p100": "p100 (Max) Latency",
}

# One row per metric in a (len(metrics) x 2) grid; all axes occupy the LEFT
# column, leaving the right half of the figure empty so the legend (placed
# outside the p90 axes at loc=(1.01, 0.5)) has room.
plt.figure(figsize=(16, 4 * len(metrics)))

# Distinct rainbow colors, one per data series (+1 spare).
NUM_COLORS = len(data) + 1
cm = plt.get_cmap("gist_rainbow")
cNorm = colors.Normalize(vmin=0, vmax=NUM_COLORS - 1)
scalarMap = mplcm.ScalarMappable(norm=cNorm, cmap=cm)

for i, metric in enumerate(metrics, 1):
    # Left-column subplot index for row i.  The original special-cased
    # "avg" (index i) vs the rest ((i - 1) * 2 + 1), but for i == 1 both
    # expressions evaluate to 1, so a single formula suffices.
    ax = plt.subplot(len(metrics), 2, 2 * i - 1)
    # NOTE: use a different name (`c`) than the enclosing loop variable to
    # avoid shadowing `i` inside the comprehension.
    ax.set_prop_cycle(color=[scalarMap.to_rgba(c) for c in range(NUM_COLORS)])

    for label, vals in data.items():
        if metric not in vals:  # variant lacks this metric: skip it
            continue
        # vals[metric] is a list of single-element lists; round to 2 decimal
        # places and flatten to a 1-D array for plotting.
        arr = np.concatenate(np.around(np.array(vals[metric]), decimals=2))
        plt.plot(threads, arr, "o-", label=label)

    plt.title(metric_titles[metric])
    plt.xscale("log", base=2)  # thread counts are powers of two
    plt.xlabel("Threads")
    plt.ylabel("Seconds")
    plt.xticks(threads, threads)
    plt.grid(True, which="both", axis="x", linestyle="--", alpha=0.5)
    plt.axhline(y=0, color="gray", linestyle="-")
    # A single shared legend, shown only beside the p90 subplot.
    if metric != "p90":
        plt.legend().set_visible(False)
    else:
        plt.legend(loc=(1.01, 0.5), fontsize=8)

plt.tight_layout()
plt.show()
21 changes: 18 additions & 3 deletions pymongo/asynchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,15 +473,25 @@ def _max_time_expired_error(exc: PyMongoError) -> bool:
# This limit is non-configurable and was chosen to be twice the 60 second
# default value of MongoDB's `transactionLifetimeLimitSeconds` parameter.
_WITH_TRANSACTION_RETRY_TIME_LIMIT = 120
_BACKOFF_MAX = 1
_BACKOFF_INITIAL = 0.050 # 50ms initial backoff
_BACKOFF_MAX = 0.500 # 500ms max backoff
_BACKOFF_INITIAL = 0.001 # 1ms initial backoff


def _set_backoff_initial(seconds: float) -> None:
    """Override the module-level initial backoff delay used by with_transaction retries."""
    global _BACKOFF_INITIAL  # noqa: PLW0603
    _BACKOFF_INITIAL = seconds


def _within_time_limit(start_time: float) -> bool:
    """Are we within the with_transaction retry limit?"""
    # start_time comes from time.monotonic(), so this comparison is immune
    # to wall-clock adjustments.
    return time.monotonic() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT


def _would_exceed_time_limit(start_time: float, backoff: float) -> bool:
    """Would sleeping for `backoff` seconds push us past the with_transaction retry limit?

    Returns True when start_time + elapsed + backoff reaches or exceeds the
    limit, i.e. when the caller should give up instead of sleeping.
    """
    return time.monotonic() + backoff - start_time >= _WITH_TRANSACTION_RETRY_TIME_LIMIT


_T = TypeVar("_T")

if TYPE_CHECKING:
Expand Down Expand Up @@ -705,10 +715,14 @@ async def callback(session, custom_arg, custom_kwarg=None):
"""
start_time = time.monotonic()
retry = 0
last_error: Optional[BaseException] = None
while True:
if retry: # Implement exponential backoff on retry.
jitter = random.random() # noqa: S311
backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX)
backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
if _would_exceed_time_limit(start_time, backoff):
assert last_error is not None
raise last_error
await asyncio.sleep(backoff)
retry += 1
await self.start_transaction(
Expand All @@ -718,6 +732,7 @@ async def callback(session, custom_arg, custom_kwarg=None):
ret = await callback(self)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as exc:
last_error = exc
if self.in_transaction:
await self.abort_transaction()
if (
Expand Down
21 changes: 18 additions & 3 deletions pymongo/synchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,15 +471,25 @@ def _max_time_expired_error(exc: PyMongoError) -> bool:
# This limit is non-configurable and was chosen to be twice the 60 second
# default value of MongoDB's `transactionLifetimeLimitSeconds` parameter.
_WITH_TRANSACTION_RETRY_TIME_LIMIT = 120
_BACKOFF_MAX = 1
_BACKOFF_INITIAL = 0.050 # 50ms initial backoff
_BACKOFF_MAX = 0.500 # 500ms max backoff
_BACKOFF_INITIAL = 0.001 # 1ms initial backoff


def _set_backoff_initial(seconds: float) -> None:
    """Override the module-level initial backoff delay used by with_transaction retries."""
    global _BACKOFF_INITIAL  # noqa: PLW0603
    _BACKOFF_INITIAL = seconds


def _within_time_limit(start_time: float) -> bool:
    """Are we within the with_transaction retry limit?"""
    # start_time comes from time.monotonic(), so this comparison is immune
    # to wall-clock adjustments.
    return time.monotonic() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT


def _would_exceed_time_limit(start_time: float, backoff: float) -> bool:
    """Would sleeping for `backoff` seconds push us past the with_transaction retry limit?

    Returns True when start_time + elapsed + backoff reaches or exceeds the
    limit, i.e. when the caller should give up instead of sleeping.
    """
    return time.monotonic() + backoff - start_time >= _WITH_TRANSACTION_RETRY_TIME_LIMIT


_T = TypeVar("_T")

if TYPE_CHECKING:
Expand Down Expand Up @@ -703,17 +713,22 @@ def callback(session, custom_arg, custom_kwarg=None):
"""
start_time = time.monotonic()
retry = 0
last_error: Optional[BaseException] = None
while True:
if retry: # Implement exponential backoff on retry.
jitter = random.random() # noqa: S311
backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX)
backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
if _would_exceed_time_limit(start_time, backoff):
assert last_error is not None
raise last_error
time.sleep(backoff)
retry += 1
self.start_transaction(read_concern, write_concern, read_preference, max_commit_time_ms)
try:
ret = callback(self)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as exc:
last_error = exc
if self.in_transaction:
self.abort_transaction()
if (
Expand Down
65 changes: 65 additions & 0 deletions summarize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import csv
import pprint
import re
from collections import defaultdict


def testing_n_threads(f_in, data):
threads = re.match(r"Testing (?P<n_threads>.*) threads", f_in.readline()).group("n_threads")
seconds = re.match(
r"All threads completed after (?P<n_seconds>.*) seconds", f_in.readline()
).group("n_seconds")
tries = re.match(r"Total number of retry attempts: (?P<n_tries>.*)", f_in.readline()).group(
"n_tries"
)
data[f"{threads}_sec"].append(float(seconds))
data[f"{threads}_try"].append(int(tries))
return data


def read_table(f_in, data):
    """Parse a pipe-delimited latency table from *f_in* into *data*.

    Each data row is ``threads | avg | p50 | p90 | p99 | p100``; for every
    row the five statistics are appended to ``data["{threads}_{metric}"]``.
    Returns *data* (mutated in place).
    """
    reader = csv.reader(f_in, delimiter="|")
    next(reader)  # discard the header row

    for raw_row in reader:
        # Repeated header rows contain a literal "threads " column: skip them.
        if "threads " in raw_row:
            continue
        cells = [cell.strip() for cell in raw_row]
        if not cells:  # blank line between sections
            continue
        thread_count = int(cells[0])
        avg, p50, p90, p99, p100 = map(float, cells[1:])
        for metric, value in (
            ("avg", avg),
            ("p50", p50),
            ("p90", p90),
            ("p99", p99),
            ("p100", p100),
        ):
            data[f"{thread_count}_{metric}"].append(value)
    return data


# Location of the raw benchmark output files and the run variants to summarize.
path = "/Users/iris.ho/Github/backpressure/final"
files = ["main", "local_original_1.5", "local_original_2", "local_server_algo"]
print_data = {}
pp = pprint.PrettyPrinter(width=80)  # NOTE(review): created but never used
THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256]
for f in files:
    data = defaultdict(list)
    with open(f"{path}/{f}.txt") as f_in:
        # Each file starts with one three-line stanza per thread count.
        # NOTE(review): indentation was lost in extraction — the two
        # readline() calls are assumed to skip filler lines inside the
        # per-thread loop; confirm against the original file.
        for _ in THREADS:
            data = testing_n_threads(f_in, data)
            f_in.readline()
            f_in.readline()
        data = read_table(f_in, data)
    # Reshape into metric -> list of per-thread-count sample lists, the
    # structure consumed by the plotting script.
    print_data[f] = {
        "avg": [data[f"{thread}_avg"] for thread in THREADS],
        "p50": [data[f"{thread}_p50"] for thread in THREADS],
        "p90": [data[f"{thread}_p90"] for thread in THREADS],
        "p99": [data[f"{thread}_p99"] for thread in THREADS],
        "p100": [data[f"{thread}_p100"] for thread in THREADS],
    }
print(print_data)  # noqa: T201
83 changes: 82 additions & 1 deletion test/asynchronous/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
"""Execute Transactions Spec tests."""
from __future__ import annotations

import random
import sys
import time
from io import BytesIO
from test.asynchronous.utils_spec_runner import AsyncSpecRunner

Expand All @@ -36,7 +38,11 @@
from bson.raw_bson import RawBSONDocument
from pymongo import WriteConcern, _csot
from pymongo.asynchronous import client_session
from pymongo.asynchronous.client_session import TransactionOptions
from pymongo.asynchronous.client_session import (
_BACKOFF_MAX,
TransactionOptions,
_set_backoff_initial,
)
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.cursor import AsyncCursor
from pymongo.asynchronous.helpers import anext
Expand Down Expand Up @@ -602,6 +608,81 @@ async def callback(session):
await s.with_transaction(callback)
self.assertFalse(s.in_transaction)

@async_client_context.require_test_commands
@async_client_context.require_transactions
async def test_transaction_backoff_is_random(self):
    """With jitter enabled, total retry time should stay well below the
    deterministic worst case (~3.5s of summed maximum backoffs)."""
    client = async_client_context.client
    coll = client[self.db.name].test
    # set fail point to trigger transaction failure and trigger backoff
    await self.set_fail_point(
        {
            "configureFailPoint": "failCommand",
            "mode": {
                "times": 30
            },  # sufficiently high enough such that the time effect of backoff is noticeable
            "data": {
                "failCommands": ["commitTransaction"],
                "errorCode": 24,  # NOTE(review): presumably a transient error so commit is retried — confirm
            },
        }
    )
    # Always disable the fail point, even if the assertion below fails.
    self.addAsyncCleanup(
        self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}
    )

    start = time.monotonic()

    async def callback(session):
        await coll.insert_one({}, session=session)

    async with self.client.start_session() as s:
        await s.with_transaction(callback)

    end = time.monotonic()
    self.assertLess(end - start, 5)  # sum of backoffs is ~3.5 seconds

@async_client_context.require_test_commands
@async_client_context.require_transactions
async def test_transaction_backoff(self):
    """With random.random patched to return 1, every retry sleeps for its
    maximum backoff, so total elapsed time has a deterministic lower bound."""
    client = async_client_context.client
    coll = client[self.db.name].test
    # patch random to make it deterministic
    _original_random_random = random.random

    def always_one():
        return 1

    random.random = always_one
    try:
        # set fail point to trigger transaction failure and trigger backoff
        await self.set_fail_point(
            {
                "configureFailPoint": "failCommand",
                "mode": {
                    "times": 30
                },  # sufficiently high enough such that the time effect of backoff is noticeable
                "data": {
                    "failCommands": ["commitTransaction"],
                    "errorCode": 24,
                },
            }
        )
        self.addAsyncCleanup(
            self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}
        )

        start = time.monotonic()

        async def callback(session):
            await coll.insert_one({}, session=session)

        async with self.client.start_session() as s:
            await s.with_transaction(callback)

        end = time.monotonic()
        self.assertGreaterEqual(end - start, 3.5)  # sum of backoffs is 3.5629515313825695
    finally:
        # Bug fix: restore random.random unconditionally. Previously the
        # restore only ran on success, so a failing test left random patched
        # for every subsequent test in the process.
        random.random = _original_random_random


class TestOptionsInsideTransactionProse(AsyncTransactionsBase):
@async_client_context.require_transactions
Expand Down
Loading
Loading