最新版本any路由器本地网关即可修复(auto mode可以同步开启!)

最新版本any路由器本地网关即可修复(auto mode可以同步开启!)
最新版本any路由器本地网关即可修复(auto mode可以同步开启!)

any路由器因为跟claude code有一些参数适配的问题,所以我们可以在本地架设一个简单的网关,将参数在本地拦截,然后修改一下,再传给any大善人,就可以绕过这些参数适配的小问题了。

claudecode最新版本适用,不需要回退版本

这个本地网关做了什么

  1. 在本地监听 127.0.0.1:1998。
  2. 把 Claude Code 的请求转发到上游any的端口。
  3. 自动补认证头(Authorization / x-api-key)。
  4. 对 haiku 请求额外修正:补 context-1m-2025-08-07,并加 thinking.budget_tokens=1024。
  5. 把请求和响应写到 gateway_requests.jsonl 方便排错。

极简启动步骤

  1. 先开网关

export ANTHROPIC_BASE_URL="any大善人地址"
export ANTHROPIC_AUTH_TOKEN="你的真实token"
python3 /Users/Apple/Desktop/code/claude_gateway.py

  2. 新开一个终端再开 Claude Code

ANTHROPIC_BASE_URL="http://127.0.0.1:1998" claude --enable-auto-mode

截图为证:

image

image

网关代码如下(vibe写的,很多冗余,大佬可以自行修改):

#!/usr/bin/env python3
import base64
import json
import os
import threading
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.error import HTTPError, URLError
from urllib.parse import urlsplit, urlunsplit
from urllib.request import Request, urlopen

# Address the gateway listens on; both parts can be overridden via the environment.
LISTEN_HOST = os.environ.get("CLAUDE_GATEWAY_HOST", "127.0.0.1")
LISTEN_PORT = int(os.environ.get("CLAUDE_GATEWAY_PORT", "1998"))

# Upstream endpoint that incoming requests are forwarded to, and the token
# that is injected into requests lacking their own credentials.
UPSTREAM_BASE_URL = os.environ.get(
    "ANTHROPIC_BASE_URL",
    "https://a-ocnfniawgw.cn-shanghai.fcapp.run",
)
UPSTREAM_AUTH_TOKEN = os.environ.get("ANTHROPIC_AUTH_TOKEN", "")

# Timeout (seconds) passed to urlopen for each upstream call.
UPSTREAM_TIMEOUT = float(os.environ.get("CLAUDE_GATEWAY_TIMEOUT", "120"))

# JSON-lines file receiving one record per proxied request; defaults to a
# file next to this script.
LOG_PATH = os.environ.get(
    "CLAUDE_GATEWAY_LOG",
    os.path.join(os.path.dirname(__file__), "gateway_requests.jsonl"),
)

# Serializes appends to LOG_PATH across handler threads.
LOG_LOCK = threading.Lock()


def utc_now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def ensure_log_parent_exists() -> None:
    """Create the directory that holds LOG_PATH if it does not exist yet.

    Does nothing when LOG_PATH has no directory component.
    """
    log_dir = os.path.dirname(LOG_PATH)
    if not log_dir:
        return
    os.makedirs(log_dir, exist_ok=True)


def decode_body_for_log(body: bytes) -> dict:
    """Turn a raw body into a JSON-serializable ``{"encoding", "text"}`` dict.

    Bodies that are valid UTF-8 (including the empty body) are stored as
    text; anything else is stored base64-encoded so no bytes are lost.
    """
    try:
        text = body.decode("utf-8") if body else ""
    except UnicodeDecodeError:
        encoded = base64.b64encode(body).decode("ascii")
        return {"encoding": "base64", "text": encoded}
    return {"encoding": "utf-8", "text": text}


def append_log(record: dict) -> None:
    """Append *record* to LOG_PATH as a single JSON line.

    The write happens while holding LOG_LOCK so that lines written from
    different handler threads do not interleave.
    """
    ensure_log_parent_exists()
    serialized = json.dumps(record, ensure_ascii=False) + "\n"
    with LOG_LOCK, open(LOG_PATH, "a", encoding="utf-8") as log_file:
        log_file.write(serialized)


def build_upstream_url(base_url: str, incoming_path: str) -> str:
    """Combine the upstream base URL with the path/query of an incoming request.

    Scheme and host come from *base_url*. The base URL's path (without a
    trailing slash) is prefixed to the incoming path, and the incoming
    query string is carried over unchanged. An empty incoming path is
    treated as ``/``.
    """
    upstream = urlsplit(base_url)
    request_target = urlsplit(incoming_path)

    # An empty prefix leaves the incoming path as-is, so no branch is needed.
    prefix = upstream.path.rstrip("/")
    full_path = prefix + (request_target.path or "/")

    parts = (upstream.scheme, upstream.netloc, full_path, request_target.query, "")
    return urlunsplit(parts)


