Skip to content

Commit e3edef8

Browse files
better logging
1 parent 7b5233c commit e3edef8

File tree

3 files changed

+181
-115
lines changed

3 files changed

+181
-115
lines changed

packages/core-bridge/src/client.rs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -55,33 +55,36 @@ pub fn client_new(
5555
let runtime = runtime.borrow()?.core_runtime.clone();
5656
let config: CoreClientOptions = config.try_into()?;
5757

58-
runtime.clone().future_to_promise(async move {
59-
let metric_meter = runtime.clone().telemetry().get_temporal_metric_meter();
60-
let res = config.connect_no_namespace(metric_meter).await;
61-
62-
let core_client = match res {
63-
Ok(core_client) => core_client,
64-
Err(ClientInitError::InvalidHeaders(e)) => Err(BridgeError::TypeError {
65-
message: format!("Invalid metadata key: {e}"),
66-
field: None,
67-
})?,
68-
Err(ClientInitError::SystemInfoCallError(e)) => Err(BridgeError::TransportError(
69-
format!("Failed to call GetSystemInfo: {e}"),
70-
))?,
71-
Err(ClientInitError::TonicTransportError(e)) => {
72-
Err(BridgeError::TransportError(format!("{e:?}")))?
73-
}
74-
Err(ClientInitError::InvalidUri(e)) => Err(BridgeError::TypeError {
75-
message: e.to_string(),
76-
field: None,
77-
})?,
78-
};
79-
80-
Ok(OpaqueOutboundHandle::new(Client {
81-
core_runtime: runtime,
82-
core_client,
83-
}))
84-
})
58+
runtime.clone().future_to_promise_named(
59+
async move {
60+
let metric_meter = runtime.clone().telemetry().get_temporal_metric_meter();
61+
let res = config.connect_no_namespace(metric_meter).await;
62+
63+
let core_client = match res {
64+
Ok(core_client) => core_client,
65+
Err(ClientInitError::InvalidHeaders(e)) => Err(BridgeError::TypeError {
66+
message: format!("Invalid metadata key: {e}"),
67+
field: None,
68+
})?,
69+
Err(ClientInitError::SystemInfoCallError(e)) => Err(BridgeError::TransportError(
70+
format!("Failed to call GetSystemInfo: {e}"),
71+
))?,
72+
Err(ClientInitError::TonicTransportError(e)) => {
73+
Err(BridgeError::TransportError(format!("{e:?}")))?
74+
}
75+
Err(ClientInitError::InvalidUri(e)) => Err(BridgeError::TypeError {
76+
message: e.to_string(),
77+
field: None,
78+
})?,
79+
};
80+
81+
Ok(OpaqueOutboundHandle::new(Client {
82+
core_runtime: runtime,
83+
core_client,
84+
}))
85+
},
86+
"client_new",
87+
)
8588
}
8689

8790
/// Update a Client's HTTP request headers

packages/core-bridge/src/runtime.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ pub trait RuntimeExt {
167167
F: Future<Output = Result<R, BridgeError>> + Send + 'static,
168168
R: TryIntoJs + Send + 'static;
169169

170+
fn future_to_promise_named<F, R>(
171+
&self,
172+
future: F,
173+
caller: &'static str,
174+
) -> BridgeResult<BridgeFuture<R>>
175+
where
176+
F: Future<Output = Result<R, BridgeError>> + Send + 'static,
177+
R: TryIntoJs + Send + 'static;
178+
170179
/// Spawn a future on the Tokio runtime, and let it run to completion without waiting for it to
171180
/// complete. Should any error occur, we'll try to send them to this Runtime's logger, but may
172181
/// end up or silently dropping entries in some extreme cases.
@@ -187,6 +196,21 @@ impl RuntimeExt for CoreRuntime {
187196
)))
188197
}
189198

199+
fn future_to_promise_named<F, R>(
200+
&self,
201+
future: F,
202+
caller: &'static str,
203+
) -> BridgeResult<BridgeFuture<R>>
204+
where
205+
F: Future<Output = Result<R, BridgeError>> + Send + 'static,
206+
R: TryIntoJs + Send + 'static,
207+
{
208+
enter_sync!(self);
209+
Ok(BridgeFuture::new(Box::pin(future.instrument(
210+
tracing::info_span!("future_to_promise_named", caller),
211+
))))
212+
}
213+
190214
fn spawn_and_forget<F>(&self, future: F)
191215
where
192216
F: Future<Output = Result<(), BridgeError>> + Send + 'static,
@@ -216,6 +240,18 @@ impl RuntimeExt for Arc<CoreRuntime> {
216240
{
217241
self.as_ref().spawn_and_forget(future);
218242
}
243+
244+
fn future_to_promise_named<F, R>(
245+
&self,
246+
future: F,
247+
caller: &'static str,
248+
) -> BridgeResult<BridgeFuture<R>>
249+
where
250+
F: Future<Output = Result<R, BridgeError>> + Send + 'static,
251+
R: TryIntoJs + Send + 'static,
252+
{
253+
self.as_ref().future_to_promise_named(future, caller)
254+
}
219255
}
220256

