Skip to content

Commit 0670a22

Browse files
committed
fix(worker): refactor input processor to garuntee exit
1 parent 3382a41 commit 0670a22

File tree

3 files changed

+120
-80
lines changed

3 files changed

+120
-80
lines changed

servc/svc/com/worker/__init__.py

Lines changed: 55 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from servc.svc.com.bus import BusComponent, OnConsuming
55
from servc.svc.com.cache import CacheComponent
66
from servc.svc.com.worker.hooks import evaluate_post_hooks, evaluate_pre_hooks
7+
from servc.svc.com.worker.methods import evaluate_exit, get_artifact
78
from servc.svc.com.worker.types import RESOLVER, RESOLVER_CONTEXT, RESOLVER_MAPPING
89
from servc.svc.config import Config
910
from servc.svc.io.input import InputType
@@ -105,7 +106,7 @@ def connect(self):
105106

106107
def run_resolver(
107108
self, method: RESOLVER, context: RESOLVER_CONTEXT, args: Tuple[str, Any]
108-
) -> Tuple[StatusCode, ResponseArtifact | None]:
109+
) -> Tuple[StatusCode, ResponseArtifact | None, Any | None]:
109110
id, payload = args
110111
statuscode: StatusCode = StatusCode.OK
111112
response: ResponseArtifact | None = None
@@ -132,20 +133,10 @@ def run_resolver(
132133
statuscode = StatusCode.SERVER_ERROR
133134
response = getErrorArtifact(id, str(e), StatusCode.SERVER_ERROR)
134135

135-
if self._config.get(f"conf.{self.name}.exiton5xx") and statuscode.value >= 500:
136-
print("Exiting due to 5xx error", error, flush=True)
137-
exit(1)
138-
if (
139-
self._config.get(f"conf.{self.name}.exiton4xx")
140-
and statuscode.value >= 400
141-
and statuscode.value < 500
142-
):
143-
print("Exiting due to 4xx error", error, flush=True)
144-
exit(1)
145-
146-
return statuscode, response
136+
return statuscode, response, error
147137

148138
def inputProcessor(self, message: Any) -> StatusCode:
139+
workerConfig = self._config.get(f"conf.{self.name}")
149140
bus = self._busClass(
150141
self._config.get(f"conf.{self._bus.name}"),
151142
)
@@ -157,6 +148,10 @@ def inputProcessor(self, message: Any) -> StatusCode:
157148
"config": self._config,
158149
}
159150

151+
status_code: StatusCode = StatusCode.OK
152+
response: ResponseArtifact | None = None
153+
error: Any | None = None
154+
160155
if "type" not in message or "route" not in message:
161156
return StatusCode.INVALID_INPUTS
162157

@@ -166,85 +161,66 @@ def inputProcessor(self, message: Any) -> StatusCode:
166161
or "details" not in message
167162
or "instanceId" not in message
168163
):
169-
return StatusCode.INVALID_INPUTS
164+
status_code = StatusCode.INVALID_INPUTS
165+
response = getErrorArtifact(
166+
message["id"] if "id" in message else "",
167+
"Invalid input type for event. event, details or instanceId not specified",
168+
StatusCode.INVALID_INPUTS,
169+
)
170170
if message["event"] not in self._eventResolvers:
171171
return StatusCode.METHOD_NOT_FOUND
172172

