How script v0.24 works
The script, keywords4cv_0.24.py, implements a sophisticated keyword extraction and analysis pipeline tailored for job descriptions. It aims to mimic and extend the functionality of an Applicant Tracking System (ATS) by identifying, scoring, and categorizing keywords within a corpus of job descriptions, comparing them against a user-defined set of skills and categories. The script goes beyond simple keyword matching by incorporating techniques like fuzzy matching, semantic similarity analysis, and reinforcement learning for adaptive parameter tuning.
- Primary Input (Job Descriptions): The `-i` or `--input` command-line argument must resolve to a valid, readable file path accessible by the script's execution environment. This path is treated as a string and passed directly to the `open()` function. Potential failure points include:
  - Permissions: The script's user must have read permissions on the file.
  - File Existence: The file must exist at the specified path.
  - Path Validity: The path must be correctly formatted for the operating system.
- Secondary Input (Configuration): The `-c` or `--config` argument behaves similarly to the input argument, requiring a valid, readable file path. The same potential failure points apply. The configuration file must be a valid YAML file adhering to a specific schema, validated rigorously by both `schema` and `pydantic`.
- Tertiary Input (Synonyms): The `phrase_synonyms_path` within `config.yaml` is optional. If provided, it must be a valid, readable file path, subject to the same constraints as the primary input. If `phrase_synonym_source` is set to `api`, the `api_endpoint` must be a valid URL and the `api_key` must be a valid string (although its correctness is only verified during the API call).
- Environment Variable: The `K4CV_CACHE_SALT` environment variable is checked first. If it exists, its value is used; otherwise, the `cache_salt` value from the config file is used. If neither is present, a default value (`default_secret_salt`) is used. A sketch of this precedence is shown below.
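For illustration, the salt-resolution order described above might look like the following minimal sketch (the helper name `resolve_cache_salt` and the flat `cache_salt` key lookup are assumptions; only the environment variable name and default value come from the description above):

```python
import os

DEFAULT_CACHE_SALT = "default_secret_salt"

def resolve_cache_salt(config: dict) -> str:
    """Hypothetical helper: environment variable wins, then config.yaml, then the default."""
    env_salt = os.environ.get("K4CV_CACHE_SALT")
    if env_salt:
        return env_salt
    # Fall back to the cache_salt value from the config file, then to the built-in default.
    return config.get("cache_salt", DEFAULT_CACHE_SALT)
```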
- Job Descriptions (JSON): The JSON structure is strictly enforced. Malformed JSON raises a `json.JSONDecodeError`, and deviations such as incorrect nesting, invalid data types, or missing keys cause errors downstream. The script expects a single top-level dictionary; other JSON structures (e.g., a list of dictionaries) will be misinterpreted. The values (job descriptions) are expected to be strings, but no explicit length limit is enforced at this stage (length limits are applied during sanitization).
- Configuration (YAML): The YAML structure is validated against a complex schema defined in `config_validation.py`. This schema enforces data types, allowed values, and relationships between configuration parameters. The `schema` library provides initial structural validation, while `pydantic` provides finer-grained validation and type coercion. The `Config` class and its nested models (e.g., `ValidationConfig`, `TextProcessingConfig`) define the expected structure and data types. The `extra="forbid"` setting on the `Config` class ensures that no undefined parameters are allowed in the configuration file (see the sketch after this list).
- Synonyms (JSON): As with job descriptions, the JSON structure is strictly enforced. The top level must be a dictionary, keys must be strings, and values must be lists of strings. Empty lists are permitted, but `null` values or other data types within the lists will cause errors during processing.
- Synonyms (API): The API response must be valid JSON. The script specifically checks for the presence of the `synonyms` key, whose value must be a list of strings. Any deviation from this format results in a warning, and the API synonyms are ignored for that particular phrase. The script implements a retry mechanism with exponential backoff for API calls, handling `requests.Timeout` and `requests.RequestException` errors.
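As a rough illustration of the `extra="forbid"` behaviour (a minimal sketch only, assuming pydantic v2; the real `Config` model in `config_validation.py` has many more nested models, and the field defaults shown here are illustrative):

```python
from pydantic import BaseModel, ConfigDict, Field, ValidationError

class ValidationConfig(BaseModel):
    # Illustrative fields; the script's actual schema defines many more.
    allow_numeric_titles: bool = False
    empty_description_policy: str = Field(default="warn")

class Config(BaseModel):
    model_config = ConfigDict(extra="forbid")  # reject undefined keys in config.yaml
    validation: ValidationConfig = ValidationConfig()

try:
    Config(**{"validation": {"allow_numeric_titles": True}, "unknown_key": 1})
except ValidationError as exc:
    print(exc)  # "unknown_key" is rejected because extra="forbid"
```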
- Job Descriptions: The `load_job_data` function uses a `try...except` block to handle potential errors (see the sketch after this list). The `with open(...)` statement ensures proper file handling (automatic closing), and the `encoding="utf-8"` argument explicitly specifies UTF-8 encoding, which is crucial for handling a wide range of characters. `json.load(f)` parses the file's contents directly into a Python dictionary.
- Configuration: The `load_config` function also uses a `try...except` block. It reads the entire YAML file into memory using `f.read()`, then parses it with `yaml.safe_load()`. `yaml.safe_load()` is used instead of `yaml.load()` for security reasons, preventing arbitrary code execution from malicious YAML files. Pydantic validation occurs after the initial YAML parsing.
- Synonyms: If `phrase_synonym_source` is `static`, the `_load_phrase_synonyms` function uses a `try...except` block and `with open(...)` for file handling, similar to `load_job_data`. If the source is `api`, the `_get_synonyms_from_api` method uses `requests.get()` with a `timeout` parameter to prevent indefinite hanging. It includes detailed error handling for various API failure scenarios, including timeouts, connection errors, and invalid JSON responses. The API response is cached in `self.api_cache` to reduce the number of API calls.
- NLTK Resources: The `ensure_nltk_resources` function uses `nltk.data.find` to check whether a resource exists. If not, it calls `nltk.download` with `quiet=True` to suppress download output.
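A condensed sketch of the two file loaders described above (error handling is abbreviated; the actual functions also perform Pydantic validation and more specific logging):

```python
import json
import sys
import yaml

def load_job_data(input_file: str) -> dict:
    """Load the job-description JSON (a single top-level dict of title -> text)."""
    try:
        with open(input_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError) as exc:
        print(f"Failed to load job data: {exc}")
        sys.exit(1)

def load_config(config_path: str) -> dict:
    """Parse config.yaml safely; validation against the Config model happens afterwards."""
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f.read())
    except (OSError, yaml.YAMLError) as exc:
        print(f"Failed to load config: {exc}")
        sys.exit(1)
```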
-
json: Specifically, theloadfunction for reading and parsing JSON data. -
yaml: Thesafe_loadfunction is used, avoiding the potentially unsafeloadfunction. -
argparse: TheArgumentParserand related classes are used to define and parse command-line arguments. -
sys: Used forsys.exitto terminate the script on errors andsys.version_infoto check the Python version. -
pathlib: ThePathclass is used for creating and manipulating file paths in an OS-independent manner. -
pydantic: TheBaseModel,Field,ValidationError, andfield_validatorcomponents are used extensively for defining the configuration schema and validating the loaded configuration. -
nltk: Used for downloading and finding NLTK resources, specificallynltk.data.findandnltk.download. -
requests: Thegetfunction is used to make HTTP requests to the synonym API, along with exception handling forTimeoutandRequestException. -
os: Used withos.environ.getto retrieve environment variables.
Input: Command-line arguments (strings) provided by the operating system.

- `argparse.ArgumentParser.parse_args()`: Parses the command-line strings, converting them into Python data types (strings for file paths). This step handles default values and argument validation (e.g., checking that required arguments are provided).
- `initialize_analyzer(args.config)`:
  - `load_config(config_path)`:
    - `Path(config_path)`: Converts the string path to a `Path` object.
    - `with open(...)`: Opens the file in read mode (`"r"`) with UTF-8 encoding (`encoding="utf-8"`).
    - `yaml.safe_load(f)`: Parses the YAML content into a Python dictionary.
    - `Config(**raw_config)`: Creates a Pydantic `Config` object, triggering validation against the defined schema. This step performs type coercion (e.g., converting strings to integers or floats where appropriate) and checks constraints (e.g., minimum/maximum values, allowed values).
    - `config.dict(by_alias=True)`: Converts the Pydantic object to a dictionary, using aliases for field names (e.g., `format_` becomes `format`).
  - `ensure_nltk_resources()`: Iterates through a predefined list of NLTK resources (`NLTK_RESOURCES`). For each resource (see the sketch below):
    - `nltk.data.find(resource)`: Attempts to locate the resource.
    - `nltk.download(resource.split("/")[1], quiet=True)`: If the resource is not found, it is downloaded.
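A minimal sketch of this check-and-download pattern (the resource names listed here are illustrative; the script defines its own `NLTK_RESOURCES` constant):

```python
import nltk

# Illustrative list; the script defines its own NLTK_RESOURCES constant.
NLTK_RESOURCES = ["corpora/wordnet", "corpora/stopwords", "tokenizers/punkt"]

def ensure_nltk_resources() -> None:
    for resource in NLTK_RESOURCES:
        try:
            nltk.data.find(resource)  # already installed?
        except LookupError:
            # Download only the package name, e.g. "wordnet" from "corpora/wordnet".
            nltk.download(resource.split("/")[1], quiet=True)
```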
- `OptimizedATS(config_path)`: The constructor of the `OptimizedATS` class performs extensive initialization:
  - `self.config = load_config(config_path)`: Stores the validated configuration.
  - `self._load_and_configure_spacy_model()` (see the sketch below):
    - Retrieves the `spacy_model` name from the configuration.
    - Determines the enabled and disabled spaCy pipeline components based on the configuration.
    - `spacy.load(model_name, disable=disabled)`: Attempts to load the specified spaCy model, disabling the specified components.
    - Adds `sentencizer` and `lemmatizer` if they are not already in the pipeline and are required.
    - Implements a retry mechanism with exponential backoff for loading the model, handling `OSError` exceptions. If loading fails, it attempts to download the model using `spacy.cli.download(model_name)`.
  - `AdvancedKeywordExtractor(self.config, self.nlp)`: Initializes the keyword extractor:
    - `self._load_phrase_synonyms()`: Loads phrase synonyms, either from a file (if `phrase_synonym_source` is "static") or from an API (if `phrase_synonym_source` is "api"). This function validates the synonym data using the `SynonymEntry` Pydantic model.
    - `self._load_and_process_all_skills()`: Loads, preprocesses, and expands all skills from the `keyword_categories` in the configuration. This involves preprocessing (lowercasing, cleaning), tokenization, n-gram generation, and synonym generation (both static/API synonyms and WordNet-based synonyms).
    - Initializes `self.category_vectors`, `self.ngram_range`, and other attributes.
  - `ParallelProcessor(self.config, self.nlp, self.keyword_extractor)`: Initializes the parallel processor.
  - `TrigramOptimizer(...)`: Initializes the trigram optimizer:
    - Creates an `LRUCache` for caching trigram candidates.
    - Warms up the cache with a subset of the skills from the `keyword_categories`.
  - `SmartChunker(self.config)`: Initializes the smart chunker.
  - `AutoTuner(self.config)`: Initializes the auto tuner.
  - `self.working_dir = Path(...)`: Creates a `Path` object for the working directory.
  - `self.working_dir.mkdir(exist_ok=True)`: Creates the working directory if it doesn't exist.
  - `self.run_id = ...`: Generates a unique run ID using `xxhash` and the current time.
  - `self._validate_config()`: Performs final configuration validation using Pydantic.
  - `self._add_entity_ruler(self.nlp)`: Adds an entity ruler to the spaCy pipeline:
    - Creates patterns for section headings and skills based on the configuration.
    - Adds these patterns to the entity ruler.
  - `self._init_categories()`: Calculates and stores centroid vectors for each keyword category.
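The retry-and-download behaviour described for `_load_and_configure_spacy_model` might look roughly like this (a sketch only; the retry count, delays, disabled-component handling, and lemmatizer setup are assumptions):

```python
import time
import spacy

def load_spacy_model(model_name: str, disabled: list[str], max_retries: int = 3):
    """Load a spaCy model with exponential backoff; download it if it is missing."""
    delay = 1.0
    for attempt in range(max_retries):
        try:
            nlp = spacy.load(model_name, disable=disabled)
            # Make sure sentence boundaries are available downstream.
            if "sentencizer" not in nlp.pipe_names:
                nlp.add_pipe("sentencizer")
            return nlp
        except OSError:
            if attempt == 0:
                spacy.cli.download(model_name)  # model not installed yet
            time.sleep(delay)
            delay *= 2  # exponential backoff
    raise OSError(f"Could not load spaCy model '{model_name}'")
```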
- `load_job_data(args.input)`:
  - `with open(input_file, "r", encoding="utf-8") as f`: Opens the job description file.
  - `return json.load(f)`: Loads and parses the JSON data.

Output:

- `analyzer`: A fully initialized `OptimizedATS` instance. This object contains:
  - `config`: The validated configuration (as a dictionary).
  - `nlp`: The loaded and configured spaCy model.
  - `keyword_extractor`: An initialized `AdvancedKeywordExtractor` instance.
  - `processor`: An initialized `ParallelProcessor` instance.
  - `trigram_optim`: An initialized `TrigramOptimizer` instance.
  - `chunker`: An initialized `SmartChunker` instance.
  - `tuner`: An initialized `AutoTuner` instance.
  - `working_dir`: A `Path` object representing the working directory.
  - `run_id`: A unique string identifier for the current run.
- `jobs`: A Python dictionary containing the job descriptions, loaded from the input JSON file.

Storage: All initialized objects and data are stored in memory as attributes of the `analyzer` object or as local variables within the `run_analysis` function.
Dependencies: The same libraries listed in the Dependencies section above, with the specific function calls detailed in this step.
Complexity:

- Argument Parsing: O(1).
- Configuration Loading: I/O bound (reading the file). YAML parsing cost depends on the size and complexity of the YAML file, but it is generally efficient. Pydantic validation adds some overhead, but it is also relatively fast.
- NLTK Resource Download: I/O bound (downloading files). This only happens if the resources are not already present.
- spaCy Model Loading: The most computationally expensive part of this step. The cost depends on the size of the spaCy model and the number of components in the pipeline; loading a large model like `en_core_web_lg` can take several seconds.
- Keyword Extractor Initialization: Loading synonyms and preprocessing skills can take some time, depending on the size of the synonym file and the number of skills.
- Other Initializations: The remaining initialization steps (creating the parallel processor, trigram optimizer, etc.) are relatively fast.
Input: The `jobs` dictionary: `{job_title (str): job_description (str)}`.

- `analyzer.sanitize_input(jobs)` (see the sketch after this list):
  - Creates an empty dictionary `cleaned = {}`.
  - Iterates through the `jobs` dictionary using `jobs.items()`.
  - Title Check: `isinstance(title, str)` checks whether the title is a string.
    - If False: `self.config["validation"].get("allow_numeric_titles", False)` is consulted.
      - If `allow_numeric_titles` is True: `title = str(title)` converts the title to a string.
      - If `allow_numeric_titles` is False: Logs a warning and continues to the next iteration (skipping the current job).
    - If True: Proceeds to the description check.
  - Description Check: `isinstance(desc, str) and desc.strip()` checks whether the description is a string and is not empty after stripping whitespace.
    - If False: `self.config["validation"].get("empty_description_policy", "warn")` is consulted.
      - If `empty_description_policy` is "error": Logs an error and continues to the next iteration.
      - If `empty_description_policy` is "warn": Logs a warning.
      - If `empty_description_policy` is "allow": Proceeds without logging.
    - If True: `cleaned[title] = desc.strip()` adds the job title and stripped description to the `cleaned` dictionary.
  - Returns the `cleaned` dictionary.
- The config also has a `strict_mode` option. If this is set to `True`, the script raises an exception instead of logging a warning or error.
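A condensed sketch of this sanitization logic, assuming the configuration keys described above (`allow_numeric_titles`, `empty_description_policy`); `strict_mode` handling and the exact behaviour for the "allow" policy are simplified here:

```python
import logging

logger = logging.getLogger(__name__)

def sanitize_input(jobs: dict, validation_cfg: dict) -> dict:
    """Drop or repair entries with invalid titles or empty descriptions."""
    cleaned = {}
    for title, desc in jobs.items():
        if not isinstance(title, str):
            if validation_cfg.get("allow_numeric_titles", False):
                title = str(title)
            else:
                logger.warning("Skipping job with non-string title: %r", title)
                continue
        if not (isinstance(desc, str) and desc.strip()):
            policy = validation_cfg.get("empty_description_policy", "warn")
            if policy == "error":
                logger.error("Empty description for job %r", title)
            elif policy == "warn":
                logger.warning("Empty description for job %r", title)
            continue  # nothing usable to keep for this job
        cleaned[title] = desc.strip()
    return cleaned
```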
Output: `cleaned`: A dictionary with the same structure as `jobs`, but potentially containing fewer entries (if any jobs were discarded due to invalid titles or descriptions).

Storage: The `cleaned` dictionary replaces the original `jobs` dictionary in memory.

Dependencies: `logging`.

Complexity: O(n), where n is the number of job descriptions in the input `jobs` dictionary. The operations within the loop (type checking, string stripping) are constant time.

Input: The `cleaned` dictionary: `{job_title (str): job_description (str)}`.
- `analyzer._calc_dataset_stats(job_descriptions)`:
  - `lengths = [len(desc) for desc in job_descriptions.values()]`: Creates a list of the lengths of all job descriptions.
  - `"avg_length": np.mean(lengths) if lengths else 0`: Calculates the average length using `np.mean()`, handling the empty-list case to avoid a `ZeroDivisionError`.
  - `"num_texts": len(job_descriptions)`: Gets the number of job descriptions.
  - Returns a dictionary: `{"avg_length": ..., "num_texts": ...}`.

Output: `dataset_stats`: A dictionary containing the calculated statistics.

Storage: The `dataset_stats` dictionary is stored in memory.

Dependencies: `numpy` (specifically, the `mean` function).

Complexity: O(n), where n is the number of job descriptions. Calculating the length of each description and then the mean is linear in the number of descriptions.

Input: The `dataset_stats` dictionary: `{"avg_length": ..., "num_texts": ...}`.
- `analyzer.chunker.get_chunk_size(dataset_stats)` (see the sketch below):
  - `state = (...)`: Creates a tuple representing the current state, based on `avg_length`, `num_texts`, and current memory usage (`psutil.virtual_memory().percent`). The state is discretized into bins (e.g., `avg_length` divided by 100).
  - `self.state_history.append(state)`: Appends the current state to a deque (`state_history`).
  - Iterates through the keys of the Q-table (`self.q_table`), decaying the Q-values by `self.decay_factor` and removing entries with very small values (less than 0.01).
  - `self.q_table.get(state, self.config["dataset"]["default_chunk_size"])`: Retrieves the Q-value for the current state. If the state is not in the Q-table, it uses the default chunk size from the configuration.
  - `max(...)`, `min(...)`: Clamps the calculated chunk size between `min_chunk_size` and `max_chunk_size` (from the configuration).
  - Returns the calculated chunk size.
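A simplified sketch of the state lookup and clamping (the bin widths, cache size, decay details, and configuration key layout are assumptions based on the description above):

```python
from collections import deque

import psutil
from cachetools import LRUCache

class SmartChunkerSketch:
    def __init__(self, cfg: dict):
        self.cfg = cfg
        self.q_table = LRUCache(maxsize=1000)   # state -> learned chunk size
        self.state_history = deque(maxlen=100)
        self.decay_factor = 0.95

    def get_chunk_size(self, stats: dict) -> int:
        # Discretize the state so similar datasets map to the same Q-table entry.
        state = (
            int(stats["avg_length"] // 100),
            int(stats["num_texts"] // 10),
            int(psutil.virtual_memory().percent // 10),
        )
        self.state_history.append(state)
        # Decay old Q-values and drop near-zero entries.
        for key in list(self.q_table.keys()):
            self.q_table[key] *= self.decay_factor
            if self.q_table[key] < 0.01:
                del self.q_table[key]
        size = self.q_table.get(state, self.cfg["dataset"]["default_chunk_size"])
        return max(self.cfg["dataset"]["min_chunk_size"],
                   min(self.cfg["dataset"]["max_chunk_size"], int(size)))
```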
Output: An integer representing the chunk size.

Storage: The Q-table (`self.q_table`) and state history (`self.state_history`) are stored as attributes of the `SmartChunker` object.

Dependencies: `cachetools` (for `LRUCache`), `psutil`, `collections` (for `defaultdict` and `deque`), `numpy`.

Complexity:

- Q-table lookup: O(1) on average (due to the use of `LRUCache`).
- Q-table decay and cleanup: Depends on the size of the Q-table, but the cost is amortized over multiple calls.
- Other operations (creating the state tuple, clamping the chunk size): O(1).

Input:

- The `cleaned` dictionary: `{job_title (str): job_description (str)}`.
- The calculated chunk size (integer).
- `analyzer._create_chunks(job_descriptions)`:
  - `items = list(job_descriptions.items())`: Converts the dictionary into a list of (key, value) pairs.
  - Uses a list comprehension with slicing to create chunks: `[dict(items[i : i + chunk_size]) for i in range(0, len(job_descriptions), chunk_size)]`.
  - Returns the list of chunks.

Output: A list of dictionaries, where each dictionary is a chunk of job descriptions.

Storage: The list of chunks is stored in memory.

Dependencies: None.

Complexity: O(n), where n is the number of job descriptions. Converting the dictionary to a list and creating the chunks is linear in the number of descriptions.

Input: A single chunk of job descriptions (a dictionary: `{job_title (str): job_description (str)}`).
- Iterates through the chunks (outer loop in `analyze_jobs`).
- `texts = list(chunk.values())`: Extracts the job description texts into a list.
- `enhanced_keywords_with_original = list(self.processor.keyword_extractor.extract_keywords(texts))`: Calls the keyword extraction function and immediately converts the generator output to a list.
- `self.processor.keyword_extractor.extract_keywords(texts)`:
  - `workers = self.get_optimal_workers(texts)`: Determines the optimal number of worker processes for parallel processing, based on system resources and text complexity. This involves sampling texts, calculating their complexity (length + number of entities), and considering available memory.
  - `chunk_size = max(1, len(texts) // workers)`: Calculates the chunk size for `nlp.pipe`.
  - `chunks = self._chunk_texts(texts, chunk_size)`: Creates chunks of texts for `nlp.pipe`.
  - `with ProcessPoolExecutor(max_workers=workers) as executor:`: Creates a process pool for parallel processing.
  - `results = list(executor.map(self._process_text_chunk, chunks))`: Uses `executor.map` to process the chunks across multiple processes.
  - Inside `_process_text_chunk`:
    - `docs = list(self.nlp.pipe(texts))`: Processes the texts in batches using `nlp.pipe`. This is where spaCy performs tokenization, POS tagging, lemmatization, and entity recognition. The `batch_size` and `n_process` parameters control the batching and parallel processing within spaCy.
    - Iterates through the `docs` and corresponding `texts` using `zip(docs, texts)`.
    - Entity Extraction: `entity_keywords = [ent.text for ent in doc.ents if ent.label_ == "SKILL"]` extracts entities labeled as "SKILL".
    - Tokenization and Lemmatization:
      - Identifies the spans of skill entities.
      - Iterates through tokens in the `doc`. For tokens not within skill entity spans, checks whether the token's length is greater than 1 and whether it is not a stop word; if both conditions hold, appends the lowercase lemma of the token to `non_entity_tokens`.
      - `preprocessed_text = self.preprocessor.preprocess(" ".join(non_entity_tokens))`: Preprocesses the non-entity tokens (lowercasing, removing URLs, emails, special characters, and extra whitespace).
      - `token_list = preprocessed_text.split()`: Splits the preprocessed text into a list of tokens.
    - N-gram Generation (see the sketch following this list):
      - `for n in range(self.ngram_range[0], self.ngram_range[1] + 1):`: Iterates through the specified n-gram range.
      - `non_entity_keywords.update(self._generate_ngrams(token_list, n))`: Generates n-grams using the `_generate_ngrams` function (which also filters stop words and short tokens) and adds them to the `non_entity_keywords` set.
    - Keyword Filtering:
      - Combines `entity_keywords` and `non_entity_keywords` into a single set `keywords`.
      - Filters `keywords` to remove short keywords (length <= 1) and keywords consisting entirely of stop words.
    - Fuzzy Matching and POS Filtering (`_apply_fuzzy_matching_and_pos_filter`):
      - Iterates through the filtered keywords.
      - If a keyword is already in `self.all_skills` (lowercase), it is added to the `filtered_keywords` list directly.
      - Otherwise, uses `rapidfuzz.process.extractOne` to find the best match in `self.all_skills`.
      - If a match is found with a score above the `score_cutoff`, and the matched term's POS tag is in the `allowed_pos` list, the matched term (from `all_skills`) is added to `filtered_keywords`.
    - Semantic Filtering (`_semantic_filter`):
      - Checks whether `semantic_validation` is enabled in the configuration.
      - Removes keywords that are in the `negative_keywords` list.
      - Calls `_is_in_context` for each keyword:
        - Calculates the embedding of the keyword using spaCy.
        - Extracts a context window around the keyword in the original text, using `_extract_sentences` and `_get_context_window`. These functions handle sentence splitting (including bullet points and numbered lists) and paragraph breaks.
        - Calculates the embedding of the context window.
        - Calculates the cosine similarity between the keyword embedding and the context window embedding.
        - Returns `True` if the similarity is above the `similarity_threshold`, `False` otherwise.
    - Yields the `(original_tokens, filtered_keywords)` tuple.
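The n-gram generation and basic filtering described above could look roughly like this (a sketch; the real `_generate_ngrams` also respects the configured stop-word list and other filters):

```python
def generate_ngrams(tokens: list[str], n: int, stop_words: set[str]) -> set[str]:
    """Return n-grams, skipping very short tokens and all-stop-word grams."""
    ngrams = set()
    for i in range(len(tokens) - n + 1):
        gram = tokens[i : i + n]
        if any(len(tok) <= 1 for tok in gram):
            continue
        if all(tok in stop_words for tok in gram):
            continue
        ngrams.add(" ".join(gram))
    return ngrams

# Usage mirroring the extraction loop: collect 1- to 3-grams from a token list.
tokens = "design scalable data pipelines in python".split()
candidates: set[str] = set()
for n in range(1, 4):  # an ngram_range of (1, 3)
    candidates |= generate_ngrams(tokens, n, stop_words={"in", "the", "a"})
```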
Output: For each chunk, a list of tuples is produced. Each tuple has the form `(original_tokens, filtered_keywords)`, where:

- `original_tokens`: A list of all identified tokens (including entities) in the job description, converted to lowercase.
- `filtered_keywords`: A list of keywords that have passed all filtering steps (n-gram generation, fuzzy matching, semantic validation, stop word removal, length checks).

Storage:

- spaCy `Doc` objects are processed in batches, and their intermediate results (tokens, POS tags, lemmas, embeddings) are stored within the `Doc` objects themselves. These objects are released after each batch is processed.
- The `original_tokens` and `filtered_keywords` lists are stored in memory for each job description.
- The `all_skills` set is stored in memory.

Dependencies: `spacy`, `rapidfuzz`, `sklearn.feature_extraction.text` (for `TfidfVectorizer`, which is not directly used in this step but is part of the overall process), `numpy`, `concurrent.futures` (for `ProcessPoolExecutor`), `logging`, `re`.
Complexity: This is the most computationally intensive step in the pipeline.

- spaCy Processing (`nlp.pipe`): The cost depends on the length of the text, the complexity of the pipeline (number of components), and the size of the vocabulary. With the `en_core_web_lg` model, processing a single sentence can take milliseconds to tens of milliseconds, depending on the sentence's length and complexity. Batch processing with `nlp.pipe` significantly improves efficiency.
- N-gram Generation (`_generate_ngrams`): The number of n-grams grows rapidly with the length of the text and the `ngram_range`. For a text of length n and an n-gram range of (1, 3), the number of n-grams is O(n) for unigrams, O(n) for bigrams, and O(n) for trigrams. The filtering within `_generate_ngrams` (stop words, short tokens) reduces the number of n-grams.
- Fuzzy Matching (`rapidfuzz.process.extractOne`): The cost depends on the algorithm used and the size of the vocabulary (`self.all_skills`). `rapidfuzz` is highly optimized, but for a large `all_skills` set this can still be a significant cost. `extractOne` finds only the best match, which is generally faster than finding all matches above a threshold.
- Semantic Validation (`_is_in_context`) (see the sketch after this list):
  - Embedding Calculation: Calculating the embedding for the keyword and the context window involves matrix operations within spaCy. The cost depends on the embedding dimension (fixed for a given spaCy model) and the length of the text being embedded.
  - Cosine Similarity: Calculating the cosine similarity between two vectors is O(d), where d is the embedding dimension.
- Parallel Processing (`ProcessPoolExecutor`): Using `ProcessPoolExecutor` distributes the work across multiple CPU cores, reducing the overall processing time. The speedup depends on the number of cores, the overhead of process creation and communication, and the workload distribution.
- Overall: The complexity of this step is difficult to express with a single Big O term because it combines different operations with parallel processing. It is clearly the computationally dominant step, and its performance is heavily influenced by the length of the job descriptions, the size of the `all_skills` set, the `ngram_range`, the `similarity_threshold`, and the efficiency of the spaCy model and `rapidfuzz`.
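As a rough illustration of the semantic check (`_is_in_context`), using spaCy vectors and cosine similarity; the context-window extraction is omitted here, the threshold default is an assumption, and the sketch assumes a vector-bearing model such as `en_core_web_lg` is installed:

```python
import numpy as np
import spacy

nlp = spacy.load("en_core_web_lg")  # a model with word vectors

def is_in_context(keyword: str, context_window: str, similarity_threshold: float = 0.6) -> bool:
    """Keep the keyword only if its vector is close to the surrounding context's vector."""
    kw_vec = nlp(keyword).vector
    ctx_vec = nlp(context_window).vector
    denom = np.linalg.norm(kw_vec) * np.linalg.norm(ctx_vec)
    if denom == 0:
        return False  # no usable vectors (e.g., out-of-vocabulary tokens)
    similarity = float(np.dot(kw_vec, ctx_vec) / denom)
    return similarity >= similarity_threshold
```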
Input:

- `keywords`: A dictionary mapping job titles to tuples of `(original_tokens, filtered_keywords)`.
- `chunk`: The original chunk of job descriptions (a dictionary).

- `self._calculate_scores(...)` (continued):
  - `if term_lower in self.keyword_extractor.all_skills:`
    - `score *= weighting.get("whitelist_boost", 1.5)`: Applies the whitelist boost if the term is in the expanded set of skills.
  - `job_text = job_descriptions_list[job_index][1]`: Retrieves the full job description text.
  - `section = self.keyword_extractor._detect_keyword_section(term, job_text)`: Determines the section of the job description where the keyword was found (see the sketch after this list):
    - `keyword_lower = keyword.lower()`: Converts the keyword to lowercase.
    - `match = re.search(rf"(?i)\b{re.escape(keyword_lower)}\b", text)`: Performs a case-insensitive, whole-word search for the keyword in the job description text.
    - If a match is found:
      - Iterates through the pre-compiled section heading regular expressions (`self._section_heading_re`).
      - Finds the section heading that appears before the keyword match.
      - Returns the name of the section (e.g., "responsibilities", "requirements").
    - If no match is found, or no section heading appears before the keyword, returns "default".
  - `score *= section_weights.get(section, section_weights.get("default", 1.0))`: Applies section-specific weighting.
  - Creates a dictionary `result` containing:
    - `"Keyword"`: The term.
    - `"Job Title"`: The job title.
    - `"Score"`: The calculated score.
    - `"TF-IDF"`: The raw TF-IDF value.
    - `"Frequency"`: The presence/absence indicator (1 or 0).
    - `"Category"`: The category of the keyword, determined by `self.keyword_extractor._categorize_term(term)`. This involves:
      - Checking whether the term is directly present in any of the category term lists (using a cached lookup).
      - If not found directly, performing semantic categorization: calculating the term's vector using spaCy, calculating the cosine similarity between the term vector and the centroid vector of each category, and assigning the term to the category with the highest similarity score above a threshold.
    - `"In Whitelist"`: A boolean indicating whether the lowercase term is in `self.keyword_extractor.all_skills`.
  - `yield result`: Yields the `result` dictionary.
- `df = pd.DataFrame(results)`: Creates a Pandas DataFrame from the list of `result` dictionaries.
- If the DataFrame `df` is empty, returns two empty DataFrames.
- `summary_chunk = df.groupby("Keyword").agg({"Score": ["sum", "mean"], "Job Title": "nunique"})`: Groups the DataFrame by "Keyword" and calculates:
  - The sum of the "Score" for each keyword.
  - The mean of the "Score" for each keyword.
  - The number of unique job titles ("Job Title") associated with each keyword.
- `summary_chunk.columns = ["Total_Score", "Avg_Score", "Job_Count"]`: Renames the columns of the `summary_chunk` DataFrame.
- `details_chunk = df`: Assigns the original DataFrame to `details_chunk`.
- `return summary_chunk, details_chunk`: Returns the two DataFrames.
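A simplified sketch of the section detection and weighting (the section-heading patterns shown here are assumptions; the real `_detect_keyword_section` uses pre-compiled patterns built from the configuration):

```python
import re

# Illustrative heading patterns; the script builds these from its configuration.
SECTION_HEADING_RE = {
    "responsibilities": re.compile(r"(?im)^\s*responsibilities\b"),
    "requirements": re.compile(r"(?im)^\s*requirements\b"),
}

def detect_keyword_section(keyword: str, text: str) -> str:
    match = re.search(rf"(?i)\b{re.escape(keyword.lower())}\b", text)
    if not match:
        return "default"
    best_section, best_pos = "default", -1
    # Pick the heading that appears closest before the keyword occurrence.
    for section, pattern in SECTION_HEADING_RE.items():
        for heading in pattern.finditer(text[: match.start()]):
            if heading.start() > best_pos:
                best_section, best_pos = section, heading.start()
    return best_section

def apply_section_weight(score: float, section: str, section_weights: dict) -> float:
    return score * section_weights.get(section, section_weights.get("default", 1.0))
```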
Output: A tuple containing two Pandas DataFrames:

- `summary_chunk`: Contains aggregated keyword statistics (Total_Score, Avg_Score, Job_Count). The index of this DataFrame is the "Keyword".
- `details_chunk`: Contains the individual keyword scores and related information (Job Title, Score, TF-IDF, Frequency, Category, In Whitelist).

Storage:

- The TF-IDF vectorizer (`self.tfidf_vectorizer`) is stored as an attribute of the `OptimizedATS` object and reused across chunks.
- The `summary_chunk` and `details_chunk` DataFrames are stored in memory.
- The category vectors (`self.category_vectors`) are stored in memory and used by the `_categorize_term` method (which uses `@lru_cache` for caching).

Dependencies: `sklearn.feature_extraction.text` (for `TfidfVectorizer`), `numpy`, `pandas`, `re`, `logging`, `functools` (for `lru_cache`).
Complexity:

- TF-IDF Matrix Creation (`_create_tfidf_matrix`):
  - Fitting the vectorizer (done only once): O(n*m), where n is the number of documents (job descriptions) used for fitting (which may be a sample) and m is the number of features (keywords), limited by `max_features`.
  - Transforming the data (done for each chunk): O(c*k), where c is the number of job descriptions in the chunk and k is the number of unique keywords in the chunk.
- Score Calculation (`_calculate_scores`): O(c*k), where c is the number of job descriptions in the chunk and k is the number of unique keywords in the chunk. The operations within the loop (calculating the score, determining the section, categorizing the term) are relatively fast, but they are performed for each keyword in each job description.
- DataFrame Operations:
  - Creating the DataFrame: O(c*k).
  - Grouping and aggregation (`groupby().agg()`): Depends on the number of unique keywords and the efficiency of the Pandas `groupby` implementation; it is generally faster than iterating through the DataFrame manually.
- Category Determination (`_categorize_term`): Depends on whether the term is found in the direct match cache. If not, it calculates the cosine similarity with each category centroid, which is O(number of categories * embedding dimension). A sketch follows below.
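A minimal sketch of the centroid-based categorization fallback (the category centroids are assumed to be precomputed in `_init_categories` as mean spaCy vectors of each category's terms; the threshold default and the "uncategorized" fallback label are assumptions):

```python
import numpy as np

def categorize_term(term_vector: np.ndarray, category_vectors: dict, threshold: float = 0.5) -> str:
    """Assign the term to the category whose centroid is most similar, if above the threshold."""
    best_category, best_sim = "uncategorized", threshold
    for category, centroid in category_vectors.items():
        denom = np.linalg.norm(term_vector) * np.linalg.norm(centroid)
        if denom == 0:
            continue
        sim = float(np.dot(term_vector, centroid) / denom)
        if sim > best_sim:
            best_category, best_sim = category, sim
    return best_category
```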
Input:

- `batch_idx`: The current batch index (integer).
- `summary_chunks`: A list of `summary_chunk` DataFrames (from previous chunks).
- `details_chunks`: A list of `details_chunk` DataFrames (from previous chunks).

- `if not self.config["intermediate_save"]["enabled"]`: Checks whether intermediate saving is enabled. If not, returns immediately.
- `save_interval = self.config["intermediate_save"].get("save_interval", 0)`: Gets the save interval.
- `if save_interval > 0 and (i + 1) % save_interval == 0`: Checks whether the current chunk index (`i` from the outer loop) is a multiple of the `save_interval`.
- If the conditions are met:
  - Determines the file format (`format_type`) and suffix based on the configuration.
  - Constructs file paths for the summary and details files using `self.working_dir`, `self.run_id`, `batch_idx`, and the suffix.
  - Initializes an empty dictionary `checksums` to store checksums.
  - Defines a nested function `save_and_verify` to handle saving and checksum verification with retries (see the sketch after this list):
    - Takes the file path, data, save function, append flag, and maximum retries as arguments.
    - Uses a `for` loop to retry saving up to `max_retries` times.
    - Calls the provided `save_func` to save the data.
    - Calculates the checksum of the saved file using `self._calculate_file_checksum`.
    - Verifies the checksum using `self._verify_single_checksum`.
    - If the checksum is valid, returns the checksum.
    - If the checksum is invalid or an exception occurs, logs a warning or error and retries.
    - If all retries fail, raises a `DataIntegrityError`.
  - Uses a `try...except` block to handle potential errors during saving.
  - Based on the `format_type`:
    - "feather":
      - Iterates through the `summary_chunks` and `details_chunks` lists using `zip`.
      - For the first chunk (`i == 0`):
        - Resets the index of the DataFrames using `reset_index()`.
        - Calls `save_and_verify` to save the summary and details DataFrames using `feather.write_feather`.
        - Reads and stores the schema of the saved files using `pq.ParquetFile(...).schema_arrow` for subsequent appends.
      - For subsequent chunks (`i > 0`):
        - Resets the index of the DataFrames.
        - Defines a nested function `append_to_parquet` to handle appending data to existing Parquet files:
          - Takes the path, DataFrame, append flag, and schema.
          - Creates a `pa.Table` from the DataFrame.
          - If a schema is provided, checks for compatibility (basic field count).
          - Uses a `pq.ParquetWriter` with ZSTD compression (if available, otherwise SNAPPY) to append the data.
          - If appending fails, creates a fallback file.
        - Calls `save_and_verify` to save the summary and details chunks using `append_to_parquet`, passing the stored schema.
    - "jsonl":
      - Iterates through `summary_chunks` and `details_chunks`.
      - For the first chunk (`i == 0`): Calls `save_and_verify` with `srsly.write_jsonl` to write the initial files. The data is converted to a generator of dictionaries.
      - For subsequent chunks (`i > 0`): Calls `save_and_verify` with `srsly.write_jsonl`, setting `append=True` to append to the existing files.
    - "json":
      - Concatenates all `summary_chunks` into a single DataFrame using `pd.concat`.
      - Concatenates all `details_chunks` into a single DataFrame.
      - Calls `save_and_verify` to save the combined DataFrames using `srsly.write_json`. This rewrites the entire file each time.
  - Logs a message indicating that intermediate results have been saved.
  - Calls `self._save_checksum_manifest(checksums)` to save the calculated checksums to a manifest file:
    - Opens the `self.checksum_manifest_path` file in append mode (`"a"`).
    - Iterates through the `checksums` dictionary.
    - For each file path and checksum, writes a JSON line to the manifest file using `srsly.write_jsonl`.
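A simplified sketch of the `save_and_verify` retry-and-checksum pattern (the checksum helper here uses `xxhash` directly; the exception handling, retry delay, and block size are assumptions):

```python
import time
import xxhash

class DataIntegrityError(Exception):
    """Raised when a saved file cannot be verified after all retries."""

def file_checksum(path: str) -> str:
    h = xxhash.xxh64()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(65536), b""):
            h.update(block)
    return h.hexdigest()

def verify_checksum(path: str, expected: str) -> bool:
    """Re-read the file and confirm its checksum matches the expected value."""
    return file_checksum(path) == expected

def save_and_verify(path: str, data, save_func, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            save_func(path, data)  # e.g. feather.write_feather or srsly.write_jsonl
            checksum = file_checksum(path)
            if verify_checksum(path, checksum):
                return checksum
        except Exception as exc:
            print(f"Save attempt {attempt + 1} failed for {path}: {exc}")
        time.sleep(1)
    raise DataIntegrityError(f"Could not reliably save {path}")
```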
Output: Intermediate files are written to disk in the specified format (feather, jsonl, or json) in the `working_dir`. A checksum manifest file (`checksums.jsonl`) is also written (or appended to) in the `working_dir`.

Storage: The intermediate results are stored on disk.

Dependencies: `pathlib`, `srsly`, `pyarrow.feather`, `pyarrow.parquet`, `pandas`, `xxhash`, `time`, `logging`.

Complexity: I/O bound. The cost depends on the size of the DataFrames, the chosen file format, and disk I/O speed. Feather is generally the fastest format for storing Pandas DataFrames; JSON is the slowest, especially since the entire file is rewritten each time; JSONL offers a good balance between speed and readability. The checksum calculation adds a small overhead.
Input: The `(summary_chunk, details_chunk)` tuple (two Pandas DataFrames) from Step 7.

- `metrics = analyzer._calc_metrics(chunk_results)`:
  - Records the start time.
  - Calculates recall:
    - Creates a set of the original (unexpanded) lowercase skills from the `keyword_categories`.
    - Calculates the intersection between the set of lowercase keywords in the `summary_chunk` index and the set of original skills.
    - Divides the size of the intersection by the size of the original skills set, handling the case where the original skills set is empty to avoid division by zero.
  - Calculates time per job:
    - Calculates the elapsed time.
    - Divides the elapsed time by the number of rows in the `summary` DataFrame, handling cases where `summary` is empty or has zero length.
  - Gets the current memory usage percentage using `psutil.virtual_memory().percent`.
  - Returns a dictionary: `{"recall": ..., "memory": ..., "time_per_job": ...}`.
- `hit_rate = np.mean(list(self.trigram_optim.hit_rates)) if self.trigram_optim.hit_rates else 0`: Calculates the average hit rate of the trigram cache.
- `new_params = self.tuner.tune_parameters(metrics, hit_rate)` (see the sketch after this list):
  - `new_params = {"chunk_size": self._adjust_chunk_size(metrics["memory"])}`: Adjusts the chunk size based on memory usage:
    - If memory usage is above 80%, halves the current chunk size.
    - If memory usage is below 60%, doubles the current chunk size.
    - Otherwise, keeps the current chunk size.
  - Adjusts the `pos_processing` strategy based on recall and trigram hit rate:
    - If recall is below 0.7:
      - If the trigram hit rate is below 0.5, sets `pos_processing` to "original".
      - Otherwise, sets `pos_processing` to "hybrid".
    - If the trigram hit rate is above 0.8 and memory usage is below 60%, sets `pos_processing` to "noun_chunks".
  - Clamps the `chunk_size` between `min_chunk_size` and `max_chunk_size` from the configuration.
  - Returns the `new_params` dictionary.
- `self.config.update(new_params)`: Updates the configuration with the new parameters.
- `self.chunker.update_model(self._calc_reward(metrics))`:
  - `reward = ...`: Calculates a reward based on the metrics:
    - `metrics["recall"] * weights["recall"]`: Weighted recall.
    - `- metrics["memory"] / scale * weights["memory"]`: Weighted (negative) memory usage.
    - `- metrics["time_per_job"] * weights["time"]`: Weighted (negative) processing time.
  - Updates the reward history (`self.reward_history`).
  - If the reward history has at least 10 elements:
    - Calculates the standard deviation of the rewards.
    - Adjusts the learning rate (`self.learning_rate`) based on the standard deviation:
      - If the standard deviation is below `reward_std_low`, decreases the learning rate (to stabilize learning).
      - If the standard deviation is above `reward_std_high`, increases the learning rate (to adapt to changes).
    - Resets the learning rate to the base value if it becomes too small.
  - Updates the Q-table (`self.q_table`) using the calculated reward and the learning rate (a standard Q-learning update rule).
  - Updates the timestamps for the visited states.
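The memory-driven chunk-size adjustment and the reward calculation could be sketched as follows (the weight names and the `scale` constant are assumptions based on the description above):

```python
def adjust_chunk_size(current_size: int, memory_percent: float) -> int:
    """Shrink under memory pressure, grow when there is headroom."""
    if memory_percent > 80:
        return max(1, current_size // 2)
    if memory_percent < 60:
        return current_size * 2
    return current_size

def calc_reward(metrics: dict, weights: dict, scale: float = 100.0) -> float:
    """Higher recall is rewarded; memory pressure and slow processing are penalized."""
    return (
        metrics["recall"] * weights["recall"]
        - metrics["memory"] / scale * weights["memory"]
        - metrics["time_per_job"] * weights["time"]
    )

# Example: reward for a chunk with 75% recall, 55% memory use, 0.2 s per job.
reward = calc_reward(
    {"recall": 0.75, "memory": 55.0, "time_per_job": 0.2},
    weights={"recall": 1.0, "memory": 0.5, "time": 0.5},
)
```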
Output:

- `metrics`: A dictionary containing the calculated metrics (recall, memory, time_per_job).
- The `self.config` dictionary is updated with potentially new values for `chunk_size` and `pos_processing`.
- The `self.chunker.q_table` is updated.

Storage:

- The updated configuration is stored in the `self.config` attribute of the `OptimizedATS` object.
- The Q-table is stored in the `self.chunker.q_table` attribute (an `LRUCache`).
- The reward history and state history are stored in deques within the `SmartChunker` object.

Dependencies: `time`, `psutil`, `numpy`, `logging`, `collections` (for `deque`).

Complexity:

- Metrics Calculation (`_calc_metrics`): O(s), where s is the number of unique keywords in the `summary_chunk` (for calculating recall). The other calculations (time per job, memory usage) are O(1).
- Parameter Tuning (`tune_parameters`): O(1).
- Model Update (`update_model`): Depends on the size of the Q-table and the length of the state history, but it is generally fast because the Q-table is implemented with an `LRUCache` (O(1) average access time).
Input: None (control flow).

- The script continues to iterate through the remaining chunks of job descriptions (outer loop in `analyze_jobs`), repeating Steps 6-9 for each chunk.
- `gc.collect()`: After processing all chunks, the garbage collector is called explicitly to release any unreferenced memory.

Output: None.

Storage: N/A.

Dependencies: `gc`.

Complexity: N/A for the loop itself; the cost of `gc.collect()` depends on the amount of garbage to be collected.
Input: The intermediate files (summary and details files for each batch) stored in the `working_dir`.

- Determines the number of batches (`batch_count`) by checking for the existence of intermediate files.
- `analyzer._verify_intermediate_checksums()`:
  - Checks whether the checksum manifest file (`self.checksum_manifest_path`) exists. If not, logs a warning and returns (or raises an exception, depending on the configuration).
  - Loads the checksums from the manifest file using `srsly.read_jsonl`.
  - Iterates through the stored checksums. For each file:
    - Checks whether the file exists. If not, logs an error and raises a `DataIntegrityError`.
    - Calculates the checksum of the file using `self._calculate_file_checksum`.
    - Compares the calculated checksum with the stored checksum. If they don't match, logs an error and raises a `DataIntegrityError`.
    - If the checksums match, logs a message indicating that the checksum has been verified.
  - If all checksums are verified, logs a message indicating success.
- `loaded_results_generator = analyzer._load_all_intermediate(batch_count)`:
  - Determines the file format and suffix based on the configuration.
  - Iterates through the batch indices (from 0 to `batch_count` - 1). For each batch:
    - Constructs the file paths for the summary and details files.
    - Checks whether both files exist.
    - If both files exist:
      - Checks the file sizes. If either file is empty, logs a warning; if `strict_mode` is enabled, raises a `DataIntegrityError`.
      - Based on the `format_type`:
        - "feather": Reads the files using `pd.read_feather`.
        - "jsonl": Reads the files using `srsly.read_jsonl` and converts the result to a Pandas DataFrame.
        - "json": Reads the files using `srsly.read_json` and converts the result to a Pandas DataFrame.
      - Applies consistent data types to columns.
      - `yield summary, details`: Yields the loaded DataFrames.
    - If either file is missing: Logs a warning; if `strict_mode` is `True`, raises a `FileNotFoundError`.
    - If any errors occur during file reading, logs an error and yields empty DataFrames.
- `final_summary, final_details = analyzer._aggregate_results(loaded_results_generator)` (see the sketch after this list):
  - Initializes a `defaultdict` (`summary_agg`) to store aggregated summary statistics. The default factory is a lambda that returns a dictionary with initial values for "total", "count", and "jobs".
  - Initializes an empty list (`details_list`) to store the detailed results.
  - Iterates through the `loaded_results_generator` (which yields tuples of DataFrames). For each `(summary_chunk, detail_chunk)`:
    - Checks whether either DataFrame is empty. If so, logs a warning and continues to the next iteration.
    - Iterates through the rows of the `summary_chunk` DataFrame using `summary_chunk.iterrows()`. For each keyword and row:
      - Extracts the "Total_Score" and "Job_Count" values, converting them to float and int, respectively. Handles potential `ValueError` or `TypeError` exceptions, logging an error and skipping the row if necessary.
      - Updates the `summary_agg` dictionary: adds the `total_score` to the "total" for the keyword, adds the `job_count` to the "count" for the keyword, and updates the "jobs" set for the keyword with the job titles from the `detail_chunk` DataFrame.
    - Extends the `details_list` with the records from the `detail_chunk` DataFrame (converted to a list of dictionaries using `detail_chunk.to_dict("records")`).
    - Releases memory by deleting `summary_chunk` and `detail_chunk`.
  - Creates the final `summary_df` DataFrame from the aggregated data in `summary_agg`:
    - Uses `pd.DataFrame.from_dict` with `orient="index"` to create the DataFrame.
    - Calculates the "Avg_Score" by dividing "Total_Score" by "Job_Count" (handling potential division by zero).
    - Sorts the DataFrame by "Total_Score" in descending order.
  - Creates the final `details_df` DataFrame from the `details_list`.
  - Returns the `summary_df` and `details_df` DataFrames.
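A condensed sketch of the aggregation pattern (column names follow the description above; error handling and the detailed-records list are omitted, and the division-by-zero guard is simplified):

```python
from collections import defaultdict

import pandas as pd

def aggregate_results(chunks) -> pd.DataFrame:
    """Combine per-batch summary DataFrames into one keyword-level summary."""
    summary_agg = defaultdict(lambda: {"total": 0.0, "count": 0, "jobs": set()})
    for summary_chunk, detail_chunk in chunks:
        if summary_chunk.empty or detail_chunk.empty:
            continue
        for keyword, row in summary_chunk.iterrows():
            summary_agg[keyword]["total"] += float(row["Total_Score"])
            summary_agg[keyword]["count"] += int(row["Job_Count"])
            summary_agg[keyword]["jobs"].update(detail_chunk["Job Title"])
    summary_df = pd.DataFrame.from_dict(
        {k: {"Total_Score": v["total"], "Job_Count": v["count"]} for k, v in summary_agg.items()},
        orient="index",
    )
    # Replace zero counts before dividing to avoid division by zero.
    summary_df["Avg_Score"] = summary_df["Total_Score"] / summary_df["Job_Count"].replace(0, 1)
    return summary_df.sort_values("Total_Score", ascending=False)
```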
Output: Two Pandas DataFrames:

- `final_summary`: The aggregated summary statistics for all keywords across all batches.
- `final_details`: The detailed keyword scores and related information for all jobs across all batches.

Storage: The loaded and aggregated DataFrames are stored in memory.

Dependencies: `pathlib`, `srsly`, `pyarrow.feather`, `pandas`, `collections` (for `defaultdict`), `logging`, `xxhash`.

Complexity:

- Checksum Verification (`_verify_intermediate_checksums`): O(b), where b is the number of batches (and thus the number of intermediate files). The checksum calculation for each file is relatively fast.
- Loading Intermediate Results (`_load_all_intermediate`): I/O bound. The cost depends on the number of batches, the size of the intermediate files, and the chosen file format.
- Aggregating Results (`_aggregate_results`): O(n), where n is the total number of rows in all intermediate detail DataFrames. The aggregation iterates through the DataFrames and updates the `summary_agg` dictionary, which has O(1) average access time.
The final results are saved to an Excel file specified by the `-o` or `--output` command-line argument (default: `results.xlsx`). The script checks available disk space before saving.

The output is an Excel file (.xlsx) containing two sheets:

- "Summary": Contains the `final_summary` DataFrame. The columns are likely "Total_Score", "Avg_Score", and "Job_Count", with the "Keyword" as the index.
- "Detailed Scores": Contains the `final_details` DataFrame. The columns are likely "Keyword", "Job Title", "Score", "TF-IDF", "Frequency", "Category", and "In Whitelist".
The output provides a comprehensive report of the keyword analysis, suitable for:
- Identifying the most important skills and requirements for a set of job descriptions.
- Comparing the skills required for different roles.
- Analyzing trends in job requirements.
- Potentially informing the creation of targeted resumes or cover letters.
Dependencies:

- `pandas`: The `ExcelWriter` class is used to write the DataFrames to an Excel file (a sketch follows below). This likely uses either `openpyxl` or `xlsxwriter` as the underlying engine (depending on what is installed).
- `logging`: Used for logging messages about the save operation.
- `shutil`: Used for checking disk space (`shutil.disk_usage`).
- `pathlib`: Used for creating the path to the output file.
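The final save could be sketched roughly as follows (sheet names and the default file name follow the description above; the required-space estimate is an assumption):

```python
import shutil
from pathlib import Path

import pandas as pd

def save_results(final_summary: pd.DataFrame, final_details: pd.DataFrame,
                 output: str = "results.xlsx") -> None:
    out_path = Path(output)
    # Rough disk-space check before writing; the 50 MB margin is illustrative.
    free_bytes = shutil.disk_usage(out_path.resolve().parent).free
    if free_bytes < 50 * 1024 * 1024:
        raise OSError("Not enough free disk space to write the results file")
    with pd.ExcelWriter(out_path) as writer:
        final_summary.to_excel(writer, sheet_name="Summary")
        final_details.to_excel(writer, sheet_name="Detailed Scores", index=False)
```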