Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Double write to legacy attributes for backwards compatibility. ([#5490](https://github.com/getsentry/relay/pull/5490))

**Internal**:

- Moves profile chunk processing to the new internal processing pipeline. ([#5505](https://github.com/getsentry/relay/pull/5505))

## 25.12.0

**Features**:
Expand Down
2 changes: 1 addition & 1 deletion relay-profiling/src/outcomes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::ProfileError;

pub fn discard_reason(err: ProfileError) -> &'static str {
pub fn discard_reason(err: &ProfileError) -> &'static str {
match err {
ProfileError::CannotSerializePayload => "profiling_failed_serialization",
ProfileError::ExceedSizeLimit => "profiling_exceed_size_limit",
Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/processing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::processing::ForwardContext;
use crate::processing::StoreHandle;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
Expand Down Expand Up @@ -57,8 +58,9 @@ macro_rules! outputs {
outputs!(
CheckIns => CheckInsProcessor,
Logs => LogsProcessor,
TraceMetrics => TraceMetricsProcessor,
Spans => SpansProcessor,
ProfileChunks => ProfileChunksProcessor,
Sessions => SessionsProcessor,
Spans => SpansProcessor,
TraceAttachments => TraceAttachmentsProcessor,
TraceMetrics => TraceMetricsProcessor,
);
8 changes: 8 additions & 0 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ impl ForwardContext<'_> {
return Retention::from(*retention);
}

self.event_retention()
}

/// Returns the event [`Retention`].
///
/// This retention is also often used for older products and can be considered a default
/// retention for products which do not define their own retention.
pub fn event_retention(&self) -> Retention {
Retention::from(RetentionConfig {
standard: self
.project_info
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use self::limits::*;

pub mod check_ins;
pub mod logs;
pub mod profile_chunks;
pub mod sessions;
pub mod spans;
pub mod trace_attachments;
Expand Down
22 changes: 22 additions & 0 deletions relay-server/src/processing/profile_chunks/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use relay_dynamic_config::Feature;

use crate::processing::Context;
use crate::processing::profile_chunks::{Error, Result};

/// Checks whether the profile ingestion feature flag is enabled for the current project.
pub fn feature_flag(ctx: Context<'_>) -> Result<()> {
let feature = match ctx
.project_info
.has_feature(Feature::ContinuousProfilingBetaIngest)
{
// Legacy feature.
true => Feature::ContinuousProfilingBeta,
// The post release ingestion feature.
false => Feature::ContinuousProfiling,
};

match ctx.should_filter(feature) {
true => Err(Error::FilterFeatureFlag),
false => Ok(()),
}
}
189 changes: 189 additions & 0 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::sync::Arc;

use relay_profiling::ProfileType;
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, ManagedResult as _, Quantities, Rejected};
use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter};
use crate::services::outcome::{DiscardReason, Outcome};
use smallvec::smallvec;

mod filter;
mod process;

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error raised in [`relay_profiling`].
#[error("Profiling Error: {0}")]
Profiling(#[from] relay_profiling::ProfileError),
/// The profile chunks are rate limited.
#[error("rate limited")]
RateLimited(RateLimits),
/// Profile chunks filtered because of a missing feature flag.
#[error("profile chunks feature flag missing")]
FilterFeatureFlag,
}

impl From<RateLimits> for Error {
fn from(value: RateLimits) -> Self {
Self::RateLimited(value)
}
}

impl crate::managed::OutcomeError for Error {
type Error = Self;

fn consume(self) -> (Option<Outcome>, Self::Error) {
let outcome = match &self {
Self::Profiling(relay_profiling::ProfileError::Filtered(f)) => {
Some(Outcome::Filtered(f.clone()))
}
Self::Profiling(err) => Some(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),

Self::RateLimited(limits) => {
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
Some(Outcome::RateLimited(reason_code))
}
Self::FilterFeatureFlag => None,
};
(outcome, self)
}
}

/// A processor for profile chunks.
///
/// It processes items of type: [`ItemType::ProfileChunk`].
#[derive(Debug)]
pub struct ProfileChunksProcessor {
limiter: Arc<QuotaRateLimiter>,
}

impl ProfileChunksProcessor {
/// Creates a new [`Self`].
pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
Self { limiter }
}
}

impl processing::Processor for ProfileChunksProcessor {
type UnitOfWork = SerializedProfileChunks;
type Output = ProfileChunkOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
let profile_chunks = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::ProfileChunk))
.into_vec();

if profile_chunks.is_empty() {
return None;
}

Some(Managed::from_envelope(
envelope,
SerializedProfileChunks {
headers: envelope.envelope().headers().clone(),
profile_chunks,
},
))
}

async fn process(
&self,
mut profile_chunks: Managed<Self::UnitOfWork>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Error>> {
filter::feature_flag(ctx).reject(&profile_chunks)?;

process::process(&mut profile_chunks, ctx);

let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?;

Ok(Output::just(ProfileChunkOutput(profile_chunks)))
}
}

/// Output produced by [`ProfileChunksProcessor`].
#[derive(Debug)]
pub struct ProfileChunkOutput(Managed<SerializedProfileChunks>);

impl Forward for ProfileChunkOutput {
fn serialize_envelope(
self,
_: processing::ForwardContext<'_>,
) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
let Self(profile_chunks) = self;
Ok(profile_chunks
.map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks))))
}

