Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion app/api/v1/admin/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,85 @@
import os
import re

from fastapi import APIRouter, Depends, HTTPException

from app.core.auth import verify_app_key
from app.core.config import config
from app.core.storage import get_storage as resolve_storage, LocalStorage, RedisStorage, SQLStorage
from app.core.logger import logger

router = APIRouter()

_CFG_CHAR_REPLACEMENTS = str.maketrans(
{
"\u2010": "-",
"\u2011": "-",
"\u2012": "-",
"\u2013": "-",
"\u2014": "-",
"\u2212": "-",
"\u2018": "'",
"\u2019": "'",
"\u201c": '"',
"\u201d": '"',
"\u00a0": " ",
"\u2007": " ",
"\u202f": " ",
"\u200b": "",
"\u200c": "",
"\u200d": "",
"\ufeff": "",
}
)


def _sanitize_proxy_text(value, *, remove_all_spaces: bool = False) -> str:
text = "" if value is None else str(value)
text = text.translate(_CFG_CHAR_REPLACEMENTS)
if remove_all_spaces:
text = re.sub(r"\s+", "", text)
else:
text = text.strip()
return text.encode("latin-1", errors="ignore").decode("latin-1")


def _sanitize_proxy_config_payload(data: dict) -> dict:
    """Return a shallow copy of *data* with sanitized proxy text fields.

    Non-dict inputs (or payloads without a dict ``proxy`` section) are
    returned unchanged. Otherwise ``user_agent``, ``cf_cookies`` and
    ``cf_clearance`` are passed through :func:`_sanitize_proxy_text`;
    ``cf_clearance`` additionally has all internal whitespace removed.
    A warning is logged when any field actually changed.
    """
    if not isinstance(data, dict):
        return data
    payload = dict(data)
    proxy = payload.get("proxy")
    if not isinstance(proxy, dict):
        return payload

    sanitized_proxy = dict(proxy)
    changed = False

    # (field name, remove_all_spaces) — one loop instead of three copies.
    for field, strip_all in (
        ("user_agent", False),
        ("cf_cookies", False),
        ("cf_clearance", True),
    ):
        if field not in sanitized_proxy:
            continue
        raw = sanitized_proxy.get(field)
        cleaned = _sanitize_proxy_text(raw, remove_all_spaces=strip_all)
        if cleaned != raw:
            sanitized_proxy[field] = cleaned
            changed = True

    if changed:
        logger.warning("Sanitized proxy config fields before saving")
    payload["proxy"] = sanitized_proxy
    return payload


@router.get("/verify", dependencies=[Depends(verify_app_key)])
async def admin_verify():
Expand All @@ -26,7 +98,7 @@ async def get_config():
async def update_config(data: dict):
    """Update the runtime configuration.

    The payload is sanitized first so that unicode punctuation pasted into
    proxy fields cannot corrupt outgoing headers later on.

    Raises:
        HTTPException: 500 carrying the underlying error message if the
            update fails.
    """
    try:
        await config.update(_sanitize_proxy_config_payload(data))
    except Exception as e:
        # Chain the original exception so the root cause is not lost.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success", "message": "配置已更新"}
Expand Down
39 changes: 35 additions & 4 deletions app/api/v1/admin/token.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import re

import orjson
from fastapi import APIRouter, Depends, HTTPException, Request
Expand All @@ -14,6 +15,33 @@

router = APIRouter()

_TOKEN_CHAR_REPLACEMENTS = str.maketrans(
{
"\u2010": "-",
"\u2011": "-",
"\u2012": "-",
"\u2013": "-",
"\u2014": "-",
"\u2212": "-",
"\u00a0": " ",
"\u2007": " ",
"\u202f": " ",
"\u200b": "",
"\u200c": "",
"\u200d": "",
"\ufeff": "",
}
)


def _sanitize_token_text(value) -> str:
token = "" if value is None else str(value)
token = token.translate(_TOKEN_CHAR_REPLACEMENTS)
token = re.sub(r"\s+", "", token)
if token.startswith("sso="):
token = token[4:]
return token.encode("ascii", errors="ignore").decode("ascii")


@router.get("/tokens", dependencies=[Depends(verify_app_key)])
async def get_tokens():
Expand Down Expand Up @@ -47,8 +75,8 @@ async def update_tokens(data: dict):
else:
continue
raw_token = token_data.get("token")
if isinstance(raw_token, str) and raw_token.startswith("sso="):
token_data["token"] = raw_token[4:]
if raw_token is not None:
token_data["token"] = _sanitize_token_text(raw_token)
token_key = token_data.get("token")
if isinstance(token_key, str):
pool_map[token_key] = token_data
Expand All @@ -66,8 +94,11 @@ async def update_tokens(data: dict):
continue

raw_token = token_data.get("token")
if isinstance(raw_token, str) and raw_token.startswith("sso="):
token_data["token"] = raw_token[4:]
if raw_token is not None:
token_data["token"] = _sanitize_token_text(raw_token)
if not token_data.get("token"):
logger.warning(f"Skip empty token in pool '{pool_name}'")
continue

