Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions crates/control-plane-api/src/discovers/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ pub fn merge_collections(
// Collection is using separate read & write schemas.
// The read schema has been initialized and we do not modify it.
&mut draft_model.write_schema
} else if let Some(initial_read_schema) = initializes_read_schema(&connector_schema) {
} else if let Some(initial_read_schema) = initial_read_schema(&connector_schema) {
// Migrate singular `schema` into separate read & write schemas.
draft_model.write_schema = draft_model.schema.take();
draft_model.read_schema = Some(models::Schema::new(models::RawValue::from_value(
Expand Down Expand Up @@ -359,21 +359,24 @@ pub fn merge_collections(
Ok(modified_collections)
}

fn initializes_read_schema(schema: &models::Schema) -> Option<serde_json::Value> {
// Does the connector provide an explicit initial read schema?
if let Some(initial) = schema.to_value().get(X_INITIAL_READ_SCHEMA) {
return Some(initial.clone());
/// Returns the initial read schema if the given schema uses any of the features that provide
/// one, or `None` otherwise.
fn initial_read_schema(schema: &models::Schema) -> Option<serde_json::Value> {
let mut schema_value = schema.to_value();

// First, check if `x-initial-read-schema` is set to directly provide the initial read schema.
if let serde_json::Value::Object(ref mut map) = schema_value {
if let Some(extension @ serde_json::Value::Object(_)) = map.remove(X_INITIAL_READ_SCHEMA) {
return Some(extension);
}
}

// Does the connector use the legacy schema inference annotation?
// If so, initialize with a default read schema.
if let Some(serde_json::Value::Bool(true)) = schema.to_value().get(X_INFER_SCHEMA) {
return Some(serde_json::json!({
"allOf": [
{"$ref": models::Schema::REF_RELAXED_WRITE_SCHEMA_URL},
{"$ref": models::Schema::REF_INFERRED_SCHEMA_URL},
],
}));
// Then check if `x-infer-schema` is set to true, which causes the initial read schema to be set
// to a value that dynamically resolves to the collection's inferred schema.
if matches!(
schema_value.get(X_INFER_SCHEMA),
Some(serde_json::Value::Bool(true))
) {
return Some(models::Schema::default_inferred_read_schema().to_value());
}

None
Expand Down
4 changes: 2 additions & 2 deletions crates/doc/src/shape/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ pub const X_INFER_SCHEMA: &str = "x-infer-schema";
/// collection's initial read schema.
pub const X_INITIAL_READ_SCHEMA: &str = "x-initial-read-schema";

/// X_COMPLEXITY_LIMIT is a JSON-Schema annotation added to emitted inferred schemas that
/// allows for the modification of the default complexity limit applied to inferred schemas.
/// X_COMPLEXITY_LIMIT is a JSON-Schema annotation optionally added to binding schemas that
/// specifies the complexity limit applied to the collection's inferred schema.
pub const X_COMPLEXITY_LIMIT: &str = "x-complexity-limit";

#[cfg(test)]
Expand Down
36 changes: 15 additions & 21 deletions crates/runtime/src/capture/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@ use proto_flow::runtime::{
capture_response_ext::{self, PollResult},
CaptureRequestExt,
};
use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;

// Does the connector have a meaningful write schema drawn from the source system plus SourcedSchema?
// If so, we want to give it as much leeway as possible to infer the schema.
// Otherwise, use a lower complexity limit to avoid generating overly complex schemas.
// The limit in effect is read from the binding shape's `x-complexity-limit` annotation.
// We may want to tune these limits further, but this is a minimal starting point
// that leaves the door open for more complex heuristics in the future.
fn complexity_limit_for_binding(
binding_index: usize,
bindings_with_sourced_schema: &HashSet<usize>,
) -> usize {
if bindings_with_sourced_schema.contains(&binding_index) {
10_000
} else {
doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT
}
/// Returns the schema complexity limit to enforce for the binding at `binding_index`.
///
/// The limit is read from the `X_COMPLEXITY_LIMIT` annotation of the binding's shape.
/// If the annotation is absent, or its value is not representable as a `u64`, the limit
/// falls back to `doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT`.
///
/// # Panics
///
/// Panics if `binding_index` is out of bounds for `shapes`.
fn complexity_limit_for_binding(binding_index: usize, shapes: &[doc::Shape]) -> usize {
    shapes[binding_index]
        .annotations
        .get(X_COMPLEXITY_LIMIT)
        .and_then(|limit| limit.as_u64())
        // Saturate instead of truncating with `as`: on targets where `usize` is narrower
        // than `u64`, a wrapped value could silently become a tiny (or zero) limit.
        .map(|limit| <usize as std::convert::TryFrom<u64>>::try_from(limit).unwrap_or(usize::MAX))
        .unwrap_or(doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT)
}

pub async fn recv_client_unary(
Expand Down Expand Up @@ -212,7 +209,6 @@ pub fn send_client_captured_or_checkpoint(
task: &Task,
txn: &mut Transaction,
wb: &mut rocksdb::WriteBatch,
bindings_with_sourced_schema: &HashSet<usize>,
) -> Response {
let doc::combine::DrainedDoc { meta, root } = drained;

Expand Down Expand Up @@ -254,7 +250,7 @@ pub fn send_client_captured_or_checkpoint(
stats.bytes_total += doc_json.len() as u64;

if shapes[index].widen_owned(&root) {
let complexity_limit = complexity_limit_for_binding(index, bindings_with_sourced_schema);
let complexity_limit = complexity_limit_for_binding(index, shapes);

doc::shape::limits::enforce_shape_complexity_limit(
&mut shapes[index],
Expand Down Expand Up @@ -329,7 +325,6 @@ pub async fn recv_client_start_commit(
task: &Task,
txn: &Transaction,
mut wb: rocksdb::WriteBatch,
bindings_with_sourced_schema: &HashSet<usize>,
) -> anyhow::Result<()> {
let verify = verify("client", "StartCommit with runtime_checkpoint");
let request = verify.not_eof(request)?;
Expand Down Expand Up @@ -358,14 +353,7 @@ pub async fn recv_client_start_commit(
// produce structured logs of all inferred schemas that have changed
// in this transaction.
for binding in txn.updated_inferences.iter() {
let mut serialized = doc::shape::schema::to_schema(shapes[*binding].clone());

let complexity_limit = complexity_limit_for_binding(*binding, bindings_with_sourced_schema);

serialized.schema.extensions.insert(
X_COMPLEXITY_LIMIT.to_string(),
serde_json::Value::Number(serde_json::Number::from(complexity_limit)),
);
let serialized = doc::shape::schema::to_schema(shapes[*binding].clone());

tracing::info!(
schema = ?ops::DebugJson(serialized),
Expand Down Expand Up @@ -513,6 +501,12 @@ pub fn apply_sourced_schemas(
shapes[binding].annotations[crate::X_GENERATION_ID].clone(),
);

// Ratchet up complexity limit for bindings with sourced schemas
sourced_shape.annotations.insert(
X_COMPLEXITY_LIMIT.to_string(),
serde_json::Value::Number(serde_json::Number::from(10_000u64)),
);

shapes[binding] = doc::Shape::union(
std::mem::replace(&mut shapes[binding], doc::Shape::nothing()),
sourced_shape,
Expand Down
11 changes: 1 addition & 10 deletions crates/runtime/src/capture/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
use proto_flow::capture::{request, Request, Response};
use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;

#[tonic::async_trait]
impl<L: LogHandler> proto_grpc::capture::connector_server::Connector for Runtime<L> {
Expand Down Expand Up @@ -110,7 +110,6 @@ async fn serve_session<L: LogHandler>(
yield_rx,
));

let mut bindings_with_sourced_schema = HashSet::new();
let mut last_checkpoints: u32 = 0; // Checkpoints in the last transaction.
let mut buf = bytes::BytesMut::new();
loop {
Expand Down Expand Up @@ -180,12 +179,6 @@ async fn serve_session<L: LogHandler>(
// Atomic WriteBatch into which we'll stage connector and runtime state updates.
let mut wb = rocksdb::WriteBatch::default();

// Must do this before calling `apply_sourced_schemas` as that
// function clears out `txn.sourced_schemas`.
for (binding_id, _) in txn.sourced_schemas.iter() {
tracing::debug!(binding_id, "tracking sourced schema");
bindings_with_sourced_schema.insert(*binding_id);
}

// Apply sourced schemas to inference before we widen from documents.
// Assuming documents fit the source shape, this prevents unnecessary
Expand All @@ -200,7 +193,6 @@ async fn serve_session<L: LogHandler>(
&task,
&mut txn,
&mut wb,
&bindings_with_sourced_schema,
);
() = co.yield_(response).await;
}
Expand All @@ -216,7 +208,6 @@ async fn serve_session<L: LogHandler>(
&task,
&txn,
wb,
&bindings_with_sourced_schema,
)
.await?;

Expand Down
8 changes: 8 additions & 0 deletions crates/runtime/src/capture/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ impl Task {
crate::X_GENERATION_ID.to_string(),
serde_json::Value::String(binding.collection_generation_id.to_string()),
);

// Initialize complexity limit with default value
by_index[index].annotations.insert(
doc::shape::X_COMPLEXITY_LIMIT.to_string(),
serde_json::Value::Number(serde_json::Number::from(
doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT
)),
);
}
by_index
}
Expand Down
Loading