diff --git a/experiments/README.md b/experiments/README.md index 74303bcc4..3c4383ce2 100644 --- a/experiments/README.md +++ b/experiments/README.md @@ -1,14 +1,18 @@ -# Semian Experimental Resource +# Semian Experimental Resources -This directory contains an experimental resource adapter for running complex experiments with Semian. +This directory contains experimental resource adapters for running complex experiments with Semian. ## Overview -The `ExperimentalResource` class simulates a distributed service with multiple endpoints, each with configurable latencies following statistical distributions. This allows for testing various failure scenarios and performance characteristics. +Two resource types are available: + +1. **ExperimentalResource** - Simulates a distributed service with multiple endpoints, each with configurable latencies following statistical distributions. Ideal for testing various failure scenarios and performance characteristics with synthetic traffic. + +2. **TrafficReplayExperimentalResource** - Replays real production traffic patterns from Grafana exports, allowing you to test how your system would behave during actual incidents by simulating the exact latency patterns observed in production. ## Features -### Current Implementation +### ExperimentalResource (Synthetic Traffic) 1. **Multiple Endpoints**: Configure any number of endpoints, each with its own fixed latency 2. **Statistical Distributions**: Latencies are assigned based on statistical distributions @@ -26,8 +30,20 @@ The `ExperimentalResource` class simulates a distributed service with multiple e - **Error rate changes**: Modify error rate for the entire service - **Gradual ramp-up**: Both degradations support gradual transitions over time +### TrafficReplayExperimentalResource (Production Traffic Replay) + +1. **Real Traffic Patterns**: Load and replay latency patterns from Grafana JSON exports +2. 
**Time-Based Simulation**: Simulates requests as though an incident were happening in real-time + - Matches request latencies to timeline offsets + - Uses elapsed time since service start to find corresponding latencies +3. **Automatic Completion**: Stops accepting requests when timeline is exceeded +4. **Request Timeouts**: Configure a maximum timeout for requests +5. **Simple Interface**: No need to configure endpoints, distributions, or error rates - everything comes from the log file + ## Usage +### Synthetic Traffic Generation + See `example_with_circuit_breaker.rb` for usage: ``` @@ -38,3 +54,65 @@ bundle exec ruby example_with_circuit_breaker.rb Output: ![](./example_output.png) + +### Traffic Replay Mode + +The traffic replay feature allows you to simulate real production incidents by replaying latency patterns from Grafana exports. + +#### How It Works + +1. Export traffic data from Grafana as JSON (one JSON object per line) +2. Initialize the resource with `traffic_log_path` parameter +3. The service will simulate latencies based on elapsed time since initialization +4. When a request comes in at time T seconds after service start, it uses the latency from the log entry at offset T +5. 
When the service has been running longer than the log timeline, it stops accepting requests + +#### Required JSON Format + +Each line in the JSON file should be a complete JSON object with: +- `timestamp`: ISO8601 timestamp (e.g., `"2025-10-02T16:19:30.814890047Z"`) +- `attrs.db.sql.total_duration_ms`: Database latency in milliseconds + +Example: +```json +{"timestamp": "2025-10-02T16:19:30.814890047Z", "attrs.db.sql.total_duration_ms": 2.5, "attrs.db.sql.total_count": 1} +{"timestamp": "2025-10-02T16:19:31.314890047Z", "attrs.db.sql.total_duration_ms": 5.8, "attrs.db.sql.total_count": 2} +{"timestamp": "2025-10-02T16:19:31.814890047Z", "attrs.db.sql.total_duration_ms": 12.3, "attrs.db.sql.total_count": 3} +``` + +If a request doesn't have `attrs.db.sql.total_duration_ms`, it's treated as 0ms latency. + +#### Example Usage + +```ruby +resource = Semian::Experiments::TrafficReplayExperimentalResource.new( + name: "my_service", + traffic_log_path: "path/to/grafana_export.json", + timeout: 30.0, + semian: { + circuit_breaker: true, + success_threshold: 2, + error_threshold: 3, + error_threshold_timeout: 10, + } +) + +# Make requests - they'll be served with latencies from the log +begin + resource.request do |latency| + puts "Request completed with latency: #{(latency * 1000).round(2)}ms" + end +rescue Semian::Experiments::TrafficReplayExperimentalResource::TrafficReplayCompleteError + puts "Traffic replay completed!" +end +``` + +#### Running the Example + +A complete example with a sample traffic log is provided: + +```bash +bundle exec ruby example_with_traffic_replay.rb sample_traffic_log.json +``` + +The sample log simulates a 12-second incident where latency spikes from ~2ms to over 300ms and then recovers. 
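The "How It Works" steps above can be sketched in a few lines of Ruby. This is a minimal illustration, not the code in this patch: `build_timeline` and `latency_at` are hypothetical helper names, and the log is an inline string rather than a file so the snippet runs on its own. It assumes the JSON-lines format described above, uses the first parsed timestamp as offset zero, and treats a missing duration field as 0 ms.

```ruby
require "json"
require "time"

# Inline stand-in for a Grafana export (one JSON object per line).
SAMPLE_LOG = <<~LOG
  {"timestamp": "2025-10-02T16:19:30.000Z", "attrs.db.sql.total_duration_ms": 2.5}
  {"timestamp": "2025-10-02T16:19:31.000Z", "attrs.db.sql.total_duration_ms": 5.0}
  {"timestamp": "2025-10-02T16:19:32.000Z"}
LOG

# Hypothetical helper: turn log text into [{ offset:, latency: }] in seconds.
def build_timeline(text)
  first = nil
  entries = text.each_line.filter_map do |line|
    line = line.strip
    next if line.empty?

    entry = JSON.parse(line)
    timestamp = Time.parse(entry.fetch("timestamp"))
    first ||= timestamp
    latency_ms = entry["attrs.db.sql.total_duration_ms"] || 0
    { offset: timestamp - first, latency: latency_ms / 1000.0 }
  end
  entries.sort_by { |e| e[:offset] }
end

# Hypothetical helper: latency of the entry closest to `elapsed` seconds,
# or nil once `elapsed` is past the end of the timeline.
def latency_at(timeline, elapsed)
  return nil if elapsed > timeline.last[:offset]

  timeline.min_by { |e| (e[:offset] - elapsed).abs }[:latency]
end

timeline = build_timeline(SAMPLE_LOG)
puts latency_at(timeline, 0.9).inspect
puts latency_at(timeline, 2.5).inspect
```

A request arriving 0.9 s after start is served with the latency of the entry at offset 1.0 s, and any request after offset 2.0 s gets `nil`, which is the condition the resource turns into `TrafficReplayCompleteError`.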
diff --git a/experiments/example_output.png b/experiments/example_output.png index 0976a711c..c0f8698d1 100644 Binary files a/experiments/example_output.png and b/experiments/example_output.png differ diff --git a/experiments/example_with_traffic_replay.rb b/experiments/example_with_traffic_replay.rb new file mode 100755 index 000000000..1b74f1344 --- /dev/null +++ b/experiments/example_with_traffic_replay.rb @@ -0,0 +1,103 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative "traffic_replay_experimental_resource" + +# Example usage of TrafficReplayExperimentalResource with traffic replay + +puts "=== Semian ExperimentalResource - Traffic Replay Example ===" +puts + +# Example 1: Create a resource with traffic replay from Grafana export +puts "Example: Using traffic replay from Grafana export" +puts "-" * 60 + +# To use this example, you need a Grafana export JSON file +# where each line is a JSON object with: +# - "timestamp": ISO8601 timestamp +# - "attrs.db.sql.total_duration_ms": latency in milliseconds + +traffic_log_path = ARGV[0] || "path/to/grafana_export.json" + +unless File.exist?(traffic_log_path) + puts "ERROR: Traffic log file not found: #{traffic_log_path}" + puts + puts "Usage: ruby #{__FILE__} " + puts + puts "The JSON file should contain one JSON object per line, with fields:" + puts ' - "timestamp": ISO8601 timestamp (e.g., "2025-10-02T16:19:30.814890047Z")' + puts ' - "attrs.db.sql.total_duration_ms": latency in milliseconds' + puts + puts "Example JSON line:" + puts "{" + puts ' "timestamp": "2025-10-02T16:19:30.814890047Z",' + puts ' "attrs.db.sql.total_duration_ms": 5.2,' + puts " ... other fields ..." 
+ puts "}" + exit 1 +end + +begin + # Create resource with traffic replay + resource = Semian::Experiments::TrafficReplayExperimentalResource.new( + name: "my_service", + traffic_log_path: traffic_log_path, # Path to Grafana JSON export + timeout: 0.1, # 100ms timeout + semian: { + circuit_breaker: true, + success_threshold: 2, + error_threshold: 3, + error_threshold_timeout: 10, + error_timeout: 0.2, + }, + ) + + puts + puts "Resource created successfully!" + puts "Starting to process requests..." + puts "Press Ctrl+C to stop" + puts + + # Make requests continuously until the timeline is exhausted + request_count = 0 + loop do + result = resource.request do |latency| + request_count += 1 + puts "[#{Time.now.strftime("%H:%M:%S")}] Request ##{request_count} - " \ + "Latency: #{(latency * 1000).round(2)}ms" + { latency: latency, request_number: request_count } + end + + # Small delay between requests to avoid overwhelming the output + sleep(0.1) + rescue Semian::Experiments::TrafficReplayExperimentalResource::TrafficReplayCompleteError => e + puts + puts "Traffic replay completed!" 
+ puts "Total requests processed: #{request_count}" + break + rescue Semian::Experiments::TrafficReplayExperimentalResource::CircuitOpenError => e + puts "[#{Time.now.strftime("%H:%M:%S")}] Circuit breaker is OPEN - #{e.message}" + sleep(1) + rescue Semian::Experiments::TrafficReplayExperimentalResource::TimeoutError => e + puts "[#{Time.now.strftime("%H:%M:%S")}] Timeout: #{e.message}" + sleep(1) + rescue Semian::Experiments::TrafficReplayExperimentalResource::ResourceBusyError => e + puts "[#{Time.now.strftime("%H:%M:%S")}] Resource busy: #{e.message}" + sleep(1) + rescue => e + puts "[#{Time.now.strftime("%H:%M:%S")}] Error: #{e.class} - #{e.message}" + sleep(0.5) + end + + puts + puts "=== Replay Complete ===" +rescue ArgumentError => e + puts "ERROR: #{e.message}" + exit(1) +rescue Interrupt + puts + puts + puts "=== Interrupted by user ===" + puts "Total requests processed: #{request_count}" + exit(0) +end diff --git a/experiments/sample_traffic_log.json b/experiments/sample_traffic_log.json new file mode 100644 index 000000000..72654c225 --- /dev/null +++ b/experiments/sample_traffic_log.json @@ -0,0 +1,26 @@ +{"timestamp": "2025-10-02T16:19:30.000000000Z", "attrs.db.sql.total_duration_ms": 2.5, "attrs.db.sql.total_count": 1} +{"timestamp": "2025-10-02T16:19:30.500000000Z", "attrs.db.sql.total_duration_ms": 3.2, "attrs.db.sql.total_count": 1} +{"timestamp": "2025-10-02T16:19:31.000000000Z", "attrs.db.sql.total_duration_ms": 2.8, "attrs.db.sql.total_count": 1} +{"timestamp": "2025-10-02T16:19:31.500000000Z", "attrs.db.sql.total_duration_ms": 5.1, "attrs.db.sql.total_count": 2} +{"timestamp": "2025-10-02T16:19:32.000000000Z", "attrs.db.sql.total_duration_ms": 8.5, "attrs.db.sql.total_count": 2} +{"timestamp": "2025-10-02T16:19:32.500000000Z", "attrs.db.sql.total_duration_ms": 15.3, "attrs.db.sql.total_count": 3} +{"timestamp": "2025-10-02T16:19:33.000000000Z", "attrs.db.sql.total_duration_ms": 25.7, "attrs.db.sql.total_count": 4} +{"timestamp": 
"2025-10-02T16:19:33.500000000Z", "attrs.db.sql.total_duration_ms": 45.2, "attrs.db.sql.total_count": 5} +{"timestamp": "2025-10-02T16:19:34.000000000Z", "attrs.db.sql.total_duration_ms": 78.4, "attrs.db.sql.total_count": 8} +{"timestamp": "2025-10-02T16:19:34.500000000Z", "attrs.db.sql.total_duration_ms": 125.6, "attrs.db.sql.total_count": 10} +{"timestamp": "2025-10-02T16:19:35.000000000Z", "attrs.db.sql.total_duration_ms": 187.3, "attrs.db.sql.total_count": 15} +{"timestamp": "2025-10-02T16:19:35.500000000Z", "attrs.db.sql.total_duration_ms": 245.8, "attrs.db.sql.total_count": 18} +{"timestamp": "2025-10-02T16:19:36.000000000Z", "attrs.db.sql.total_duration_ms": 298.2, "attrs.db.sql.total_count": 20} +{"timestamp": "2025-10-02T16:19:36.500000000Z", "attrs.db.sql.total_duration_ms": 312.5, "attrs.db.sql.total_count": 22} +{"timestamp": "2025-10-02T16:19:37.000000000Z", "attrs.db.sql.total_duration_ms": 287.6, "attrs.db.sql.total_count": 20} +{"timestamp": "2025-10-02T16:19:37.500000000Z", "attrs.db.sql.total_duration_ms": 234.3, "attrs.db.sql.total_count": 18} +{"timestamp": "2025-10-02T16:19:38.000000000Z", "attrs.db.sql.total_duration_ms": 178.9, "attrs.db.sql.total_count": 15} +{"timestamp": "2025-10-02T16:19:38.500000000Z", "attrs.db.sql.total_duration_ms": 125.4, "attrs.db.sql.total_count": 12} +{"timestamp": "2025-10-02T16:19:39.000000000Z", "attrs.db.sql.total_duration_ms": 82.1, "attrs.db.sql.total_count": 8} +{"timestamp": "2025-10-02T16:19:39.500000000Z", "attrs.db.sql.total_duration_ms": 45.7, "attrs.db.sql.total_count": 5} +{"timestamp": "2025-10-02T16:19:40.000000000Z", "attrs.db.sql.total_duration_ms": 25.3, "attrs.db.sql.total_count": 3} +{"timestamp": "2025-10-02T16:19:40.500000000Z", "attrs.db.sql.total_duration_ms": 12.8, "attrs.db.sql.total_count": 2} +{"timestamp": "2025-10-02T16:19:41.000000000Z", "attrs.db.sql.total_duration_ms": 6.5, "attrs.db.sql.total_count": 1} +{"timestamp": "2025-10-02T16:19:41.500000000Z", 
"attrs.db.sql.total_duration_ms": 3.8, "attrs.db.sql.total_count": 1} +{"timestamp": "2025-10-02T16:19:42.000000000Z", "attrs.db.sql.total_duration_ms": 2.7, "attrs.db.sql.total_count": 1} + diff --git a/experiments/traffic_replay_experimental_resource.rb b/experiments/traffic_replay_experimental_resource.rb new file mode 100644 index 000000000..ab84145c4 --- /dev/null +++ b/experiments/traffic_replay_experimental_resource.rb @@ -0,0 +1,192 @@ +# frozen_string_literal: true + +require "json" +require "time" + +# Add lib to load path if not already there +lib_path = File.expand_path("../lib", __dir__) +$LOAD_PATH.unshift(lib_path) unless $LOAD_PATH.include?(lib_path) + +require "semian/adapter" + +module Semian + module Experiments + # TrafficReplayExperimentalResource replays real production traffic patterns from Grafana exports. + # It simulates request latencies based on a timeline extracted from JSON log files, + # allowing you to test how your system would behave during a real incident. + class TrafficReplayExperimentalResource + include Semian::Adapter + + attr_reader :name, :traffic_log_path, :timeout + + # Initialize the traffic replay resource + # @param name [String] The identifier for this resource + # @param traffic_log_path [String] Path to Grafana JSON export for traffic replay + # @param timeout [Float, nil] Maximum time to wait for a request (in seconds). If nil, no timeout is enforced. + # @param options [Hash] Additional Semian options + def initialize(name:, traffic_log_path:, timeout: nil, **options) + @name = name + @traffic_log_path = traffic_log_path + @timeout = timeout + @raw_semian_options = options[:semian] + + # Parse the traffic log and build timeline + @traffic_timeline = parse_traffic_log(@traffic_log_path) + @service_start_time = Time.now + + puts "Traffic replay mode enabled. 
Timeline duration: #{@traffic_timeline.last[:offset].round(2)}s with #{@traffic_timeline.size} requests" + end + + # Required by Adapter + def semian_identifier + @name.to_sym + end + + # Simulate making a request that replays latency from the traffic log + # @raise [TimeoutError] if the request would exceed the configured timeout + # @raise [TrafficReplayCompleteError] if the timeline has been exceeded + def request(&block) + acquire_semian_resource(scope: :request, adapter: :experimental) do + perform_request(&block) + end + end + + private + + def perform_request(&block) + # Get latency from timeline based on elapsed time + latency = get_latency_from_timeline + + # Check if we've exceeded the log timeline + if latency.nil? + puts "\n=== Traffic replay completed ===" + puts "Service has been running longer than the traffic log timeline." + puts "No more requests will be processed." + raise TrafficReplayCompleteError, "Traffic replay has completed - timeline exceeded" + end + + # Check if request would time out + if @timeout && latency > @timeout + # Sleep for the timeout period, then raise exception + sleep(@timeout) if @timeout > 0 + raise TimeoutError, + "Request timed out after #{@timeout}s (would have taken #{latency.round(3)}s)" + end + + # Simulate the request with calculated latency + sleep(latency) if latency > 0 + + if block_given? 
+ yield(latency) + else + { latency: latency } + end + end + + attr_reader :raw_semian_options + + def resource_exceptions + [TimeoutError, TrafficReplayCompleteError] + end + + # Parse the traffic log JSON file and build a timeline + # @param file_path [String] Path to the Grafana JSON export + # @return [Array] Array of { offset: Float, latency: Float } sorted by offset + def parse_traffic_log(file_path) + unless File.exist?(file_path) + raise ArgumentError, "Traffic log file not found: #{file_path}" + end + + entries = [] + first_timestamp = nil + + File.foreach(file_path) do |line| + line = line.strip + next if line.empty? + + begin + entry = JSON.parse(line) + timestamp_str = entry["timestamp"] + + unless timestamp_str + warn("Warning: Entry missing timestamp field, skipping") + next + end + + timestamp = Time.parse(timestamp_str) + + # Track the first timestamp to calculate offsets + first_timestamp ||= timestamp + + # Calculate offset from start in seconds + offset = timestamp - first_timestamp + + # Get latency in milliseconds, default to 0 if not present + latency_ms = entry.dig("attrs.db.sql.total_duration_ms") || 0 + latency_seconds = latency_ms / 1000.0 + + entries << { offset: offset, latency: latency_seconds } + rescue JSON::ParserError => e + warn("Warning: Failed to parse JSON line: #{e.message}") + next + rescue ArgumentError => e + warn("Warning: Failed to parse timestamp: #{e.message}") + next + end + end + + if entries.empty? 
+ raise ArgumentError, "No valid entries found in traffic log file: #{file_path}" + end + + # Sort by offset to ensure timeline is in order + entries.sort_by { |e| e[:offset] } + end + + # Get latency for current elapsed time from the traffic timeline + # @return [Float, nil] Latency in seconds, or nil if timeline exceeded + def get_latency_from_timeline + elapsed = Time.now - @service_start_time + + # Check if we've exceeded the timeline + if elapsed > @traffic_timeline.last[:offset] + return + end + + # Find the entry with the closest offset to elapsed time + # Using binary search would be more efficient, but linear search is simpler + # and fine for most use cases + closest_entry = @traffic_timeline.min_by { |entry| (entry[:offset] - elapsed).abs } + + closest_entry[:latency] + end + + # Error classes specific to this adapter + class CircuitOpenError < ::Semian::BaseError + def initialize(semian_identifier, *args) + super(*args) + @semian_identifier = semian_identifier + end + end + + class ResourceBusyError < ::Semian::BaseError + def initialize(semian_identifier, *args) + super(*args) + @semian_identifier = semian_identifier + end + end + + class TimeoutError < StandardError + def marks_semian_circuits? + true # This error should trigger circuit breaker + end + end + + class TrafficReplayCompleteError < StandardError + def marks_semian_circuits? + false # This is not a real error, just indicates replay is complete + end + end + end + end +end
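Note on `get_latency_from_timeline`: the comment there says a binary search would be more efficient than the linear `min_by` scan. A minimal sketch of that alternative follows, assuming the timeline is already sorted by `:offset` (which `parse_traffic_log` guarantees). `closest_latency` is a hypothetical name and is not part of this patch; ties are resolved toward the earlier entry, which is also what `min_by` does.

```ruby
# Hypothetical replacement for the linear scan: O(log n) closest-offset lookup.
# `timeline` must be an Array of { offset: Float, latency: Float } sorted by :offset.
def closest_latency(timeline, elapsed)
  return nil if timeline.empty? || elapsed > timeline.last[:offset]

  # First entry whose offset is at or after `elapsed`; it exists because of the guard above.
  idx = timeline.bsearch_index { |entry| entry[:offset] >= elapsed }
  return timeline[idx][:latency] if idx.zero?

  before = timeline[idx - 1]
  after = timeline[idx]

  # Pick whichever neighbour is nearer; on a tie keep the earlier one.
  if (elapsed - before[:offset]) <= (after[:offset] - elapsed)
    before[:latency]
  else
    after[:latency]
  end
end

timeline = [
  { offset: 0.0, latency: 0.002 },
  { offset: 0.5, latency: 0.003 },
  { offset: 1.0, latency: 0.3 },
]

puts closest_latency(timeline, 0.2).inspect
puts closest_latency(timeline, 0.9).inspect
puts closest_latency(timeline, 1.5).inspect
```

For the 26-entry sample log the difference is negligible, so the linear scan in the patch is reasonable. The binary search only starts to matter for exports with many thousands of entries, since the lookup runs on every request.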