173-
status_code, response = self.run_resolver(
173+
status_code, response, error = self.run_resolver(
174174
self._eventResolvers[message["event"]],
175175
context,
176176
("", {**message}),
177177
)
178178

179-
return status_code
180-
181-
if message["type"] in [InputType.INPUT.value, InputType.INPUT]:
182-
if "id" not in message:
183-
return StatusCode.INVALID_INPUTS
184-
if "argumentId" not in message:
185-
cache.setKey(
186-
message["id"],
187-
getErrorArtifact(
188-
message["id"],
189-
"Invalid input type. Id and argumentId not specified",
190-
StatusCode.INVALID_INPUTS,
191-
),
179+
elif message["type"] in [InputType.INPUT.value, InputType.INPUT]:
180+
if "id" not in message or "argumentId" not in message:
181+
status_code = StatusCode.INVALID_INPUTS
182+
response = getErrorArtifact(
183+
message["id"] if "id" in message else "",
184+
"Invalid input type. Id and argumentId not specified",
185+
StatusCode.INVALID_INPUTS,
192186
)
193-
return StatusCode.INVALID_INPUTS
187+
status_code = StatusCode.INVALID_INPUTS
194188
if "instanceId" in message and message["instanceId"] != bus.instanceId:
195189
return StatusCode.NO_PROCESSING
196190

197-
if message["argumentId"] in ["raw", "plain"] and message["inputs"]:
198-
artifact = message["argument"]
191+
# get the artifact from the message
192+
artifact = get_artifact(message, cache)
193+
if isinstance(artifact, tuple):
194+
status_code, response = artifact
199195
else:
200-
artifact = cache.getKey(message["argumentId"])
201-
if artifact is None or "method" not in artifact or "inputs" not in artifact:
202-
cache.setKey(
203-
message["id"],
204-
getErrorArtifact(
205-
message["id"],
206-
"Invalid argument. Need to specify method and inputs in payload",
207-
StatusCode.USER_ERROR,
208-
),
209-
)
210-
return StatusCode.USER_ERROR
211-
if artifact["method"] not in self._resolvers:
212-
cache.setKey(
213-
message["id"],
214-
getErrorArtifact(
196+
if artifact["method"] not in self._resolvers:
197+
status_code = StatusCode.METHOD_NOT_FOUND
198+
response = getErrorArtifact(
215199
message["id"], "Method not found", StatusCode.METHOD_NOT_FOUND
216-
),
217-
)
218-
if self._config.get(f"conf.{self.name}.exiton4xx"):
219-
print("Exiting due to 4xx error:", "Method not found", flush=True)
220-
exit(1)
221-
return StatusCode.METHOD_NOT_FOUND
222-
223-
continueExecution = evaluate_pre_hooks(
224-
self._resolvers,
225-
message,
226-
artifact,
227-
context,
228-
)
229-
if not continueExecution:
230-
return StatusCode.OK
231-
232-
statusCode, response = self.run_resolver(
233-
self._resolvers[artifact["method"]],
234-
context,
235-
(message["id"], artifact["inputs"]),
236-
)
237-
if statusCode == StatusCode.NO_PROCESSING:
238-
return StatusCode.NO_PROCESSING
200+
)
201+
else:
202+
continueExecution = evaluate_pre_hooks(
203+
self._resolvers,
204+
message,
205+
artifact,
206+
context,
207+
)
208+
if not continueExecution:
209+
return StatusCode.OK
210+
211+
status_code, response, error = self.run_resolver(
212+
self._resolvers[artifact["method"]],
213+
context,
214+
(message["id"], artifact["inputs"]),
215+
)
216+
if status_code == StatusCode.NO_PROCESSING:
217+
return StatusCode.NO_PROCESSING
218+
219+
evaluate_exit(
220+
message, response, cache, status_code, workerConfig, error
221+
)
222+
evaluate_post_hooks(bus, cache, message, artifact)
223+
224+
evaluate_exit(message, response, cache, status_code, workerConfig, error)
239225

240-
cache.setKey(message["id"], response)
241-
evaluate_post_hooks(bus, cache, message, artifact)
242-
return statusCode
243-
244-
cache.setKey(
245-
message["id"],
246-
getErrorArtifact(
247-
message["id"], "Invalid input type", StatusCode.INVALID_INPUTS
248-
),
249-
)
250226
return StatusCode.INVALID_INPUTS

servc/svc/com/worker/methods.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from typing import Any, Tuple
2+
3+
from servc.svc.com.cache import CacheComponent
4+
from servc.svc.config import Config
5+
from servc.svc.io.input import ArgumentArtifact, InputPayload
6+
from servc.svc.io.output import ResponseArtifact, StatusCode
7+
from servc.svc.io.response import getErrorArtifact
8+
9+
10+
def evaluate_exit(
11+
message: InputPayload,
12+
response: ResponseArtifact | None,
13+
cache: CacheComponent,
14+
statusCode: StatusCode,
15+
config: Config,
16+
error: Any | None,
17+
):
18+
if config.get("exiton5xx") and statusCode.value >= 500:
19+
print("Exiting due to 5xx error: ", error, flush=True)
20+
exit(1)
21+
if config.get("exiton4xx") and statusCode.value >= 400 and statusCode.value < 500:
22+
print("Exiting due to 4xx error: ", error, flush=True)
23+
exit(1)
24+
25+
# allow specific exit to an error code
26+
error_str: str = str(statusCode.value)
27+
if config.get(f"exiton{error_str}"):
28+
print(f"Exiting due to {error_str} error: ", error, flush=True)
29+
exit(1)
30+
31+
if response is not None and "id" in message and message["id"]:
32+
cache.setKey(message["id"], response)
33+
34+
35+
def get_artifact(
36+
message: InputPayload, cache: CacheComponent
37+
) -> ArgumentArtifact | Tuple[StatusCode, ResponseArtifact]:
38+
artifact = (
39+
cache.getKey(message["argumentId"])
40+
if message["argumentId"] not in ["raw", "plain"]
41+
else message["argument"]
42+
)
43+
44+
if artifact is None or "method" not in artifact or "inputs" not in artifact:
45+
return (
46+
StatusCode.USER_ERROR,
47+
getErrorArtifact(
48+
message["id"],
49+
"Invalid argument. Need to specify method and inputs in payload",
50+
StatusCode.USER_ERROR,
51+
),
52+
)
53+
54+
return artifact

servc/svc/config/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,17 @@
2424

2525
BOOLEAN_CONFIGS = os.getenv(
2626
"SERVC_BOOLEAN_CONFIGS",
27-
"conf.worker.exiton4xx,conf.worker.exiton5xx,conf.worker.bindtoeventexchange",
27+
",".join(
28+
[
29+
"conf.worker.exiton400",
30+
"conf.worker.exiton404",
31+
"conf.worker.exiton401",
32+
"conf.worker.exiton422",
33+
"conf.worker.exiton4xx",
34+
"conf.worker.exiton5xx",
35+
"conf.worker.bindtoeventexchange",
36+
]
37+
),
2838
).split(",")
2939
DOT_MARKER = os.getenv("SERVC_DOT_MARKER", "_DOT_")
3040
DASH_MARKER = os.getenv("SERVC_DASH_MARKER", "_DASH_")

0 commit comments

Comments
 (0)