feat: 网关全面优化 - SSE流式转发/连接池/Token优先级/持久化

- 移除JWT格式过滤(token.count('.')>=2),改为len>200
- HUAWEI_TOKEN环境变量设为最高优先级
- Token持久化到/etc/huawei-gateway.env,重启自动恢复
- SSE流式转发(stream=True + iter_content)
- requests.Session连接池(20连接, 3次重试)
- Waitress线程数16→32
- 过滤hop-by-hop头(Connection/Keep-Alive/Upgrade)
- pip安装增加waitress依赖
- 新增独立huawei_gateway.py文件
This commit is contained in:
chaos
2026-07-02 16:23:22 +08:00
parent dbbac3d146
commit e9c635dbd0
2 changed files with 522 additions and 17 deletions
+107 -17
View File
@@ -408,7 +408,7 @@ fi
# 安装依赖(使用国内镜像)
log_info "正在安装 Flask 和 requests..."
$PIP_CMD install --upgrade pip --quiet -i https://pypi.tuna.tsinghua.edu.cn/simple 2>/dev/null
$PIP_CMD install flask requests --quiet -i https://pypi.tuna.tsinghua.edu.cn/simple
$PIP_CMD install flask requests waitress --quiet -i https://pypi.tuna.tsinghua.edu.cn/simple
# ================= 创建网关核心代码 =================
mkdir -p /usr/local/bin /var/log /var/run
@@ -419,8 +419,12 @@ cat << 'PYEOF' > /usr/local/bin/huawei_gateway.py
华为云 Token 动态网关
- 6小时缓存机制
- 支持内存扫描自动刷新
- HUAWEI_TOKEN 环境变量最高优先级
- Token 持久化到 /etc/huawei-gateway.env
- SSE 流式转发
- 连接池 (20 连接)
- 401 自动重试
- 兼容生产环境 (Waitress/Gunicorn)
- 兼容生产环境 (Waitress 32线程 / Gunicorn)
"""
import os
import re
@@ -557,7 +561,7 @@ def scan_pid_mem(pid):
for match in TOKEN_PATTERN.finditer(data):
token = match.group(1).decode('ascii', errors='replace')
# 验证 token 格式(华为云 token 通常是 JWT 格式)
if len(token) > 200 and token.count('.') >= 2:
if len(token) > 200:
return token
remaining -= len(data)
@@ -571,6 +575,34 @@ def scan_pid_mem(pid):
def find_token_in_memory():
"""在所有进程中扫描 Token"""
# HUAWEI_TOKEN 环境变量优先级最高
env_token = os.environ.get('HUAWEI_TOKEN', '').strip()
if env_token and len(env_token) > 200 and not cache.is_blacklisted(env_token):
cached = cache.get()
if cached != env_token:
cache.set(env_token)
logger.info("Token 从 HUAWEI_TOKEN 环境变量加载")
return env_token
# 从持久化文件加载
env_file = '/etc/huawei-gateway.env'
if os.path.isfile(env_file):
try:
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line.startswith('HUAWEI_TOKEN='):
file_token = line.split('=', 1)[1].strip().strip('"').strip("'")
if file_token and len(file_token) > 200 and not cache.is_blacklisted(file_token):
cached = cache.get()
if cached != file_token:
cache.set(file_token)
logger.info("Token 从持久化文件加载")
return file_token
break
except (OSError, IOError):
pass
if cache.is_scan_cooldown() and cache.get():
return cache.get()
@@ -616,6 +648,16 @@ def find_token_in_memory():
return cache.get() # 返回可能过期的缓存作为兜底
# ================= HTTP 会话池 =================
http_session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=20,
pool_maxsize=20,
max_retries=3
)
http_session.mount('https://', adapter)
http_session.mount('http://', adapter)
# ================= Flask 应用 =================
app = Flask(__name__)
@@ -639,7 +681,13 @@ def set_token():
if not token or len(token) < 100:
return {"error": "请提供有效的 token"}, 400
cache.set(token)
logger.info("Token 已手动注入")
# 持久化到文件以便重启后恢复
try:
with open('/etc/huawei-gateway.env', 'w') as f:
f.write(f'HUAWEI_TOKEN={token}\n')
except (OSError, IOError):
pass
logger.info("Token 已手动注入并持久化")
return {"status": "ok", "token_fingerprint": cache._fingerprint(token)}, 200
@@ -661,7 +709,7 @@ def proxy(subpath):
headers = {}
for k, v in request.headers:
kl = k.lower()
if kl not in ('host', 'content-length', 'connection', 'accept-encoding'):
if kl not in ('host', 'content-length', 'connection', 'accept-encoding', 'transfer-encoding'):
headers[k] = v
headers['Authorization'] = f'Bearer {real_token}'
@@ -670,7 +718,8 @@ def proxy(subpath):
target_url = f'https://{TARGET_HOST}/v2/{subpath}'
try:
resp = requests.request(
# 使用 stream=True 支持 SSE 流式转发
resp = http_session.request(
method=request.method,
url=target_url,
headers=headers,
@@ -678,37 +727,62 @@ def proxy(subpath):
cookies=request.cookies,
allow_redirects=False,
timeout=60,
stream=False
stream=True
)
# 401 兜底:Token 可能提前过期,加入黑名单后强制刷新重试
if resp.status_code == 401:
resp.close()
logger.warning("收到 401,将当前 Token 加入黑名单并强制刷新...")
cache.blacklist_current()
new_token = find_token_in_memory()
if new_token and new_token != real_token:
headers['Authorization'] = f'Bearer {new_token}'
resp = requests.request(
resp = http_session.request(
method=request.method,
url=target_url,
headers=headers,
data=request.get_data(),
cookies=request.cookies,
allow_redirects=False,
timeout=60
timeout=60,
stream=True
)
# 构建响应
# 过滤 hop-by-hop 头和压缩编码头
skip_headers = {'transfer-encoding', 'content-encoding', 'content-length',
'connection', 'keep-alive', 'upgrade'}
response_headers = []
for k, v in resp.headers.items():
if k.lower() not in ('transfer-encoding', 'content-encoding', 'content-length'):
if k.lower() not in skip_headers:
response_headers.append((k, v))
return Response(
resp.content,
status=resp.status_code,
headers=response_headers
)
# SSE 流式转发
content_type = resp.headers.get('Content-Type', '')
if 'text/event-stream' in content_type or resp.headers.get('Transfer-Encoding', '') == 'chunked':
def sse_stream():
try:
for chunk in resp.iter_content(chunk_size=4096):
if chunk:
yield chunk
finally:
resp.close()
return Response(
sse_stream(),
status=resp.status_code,
headers=response_headers,
direct_passthrough=True
)
else:
# 非流式响应:读取完整内容
content = resp.content
resp.close()
return Response(
content,
status=resp.status_code,
headers=response_headers
)
except requests.exceptions.Timeout:
logger.error("请求华为云 API 超时")
@@ -725,11 +799,27 @@ def main():
port = int(sys.argv[1]) if len(sys.argv) > 1 else 8080
host = sys.argv[2] if len(sys.argv) > 2 else '127.0.0.1'
# 从持久化文件加载 token
env_file = '/etc/huawei-gateway.env'
if os.path.isfile(env_file):
try:
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line.startswith('HUAWEI_TOKEN=') and 'HUAWEI_TOKEN' not in os.environ:
val = line.split('=', 1)[1].strip().strip('"').strip("'")
if val and len(val) > 200:
os.environ['HUAWEI_TOKEN'] = val
logger.info("从持久化文件恢复 Token")
break
except (OSError, IOError):
pass
# 尝试使用生产级 WSGI 服务器
try:
import waitress
logger.info(f"使用 Waitress 启动网关 ({host}:{port})")
waitress.serve(app, host=host, port=port, threads=16)
waitress.serve(app, host=host, port=port, threads=32)
except ImportError:
try:
import gunicorn.app.wsgiapp