diff --git a/data_acquisition/Pipfile b/data_acquisition/Pipfile
index d99a923..b82fb5d 100644
--- a/data_acquisition/Pipfile
+++ b/data_acquisition/Pipfile
@@ -8,6 +8,7 @@ verify_ssl = true
 [packages]
 requests = "*"
 psycopg2-binary = "*"
+aiohttp = "*"
 
 [requires]
 python_version = "3.6"
diff --git a/data_acquisition/clear_expired_jobs.py b/data_acquisition/clear_expired_jobs.py
index 9f5c1bb..0c45bd3 100644
--- a/data_acquisition/clear_expired_jobs.py
+++ b/data_acquisition/clear_expired_jobs.py
@@ -4,25 +4,39 @@
 import psycopg2
 import getpass
 import logging
+import asyncio
+from aiohttp import ClientSession
 
 url_index = 2
 id_index = 0
 
 # Currently just makes a GET request to the URLs in the DB and returns true if the status code isn't 200
 # TODO: read HTML for websites to see if they're expired even if the status code is 200
-def check_expired(job):
-    url = job[url_index]
-    if(url):
-        response = requests.get(url)
-        if(response.status_code != 200):
-            return True
-        else:
-            return False
-
+def job_is_expired(response):
+    if response['response'].status != 200:
+        return True
     else:
-        print("No URL")
         return False
 
+async def get_url_response(job, session):
+    async with session.get(job[url_index]) as response:
+        return {'job_id': job[id_index], 'response': response}
+
+async def delete_expired_jobs(cur, con):
+    tasks = []
+    async with ClientSession() as session:
+        for job in cur:
+            task = asyncio.ensure_future(get_url_response(job, session))
+            tasks.append(task)
+
+        responses = await asyncio.gather(*tasks)
+
+        for response in responses:
+            if job_is_expired(response):
+                cur.execute("DELETE FROM job WHERE id = %s", (response['job_id'],))
+
+        con.commit()
+
 # Gets all the jobs from the db and deletes the ones that are expired
 def check_for_expired_jobs():
     # dbname, user, host, and password should match your database info in ormconfig.json
@@ -32,18 +46,9 @@ def check_for_expired_jobs():
     # Get all jobs from db
     cur.execute("SELECT * FROM job")
 
-    # Go through all of the jobs from the db and check if they're expired
-    expired_job_ids = []
-    for job in cur:
-        expired = check_expired(job)
-        if(expired):
-            expired_job_ids.append(job[id_index])
-
-    # Go through all of the expired jobs and delete them from the db
-    for job_id in expired_job_ids:
-        cur.execute("DELETE FROM job WHERE id = %s", (job_id,))
-
-    con.commit()
+    loop = asyncio.get_event_loop()
+    future = asyncio.ensure_future(delete_expired_jobs(cur, con))
+    loop.run_until_complete(future)
 
 try:
     check_for_expired_jobs()