From f1b917e20e152792c2e8aeb701a28fea14a05142 Mon Sep 17 00:00:00 2001 From: blaginin Date: Thu, 19 Mar 2026 17:34:05 +0000 Subject: [PATCH 1/6] `AggregateFnRef` [de]serialize Signed-off-by: blaginin --- vortex-array/src/aggregate_fn/erased.rs | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/vortex-array/src/aggregate_fn/erased.rs b/vortex-array/src/aggregate_fn/erased.rs index 750a7c24f77..42264392f17 100644 --- a/vortex-array/src/aggregate_fn/erased.rs +++ b/vortex-array/src/aggregate_fn/erased.rs @@ -10,8 +10,12 @@ use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; +use arcref::ArcRef; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_session::VortexSession; use vortex_utils::debug_with::DebugWith; use crate::aggregate_fn::AccumulatorRef; @@ -19,6 +23,7 @@ use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::GroupedAccumulatorRef; use crate::aggregate_fn::options::AggregateFnOptions; +use crate::aggregate_fn::session::AggregateFnSessionExt; use crate::aggregate_fn::typed::AggregateFnInner; use crate::aggregate_fn::typed::DynAggregateFn; use crate::dtype::DType; @@ -34,6 +39,37 @@ use crate::dtype::DType; pub struct AggregateFnRef(pub(super) Arc); impl AggregateFnRef { + /// Deserialize an aggregate function from its ID and serialized metadata bytes. + /// + /// Looks up the aggregate function plugin by ID in the session's registry + /// and delegates deserialization to it. 
+ pub fn deserialize(id: &str, metadata: &[u8], session: &VortexSession) -> VortexResult { + let agg_fn_id: AggregateFnId = ArcRef::new_arc(Arc::from(id)); + let plugin = session + .aggregate_fns() + .registry() + .find(&agg_fn_id) + .ok_or_else(|| vortex_err!("unknown aggregate function id: {}", id))?; + let agg_fn = plugin.deserialize(metadata, session)?; + + if agg_fn.id() != agg_fn_id { + vortex_bail!( + "Aggregate function ID mismatch: expected {}, got {}", + agg_fn_id, + agg_fn.id() + ); + } + + Ok(agg_fn) + } + + /// Serialize this aggregate function's options to bytes. + /// + /// Returns `Ok(None)` if the function is not serializable. + pub fn serialize(&self) -> VortexResult>> { + self.0.options_serialize() + } + /// Returns the ID of this aggregate function. pub fn id(&self) -> AggregateFnId { self.0.id() From 1eefd67fdaa4451ad390abdd5062efb51c42dd8a Mon Sep 17 00:00:00 2001 From: blaginin Date: Thu, 19 Mar 2026 17:39:20 +0000 Subject: [PATCH 2/6] api lockfile Signed-off-by: blaginin --- vortex-array/public-api.lock | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index a1e370d08e9..de8faef557d 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -488,6 +488,8 @@ pub fn vortex_array::aggregate_fn::AggregateFnRef::as_opt vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::AggregateFnRef::deserialize(id: &str, metadata: &[u8], session: &vortex_session::VortexSession) -> vortex_error::VortexResult + pub fn vortex_array::aggregate_fn::AggregateFnRef::id(&self) -> vortex_array::aggregate_fn::AggregateFnId pub fn vortex_array::aggregate_fn::AggregateFnRef::is(&self) -> bool @@ -496,6 +498,8 @@ pub fn vortex_array::aggregate_fn::AggregateFnRef::options(&self) -> vortex_arra pub fn vortex_array::aggregate_fn::AggregateFnRef::return_dtype(&self, input_dtype: &vortex_array::dtype::DType) -> core::option::Option +pub fn 
vortex_array::aggregate_fn::AggregateFnRef::serialize(&self) -> vortex_error::VortexResult>> + pub fn vortex_array::aggregate_fn::AggregateFnRef::state_dtype(&self, input_dtype: &vortex_array::dtype::DType) -> core::option::Option pub fn vortex_array::aggregate_fn::AggregateFnRef::vtable_ref(&self) -> core::option::Option<&V> From df97bacb289332a4a87961e204c97db5c07c0ee0 Mon Sep 17 00:00:00 2001 From: blaginin Date: Thu, 19 Mar 2026 17:59:03 +0000 Subject: [PATCH 3/6] do like Expr Signed-off-by: blaginin --- vortex-array/public-api.lock | 26 ++++-- vortex-array/src/aggregate_fn/erased.rs | 36 -------- vortex-array/src/aggregate_fn/fns/sum/mod.rs | 12 +++ vortex-array/src/aggregate_fn/mod.rs | 1 + vortex-array/src/aggregate_fn/proto.rs | 95 ++++++++++++++++++++ vortex-proto/proto/expr.proto | 6 ++ vortex-proto/public-api.lock | 40 +++++++++ vortex-proto/src/generated/vortex.expr.rs | 8 ++ 8 files changed, 182 insertions(+), 42 deletions(-) create mode 100644 vortex-array/src/aggregate_fn/proto.rs diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index de8faef557d..aec2901aad3 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -368,7 +368,7 @@ pub fn vortex_array::aggregate_fn::fns::sum::Sum::reset(&self, partial: &mut Sel pub fn vortex_array::aggregate_fn::fns::sum::Sum::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::sum::Sum::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::sum::Sum::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::sum::Sum::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult @@ -388,6 +388,16 @@ pub fn vortex_array::aggregate_fn::kernels::DynGroupedAggregateKernel::grouped_a pub fn 
vortex_array::aggregate_fn::kernels::DynGroupedAggregateKernel::grouped_aggregate_fixed_size(&self, aggregate_fn: &vortex_array::aggregate_fn::AggregateFnRef, groups: &vortex_array::arrays::FixedSizeListArray) -> vortex_error::VortexResult> +pub mod vortex_array::aggregate_fn::proto + +pub trait vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt + +pub fn vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt::serialize_proto(&self) -> vortex_error::VortexResult + +impl vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt for vortex_array::aggregate_fn::AggregateFnRef + +pub fn vortex_array::aggregate_fn::AggregateFnRef::serialize_proto(&self) -> vortex_error::VortexResult + pub mod vortex_array::aggregate_fn::session pub struct vortex_array::aggregate_fn::session::AggregateFnSession @@ -488,8 +498,6 @@ pub fn vortex_array::aggregate_fn::AggregateFnRef::as_opt vortex_error::VortexResult -pub fn vortex_array::aggregate_fn::AggregateFnRef::deserialize(id: &str, metadata: &[u8], session: &vortex_session::VortexSession) -> vortex_error::VortexResult - pub fn vortex_array::aggregate_fn::AggregateFnRef::id(&self) -> vortex_array::aggregate_fn::AggregateFnId pub fn vortex_array::aggregate_fn::AggregateFnRef::is(&self) -> bool @@ -498,12 +506,14 @@ pub fn vortex_array::aggregate_fn::AggregateFnRef::options(&self) -> vortex_arra pub fn vortex_array::aggregate_fn::AggregateFnRef::return_dtype(&self, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::AggregateFnRef::serialize(&self) -> vortex_error::VortexResult>> - pub fn vortex_array::aggregate_fn::AggregateFnRef::state_dtype(&self, input_dtype: &vortex_array::dtype::DType) -> core::option::Option pub fn vortex_array::aggregate_fn::AggregateFnRef::vtable_ref(&self) -> core::option::Option<&V> +impl vortex_array::aggregate_fn::AggregateFnRef + +pub fn vortex_array::aggregate_fn::AggregateFnRef::from_proto(proto: 
&vortex_proto::expr::AggregateFn, session: &vortex_session::VortexSession) -> vortex_error::VortexResult + impl core::clone::Clone for vortex_array::aggregate_fn::AggregateFnRef pub fn vortex_array::aggregate_fn::AggregateFnRef::clone(&self) -> vortex_array::aggregate_fn::AggregateFnRef @@ -526,6 +536,10 @@ impl core::hash::Hash for vortex_array::aggregate_fn::AggregateFnRef pub fn vortex_array::aggregate_fn::AggregateFnRef::hash(&self, state: &mut H) +impl vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt for vortex_array::aggregate_fn::AggregateFnRef + +pub fn vortex_array::aggregate_fn::AggregateFnRef::serialize_proto(&self) -> vortex_error::VortexResult + pub struct vortex_array::aggregate_fn::EmptyOptions impl core::clone::Clone for vortex_array::aggregate_fn::EmptyOptions @@ -778,7 +792,7 @@ pub fn vortex_array::aggregate_fn::fns::sum::Sum::reset(&self, partial: &mut Sel pub fn vortex_array::aggregate_fn::fns::sum::Sum::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::sum::Sum::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::sum::Sum::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::sum::Sum::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult diff --git a/vortex-array/src/aggregate_fn/erased.rs b/vortex-array/src/aggregate_fn/erased.rs index 42264392f17..750a7c24f77 100644 --- a/vortex-array/src/aggregate_fn/erased.rs +++ b/vortex-array/src/aggregate_fn/erased.rs @@ -10,12 +10,8 @@ use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; -use arcref::ArcRef; use vortex_error::VortexExpect; use vortex_error::VortexResult; -use vortex_error::vortex_bail; -use vortex_error::vortex_err; -use vortex_session::VortexSession; use vortex_utils::debug_with::DebugWith; use 
crate::aggregate_fn::AccumulatorRef; @@ -23,7 +19,6 @@ use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::GroupedAccumulatorRef; use crate::aggregate_fn::options::AggregateFnOptions; -use crate::aggregate_fn::session::AggregateFnSessionExt; use crate::aggregate_fn::typed::AggregateFnInner; use crate::aggregate_fn::typed::DynAggregateFn; use crate::dtype::DType; @@ -39,37 +34,6 @@ use crate::dtype::DType; pub struct AggregateFnRef(pub(super) Arc); impl AggregateFnRef { - /// Deserialize an aggregate function from its ID and serialized metadata bytes. - /// - /// Looks up the aggregate function plugin by ID in the session's registry - /// and delegates deserialization to it. - pub fn deserialize(id: &str, metadata: &[u8], session: &VortexSession) -> VortexResult { - let agg_fn_id: AggregateFnId = ArcRef::new_arc(Arc::from(id)); - let plugin = session - .aggregate_fns() - .registry() - .find(&agg_fn_id) - .ok_or_else(|| vortex_err!("unknown aggregate function id: {}", id))?; - let agg_fn = plugin.deserialize(metadata, session)?; - - if agg_fn.id() != agg_fn_id { - vortex_bail!( - "Aggregate function ID mismatch: expected {}, got {}", - agg_fn_id, - agg_fn.id() - ); - } - - Ok(agg_fn) - } - - /// Serialize this aggregate function's options to bytes. - /// - /// Returns `Ok(None)` if the function is not serializable. - pub fn serialize(&self) -> VortexResult>> { - self.0.options_serialize() - } - /// Returns the ID of this aggregate function. 
pub fn id(&self) -> AggregateFnId { self.0.id() diff --git a/vortex-array/src/aggregate_fn/fns/sum/mod.rs b/vortex-array/src/aggregate_fn/fns/sum/mod.rs index 49706ca56c0..51922ae1dd7 100644 --- a/vortex-array/src/aggregate_fn/fns/sum/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/sum/mod.rs @@ -74,6 +74,18 @@ impl AggregateFnVTable for Sum { AggregateFnId::new_ref("vortex.sum") } + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &vortex_session::VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) + } + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { // When a sum overflows, we return a sum _value_ of null. Therefore, we all return dtypes // are nullable. diff --git a/vortex-array/src/aggregate_fn/mod.rs b/vortex-array/src/aggregate_fn/mod.rs index 4f89d5a4178..0dad56f3222 100644 --- a/vortex-array/src/aggregate_fn/mod.rs +++ b/vortex-array/src/aggregate_fn/mod.rs @@ -31,6 +31,7 @@ pub use options::*; pub mod fns; pub mod kernels; +pub mod proto; pub mod session; /// A unique identifier for an aggregate function. diff --git a/vortex-array/src/aggregate_fn/proto.rs b/vortex-array/src/aggregate_fn/proto.rs new file mode 100644 index 00000000000..c5060f9d08b --- /dev/null +++ b/vortex-array/src/aggregate_fn/proto.rs @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use arcref::ArcRef; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_proto::expr as pb; +use vortex_session::VortexSession; + +use crate::aggregate_fn::AggregateFnId; +use crate::aggregate_fn::AggregateFnRef; +use crate::aggregate_fn::session::AggregateFnSessionExt; + +/// Extension trait for serializing an [`AggregateFnRef`] to protobuf. 
+/// +/// Note: the serialization format is not stable and may change between versions. +pub trait AggregateFnSerializeProtoExt { + /// Serialize this aggregate function to its protobuf representation. + fn serialize_proto(&self) -> VortexResult; +} + +impl AggregateFnSerializeProtoExt for AggregateFnRef { + fn serialize_proto(&self) -> VortexResult { + let metadata = self + .options() + .serialize()? + .ok_or_else(|| vortex_err!("Aggregate function '{}' is not serializable", self.id()))?; + + Ok(pb::AggregateFn { + id: self.id().to_string(), + metadata: Some(metadata), + }) + } +} + +impl AggregateFnRef { + /// Deserialize an aggregate function from its protobuf representation. + /// + /// Looks up the aggregate function plugin by ID in the session's registry + /// and delegates deserialization to it. + /// + /// Note: the serialization format is not stable and may change between versions. + pub fn from_proto(proto: &pb::AggregateFn, session: &VortexSession) -> VortexResult { + let agg_fn_id: AggregateFnId = ArcRef::new_arc(Arc::from(proto.id.as_str())); + let plugin = session + .aggregate_fns() + .registry() + .find(&agg_fn_id) + .ok_or_else(|| vortex_err!("unknown aggregate function id: {}", proto.id))?; + let agg_fn = plugin.deserialize(proto.metadata(), session)?; + + if agg_fn.id() != agg_fn_id { + vortex_bail!( + "Aggregate function ID mismatch: expected {}, got {}", + agg_fn_id, + agg_fn.id() + ); + } + + Ok(agg_fn) + } +} + +#[cfg(test)] +mod tests { + use prost::Message; + use vortex_proto::expr as pb; + use vortex_session::VortexSession; + + use super::AggregateFnSerializeProtoExt; + use crate::aggregate_fn::AggregateFnRef; + use crate::aggregate_fn::AggregateFnVTableExt; + use crate::aggregate_fn::EmptyOptions; + use crate::aggregate_fn::fns::sum::Sum; + use crate::aggregate_fn::session::AggregateFnSession; + use crate::aggregate_fn::session::AggregateFnSessionExt; + + #[test] + fn aggregate_fn_serde() { + let session = VortexSession::empty().with::(); 
+ session.aggregate_fns().register(Sum); + + let agg_fn = Sum.bind(EmptyOptions); + + let serialized = agg_fn.serialize_proto().unwrap(); + let buf = serialized.encode_to_vec(); + let deserialized_proto = pb::AggregateFn::decode(buf.as_slice()).unwrap(); + let deserialized = AggregateFnRef::from_proto(&deserialized_proto, &session).unwrap(); + + assert_eq!(deserialized, agg_fn); + } +} diff --git a/vortex-proto/proto/expr.proto b/vortex-proto/proto/expr.proto index 3b47db2a756..73ba7209a15 100644 --- a/vortex-proto/proto/expr.proto +++ b/vortex-proto/proto/expr.proto @@ -20,6 +20,12 @@ message Expr { optional bytes metadata = 3; } +// Captures a serialized aggregate function with its ID and options metadata. +message AggregateFn { + string id = 1; + optional bytes metadata = 2; +} + // Options for `vortex.literal` message LiteralOpts { vortex.scalar.Scalar value = 1; diff --git a/vortex-proto/public-api.lock b/vortex-proto/public-api.lock index 3282170a2bf..045b53d17eb 100644 --- a/vortex-proto/public-api.lock +++ b/vortex-proto/public-api.lock @@ -768,6 +768,46 @@ pub fn vortex_proto::expr::select_opts::Opts::hash<__H: core::hash::Hasher>(&sel impl core::marker::StructuralPartialEq for vortex_proto::expr::select_opts::Opts +pub struct vortex_proto::expr::AggregateFn + +pub vortex_proto::expr::AggregateFn::id: alloc::string::String + +pub vortex_proto::expr::AggregateFn::metadata: core::option::Option> + +impl vortex_proto::expr::AggregateFn + +pub fn vortex_proto::expr::AggregateFn::metadata(&self) -> &[u8] + +impl core::clone::Clone for vortex_proto::expr::AggregateFn + +pub fn vortex_proto::expr::AggregateFn::clone(&self) -> vortex_proto::expr::AggregateFn + +impl core::cmp::Eq for vortex_proto::expr::AggregateFn + +impl core::cmp::PartialEq for vortex_proto::expr::AggregateFn + +pub fn vortex_proto::expr::AggregateFn::eq(&self, other: &vortex_proto::expr::AggregateFn) -> bool + +impl core::default::Default for vortex_proto::expr::AggregateFn + +pub fn 
vortex_proto::expr::AggregateFn::default() -> Self + +impl core::fmt::Debug for vortex_proto::expr::AggregateFn + +pub fn vortex_proto::expr::AggregateFn::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_proto::expr::AggregateFn + +pub fn vortex_proto::expr::AggregateFn::hash<__H: core::hash::Hasher>(&self, state: &mut __H) + +impl core::marker::StructuralPartialEq for vortex_proto::expr::AggregateFn + +impl prost::message::Message for vortex_proto::expr::AggregateFn + +pub fn vortex_proto::expr::AggregateFn::clear(&mut self) + +pub fn vortex_proto::expr::AggregateFn::encoded_len(&self) -> usize + pub struct vortex_proto::expr::BetweenOpts pub vortex_proto::expr::BetweenOpts::lower_strict: bool diff --git a/vortex-proto/src/generated/vortex.expr.rs b/vortex-proto/src/generated/vortex.expr.rs index 9bc61475e59..9c7ddb1d90c 100644 --- a/vortex-proto/src/generated/vortex.expr.rs +++ b/vortex-proto/src/generated/vortex.expr.rs @@ -11,6 +11,14 @@ pub struct Expr { #[prost(bytes = "vec", optional, tag = "3")] pub metadata: ::core::option::Option<::prost::alloc::vec::Vec>, } +/// Captures a serialized aggregate function with its ID and options metadata. 
+#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct AggregateFn { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(bytes = "vec", optional, tag = "2")] + pub metadata: ::core::option::Option<::prost::alloc::vec::Vec>, +} /// Options for `vortex.literal` #[derive(Clone, PartialEq, ::prost::Message)] pub struct LiteralOpts { From 4568eec78da01cdcf66c11c92424b2f5b615635f Mon Sep 17 00:00:00 2001 From: blaginin Date: Thu, 19 Mar 2026 19:27:13 +0000 Subject: [PATCH 4/6] Add default aggregates Co-authored-by: Nicholas Gates Signed-off-by: blaginin --- vortex-array/src/aggregate_fn/session.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 0fc54f81052..6e85ee97b6b 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -15,6 +15,8 @@ use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::fns::is_constant::IsConstant; use crate::aggregate_fn::fns::is_sorted::IsSorted; use crate::aggregate_fn::fns::min_max::MinMax; +use crate::aggregate_fn::fns::nan_count::NanCount; +use crate::aggregate_fn::fns::sum::Sum; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; use crate::arrays::Chunked; @@ -47,6 +49,13 @@ impl Default for AggregateFnSession { grouped_kernels: RwLock::new(HashMap::default()), }; + // Register the built-in aggregate functions + this.register(IsConstant); + this.register(IsSorted); + this.register(MinMax); + this.register(NanCount); + this.register(Sum); + // Register the built-in aggregate kernels. 
this.register_aggregate_kernel(Chunked::ID, None, &ChunkedArrayAggregate); this.register_aggregate_kernel(Dict::ID, Some(MinMax.id()), &DictMinMaxKernel); From 18761da9be73a36487c25962b5df38cac26f262b Mon Sep 17 00:00:00 2001 From: blaginin Date: Thu, 19 Mar 2026 19:32:05 +0000 Subject: [PATCH 5/6] Serialize all aggregate fns Co-authored-by: Claude Signed-off-by: blaginin --- .../src/aggregate_fn/fns/is_constant/mod.rs | 12 ++++++++++ .../src/aggregate_fn/fns/is_sorted/mod.rs | 24 +++++++++++++++++++ .../src/aggregate_fn/fns/min_max/mod.rs | 12 ++++++++++ .../src/aggregate_fn/fns/nan_count/mod.rs | 12 ++++++++++ 4 files changed, 60 insertions(+) diff --git a/vortex-array/src/aggregate_fn/fns/is_constant/mod.rs b/vortex-array/src/aggregate_fn/fns/is_constant/mod.rs index 98e7ff4bde3..b670c52b16e 100644 --- a/vortex-array/src/aggregate_fn/fns/is_constant/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/is_constant/mod.rs @@ -259,6 +259,18 @@ impl AggregateFnVTable for IsConstant { AggregateFnId::new_ref("vortex.is_constant") } + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &vortex_session::VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) + } + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { match input_dtype { DType::Null => None, diff --git a/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs b/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs index 8a9c4e295f2..85efc336881 100644 --- a/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs @@ -13,6 +13,7 @@ use std::fmt::Formatter; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use self::bool::check_bool_sorted; use self::decimal::check_decimal_sorted; @@ -231,6 +232,29 @@ impl AggregateFnVTable for IsSorted { AggregateFnId::new_ref("vortex.is_sorted") } + fn 
serialize(&self, options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![u8::from(options.strict)])) + } + + fn deserialize( + &self, + metadata: &[u8], + _session: &vortex_session::VortexSession, + ) -> VortexResult { + let &[strict_byte] = metadata else { + vortex_bail!( + "IsSorted: expected 1 byte of metadata, got {}", + metadata.len() + ); + }; + let strict = match strict_byte { + 0 => false, + 1 => true, + _ => vortex_bail!("IsSorted: expected 0 or 1 for strict, got {}", strict_byte), + }; + Ok(IsSortedOptions { strict }) + } + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { match input_dtype { DType::Null | DType::Struct(..) | DType::List(..) | DType::FixedSizeList(..) => None, diff --git a/vortex-array/src/aggregate_fn/fns/min_max/mod.rs b/vortex-array/src/aggregate_fn/fns/min_max/mod.rs index 2241cefcc3b..b312a8ce5eb 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/mod.rs @@ -176,6 +176,18 @@ impl AggregateFnVTable for MinMax { AggregateFnId::new_ref("vortex.min_max") } + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &vortex_session::VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) + } + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { match input_dtype { DType::Bool(_) diff --git a/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs b/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs index f60d50f2a20..e1e3cb495c0 100644 --- a/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs @@ -84,6 +84,18 @@ impl AggregateFnVTable for NanCount { AggregateFnId::new_ref("vortex.nan_count") } + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &vortex_session::VortexSession, + ) 
-> VortexResult { + Ok(EmptyOptions) + } + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { if let DType::Primitive(ptype, ..) = input_dtype && ptype.is_float() From ab9c381f90943ae88d990fbd88efb7beb7c86d77 Mon Sep 17 00:00:00 2001 From: blaginin Date: Thu, 19 Mar 2026 19:35:54 +0000 Subject: [PATCH 6/6] no `AggregateFnSerializeProtoExt` Signed-off-by: blaginin --- vortex-array/public-api.lock | 30 +++++++++----------------- vortex-array/src/aggregate_fn/proto.rs | 16 ++++---------- 2 files changed, 14 insertions(+), 32 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index aec2901aad3..b72ced1ebf3 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -82,7 +82,7 @@ pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::reset(&self, pa pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult @@ -120,7 +120,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::coerce_args(&self, pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::combine_partials(&self, partial: &mut Self::Partial, other: vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> -pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, metadata: 
&[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::empty_partial(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult @@ -228,7 +228,7 @@ pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::reset(&self, partial: & pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult @@ -306,7 +306,7 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::reset(&self, partia pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult @@ -390,14 +390,6 @@ pub fn vortex_array::aggregate_fn::kernels::DynGroupedAggregateKernel::grouped_a pub mod vortex_array::aggregate_fn::proto -pub trait vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt - -pub fn vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt::serialize_proto(&self) -> vortex_error::VortexResult - -impl 
vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt for vortex_array::aggregate_fn::AggregateFnRef - -pub fn vortex_array::aggregate_fn::AggregateFnRef::serialize_proto(&self) -> vortex_error::VortexResult - pub mod vortex_array::aggregate_fn::session pub struct vortex_array::aggregate_fn::session::AggregateFnSession @@ -514,6 +506,8 @@ impl vortex_array::aggregate_fn::AggregateFnRef pub fn vortex_array::aggregate_fn::AggregateFnRef::from_proto(proto: &vortex_proto::expr::AggregateFn, session: &vortex_session::VortexSession) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::AggregateFnRef::serialize_proto(&self) -> vortex_error::VortexResult + impl core::clone::Clone for vortex_array::aggregate_fn::AggregateFnRef pub fn vortex_array::aggregate_fn::AggregateFnRef::clone(&self) -> vortex_array::aggregate_fn::AggregateFnRef @@ -536,10 +530,6 @@ impl core::hash::Hash for vortex_array::aggregate_fn::AggregateFnRef pub fn vortex_array::aggregate_fn::AggregateFnRef::hash(&self, state: &mut H) -impl vortex_array::aggregate_fn::proto::AggregateFnSerializeProtoExt for vortex_array::aggregate_fn::AggregateFnRef - -pub fn vortex_array::aggregate_fn::AggregateFnRef::serialize_proto(&self) -> vortex_error::VortexResult - pub struct vortex_array::aggregate_fn::EmptyOptions impl core::clone::Clone for vortex_array::aggregate_fn::EmptyOptions @@ -656,7 +646,7 @@ pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::reset(&self, pa pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn 
vortex_array::aggregate_fn::fns::is_constant::IsConstant::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult @@ -672,7 +662,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::coerce_args(&self, pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::combine_partials(&self, partial: &mut Self::Partial, other: vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> -pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::empty_partial(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult @@ -724,7 +714,7 @@ pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::reset(&self, partial: & pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult @@ -758,7 +748,7 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::reset(&self, partia pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option -pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::serialize(&self, options: &Self::Options) -> 
vortex_error::VortexResult>> +pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult>> pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult diff --git a/vortex-array/src/aggregate_fn/proto.rs b/vortex-array/src/aggregate_fn/proto.rs index c5060f9d08b..aac2d4982d8 100644 --- a/vortex-array/src/aggregate_fn/proto.rs +++ b/vortex-array/src/aggregate_fn/proto.rs @@ -14,16 +14,11 @@ use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::session::AggregateFnSessionExt; -/// Extension trait for serializing an [`AggregateFnRef`] to protobuf. -/// -/// Note: the serialization format is not stable and may change between versions. -pub trait AggregateFnSerializeProtoExt { +impl AggregateFnRef { /// Serialize this aggregate function to its protobuf representation. - fn serialize_proto(&self) -> VortexResult; -} - -impl AggregateFnSerializeProtoExt for AggregateFnRef { - fn serialize_proto(&self) -> VortexResult { + /// + /// Note: the serialization format is not stable and may change between versions. + pub fn serialize_proto(&self) -> VortexResult { let metadata = self .options() .serialize()? @@ -34,9 +29,7 @@ impl AggregateFnSerializeProtoExt for AggregateFnRef { metadata: Some(metadata), }) } -} -impl AggregateFnRef { /// Deserialize an aggregate function from its protobuf representation. /// /// Looks up the aggregate function plugin by ID in the session's registry @@ -70,7 +63,6 @@ mod tests { use vortex_proto::expr as pb; use vortex_session::VortexSession; - use super::AggregateFnSerializeProtoExt; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTableExt; use crate::aggregate_fn::EmptyOptions;