diff --git a/app/api/v1/admin/config.py b/app/api/v1/admin/config.py
index f0a9c2d0..d719242d 100644
--- a/app/api/v1/admin/config.py
+++ b/app/api/v1/admin/config.py
@@ -1,13 +1,85 @@
 import os
+import re
 
 from fastapi import APIRouter, Depends, HTTPException
 
 from app.core.auth import verify_app_key
 from app.core.config import config
 from app.core.storage import get_storage as resolve_storage, LocalStorage, RedisStorage, SQLStorage
+from app.core.logger import logger
 
 router = APIRouter()
 
+_CFG_CHAR_REPLACEMENTS = str.maketrans(
+    {
+        "\u2010": "-",
+        "\u2011": "-",
+        "\u2012": "-",
+        "\u2013": "-",
+        "\u2014": "-",
+        "\u2212": "-",
+        "\u2018": "'",
+        "\u2019": "'",
+        "\u201c": '"',
+        "\u201d": '"',
+        "\u00a0": " ",
+        "\u2007": " ",
+        "\u202f": " ",
+        "\u200b": "",
+        "\u200c": "",
+        "\u200d": "",
+        "\ufeff": "",
+    }
+)
+
+
+def _sanitize_proxy_text(value, *, remove_all_spaces: bool = False) -> str:
+    text = "" if value is None else str(value)
+    text = text.translate(_CFG_CHAR_REPLACEMENTS)
+    if remove_all_spaces:
+        text = re.sub(r"\s+", "", text)
+    else:
+        text = text.strip()
+    return text.encode("latin-1", errors="ignore").decode("latin-1")
+
+
+def _sanitize_proxy_config_payload(data: dict) -> dict:
+    if not isinstance(data, dict):
+        return data
+    payload = dict(data)
+    proxy = payload.get("proxy")
+    if not isinstance(proxy, dict):
+        return payload
+
+    sanitized_proxy = dict(proxy)
+    changed = False
+
+    if "user_agent" in sanitized_proxy:
+        raw = sanitized_proxy.get("user_agent")
+        val = _sanitize_proxy_text(raw, remove_all_spaces=False)
+        if val != raw:
+            sanitized_proxy["user_agent"] = val
+            changed = True
+
+    if "cf_cookies" in sanitized_proxy:
+        raw = sanitized_proxy.get("cf_cookies")
+        val = _sanitize_proxy_text(raw, remove_all_spaces=False)
+        if val != raw:
+            sanitized_proxy["cf_cookies"] = val
+            changed = True
+
+    if "cf_clearance" in sanitized_proxy:
+        raw = sanitized_proxy.get("cf_clearance")
+        val = _sanitize_proxy_text(raw, remove_all_spaces=True)
+        if val != raw:
+            sanitized_proxy["cf_clearance"] = val
+            changed = True
+
+    if changed:
+        logger.warning("Sanitized proxy config fields before saving")
+    payload["proxy"] = sanitized_proxy
+    return payload
+
 
 @router.get("/verify", dependencies=[Depends(verify_app_key)])
 async def admin_verify():
@@ -26,7 +98,7 @@ async def get_config():
 async def update_config(data: dict):
     """更新配置"""
     try:
-        await config.update(data)
+        await config.update(_sanitize_proxy_config_payload(data))
         return {"status": "success", "message": "配置已更新"}
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))
diff --git a/app/api/v1/admin/token.py b/app/api/v1/admin/token.py
index 6eec136d..d417ee88 100644
--- a/app/api/v1/admin/token.py
+++ b/app/api/v1/admin/token.py
@@ -1,4 +1,5 @@
 import asyncio
+import re
 import orjson
 
 from fastapi import APIRouter, Depends, HTTPException, Request
@@ -14,6 +15,33 @@
 
 router = APIRouter()
 
