43 commits
b3f36f4
Add realtime GPU dispatch kernel library with RPC-based function disp…
wsttiger Feb 18, 2026
85b38ab
Add AI predecoder service with hybrid GPU-CPU decoding pipeline
wsttiger Feb 18, 2026
1496281
Enable real ONNX model inference in AI predecoder pipeline
wsttiger Feb 19, 2026
ffaab3d
Integrate real PyMatching MWPM decoder into AI predecoder pipeline
wsttiger Feb 19, 2026
35792ec
Refactor predecoder test into multi-distance PipelineConfig architecture
wsttiger Feb 19, 2026
f7b4c6e
Add PipelineBenchmark utility for realtime decoding latency measurement
wsttiger Feb 19, 2026
70d3eac
Integrate PipelineBenchmark into predecoder test and track incomplete…
wsttiger Feb 19, 2026
4de331e
Add per-worker timing breakdown to predecoder pipeline test
wsttiger Feb 19, 2026
44c04c4
Cache TRT engines to disk and use per-worker decoder pool
wsttiger Feb 19, 2026
6a2010f
Add streaming test mode with continuous syndrome arrival simulation
wsttiger Feb 19, 2026
a36a2c3
Added design document
wsttiger Feb 20, 2026
5ddd4d3
Add host-side spin-polling dispatcher to replace device-side persiste…
wsttiger Feb 20, 2026
779cdcb
realtime: host-side dynamic worker pool dispatcher and predecoder ref…
wsttiger Feb 21, 2026
10dfcfb
Updated the design document to reflect code changes.
wsttiger Feb 22, 2026
df47e95
perf: optimize predecoder realtime pipeline latency
wsttiger Feb 22, 2026
a04ef38
Copied the updated realtime code (dispatchers and all) to the realtim…
wsttiger Feb 26, 2026
84bbda2
Fix streaming pipeline: out-of-order consumer, race fix, and timing i…
wsttiger Feb 26, 2026
d8bdbc1
Scale pipeline to 16 workers / 32 slots for sustained 30 µs arrival rate
wsttiger Feb 26, 2026
25e9b7f
Handle dynamic batch dims in TRT engine build; swap d13 to memory model
wsttiger Feb 26, 2026
099bca2
Optimize GPU copy kernels: vectorize loads and use DMA for output copy
wsttiger Feb 26, 2026
3744c9e
Add pre-launch DMA input copy callback and d13_r104 config
wsttiger Feb 27, 2026
9c544a5
Add RealtimePipeline scaffolding; refactor benchmark to use it
wsttiger Mar 2, 2026
b03bf1e
Add GTest suite for realtime pipeline with SKIP_TRT identity passthrough
wsttiger Mar 3, 2026
b923e8c
Remove dead predecoder_input_kernel; update design doc
wsttiger Mar 3, 2026
e6ea8ef
Implement roadmap items: GPU-only mode, post_launch_fn, and naming im…
wsttiger Mar 3, 2026
e4df4c4
Merge branch 'add_realtime_ai_predecoder_host_side_gb200' of github.c…
wsttiger Mar 3, 2026
84af084
Added pipeline library to QEC unittests CMake
wsttiger Mar 4, 2026
64c0d9f
Fix critical and major defects from code review
wsttiger Mar 4, 2026
c5ee3c8
Formatting
wsttiger Mar 4, 2026
c819354
Added mermaid documentation
wsttiger Mar 4, 2026
ac8277c
Fixed errors in mermaid diagram
wsttiger Mar 4, 2026
9e183df
Remove in-tree realtime/ directory; use pre-installed cudaq-realtime …
wsttiger Mar 6, 2026
1ae8ae3
Fix predecoder test link: add host-dispatch lib and prioritize build …
wsttiger Mar 6, 2026
cbb8e1e
Adapt cudaqx to extern "C" host dispatcher API
wsttiger Mar 6, 2026
61f1368
Update pipeline for 24-byte RPC header and tune d13_r104 config
wsttiger Mar 9, 2026
8cd20a5
Update CMake for TensorRT decoder unit test (#448)
bmhowe23 Feb 24, 2026
0a1afbf
Merge branch 'main' into add_realtime_ai_predecoder_host_side_gb200
bmhowe23 Mar 9, 2026
96f5c33
Fix uint8 model I/O and enable correctness verification with Stim data
wsttiger Mar 11, 2026
9eab912
Add syndrome density diagnostic and fix X-basis config labels
wsttiger Mar 12, 2026
6715bd4
Merge remote-tracking branch 'bmhowe23/bhowe/add_realtime_ai_predecod…
wsttiger Mar 16, 2026
d91d6bb
Add NVTX profiling instrumentation to realtime pipeline
wsttiger Mar 16, 2026
cfbc4be
Forgot to add this to the NVTX stuff
wsttiger Mar 16, 2026
293ad5b
Decouple PyMatching workers from predecoder workers and update docs
wsttiger Mar 17, 2026
514 changes: 514 additions & 0 deletions docs/host_side_dispatcher_design_gemini.md

Large diffs are not rendered by default.

785 changes: 785 additions & 0 deletions docs/hybrid_ai_predecoder_pipeline.md

452 changes: 452 additions & 0 deletions docs/realtime_pipeline_architecture.md

80 changes: 80 additions & 0 deletions libs/qec/include/cudaq/qec/realtime/ai_decoder_service.h
@@ -0,0 +1,80 @@
/****************************************************************-*- C++ -*-****
* Copyright (c) 2026 NVIDIA Corporation & Affiliates. *
* All rights reserved. *
* *
* This source code and the accompanying materials are made available under *
* the terms of the Apache License 2.0 which accompanies this distribution. *
******************************************************************************/

#pragma once

#include <NvInfer.h>
#include <cuda_runtime.h>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

namespace cudaq::qec {

class AIDecoderService {
public:
class Logger : public nvinfer1::ILogger {
void log(Severity severity, const char *msg) noexcept override;
} static gLogger;

/// @brief Constructor. Accepts a serialized TRT engine (.engine/.plan) or
/// an ONNX model (.onnx) which will be compiled to a TRT engine.
/// @param model_path Path to the model file
/// @param device_mailbox_slot Pointer to the specific slot in the global
/// mailbox bank
/// @param engine_save_path If non-empty and model_path is .onnx, save the
/// built engine to this path for fast reloading on subsequent runs
AIDecoderService(const std::string &model_path, void **device_mailbox_slot,
const std::string &engine_save_path = "");

virtual ~AIDecoderService();

virtual void capture_graph(cudaStream_t stream);

cudaGraphExec_t get_executable_graph() const { return graph_exec_; }

/// @brief Size of the primary input tensor in bytes (payload from RPC)
size_t get_input_size() const { return input_size_; }

/// @brief Size of the primary output tensor in bytes (forwarded to CPU)
size_t get_output_size() const { return output_size_; }

void *get_trt_input_ptr() const { return d_trt_input_; }

protected:
void load_engine(const std::string &path);
void build_engine_from_onnx(const std::string &onnx_path,
const std::string &engine_save_path = "");
void setup_bindings();
void allocate_resources();

std::unique_ptr<nvinfer1::IRuntime> runtime_;
std::unique_ptr<nvinfer1::ICudaEngine> engine_;
std::unique_ptr<nvinfer1::IExecutionContext> context_;

cudaGraphExec_t graph_exec_ = nullptr;

void **device_mailbox_slot_;
void *d_trt_input_ = nullptr; // Primary input buffer
void *d_trt_output_ = nullptr; // Primary output buffer (residual_detectors)
std::vector<void *> d_aux_buffers_; // Additional I/O buffers TRT needs

struct TensorBinding {
std::string name;
void *d_buffer = nullptr;
size_t size_bytes = 0;
bool is_input = false;
};
std::vector<TensorBinding> all_bindings_;

size_t input_size_ = 0;
size_t output_size_ = 0;
};

} // namespace cudaq::qec
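
A usage sketch for the interface above (not runnable standalone: it assumes this header, a CUDA device with TensorRT installed, and placeholder model paths; `mailbox_slot` would come from the pipeline's global mailbox bank):

```cpp
#include "cudaq/qec/realtime/ai_decoder_service.h"

void setup_decoder(void **mailbox_slot) {
  // Accepts either a prebuilt .engine/.plan file or an .onnx model, which is
  // compiled to a TRT engine and (optionally) cached at engine_save_path so
  // subsequent runs skip the build. Paths here are hypothetical.
  cudaq::qec::AIDecoderService svc("predecoder.onnx", mailbox_slot,
                                   "predecoder.engine");

  cudaStream_t stream;
  cudaStreamCreate(&stream);

  // Capture the inference work into a CUDA graph once; each request then
  // replays it with a single cheap cudaGraphLaunch.
  svc.capture_graph(stream);
  cudaGraphLaunch(svc.get_executable_graph(), stream);
  cudaStreamSynchronize(stream);
  cudaStreamDestroy(stream);
}
```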
84 changes: 84 additions & 0 deletions libs/qec/include/cudaq/qec/realtime/ai_predecoder_service.h
@@ -0,0 +1,84 @@
/****************************************************************-*- C++ -*-****
* Copyright (c) 2026 NVIDIA Corporation & Affiliates. *
* All rights reserved. *
* *
* This source code and the accompanying materials are made available under *
* the terms of the Apache License 2.0 which accompanies this distribution. *
******************************************************************************/

#pragma once

#include "cudaq/qec/realtime/ai_decoder_service.h"
#include <atomic>
#include <cuda/atomic>

// Portable CPU yield macro for busy-polling (skipped if already defined by the realtime API)
#ifndef QEC_CPU_RELAX
#if defined(__x86_64__)
#include <immintrin.h>
#define QEC_CPU_RELAX() _mm_pause()
#elif defined(__aarch64__)
#define QEC_CPU_RELAX() __asm__ volatile("yield" ::: "memory")
#else
#define QEC_CPU_RELAX() std::atomic_thread_fence(std::memory_order_seq_cst)
#endif
#endif

namespace cudaq::qec {

struct PreDecoderJob {
int slot_idx; ///< Worker/slot index (for release_job; always 0)
int origin_slot; ///< FPGA ring slot for tx_flags routing (dynamic pool)
void *ring_buffer_ptr;
void *inference_data; ///< Points into the pinned output (single slot)

// Performance Tracking
uint64_t submit_ts_ns;
uint64_t dispatch_ts_ns;
uint64_t poll_ts_ns;
};

class AIPreDecoderService : public AIDecoderService {
public:
AIPreDecoderService(const std::string &engine_path,
void **device_mailbox_slot, int queue_depth = 1,
const std::string &engine_save_path = "");
virtual ~AIPreDecoderService();

void capture_graph(cudaStream_t stream, bool device_launch);
void capture_graph(cudaStream_t stream) override {
capture_graph(stream, true);
}

bool poll_next_job(PreDecoderJob &out_job);
void release_job(int slot_idx);

/// Stub for device-dispatcher batch path (returns nullptr; streaming uses
/// host dispatcher)
int *get_device_queue_idx() const { return nullptr; }
cuda::atomic<int, cuda::thread_scope_system> *get_device_ready_flags() const {
return d_ready_flags_;
}
int *get_device_inflight_flag() const { return nullptr; }

cuda::atomic<int, cuda::thread_scope_system> *get_host_ready_flags() const {
return h_ready_flags_;
}
volatile int *get_host_queue_idx() const { return nullptr; }
int get_queue_depth() const { return queue_depth_; }

void **get_host_ring_ptrs() const { return h_ring_ptrs_; }

private:
int queue_depth_; // Always 1

cuda::atomic<int, cuda::thread_scope_system> *h_ready_flags_ = nullptr;
void **h_ring_ptrs_ = nullptr;
void *h_predecoder_outputs_ = nullptr;

cuda::atomic<int, cuda::thread_scope_system> *d_ready_flags_ = nullptr;
void **d_ring_ptrs_ = nullptr;
void *d_predecoder_outputs_ = nullptr;
};

} // namespace cudaq::qec
32 changes: 32 additions & 0 deletions libs/qec/include/cudaq/qec/realtime/nvtx_helpers.h
@@ -0,0 +1,32 @@
/****************************************************************-*- C++ -*-****
* Copyright (c) 2026 NVIDIA Corporation & Affiliates.
* All rights reserved.
*
* This source code and the accompanying materials are made available under
* the terms of the Apache License 2.0 which accompanies this distribution.
******************************************************************************/

#pragma once

#ifdef ENABLE_NVTX

#include <nvtx3/nvToolsExt.h>

struct NvtxRange {
explicit NvtxRange(const char *name) { nvtxRangePushA(name); }
~NvtxRange() { nvtxRangePop(); }
NvtxRange(const NvtxRange &) = delete;
NvtxRange &operator=(const NvtxRange &) = delete;
};

#define NVTX_CONCAT_INNER(a, b) a##b
#define NVTX_CONCAT(a, b) NVTX_CONCAT_INNER(a, b)
// Two-level concat so __LINE__ expands to its numeric value before pasting;
// a direct _nvtx_range_##__LINE__ would produce the same identifier
// `_nvtx_range___LINE__` at every use site, colliding within one scope.
#define NVTX_RANGE(name) NvtxRange NVTX_CONCAT(_nvtx_range_, __LINE__)(name)
#define NVTX_PUSH(name) nvtxRangePushA(name)
#define NVTX_POP() nvtxRangePop()

#else

#define NVTX_RANGE(name) (void)0
#define NVTX_PUSH(name) (void)0
#define NVTX_POP() (void)0

#endif
187 changes: 187 additions & 0 deletions libs/qec/include/cudaq/qec/realtime/pipeline.h
@@ -0,0 +1,187 @@
/*******************************************************************************
* Copyright (c) 2026 NVIDIA Corporation & Affiliates.
* All rights reserved.
*
* This source code and the accompanying materials are made available under
* the terms of the Apache License 2.0 which accompanies this distribution.
******************************************************************************/

#pragma once

#include <cstddef>
#include <cstdint>
#include <cuda_runtime.h>
#include <functional>
#include <memory>
#include <string>

namespace cudaq::realtime {

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

struct CorePinning {
int dispatcher = -1; // -1 = no pinning
int consumer = -1;
int worker_base = -1; // workers pin to base, base+1, ...
};

struct PipelineStageConfig {
int num_workers = 8;
int num_slots = 32;
size_t slot_size = 16384;
CorePinning cores;
};

// ---------------------------------------------------------------------------
// GPU Stage Factory
// ---------------------------------------------------------------------------

struct GpuWorkerResources {
cudaGraphExec_t graph_exec = nullptr;
cudaStream_t stream = nullptr;
void (*pre_launch_fn)(void *user_data, void *slot_dev,
cudaStream_t stream) = nullptr;
void *pre_launch_data = nullptr;
void (*post_launch_fn)(void *user_data, void *slot_dev,
cudaStream_t stream) = nullptr;
void *post_launch_data = nullptr;
uint32_t function_id = 0;
void *user_context = nullptr;
};

/// Called once per worker during start(). Returns GPU resources for that
/// worker.
using GpuStageFactory = std::function<GpuWorkerResources(int worker_id)>;

// ---------------------------------------------------------------------------
// CPU Stage Callback
// ---------------------------------------------------------------------------

/// Passed to the user's CPU stage callback on each completed GPU workload.
/// The user reads gpu_output, does post-processing, and writes the
/// result into response_buffer. No atomics are exposed.
struct CpuStageContext {
int worker_id;
int origin_slot;
const void *gpu_output;
size_t gpu_output_size;
void *response_buffer;
size_t max_response_size;
void *user_context;
};

/// Returns the number of bytes written into response_buffer.
/// Return 0 if no GPU result is ready yet (poll again).
/// Return DEFERRED_COMPLETION to release the worker immediately while
/// deferring slot completion to a later complete_deferred() call.
using CpuStageCallback = std::function<size_t(const CpuStageContext &ctx)>;

/// Sentinel return value from CpuStageCallback: release the worker
/// (idle_mask) but do NOT signal slot completion (tx_flags). The caller
/// is responsible for calling RealtimePipeline::complete_deferred(slot)
/// once the deferred work (e.g. a separate decode thread) finishes.
static constexpr size_t DEFERRED_COMPLETION = SIZE_MAX;

// ---------------------------------------------------------------------------
// Completion Callback
// ---------------------------------------------------------------------------

struct Completion {
uint64_t request_id;
int slot;
bool success;
int cuda_error; // 0 on success
};

/// Called by the consumer thread for each completed (or errored) request.
using CompletionCallback = std::function<void(const Completion &c)>;

// ---------------------------------------------------------------------------
// Ring Buffer Injector (software-only test/replay data source)
// ---------------------------------------------------------------------------

/// Writes RPC-framed requests into the pipeline's ring buffer, simulating
/// FPGA DMA deposits. Created via RealtimePipeline::create_injector().
/// The parent RealtimePipeline must outlive the injector.
class RingBufferInjector {
public:
~RingBufferInjector();
RingBufferInjector(RingBufferInjector &&) noexcept;
RingBufferInjector &operator=(RingBufferInjector &&) noexcept;

RingBufferInjector(const RingBufferInjector &) = delete;
RingBufferInjector &operator=(const RingBufferInjector &) = delete;

/// Try to submit a request. Returns true if accepted, false if
/// backpressure (all slots busy). Non-blocking. Thread-safe.
bool try_submit(uint32_t function_id, const void *payload,
size_t payload_size, uint64_t request_id);

/// Blocking submit: spins until a slot becomes available.
void submit(uint32_t function_id, const void *payload, size_t payload_size,
uint64_t request_id);

uint64_t backpressure_stalls() const;

private:
friend class RealtimePipeline;
struct State;
std::unique_ptr<State> state_;
explicit RingBufferInjector(std::unique_ptr<State> s);
};

// ---------------------------------------------------------------------------
// Pipeline
// ---------------------------------------------------------------------------

class RealtimePipeline {
public:
explicit RealtimePipeline(const PipelineStageConfig &config);
~RealtimePipeline();

RealtimePipeline(const RealtimePipeline &) = delete;
RealtimePipeline &operator=(const RealtimePipeline &) = delete;

/// Register the GPU stage factory (called before start).
void set_gpu_stage(GpuStageFactory factory);

/// Register the CPU worker callback (called before start).
void set_cpu_stage(CpuStageCallback callback);

/// Register the completion callback (called before start).
void set_completion_handler(CompletionCallback handler);

/// Allocate resources, build dispatcher config, spawn all threads.
void start();

/// Signal shutdown, join all threads, free resources.
void stop();

/// Create a software injector for testing without FPGA hardware.
/// The pipeline must be constructed but need not be started yet.
RingBufferInjector create_injector();

struct Stats {
uint64_t submitted;
uint64_t completed;
uint64_t dispatched;
uint64_t backpressure_stalls;
};

/// Thread-safe, lock-free stats snapshot.
Stats stats() const;

/// Signal that deferred processing for a slot is complete.
/// Call this from any thread after the cpu_stage callback returned
/// DEFERRED_COMPLETION and the deferred work has finished writing the
/// response into the slot's ring buffer area.
void complete_deferred(int slot);

private:
struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace cudaq::realtime
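
Putting the pieces together, a minimal wiring sketch against the interface above (not runnable standalone: it assumes this header, a GPU stage returning a real captured graph, and placeholder sizes and IDs; the identity CPU stage and `syndrome` buffer are illustrative only):

```cpp
#include "cudaq/qec/realtime/pipeline.h"
#include <cstdio>
#include <cstring>

using namespace cudaq::realtime;

int main() {
  PipelineStageConfig cfg; // defaults: 8 workers, 32 slots, 16 KiB per slot
  RealtimePipeline pipeline(cfg);

  pipeline.set_gpu_stage([](int worker_id) {
    GpuWorkerResources res;
    // Populate res.graph_exec / res.stream with this worker's captured CUDA
    // graph (e.g. from a per-worker AIPreDecoderService) before returning.
    return res;
  });

  pipeline.set_cpu_stage([](const CpuStageContext &ctx) -> size_t {
    // Identity post-processing; a real stage would run e.g. PyMatching here.
    // Returning DEFERRED_COMPLETION instead releases the worker and defers
    // the tx_flags signal to a later complete_deferred(ctx.origin_slot).
    size_t n = ctx.gpu_output_size < ctx.max_response_size
                   ? ctx.gpu_output_size
                   : ctx.max_response_size;
    std::memcpy(ctx.response_buffer, ctx.gpu_output, n);
    return n;
  });

  pipeline.set_completion_handler([](const Completion &c) {
    if (!c.success)
      std::fprintf(stderr, "request %lu: cuda error %d\n",
                   (unsigned long)c.request_id, c.cuda_error);
  });

  pipeline.start();

  // Software-only path: inject an RPC-framed request without FPGA hardware.
  RingBufferInjector injector = pipeline.create_injector();
  unsigned char syndrome[512] = {};
  injector.submit(/*function_id=*/0, syndrome, sizeof(syndrome),
                  /*request_id=*/1);

  pipeline.stop();
  return 0;
}
```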