221257
////////////////////////////////////////////////////////////////////////////////////////////////////

packages/core-bridge/src/worker.rs

Lines changed: 115 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,15 @@ pub fn worker_validate(worker: OpaqueInboundHandle<Worker>) -> BridgeResult<Brid
103103
let worker = worker_ref.core_worker.clone();
104104
let runtime = worker_ref.core_runtime.clone();
105105

106-
runtime.future_to_promise(async move {
107-
worker
108-
.validate()
109-
.await
110-
.map_err(|err| BridgeError::TransportError(err.to_string()))
111-
})
106+
runtime.future_to_promise_named(
107+
async move {
108+
worker
109+
.validate()
110+
.await
111+
.map_err(|err| BridgeError::TransportError(err.to_string()))
112+
},
113+
"worker_validate",
114+
)
112115
}
113116

114117
/// Initiate a single workflow activation poll request.
@@ -121,19 +124,22 @@ pub fn worker_poll_workflow_activation(
121124
let worker = worker_ref.core_worker.clone();
122125
let runtime = worker_ref.core_runtime.clone();
123126

124-
runtime.future_to_promise(async move {
125-
let result = worker.poll_workflow_activation().await;
127+
runtime.future_to_promise_named(
128+
async move {
129+
let result = worker.poll_workflow_activation().await;
126130

127-
match result {
128-
Ok(task) => Ok(task.encode_to_vec()),
129-
Err(err) => match err {
130-
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
131-
PollError::TonicError(status) => {
132-
Err(BridgeError::TransportError(status.message().to_string()))?
133-
}
134-
},
135-
}
136-
})
131+
match result {
132+
Ok(task) => Ok(task.encode_to_vec()),
133+
Err(err) => match err {
134+
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
135+
PollError::TonicError(status) => {
136+
Err(BridgeError::TransportError(status.message().to_string()))?
137+
}
138+
},
139+
}
140+
},
141+
"worker_poll_workflow_activation",
142+
)
137143
}
138144

