diff --git a/.github/workflows/python_actions.yml b/.github/workflows/python_actions.yml index d8357a0..43f4bf6 100644 --- a/.github/workflows/python_actions.yml +++ b/.github/workflows/python_actions.yml @@ -45,7 +45,7 @@ jobs: - name: Install dependencies run: | - python -m pip install --upgrade setuptools pip + python -m pip install --upgrade setuptools "pip<24.1" pip install -r requirements.txt pip install -r dev-requirements.txt - name: Test with pytest diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 8aebdb0..d677f80 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ This pipeline is to process source reference files, if xml to parse them first a python run.py RESOLVE -s ``` - 2. Specify a directory, and file extension, to recursively search all sub directories for this type of reference file, and queue them all for processing, use the command + 2. Specify a directory, and file extension (i.e. -e *.raw), to recursively search all sub directories for this type of reference file, and queue them all for processing, use the command ``` python run.py RESOLVE -p -e ``` diff --git a/adsrefpipe/app.py b/adsrefpipe/app.py old mode 100644 new mode 100755 index 4e5d2a5..1a587cb --- a/adsrefpipe/app.py +++ b/adsrefpipe/app.py @@ -8,6 +8,7 @@ from builtins import str from adsputils import ADSCelery from datetime import datetime, timedelta +from typing import List, Dict from adsrefpipe.models import Action, Parser, ReferenceSource, ProcessedHistory, ResolvedReference, CompareClassic from adsrefpipe.utils import get_date_created, get_date_modified, get_date_now, get_resolved_filename, \ @@ -17,28 +18,37 @@ from sqlalchemy import and_, literal from sqlalchemy.sql import exists from sqlalchemy.sql.expression import case, func +from sqlalchemy import desc from texttable import Texttable class ADSReferencePipelineCelery(ADSCelery): + """ + celery-based pipeline for processing and resolving references + handles reference parsing, resolution, and database 
management + """ + + # matches an identifier starting with 'H', followed by a number (history_id), then 'I', followed by another number (item_num) RE_PARSE_ID = re.compile(r'^H(?P\d+)+I(?P\d+)$') + # captures a double file extension at the end of a string, such as 'test.aas.raw' RE_MATCH_EXT = re.compile(r'.*(\..*?\.[a-z]+)$') default_parsers = {} - def __init__(self, app_name, *args, **kwargs): + def __init__(self, app_name: str, *args: tuple, **kwargs: Dict): """ + initialize the ADS reference pipeline celery application - :param app_name: - :param args: - :param kwargs: + :param app_name: name of the application + :param args: additional positional arguments + :param kwargs: additional keyword arguments """ ADSCelery.__init__(self, app_name, *args, **kwargs) - def init_default_parsers(self): + def init_default_parsers(self) -> None: """ - read into memory parser info from the lookup table + load parser information from the database into memory :return: """ @@ -62,13 +72,14 @@ def init_default_parsers(self): } self.default_parsers[to_dict['extension_pattern']] = to_dict - def match_parser(self, rows, journal, volume): + def match_parser(self, rows: List, journal: str, volume: str) -> Dict: """ + match a parser based on journal and volume information - :param rows: - :param journal: - :param volume: - :return: + :param rows: List of parser records + :param journal: journal name + :param volume: volume number or identifier + :return: matching parser record as a dictionary """ for row in rows: for match in row.get_matches(): @@ -89,11 +100,12 @@ def match_parser(self, rows, journal, volume): return row.toJSON() return {} - def get_parser(self, source_filename): + def get_parser(self, source_filename: str) -> Dict: """ + retrieve a parser based on the source filename - :param source_filename: - :return: + :param source_filename: filename of the source reference + :return: parser details as a dictionary """ if not self.default_parsers: self.init_default_parsers() @@ 
-130,12 +142,12 @@ def get_parser(self, source_filename): self.logger.error("Unrecognizable source file %s."%source_filename) return {} - def get_reference_service_endpoint(self, parsername): + def get_reference_service_endpoint(self, parsername: str) -> str: """ - given parsername find the endpoint that shall be called for the reference to get resolved + retrieve the reference service endpoint for a given parser - :param parsername: - :return: + :param parsername: name of the parser + :return: service endpoint URL """ with self.session_scope() as session: rows = session.query(Parser).filter(Parser.name == parsername).all() @@ -145,14 +157,14 @@ def get_reference_service_endpoint(self, parsername): self.logger.error("No unique record found in table `Parser` matching name %s." % parsername) return '' - def query_reference_source_tbl(self, bibcode_list=None, source_filename_list=None, parsername=None): + def query_reference_source_tbl(self, bibcode_list: List = None, source_filename_list: List = None, parsername: str = None) -> List: """ - Queries reference table and returns results. + query the reference source table - :param bibcode_list: - :param source_filename_list: - :param parsername: - :return: + :param bibcode_list: List of bibcodes to filter + :param source_filename_list: List of source filenames to filter + :param parsername: parser name to filter + :return: List of reference source records """ with self.session_scope() as session: if bibcode_list and source_filename_list: @@ -191,13 +203,13 @@ def query_reference_source_tbl(self, bibcode_list=None, source_filename_list=Non results.append(row.toJSON()) return results - def query_processed_history_tbl(self, bibcode_list=None, source_filename_list=None): + def query_processed_history_tbl(self, bibcode_list: List = None, source_filename_list: List = None) -> List: """ - Queries history table and returns results. 
+ query the processed history table - :param bibcode_list: - :param source_filename_list: - :return: + :param bibcode_list: List of bibcodes to filter + :param source_filename_list: List of source filenames to filter + :return: List of processed history records """ with self.session_scope() as session: if bibcode_list and source_filename_list: @@ -262,13 +274,14 @@ def query_processed_history_tbl(self, bibcode_list=None, source_filename_list=No }) return results - def query_resolved_reference_tbl(self, history_id_list=None): + def query_resolved_reference_tbl(self, history_id_list: List = None) -> List: """ - Queries resolved table and returns results. + query the resolved reference table - :param history_id_list: - :return: + :param history_id_list: List of history IDs to filter + :return: List of resolved reference records """ + results = [] with self.session_scope() as session: if history_id_list: rows = session.query(func.count(ResolvedReference.item_num).label('num_references'), @@ -278,27 +291,28 @@ def query_resolved_reference_tbl(self, history_id_list=None): .group_by(ResolvedReference.history_id).all() self.logger.info("Fetched records for history ids = %s." % (','.join(str(h) for h in history_id_list))) - if len(rows) == 0: - if history_id_list: + if len(rows) == 0: self.logger.error("No records found for history ids = %s." 
% (','.join(str(h) for h in history_id_list))) - else: - self.logger.error("No records found in table `ResolvedReference`.") + return results + + for row in rows: + results.append({ + 'last_run_num_references': row.num_references, + 'last_run_num_resolved_references': row.num_resolved_references, + 'history_id': row.history_id, + }) + else: + self.logger.error("No history_id provided, returning no records.") - results = [] - for row in rows: - results.append({ - 'last_run_num_references': row.num_references, - 'last_run_num_resolved_references': row.num_resolved_references, - 'history_id': row.history_id, - }) - return results + return results - def diagnostic_query(self, bibcode_list=None, source_filename_list=None): + def diagnostic_query(self, bibcode_list: List = None, source_filename_list: List = None) -> List: """ + perform a diagnostic query to retrieve combined reference records - :param bibcode_list - :param source_filename_list - :return: list of json records or None + :param bibcode_list: List of bibcodes to filter + :param source_filename_list: List of source filenames to filter + :return: List of combined records from multiple tables """ results = [] @@ -314,7 +328,7 @@ def diagnostic_query(self, bibcode_list=None, source_filename_list=None): history_bibcodes = [item['bibcode'] for item in processed_history] # find unique bibcodes bibcodes = sorted(list(set(reference_bibcodes) | set(history_bibcodes))) - # go through the list and combine records from all three sources + # go through the List and combine records from all three sources for bibcode in bibcodes: result = {} reference_record = next(item for item in reference_source if item['bibcode'] == bibcode) @@ -333,13 +347,13 @@ def diagnostic_query(self, bibcode_list=None, source_filename_list=None): return results - def insert_reference_source_record(self, session, reference): + def insert_reference_source_record(self, session: object, reference: ReferenceSource) -> tuple: """ - check to see if the 
record already exists in the db first, if not, then add it in + insert a new record into the reference source table if it does not exist - :param session: - :param reference: - :return: + :param session: database session + :param reference: reference source record + :return: tuple containing bibcode and source filename """ found = session.query(exists().where(and_(ReferenceSource.bibcode == reference.bibcode, ReferenceSource.source_filename == reference.source_filename))).scalar() @@ -351,63 +365,66 @@ def insert_reference_source_record(self, session, reference): self.logger.debug("Added a `Reference` record successfully.") return reference.bibcode, reference.source_filename - def insert_history_record(self, session, history): + def insert_history_record(self, session: object, history: ProcessedHistory) -> int: """ + insert a new record into the processed history table - :param session: - :param history: - :return: + :param session: database session + :param history: processed history record + :return: history record ID """ session.add(history) session.flush() self.logger.debug("Added a `ProcessedHistory` record successfully.") return history.id - def insert_resolved_referencce_records(self, session, resolved_list): + def insert_resolved_reference_records(self, session: object, resolved_list: List[ResolvedReference]) -> bool: """ + insert resolved reference records into the database - :param session: - :param resolved_list: - :return: + :param session: database session + :param resolved_list: List of resolved reference records + :return: True if successful """ session.bulk_save_objects(resolved_list) session.flush() self.logger.debug("Added `ResolvedReference` records successfully.") return True - def update_resolved_reference_records(self, session, resolved_list): + def update_resolved_reference_records(self, session: object, resolved_list: List[ResolvedReference]) -> bool: """ + update resolved reference records in the database - :param session: - :param 
resolved_list: - :return: + :param session: database session + :param resolved_list: List of resolved reference records + :return: True if successful """ session.bulk_update_mappings(ResolvedReference, [r.toJSON() for r in resolved_list]) session.flush() self.logger.debug("Added `ResolvedReference` records successfully.") return True - def insert_compare_records(self, session, compared_list): + def insert_compare_records(self, session: object, compared_list: List[CompareClassic]) -> bool: """ + insert records into the compare classic table - :param session: - :param compared_list: - :return: + :param session: database session + :param compared_list: List of comparison records + :return: True if successful """ session.bulk_save_objects(compared_list) session.flush() self.logger.debug("Added `CompareClassic` records successfully.") return True - def populate_resolved_reference_records_pre_resolved(self, references, history_id, item_nums=None): + def populate_resolved_reference_records_pre_resolved(self, references: List, history_id: int, item_nums: List = None) -> tuple: """ - insert resolved records before sending them to service to be matched - if we have xml references, then insert populate the xml table as well + insert resolved reference records before sending them to a service - :param references: - :param history_id: - :param item_nums: - :return: + :param references: List of references + :param history_id: history record ID + :param item_nums: optional List of item numbers + :return: tuple containing resolved records and updated references """ if not item_nums: item_nums = list(range(1, len(references)+1)) @@ -425,18 +442,18 @@ def populate_resolved_reference_records_pre_resolved(self, references, history_i if 'item_num' in ref: del ref['item_num'] return resolved_records, references - def populate_tables_pre_resolved_initial_status(self, source_bibcode, source_filename, parsername, references): + def populate_tables_pre_resolved_initial_status(self, 
source_bibcode: str, source_filename: str, parsername: str, references: List) -> List: """ - this is called when the references are being processed for the first time, from the file + populate database tables for references being processed for the first time - :param source_bibcode: - :param source_filename: - :param parsername: - :param references: - :return: + :param source_bibcode: source bibcode + :param source_filename: source filename + :param parsername: parser name + :param references: List of references + :return: List of processed references """ - try: - with self.session_scope() as session: + with self.session_scope() as session: + try: reference_record = ReferenceSource(bibcode=source_bibcode, source_filename=source_filename, resolved_filename=get_resolved_filename(source_filename), @@ -451,27 +468,27 @@ def populate_tables_pre_resolved_initial_status(self, source_bibcode, source_fil total_ref=len(references)) history_id = self.insert_history_record(session, history_record) resolved_records, references = self.populate_resolved_reference_records_pre_resolved(references, history_id) - self.insert_resolved_referencce_records(session, resolved_records) + self.insert_resolved_reference_records(session, resolved_records) session.commit() self.logger.info("Source file %s for bibcode %s with %d references, processed successfully." % (source_filename, source_bibcode, len(references))) return references - except SQLAlchemyError as e: - session.rollback() - self.logger.info("Source file %s information failed to get added to database. Error: %s" % (source_filename, str(e.__dict__['orig']))) - return [] + except SQLAlchemyError as e: + session.rollback() + self.logger.error("Source file %s information failed to get added to database. 
Error: %s" % (source_filename, str(e))) + return [] - def populate_tables_pre_resolved_retry_status(self, source_bibcode, source_filename, source_modified, retry_records): + def populate_tables_pre_resolved_retry_status(self, source_bibcode: str, source_filename: str, source_modified: str, retry_records: List[Dict]) -> List[Dict]: """ - this is called when the references are being reprocessed, usually cherry picked from the records in the database + this is called when the references are being reprocessed, usually cherry-picked from the records in the database - :param source_bibcode: - :param source_filename: - :param source_modified: - :param retry_records: - :return: + :param source_bibcode: source bibcode + :param source_filename: source filename + :param source_modified: last modified date of the source file + :param retry_records: List of references to be reprocessed + :return: List of processed references """ - try: - with self.session_scope() as session: + with self.session_scope() as session: + try: history_record = ProcessedHistory(bibcode=source_bibcode, source_filename=source_filename, source_modified=source_modified, @@ -480,113 +497,117 @@ def populate_tables_pre_resolved_retry_status(self, source_bibcode, source_filen total_ref=len(retry_records)) history_id = self.insert_history_record(session, history_record) resolved_records, references = self.populate_resolved_reference_records_pre_resolved(retry_records, history_id) - if resolved_records: - self.insert_resolved_referencce_records(session, resolved_records) - session.commit() - self.logger.info("Source file %s for bibcode %s with %d references, for reprocessing added successfully." % (source_filename, source_bibcode, len(references))) - return references - except SQLAlchemyError as e: - session.rollback() - self.logger.info("Source file %s information for reprocessing failed to get added to database." 
% (source_filename, str(e.__dict__['orig']))) - return [] + self.insert_resolved_reference_records(session, resolved_records) + session.commit() + self.logger.info("Source file %s for bibcode %s with %d references, for reprocessing added successfully." % (source_filename, source_bibcode, len(references))) + return references + except SQLAlchemyError as e: + session.rollback() + self.logger.error("Source file %s information for reprocessing failed to get added to database. Error: %s" % (source_filename, str(e))) + return [] - def populate_tables_post_resolved(self, resolved_reference, source_bibcode, classic_resolved_filename): + def populate_tables_post_resolved(self, resolved_reference: List, source_bibcode: str, classic_resolved_filename: str) -> bool: """ - this is called after references has been resolved + update tables after references have been resolved - :param resolved_reference: - :param source_bibcode: - :param classic_resolved_filename: - :return: + :param resolved_reference: List of resolved references + :param source_bibcode: source bibcode + :param classic_resolved_filename: filename of classic resolved references + :return: True if successful """ - try: - # if the filename for classic resolver output is supplied, read the resolved information - # make sure that the length matches resolved, classic does some breaking a reference into two - # and hence messes up the order if we want to compare one-to-one, if that is the case, just - # ignore the result - resolved_classic = None - if classic_resolved_filename: - resolved_classic = compare_classic_and_service(resolved_reference, source_bibcode, classic_resolved_filename) - - with self.session_scope() as session: - resolved_records = [] - compare_records = [] - for i, ref in enumerate(resolved_reference): - match = self.RE_PARSE_ID.match(ref['id']) - history_id = int(match.group('history_id')) - item_num = int(match.group('item_num')) - # TODO change refstring to refraw for reference_raw - 
resolved_record = ResolvedReference(history_id=history_id, - item_num=item_num, - reference_str=ref.get('refstring', None), - bibcode=ref.get('bibcode', None), - score=ref.get('score', None), - reference_raw=ref.get('refstring', None)) - resolved_records.append(resolved_record) + with self.session_scope() as session: + try: + # if the filename for classic resolver output is supplied, read the resolved information + # make sure that the length matches resolved, classic does some breaking a reference into two + # and hence messes up the order if we want to compare one-to-one, if that is the case, just + # ignore the result + resolved_classic = None + if classic_resolved_filename: + resolved_classic = compare_classic_and_service(resolved_reference, source_bibcode, classic_resolved_filename) + + resolved_records = [] + compare_records = [] + for i, ref in enumerate(resolved_reference): + match = self.RE_PARSE_ID.match(ref['id']) + history_id = int(match.group('history_id')) + item_num = int(match.group('item_num')) + # TODO change refstring to refraw for reference_raw + resolved_record = ResolvedReference(history_id=history_id, + item_num=item_num, + reference_str=ref.get('refstring', None), + bibcode=ref.get('bibcode', None), + score=ref.get('score', None), + reference_raw=ref.get('refstring', None)) + resolved_records.append(resolved_record) + if resolved_classic: + compare_record = CompareClassic(history_id=history_id, + item_num=item_num, + bibcode=resolved_classic[i][1], + score=int(resolved_classic[i][2]), + state=resolved_classic[i][3]) + compare_records.append(compare_record) if resolved_classic: - compare_record = CompareClassic(history_id=history_id, - item_num=item_num, - bibcode=resolved_classic[i][1], - score=int(resolved_classic[i][2]), - state=resolved_classic[i][3]) - compare_records.append(compare_record) - if resolved_classic: - self.update_resolved_reference_records(session, resolved_records) - self.insert_compare_records(session, compare_records) - 
else: - self.update_resolved_reference_records(session, resolved_records) - session.commit() - self.logger.info("Updated %d resolved reference records successfully." % len(resolved_reference)) - return True - except SQLAlchemyError as e: - session.rollback() - self.logger.info("Failed to update %d resolved reference records successfully. Error %s" % (len(resolved_reference), str(e))) - return False + self.update_resolved_reference_records(session, resolved_records) + self.insert_compare_records(session, compare_records) + else: + self.update_resolved_reference_records(session, resolved_records) + session.commit() + self.logger.info("Updated %d resolved reference records successfully." % len(resolved_reference)) + return True + except SQLAlchemyError as e: + session.rollback() + self.logger.error("Failed to update %d resolved reference records successfully. Error %s" % (len(resolved_reference), str(e))) + return False - def get_count_reference_source_records(self, session): + def get_count_reference_source_records(self, session: object) -> int: """ + get the count of records in the reference source table - :param session: - :return: + :param session: database session + :return: number of records """ rows = session.query(ReferenceSource).count() self.logger.debug("Currently there are %d records in `ReferenceSource` table."%rows) return rows - def get_count_processed_history_records(self, session): + def get_count_processed_history_records(self, session: object) -> int: """ + get the count of records in the processed history table - :param session: - :return: + :param session: database session + :return: number of records """ rows = session.query(ProcessedHistory).count() self.logger.debug("Currently there are %d records in `ProcessedHistory` table."%rows) return rows - def get_count_resolved_reference_records(self, session): + def get_count_resolved_reference_records(self, session: object) -> int: """ + get the count of records in the resolved reference table - 
:param session: - :return: + :param session: database session + :return: number of records """ rows = session.query(ResolvedReference).count() self.logger.debug("Currently there are %d records in `ResolvedReference` table."%rows) return rows - def get_count_compare_classic_records(self, session): + def get_count_compare_classic_records(self, session: object) -> int: """ + get the count of records in the compare classic table - :param session: - :return: + :param session: database session + :return: number of records """ rows = session.query(CompareClassic).count() self.logger.debug("Currently there are %d records in `CompareClassic` table."%rows) return rows - def get_count_records(self): + def get_count_records(self) -> List: """ + get the count of records in all tables - :return: + :return: List of dictionaries with table names and record counts """ with self.session_scope() as session: results = [ @@ -614,14 +635,14 @@ def get_count_records(self): ] return results - def get_service_classic_compare_tags(self, session, source_bibcode, source_filename): + def get_service_classic_compare_tags(self, session: object, source_bibcode: str, source_filename: str) -> object: """ - makes a grid of classic and service compared tags and returns the query + generates a comparison grid for classic and service resolved references - :param session: - :param source_bibcode: - :param source_filename: - :return: + :param session: database session + :param source_bibcode: source bibcode + :param source_filename: source filename + :return: subquery object containing comparison results """ # given reference source (bibcodes and filenames), have query that would contain # all resolved records ids, and if we have reprocessed records, it contains one @@ -652,16 +673,17 @@ def get_service_classic_compare_tags(self, session, source_bibcode, source_filen .group_by(CompareClassic.history_id, CompareClassic.item_num) \ .subquery() - def get_service_classic_compare_stats_grid(self, 
source_bibcode, source_filename): + def get_service_classic_compare_stats_grid(self, source_bibcode: str, source_filename: str) -> tuple: """ + retrieve comparison statistics between service and classic resolved references - :param source_bibcode: - :param source_filename: - :return: + :param source_bibcode: source bibcode + :param source_filename: source filename + :return: tuple containing a text-based grid, total references, and resolved references """ with self.session_scope() as session: compare_grid = self.get_service_classic_compare_tags(session, source_bibcode, source_filename) - results = session.query(ResolvedReference.reference_str.label('refstr'), + rows = session.query(ResolvedReference.reference_str.label('refstr'), ResolvedReference.bibcode.label('service_bibcode'), CompareClassic.bibcode.label('classic_bibcode'), ResolvedReference.score.label('service_conf'), CompareClassic.score.label('classic_score'), compare_grid.c.MATCH.label('match'), compare_grid.c.MISS.label('miss'), @@ -673,66 +695,67 @@ def get_service_classic_compare_stats_grid(self, source_bibcode, source_filename ResolvedReference.item_num == compare_grid.c.item_num)) \ .order_by(ResolvedReference.history_id, ResolvedReference.item_num) \ .all() - if results: + if rows: # Texttable functionality is here https://pypi.org/project/texttable/ table = Texttable() table.set_cols_width([60,19,19,15,15,5,5,5,5,5]) table.set_cols_dtype(['t']*10) table.set_cols_align(['l']+['c']*9) - table.header(results[0]._asdict().keys()) + table.header(rows[0]._asdict().keys()) num_resolved = 0 - for result in results: + for row in rows: # count how many was resolved on the side of service - if not result[1].startswith('.'): + if not row[1].startswith('.'): num_resolved += 1 - row = [] - for item in result: + result = [] + for item in row: if not item: item = '' - row.append(item) - table.add_row(row) - return table.draw(), len(results), num_resolved + result.append(item) + table.add_row(result) + return 
table.draw(), len(rows), num_resolved return 'Unable to fetch data for reference source file `%s` from database!'%source_filename, -1, -1 - def get_reprocess_records(self, type, score_cutoff, match_bibcode, date_cutoff): + def filter_reprocess_query(self, query: object, type: int, score_cutoff: float, match_bibcode: str, date_cutoff: int) -> object: """ + apply one of the four selected filters, also apply date if requested - :param type: - :param score_cutoff: - :param match_bibcode: - :param date_cutoff: - :return: + :param query: SQLAlchemy query object + :param type: type of filter to apply + :param score_cutoff: score threshold for filtering + :param match_bibcode: bibcode pattern for filtering + :param date_cutoff: number of days to filter by recent records + :return: filtered query object """ - def apply_filter(query, type, score_cutoff, match_bibcode, date_cutoff): - """ - apply one of the four selected filter, also apply date if requested - - :param query: - :param type: - :param score_cutoff: - :param match_bibcode: - :param date_cutoff: - :return: - """ - if type == ReprocessQueryType.score: - query = query.filter(ResolvedReference.score <= "%.2f" % score_cutoff) - elif type == ReprocessQueryType.bibstem and len(match_bibcode): - query = query.filter(ResolvedReference.bibcode.like('____%s__________' % match_bibcode)) - elif type == ReprocessQueryType.year and len(match_bibcode): - query = query.filter(ResolvedReference.bibcode.like('%s_______________' % match_bibcode)) - elif type == ReprocessQueryType.failed: - query = query.filter(and_(ResolvedReference.bibcode == '0000', ResolvedReference.score == -1)) - if date_cutoff: - since = datetime.now() - timedelta(days=int(date_cutoff)) - query = query.filter(ProcessedHistory.date >= since) - return query - - rows = [] + if type == ReprocessQueryType.score: + query = query.filter(ResolvedReference.score <= "%.2f" % score_cutoff) + elif type == ReprocessQueryType.bibstem and len(match_bibcode): + query = 
query.filter(ResolvedReference.bibcode.like('____%s__________' % match_bibcode)) + elif type == ReprocessQueryType.year and len(match_bibcode): + query = query.filter(ResolvedReference.bibcode.like('%s_______________' % match_bibcode)) + elif type == ReprocessQueryType.failed: + query = query.filter(and_(ResolvedReference.bibcode == '0000', ResolvedReference.score == -1)) + if date_cutoff: + since = datetime.now() - timedelta(days=int(date_cutoff)) + query = query.filter(ProcessedHistory.date >= since) + return query + + def get_reprocess_records(self, type: int, score_cutoff: float, match_bibcode: str, date_cutoff: int) -> List: + """ + retrieve references that need reprocessing based on filters + + :param type: type of reprocessing filter + :param score_cutoff: score threshold + :param match_bibcode: bibcode filter + :param date_cutoff: date filter in days + :return: List of references for reprocessing + """ + results = [] with self.session_scope() as session: # have a query containing unique reference source ids (bibcodes and filenames), # that have been filtered on one of four possible options and also date if requested reference_source_ids = session.query(ProcessedHistory.bibcode, ProcessedHistory.source_filename) \ .filter(ProcessedHistory.id == ResolvedReference.history_id) - reference_source_ids = apply_filter(reference_source_ids, type, score_cutoff, match_bibcode, date_cutoff) + reference_source_ids = self.filter_reprocess_query(reference_source_ids, type, score_cutoff, match_bibcode, date_cutoff) reference_source_ids = reference_source_ids.distinct().all() bibcodes = [ids[0] for ids in reference_source_ids] filenames = [ids[1] for ids in reference_source_ids] @@ -744,11 +767,11 @@ def apply_filter(query, type, score_cutoff, match_bibcode, date_cutoff): .filter(and_(ProcessedHistory.id == ResolvedReference.history_id), ProcessedHistory.bibcode.in_(bibcodes), ProcessedHistory.source_filename.in_(filenames)) - resolved_reference_ids = 
apply_filter(resolved_reference_ids, type, score_cutoff, match_bibcode, date_cutoff) + resolved_reference_ids = self.filter_reprocess_query(resolved_reference_ids, type, score_cutoff, match_bibcode, date_cutoff) resolved_reference_ids = resolved_reference_ids.distinct().subquery() - results = session.query(resolved_reference_ids.c.history_id.label('history_id'), + rows = session.query(resolved_reference_ids.c.history_id.label('history_id'), resolved_reference_ids.c.item_num.label('item_num'), ResolvedReference.reference_str.label('refstr'), ResolvedReference.reference_raw.label('refraw'), @@ -764,30 +787,154 @@ def apply_filter(query, type, score_cutoff, match_bibcode, date_cutoff): .order_by(ResolvedReference.history_id, ResolvedReference.item_num) \ .all() - if results: - results = [r._asdict() for r in results] - row = {} + if rows: + rows = [r._asdict() for r in rows] + result = {} history_id = -1 - for result in results: - if result['history_id'] != history_id: - if row: - rows.append(row) - row = {} - history_id = result['history_id'] + for row in rows: + if row['history_id'] != history_id: + if result: + results.append(result) + result = {} + history_id = row['history_id'] for key in ['source_bibcode', 'source_filename', 'source_modified', 'parser_name']: - row[key] = result[key] - row['references'] = [] + result[key] = row[key] + result['references'] = [] reference = {} for key in ['item_num', 'refstr', 'refraw']: - reference[key] = result[key] - row['references'].append(reference) + reference[key] = row[key] + result['references'].append(reference) else: reference = {} for key in ['item_num', 'refstr', 'refraw']: - reference[key] = result[key] - row['references'].append(reference) + reference[key] = row[key] + result['references'].append(reference) # last batch, if any - if row: - rows.append(row) - return rows + if result: + results.append(result) + return results + + def get_resolved_references_all(self, source_bibcode: str) -> List[tuple]: + """ + 
retrieve all resolved references with the highest score per resolved bibcode + + :param source_bibcode: source bibcode for which resolved references should be queried + :return: List of tuples containing resolved references with metadata + """ + result = [] + with self.session_scope() as session: + # build the query to select the highest-scored resolved references per resolved bibcode + # also return name of the parser, order number of parsed reference, date it was parsed, + # and the confidence score + highest_scored_resolved_reference = session.query( + ReferenceSource.bibcode.label('source_bibcode'), + ProcessedHistory.date.label('date'), + ResolvedReference.item_num.label('id'), + ResolvedReference.bibcode.label('resolved_bibcode'), + ResolvedReference.score.label('score'), + ReferenceSource.parser_name.label('parser_name'), + func.row_number().over( + partition_by=[ReferenceSource.bibcode, ReferenceSource.parser_name, ResolvedReference.bibcode], + order_by=desc(ResolvedReference.score) + ).label('ranking_by_score') + ).join(ProcessedHistory, ProcessedHistory.id == ResolvedReference.history_id) \ + .join(ReferenceSource, ProcessedHistory.bibcode == ReferenceSource.bibcode) \ + .filter(and_(ReferenceSource.bibcode == source_bibcode, + ResolvedReference.score != 0)) \ + .subquery() + + # query database now + rows = session.query( + highest_scored_resolved_reference.c.source_bibcode, + highest_scored_resolved_reference.c.date, + highest_scored_resolved_reference.c.id, + highest_scored_resolved_reference.c.resolved_bibcode, + highest_scored_resolved_reference.c.score, + highest_scored_resolved_reference.c.parser_name) \ + .filter(highest_scored_resolved_reference.c.ranking_by_score == 1) \ + .order_by(highest_scored_resolved_reference.c.resolved_bibcode) \ + .all() + + if len(rows) > 0: + for row in rows: + result.append((row.source_bibcode, + row.date.strftime("%Y-%m-%d %H:%M:%S"), + row.id, + row.resolved_bibcode, + float(row.score), + row.parser_name)) + else: + 
self.logger.error(f'Unable to fetch resolved references for source bibcode `{source_bibcode}`.') + + return result + + def get_resolved_references(self, source_bibcode: str) -> List[Dict]: + """ + retrieve resolved references with the highest parser priority for each unique combination of source_bibcode, parser_name, and resolved_bibcode + + :param source_bibcode: source bibcode for which resolved references should be queried + :return: List of dictionaries containing the highest-priority resolved references + """ + result = [] + with self.session_scope() as session: + + # Build the query to rank parsers by priority (based on the parser_name) and then by score + highest_priority_resolved_reference = session.query( + ReferenceSource.bibcode.label('source_bibcode'), + ProcessedHistory.date.label('date'), + ResolvedReference.item_num.label('id'), + ResolvedReference.bibcode.label('resolved_bibcode'), + ResolvedReference.score.label('score'), + ReferenceSource.parser_name.label('parser_name'), + case( + [ + (ReferenceSource.parser_name.in_(['arXiv', 'CrossRef']), 1), + (ReferenceSource.parser_name == 'Arthur', 3) + ], + else_=2 + ).label('parser_priority'), + func.row_number().over( + partition_by=[ReferenceSource.bibcode, ResolvedReference.bibcode], + order_by=[desc(case( + [ + (ReferenceSource.parser_name.in_(['arXiv', 'CrossRef']), 1), + (ReferenceSource.parser_name == 'Arthur', 3) + ], + else_=2 + )), desc(ResolvedReference.score)] + ).label('ranking_by_priority') + ).join(ProcessedHistory, ProcessedHistory.id == ResolvedReference.history_id) \ + .join(ReferenceSource, ProcessedHistory.bibcode == ReferenceSource.bibcode) \ + .filter(and_(ReferenceSource.bibcode == source_bibcode, + ResolvedReference.score != 0)) \ + .subquery() + + # Query the ranked resolved references, ensuring we get the highest-ranked ones (ranking_by_priority == 1) + rows = session.query( + highest_priority_resolved_reference.c.source_bibcode, + highest_priority_resolved_reference.c.date, + 
highest_priority_resolved_reference.c.id, + highest_priority_resolved_reference.c.resolved_bibcode, + highest_priority_resolved_reference.c.score, + highest_priority_resolved_reference.c.parser_name, + highest_priority_resolved_reference.c.parser_priority)\ + .filter(highest_priority_resolved_reference.c.ranking_by_priority == 1) \ + .order_by(highest_priority_resolved_reference.c.resolved_bibcode) \ + .all() + + # Process the results + if rows: + for row in rows: + result.append({ + 'source_bibcode': row.source_bibcode, + 'date': row.date.strftime("%Y-%m-%d %H:%M:%S"), + 'id': row.id, + 'resolved_bibcode': row.resolved_bibcode, + 'score': float(row.score), + 'parser_name': row.parser_name, + 'parser_priority': row.parser_priority + }) + else: + self.logger.error(f'Unable to fetch resolved references for source bibcode `{source_bibcode}`.') + return result diff --git a/adsrefpipe/models.py b/adsrefpipe/models.py old mode 100644 new mode 100755 index e1edc3d..5e2c725 --- a/adsrefpipe/models.py +++ b/adsrefpipe/models.py @@ -20,17 +20,19 @@ class Action(Base): __tablename__ = 'action' status = Column(String, primary_key=True) - def get_status_new(self): + def get_status_new(self) -> str: """ + returns the initial status - :return: + :return: string indicating the initial status """ return 'initial' - def get_status_retry(self): + def get_status_retry(self) -> str: """ + returns the retry status - :return: + :return: string indicating the retry status """ return 'retry' @@ -49,50 +51,57 @@ class Parser(Base): reference_service_endpoint = Column(String) matches = Column(JSONB, default=dict) - def __init__(self, name, extension_pattern, reference_service_endpoint, matches=[]): + def __init__(self, name: str, extension_pattern: str, reference_service_endpoint: str, matches: list = []): """ + initializes a parser object - :param name: - :param extension_pattern: - :param reference_service_endpoint: - :param matches: + :param name: name of the parser + :param 
extension_pattern: reference file extension pattern used by the parser + :param reference_service_endpoint: endpoint for the reference service + :param matches: list of matches for the parser-reference file mapping """ self.name = name self.extension_pattern = extension_pattern self.reference_service_endpoint = reference_service_endpoint self.matches = matches - def get_name(self): + def get_name(self) -> str: """ + returns the name of the parser - :return: + :return: string indicating the name of the parser """ return self.name - def get_extension_pattern(self): + def get_extension_pattern(self) -> str: """ + returns the extension pattern of the reference files processed by the parser - :return: + :return: string indicating the file extension pattern """ return self.extension_pattern - def get_endpoint(self): + def get_endpoint(self) -> str: """ + returns the reference service endpoint to resolve references - :return: + :return: string indicating the reference service endpoint """ return self.reference_service_endpoint - def get_matches(self): + def get_matches(self) -> list: """ + returns the list of mappings for the parser - :return: + :return: list of matches """ return self.matches - def toJSON(self): + def toJSON(self) -> dict: """ - :return: values formatted as python dict + converts the parser object to a JSON dictionary + + :return: dictionary containing the parser details """ return { 'name': self.name, @@ -103,28 +112,37 @@ def toJSON(self): class ReferenceSource(Base): + """ + This class represents the source of a reference in the database, + each entry links a source file with its resolved version and + the parser used to process the reference. + It serves as the initial record for the reference processing pipeline. 
+ """ __tablename__ = 'reference_source' bibcode = Column(String, primary_key=True) source_filename = Column(String, primary_key=True) resolved_filename = Column(String) parser_name = Column(String, ForeignKey('parser.name')) - def __init__(self, bibcode, source_filename, resolved_filename, parser_name): + def __init__(self, bibcode: str, source_filename: str, resolved_filename: str, parser_name: str): """ + initializes a reference source object - :param bibcode: - :param source_filename: - :param resolved_filename: - :param parser_name: + :param bibcode: unique bibcode for the reference source + :param source_filename: name of the reference file + :param resolved_filename: name of the resolved file for future use + :param parser_name: name of the parser used """ self.bibcode = bibcode self.source_filename = source_filename self.resolved_filename = resolved_filename self.parser_name = parser_name - def toJSON(self): + def toJSON(self) -> dict: """ - :return: values formatted as python dict, if no values found returns empty structure, not None + converts the reference source object to a JSON dictionary + + :return: dictionary containing reference source details """ return { 'bibcode': self.bibcode, @@ -135,6 +153,10 @@ def toJSON(self): class ProcessedHistory(Base): + """ + This class tracks the processing history of a resolved reference, recording details about the processing status, + reference file timestamp, and the total number of references parsed. 
+ """ __tablename__ = 'processed_history' __table_args__ = (ForeignKeyConstraint( ['bibcode', 'source_filename'], ['reference_source.bibcode', 'reference_source.source_filename']),) id = Column(Integer, primary_key=True) @@ -145,15 +167,16 @@ class ProcessedHistory(Base): date = Column(DateTime, default=func.now()) total_ref = Column(Integer) - def __init__(self, bibcode, source_filename, source_modified, status, date, total_ref): + def __init__(self, bibcode: str, source_filename: str, source_modified: DateTime, status: str, date: DateTime, total_ref: int): """ + initializes a processed history object - :param bibcode: - :param source_filename: - :param source_modified: - :param status: - :param date: - :param total_ref: + :param bibcode: bibcode for the reference source + :param source_filename: name of the source reference file + :param source_modified: timestamp of the reference file at the time it was read + :param status: first time processing, or reprocessing this list of references + :param date: date of processing + :param total_ref: total number of references parsed """ self.bibcode = bibcode self.source_filename = source_filename @@ -162,9 +185,11 @@ def __init__(self, bibcode, source_filename, source_modified, status, date, tota self.date = date self.total_ref = total_ref - def toJSON(self): + def toJSON(self) -> dict: """ - :return: values formatted as python dict, if no values found returns empty structure, not None + converts the processed history object to a JSON dictionary + + :return: dictionary containing processed history details """ return { 'bibcode': self.bibcode, @@ -177,6 +202,10 @@ def toJSON(self): class ResolvedReference(Base): + """ + This class stores information about references that have been resolved, including the reference string, score, + and its associated history entry. 
+ """ __tablename__ = 'resolved_reference' history_id = Column(Integer, ForeignKey('processed_history.id'), primary_key=True) item_num = Column(Integer, primary_key=True) @@ -185,14 +214,16 @@ class ResolvedReference(Base): score = Column(Numeric) reference_raw = Column(String) - def __init__(self, history_id, item_num, reference_str, bibcode, score, reference_raw): + def __init__(self, history_id: int, item_num: int, reference_str: str, bibcode: str, score: float, reference_raw: str): """ + initializes a resolved reference object - :param history_id: - :param item_num - :param reference_str: - :param bibcode: - :param score: + :param history_id: ID of the related processed history entry + :param item_num: order of the reference within the source + :param reference_str: reference string + :param bibcode: resolved bibcode + :param score: confidence score of the resolved reference + :param reference_raw: raw reference string """ self.history_id = history_id self.item_num = item_num @@ -201,26 +232,19 @@ def __init__(self, history_id, item_num, reference_str, bibcode, score, referenc self.score = score self.reference_raw = reference_raw - def toJSON(self): + def toJSON(self) -> dict: """ - :return: values formatted as python dict, if no values found returns empty structure, not None + converts the resolved reference object to a JSON dictionary + + :return: dictionary containing resolved reference details """ - if self.reference_raw: - return { - 'history_id': self.history_id, - 'reference_str': self.reference_str, - 'bibcode': self.bibcode, - 'score': self.score, - 'item_num': self.item_num, - 'reference_raw': self.reference_raw - } - # do not include reference_raw if it is None return { 'history_id': self.history_id, 'reference_str': self.reference_str, 'bibcode': self.bibcode, 'score': self.score, 'item_num': self.item_num, + **({'reference_raw': self.reference_raw} if self.reference_raw else {}) } @@ -228,8 +252,8 @@ class CompareClassic(Base): """ This table is 
for comparing classic resolver with service reference, keeps track of service reference that matched classic reference - bibcode and score here is for classic - + bibcode and score here is for classic, should be a temporary class + only used during development/testing and verification """ __tablename__ = 'compare_classic' history_id = Column(Integer, ForeignKey('processed_history.id'), primary_key=True) @@ -238,14 +262,15 @@ class CompareClassic(Base): score = Column(Numeric) state = Column(String) - def __init__(self, history_id, item_num, bibcode, score, state): + def __init__(self, history_id: int, item_num: int, bibcode: str, score: Numeric, state: str): """ + initializes a compare classic object - :param history_id: - :param item_num: - :param bibcode: - :param classic_score: - :param state: + :param history_id: ID of the related processed history entry + :param item_num: order of the reference within the source + :param bibcode: resolved bibcode + :param score: confidence score of the resolved reference + :param state: comparison state (i.e., matched, unmatched, etc.) 
""" self.history_id = history_id self.item_num = item_num @@ -253,9 +278,11 @@ def __init__(self, history_id, item_num, bibcode, score, state): self.score = score self.state = state - def toJSON(self): + def toJSON(self) -> dict: """ - :return: values formatted as python dict, if no values found returns empty structure, not None + converts the compare classic object to a JSON dictionary + + :return: dictionary containing compare classic details """ return { 'history_id': self.history_id, diff --git a/adsrefpipe/refparsers/AASxml.py b/adsrefpipe/refparsers/AASxml.py index 5f91afd..7815d56 100644 --- a/adsrefpipe/refparsers/AASxml.py +++ b/adsrefpipe/refparsers/AASxml.py @@ -1,10 +1,9 @@ import sys, os -import regex as re import argparse +from typing import List, Dict from adsputils import setup_logging, load_config - logger = setup_logging('refparsers') config = {} config.update(load_config()) @@ -15,9 +14,14 @@ class AASreference(XMLreference): + """ + This class handles parsing AAS references in XML format. It extracts citation information such as authors, + year, journal, title, volume, pages, DOI, and eprint, and stores the parsed details. + """ def parse(self): """ + parse the AAS reference and extract citation information such as authors, year, title, and DOI :return: """ @@ -50,22 +54,26 @@ def parse(self): class AAStoREFs(XMLtoREFs): + """ + This class converts AAS XML references to a standardized reference format. It processes raw AAS references from + either a file or a buffer and outputs parsed references, including bibcodes, authors, volume, pages, and DOI. 
+ """ - def __init__(self, filename, buffer): + def __init__(self, filename: str, buffer: str): """ + initialize the AAStoREFs object to process AAS references - :param filename: - :param buffer: - :param unicode: - :param tag: + :param filename: the path to the source file + :param buffer: the XML references as a buffer """ XMLtoREFs.__init__(self, filename, buffer, parsername=AAStoREFs, tag='CITATION') - def process_and_dispatch(self): + def process_and_dispatch(self) -> List[Dict[str, List[Dict[str, str]]]]: """ + perform reference cleaning and parsing, then dispatch the parsed references - :return: + :return: a list of dictionaries containing bibcodes and parsed references """ references = [] for raw_block_references in self.raw_references: @@ -90,6 +98,10 @@ def process_and_dispatch(self): return references +# This is the main program used for manual testing and verification of AASxml references. +# It allows parsing references from either a file or a buffer, and if no input is provided, +# it runs a source test file to verify the functionality against expected parsed results. +# The test results are printed to indicate whether the parsing is successful or not. 
from adsrefpipe.tests.unittests.stubdata import parsed_references if __name__ == '__main__': # pragma: no cover parser = argparse.ArgumentParser(description='Parse AAS references') diff --git a/adsrefpipe/refparsers/ADShtml.py b/adsrefpipe/refparsers/ADShtml.py index 5ee3f55..b7b967f 100644 --- a/adsrefpipe/refparsers/ADShtml.py +++ b/adsrefpipe/refparsers/ADShtml.py @@ -3,20 +3,25 @@ import regex as re import argparse import urllib.parse - -from adsrefpipe.refparsers.toREFs import HTMLtoREFs -from adsrefpipe.refparsers.reference import unicode_handler -from adsrefpipe.utils import get_bibcode as get_bibcode_from_doi, verify_bibcode +from typing import List, Dict from adsputils import setup_logging, load_config - logger = setup_logging('refparsers') config = {} config.update(load_config()) +from adsrefpipe.refparsers.toREFs import HTMLtoREFs +from adsrefpipe.refparsers.reference import unicode_handler +from adsrefpipe.utils import get_bibcode as get_bibcode_from_doi, verify_bibcode + class ADSHTMLtoREFs(HTMLtoREFs): + """ + This class processes ADS HTML references and converts them into a standardized reference format. + It handles reference cleanup and parsing of citation information like authors, title, year, journal, volume, pages, DOI, and eprint. 
+ """ + # list of regex patterns to clean the HTML references reference_cleanup = [ (re.compile(r'()'), ''), (re.compile(r'()', re.I), ''), @@ -38,23 +43,23 @@ class ADSHTMLtoREFs(HTMLtoREFs): (re.compile(r'()'), ''), # if there was a nested tag (ie, href inside comment ] - def __init__(self, filename, buffer, parsername, tag, file_type, cleanup=None, encoding='UTF-8'): + def __init__(self, filename: str, buffer: str, parsername: str, tag: str, file_type: str, cleanup=None, encoding='UTF-8'): """ - - :param filename: - :param buffer: - :param parsername: - :param tag: + :param filename: path to the reference file + :param buffer: buffer containing the references + :param parsername: name of the parser + :param tag: regex tag for parsing + :param file_type: the file type (HTML, XML, etc.) """ if not cleanup: cleanup = self.reference_cleanup HTMLtoREFs.__init__(self, filename, buffer, parsername=parsername, tag=tag, file_type=file_type, cleanup=cleanup, encoding=encoding) - def process_and_dispatch(self): + def process_and_dispatch(self) -> List[Dict[str, List[Dict[str, str]]]]: """ - this function does reference cleaning and then calls the parser + perform reference cleaning and parsing, then dispatch the parsed references - :return: + :return: a list of dictionaries containing bibcodes and parsed references """ references = [] for raw_block_references in self.raw_references: @@ -75,13 +80,15 @@ def process_and_dispatch(self): class AnnRevHTMLtoREFs(ADSHTMLtoREFs): """ - This is to process Annual Review references. There are + This class processes Annual Review references. + They are AnRFM/*/annurev.fluid AREPS/*/annurev.earth ARA+A/*/annurev.astro """ + # to clean up html references reference_cleanup = [ (re.compile(r'()'), ''), (re.compile(r'(|)', re.I), ''), @@ -101,6 +108,7 @@ class AnnRevHTMLtoREFs(ADSHTMLtoREFs): (re.compile(r'–'), '-'), (re.compile(r'(\<\d+:[A-Z]+\>)'), '') ] + # to clean up block of html references block_cleanup = [ (re.compile(r'(||||||||
|
|
|
||||||)', re.I), ''), (re.compile(r'&'), '&'), @@ -109,28 +117,33 @@ class AnnRevHTMLtoREFs(ADSHTMLtoREFs): (re.compile(r'(\s*', re.DOTALL), '') ] - # re_tag = re.compile(r'((?:|]*>)\s*([A-Z][a-z]+.*?)|(?:).*?)(?:|$)', (re.IGNORECASE | re.DOTALL)) + # to match tags in the reference block re_tag = re.compile(r'(?:(?:|]*>)\s*([A-Z][a-z]+.*?)|(?:)(.*?))(?:|$)', (re.IGNORECASE | re.DOTALL)) + # to match DOI in the format re_doi = re.compile(r'\(doi:(.*?)\)', re.IGNORECASE) + # to extract the bibcode re_bibcode = re.compile(r'(.*)') + # to match the reference text before a