fix: 调整sse缓冲区的刷新策略 平滑相应
This commit is contained in:
1
go.mod
1
go.mod
@@ -17,6 +17,7 @@ require (
|
|||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||||
github.com/leodido/go-urn v1.4.0 // indirect
|
github.com/leodido/go-urn v1.4.0 // indirect
|
||||||
|
github.com/lmittmann/tint v1.1.2 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -31,6 +31,8 @@ github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu
|
|||||||
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
|
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
|
||||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||||
|
github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
|
||||||
|
github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
|
||||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||||
|
|||||||
180
main.go
180
main.go
@@ -1,6 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -8,10 +10,13 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/lmittmann/tint"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 全局请求计数器,使用原子操作确保线程安全
|
// 全局请求计数器,使用原子操作确保线程安全
|
||||||
@@ -20,61 +25,51 @@ var requestCounter int64
|
|||||||
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
port := flag.Int("port", 8080, "代理服务器监听的端口")
|
port := flag.Int("port", 8080, "代理服务器监听的端口")
|
||||||
debug := flag.Bool("debug", false, "是否启用调试模式")
|
debug := flag.Bool("debug", false, "是否启用调试模式")
|
||||||
logFile := flag.String("log", "", "日志文件路径,默认为标准输出")
|
logFile := flag.String("log", "", "日志文件路径,默认为控制台彩色输出")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// 配置slog
|
// 使用 tint + LevelVar
|
||||||
var logger *slog.Logger
|
var levelVar = new(slog.LevelVar)
|
||||||
|
|
||||||
// 根据调试模式设置日志级别
|
|
||||||
var logLevel slog.Level
|
|
||||||
if *debug {
|
if *debug {
|
||||||
logLevel = slog.LevelDebug
|
levelVar.Set(slog.LevelDebug)
|
||||||
} else {
|
} else {
|
||||||
logLevel = slog.LevelInfo
|
levelVar.Set(slog.LevelInfo)
|
||||||
}
|
|
||||||
|
|
||||||
// 创建处理器选项
|
|
||||||
opts := &slog.HandlerOptions{
|
|
||||||
Level: logLevel,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 组合输出 writer
|
||||||
|
var writer io.Writer = os.Stderr // 默认彩色输出到 stderr
|
||||||
if *logFile != "" {
|
if *logFile != "" {
|
||||||
file, err := os.OpenFile(*logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
|
f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("无法打开日志文件", "error", err)
|
fmt.Fprintf(os.Stderr, "无法打开日志文件: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
logger = slog.New(slog.NewJSONHandler(file, opts))
|
// 同时输出到彩色终端和文件(文件里不需要颜色,tint 会根据是否是终端决定)
|
||||||
} else {
|
writer = io.MultiWriter(os.Stderr, f)
|
||||||
logger = slog.New(slog.NewJSONHandler(os.Stdout, opts))
|
|
||||||
}
|
}
|
||||||
slog.SetDefault(logger)
|
|
||||||
|
|
||||||
|
handler := tint.NewHandler(writer, &tint.Options{
|
||||||
|
AddSource: true,
|
||||||
|
Level: levelVar,
|
||||||
|
TimeFormat: "2006-01-02 15:04:05",
|
||||||
|
})
|
||||||
|
slog.SetDefault(slog.New(handler))
|
||||||
|
|
||||||
if *debug {
|
if *debug {
|
||||||
gin.SetMode(gin.DebugMode) // 启用调试模式
|
gin.SetMode(gin.DebugMode)
|
||||||
} else {
|
} else {
|
||||||
gin.SetMode(gin.ReleaseMode) // 在调试时暂时注释掉
|
gin.SetMode(gin.ReleaseMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := gin.Default()
|
r := gin.New() // 不使用默认 Logger,改为自定义 slog 统一输出
|
||||||
|
r.Use(SlogLogger(), SlogRecovery())
|
||||||
// 处理根路径
|
|
||||||
r.GET("/", HelloPage)
|
r.GET("/", HelloPage)
|
||||||
|
|
||||||
// 使用 "catch-all" 路由来捕获所有代理请求
|
|
||||||
// 这里我们使用 /proxy/* 前缀来避免与根路径冲突
|
|
||||||
r.Any("/proxy/*proxyPath", proxyHandler)
|
r.Any("/proxy/*proxyPath", proxyHandler)
|
||||||
|
r.Any(":protocol/*remainder", protocolHandler)
|
||||||
|
|
||||||
// 为了保持向后兼容,我们也可以处理直接的URL请求
|
slog.Info("HTTP 代理服务器启动", "port", *port, "debug", *debug)
|
||||||
// 检查是否以协议开头的路径
|
|
||||||
r.Any("/:protocol/*remainder", protocolHandler)
|
|
||||||
|
|
||||||
slog.Info("HTTP 代理服务器启动", "port", *port)
|
|
||||||
if err := r.Run(fmt.Sprintf(":%d", *port)); err != nil {
|
if err := r.Run(fmt.Sprintf(":%d", *port)); err != nil {
|
||||||
slog.Error("启动服务器失败", "error", err)
|
slog.Error("启动服务器失败", "error", err)
|
||||||
}
|
}
|
||||||
@@ -189,8 +184,14 @@ func executeProxy(c *gin.Context, targetURLStr string) {
|
|||||||
"uri", c.Request.RequestURI,
|
"uri", c.Request.RequestURI,
|
||||||
"target", targetURLStr)
|
"target", targetURLStr)
|
||||||
|
|
||||||
|
// 自定义 Transport,禁止自动压缩(避免 gzip 聚合导致 SSE 延迟)
|
||||||
|
transport := &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
DisableCompression: true,
|
||||||
|
}
|
||||||
|
client := &http.Client{Transport: transport}
|
||||||
|
|
||||||
// 创建到目标服务器的请求
|
// 创建到目标服务器的请求
|
||||||
// 注意:我们直接将原始请求的 Body 传递过去
|
|
||||||
proxyReq, err := http.NewRequest(c.Request.Method, targetURLStr, c.Request.Body)
|
proxyReq, err := http.NewRequest(c.Request.Method, targetURLStr, c.Request.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("创建代理请求失败", "reqID", reqID, "error", err)
|
slog.Error("创建代理请求失败", "reqID", reqID, "error", err)
|
||||||
@@ -198,11 +199,11 @@ func executeProxy(c *gin.Context, targetURLStr string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 复制原始请求的 Headers
|
// 复制原始请求的 Headers (Clone 避免引用共享)
|
||||||
proxyReq.Header = c.Request.Header
|
proxyReq.Header = c.Request.Header.Clone()
|
||||||
|
// 禁止上游压缩,保证事件粒度
|
||||||
|
proxyReq.Header.Del("Accept-Encoding")
|
||||||
|
|
||||||
// 发送代理请求
|
|
||||||
client := &http.Client{}
|
|
||||||
resp, err := client.Do(proxyReq)
|
resp, err := client.Do(proxyReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("请求目标服务器失败", "reqID", reqID, "error", err)
|
slog.Error("请求目标服务器失败", "reqID", reqID, "error", err)
|
||||||
@@ -211,28 +212,70 @@ func executeProxy(c *gin.Context, targetURLStr string) {
|
|||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
slog.Debug("收到响应",
|
contentType := resp.Header.Get("Content-Type")
|
||||||
"reqID", reqID,
|
isSSE := strings.HasPrefix(contentType, "text/event-stream")
|
||||||
"status_code", resp.StatusCode,
|
|
||||||
"status", resp.Status)
|
|
||||||
|
|
||||||
// 复制目标服务器响应的 Headers 到原始响应
|
slog.Debug("收到响应", "reqID", reqID, "status_code", resp.StatusCode, "status", resp.Status, "isSSE", isSSE)
|
||||||
|
|
||||||
|
// 复制响应头
|
||||||
for key, values := range resp.Header {
|
for key, values := range resp.Header {
|
||||||
for _, value := range values {
|
for _, value := range values {
|
||||||
c.Header(key, value)
|
c.Header(key, value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// SSE 需要去掉不合适的头并设置必要头
|
||||||
|
if isSSE {
|
||||||
|
c.Writer.Header().Del("Content-Length")
|
||||||
|
c.Writer.Header().Del("Transfer-Encoding")
|
||||||
|
c.Header("Content-Type", "text/event-stream")
|
||||||
|
c.Header("Cache-Control", "no-cache")
|
||||||
|
c.Header("Connection", "keep-alive")
|
||||||
|
c.Header("X-Accel-Buffering", "no") // 防止某些反向代理缓冲
|
||||||
|
}
|
||||||
|
|
||||||
// 将目标服务器的响应状态码设置到原始响应
|
// 设置状态码
|
||||||
c.Status(resp.StatusCode)
|
c.Status(resp.StatusCode)
|
||||||
|
|
||||||
// 将目标服务器的响应 Body 直接流式传输到客户端
|
// 立即 flush 头部,尤其是 SSE
|
||||||
// 使用 io.Copy 更高效,并能处理各种编码(如 chunked)
|
if flusher, ok := c.Writer.(http.Flusher); ok {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isSSE {
|
||||||
|
// 普通请求直接复制主体
|
||||||
bytesCopied, err := io.Copy(c.Writer, resp.Body)
|
bytesCopied, err := io.Copy(c.Writer, resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("写入响应 Body 时出错", "reqID", reqID, "error", err)
|
slog.Error("写入响应 Body 时出错", "reqID", reqID, "error", err)
|
||||||
}
|
}
|
||||||
slog.Debug("响应写入完成", "reqID", reqID, "bytes_copied", bytesCopied)
|
slog.Debug("响应写入完成", "reqID", reqID, "bytes_copied", bytesCopied)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SSE 模式:逐行读取并 flush,保持事件实时性
|
||||||
|
reader := bufio.NewReader(resp.Body)
|
||||||
|
w := c.Writer
|
||||||
|
flusher, _ := w.(http.Flusher)
|
||||||
|
|
||||||
|
for {
|
||||||
|
line, err := reader.ReadBytes('\n')
|
||||||
|
if len(line) > 0 {
|
||||||
|
if _, werr := w.Write(line); werr != nil {
|
||||||
|
slog.Warn("SSE 写失败", "reqID", reqID, "error", werr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if flusher != nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
slog.Debug("SSE 结束(EOF)", "reqID", reqID)
|
||||||
|
} else {
|
||||||
|
slog.Error("读取 SSE 失败", "reqID", reqID, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -249,3 +292,48 @@ func HelloPage(c *gin.Context) {
|
|||||||
str += "目标URL必须以 https:// 或 http:// 开头。\n\n"
|
str += "目标URL必须以 https:// 或 http:// 开头。\n\n"
|
||||||
c.String(200, str)
|
c.String(200, str)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SlogLogger 统一请求日志中间件
|
||||||
|
func SlogLogger() gin.HandlerFunc {
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
start := time.Now()
|
||||||
|
path := c.Request.URL.Path
|
||||||
|
rawQuery := c.Request.URL.RawQuery
|
||||||
|
c.Next()
|
||||||
|
latency := time.Since(start)
|
||||||
|
status := c.Writer.Status()
|
||||||
|
size := c.Writer.Size()
|
||||||
|
method := c.Request.Method
|
||||||
|
ip := c.ClientIP()
|
||||||
|
if rawQuery != "" {
|
||||||
|
path = path + "?" + rawQuery
|
||||||
|
}
|
||||||
|
slog.Log(c, slog.LevelInfo, "HTTP 请求",
|
||||||
|
slog.String("method", method),
|
||||||
|
slog.String("path", path),
|
||||||
|
slog.Int("status", status),
|
||||||
|
slog.Duration("latency", latency),
|
||||||
|
slog.Int("size", size),
|
||||||
|
slog.String("ip", ip),
|
||||||
|
slog.String("ua", c.GetHeader("User-Agent")),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SlogRecovery 捕获 panic,输出堆栈
|
||||||
|
func SlogRecovery() gin.HandlerFunc {
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
defer func() {
|
||||||
|
if rcv := recover(); rcv != nil {
|
||||||
|
stack := debug.Stack()
|
||||||
|
slog.Error("发生 panic",
|
||||||
|
"error", rcv,
|
||||||
|
"stack", string(stack),
|
||||||
|
"path", c.Request.URL.Path,
|
||||||
|
)
|
||||||
|
c.AbortWithStatus(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user