Skip to content

Commit a3dc836

Browse files
committed
refactor async_ng code to introduce random delay in MyAsyncNGBO
1 parent 1099983 commit a3dc836

File tree

2 files changed

+55
-62
lines changed

2 files changed

+55
-62
lines changed

src/iop/_business_host.py

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -580,43 +580,41 @@ def __init__(self, target, request, timeout=-1, description=None, host=None):
580580
self._iris_handle = host.iris_handle
581581
asyncio.create_task(self.send())
582582

583-
async def send_async(self):
584-
# Call the synchronous function
585-
self._iris_handle.dispatchSendRequestAsyncNG(self.target, self.host._dispatch_serializer(self.request), self.timeout, self.description)
586-
587-
# Periodically check if the request is done
588-
while not self._iris_handle.dispatchIsRequestDone():
589-
await asyncio.sleep(0.1) # Adjust the sleep duration as needed
590-
591-
# Set the result of the Future
592-
self.set_result("Request sent and completed")
593-
594583
async def send(self):
584+
# init parameters
595585
message_header_id = iris.ref()
596586
queue_name = iris.ref()
597587
end_time = iris.ref()
598588
request = self.host._dispatch_serializer(self.request)
599-
self._iris_handle.dispatchSendRequestAsyncNG(self.target, request, self.timeout, self.description, message_header_id, queue_name, end_time)
589+
590+
# send request
591+
self._iris_handle.dispatchSendRequestAsyncNG(
592+
self.target, request, self.timeout, self.description,
593+
message_header_id, queue_name, end_time)
594+
595+
# get byref values
600596
self._message_header_id = message_header_id.value
601597
self._queue_name = queue_name.value
602598
self._end_time = end_time.value
603599

604600
while not self._done:
605601
await asyncio.sleep(0.1)
606-
self.done()
602+
self.is_done()
607603

608604
self.set_result(self._response)
609605

610-
def done(self):
606+
def is_done(self):
611607
response = iris.ref()
612-
status = self._iris_handle.dispatchIsRequestDone(self.timeout, self._end_time, self._queue_name, self._message_header_id, response)
608+
status = self._iris_handle.dispatchIsRequestDone(self.timeout, self._end_time,
609+
self._queue_name, self._message_header_id,
610+
response)
611+
613612
self._response = self.host._dispatch_deserializer(response.value)
614-
if status == 1 and self._response is not None:
615-
self._done = True
616613

617-
# def __await__(self):
618-
# if not self._done:
619-
# self.done()
620-
# yield self
621-
# else:
622-
# return self._response
614+
if status == 2: # message found
615+
self._done = True
616+
elif status == 1: # message not found
617+
pass
618+
else:
619+
self._done = True
620+
self.set_exception(RuntimeError(iris.system.Status.GetOneStatusText(status)))

src/iop/cls/IOP/Common.cls

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -223,24 +223,6 @@ ClassMethod OnGetConnections(
223223
quit
224224
}
225225