+_TOKEN_CHAR_REPLACEMENTS = str.maketrans(
+    {
+        "\u2010": "-",
+        "\u2011": "-",
+        "\u2012": "-",
+        "\u2013": "-",
+        "\u2014": "-",
+        "\u2212": "-",
+        "\u00a0": " ",
+        "\u2007": " ",
+        "\u202f": " ",
+        "\u200b": "",
+        "\u200c": "",
+        "\u200d": "",
+        "\ufeff": "",
+    }
+)
+
+
+def _sanitize_token_text(value) -> str:
+    token = "" if value is None else str(value)
+    token = token.translate(_TOKEN_CHAR_REPLACEMENTS)
+    token = re.sub(r"\s+", "", token)
+    if token.startswith("sso="):
+        token = token[4:]
+    return token.encode("ascii", errors="ignore").decode("ascii")
+
 
 @router.get("/tokens", dependencies=[Depends(verify_app_key)])
 async def get_tokens():
@@ -47,8 +75,8 @@ async def update_tokens(data: dict):
             else:
                 continue
             raw_token = token_data.get("token")
-            if isinstance(raw_token, str) and raw_token.startswith("sso="):
-                token_data["token"] = raw_token[4:]
+            if raw_token is not None:
+                token_data["token"] = _sanitize_token_text(raw_token)
             token_key = token_data.get("token")
             if isinstance(token_key, str):
                 pool_map[token_key] = token_data
@@ -66,8 +94,11 @@ async def update_tokens(data: dict):
                 continue
 
             raw_token = token_data.get("token")
