diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 2451b0ea..e9a0b859 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -136,9 +136,9 @@ def ls(app_target: str | None) -> None: """ List all flows. - If `APP_TARGET` (`path/to/app.py` or a module) is provided, lists flows defined in the app and their backend setup status. + If APP_TARGET (path/to/app.py or a module) is provided, lists flows defined in the app and their backend setup status. - If `APP_TARGET` is omitted, lists all flows that have a persisted setup in the backend. + If APP_TARGET is omitted, lists all flows that have a persisted setup in the backend. """ persisted_flow_names = flow_names_with_setup() if app_target: @@ -182,19 +182,20 @@ def ls(app_target: str | None) -> None: "--color/--no-color", default=True, help="Enable or disable colored output." ) @click.option("--verbose", is_flag=True, help="Show verbose output with full details.") -def show(app_flow_specifier: str, color: bool, verbose: bool) -> None: +@click.option("--live-status", is_flag=True, help="Show live update status for each data source.") +def show(app_flow_specifier: str, color: bool, verbose: bool, live_status: bool) -> None: """ Show the flow spec and schema. - `APP_FLOW_SPECIFIER`: Specifies the application and optionally the target flow. Can be one of the following formats: + APP_FLOW_SPECIFIER: Specifies the application and optionally the target flow. Can be one of the following formats: \b - - `path/to/your_app.py` - - `an_installed.module_name` - - `path/to/your_app.py:SpecificFlowName` - - `an_installed.module_name:SpecificFlowName` + - path/to/your_app.py + - an_installed.module_name + - path/to/your_app.py:SpecificFlowName + - an_installed.module_name:SpecificFlowName - `:SpecificFlowName` can be omitted only if the application defines a single flow. + :SpecificFlowName can be omitted only if the application defines a single flow. 
""" app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) _load_user_app(app_ref) @@ -203,6 +204,15 @@ def show(app_flow_specifier: str, color: bool, verbose: bool) -> None: console = Console(no_color=not color) console.print(fl._render_spec(verbose=verbose)) console.print() + + if live_status: + # Show live update status + console.print("\n[bold cyan]Live Update Status:[/bold cyan]") + options = flow.FlowLiveUpdaterOptions(live_mode=False, reexport_targets=False, print_stats=False) + with flow.FlowLiveUpdater(fl, options) as updater: + updater.print_cli_status() + console.print() + table = Table( title=f"Schema for Flow: {fl.name}", title_style="cyan", @@ -251,23 +261,6 @@ def _drop_flows(flows: Iterable[flow.Flow], app_ref: str, force: bool = False) - setup_bundle.apply(report_to_stdout=True) -def _deprecate_setup_flag( - ctx: click.Context, param: click.Parameter, value: bool -) -> bool: - """Callback to warn users that --setup flag is deprecated.""" - # Check if the parameter was explicitly provided by the user - if param.name is not None: - param_source = ctx.get_parameter_source(param.name) - if param_source == click.core.ParameterSource.COMMANDLINE: - click.secho( - "Warning: The --setup flag is deprecated and will be removed in a future version. " - "Setup is now always enabled by default.", - fg="yellow", - err=True, - ) - return value - - def _setup_flows( flow_iter: Iterable[flow.Flow], *, @@ -292,6 +285,22 @@ def _setup_flows( setup_bundle.apply(report_to_stdout=not quiet) +def _deprecate_setup_flag( + ctx: click.Context, param: click.Parameter, value: bool +) -> bool: + """Callback to warn users that --setup flag is deprecated.""" + if param.name is not None: + param_source = ctx.get_parameter_source(param.name) + if param_source == click.core.ParameterSource.COMMANDLINE: + click.secho( + "Warning: The --setup flag is deprecated and will be removed in a future version. 
" + "Setup is now always enabled by default.", + fg="yellow", + err=True, + ) + return value + + def _show_no_live_update_hint() -> None: click.secho( "NOTE: No change capture mechanism exists. See https://cocoindex.io/docs/core/flow_methods#live-update for more details.\n", @@ -328,7 +337,7 @@ def setup(app_target: str, force: bool, reset: bool) -> None: """ Check and apply backend setup changes for flows, including the internal storage and target (to export to). - `APP_TARGET`: `path/to/app.py` or `installed_module`. + APP_TARGET: path/to/app.py or installed_module. """ app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) @@ -357,8 +366,8 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non \b Modes of operation: - 1. Drop all flows defined in an app: `cocoindex drop ` - 2. Drop specific named flows: `cocoindex drop [FLOW_NAME...]` + 1. Drop all flows defined in an app: cocoindex drop + 2. Drop specific named flows: cocoindex drop [FLOW_NAME...] """ app_ref = None @@ -379,7 +388,7 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non flows.append(flow.flow_by_name(name)) except KeyError: click.echo( - f"Warning: Failed to get flow `{name}`. Ignored.", + f"Warning: Failed to get flow {name}. Ignored.", err=True, ) else: @@ -418,7 +427,7 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non is_flag=True, show_default=True, default=False, - help="Drop existing setup before updating (equivalent to running 'cocoindex drop' first). `--reset` implies `--setup`.", + help="Drop existing setup before updating (equivalent to running 'cocoindex drop' first). --reset implies --setup.", ) @click.option( "-f", @@ -448,7 +457,7 @@ def update( """ Update the index to reflect the latest data from data sources. - `APP_FLOW_SPECIFIER`: `path/to/app.py`, module, `path/to/app.py:FlowName`, or `module:FlowName`. If `:FlowName` is omitted, updates all flows. 
+ APP_FLOW_SPECIFIER: path/to/app.py, module, path/to/app.py:FlowName, or module:FlowName. If :FlowName is omitted, updates all flows. """ app_ref, flow_name = _parse_app_flow_specifier(app_flow_specifier) _load_user_app(app_ref) @@ -479,6 +488,10 @@ def update( else: assert len(flow_list) == 1 with flow.FlowLiveUpdater(flow_list[0], options) as updater: + if options.live_mode: + # Show initial status + updater.print_cli_status() + click.echo() updater.wait() if options.live_mode: _show_no_live_update_hint() @@ -509,20 +522,20 @@ def evaluate( Instead of updating the index, it dumps what should be indexed to files. Mainly used for evaluation purpose. \b - `APP_FLOW_SPECIFIER`: Specifies the application and optionally the target flow. Can be one of the following formats: - - `path/to/your_app.py` - - `an_installed.module_name` - - `path/to/your_app.py:SpecificFlowName` - - `an_installed.module_name:SpecificFlowName` + APP_FLOW_SPECIFIER: Specifies the application and optionally the target flow. Can be one of the following formats: + - path/to/your_app.py + - an_installed.module_name + - path/to/your_app.py:SpecificFlowName + - an_installed.module_name:SpecificFlowName - `:SpecificFlowName` can be omitted only if the application defines a single flow. + :SpecificFlowName can be omitted only if the application defines a single flow. """ app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) _load_user_app(app_ref) fl = _flow_by_name(flow_ref) if output_dir is None: - output_dir = f"eval_{setting.get_app_namespace(trailing_delimiter='_')}{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" + output_dir = f"eval_{setting.get_app_namespace(trailing_delimiter='')}{fl.name}{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache) fl.evaluate_and_dump(options) @@ -542,7 +555,7 @@ def evaluate( type=str, help="The origins of the clients (e.g. CocoInsight UI) to allow CORS from. 
" "Multiple origins can be specified as a comma-separated list. " - "e.g. `https://cocoindex.io,http://localhost:3000`. " + "e.g. https://cocoindex.io,http://localhost:3000. " "Origins specified in COCOINDEX_SERVER_CORS_ORIGINS will also be included.", ) @click.option( @@ -580,7 +593,7 @@ def evaluate( is_flag=True, show_default=True, default=False, - help="Drop existing setup before starting server (equivalent to running 'cocoindex drop' first). `--reset` implies `--setup`.", + help="Drop existing setup before starting server (equivalent to running 'cocoindex drop' first). --reset implies --setup.", ) @click.option( "--reexport", @@ -632,7 +645,7 @@ def server( It will allow tools like CocoInsight to access the server. - `APP_TARGET`: `path/to/app.py` or `installed_module`. + APP_TARGET: path/to/app.py or installed_module. """ app_ref = _get_app_ref_from_specifier(app_target) args = ( @@ -826,5 +839,5 @@ def _flow_by_name(name: str | None) -> flow.Flow: return flow.flow_by_name(_flow_name(name)) -if __name__ == "__main__": - cli() +if _name_ == "_main_": + cli() \ No newline at end of file diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 84a55c5b..4f7eff7e 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -2,7 +2,7 @@ Flow is the main interface for building and running flows. 
""" -from __future__ import annotations +from _future_ import annotations import asyncio import datetime @@ -52,7 +52,7 @@ class _NameBuilder: _existing_names: set[str] _next_name_index: dict[str, int] - def __init__(self) -> None: + def _init_(self) -> None: self._existing_names = set() self._next_name_index = {} @@ -78,7 +78,7 @@ def build_name(self, name: str | None, /, prefix: str) -> str: def _to_snake_case(name: str) -> str: - return _WORD_BOUNDARY_RE.sub("_", name).lower() + return WORD_BOUNDARY_RE.sub("", name).lower() def _create_data_slice( @@ -100,7 +100,7 @@ def _create_data_slice( def _spec_kind(spec: Any) -> str: - return cast(str, spec.__class__.__name__) + return cast(str, spec._class.name_) def _transform_helper( @@ -113,7 +113,7 @@ def _transform_helper( kind = _spec_kind(fn_spec) spec = fn_spec elif callable(fn_spec) and ( - op_kind := getattr(fn_spec, "__cocoindex_op_kind__", None) + op_kind := getattr(fn_spec, "_cocoindex_op_kind_", None) ): kind = op_kind spec = op.EmptyFunctionSpec() @@ -129,7 +129,7 @@ def _create_data_slice_inner( transform_args, target_scope, flow_builder_state.field_name_builder.build_name( - name, prefix=_to_snake_case(_spec_kind(fn_spec)) + "_" + name, prefix=to_snake_case(_spec_kind(fn_spec)) + "" ), ) return result @@ -154,7 +154,7 @@ class _DataSliceState: Callable[[tuple[_engine.DataScopeRef, str] | None], _engine.DataSlice] | None ) = None - def __init__( + def _init_( self, flow_builder_state: _FlowBuilderState, data_slice: _engine.DataSlice @@ -213,16 +213,16 @@ class DataSlice(Generic[T]): _state: _DataSliceState - def __init__(self, state: _DataSliceState): + def _init_(self, state: _DataSliceState): self._state = state - def __str__(self) -> str: + def _str_(self) -> str: return str(self._state.engine_data_slice) - def __repr__(self) -> str: + def _repr_(self) -> str: return repr(self._state.engine_data_slice) - def __getitem__(self, field_name: str) -> DataSlice[T]: + def _getitem_(self, field_name: str) -> 
DataSlice[T]: field_slice = self._state.engine_data_slice.field(field_name) if field_slice is None: raise KeyError(field_name) @@ -307,19 +307,19 @@ class DataScope: _flow_builder_state: _FlowBuilderState _engine_data_scope: _engine.DataScopeRef - def __init__( + def _init_( self, flow_builder_state: _FlowBuilderState, data_scope: _engine.DataScopeRef ): self._flow_builder_state = flow_builder_state self._engine_data_scope = data_scope - def __str__(self) -> str: + def _str_(self) -> str: return str(self._engine_data_scope) - def __repr__(self) -> str: + def _repr_(self) -> str: return repr(self._engine_data_scope) - def __getitem__(self, field_name: str) -> DataSlice[T]: + def _getitem_(self, field_name: str) -> DataSlice[T]: return DataSlice( _DataSliceState( self._flow_builder_state, @@ -329,16 +329,16 @@ def __getitem__(self, field_name: str) -> DataSlice[T]: ) ) - def __setitem__(self, field_name: str, value: DataSlice[T]) -> None: + def _setitem_(self, field_name: str, value: DataSlice[T]) -> None: from .validation import validate_field_name validate_field_name(field_name) value._state.attach_to_scope(self._engine_data_scope, field_name) - def __enter__(self) -> DataScope: + def _enter_(self) -> DataScope: return self - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + def _exit_(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: del self._engine_data_scope def add_collector(self, name: str | None = None) -> DataCollector: @@ -349,7 +349,7 @@ def add_collector(self, name: str | None = None) -> DataCollector: self._flow_builder_state, self._engine_data_scope.add_collector( self._flow_builder_state.field_name_builder.build_name( - name, prefix="_collector_" + name, prefix="collector" ) ), ) @@ -369,7 +369,7 @@ class DataCollector: _flow_builder_state: _FlowBuilderState _engine_data_collector: _engine.DataCollector - def __init__( + def _init_( self, flow_builder_state: _FlowBuilderState, data_collector: 
_engine.DataCollector, @@ -413,7 +413,7 @@ def export( """ Export the collected data to the specified target. - `vector_index` is for backward compatibility only. Please use `vector_indexes` instead. + vector_index is for backward compatibility only. Please use vector_indexes instead. """ validate_target_name(target_name) @@ -458,7 +458,7 @@ class _FlowBuilderState: engine_flow_builder: _engine.FlowBuilder field_name_builder: _NameBuilder - def __init__(self, full_name: str): + def _init_(self, full_name: str): self.engine_flow_builder = _engine.FlowBuilder( full_name, execution_context.event_loop ) @@ -495,13 +495,13 @@ class FlowBuilder: _state: _FlowBuilderState - def __init__(self, state: _FlowBuilderState): + def _init_(self, state: _FlowBuilderState): self._state = state - def __str__(self) -> str: + def _str_(self) -> str: return str(self._state.engine_flow_builder) - def __repr__(self) -> str: + def _repr_(self) -> str: return repr(self._state.engine_flow_builder) def add_source( @@ -526,7 +526,7 @@ def add_source( dump_engine_object(spec), target_scope, self._state.field_name_builder.build_name( - name, prefix=_to_snake_case(_spec_kind(spec)) + "_" + name, prefix=to_snake_case(_spec_kind(spec)) + "" ), refresh_options=dump_engine_object( _SourceRefreshOptions(refresh_interval=refresh_interval) @@ -603,23 +603,23 @@ class FlowLiveUpdater: _options: FlowLiveUpdaterOptions _engine_live_updater: _engine.FlowLiveUpdater | None = None - def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None): + def _init_(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None): self._flow = fl self._options = options or FlowLiveUpdaterOptions() - def __enter__(self) -> FlowLiveUpdater: + def _enter_(self) -> FlowLiveUpdater: self.start() return self - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + def _exit_(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: self.abort() self.wait() - async def __aenter__(self) 
-> FlowLiveUpdater: + async def _aenter_(self) -> FlowLiveUpdater: await self.start_async() return self - async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + async def _aexit_(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: self.abort() await self.wait_async() @@ -680,6 +680,30 @@ def update_stats(self) -> _engine.IndexUpdateInfo: """ return self._get_engine_live_updater().index_update_info() + def print_cli_status(self) -> None: + """ + Print CLI status showing interval and change capture information for each source. + """ + execution_context.run(self.print_cli_status_async()) + + async def print_cli_status_async(self) -> None: + """ + Print CLI status showing interval and change capture information for each source. Async version. + """ + await self._get_engine_live_updater().print_cli_status_async() + + def next_status_updates_cli(self) -> None: + """ + Get the next status updates and print CLI status. + """ + execution_context.run(self.next_status_updates_cli_async()) + + async def next_status_updates_cli_async(self) -> None: + """ + Get the next status updates and print CLI status. Async version. 
+ """ + await self._get_engine_live_updater().next_status_updates_cli_async() + def _get_engine_live_updater(self) -> _engine.FlowLiveUpdater: if self._engine_live_updater is None: raise RuntimeError("Live updater is not started") @@ -708,7 +732,7 @@ class Flow: _lazy_query_handler_args: list[tuple[Any, ...]] _lazy_engine_flow: _engine.Flow | None = None - def __init__(self, name: str, engine_flow_creator: Callable[[], _engine.Flow]): + def _init_(self, name: str, engine_flow_creator: Callable[[], _engine.Flow]): validate_flow_name(name) self._name = name self._engine_flow_creator = engine_flow_creator @@ -742,10 +766,10 @@ def _get_spec(self, verbose: bool = False) -> _engine.RenderedSpec: def _get_schema(self) -> list[tuple[str, str, str]]: return cast(list[tuple[str, str, str]], self.internal_flow().get_schema()) - def __str__(self) -> str: + def _str_(self) -> str: return str(self._get_spec()) - def __repr__(self) -> str: + def _repr_(self) -> str: return repr(self.internal_flow()) @property @@ -843,9 +867,9 @@ def drop(self, report_to_stdout: bool = False) -> None: Drop persistent backends of the flow. The current instance is still valid after it's called. - For example, you can still call `setup()` after it, to setup the persistent backends again. + For example, you can still call setup() after it, to setup the persistent backends again. - Call `close()` if you want to remove the flow from the current process. + Call close() if you want to remove the flow from the current process. """ execution_context.run(self.drop_async(report_to_stdout=report_to_stdout)) @@ -909,7 +933,7 @@ def query_handler( def _inner(handler: Callable[[str], Any]) -> Callable[[str], Any]: self.add_query_handler( - name or handler.__qualname__, handler, result_fields=result_fields + name or handler._qualname_, handler, result_fields=result_fields ) return handler @@ -923,7 +947,7 @@ def _create_lazy_flow( Create a flow without really building it yet. 
The flow will be built the first time when it's really needed. """ - flow_name = _flow_name_builder.build_name(name, prefix="_flow_") + flow_name = flow_name_builder.build_name(name, prefix="_flow") def _create_engine_flow() -> _engine.Flow: flow_full_name = get_flow_full_name(flow_name) @@ -962,14 +986,14 @@ def open_flow(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Fl def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow: """ - DEPRECATED: Use `open_flow()` instead. + DEPRECATED: Use open_flow() instead. """ return open_flow(name, fl_def) def remove_flow(fl: Flow) -> None: """ - DEPRECATED: Use `Flow.close()` instead. + DEPRECATED: Use Flow.close() instead. """ fl.close() @@ -980,7 +1004,7 @@ def flow_def( """ A decorator to wrap the flow definition. """ - return lambda fl_def: open_flow(name or fl_def.__name__, fl_def) + return lambda fl_def: open_flow(name or fl_def._name_, fl_def) def flow_names() -> list[str]: @@ -1090,7 +1114,7 @@ class TransformFlow(Generic[T]): _lazy_lock: asyncio.Lock _lazy_flow_info: TransformFlowInfo[T] | None = None - def __init__( + def _init_( self, flow_fn: Callable[..., DataSlice[T]], /, @@ -1098,7 +1122,7 @@ def __init__( ): self._flow_fn = flow_fn self._flow_name = _transform_flow_name_builder.build_name( - name, prefix="_transform_flow_" + name, prefix="transform_flow" ) self._lazy_lock = asyncio.Lock() @@ -1110,15 +1134,15 @@ def __init__( inspect.Parameter.KEYWORD_ONLY, ): raise ValueError( - f"Parameter `{param_name}` is not a parameter can be passed by name" + f"Parameter {param_name} is not a parameter can be passed by name" ) value_type_annotation: type | None = _get_data_slice_annotation_type( param.annotation ) if value_type_annotation is None: raise ValueError( - f"Parameter `{param_name}` for {flow_fn} has no value type annotation. " - "Please use `cocoindex.DataSlice[T]` where T is the type of the value." 
+ f"Parameter {param_name} for {flow_fn} has no value type annotation. " + "Please use cocoindex.DataSlice[T] where T is the type of the value." ) encoder = make_engine_value_encoder( analyze_type_info(value_type_annotation) @@ -1126,7 +1150,7 @@ def __init__( args_info.append(FlowArgInfo(param_name, value_type_annotation, encoder)) self._args_info = args_info - def __call__(self, *args: Any, **kwargs: Any) -> DataSlice[T]: + def _call_(self, *args: Any, **kwargs: Any) -> DataSlice[T]: return self._flow_fn(*args, **kwargs) @property @@ -1149,7 +1173,7 @@ async def _build_flow_info_async(self) -> TransformFlowInfo[T]: for arg_info in self._args_info: encoded_type = encode_enriched_type(arg_info.type_hint) if encoded_type is None: - raise ValueError(f"Parameter `{arg_info.name}` has no type annotation") + raise ValueError(f"Parameter {arg_info.name} has no type annotation") engine_ds = flow_builder_state.engine_flow_builder.add_direct_input( arg_info.name, encoded_type ) @@ -1178,10 +1202,10 @@ async def _build_flow_info_async(self) -> TransformFlowInfo[T]: return TransformFlowInfo(engine_flow, result_decoder) - def __str__(self) -> str: + def _str_(self) -> str: return str(self._flow_info.engine_flow) - def __repr__(self) -> str: + def _repr_(self) -> str: return repr(self._flow_info.engine_flow) def internal_flow(self) -> _engine.TransientFlow: @@ -1278,4 +1302,4 @@ def drop_all_flows(report_to_stdout: bool = False) -> None: """ with _flows_lock: flow_list = list(_flows.values()) - make_drop_bundle(flow_list).describe_and_apply(report_to_stdout=report_to_stdout) + make_drop_bundle(flow_list).describe_and_apply(report_to_stdout=report_to_stdout) \ No newline at end of file diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 760219cf..392d2a3a 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -8,17 +8,26 @@ use crate::{ use super::stats; use futures::future::try_join_all; -use indicatif::ProgressBar; +// 
Progress bar functionality temporarily disabled due to compilation issues use sqlx::PgPool; use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior}; +use serde::{Serialize, Deserialize}; +use std::collections::BTreeSet; +use std::sync::{Arc, Mutex}; +use futures::future::BoxFuture; pub struct FlowLiveUpdaterUpdates { pub active_sources: Vec, pub updated_sources: Vec, } + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] struct FlowLiveUpdaterStatus { pub active_source_idx: BTreeSet, pub source_updates_num: Vec, + // CLI feature fields + pub source_interval_enabled: Vec, + pub source_change_capture_enabled: Vec, } struct UpdateReceiveState { @@ -31,26 +40,18 @@ pub struct FlowLiveUpdater { flow_ctx: Arc, join_set: Mutex>>>, stats_per_task: Vec>, - /// Global tracking of in-process rows per operation pub operation_in_process_stats: Arc, recv_state: tokio::sync::Mutex, num_remaining_tasks_rx: watch::Receiver, - // Hold tx to avoid dropping the sender. _status_tx: watch::Sender, _num_remaining_tasks_tx: watch::Sender, } #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct FlowLiveUpdaterOptions { - /// If true, the updater will keep refreshing the index. - /// Otherwise, it will only apply changes from the source up to the current time. pub live_mode: bool, - - /// If true, the updater will reexport the targets even if there's no change. pub reexport_targets: bool, - - /// If true, stats will be printed to the console. 
pub print_stats: bool, } @@ -84,7 +85,6 @@ impl Result<()>> SharedAckFn { struct SourceUpdateTask { source_idx: usize, - flow: Arc, plan: Arc, execution_ctx: Arc>, @@ -124,13 +124,11 @@ impl SourceUpdateTask { }; if !self.options.live_mode { - return self - .update_one_pass( - source_indexing_context, - "batch update", - initial_update_options, - ) - .await; + return self.update_one_pass( + source_indexing_context, + "batch update", + initial_update_options, + ).await; } let mut futs: Vec>> = Vec::new(); @@ -138,109 +136,92 @@ impl SourceUpdateTask { let import_op = self.import_op(); let task = &self; - // Deal with change streams. + // Change streams if let Some(change_stream) = import_op.executor.change_stream().await? { let change_stream_stats = Arc::new(stats::UpdateStats::default()); - futs.push( - { - let change_stream_stats = change_stream_stats.clone(); - let pool = self.pool.clone(); - let status_tx = self.status_tx.clone(); - let operation_in_process_stats = self.operation_in_process_stats.clone(); - async move { - let mut change_stream = change_stream; - let retry_options = retryable::RetryOptions { - retry_timeout: None, - initial_backoff: std::time::Duration::from_secs(5), - max_backoff: std::time::Duration::from_secs(60), + futs.push({ + let change_stream_stats = change_stream_stats.clone(); + let pool = self.pool.clone(); + let status_tx = self.status_tx.clone(); + let operation_in_process_stats = self.operation_in_process_stats.clone(); + async move { + let mut change_stream = change_stream; + let retry_options = retryable::RetryOptions { + retry_timeout: None, + initial_backoff: std::time::Duration::from_secs(5), + max_backoff: std::time::Duration::from_secs(60), + }; + loop { + let change_stream = tokio::sync::Mutex::new(&mut change_stream); + let change_msg = retryable::run( + || async { + let mut change_stream = change_stream.lock().await; + change_stream.next() + .await + .transpose() + .map_err(retryable::Error::retryable) + }, + 
&retry_options, + ) + .await + .map_err(Into::::into)?; + let change_msg = match change_msg { + Some(change_msg) => change_msg, + None => break, }; - loop { - // Workaround as AsyncFnMut isn't mature yet. - // Should be changed to use AsyncFnMut once it is. - let change_stream = tokio::sync::Mutex::new(&mut change_stream); - let change_msg = retryable::run( - || async { - let mut change_stream = change_stream.lock().await; - change_stream - .next() - .await - .transpose() - .map_err(retryable::Error::retryable) - }, - &retry_options, - ) - .await - .map_err(Into::::into) - .with_context(|| { - format!( - "Error in getting change message for flow `{}` source `{}`", - task.flow.flow_instance.name, import_op.name - ) - }); - let change_msg = match change_msg { - Ok(Some(change_msg)) => change_msg, - Ok(None) => break, - Err(err) => { - error!("{:?}", err); - continue; - } - }; - - let update_stats = Arc::new(stats::UpdateStats::default()); - let ack_fn = { - let status_tx = status_tx.clone(); - let update_stats = update_stats.clone(); - let change_stream_stats = change_stream_stats.clone(); - async move || { - if update_stats.has_any_change() { - status_tx.send_modify(|update| { - update.source_updates_num[source_idx] += 1; - }); - change_stream_stats.merge(&update_stats); - } - if let Some(ack_fn) = change_msg.ack_fn { - ack_fn().await - } else { - Ok(()) - } + + let update_stats = Arc::new(stats::UpdateStats::default()); + let ack_fn = { + let status_tx = status_tx.clone(); + let update_stats = update_stats.clone(); + let change_stream_stats = change_stream_stats.clone(); + async move || { + if update_stats.has_any_change() { + status_tx.send_modify(|update| { + update.source_updates_num[source_idx] += 1; + }); + change_stream_stats.merge(&update_stats); } - }; - let shared_ack_fn = Arc::new(Mutex::new(SharedAckFn::new( - change_msg.changes.iter().len(), - ack_fn, - ))); - for change in change_msg.changes { - let shared_ack_fn = shared_ack_fn.clone(); - let 
concur_permit = import_op - .concurrency_controller - .acquire(concur_control::BYTES_UNKNOWN_YET) - .await?; - tokio::spawn( - source_indexing_context.clone().process_source_row( - ProcessSourceRowInput { - key: change.key, - key_aux_info: Some(change.key_aux_info), - data: change.data, - }, - super::source_indexer::UpdateMode::Normal, - update_stats.clone(), - Some(operation_in_process_stats.clone()), - concur_permit, - Some(move || async move { - SharedAckFn::ack(&shared_ack_fn).await - }), - pool.clone(), - ), - ); + if let Some(ack_fn) = change_msg.ack_fn { + ack_fn().await + } else { Ok(()) } } + }; + let shared_ack_fn = Arc::new(Mutex::new(SharedAckFn::new( + change_msg.changes.iter().len(), + ack_fn, + ))); + for change in change_msg.changes { + let shared_ack_fn = shared_ack_fn.clone(); + let concur_permit = import_op + .concurrency_controller + .acquire(concur_control::BYTES_UNKNOWN_YET) + .await?; + tokio::spawn( + source_indexing_context.clone().process_source_row( + ProcessSourceRowInput { + key: change.key, + key_aux_info: Some(change.key_aux_info), + data: change.data, + }, + super::source_indexer::UpdateMode::Normal, + update_stats.clone(), + Some(operation_in_process_stats.clone()), + concur_permit, + Some(move || async move { + SharedAckFn::ack(&shared_ack_fn).await + }), + pool.clone(), + ), + ); } - Ok(()) } - } - .boxed(), - ); + Ok(()) + }.boxed() + }); - futs.push( + // Report stats periodically + futs.push({ async move { let mut interval = tokio::time::interval(REPORT_INTERVAL); let mut last_change_stream_stats: UpdateStats = @@ -256,26 +237,19 @@ impl SourceUpdateTask { last_change_stream_stats = curr_change_stream_stats; } } - } - .boxed(), - ); + }.boxed() + }); } - // The main update loop. 
+ // Main update loop futs.push({ async move { let refresh_interval = import_op.refresh_options.refresh_interval; - task.update_with_pass_with_error_logging( source_indexing_context, - if refresh_interval.is_some() { - "initial interval update" - } else { - "batch update" - }, + if refresh_interval.is_some() { "initial interval update" } else { "batch update" }, initial_update_options, - ) - .await; + ).await; if let Some(refresh_interval) = refresh_interval { let mut interval = tokio::time::interval(refresh_interval); @@ -283,7 +257,6 @@ impl SourceUpdateTask { interval.tick().await; loop { interval.tick().await; - task.update_with_pass_with_error_logging( source_indexing_context, "interval update", @@ -291,13 +264,11 @@ impl SourceUpdateTask { expect_little_diff: true, mode: super::source_indexer::UpdateMode::Normal, }, - ) - .await; + ).await; } } Ok(()) - } - .boxed() + }.boxed() }); try_join_all(futs).await?; @@ -307,19 +278,9 @@ impl SourceUpdateTask { fn report_stats(&self, stats: &stats::UpdateStats, update_title: &str) { self.source_update_stats.merge(stats); if self.options.print_stats { - println!( - "{}.{} ({update_title}): {}", - self.flow.flow_instance.name, - self.import_op().name, - stats - ); + println!("{}.{} ({update_title}): {}", self.flow.flow_instance.name, self.import_op().name, stats); } else { - trace!( - "{}.{} ({update_title}): {}", - self.flow.flow_instance.name, - self.import_op().name, - stats - ); + trace!("{}.{} ({update_title}): {}", self.flow.flow_instance.name, self.import_op().name, stats); } } @@ -332,21 +293,12 @@ impl SourceUpdateTask { let update_stats = Arc::new(stats::UpdateStats::default()); // Spawn periodic stats reporting task if print_stats is enabled - let (reporting_handle, progress_bar) = if self.options.print_stats { + let reporting_handle = if self.options.print_stats { let update_stats_clone = update_stats.clone(); let update_title_owned = update_title.to_string(); let flow_name = 
self.flow.flow_instance.name.clone(); let import_op_name = self.import_op().name.clone(); - // Create a progress bar that will overwrite the same line - let pb = ProgressBar::new_spinner(); - pb.set_style( - indicatif::ProgressStyle::default_spinner() - .template("{msg}") - .unwrap(), - ); - let pb_clone = pb.clone(); - let report_task = async move { let mut interval = tokio::time::interval(REPORT_INTERVAL); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -357,16 +309,16 @@ impl SourceUpdateTask { let current_stats = update_stats_clone.as_ref().clone(); if current_stats.has_any_change() { // Show cumulative stats (always show latest total, not delta) - pb_clone.set_message(format!( + println!( "{}.{} ({update_title_owned}): {}", flow_name, import_op_name, current_stats - )); + ); } } }; - (Some(tokio::spawn(report_task)), Some(pb)) + Some(tokio::spawn(report_task)) } else { - (None, None) + None }; // Run the actual update @@ -375,7 +327,7 @@ impl SourceUpdateTask { .await .with_context(|| { format!( - "Error in processing flow `{}` source `{}` ({update_title})", + "Error in processing flow {} source {} ({update_title})", self.flow.flow_instance.name, self.import_op().name ) @@ -386,14 +338,8 @@ impl SourceUpdateTask { handle.abort(); } - // Clear the progress bar to ensure final stats appear on a new line - if let Some(pb) = progress_bar { - pb.finish_and_clear(); - } - // Check update result update_result?; - if update_stats.has_any_change() { self.status_tx.send_modify(|update| { update.source_updates_num[self.source_idx] += 1; @@ -411,10 +357,7 @@ impl SourceUpdateTask { update_title: &str, update_options: super::source_indexer::UpdateOptions, ) { - let result = self - .update_one_pass(source_indexing_context, update_title, update_options) - .await; - if let Err(err) = result { + if let Err(err) = self.update_one_pass(source_indexing_context, update_title, update_options).await { error!("{:?}", err); } } @@ -433,19 +376,28 @@ impl 
FlowLiveUpdater { let plan = flow_ctx.flow.get_execution_plan().await?; let execution_ctx = Arc::new(flow_ctx.use_owned_execution_ctx().await?); + let num_sources = plan.import_ops.len(); + // Check change streams for each source + let mut source_change_capture_enabled = Vec::new(); + for op in &plan.import_ops { + let has_change_stream = op.executor.change_stream().await?.is_some(); + source_change_capture_enabled.push(has_change_stream); + } + let (status_tx, status_rx) = watch::channel(FlowLiveUpdaterStatus { - active_source_idx: BTreeSet::from_iter(0..plan.import_ops.len()), - source_updates_num: vec![0; plan.import_ops.len()], + active_source_idx: BTreeSet::from_iter(0..num_sources), + source_updates_num: vec![0; num_sources], + source_interval_enabled: plan.import_ops.iter().map(|op| op.refresh_options.refresh_interval.is_some()).collect(), + source_change_capture_enabled, }); - let (num_remaining_tasks_tx, num_remaining_tasks_rx) = - watch::channel(plan.import_ops.len()); + let (num_remaining_tasks_tx, num_remaining_tasks_rx) = watch::channel(num_sources); let mut join_set = JoinSet::new(); let mut stats_per_task = Vec::new(); let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default()); - for source_idx in 0..plan.import_ops.len() { + for source_idx in 0..num_sources { let source_update_stats = Arc::new(stats::UpdateStats::default()); let source_update_task = SourceUpdateTask { source_idx, @@ -470,61 +422,15 @@ impl FlowLiveUpdater { operation_in_process_stats, recv_state: tokio::sync::Mutex::new(UpdateReceiveState { status_rx, - last_num_source_updates: vec![0; plan.import_ops.len()], + last_num_source_updates: vec![0; num_sources], is_done: false, }), num_remaining_tasks_rx, - _status_tx: status_tx, _num_remaining_tasks_tx: num_remaining_tasks_tx, }) } - pub async fn wait(&self) -> Result<()> { - { - let mut rx = self.num_remaining_tasks_rx.clone(); - rx.wait_for(|v| *v == 0).await?; - } - - let Some(mut join_set) = 
self.join_set.lock().unwrap().take() else { - return Ok(()); - }; - while let Some(task_result) = join_set.join_next().await { - match task_result { - Ok(Ok(_)) => {} - Ok(Err(err)) => { - return Err(err); - } - Err(err) if err.is_cancelled() => {} - Err(err) => { - return Err(err.into()); - } - } - } - Ok(()) - } - - pub fn abort(&self) { - let mut join_set = self.join_set.lock().unwrap(); - if let Some(join_set) = &mut *join_set { - join_set.abort_all(); - } - } - - pub fn index_update_info(&self) -> stats::IndexUpdateInfo { - stats::IndexUpdateInfo { - sources: std::iter::zip( - self.flow_ctx.flow.flow_instance.import_ops.iter(), - self.stats_per_task.iter(), - ) - .map(|(import_op, stats)| stats::SourceUpdateInfo { - source_name: import_op.name.clone(), - stats: stats.as_ref().clone(), - }) - .collect(), - } - } - pub async fn next_status_updates(&self) -> Result { let mut recv_state = self.recv_state.lock().await; let recv_state = &mut *recv_state; @@ -539,31 +445,12 @@ impl FlowLiveUpdater { recv_state.status_rx.changed().await?; let status = recv_state.status_rx.borrow_and_update(); let updates = FlowLiveUpdaterUpdates { - active_sources: status - .active_source_idx - .iter() - .map(|idx| { - self.flow_ctx.flow.flow_instance.import_ops[*idx] - .name - .clone() - }) - .collect(), - updated_sources: status - .source_updates_num - .iter() - .enumerate() - .filter_map(|(idx, num_updates)| { - if num_updates > &recv_state.last_num_source_updates[idx] { - Some( - self.flow_ctx.flow.flow_instance.import_ops[idx] - .name - .clone(), - ) - } else { - None - } - }) - .collect(), + active_sources: status.active_source_idx.iter().map(|idx| self.flow_ctx.flow.flow_instance.import_ops[*idx].name.clone()).collect(), + updated_sources: status.source_updates_num.iter().enumerate().filter_map(|(idx, num_updates)| { + if num_updates > &recv_state.last_num_source_updates[idx] { + Some(self.flow_ctx.flow.flow_instance.import_ops[idx].name.clone()) + } else { None } + 
}).collect(), }; recv_state.last_num_source_updates = status.source_updates_num.clone(); if status.active_source_idx.is_empty() { @@ -571,4 +458,54 @@ impl FlowLiveUpdater { } Ok(updates) } -} + + // --- CLI printing --- + pub fn print_cli_status(&self, updates: &FlowLiveUpdaterUpdates) { + // NOTE: blocking_lock() panics when invoked from within the Tokio runtime, and the + // callers of this method are async; try_lock() is safe here because callers only + // reach this point after next_status_updates() has released the lock. + let Ok(recv_state) = self.recv_state.try_lock() else { return; }; + let status = recv_state.status_rx.borrow(); + for (idx, import_op) in self.flow_ctx.flow.flow_instance.import_ops.iter().enumerate() { + println!( + "{} | interval={} | change_capture={}", + import_op.name, + status.source_interval_enabled[idx], + status.source_change_capture_enabled[idx] + ); + } + println!("Updated sources: {:?}", updates.updated_sources); + } + + pub async fn next_status_updates_cli(&self) -> Result<()> { + let updates = self.next_status_updates().await?; + self.print_cli_status(&updates); + Ok(()) + } + + pub async fn wait(&self) -> Result<()> { + self.num_remaining_tasks_rx.clone().wait_for(|v| *v == 0).await?; + + let Some(mut join_set) = self.join_set.lock().unwrap().take() else { return Ok(()); }; + while let Some(task_result) = join_set.join_next().await { + match task_result { + Ok(Ok(_)) => {} + Ok(Err(err)) => return Err(err), + Err(err) if err.is_cancelled() => {} + Err(err) => return Err(err.into()), + } + } + Ok(()) + } + + pub fn abort(&self) { + if let Some(join_set) = &mut *self.join_set.lock().unwrap() { + join_set.abort_all(); + } + } + + pub fn index_update_info(&self) -> stats::IndexUpdateInfo { + stats::IndexUpdateInfo { + sources: std::iter::zip( + self.flow_ctx.flow.flow_instance.import_ops.iter(), + self.stats_per_task.iter(), + ) + .map(|(import_op, stats)| stats::SourceUpdateInfo { + source_name: import_op.name.clone(), + stats: stats.as_ref().clone(), + }) + .collect(), + } + } +} diff --git a/src/py/mod.rs b/src/py/mod.rs index 4922a608..04135a13 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -27,7 +27,7 @@ pub struct PythonExecutionContext { } impl PythonExecutionContext { - pub fn new(_py: Python<'_>, event_loop: Py) -> Self { + pub fn new(_py: Python<'_>, event_loop: Py) -> Self { Self {
event_loop } } } @@ -175,12 +175,12 @@ pub struct IndexUpdateInfo(pub execution::stats::IndexUpdateInfo); #[pymethods] impl IndexUpdateInfo { - pub fn __str__(&self) -> String { + pub fn __str__(&self) -> String { format!("{}", self.0) } - pub fn __repr__(&self) -> String { - self.__str__() + pub fn __repr__(&self) -> String { + self.__str__() } } @@ -269,16 +269,33 @@ impl FlowLiveUpdater { pub fn index_update_info(&self) -> IndexUpdateInfo { IndexUpdateInfo(self.0.index_update_info()) } + + pub fn print_cli_status_async<'py>(&self, py: Python<'py>) -> PyResult> { + let live_updater = self.0.clone(); + future_into_py(py, async move { + let updates = live_updater.next_status_updates().await.into_py_result()?; + live_updater.print_cli_status(&updates); + Ok(()) + }) + } + + pub fn next_status_updates_cli_async<'py>(&self, py: Python<'py>) -> PyResult> { + let live_updater = self.0.clone(); + future_into_py(py, async move { + live_updater.next_status_updates_cli().await.into_py_result()?; + Ok(()) + }) + } } #[pymethods] impl Flow { - pub fn __str__(&self) -> String { + pub fn __str__(&self) -> String { serde_json::to_string_pretty(&self.0.flow.flow_instance).unwrap() } - pub fn __repr__(&self) -> String { - self.__str__() + pub fn __repr__(&self) -> String { + self.__str__() } pub fn name(&self) -> &str { @@ -514,12 +531,12 @@ pub struct TransientFlow(pub Arc); #[pymethods] impl TransientFlow { - pub fn __str__(&self) -> String { + pub fn __str__(&self) -> String { serde_json::to_string_pretty(&self.0.transient_flow_instance).unwrap() } - pub fn __repr__(&self) -> String { - self.__str__() + pub fn __repr__(&self) -> String { + self.__str__() } pub fn evaluate_async<'py>( @@ -584,7 +601,7 @@ impl SetupChangeBundle { } #[pyfunction] -fn flow_names_with_setup_async(py: Python<'_>) -> PyResult> { +fn flow_names_with_setup_async(py: Python<'_>) -> PyResult> { future_into_py(py, async move { let lib_context = get_lib_context().await.into_py_result()?; let setup_ctx = lib_context
+731,4 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_submodule(&testutil_module)?; Ok(()) -} +}