226-
Method dispatchSendRequestAsyncNGTest(
227-
pTarget,
228-
pRequest,
229-
pTimeout,
230-
pDescription,
231-
ByRef pMessageHeaderId,
232-
ByRef pQueueName,
233-
ByRef pEndTime) As %String
234-
{
235-
Set tTargetConfigName = $get($$$DispatchNameToConfigName(pTarget)) Quit:""=tTargetConfigName $$$EnsError($$$EnsErrBusinessDispatchNameNotRegistered,pTarget)
236-
Set tTargetBusinessClass=$$$ConfigClassName(tTargetConfigName)
237-
Set tPriority=$$$eMessagePriorityAsync
238-
239-
Set tSC=$classmethod(tTargetBusinessClass,"acceptRequestAsync",..%ConfigName,tTargetConfigName,pRequest,tPriority,$$$queueSyncCallQueueName,..%SessionId,"",.tRequestHeader,pDescription,..%SuperSession)
240-
241-
Quit tSC
242-
}
243-
244226
Method dispatchSendRequestAsyncNG(
245227
pTarget,
246228
pRequest,
@@ -305,37 +287,50 @@ Method dispatchIsRequestDone(
305287
pMessageHeaderId,
306288
ByRef pResponse) As %Status
307289
{
290+
308291
set tSC=$$$OK
309292
try {
310-
set tTimeout=$s(pTimeout=-1:-1,1:pEndTime-$zh) if (pTimeout'=-1)&&(tTimeout<0) quit
293+
set tTimeout=$s(pTimeout=-1:-1,1:pEndTime-$zh)
294+
311295
set tSC = ##class(Ens.Queue).DeQueue($$$queueSyncCallQueueName,.tResponseHeader,tTimeout,.tIsTimedOut,0) Quit:$$$ISERR(tSC)
312-
quit:tIsTimedOut
296+
297+
quit:$IsObject(tResponseHeader)=0
313298

314299
set tFound = $select(tResponseHeader.CorrespondingMessageId: pMessageHeaderId=tResponseHeader.CorrespondingMessageId, 1: 0)
315300
if tFound=0 {
316-
$$$sysTRACE("Out-of-band message '"_tResponseHeader.%Id()_"' discarded")
317-
do tResponseHeader.SetStatus($$$eMessageStatusDiscarded)
318-
quit
301+
302+
set tSC = ##class(Ens.Queue).EnQueue(tResponseHeader)
303+
Kill $$$EnsActiveMessage($$$SystemName_":"_$Job)
319304
}
320-
if tResponseHeader.IsError {
305+
else {
306+
307+
if tIsTimedOut || ((pTimeout'=-1)&&(tTimeout<0)) {
308+
309+
do tResponseHeader.SetStatus($$$eMessageStatusDiscarded)
310+
return $$$ERROR($$$EnsErrFailureTimeout, tTimeout, $$$StatusDisplayString(tSC), $$$CurrentClass)
311+
}
312+
if tResponseHeader.IsError {
313+
314+
do tResponseHeader.SetStatus($$$eMessageStatusCompleted)
315+
return $$$EnsError($$$EnsErrGeneral,"Error message received: "_tResponseHeader.ErrorText)
316+
317+
}
318+
if tResponseHeader.MessageBodyClassName'="" {
319+
320+
set tResponse = $classmethod(tResponseHeader.MessageBodyClassName,"%OpenId",tResponseHeader.MessageBodyId,,.tSC)
321+
if '$IsObject(tResponse) return $$$EnsError($$$EnsErrGeneral,"Could not open MessageBody "_tResponseHeader.MessageBodyId_" for MessageHeader #"_tResponseHeader.%Id()_" with body class "_tResponseHeader.MessageBodyClassName_":"_$$$StatusDisplayString(tSC))
322+
} else {
323+
324+
set tResponse=$$$NULLOREF
325+
}
326+
set pResponse=tResponse
321327
do tResponseHeader.SetStatus($$$eMessageStatusCompleted)
322-
set tSC = $$$EnsError($$$EnsErrGeneral,"Error message received: "_tResponseHeader.ErrorText)
323-
quit
324-
}
325-
if tResponseHeader.MessageBodyClassName'="" {
326-
set tResponse = $classmethod(tResponseHeader.MessageBodyClassName,"%OpenId",tResponseHeader.MessageBodyId,,.tSC)
327-
if '$IsObject(tResponse) Set tSC=$$$EnsError($$$EnsErrGeneral,"Could not open MessageBody "_tResponseHeader.MessageBodyId_" for MessageHeader #"_tResponseHeader.%Id()_" with body class "_tResponseHeader.MessageBodyClassName_":"_$$$StatusDisplayString(tSC)) Quit
328-
} else {
329-
set tResponse=$$$NULLOREF
328+
set tSC = 2
329+
330330
}
331-
set pResponse=tResponse
332-
do tResponseHeader.SetStatus($$$eMessageStatusCompleted)
333-
334-
335-
Set tSC2 = ##class(Ens.Queue).Delete($$$queueSyncCallQueueName,"*") quit:$$$ISERR(tSC2)
336331
}
337-
catch {
338-
set tSC = $$$EnsSystemError
332+
catch ex {
333+
set tSC = ex.AsStatus()
339334
}
340335
quit tSC
341336
}

0 commit comments

Comments
 (0)