def rewrite_request_headers(headers: dict, path: str) -> dict:
    """Return a copy of *headers* with upstream credentials filled in.

    When UPSTREAM_AUTH_TOKEN is set, ``x-api-key`` and ``Authorization``
    are added only if the request does not already carry them (header
    names are compared case-insensitively). *path* is currently unused.
    """
    outgoing = dict(headers)
    if not UPSTREAM_AUTH_TOKEN:
        return outgoing

    present = {name.lower() for name in outgoing}
    if "x-api-key" not in present:
        outgoing["x-api-key"] = UPSTREAM_AUTH_TOKEN
    if "authorization" not in present:
        outgoing["Authorization"] = f"Bearer {UPSTREAM_AUTH_TOKEN}"

    # Skeleton only for now; further overrides following the "any" router's
    # rules are meant to be added here step by step.
    return outgoing


def _find_header_key(headers: dict, name: str):
    """Return the key of *headers* equal to *name* ignoring case, or None."""
    wanted = name.lower()
    for key in headers:
        if key.lower() == wanted:
            return key
    return None


def rewrite_request_body(body: bytes, headers: dict, path: str) -> bytes:
    """Patch JSON requests for ``claude-haiku*`` models before forwarding.

    For a JSON object body whose ``model`` starts with ``claude-haiku``
    (case-insensitive) this:

    * adds ``context-1m-2025-08-07`` to the ``anthropic-beta`` header,
      mutating *headers* in place, and
    * sets ``thinking`` to ``{"type": "enabled", "budget_tokens": 1024}``.

    Every other body (empty, non-JSON content type, undecodable, invalid
    JSON, JSON that is not an object, or a different model) is returned
    unchanged and *headers* is left untouched. *path* is currently unused.

    Args:
        body: Raw request body.
        headers: Outgoing request headers; may be modified in place.
        path: Request path of the incoming request.

    Returns:
        The original body, or the re-serialized patched JSON as UTF-8 bytes.
    """
    if not body:
        return body

    content_type_key = _find_header_key(headers, "content-type")
    content_type = headers[content_type_key] if content_type_key is not None else ""
    if "application/json" not in content_type.lower():
        return body

    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return body

    # Valid JSON is not necessarily an object; lists, strings and numbers
    # have no ``.get`` and must pass through instead of raising.
    if not isinstance(payload, dict):
        return body

    model = str(payload.get("model", "")).lower()
    if not model.startswith("claude-haiku"):
        return body

    beta_key = _find_header_key(headers, "anthropic-beta")
    raw_beta = headers[beta_key] if beta_key is not None else ""
    beta_features = [item.strip() for item in raw_beta.split(",") if item.strip()]
    if "context-1m-2025-08-07" not in beta_features:
        beta_features.append("context-1m-2025-08-07")

    # Reuse the client's header spelling when present so no duplicate
    # header differing only in case is created.
    target_key = beta_key if beta_key is not None else "anthropic-beta"
    headers[target_key] = ",".join(beta_features)

    # NOTE(review): this replaces any client-supplied ``thinking`` config;
    # upstream may also require max_tokens to exceed budget_tokens - confirm.
    payload["thinking"] = {"type": "enabled", "budget_tokens": 1024}
    return json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")