-            if isinstance(raw_token, str) and raw_token.startswith("sso="):
-                token_data["token"] = raw_token[4:]
+            if raw_token is not None:
+                token_data["token"] = _sanitize_token_text(raw_token)
+            if not token_data.get("token"):
+                logger.warning(f"Skip empty token in pool '{pool_name}'")
+                continue
 
             base = existing_map.get(pool_name, {}).get(
                 token_data.get("token"), {}
diff --git a/app/services/reverse/utils/headers.py b/app/services/reverse/utils/headers.py
index dab23462..21c77840 100644
--- a/app/services/reverse/utils/headers.py
+++ b/app/services/reverse/utils/headers.py
@@ -10,6 +10,52 @@
 from app.core.config import get_config
 from app.services.reverse.utils.statsig import StatsigGenerator
 
+_HEADER_CHAR_REPLACEMENTS = str.maketrans(
+    {
+        "\u2010": "-",  # hyphen
+        "\u2011": "-",  # non-breaking hyphen
+        "\u2012": "-",  # figure dash
+        "\u2013": "-",  # en dash
+        "\u2014": "-",  # em dash
+        "\u2212": "-",  # minus sign
+        "\u2018": "'",  # left single quote
+        "\u2019": "'",  # right single quote
+        "\u201c": '"',  # left double quote
+        "\u201d": '"',  # right double quote
+        "\u00a0": " ",  # nbsp
+        "\u2007": " ",  # figure space
+        "\u202f": " ",  # narrow nbsp
+        "\u200b": "",  # zero width space
+        "\u200c": "",  # zero width non-joiner
+        "\u200d": "",  # zero width joiner
+        "\ufeff": "",  # bom
+    }
+)
+
+
+def _sanitize_header_value(
+    value: Optional[str],
+    *,
+    field_name: str,
+    remove_all_spaces: bool = False,
+) -> str:
+    """Normalize header values and make sure they are latin-1 safe."""
+    raw = "" if value is None else str(value)
+    normalized = raw.translate(_HEADER_CHAR_REPLACEMENTS)
+    if remove_all_spaces:
+        normalized = re.sub(r"\s+", "", normalized)
+    else:
+        normalized = normalized.strip()
+
+    # curl_cffi header encoding defaults to latin-1.
+    normalized = normalized.encode("latin-1", errors="ignore").decode("latin-1")
+
+    if normalized != raw:
+        logger.warning(
+            f"Sanitized header field '{field_name}' (len {len(raw)} -> {len(normalized)})"
+        )
+    return normalized
+
 
 def build_sso_cookie(sso_token: str) -> str:
     """
@@ -23,13 +69,22 @@ def build_sso_cookie(sso_token: str) -> str:
     """
     # Format
     sso_token = sso_token[4:] if sso_token.startswith("sso=") else sso_token
+    sso_token = _sanitize_header_value(
+        sso_token, field_name="sso_token", remove_all_spaces=True
+    )
 
     # SSO Cookie
     cookie = f"sso={sso_token}; sso-rw={sso_token}"
 
     # CF Cookies
-    cf_cookies = get_config("proxy.cf_cookies") or ""
-    cf_clearance = (get_config("proxy.cf_clearance") or "").strip()
+    cf_cookies = _sanitize_header_value(
+        get_config("proxy.cf_cookies") or "", field_name="proxy.cf_cookies"
+    )
+    cf_clearance = _sanitize_header_value(
+        get_config("proxy.cf_clearance") or "",
+        field_name="proxy.cf_clearance",
+        remove_all_spaces=True,
+    )
 
     cf_refresh_enabled = bool(get_config("proxy.enabled"))
     if cf_refresh_enabled:
@@ -159,9 +214,12 @@
     Returns:
         Dict[str, str]: The headers dictionary.
     """
-    user_agent = get_config("proxy.user_agent")
+    user_agent = _sanitize_header_value(
+        get_config("proxy.user_agent"), field_name="proxy.user_agent"
+    )
+    safe_origin = _sanitize_header_value(origin or "https://grok.com", field_name="origin")
     headers = {
-        "Origin": origin or "https://grok.com",
+        "Origin": safe_origin,
         "User-Agent": user_agent,
         "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
         "Cache-Control": "no-cache",
@@ -194,14 +252,20 @@ def build_headers(cookie_token: str, content_type: Optional[str] = None, origin:
     Returns:
         Dict[str, str]: The headers dictionary.
     """
-    user_agent = get_config("proxy.user_agent")
+    user_agent = _sanitize_header_value(
+        get_config("proxy.user_agent"), field_name="proxy.user_agent"
+    )
+    safe_origin = _sanitize_header_value(origin or "https://grok.com", field_name="origin")
+    safe_referer = _sanitize_header_value(
+        referer or "https://grok.com/", field_name="referer"
+    )
     headers = {
         "Accept-Encoding": "gzip, deflate, br, zstd",
         "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
         "Baggage": "sentry-environment=production,sentry-release=d6add6fb0460641fd482d767a335ef72b9b6abb8,sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c",
-        "Origin": origin or "https://grok.com",
+        "Origin": safe_origin,
         "Priority": "u=1, i",
-        "Referer": referer or "https://grok.com/",
+        "Referer": safe_referer,
         "Sec-Fetch-Mode": "cors",
         "User-Agent": user_agent,
     }
diff --git a/app/services/token/models.py b/app/services/token/models.py
index 86300d90..0e8b3b5f 100644
--- a/app/services/token/models.py
+++ b/app/services/token/models.py
@@ -10,7 +10,7 @@
 from enum import Enum
 from typing import Optional, List
 
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, field_validator
 
 from datetime import datetime
 
@@ -71,6 +71,40 @@ class TokenInfo(BaseModel):
     note: str = ""
     last_asset_clear_at: Optional[int] = None
 
+    @field_validator("token", mode="before")
+    @classmethod
+    def _normalize_token(cls, value):
+        """Normalize copied tokens to avoid unicode punctuation issues."""
+        if value is None:
+            raise ValueError("token cannot be empty")
+        token = str(value)
+        token = token.translate(
+            str.maketrans(
+                {
+                    "\u2010": "-",
+                    "\u2011": "-",
+                    "\u2012": "-",
+                    "\u2013": "-",
+                    "\u2014": "-",
+                    "\u2212": "-",
+                    "\u00a0": " ",
+                    "\u2007": " ",
+                    "\u202f": " ",
+                    "\u200b": "",
+                    "\u200c": "",
+                    "\u200d": "",
+                    "\ufeff": "",
+                }
+            )
+        )
+        token = "".join(token.split())
+        if token.startswith("sso="):
+            token = token[4:]
+        token = token.encode("ascii", errors="ignore").decode("ascii")
+        if not token:
+            raise ValueError("token cannot be empty")
+        return token
+
     def is_available(self) -> bool:
         """检查是否可用(状态正常且配额 > 0)"""
         return self.status == TokenStatus.ACTIVE and self.quota > 0