139145
/// Submit a workflow activation completion to core.
@@ -154,21 +160,24 @@ pub fn worker_complete_workflow_activation(
154160
let worker = worker_ref.core_worker.clone();
155161
let runtime = worker_ref.core_runtime.clone();
156162

157-
runtime.future_to_promise(async move {
158-
worker
159-
.complete_workflow_activation(workflow_completion)
160-
.await
161-
.map_err(|err| match err {
162-
CompleteWfError::MalformedWorkflowCompletion { reason, run_id } => {
163-
BridgeError::TypeError {
164-
field: None,
165-
message: format!(
166-
"Malformed Workflow Completion: {reason:?} for RunID={run_id}"
167-
),
163+
runtime.future_to_promise_named(
164+
async move {
165+
worker
166+
.complete_workflow_activation(workflow_completion)
167+
.await
168+
.map_err(|err| match err {
169+
CompleteWfError::MalformedWorkflowCompletion { reason, run_id } => {
170+
BridgeError::TypeError {
171+
field: None,
172+
message: format!(
173+
"Malformed Workflow Completion: {reason:?} for RunID={run_id}"
174+
),
175+
}
168176
}
169-
}
170-
})
171-
})
177+
})
178+
},
179+
"worker_complete_workflow_activation",
180+
)
172181
}
173182

174183
/// Initiate a single activity task poll request.
@@ -181,19 +190,22 @@ pub fn worker_poll_activity_task(
181190
let worker = worker_ref.core_worker.clone();
182191
let runtime = worker_ref.core_runtime.clone();
183192

184-
runtime.future_to_promise(async move {
185-
let result = worker.poll_activity_task().await;
193+
runtime.future_to_promise_named(
194+
async move {
195+
let result = worker.poll_activity_task().await;
186196

187-
match result {
188-
Ok(task) => Ok(task.encode_to_vec()),
189-
Err(err) => match err {
190-
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
191-
PollError::TonicError(status) => {
192-
Err(BridgeError::TransportError(status.message().to_string()))?
193-
}
194-
},
195-
}
196-
})
197+
match result {
198+
Ok(task) => Ok(task.encode_to_vec()),
199+
Err(err) => match err {
200+
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
201+
PollError::TonicError(status) => {
202+
Err(BridgeError::TransportError(status.message().to_string()))?
203+
}
204+
},
205+
}
206+
},
207+
"worker_poll_activity_task",
208+
)
197209
}
198210

199211
/// Submit an activity task completion to core.
@@ -214,20 +226,23 @@ pub fn worker_complete_activity_task(
214226
let worker = worker_ref.core_worker.clone();
215227
let runtime = worker_ref.core_runtime.clone();
216228

217-
runtime.future_to_promise(async move {
218-
worker
219-
.complete_activity_task(activity_completion)
220-
.await
221-
.map_err(|err| match err {
222-
CompleteActivityError::MalformedActivityCompletion {
223-
reason,
224-
completion: _,
225-
} => BridgeError::TypeError {
226-
field: None,
227-
message: format!("Malformed Activity Completion: {reason:?}"),
228-
},
229-
})
230-
})
229+
runtime.future_to_promise_named(
230+
async move {
231+
worker
232+
.complete_activity_task(activity_completion)
233+
.await
234+
.map_err(|err| match err {
235+
CompleteActivityError::MalformedActivityCompletion {
236+
reason,
237+
completion: _,
238+
} => BridgeError::TypeError {
239+
field: None,
240+
message: format!("Malformed Activity Completion: {reason:?}"),
241+
},
242+
})
243+
},
244+
"worker_complete_activity_task",
245+
)
231246
}
232247

233248
/// Submit an activity heartbeat to core.
@@ -260,19 +275,22 @@ pub fn worker_poll_nexus_task(
260275
let worker = worker_ref.core_worker.clone();
261276
let runtime = worker_ref.core_runtime.clone();
262277

263-
runtime.future_to_promise(async move {
264-
let result = worker.poll_nexus_task().await;
278+
runtime.future_to_promise_named(
279+
async move {
280+
let result = worker.poll_nexus_task().await;
265281

266-
match result {
267-
Ok(task) => Ok(task.encode_to_vec()),
268-
Err(err) => match err {
269-
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
270-
PollError::TonicError(status) => {
271-
Err(BridgeError::TransportError(status.message().to_string()))?
272-
}
273-
},
274-
}
275-
})
282+
match result {
283+
Ok(task) => Ok(task.encode_to_vec()),
284+
Err(err) => match err {
285+
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
286+
PollError::TonicError(status) => {
287+
Err(BridgeError::TransportError(status.message().to_string()))?
288+
}
289+
},
290+
}
291+
},
292+
"worker_poll_nexus_task",
293+
)
276294
}
277295

278296
/// Submit an nexus task completion to core.
@@ -291,20 +309,25 @@ pub fn worker_complete_nexus_task(
291309
let worker = worker_ref.core_worker.clone();
292310
let runtime = worker_ref.core_runtime.clone();
293311

294-
runtime.future_to_promise(async move {
295-
worker
296-
.complete_nexus_task(nexus_completion)
297-
.await
298-
.map_err(|err| match err {
299-
CompleteNexusError::NexusNotEnabled {} => {
300-
BridgeError::UnexpectedError(format!("{err}"))
301-
}
302-
CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError {
303-
field: None,
304-
message: format!("Malformed nexus Completion: {reason:?}"),
305-
},
306-
})
307-
})
312+
runtime.future_to_promise_named(
313+
async move {
314+
worker
315+
.complete_nexus_task(nexus_completion)
316+
.await
317+
.map_err(|err| match err {
318+
CompleteNexusError::NexusNotEnabled {} => {
319+
BridgeError::UnexpectedError(format!("{err}"))
320+
}
321+
CompleteNexusError::MalformedNexusCompletion { reason } => {
322+
BridgeError::TypeError {
323+
field: None,
324+
message: format!("Malformed nexus Completion: {reason:?}"),
325+
}
326+
}
327+
})
328+
},
329+
"worker_complete_nexus_task",
330+
)
308331
}
309332

310333
/// Request shutdown of the worker.
@@ -313,6 +336,7 @@ pub fn worker_complete_nexus_task(
313336
/// the loop to ensure graceful shutdown.
314337
#[js_function]
315338
pub fn worker_initiate_shutdown(worker: OpaqueInboundHandle<Worker>) -> BridgeResult<()> {
339+
tracing::info!("Typescript initiate worker shutdown");
316340
let worker_ref = worker.borrow()?;
317341
worker_ref.core_worker.initiate_shutdown();
318342
Ok(())
@@ -337,10 +361,13 @@ pub fn worker_finalize_shutdown(
337361
}
338362
})?;
339363

340-
worker_ref.core_runtime.future_to_promise(async move {
341-
worker.finalize_shutdown().await;
342-
Ok(())
343-
})
364+
worker_ref.core_runtime.future_to_promise_named(
365+
async move {
366+
worker.finalize_shutdown().await;
367+
Ok(())
368+
},
369+
"worker_finalize_shutdown",
370+
)
344371
}
345372

346373
impl MutableFinalize for Worker {

0 commit comments

Comments
 (0)