class ClaudeGatewayHandler(BaseHTTPRequestHandler):
    """Request handler that relays every request to the configured upstream.

    Each request is read in full, passed through the header/body rewrite
    hooks, forwarded with ``urlopen``, recorded in the JSON-lines log and
    then answered with the upstream's status, headers and body. The
    upstream response is buffered completely before being sent back, and
    every response carries ``Connection: close``.
    """

    protocol_version = "HTTP/1.1"

    def do_GET(self):
        self._handle_proxy()

    def do_POST(self):
        self._handle_proxy()

    def do_PUT(self):
        self._handle_proxy()

    def do_PATCH(self):
        self._handle_proxy()

    def do_DELETE(self):
        self._handle_proxy()

    def do_OPTIONS(self):
        self._handle_proxy()

    def do_HEAD(self):
        self._handle_proxy()

    def log_message(self, fmt, *args):
        # Suppress the base class's per-request log output; requests are
        # recorded through append_log in _handle_proxy instead.
        return

    def _read_request_body(self) -> bytes:
        """Read exactly ``Content-Length`` bytes of request body.

        Returns ``b""`` when the header is missing, empty or not positive.
        """
        content_length = int(self.headers.get("Content-Length", "0") or "0")
        if content_length <= 0:
            return b""
        return self.rfile.read(content_length)

    def _copy_request_headers(self) -> dict:
        """Return the client's headers minus those that must not be relayed.

        ``Host``, ``Content-Length`` and ``Connection`` describe the
        client-to-gateway hop. ``Accept-Encoding`` is dropped as well,
        presumably so the upstream answers uncompressed and the logged
        body stays readable.
        """
        skipped = {"host", "content-length", "connection", "accept-encoding"}
        return {
            key: value
            for key, value in self.headers.items()
            if key.lower() not in skipped
        }

    def _send_response(self, status: int, headers: dict, body: bytes) -> None:
        """Send *status*, *headers* and *body* to the client.

        Framing headers from *headers* are discarded and replaced with a
        ``Content-Length`` matching *body* plus ``Connection: close``. The
        body itself is omitted for HEAD requests.
        """
        self.send_response(status)

        ignored = {"transfer-encoding", "content-length", "connection"}
        for k, v in headers.items():
            if k.lower() in ignored:
                continue
            self.send_header(k, v)

        self.send_header("Content-Length", str(len(body)))
        self.send_header("Connection", "close")
        self.end_headers()

        if self.command != "HEAD" and body:
            self.wfile.write(body)

    def _forward_upstream(self, url: str, headers: dict, body: bytes) -> tuple:
        """Perform the upstream call and return ``(status, headers, body)``.

        Never raises for upstream problems: an HTTP error status is
        returned with the upstream's own headers and body, and any other
        failure (bad URL, connection error, timeout, ...) is turned into a
        502 with a JSON ``gateway_upstream_error`` payload.
        """
        try:
            upstream_req = Request(
                url=url,
                data=body if body else None,
                headers=headers,
                method=self.command,
            )
            with urlopen(upstream_req, timeout=UPSTREAM_TIMEOUT) as resp:
                return resp.getcode(), dict(resp.headers.items()), resp.read()
        except HTTPError as e:
            # urlopen raises for 4xx/5xx; relay those responses verbatim.
            err_body = e.read() if hasattr(e, "read") else b""
            err_headers = dict(e.headers.items()) if getattr(e, "headers", None) else {}
            return e.code, err_headers, err_body
        except Exception as e:
            # Boundary catch-all: any failure to reach the upstream is
            # reported to the client as a gateway error.
            error_payload = {
                "error": "gateway_upstream_error",
                "message": str(e),
            }
            error_body = json.dumps(error_payload, ensure_ascii=False).encode("utf-8")
            return 502, {"Content-Type": "application/json; charset=utf-8"}, error_body

    def _handle_proxy(self):
        """Relay the current request upstream, log it, and answer the client.

        Only the upstream call is guarded (inside _forward_upstream). The
        log append and the client write happen exactly once afterwards, so
        a client that disconnects mid-response can no longer trigger a
        second log record and a second response.
        """
        req_body = self._read_request_body()
        req_headers = self._copy_request_headers()
        req_headers = rewrite_request_headers(req_headers, self.path)
        req_body = rewrite_request_body(req_body, req_headers, self.path)

        upstream_url = build_upstream_url(UPSTREAM_BASE_URL, self.path)

        # NOTE: "headers" holds the client's original headers (including
        # any credentials it sent), while "body" is the rewritten body.
        request_log = {
            "timestamp": utc_now_iso(),
            "client_ip": self.client_address[0] if self.client_address else "",
            "method": self.command,
            "path": self.path,
            "upstream_url": upstream_url,
            "headers": dict(self.headers.items()),
            "body": decode_body_for_log(req_body),
            "body_length": len(req_body),
        }

        resp_status, resp_headers, resp_body = self._forward_upstream(
            upstream_url, req_headers, req_body
        )

        request_log["response"] = {
            "status": resp_status,
            "headers": resp_headers,
            "body": decode_body_for_log(resp_body),
            "body_length": len(resp_body),
        }
        append_log(request_log)
        self._send_response(resp_status, resp_headers, resp_body)


def main():
    """Bind the gateway server, print its configuration, and serve forever."""
    bind_address = (LISTEN_HOST, LISTEN_PORT)
    server = ThreadingHTTPServer(bind_address, ClaudeGatewayHandler)

    banner = (
        f"[gateway] listening on http://{LISTEN_HOST}:{LISTEN_PORT}",
        f"[gateway] upstream: {UPSTREAM_BASE_URL}",
        # Only whether a token is set is printed, never the token itself.
        f"[gateway] auth token configured: {bool(UPSTREAM_AUTH_TOKEN)}",
        f"[gateway] log file: {LOG_PATH}",
    )
    for line in banner:
        print(line)

    server.serve_forever()


# Start the gateway only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

3 个帖子 - 2 位参与者

阅读完整话题

来源: linux.do查看原文