#[cfg(feature = "processing")]
fn forward_store(
self,
s: processing::forward::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::services::store::StoreProfileChunk;

let Self(profile_chunks) = self;
let retention_days = ctx.event_retention().standard;

for item in profile_chunks.split(|pc| pc.profile_chunks) {
s.store(item.map(|item, _| StoreProfileChunk {
retention_days,
payload: item.payload(),
quantities: item.quantities(),
}));
}

Ok(())
}
}

/// Serialized profile chunks extracted from an envelope.
#[derive(Debug)]
pub struct SerializedProfileChunks {
/// Original envelope headers.
pub headers: EnvelopeHeaders,
/// List of serialized profile chunk items.
pub profile_chunks: Vec<Item>,
}

impl Counted for SerializedProfileChunks {
fn quantities(&self) -> Quantities {
let mut ui = 0;
let mut backend = 0;

for pc in &self.profile_chunks {
match pc.profile_type() {
Some(ProfileType::Ui) => ui += 1,
Some(ProfileType::Backend) => backend += 1,
None => {}
}
}

let mut quantities = smallvec![];
if ui > 0 {
quantities.push((DataCategory::ProfileChunkUi, ui));
}
if backend > 0 {
quantities.push((DataCategory::ProfileChunk, backend));
}

quantities
}
}

impl CountRateLimited for Managed<SerializedProfileChunks> {
type Error = Error;
}
68 changes: 68 additions & 0 deletions relay-server/src/processing/profile_chunks/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use relay_profiling::ProfileType;
use relay_quotas::DataCategory;

use crate::envelope::{ContentType, Item, ItemType};
use crate::processing::Context;
use crate::processing::Managed;
use crate::processing::profile_chunks::{Result, SerializedProfileChunks};

/// Processes profile chunks.
pub fn process(profile_chunks: &mut Managed<SerializedProfileChunks>, ctx: Context<'_>) {
// Only run this 'expensive' processing step in processing Relays.
if !ctx.is_processing() {
return;
}

let client_ip = profile_chunks.headers.meta().client_addr();
let filter_settings = &ctx.project_info.config.filter_settings;

profile_chunks.retain(
|pc| &mut pc.profile_chunks,
|item, records| -> Result<()> {
let pc = relay_profiling::ProfileChunk::new(item.payload())?;

// Validate the item inferred profile type with the one from the payload,
// or if missing set it.
//
// This is currently necessary to ensure profile chunks are emitted in the correct
// data category, as well as rate limited with the correct data category.
//
// In the future we plan to make the profile type on the item header a necessity.
// For more context see also: <https://github.com/getsentry/relay/pull/4595>.
if item
.profile_type()
.is_some_and(|pt| pt != pc.profile_type())
{
return Err(relay_profiling::ProfileError::InvalidProfileType.into());
}

// Update the profile type to ensure the following outcomes are emitted in the correct
// data category.
//
// Once the item header on the item is required, this is no longer required.
if item.profile_type().is_none() {
item.set_profile_type(pc.profile_type());
match pc.profile_type() {
ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1),
ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1),
}
}

pc.filter(client_ip, filter_settings, ctx.global_config)?;

let expanded = pc.expand()?;
if expanded.len() > ctx.config.max_profile_size() {
return Err(relay_profiling::ProfileError::ExceedSizeLimit.into());
}

*item = {
let mut item = Item::new(ItemType::ProfileChunk);
item.set_profile_type(pc.profile_type());
item.set_payload(ContentType::Json, expanded);
item
};

Ok(())
},
);
}
8 changes: 5 additions & 3 deletions relay-server/src/processing/transactions/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub fn filter(
Err(err) => {
record_keeper.reject_err(
Outcome::Invalid(DiscardReason::Profiling(relay_profiling::discard_reason(
err,
&err,
))),
work.profile.take(),
);
Expand Down Expand Up @@ -246,15 +246,17 @@ fn expand_profile(
Ok(id)
} else {
Err(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
relay_profiling::discard_reason(
&relay_profiling::ProfileError::ExceedSizeLimit,
),
)))
}
}
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
Err(Outcome::Filtered(filter_stat_key))
}
Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
relay_profiling::discard_reason(&err),
))),
}
}
Expand Down
Loading
Loading