From d1da263bd9639c79c9cede7563e950637eb21e5c Mon Sep 17 00:00:00 2001
From: David Herberth
Date: Mon, 22 Dec 2025 15:20:38 +0100
Subject: [PATCH] ref(profile-chunks): Move profile chunks to new processing
 pipeline

---
 CHANGELOG.md                                  |   4 +
 relay-profiling/src/outcomes.rs               |   2 +-
 relay-server/src/processing/common.rs         |   6 +-
 relay-server/src/processing/forward.rs        |   8 +
 relay-server/src/processing/mod.rs            |   1 +
 .../src/processing/profile_chunks/filter.rs   |  22 ++
 .../src/processing/profile_chunks/mod.rs      | 189 ++++++++++++++++++
 .../src/processing/profile_chunks/process.rs  |  68 +++++++
 .../src/processing/transactions/profile.rs    |   8 +-
 relay-server/src/services/processor.rs        |  38 +---
 .../src/services/processor/profile.rs         |  10 +-
 .../src/services/processor/profile_chunk.rs   | 115 -----------
 relay-server/src/services/store.rs            |  79 +++++---
 13 files changed, 369 insertions(+), 181 deletions(-)
 create mode 100644 relay-server/src/processing/profile_chunks/filter.rs
 create mode 100644 relay-server/src/processing/profile_chunks/mod.rs
 create mode 100644 relay-server/src/processing/profile_chunks/process.rs
 delete mode 100644 relay-server/src/services/processor/profile_chunk.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc450572869..0de8c032f3f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@
 
 - Double write to legacy attributes for backwards compatibility. ([#5490](https://github.com/getsentry/relay/pull/5490))
 
+**Internal**:
+
+- Moves profile chunk processing to the new internal processing pipeline. ([#5505](https://github.com/getsentry/relay/pull/5505))
+
 ## 25.12.0
 
 **Features**:

diff --git a/relay-profiling/src/outcomes.rs b/relay-profiling/src/outcomes.rs
index 4ea58ff52bd..e5cdb6fb0a2 100644
--- a/relay-profiling/src/outcomes.rs
+++ b/relay-profiling/src/outcomes.rs
@@ -1,6 +1,6 @@
 use crate::ProfileError;
 
-pub fn discard_reason(err: ProfileError) -> &'static str {
+pub fn discard_reason(err: &ProfileError) -> &'static str {
     match err {
         ProfileError::CannotSerializePayload => "profiling_failed_serialization",
         ProfileError::ExceedSizeLimit => "profiling_exceed_size_limit",

diff --git a/relay-server/src/processing/common.rs b/relay-server/src/processing/common.rs
index 882cee40e17..f9ac3552528 100644
--- a/relay-server/src/processing/common.rs
+++ b/relay-server/src/processing/common.rs
@@ -5,6 +5,7 @@ use crate::processing::ForwardContext;
 use crate::processing::StoreHandle;
 use crate::processing::check_ins::CheckInsProcessor;
 use crate::processing::logs::LogsProcessor;
+use crate::processing::profile_chunks::ProfileChunksProcessor;
 use crate::processing::sessions::SessionsProcessor;
 use crate::processing::spans::SpansProcessor;
 use crate::processing::trace_attachments::TraceAttachmentsProcessor;
@@ -57,8 +58,9 @@ macro_rules! outputs {
 outputs!(
     CheckIns => CheckInsProcessor,
     Logs => LogsProcessor,
-    TraceMetrics => TraceMetricsProcessor,
-    Spans => SpansProcessor,
+    ProfileChunks => ProfileChunksProcessor,
     Sessions => SessionsProcessor,
+    Spans => SpansProcessor,
     TraceAttachments => TraceAttachmentsProcessor,
+    TraceMetrics => TraceMetricsProcessor,
 );

diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs
index 790fefb1c1c..02357367970 100644
--- a/relay-server/src/processing/forward.rs
+++ b/relay-server/src/processing/forward.rs
@@ -93,6 +93,14 @@ impl ForwardContext<'_> {
             return Retention::from(*retention);
         }
 
+        self.event_retention()
+    }
+
+    /// Returns the event [`Retention`].
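+    ///
+    /// For example, the profile chunk forwarder below falls back to this value
+    /// when storing chunks (a usage sketch mirroring `forward_store` in this
+    /// patch):
+    ///
+    /// ```ignore
+    /// let retention_days = ctx.event_retention().standard;
+    /// ```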
+    ///
+    /// This retention is also used by many older products and can be considered
+    /// the default retention for products which do not define their own.
+    pub fn event_retention(&self) -> Retention {
         Retention::from(RetentionConfig {
             standard: self
                 .project_info

diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs
index cebe2f310fe..2b03eb7daff 100644
--- a/relay-server/src/processing/mod.rs
+++ b/relay-server/src/processing/mod.rs
@@ -25,6 +25,7 @@ pub use self::limits::*;
 
 pub mod check_ins;
 pub mod logs;
+pub mod profile_chunks;
 pub mod sessions;
 pub mod spans;
 pub mod trace_attachments;

diff --git a/relay-server/src/processing/profile_chunks/filter.rs b/relay-server/src/processing/profile_chunks/filter.rs
new file mode 100644
index 00000000000..7b84157bbaf
--- /dev/null
+++ b/relay-server/src/processing/profile_chunks/filter.rs
@@ -0,0 +1,22 @@
+use relay_dynamic_config::Feature;
+
+use crate::processing::Context;
+use crate::processing::profile_chunks::{Error, Result};
+
+/// Checks whether the profile ingestion feature flag is enabled for the current project.
+pub fn feature_flag(ctx: Context<'_>) -> Result<()> {
+    let feature = match ctx
+        .project_info
+        .has_feature(Feature::ContinuousProfilingBetaIngest)
+    {
+        // Legacy feature.
+        true => Feature::ContinuousProfilingBeta,
+        // The post-release ingestion feature.
+        false => Feature::ContinuousProfiling,
+    };
+
+    match ctx.should_filter(feature) {
+        true => Err(Error::FilterFeatureFlag),
+        false => Ok(()),
+    }
+}

diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs
new file mode 100644
index 00000000000..cdb458a1b99
--- /dev/null
+++ b/relay-server/src/processing/profile_chunks/mod.rs
@@ -0,0 +1,189 @@
+use std::sync::Arc;
+
+use relay_profiling::ProfileType;
+use relay_quotas::{DataCategory, RateLimits};
+
+use crate::Envelope;
+use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
+use crate::managed::{Counted, Managed, ManagedEnvelope, ManagedResult as _, Quantities, Rejected};
+use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter};
+use crate::services::outcome::{DiscardReason, Outcome};
+use smallvec::smallvec;
+
+mod filter;
+mod process;
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// Error raised in [`relay_profiling`].
+    #[error("Profiling Error: {0}")]
+    Profiling(#[from] relay_profiling::ProfileError),
+    /// The profile chunks are rate limited.
+    #[error("rate limited")]
+    RateLimited(RateLimits),
+    /// Profile chunks filtered because of a missing feature flag.
+    #[error("profile chunks feature flag missing")]
+    FilterFeatureFlag,
+}
+
+impl From<RateLimits> for Error {
+    fn from(value: RateLimits) -> Self {
+        Self::RateLimited(value)
+    }
+}
+
+impl crate::managed::OutcomeError for Error {
+    type Error = Self;
+
+    fn consume(self) -> (Option<Outcome>, Self::Error) {
+        let outcome = match &self {
+            Self::Profiling(relay_profiling::ProfileError::Filtered(f)) => {
+                Some(Outcome::Filtered(f.clone()))
+            }
+            Self::Profiling(err) => Some(Outcome::Invalid(DiscardReason::Profiling(
+                relay_profiling::discard_reason(err),
+            ))),
+
+            Self::RateLimited(limits) => {
+                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
+                Some(Outcome::RateLimited(reason_code))
+            }
+            Self::FilterFeatureFlag => None,
+        };
+        (outcome, self)
+    }
+}
+
+/// A processor for profile chunks.
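+///
+/// A minimal construction sketch, mirroring how the processor is wired up in
+/// `EnvelopeProcessorService` below (the `quota_limiter` handle is assumed to
+/// be in scope):
+///
+/// ```ignore
+/// let processor = ProfileChunksProcessor::new(Arc::clone(&quota_limiter));
+/// ```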
+///
+/// It processes items of type: [`ItemType::ProfileChunk`].
+#[derive(Debug)]
+pub struct ProfileChunksProcessor {
+    limiter: Arc<QuotaRateLimiter>,
+}
+
+impl ProfileChunksProcessor {
+    /// Creates a new [`Self`].
+    pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
+        Self { limiter }
+    }
+}
+
+impl processing::Processor for ProfileChunksProcessor {
+    type UnitOfWork = SerializedProfileChunks;
+    type Output = ProfileChunkOutput;
+    type Error = Error;
+
+    fn prepare_envelope(
+        &self,
+        envelope: &mut ManagedEnvelope,
+    ) -> Option<Managed<Self::UnitOfWork>> {
+        let profile_chunks = envelope
+            .envelope_mut()
+            .take_items_by(|item| matches!(*item.ty(), ItemType::ProfileChunk))
+            .into_vec();
+
+        if profile_chunks.is_empty() {
+            return None;
+        }
+
+        Some(Managed::from_envelope(
+            envelope,
+            SerializedProfileChunks {
+                headers: envelope.envelope().headers().clone(),
+                profile_chunks,
+            },
+        ))
+    }
+
+    async fn process(
+        &self,
+        mut profile_chunks: Managed<Self::UnitOfWork>,
+        ctx: Context<'_>,
+    ) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
+        filter::feature_flag(ctx).reject(&profile_chunks)?;
+
+        process::process(&mut profile_chunks, ctx);
+
+        let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?;
+
+        Ok(Output::just(ProfileChunkOutput(profile_chunks)))
+    }
+}
+
+/// Output produced by [`ProfileChunksProcessor`].
+#[derive(Debug)]
+pub struct ProfileChunkOutput(Managed<SerializedProfileChunks>);
+
+impl Forward for ProfileChunkOutput {
+    fn serialize_envelope(
+        self,
+        _: processing::ForwardContext<'_>,
+    ) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
+        let Self(profile_chunks) = self;
+        Ok(profile_chunks
+            .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks))))
+    }
+
+    #[cfg(feature = "processing")]
+    fn forward_store(
+        self,
+        s: processing::forward::StoreHandle<'_>,
+        ctx: processing::ForwardContext<'_>,
+    ) -> Result<(), Rejected<()>> {
+        use crate::services::store::StoreProfileChunk;
+
+        let Self(profile_chunks) = self;
+        let retention_days = ctx.event_retention().standard;
+
+        for item in profile_chunks.split(|pc| pc.profile_chunks) {
+            s.store(item.map(|item, _| StoreProfileChunk {
+                retention_days,
+                payload: item.payload(),
+                quantities: item.quantities(),
+            }));
+        }
+
+        Ok(())
+    }
+}
+
+/// Serialized profile chunks extracted from an envelope.
+#[derive(Debug)]
+pub struct SerializedProfileChunks {
+    /// Original envelope headers.
+    pub headers: EnvelopeHeaders,
+    /// List of serialized profile chunk items.
+    pub profile_chunks: Vec<Item>,
+}
+
+impl Counted for SerializedProfileChunks {
+    fn quantities(&self) -> Quantities {
+        let mut ui = 0;
+        let mut backend = 0;
+
+        for pc in &self.profile_chunks {
+            match pc.profile_type() {
+                Some(ProfileType::Ui) => ui += 1,
+                Some(ProfileType::Backend) => backend += 1,
+                None => {}
+            }
+        }
+
+        let mut quantities = smallvec![];
+        if ui > 0 {
+            quantities.push((DataCategory::ProfileChunkUi, ui));
+        }
+        if backend > 0 {
+            quantities.push((DataCategory::ProfileChunk, backend));
+        }
+
+        quantities
+    }
+}
+
+impl CountRateLimited for Managed<SerializedProfileChunks> {
+    type Error = Error;
+}

diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs
new file mode 100644
index 00000000000..e4ac2d81a04
--- /dev/null
+++ b/relay-server/src/processing/profile_chunks/process.rs
@@ -0,0 +1,68 @@
+use relay_profiling::ProfileType;
+use relay_quotas::DataCategory;
+
+use crate::envelope::{ContentType, Item, ItemType};
+use crate::processing::Context;
+use crate::processing::Managed;
+use crate::processing::profile_chunks::{Result, SerializedProfileChunks};
+
+/// Processes profile chunks.
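+///
+/// A sketch of where this step sits in the pipeline, mirroring
+/// `ProfileChunksProcessor::process` above (the `limiter` handle is assumed):
+///
+/// ```ignore
+/// filter::feature_flag(ctx).reject(&profile_chunks)?;
+/// process::process(&mut profile_chunks, ctx);
+/// let profile_chunks = limiter.enforce_quotas(profile_chunks, ctx).await?;
+/// ```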
+pub fn process(profile_chunks: &mut Managed<SerializedProfileChunks>, ctx: Context<'_>) {
+    // Only run this 'expensive' processing step in processing Relays.
+    if !ctx.is_processing() {
+        return;
+    }
+
+    let client_ip = profile_chunks.headers.meta().client_addr();
+    let filter_settings = &ctx.project_info.config.filter_settings;
+
+    profile_chunks.retain(
+        |pc| &mut pc.profile_chunks,
+        |item, records| -> Result<()> {
+            let pc = relay_profiling::ProfileChunk::new(item.payload())?;
+
+            // Validate the profile type inferred from the item against the one
+            // from the payload, or set it if it is missing.
+            //
+            // This is currently necessary to ensure profile chunks are emitted in the correct
+            // data category, as well as rate limited with the correct data category.
+            //
+            // In the future we plan to make the profile type on the item header a necessity.
+            // For more context see also: .
+            if item
+                .profile_type()
+                .is_some_and(|pt| pt != pc.profile_type())
+            {
+                return Err(relay_profiling::ProfileError::InvalidProfileType.into());
+            }
+
+            // Update the profile type to ensure the following outcomes are emitted in the
+            // correct data category.
+            //
+            // Once the profile type on the item header is required, this step can be removed.
+            if item.profile_type().is_none() {
+                item.set_profile_type(pc.profile_type());
+                match pc.profile_type() {
+                    ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1),
+                    ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1),
+                }
+            }
+
+            pc.filter(client_ip, filter_settings, ctx.global_config)?;
+
+            let expanded = pc.expand()?;
+            if expanded.len() > ctx.config.max_profile_size() {
+                return Err(relay_profiling::ProfileError::ExceedSizeLimit.into());
+            }
+
+            *item = {
+                let mut item = Item::new(ItemType::ProfileChunk);
+                item.set_profile_type(pc.profile_type());
+                item.set_payload(ContentType::Json, expanded);
+                item
+            };
+
+            Ok(())
+        },
+    );
+}

diff --git a/relay-server/src/processing/transactions/profile.rs b/relay-server/src/processing/transactions/profile.rs
index 6ac55f652b2..d2a63905733 100644
--- a/relay-server/src/processing/transactions/profile.rs
+++ b/relay-server/src/processing/transactions/profile.rs
@@ -77,7 +77,7 @@ pub fn filter(
         Err(err) => {
             record_keeper.reject_err(
                 Outcome::Invalid(DiscardReason::Profiling(relay_profiling::discard_reason(
-                    err,
+                    &err,
                 ))),
                 work.profile.take(),
             );
@@ -246,7 +246,9 @@ fn expand_profile(
             Ok(id)
         } else {
             Err(Outcome::Invalid(DiscardReason::Profiling(
-                relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
+                relay_profiling::discard_reason(
+                    &relay_profiling::ProfileError::ExceedSizeLimit,
+                ),
             )))
         }
     }
@@ -256,7 +258,7 @@ fn expand_profile(
         Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
             Err(Outcome::Filtered(filter_stat_key))
         }
         Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
-            relay_profiling::discard_reason(err),
+            relay_profiling::discard_reason(&err),
         ))),
     }
 }

diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index a581b5109a6..6f148c6069e 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -49,6 +49,7 @@ use crate::metrics_extraction::transactions::ExtractedMetrics;
 use crate::metrics_extraction::transactions::types::ExtractMetricsError;
 use crate::processing::check_ins::CheckInsProcessor;
 use crate::processing::logs::LogsProcessor;
+use crate::processing::profile_chunks::ProfileChunksProcessor;
 use crate::processing::sessions::SessionsProcessor;
 use crate::processing::spans::SpansProcessor;
 use crate::processing::trace_attachments::TraceAttachmentsProcessor;
@@ -97,7 +98,6 @@ mod event;
 mod metrics;
 mod nel;
 mod profile;
-mod profile_chunk;
 mod replay;
 mod report;
 mod span;
@@ -1170,6 +1170,7 @@ struct Processing {
     spans: SpansProcessor,
     check_ins: CheckInsProcessor,
     sessions: SessionsProcessor,
+    profile_chunks: ProfileChunksProcessor,
     trace_attachments: TraceAttachmentsProcessor,
 }
 
@@ -1260,6 +1261,7 @@ impl EnvelopeProcessorService {
                 spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                 check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                 sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
+                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                 trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
             },
             geoip_lookup,
@@ -1691,33 +1693,6 @@ impl EnvelopeProcessorService {
         Ok(Some(extracted_metrics))
     }
 
-    async fn process_profile_chunks(
-        &self,
-        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
-        ctx: processing::Context<'_>,
-    ) -> Result<Option<ExtractedMetrics>, ProcessingError> {
-        profile_chunk::filter(managed_envelope, ctx.project_info);
-
-        if_processing!(self.inner.config, {
-            profile_chunk::process(
-                managed_envelope,
-                ctx.project_info,
-                ctx.global_config,
-                ctx.config,
-            );
-        });
-
-        self.enforce_quotas(
-            managed_envelope,
-            Annotated::empty(),
-            &mut ProcessingExtractedMetrics::new(),
-            ctx,
-        )
-        .await?;
-
-        Ok(None)
-    }
-
     /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
     async fn process_standalone(
         &self,
@@ -2006,7 +1981,12 @@ impl EnvelopeProcessorService {
             }
             ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
             ProcessingGroup::ProfileChunk => {
-                run!(process_profile_chunks, ctx)
+                self.process_with_processor(
+                    &self.inner.processing.profile_chunks,
+                    managed_envelope,
+                    ctx,
+                )
+                .await
             }
             // Currently is not used.
             ProcessingGroup::Metrics => {

diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs
index 67ffefca548..70fb00717d6 100644
--- a/relay-server/src/services/processor/profile.rs
+++ b/relay-server/src/services/processor/profile.rs
@@ -49,13 +49,13 @@ pub fn filter(
                     ItemAction::Keep
                 }
                 Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
-                    relay_profiling::discard_reason(err),
+                    relay_profiling::discard_reason(&err),
                 ))),
             }
         }
         // We found another profile, we'll drop it.
         ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
-            relay_profiling::discard_reason(ProfileError::TooManyProfiles),
+            relay_profiling::discard_reason(&ProfileError::TooManyProfiles),
         ))),
         _ => ItemAction::Keep,
     });
@@ -132,7 +132,9 @@ fn expand_profile(
             Ok(id)
         } else {
             Err(Outcome::Invalid(DiscardReason::Profiling(
-                relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
+                relay_profiling::discard_reason(
+                    &relay_profiling::ProfileError::ExceedSizeLimit,
+                ),
             )))
         }
     }
@@ -142,7 +144,7 @@ fn expand_profile(
         Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
             Err(Outcome::Filtered(filter_stat_key))
         }
         Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
-            relay_profiling::discard_reason(err),
+            relay_profiling::discard_reason(&err),
         ))),
     }
 }

diff --git a/relay-server/src/services/processor/profile_chunk.rs b/relay-server/src/services/processor/profile_chunk.rs
deleted file mode 100644
index 971bf5921ff..00000000000
--- a/relay-server/src/services/processor/profile_chunk.rs
+++ /dev/null
@@ -1,115 +0,0 @@
-//! Profile chunks processor code.
-
-use relay_dynamic_config::Feature;
-
-use crate::envelope::ItemType;
-use crate::managed::{ItemAction, TypedEnvelope};
-
-use crate::services::projects::project::ProjectInfo;
-#[cfg(feature = "processing")]
-use {
-    crate::envelope::ContentType,
-    crate::services::outcome::{DiscardReason, Outcome},
-    crate::services::processor::ProfileChunkGroup,
-    relay_config::Config,
-    relay_dynamic_config::GlobalConfig,
-    relay_profiling::ProfileError,
-};
-
-/// Removes profile chunks from the envelope if the feature is not enabled.
-pub fn filter<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: &ProjectInfo) {
-    let continuous_profiling_enabled =
-        if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
-            project_info.has_feature(Feature::ContinuousProfilingBeta)
-        } else {
-            project_info.has_feature(Feature::ContinuousProfiling)
-        };
-    managed_envelope.retain_items(|item| match item.ty() {
-        ItemType::ProfileChunk if !continuous_profiling_enabled => ItemAction::DropSilently,
-        _ => ItemAction::Keep,
-    });
-}
-
-/// Processes profile chunks.
-#[cfg(feature = "processing")]
-pub fn process(
-    managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
-    project_info: &ProjectInfo,
-    global_config: &GlobalConfig,
-    config: &Config,
-) {
-    let client_ip = managed_envelope.envelope().meta().client_addr();
-    let filter_settings = &project_info.config.filter_settings;
-
-    let continuous_profiling_enabled =
-        if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
-            project_info.has_feature(Feature::ContinuousProfilingBeta)
-        } else {
-            project_info.has_feature(Feature::ContinuousProfiling)
-        };
-
-    managed_envelope.retain_items(|item| match item.ty() {
-        ItemType::ProfileChunk => {
-            if !continuous_profiling_enabled {
-                return ItemAction::DropSilently;
-            }
-
-            let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
-                Ok(chunk) => chunk,
-                Err(err) => return error_to_action(err),
-            };
-
-            // Validate the item inferred profile type with the one from the payload,
-            // or if missing set it.
-            //
-            // This is currently necessary to ensure profile chunks are emitted in the correct
-            // data category, as well as rate limited with the correct data category.
-            //
-            // In the future we plan to make the profile type on the item header a necessity.
-            // For more context see also: .
-            match item.profile_type() {
-                Some(profile_type) => {
-                    // Validate the profile type inferred from the item header (either set before
-                    // or from the platform) against the profile type from the parsed chunk itself.
-                    if profile_type != chunk.profile_type() {
-                        return error_to_action(relay_profiling::ProfileError::InvalidProfileType);
-                    }
-                }
-                None => {
-                    // Important: set the profile type to get outcomes in the correct category,
-                    // if there isn't already one on the profile.
-                    item.set_profile_type(chunk.profile_type());
-                }
-            }
-
-            if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
-                return error_to_action(err);
-            }
-
-            let payload = match chunk.expand() {
-                Ok(expanded) => expanded,
-                Err(err) => return error_to_action(err),
-            };
-
-            if payload.len() > config.max_profile_size() {
-                return error_to_action(relay_profiling::ProfileError::ExceedSizeLimit);
-            }
-
-            item.set_payload(ContentType::Json, payload);
-            ItemAction::Keep
-        }
-        _ => ItemAction::Keep,
-    });
-}
-
-#[cfg(feature = "processing")]
-fn error_to_action(err: ProfileError) -> ItemAction {
-    match err {
-        ProfileError::Filtered(filter_stat_key) => {
-            ItemAction::Drop(Outcome::Filtered(filter_stat_key))
-        }
-        err => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
-            relay_profiling::discard_reason(err),
-        ))),
-    }
-}

diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index b226f016431..20ace0733dd 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -140,6 +140,25 @@ impl Counted for StoreSpanV2 {
     }
 }
 
+/// Publishes a singular profile chunk to Kafka.
+#[derive(Debug)]
+pub struct StoreProfileChunk {
+    /// Default retention of the profile chunk.
+    pub retention_days: u16,
+    /// The serialized profile chunk payload.
+    pub payload: Bytes,
+    /// Outcome quantities associated with this profile chunk.
+    ///
+    /// Quantities are different for backend and UI profile chunks.
+    pub quantities: Quantities,
+}
+
+impl Counted for StoreProfileChunk {
+    fn quantities(&self) -> Quantities {
+        self.quantities.clone()
+    }
+}
+
 /// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
 pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
 
@@ -160,6 +179,8 @@ pub enum Store {
     TraceItem(Managed<StoreTraceItem>),
     /// A singular Span.
     Span(Managed<Box<StoreSpanV2>>),
+    /// A singular profile chunk.
+    ProfileChunk(Managed<StoreProfileChunk>),
 }
 
 impl Store {
@@ -170,6 +191,7 @@ impl Store {
             Store::Metrics(_) => "metrics",
             Store::TraceItem(_) => "log",
             Store::Span(_) => "span",
+            Store::ProfileChunk(_) => "profile_chunk",
         }
     }
 }
@@ -208,6 +230,14 @@ impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
     }
 }
 
+impl FromMessage<Managed<StoreProfileChunk>> for Store {
+    type Response = NoResponse;
+
+    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
+        Self::ProfileChunk(message)
+    }
+}
+
 /// Service implementing the [`Store`] interface.
 pub struct StoreService {
     pool: StoreServicePool,
@@ -245,6 +275,7 @@ impl StoreService {
                 Store::Metrics(message) => self.handle_store_metrics(message),
                 Store::TraceItem(message) => self.handle_store_trace_item(message),
                 Store::Span(message) => self.handle_store_span(message),
+                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
             }
         })
     }
@@ -396,13 +427,6 @@ impl StoreService {
                     "StoreService received unsupported item type '{ty}' in envelope"
                 );
             }
-            ItemType::ProfileChunk => self.produce_profile_chunk(
-                scoping.organization_id,
-                scoping.project_id,
-                received_at,
-                retention,
-                item,
-            )?,
             other => {
                 let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                 let item_types = envelope
@@ -666,6 +690,27 @@ impl StoreService {
         }
     }
 
+    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
+        let scoping = message.scoping();
+        let received_at = message.received_at();
+
+        let _ = message.try_accept(|message| {
+            let message = ProfileChunkKafkaMessage {
+                organization_id: scoping.organization_id,
+                project_id: scoping.project_id,
+                received: safe_timestamp(received_at),
+                retention_days: message.retention_days,
+                headers: BTreeMap::from([(
+                    "project_id".to_owned(),
+                    scoping.project_id.to_string(),
+                )]),
+                payload: message.payload,
+            };
+
+            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
+        });
+    }
+
     fn create_metric_message<'a>(
         &self,
         organization_id: OrganizationId,
@@ -1148,26 +1193,6 @@ impl StoreService {
         Ok(())
     }
-
-    fn produce_profile_chunk(
-        &self,
-        organization_id: OrganizationId,
-        project_id: ProjectId,
-        received_at: DateTime<Utc>,
-        retention_days: u16,
-        item: &Item,
-    ) -> Result<(), StoreError> {
-        let message = ProfileChunkKafkaMessage {
-            organization_id,
-            project_id,
-            received: safe_timestamp(received_at),
-            retention_days,
-            headers: BTreeMap::from([("project_id".to_owned(), project_id.to_string())]),
-            payload: item.payload(),
-        };
-        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
-        Ok(())
-    }
 }
 
 impl Service for StoreService {