Skip to content

Commit 91c2c55

Browse files
authored
Merge pull request #30 from TaskarCenterAtUW/stage
2 parents e87badf + b946646 commit 91c2c55

File tree

7 files changed

+31
-24
lines changed

7 files changed

+31
-24
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ COPY ./requirements.txt /code/requirements.txt
44
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
55
COPY ./src /code/src
66
EXPOSE 8080
7-
CMD ["uvicorn", "src.main:app", "--reload", "--host", "0.0.0.0", "--port", "8080"]
7+
CMD ["uvicorn", "src.main:app","--host", "0.0.0.0", "--port", "8080"]

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ VALIDATION_REQ_SUB=xxxx
3030
VALIDATION_RES_TOPIC=xxxx
3131
CONTAINER_NAME=xxxx
3232
AUTH_PERMISSION_URL=xxx
33+
MAX_CONCURRENT_MESSAGES=xxx
3334

3435
```
3536

3637
The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `python-osw-validation` package.
3738
`QUEUECONNECTION` is used to send out the messages and listen to messages.
3839

40+
`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 2
3941

4042
### How to Set up and Build
4143
Follow the steps to install the python packages required for both building and running the application

requirements.txt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
psutil==5.9.5
21
fastapi==0.88.0
3-
python-dotenv==0.21.0
42
pydantic==1.10.4
5-
python-ms-core==0.0.18
3+
python-ms-core==0.0.22
64
uvicorn==0.20.0
7-
coverage==7.2.7
85
html_testRunner==1.2.1
9-
httpx==0.24.1
10-
python-osw-validation==0.2.3
6+
python-osw-validation==0.2.4

src/assets/osw-upload.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"messageId": "c8c76e89f30944d2b2abd2491bd95337",
33
"messageType": "workflow_identifier",
44
"data": {
5-
"file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/osw/test_upload/valid.zip",
5+
"file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/tdei-storage-test/Archivew.zip",
66
"user_id": "c59d29b6-a063-4249-943f-d320d15ac9ab",
77
"tdei_project_group_id": "0b41ebc5-350c-42d3-90af-3af4ad3628fb"
88
}

src/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class Settings(BaseSettings):
1717
app_name: str = 'python-osw-validation'
1818
event_bus = EventBusSettings()
1919
auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None)
20+
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)
2021

2122
@property
2223
def auth_provider(self) -> str:

src/main.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
prefix_router = APIRouter(prefix='/health')
1111

12+
# Have a reference to validator in the app object
13+
app.validator = None
1214

1315
@lru_cache()
1416
def get_settings():
@@ -18,7 +20,8 @@ def get_settings():
1820
@app.on_event('startup')
1921
async def startup_event(settings: Settings = Depends(get_settings)) -> None:
2022
try:
21-
OSWValidator()
23+
# OSWValidator()
24+
app.validator = OSWValidator()
2225
except:
2326
print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
2427
print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
@@ -34,6 +37,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
3437
child.kill()
3538
parent.kill()
3639

40+
@app.on_event('shutdown')
41+
async def shutdown_event() -> None:
42+
print('Shutting down the application')
43+
if app.validator:
44+
app.validator.stop_listening()
3745

3846
@app.get('/', status_code=status.HTTP_200_OK)
3947
@prefix_router.get('/', status_code=status.HTTP_200_OK)

src/osw_validator.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,26 @@ class OSWValidator:
2020
_settings = Settings()
2121

2222
def __init__(self):
23-
core = Core()
23+
self.core = Core()
2424
options = {
2525
'provider': self._settings.auth_provider,
2626
'api_url': self._settings.auth_permission_url
2727
}
2828
listening_topic_name = self._settings.event_bus.upload_topic or ''
29-
publishing_topic_name = self._settings.event_bus.validation_topic or ''
3029
self.subscription_name = self._settings.event_bus.upload_subscription or ''
31-
self.listening_topic = core.get_topic(topic_name=listening_topic_name)
32-
self.publishing_topic = core.get_topic(topic_name=publishing_topic_name)
33-
self.logger = core.get_logger()
34-
self.storage_client = core.get_storage_client()
35-
self.auth = core.get_authorizer(config=options)
36-
self.container_name = self._settings.event_bus.container_name
37-
self.start_listening()
30+
self.listening_topic = self.core.get_topic(topic_name=listening_topic_name, max_concurrent_messages=self._settings.max_concurrent_messages)
31+
self.logger = self.core.get_logger()
32+
self.storage_client = self.core.get_storage_client()
33+
self.auth = self.core.get_authorizer(config=options)
34+
self.listener_thread = threading.Thread(target=self.start_listening)
35+
self.listener_thread.start()
3836

3937
def start_listening(self):
4038
def process(message) -> None:
4139
if message is not None:
4240
queue_message = QueueMessage.to_dict(message)
4341
upload_message = Upload.data_from(queue_message)
44-
process_thread = threading.Thread(target=self.validate, args=[upload_message])
45-
process_thread.start()
46-
# self.validate(upload_message)
42+
self.validate(received_message=upload_message)
4743

4844
self.listening_topic.subscribe(subscription=self.subscription_name, callback=process)
4945

@@ -89,10 +85,11 @@ def send_status(self, result: ValidationResult, upload_message: Upload):
8985
'data': upload_message.data.to_json()
9086
})
9187
try:
92-
self.publishing_topic.publish(data=data)
88+
self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data)
89+
logger.info(f'Publishing message for : {upload_message.message_id}')
9390
except Exception as e:
94-
print(e)
95-
logger.info(f'Publishing message for : {upload_message.message_id}')
91+
logger.error(f'Error occurred while publishing message for : {upload_message.message_id} with error: {e}')
92+
9693

9794
def has_permission(self, roles: List[str], queue_message: Upload) -> bool:
9895
try:
@@ -107,3 +104,6 @@ def has_permission(self, roles: List[str], queue_message: Upload) -> bool:
107104
except Exception as error:
108105
print('Error validating the request authorization:', error)
109106
return False
107+
108+
def stop_listening(self):
109+
self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work.

0 commit comments

Comments
 (0)