@@ -184,26 +184,45 @@ def export(self, batch: Sequence[ReadableLogRecord]) -> LogExportResult:
serialized_data = encode_logs(batch).SerializeToString()
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return LogExportResult.SUCCESS
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
try:
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return LogExportResult.SUCCESS
except requests.exceptions.RequestException as error:
reason = str(error)
if isinstance(error, ConnectionError):
retryable = True
else:
retryable = False
Comment on lines +195 to +198
maybe?

Suggested change
- if isinstance(error, ConnectionError):
-     retryable = True
- else:
-     retryable = False
+ retryable = isinstance(error, ConnectionError)

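The one-liner is behaviour-preserving since isinstance already returns a bool, and requests' ConnectionError is still a subclass of RequestException, so it continues to be caught by the surrounding except clause. A standalone sanity check, assuming requests is installed (illustration only, not part of this PR):

from requests.exceptions import ConnectionError, RequestException, Timeout

# Only ConnectionError is treated as retryable by the new code path;
# Timeout and the base RequestException are not.
for error in (ConnectionError("refused"), Timeout("timed out"), RequestException("other failure")):
    retryable = isinstance(error, ConnectionError)
    print(type(error).__name__, "->", retryable)
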
status_code = None
else:
reason = resp.reason
retryable = _is_retryable(resp)
status_code = resp.status_code

if not retryable:
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
status_code,
reason,
)
return LogExportResult.FAILURE

if (
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
resp.status_code,
resp.text,
"Failed to export logs batch due to timeout,"
"max retries or shutdown."
)
return LogExportResult.FAILURE
_logger.warning(
"Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
resp.reason,
reason,
backoff_seconds,
)
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
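
For reference, the jittered backoff used by all three exporters grows as shown in this standalone sketch, which mirrors the formula above but is not code from this PR:

import random

# Un-jittered backoff doubles each retry: 1 s, 2 s, 4 s, ...
# The uniform(0.8, 1.2) factor spreads each wait by +/-20%.
for retry_num in range(3):
    low, high = 2**retry_num * 0.8, 2**retry_num * 1.2
    sample = 2**retry_num * random.uniform(0.8, 1.2)
    print(f"retry {retry_num}: backoff in [{low:.1f}, {high:.1f}] s, e.g. {sample:.2f} s")
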
@@ -231,26 +231,44 @@ def export(
serialized_data = encode_metrics(metrics_data).SerializeToString()
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return MetricExportResult.SUCCESS
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
try:
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return MetricExportResult.SUCCESS
except requests.exceptions.RequestException as error:
reason = str(error)
if isinstance(error, ConnectionError):
retryable = True
else:
retryable = False
status_code = None
else:
reason = resp.reason
retryable = _is_retryable(resp)
status_code = resp.status_code

if not retryable:
_logger.error(
"Failed to export metrics batch code: %s, reason: %s",
status_code,
reason,
)
return MetricExportResult.FAILURE
if (
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export metrics batch code: %s, reason: %s",
resp.status_code,
resp.text,
"Failed to export metrics batch due to timeout,"
"max retries or shutdown."
)
return MetricExportResult.FAILURE
_logger.warning(
"Transient error %s encountered while exporting metrics batch, retrying in %.2fs.",
resp.reason,
reason,
backoff_seconds,
)
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
@@ -179,26 +179,45 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
serialized_data = encode_spans(spans).SerializePartialToString()
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return SpanExportResult.SUCCESS
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
try:
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return SpanExportResult.SUCCESS
except requests.exceptions.RequestException as error:
reason = str(error)
if isinstance(error, ConnectionError):
retryable = True
else:
retryable = False
status_code = None
else:
reason = resp.reason
retryable = _is_retryable(resp)
status_code = resp.status_code

if not retryable:
_logger.error(
"Failed to export span batch code: %s, reason: %s",
status_code,
reason,
)
return SpanExportResult.FAILURE

if (
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export span batch code: %s, reason: %s",
resp.status_code,
resp.text,
"Failed to export span batch due to timeout,"
"max retries or shutdown."
)
return SpanExportResult.FAILURE
_logger.warning(
"Transient error %s encountered while exporting span batch, retrying in %.2fs.",
resp.reason,
reason,
backoff_seconds,
)
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
@@ -19,7 +19,9 @@
from unittest import TestCase
from unittest.mock import ANY, MagicMock, Mock, patch

import requests
from requests import Session
from requests.exceptions import ConnectionError
from requests.models import Response

from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
@@ -555,6 +557,48 @@ def test_retry_timeout(self, mock_post):
warning.records[0].message,
)

@patch.object(Session, "post")
def test_export_no_collector_available_retryable(self, mock_post):
exporter = OTLPMetricExporter(timeout=1.5)
msg = "Server not available."
mock_post.side_effect = ConnectionError(msg)
with self.assertLogs(level=WARNING) as warning:
before = time.time()
# Set timeout to 1.5 seconds
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.FAILURE,
)
after = time.time()
# First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout.
# Additionally every retry results in two calls, therefore 4.
self.assertEqual(mock_post.call_count, 4)
# There's a +/-20% jitter on each backoff.
self.assertTrue(0.75 < after - before < 1.25)
self.assertIn(
f"Transient error {msg} encountered while exporting metrics batch, retrying in",
warning.records[0].message,
)
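
The timing bounds asserted above follow from the backoff formula: with timeout=1.5 the first backoff lands in [0.8, 1.2] s and fits the deadline, while the second would land in [1.6, 2.4] s and always exceeds the remaining budget, so the exporter gives up after roughly one sleep. A rough standalone check of that arithmetic, ignoring request overhead (illustration only, not part of this PR):

import random

timeout = 1.5
remaining = timeout
waited = 0.0
for retry_num in range(10):
    backoff = 2**retry_num * random.uniform(0.8, 1.2)
    if backoff > remaining:
        # mirrors the exporter's early return before the second sleep
        break
    waited += backoff
    remaining -= backoff
print(f"time spent backing off: {waited:.2f} s")  # always in [0.8, 1.2]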

@patch.object(Session, "post")
def test_export_no_collector_available(self, mock_post):
exporter = OTLPMetricExporter(timeout=1.5)

mock_post.side_effect = requests.exceptions.RequestException()
with self.assertLogs(level=WARNING) as warning:
# Set timeout to 1.5 seconds
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.FAILURE,
)
# A bare RequestException is not retryable, so the exporter gives up after a single call with no backoff.
self.assertEqual(mock_post.call_count, 1)
self.assertIn(
"Failed to export metrics batch code",
warning.records[0].message,
)

@patch.object(Session, "post")
def test_timeout_set_correctly(self, mock_post):
resp = Response()
@@ -24,6 +24,7 @@
import requests
from google.protobuf.json_format import MessageToDict
from requests import Session
from requests.exceptions import ConnectionError
from requests.models import Response

from opentelemetry._logs import LogRecord, SeverityNumber
@@ -482,6 +483,48 @@ def test_retry_timeout(self, mock_post):
warning.records[0].message,
)

@patch.object(Session, "post")
def test_export_no_collector_available_retryable(self, mock_post):
exporter = OTLPLogExporter(timeout=1.5)
msg = "Server not available."
mock_post.side_effect = ConnectionError(msg)
with self.assertLogs(level=WARNING) as warning:
before = time.time()
# Set timeout to 1.5 seconds
self.assertEqual(
exporter.export(self._get_sdk_log_data()),
LogExportResult.FAILURE,
)
after = time.time()
# First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout.
# Additionally every retry results in two calls, therefore 4.
self.assertEqual(mock_post.call_count, 4)
# There's a +/-20% jitter on each backoff.
self.assertTrue(0.75 < after - before < 1.25)
self.assertIn(
f"Transient error {msg} encountered while exporting logs batch, retrying in",
warning.records[0].message,
)

@patch.object(Session, "post")
def test_export_no_collector_available(self, mock_post):
exporter = OTLPLogExporter(timeout=1.5)

mock_post.side_effect = requests.exceptions.RequestException()
with self.assertLogs(level=WARNING) as warning:
# Set timeout to 1.5 seconds
self.assertEqual(
exporter.export(self._get_sdk_log_data()),
LogExportResult.FAILURE,
)
# A bare RequestException is not retryable, so the exporter gives up after a single call with no backoff.
self.assertEqual(mock_post.call_count, 1)
self.assertIn(
"Failed to export logs batch code",
warning.records[0].message,
)

@patch.object(Session, "post")
def test_timeout_set_correctly(self, mock_post):
resp = Response()
@@ -20,6 +20,7 @@

import requests
from requests import Session
from requests.exceptions import ConnectionError
from requests.models import Response

from opentelemetry.exporter.otlp.proto.http import Compression
@@ -303,6 +304,48 @@ def test_retry_timeout(self, mock_post):
warning.records[0].message,
)

@patch.object(Session, "post")
def test_export_no_collector_available_retryable(self, mock_post):
exporter = OTLPSpanExporter(timeout=1.5)
msg = "Server not available."
mock_post.side_effect = ConnectionError(msg)
with self.assertLogs(level=WARNING) as warning:
before = time.time()
# Set timeout to 1.5 seconds
self.assertEqual(
exporter.export([BASIC_SPAN]),
SpanExportResult.FAILURE,
)
after = time.time()
# First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout.
# Additionally every retry results in two calls, therefore 4.
self.assertEqual(mock_post.call_count, 4)
# There's a +/-20% jitter on each backoff.
self.assertTrue(0.75 < after - before < 1.25)
self.assertIn(
f"Transient error {msg} encountered while exporting span batch, retrying in",
warning.records[0].message,
)

@patch.object(Session, "post")
def test_export_no_collector_available(self, mock_post):
exporter = OTLPSpanExporter(timeout=1.5)

mock_post.side_effect = requests.exceptions.RequestException()
with self.assertLogs(level=WARNING) as warning:
# Set timeout to 1.5 seconds
self.assertEqual(
exporter.export([BASIC_SPAN]),
SpanExportResult.FAILURE,
)
# A bare RequestException is not retryable, so the exporter gives up after a single call with no backoff.
self.assertEqual(mock_post.call_count, 1)
self.assertIn(
"Failed to export span batch code",
warning.records[0].message,
)

@patch.object(Session, "post")
def test_timeout_set_correctly(self, mock_post):
resp = Response()