base = existing_map.get(pool_name, {}).get(
token_data.get("token"), {}
Expand Down
78 changes: 71 additions & 7 deletions app/services/reverse/utils/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,52 @@
from app.core.config import get_config
from app.services.reverse.utils.statsig import StatsigGenerator

_HEADER_CHAR_REPLACEMENTS = str.maketrans(
{
"\u2010": "-", # hyphen
"\u2011": "-", # non-breaking hyphen
"\u2012": "-", # figure dash
"\u2013": "-", # en dash
"\u2014": "-", # em dash
"\u2212": "-", # minus sign
"\u2018": "'", # left single quote
"\u2019": "'", # right single quote
"\u201c": '"', # left double quote
"\u201d": '"', # right double quote
"\u00a0": " ", # nbsp
"\u2007": " ", # figure space
"\u202f": " ", # narrow nbsp
"\u200b": "", # zero width space
"\u200c": "", # zero width non-joiner
"\u200d": "", # zero width joiner
"\ufeff": "", # bom
}
)


def _sanitize_header_value(
value: Optional[str],
*,
field_name: str,
remove_all_spaces: bool = False,
) -> str:
"""Normalize header values and make sure they are latin-1 safe."""
raw = "" if value is None else str(value)
normalized = raw.translate(_HEADER_CHAR_REPLACEMENTS)
if remove_all_spaces:
normalized = re.sub(r"\s+", "", normalized)
else:
normalized = normalized.strip()

# curl_cffi header encoding defaults to latin-1.
normalized = normalized.encode("latin-1", errors="ignore").decode("latin-1")

if normalized != raw:
logger.warning(
f"Sanitized header field '{field_name}' (len {len(raw)} -> {len(normalized)})"
)
return normalized


def build_sso_cookie(sso_token: str) -> str:
"""
Expand All @@ -23,13 +69,22 @@ def build_sso_cookie(sso_token: str) -> str:
"""
# Format
sso_token = sso_token[4:] if sso_token.startswith("sso=") else sso_token
sso_token = _sanitize_header_value(
sso_token, field_name="sso_token", remove_all_spaces=True
)

# SSO Cookie
cookie = f"sso={sso_token}; sso-rw={sso_token}"

# CF Cookies
cf_cookies = get_config("proxy.cf_cookies") or ""
cf_clearance = (get_config("proxy.cf_clearance") or "").strip()
cf_cookies = _sanitize_header_value(
get_config("proxy.cf_cookies") or "", field_name="proxy.cf_cookies"
)
cf_clearance = _sanitize_header_value(
get_config("proxy.cf_clearance") or "",
field_name="proxy.cf_clearance",
remove_all_spaces=True,
)
cf_refresh_enabled = bool(get_config("proxy.enabled"))

if cf_refresh_enabled:
Expand Down Expand Up @@ -159,9 +214,12 @@ def build_ws_headers(token: Optional[str] = None, origin: Optional[str] = None,
Returns:
Dict[str, str]: The headers dictionary.
"""
user_agent = get_config("proxy.user_agent")
user_agent = _sanitize_header_value(
get_config("proxy.user_agent"), field_name="proxy.user_agent"
)
safe_origin = _sanitize_header_value(origin or "https://grok.com", field_name="origin")
headers = {
"Origin": origin or "https://grok.com",
"Origin": safe_origin,
"User-Agent": user_agent,
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Cache-Control": "no-cache",
Expand Down Expand Up @@ -194,14 +252,20 @@ def build_headers(cookie_token: str, content_type: Optional[str] = None, origin:
Returns:
Dict[str, str]: The headers dictionary.
"""
user_agent = get_config("proxy.user_agent")
user_agent = _sanitize_header_value(
get_config("proxy.user_agent"), field_name="proxy.user_agent"
)
safe_origin = _sanitize_header_value(origin or "https://grok.com", field_name="origin")
safe_referer = _sanitize_header_value(
referer or "https://grok.com/", field_name="referer"
)
headers = {
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Baggage": "sentry-environment=production,sentry-release=d6add6fb0460641fd482d767a335ef72b9b6abb8,sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c",
"Origin": origin or "https://grok.com",
"Origin": safe_origin,
"Priority": "u=1, i",
"Referer": referer or "https://grok.com/",
"Referer": safe_referer,
"Sec-Fetch-Mode": "cors",
"User-Agent": user_agent,
}
Expand Down
36 changes: 35 additions & 1 deletion app/services/token/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from enum import Enum
from typing import Optional, List
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
from datetime import datetime


Expand Down Expand Up @@ -71,6 +71,40 @@ class TokenInfo(BaseModel):
note: str = ""
last_asset_clear_at: Optional[int] = None

@field_validator("token", mode="before")
@classmethod
def _normalize_token(cls, value):
    """Normalize copied tokens to avoid unicode punctuation issues.

    Maps unicode dashes to ASCII, drops whitespace/zero-width characters,
    strips a leading "sso=" prefix and any non-ASCII leftovers. Raises
    ValueError when the result (or the input) is empty.
    """
    if value is None:
        raise ValueError("token cannot be empty")
    replacements = str.maketrans(
        {
            # Unicode dashes -> ASCII hyphen.
            "\u2010": "-",
            "\u2011": "-",
            "\u2012": "-",
            "\u2013": "-",
            "\u2014": "-",
            "\u2212": "-",
            # Exotic spaces -> plain space (removed with all whitespace below).
            "\u00a0": " ",
            "\u2007": " ",
            "\u202f": " ",
            # Zero-width characters and BOM -> removed.
            "\u200b": "",
            "\u200c": "",
            "\u200d": "",
            "\ufeff": "",
        }
    )
    cleaned = str(value).translate(replacements)
    cleaned = "".join(cleaned.split())
    if cleaned.startswith("sso="):
        cleaned = cleaned[len("sso="):]
    cleaned = cleaned.encode("ascii", errors="ignore").decode("ascii")
    if not cleaned:
        raise ValueError("token cannot be empty")
    return cleaned

def is_available(self) -> bool:
    """Return True when the token status is ACTIVE and quota remains."""
    if self.status != TokenStatus.ACTIVE:
        return False
    return self.quota > 0
Expand Down
Loading