From 45534125ae94635f2a9632abce37a466570a331d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 3 Dec 2025 23:12:00 +0000 Subject: [PATCH 1/3] Initial plan From 7cb49e450ae7e3b4c211b88fa853548d55b7df7e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 3 Dec 2025 23:15:33 +0000 Subject: [PATCH 2/3] Fix whitespace formatting errors (W291, W292, W293) Co-authored-by: VinciGit00 <88108002+VinciGit00@users.noreply.github.com> --- scrapegraphai/graphs/markdownify_graph.py | 2 +- scrapegraphai/helpers/models_tokens.py | 1 - scrapegraphai/models/xai.py | 2 +- scrapegraphai/nodes/markdownify_node.py | 2 +- scrapegraphai/utils/code_error_analysis.py | 50 ++++---- scrapegraphai/utils/code_error_correction.py | 60 ++++----- scrapegraphai/utils/research_web.py | 124 +++++++++---------- 7 files changed, 120 insertions(+), 121 deletions(-) diff --git a/scrapegraphai/graphs/markdownify_graph.py b/scrapegraphai/graphs/markdownify_graph.py index 78d33b12..0bf2804c 100644 --- a/scrapegraphai/graphs/markdownify_graph.py +++ b/scrapegraphai/graphs/markdownify_graph.py @@ -80,4 +80,4 @@ def execute( - Dictionary with the markdown result in the "markdown" key - List of execution logs """ - return super().execute(initial_state) \ No newline at end of file + return super().execute(initial_state) diff --git a/scrapegraphai/helpers/models_tokens.py b/scrapegraphai/helpers/models_tokens.py index c700a6dc..9d531e9a 100644 --- a/scrapegraphai/helpers/models_tokens.py +++ b/scrapegraphai/helpers/models_tokens.py @@ -32,7 +32,6 @@ "o1-preview": 128000, "o1-mini": 128000, "o1": 128000, - "gpt-4.5-preview": 128000, "o3-mini": 200000, }, "azure_openai": { diff --git a/scrapegraphai/models/xai.py b/scrapegraphai/models/xai.py index 86969483..065af3ed 100644 --- a/scrapegraphai/models/xai.py +++ b/scrapegraphai/models/xai.py @@ -19,4 +19,4 @@ def __init__(self, **llm_config): llm_config["openai_api_key"] = llm_config.pop("api_key") llm_config["openai_api_base"] = "https://api.x.ai/v1" - super().__init__(**llm_config) \ No newline at end of file + super().__init__(**llm_config) diff --git a/scrapegraphai/nodes/markdownify_node.py b/scrapegraphai/nodes/markdownify_node.py index 2119908a..da1407a4 100644 --- a/scrapegraphai/nodes/markdownify_node.py +++ b/scrapegraphai/nodes/markdownify_node.py @@ -64,4 +64,4 @@ def execute(self, state: dict) -> dict: # Update state with markdown content state.update({self.output[0]: markdown_content}) - return state \ No newline at end of file + return state diff --git a/scrapegraphai/utils/code_error_analysis.py b/scrapegraphai/utils/code_error_analysis.py index d2c6a42d..e799aff6 100644 --- a/scrapegraphai/utils/code_error_analysis.py +++ b/scrapegraphai/utils/code_error_analysis.py @@ -80,10 +80,10 @@ def validate_validation_errors(cls, v): def get_optimal_analysis_template(error_type: str) -> str: """ Returns the optimal prompt template based on the error type. - + Args: error_type (str): Type of error to analyze. - + Returns: str: The prompt template text. """ @@ -106,10 +106,10 @@ def syntax_focused_analysis(state: Dict[str, Any], llm_model) -> str: Returns: str: The result of the syntax error analysis. - + Raises: InvalidStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'print("Hello World")', @@ -123,24 +123,24 @@ def syntax_focused_analysis(state: Dict[str, Any], llm_model) -> str: generated_code=state.get("generated_code", ""), errors=state.get("errors", {}) ) - + # Check if syntax errors exist if "syntax" not in validated_state.errors: raise InvalidStateError("No syntax errors found in state dictionary") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("syntax"), input_variables=["generated_code", "errors"] ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "generated_code": validated_state.generated_code, "errors": validated_state.errors["syntax"] }) - + except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -157,10 +157,10 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: Returns: str: The result of the execution error analysis. - + Raises: InvalidStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'print(x)', @@ -178,14 +178,14 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: html_code=state.get("html_code", ""), html_analysis=state.get("html_analysis", "") ) - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("execution"), input_variables=["generated_code", "errors", "html_code", "html_analysis"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "generated_code": validated_state.generated_code, @@ -193,7 +193,7 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: "html_code": validated_state.html_code, "html_analysis": validated_state.html_analysis, }) - + except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -211,10 +211,10 @@ def validation_focused_analysis(state: Dict[str, Any], llm_model) -> str: Returns: str: The result of the validation error analysis. - + Raises: InvalidStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'return {"name": "John"}', @@ -232,14 +232,14 @@ def validation_focused_analysis(state: Dict[str, Any], llm_model) -> str: json_schema=state.get("json_schema", {}), execution_result=state.get("execution_result", {}) ) - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("validation"), input_variables=["generated_code", "errors", "json_schema", "execution_result"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "generated_code": validated_state.generated_code, @@ -247,7 +247,7 @@ def validation_focused_analysis(state: Dict[str, Any], llm_model) -> str: "json_schema": validated_state.json_schema, "execution_result": validated_state.execution_result, }) - + except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -268,10 +268,10 @@ def semantic_focused_analysis( Returns: str: The result of the semantic error analysis. - + Raises: InvalidStateError: If state or comparison_result is missing required keys. - + Example: >>> state = { 'generated_code': 'def add(a, b): return a + b' @@ -288,28 +288,28 @@ def semantic_focused_analysis( generated_code=state.get("generated_code", ""), errors=state.get("errors", {}) ) - + # Validate comparison_result if "differences" not in comparison_result: raise InvalidStateError("comparison_result missing 'differences' key") if "explanation" not in comparison_result: raise InvalidStateError("comparison_result missing 'explanation' key") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("semantic"), input_variables=["generated_code", "differences", "explanation"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated inputs return chain.invoke({ "generated_code": validated_state.generated_code, "differences": json.dumps(comparison_result["differences"], indent=2), "explanation": comparison_result["explanation"], }) - + except KeyError as e: raise InvalidStateError(f"Missing required key: {e}") except Exception as e: - raise AnalysisError(f"Semantic analysis failed: {str(e)}") \ No newline at end of file + raise AnalysisError(f"Semantic analysis failed: {str(e)}") diff --git a/scrapegraphai/utils/code_error_correction.py b/scrapegraphai/utils/code_error_correction.py index 9727c9ad..38ba266e 100644 --- a/scrapegraphai/utils/code_error_correction.py +++ b/scrapegraphai/utils/code_error_correction.py @@ -11,10 +11,10 @@ """ import json -from typing import Any, Dict, Optional +from typing import Any, Dict from functools import lru_cache -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, Field from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser @@ -39,7 +39,7 @@ class InvalidCorrectionStateError(CodeGenerationError): class CorrectionState(BaseModel): """Base model for code correction state validation.""" generated_code: str = Field(..., description="The original generated code to correct") - + class Config: extra = "allow" @@ -60,10 +60,10 @@ def get_optimal_correction_template(error_type: str) -> str: """ Returns the optimal prompt template for code correction based on the error type. Results are cached for performance. - + Args: error_type (str): Type of error to correct. - + Returns: str: The prompt template text. """ @@ -87,10 +87,10 @@ def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_mod Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'print("Hello World"' @@ -103,23 +103,23 @@ def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_mod validated_state = CorrectionState( generated_code=state.get("generated_code", "") ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("syntax"), input_variables=["analysis", "generated_code"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "analysis": analysis, "generated_code": validated_state.generated_code }) - + except KeyError as e: raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -137,10 +137,10 @@ def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_ Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys or analysis is invalid. - + Example: >>> state = { 'generated_code': 'print(x)' @@ -153,23 +153,23 @@ def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_ validated_state = CorrectionState( generated_code=state.get("generated_code", "") ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("execution"), input_variables=["analysis", "generated_code"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "analysis": analysis, "generated_code": validated_state.generated_code }) - + except KeyError as e: raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -187,10 +187,10 @@ def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys or analysis is invalid. - + Example: >>> state = { 'generated_code': 'return {"name": "John"}', @@ -205,24 +205,24 @@ def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm generated_code=state.get("generated_code", ""), json_schema=state.get("json_schema", {}) ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("validation"), input_variables=["analysis", "generated_code", "json_schema"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "analysis": analysis, "generated_code": validated_state.generated_code, "json_schema": validated_state.json_schema, }) - + except KeyError as e: raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -240,10 +240,10 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys or analysis is invalid. - + Example: >>> state = { 'generated_code': 'def add(a, b): return a + b', @@ -260,10 +260,10 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m execution_result=state.get("execution_result", {}), reference_answer=state.get("reference_answer", {}) ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("semantic"), @@ -275,7 +275,7 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m ], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state return chain.invoke({ "analysis": analysis, @@ -283,8 +283,8 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m "generated_result": json.dumps(validated_state.execution_result, indent=2), "reference_result": json.dumps(validated_state.reference_answer, indent=2), }) - + except KeyError as e: raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") except Exception as e: - raise CodeGenerationError(f"Semantic code generation failed: {str(e)}") \ No newline at end of file + raise CodeGenerationError(f"Semantic code generation failed: {str(e)}") diff --git a/scrapegraphai/utils/research_web.py b/scrapegraphai/utils/research_web.py index 195e11ca..df1b07c9 100644 --- a/scrapegraphai/utils/research_web.py +++ b/scrapegraphai/utils/research_web.py @@ -1,5 +1,5 @@ """ -research_web module for web searching across different search engines with improved +research_web module for web searching across different search engines with improved error handling, validation, and security features. """ @@ -48,7 +48,7 @@ class SearchConfig(BaseModel): serper_api_key: Optional[str] = Field(None, description="API key for Serper") region: Optional[str] = Field(None, description="Country/region code") language: str = Field("en", description="Language code") - + @validator('search_engine') def validate_search_engine(cls, v): """Validate search engine.""" @@ -56,14 +56,14 @@ def validate_search_engine(cls, v): if v.lower() not in valid_engines: raise ValueError(f"Search engine must be one of: {', '.join(valid_engines)}") return v.lower() - + @validator('query') def validate_query(cls, v): """Validate search query.""" if not v or not isinstance(v, str): raise ValueError("Query must be a non-empty string") return v - + @validator('max_results') def validate_max_results(cls, v): """Validate max results.""" @@ -80,17 +80,17 @@ def validate_max_results(cls, v): def rate_limited(calls: int, period: int = 60): """ Decorator to limit the rate of function calls. - + Args: calls (int): Maximum number of calls allowed in the period. period (int): Time period in seconds. - + Returns: Callable: Decorated function with rate limiting. """ min_interval = period / float(calls) last_called = [0.0] - + def decorator(func): @wraps(func) def wrapper(*args, **kwargs): @@ -108,10 +108,10 @@ def wrapper(*args, **kwargs): def sanitize_search_query(query: str) -> str: """ Sanitizes search query to prevent injection attacks. - + Args: query (str): The search query. - + Returns: str: Sanitized query. """ @@ -135,7 +135,7 @@ def sanitize_search_query(query: str) -> str: def get_random_user_agent() -> str: """ Returns a random user agent from the list. - + Returns: str: Random user agent string. """ @@ -167,10 +167,10 @@ def search_on_web( serper_api_key (str): API key for Serper region (str): Country/region code (e.g., 'mx' for Mexico) language (str): Language code (e.g., 'es' for Spanish) - + Returns: List[str]: List of URLs from search results - + Raises: SearchConfigError: If search configuration is invalid SearchRequestError: If search request fails @@ -179,7 +179,7 @@ def search_on_web( try: # Sanitize query for security sanitized_query = sanitize_search_query(query) - + # Validate search configuration config = SearchConfig( query=sanitized_query, @@ -192,12 +192,12 @@ def search_on_web( region=region, language=language ) - + # Format proxy once formatted_proxy = None if config.proxy: formatted_proxy = format_proxy(config.proxy) - + results = [] if config.search_engine == "duckduckgo": # Create a DuckDuckGo search object with max_results @@ -209,25 +209,25 @@ def search_on_web( elif config.search_engine == "bing": results = _search_bing( - config.query, - config.max_results, - config.timeout, + config.query, + config.max_results, + config.timeout, formatted_proxy ) elif config.search_engine == "searxng": results = _search_searxng( - config.query, - config.max_results, - config.port, + config.query, + config.max_results, + config.port, config.timeout ) elif config.search_engine == "serper": results = _search_serper( - config.query, - config.max_results, - config.serper_api_key, + config.query, + config.max_results, + config.serper_api_key, config.timeout ) @@ -246,40 +246,40 @@ def _search_bing( ) -> List[str]: """ Helper function for Bing search with improved error handling. - + Args: query (str): Search query max_results (int): Maximum number of results to return timeout (int): Request timeout in seconds proxy (str, optional): Proxy configuration - + Returns: List[str]: List of URLs from search results """ headers = { "User-Agent": get_random_user_agent() } - + params = { "q": query, "count": max_results } - + proxies = {"http": proxy, "https": proxy} if proxy else None - + try: response = requests.get( - "https://www.bing.com/search", - params=params, - headers=headers, - proxies=proxies, + "https://www.bing.com/search", + params=params, + headers=headers, + proxies=proxies, timeout=timeout ) response.raise_for_status() - + soup = BeautifulSoup(response.text, "html.parser") results = [] - + # Extract URLs from Bing search results for link in soup.select("li.b_algo h2 a"): url = link.get("href") @@ -287,7 +287,7 @@ def _search_bing( results.append(url) if len(results) >= max_results: break - + return results except Exception as e: raise SearchRequestError(f"Bing search failed: {str(e)}") @@ -298,20 +298,20 @@ def _search_searxng( ) -> List[str]: """ Helper function for SearXNG search. - + Args: query (str): Search query max_results (int): Maximum number of results to return port (int): Port for SearXNG timeout (int): Request timeout in seconds - + Returns: List[str]: List of URLs from search results """ headers = { "User-Agent": get_random_user_agent() } - + params = { "q": query, "format": "json", @@ -321,7 +321,7 @@ def _search_searxng( "engines": "duckduckgo,bing,brave", "results": max_results } - + try: response = requests.get( f"http://localhost:{port}/search", @@ -330,7 +330,7 @@ def _search_searxng( timeout=timeout ) response.raise_for_status() - + json_data = response.json() results = [result["url"] for result in json_data.get("results", [])] return results[:max_results] @@ -343,29 +343,29 @@ def _search_serper( ) -> List[str]: """ Helper function for Serper search. - + Args: query (str): Search query max_results (int): Maximum number of results to return api_key (str): API key for Serper timeout (int): Request timeout in seconds - + Returns: List[str]: List of URLs from search results """ if not api_key: raise SearchConfigError("Serper API key is required") - + headers = { "X-API-KEY": api_key, "Content-Type": "application/json" } - + data = { "q": query, "num": max_results } - + try: response = requests.post( "https://google.serper.dev/search", @@ -374,17 +374,17 @@ def _search_serper( timeout=timeout ) response.raise_for_status() - + json_data = response.json() results = [] - + # Extract organic search results for item in json_data.get("organic", []): if "link" in item: results.append(item["link"]) if len(results) >= max_results: break - + return results except Exception as e: raise SearchRequestError(f"Serper search failed: {str(e)}") @@ -393,34 +393,34 @@ def _search_serper( def format_proxy(proxy_config: Union[str, Dict, ProxyConfig]) -> str: """ Format proxy configuration into a string. - + Args: proxy_config: Proxy configuration as string, dict, or ProxyConfig - + Returns: str: Formatted proxy string """ if isinstance(proxy_config, str): return proxy_config - + if isinstance(proxy_config, dict): proxy_config = ProxyConfig(**proxy_config) - + # Format proxy with authentication if provided if proxy_config.username and proxy_config.password: auth = f"{proxy_config.username}:{proxy_config.password}@" return f"http://{auth}{proxy_config.server}" - + return f"http://{proxy_config.server}" def filter_pdf_links(urls: List[str]) -> List[str]: """ Filter out PDF links from search results. - + Args: urls (List[str]): List of URLs - + Returns: List[str]: Filtered list of URLs without PDFs """ @@ -430,28 +430,28 @@ def filter_pdf_links(urls: List[str]) -> List[str]: def verify_request_signature(request_data: Dict, signature: str, secret_key: str) -> bool: """ Verify the signature of an incoming request. - + Args: request_data (Dict): Request data to verify signature (str): Provided signature secret_key (str): Secret key for verification - + Returns: bool: True if signature is valid, False otherwise """ import hmac import hashlib import json - + # Sort keys for consistent serialization data_string = json.dumps(request_data, sort_keys=True) - + # Create HMAC signature computed_signature = hmac.new( secret_key.encode(), data_string.encode(), hashlib.sha256 ).hexdigest() - + # Compare signatures using constant-time comparison to prevent timing attacks - return hmac.compare_digest(computed_signature, signature) \ No newline at end of file + return hmac.compare_digest(computed_signature, signature) From 6deac76bec3fd304ce1f4e8052725614bb23a258 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 3 Dec 2025 23:17:37 +0000 Subject: [PATCH 3/3] Apply black and isort formatting to modified files Co-authored-by: VinciGit00 <88108002+VinciGit00@users.noreply.github.com> --- scrapegraphai/graphs/markdownify_graph.py | 4 +- scrapegraphai/models/xai.py | 1 + scrapegraphai/utils/code_error_analysis.py | 90 +++++++++------- scrapegraphai/utils/code_error_correction.py | 95 +++++++++++------ scrapegraphai/utils/research_web.py | 103 +++++++++---------- 5 files changed, 166 insertions(+), 127 deletions(-) diff --git a/scrapegraphai/graphs/markdownify_graph.py b/scrapegraphai/graphs/markdownify_graph.py index 0bf2804c..7decb259 100644 --- a/scrapegraphai/graphs/markdownify_graph.py +++ b/scrapegraphai/graphs/markdownify_graph.py @@ -64,9 +64,7 @@ def __init__( graph_name="Markdownify", ) - def execute( - self, initial_state: Dict - ) -> Tuple[Dict, List[Dict]]: + def execute(self, initial_state: Dict) -> Tuple[Dict, List[Dict]]: """ Execute the markdownify graph. diff --git a/scrapegraphai/models/xai.py b/scrapegraphai/models/xai.py index 065af3ed..64fc79b7 100644 --- a/scrapegraphai/models/xai.py +++ b/scrapegraphai/models/xai.py @@ -1,6 +1,7 @@ """ xAI Grok Module """ + from langchain_openai import ChatOpenAI diff --git a/scrapegraphai/utils/code_error_analysis.py b/scrapegraphai/utils/code_error_analysis.py index e799aff6..7a496f42 100644 --- a/scrapegraphai/utils/code_error_analysis.py +++ b/scrapegraphai/utils/code_error_analysis.py @@ -14,9 +14,9 @@ import json from typing import Any, Dict, Optional -from pydantic import BaseModel, Field, validator -from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import PromptTemplate +from pydantic import BaseModel, Field, validator from ..prompts import ( TEMPLATE_EXECUTION_ANALYSIS, @@ -28,20 +28,25 @@ class AnalysisError(Exception): """Base exception for code analysis errors.""" + pass class InvalidStateError(AnalysisError): """Exception raised when state dictionary is missing required keys.""" + pass class CodeAnalysisState(BaseModel): """Base model for code analysis state validation.""" + generated_code: str = Field(..., description="The generated code to analyze") - errors: Dict[str, Any] = Field(..., description="Dictionary containing error information") + errors: Dict[str, Any] = Field( + ..., description="Dictionary containing error information" + ) - @validator('errors') + @validator("errors") def validate_errors(cls, v): """Ensure errors dictionary has expected structure.""" if not isinstance(v, dict): @@ -51,28 +56,30 @@ def validate_errors(cls, v): class ExecutionAnalysisState(CodeAnalysisState): """Model for execution analysis state validation.""" + html_code: Optional[str] = Field(None, description="HTML code if available") html_analysis: Optional[str] = Field(None, description="Analysis of HTML code") - @validator('errors') + @validator("errors") def validate_execution_errors(cls, v): """Ensure errors dictionary contains execution key.""" super().validate_errors(v) - if 'execution' not in v: + if "execution" not in v: raise ValueError("errors dictionary must contain 'execution' key") return v class ValidationAnalysisState(CodeAnalysisState): """Model for validation analysis state validation.""" + json_schema: Dict[str, Any] = Field(..., description="JSON schema for validation") execution_result: Any = Field(..., description="Result of code execution") - @validator('errors') + @validator("errors") def validate_validation_errors(cls, v): """Ensure errors dictionary contains validation key.""" super().validate_errors(v) - if 'validation' not in v: + if "validation" not in v: raise ValueError("errors dictionary must contain 'validation' key") return v @@ -121,7 +128,7 @@ def syntax_focused_analysis(state: Dict[str, Any], llm_model) -> str: # Validate state using Pydantic model validated_state = CodeAnalysisState( generated_code=state.get("generated_code", ""), - errors=state.get("errors", {}) + errors=state.get("errors", {}), ) # Check if syntax errors exist @@ -131,15 +138,17 @@ def syntax_focused_analysis(state: Dict[str, Any], llm_model) -> str: # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("syntax"), - input_variables=["generated_code", "errors"] + input_variables=["generated_code", "errors"], ) chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "generated_code": validated_state.generated_code, - "errors": validated_state.errors["syntax"] - }) + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "errors": validated_state.errors["syntax"], + } + ) except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") @@ -176,7 +185,7 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: generated_code=state.get("generated_code", ""), errors=state.get("errors", {}), html_code=state.get("html_code", ""), - html_analysis=state.get("html_analysis", "") + html_analysis=state.get("html_analysis", ""), ) # Create prompt template and chain @@ -187,12 +196,14 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "generated_code": validated_state.generated_code, - "errors": validated_state.errors["execution"], - "html_code": validated_state.html_code, - "html_analysis": validated_state.html_analysis, - }) + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "errors": validated_state.errors["execution"], + "html_code": validated_state.html_code, + "html_analysis": validated_state.html_analysis, + } + ) except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") @@ -230,23 +241,30 @@ def validation_focused_analysis(state: Dict[str, Any], llm_model) -> str: generated_code=state.get("generated_code", ""), errors=state.get("errors", {}), json_schema=state.get("json_schema", {}), - execution_result=state.get("execution_result", {}) + execution_result=state.get("execution_result", {}), ) # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("validation"), - input_variables=["generated_code", "errors", "json_schema", "execution_result"], + input_variables=[ + "generated_code", + "errors", + "json_schema", + "execution_result", + ], ) chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "generated_code": validated_state.generated_code, - "errors": validated_state.errors["validation"], - "json_schema": validated_state.json_schema, - "execution_result": validated_state.execution_result, - }) + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "errors": validated_state.errors["validation"], + "json_schema": validated_state.json_schema, + "execution_result": validated_state.execution_result, + } + ) except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") @@ -286,7 +304,7 @@ def semantic_focused_analysis( # Validate state using Pydantic model validated_state = CodeAnalysisState( generated_code=state.get("generated_code", ""), - errors=state.get("errors", {}) + errors=state.get("errors", {}), ) # Validate comparison_result @@ -303,11 +321,13 @@ def semantic_focused_analysis( chain = prompt | llm_model | StrOutputParser() # Execute chain with validated inputs - return chain.invoke({ - "generated_code": validated_state.generated_code, - "differences": json.dumps(comparison_result["differences"], indent=2), - "explanation": comparison_result["explanation"], - }) + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "differences": json.dumps(comparison_result["differences"], indent=2), + "explanation": comparison_result["explanation"], + } + ) except KeyError as e: raise InvalidStateError(f"Missing required key: {e}") diff --git a/scrapegraphai/utils/code_error_correction.py b/scrapegraphai/utils/code_error_correction.py index 38ba266e..ac969f87 100644 --- a/scrapegraphai/utils/code_error_correction.py +++ b/scrapegraphai/utils/code_error_correction.py @@ -11,12 +11,12 @@ """ import json -from typing import Any, Dict from functools import lru_cache +from typing import Any, Dict -from pydantic import BaseModel, Field -from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import PromptTemplate +from pydantic import BaseModel, Field from ..prompts import ( TEMPLATE_EXECUTION_CODE_GENERATION, @@ -28,17 +28,22 @@ class CodeGenerationError(Exception): """Base exception for code generation errors.""" + pass class InvalidCorrectionStateError(CodeGenerationError): """Exception raised when state dictionary is missing required keys.""" + pass class CorrectionState(BaseModel): """Base model for code correction state validation.""" - generated_code: str = Field(..., description="The original generated code to correct") + + generated_code: str = Field( + ..., description="The original generated code to correct" + ) class Config: extra = "allow" @@ -46,11 +51,13 @@ class Config: class ValidationCorrectionState(CorrectionState): """Model for validation correction state validation.""" + json_schema: Dict[str, Any] = Field(..., description="JSON schema for validation") class SemanticCorrectionState(CorrectionState): """Model for semantic correction state validation.""" + execution_result: Any = Field(..., description="Result of code execution") reference_answer: Any = Field(..., description="Reference answer for comparison") @@ -76,7 +83,9 @@ def get_optimal_correction_template(error_type: str) -> str: return template_registry.get(error_type, TEMPLATE_SYNTAX_CODE_GENERATION) -def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def syntax_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on syntax error analysis. @@ -115,18 +124,21 @@ def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_mod chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code - }) + return chain.invoke( + {"analysis": analysis, "generated_code": validated_state.generated_code} + ) except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Syntax code generation failed: {str(e)}") -def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def execution_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on execution error analysis. @@ -165,18 +177,21 @@ def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_ chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code - }) + return chain.invoke( + {"analysis": analysis, "generated_code": validated_state.generated_code} + ) except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Execution code generation failed: {str(e)}") -def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def validation_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on validation error analysis. @@ -203,7 +218,7 @@ def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm # Validate state using Pydantic model validated_state = ValidationCorrectionState( generated_code=state.get("generated_code", ""), - json_schema=state.get("json_schema", {}) + json_schema=state.get("json_schema", {}), ) if not analysis or not isinstance(analysis, str): @@ -217,19 +232,25 @@ def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code, - "json_schema": validated_state.json_schema, - }) + return chain.invoke( + { + "analysis": analysis, + "generated_code": validated_state.generated_code, + "json_schema": validated_state.json_schema, + } + ) except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Validation code generation failed: {str(e)}") -def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def semantic_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on semantic error analysis. @@ -258,7 +279,7 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m validated_state = SemanticCorrectionState( generated_code=state.get("generated_code", ""), execution_result=state.get("execution_result", {}), - reference_answer=state.get("reference_answer", {}) + reference_answer=state.get("reference_answer", {}), ) if not analysis or not isinstance(analysis, str): @@ -277,14 +298,22 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m chain = prompt | llm_model | StrOutputParser() # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code, - "generated_result": json.dumps(validated_state.execution_result, indent=2), - "reference_result": json.dumps(validated_state.reference_answer, indent=2), - }) + return chain.invoke( + { + "analysis": analysis, + "generated_code": validated_state.generated_code, + "generated_result": json.dumps( + validated_state.execution_result, indent=2 + ), + "reference_result": json.dumps( + validated_state.reference_answer, indent=2 + ), + } + ) except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Semantic code generation failed: {str(e)}") diff --git a/scrapegraphai/utils/research_web.py b/scrapegraphai/utils/research_web.py index df1b07c9..d633084d 100644 --- a/scrapegraphai/utils/research_web.py +++ b/scrapegraphai/utils/research_web.py @@ -3,68 +3,81 @@ error handling, validation, and security features. """ -import re import random +import re import time -from typing import List, Dict, Union, Optional from functools import wraps +from typing import Dict, List, Optional, Union import requests from bs4 import BeautifulSoup -from pydantic import BaseModel, Field, validator from langchain_community.tools import DuckDuckGoSearchResults +from pydantic import BaseModel, Field, validator class ResearchWebError(Exception): """Base exception for research web errors.""" + pass class SearchConfigError(ResearchWebError): """Exception raised when search configuration is invalid.""" + pass class SearchRequestError(ResearchWebError): """Exception raised when search request fails.""" + pass class ProxyConfig(BaseModel): """Model for proxy configuration validation.""" + server: str = Field(..., description="Proxy server address including port") - username: Optional[str] = Field(None, description="Username for proxy authentication") - password: Optional[str] = Field(None, description="Password for proxy authentication") + username: Optional[str] = Field( + None, description="Username for proxy authentication" + ) + password: Optional[str] = Field( + None, description="Password for proxy authentication" + ) class SearchConfig(BaseModel): """Model for search configuration validation.""" + query: str = Field(..., description="Search query") search_engine: str = Field("duckduckgo", description="Search engine to use") max_results: int = Field(10, description="Maximum number of results to return") port: Optional[int] = Field(8080, description="Port for SearXNG") timeout: int = Field(10, description="Request timeout in seconds") - proxy: Optional[Union[str, Dict, ProxyConfig]] = Field(None, description="Proxy configuration") + proxy: Optional[Union[str, Dict, ProxyConfig]] = Field( + None, description="Proxy configuration" + ) serper_api_key: Optional[str] = Field(None, description="API key for Serper") region: Optional[str] = Field(None, description="Country/region code") language: str = Field("en", description="Language code") - @validator('search_engine') + @validator("search_engine") def validate_search_engine(cls, v): """Validate search engine.""" valid_engines = {"duckduckgo", "bing", "searxng", "serper"} if v.lower() not in valid_engines: - raise ValueError(f"Search engine must be one of: {', '.join(valid_engines)}") + raise ValueError( + f"Search engine must be one of: {', '.join(valid_engines)}" + ) return v.lower() - @validator('query') + @validator("query") def validate_query(cls, v): """Validate search query.""" if not v or not isinstance(v, str): raise ValueError("Query must be a non-empty string") return v - @validator('max_results') + @validator("max_results") def validate_max_results(cls, v): """Validate max results.""" if v < 1 or v > 100: @@ -73,7 +86,7 @@ def validate_max_results(cls, v): # Define advanced PDF detection regex -PDF_REGEX = re.compile(r'\.pdf(#.*)?(\?.*)?$', re.IGNORECASE) +PDF_REGEX = re.compile(r"\.pdf(#.*)?(\?.*)?$", re.IGNORECASE) # Rate limiting decorator @@ -101,7 +114,9 @@ def wrapper(*args, **kwargs): result = func(*args, **kwargs) last_called[0] = time.time() return result + return wrapper + return decorator @@ -116,7 +131,7 @@ def sanitize_search_query(query: str) -> str: str: Sanitized query. """ # Remove potential command injection characters - sanitized = re.sub(r'[;&|`$()\[\]{}<>]', '', query) + sanitized = re.sub(r"[;&|`$()\[\]{}<>]", "", query) # Trim whitespace sanitized = sanitized.strip() return sanitized @@ -128,7 +143,7 @@ def sanitize_search_query(query: str) -> str: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", - "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1" + "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1", ] @@ -190,7 +205,7 @@ def search_on_web( proxy=proxy, serper_api_key=serper_api_key, region=region, - language=language + language=language, ) # Format proxy once @@ -209,26 +224,17 @@ def search_on_web( elif config.search_engine == "bing": results = _search_bing( - config.query, - config.max_results, - config.timeout, - formatted_proxy + config.query, config.max_results, config.timeout, formatted_proxy ) elif config.search_engine == "searxng": results = _search_searxng( - config.query, - config.max_results, - config.port, - config.timeout + config.query, config.max_results, config.port, config.timeout ) elif config.search_engine == "serper": results = _search_serper( - config.query, - config.max_results, - config.serper_api_key, - config.timeout + config.query, config.max_results, config.serper_api_key, config.timeout ) return filter_pdf_links(results) @@ -256,14 +262,9 @@ def _search_bing( Returns: List[str]: List of URLs from search results """ - headers = { - "User-Agent": get_random_user_agent() - } + headers = {"User-Agent": get_random_user_agent()} - params = { - "q": query, - "count": max_results - } + params = {"q": query, "count": max_results} proxies = {"http": proxy, "https": proxy} if proxy else None @@ -273,7 +274,7 @@ def _search_bing( params=params, headers=headers, proxies=proxies, - timeout=timeout + timeout=timeout, ) response.raise_for_status() @@ -293,9 +294,7 @@ def _search_bing( raise SearchRequestError(f"Bing search failed: {str(e)}") -def _search_searxng( - query: str, max_results: int, port: int, timeout: int -) -> List[str]: +def _search_searxng(query: str, max_results: int, port: int, timeout: int) -> List[str]: """ Helper function for SearXNG search. @@ -308,9 +307,7 @@ def _search_searxng( Returns: List[str]: List of URLs from search results """ - headers = { - "User-Agent": get_random_user_agent() - } + headers = {"User-Agent": get_random_user_agent()} params = { "q": query, @@ -319,7 +316,7 @@ def _search_searxng( "language": "en", "time_range": "", "engines": "duckduckgo,bing,brave", - "results": max_results + "results": max_results, } try: @@ -327,7 +324,7 @@ def _search_searxng( f"http://localhost:{port}/search", params=params, headers=headers, - timeout=timeout + timeout=timeout, ) response.raise_for_status() @@ -356,22 +353,16 @@ def _search_serper( if not api_key: raise SearchConfigError("Serper API key is required") - headers = { - "X-API-KEY": api_key, - "Content-Type": "application/json" - } + headers = {"X-API-KEY": api_key, "Content-Type": "application/json"} - data = { - "q": query, - "num": max_results - } + data = {"q": query, "num": max_results} try: response = requests.post( "https://google.serper.dev/search", json=data, headers=headers, - timeout=timeout + timeout=timeout, ) response.raise_for_status() @@ -427,7 +418,9 @@ def filter_pdf_links(urls: List[str]) -> List[str]: return [url for url in urls if not PDF_REGEX.search(url)] -def verify_request_signature(request_data: Dict, signature: str, secret_key: str) -> bool: +def verify_request_signature( + request_data: Dict, signature: str, secret_key: str +) -> bool: """ Verify the signature of an incoming request. @@ -439,8 +432,8 @@ def verify_request_signature(request_data: Dict, signature: str, secret_key: str Returns: bool: True if signature is valid, False otherwise """ - import hmac import hashlib + import hmac import json # Sort keys for consistent serialization @@ -448,9 +441,7 @@ def verify_request_signature(request_data: Dict, signature: str, secret_key: str # Create HMAC signature computed_signature = hmac.new( - secret_key.encode(), - data_string.encode(), - hashlib.sha256 + secret_key.encode(), data_string.encode(), hashlib.sha256 ).hexdigest() # Compare signatures using constant-time comparison to prevent timing attacks