From 5daf6df318dc34ad90670b0c94d4ec01980b0f01 Mon Sep 17 00:00:00 2001 From: kakune55 Date: Fri, 26 Sep 2025 12:02:41 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=B0=83=E6=95=B4sse=E7=BC=93=E5=86=B2?= =?UTF-8?q?=E5=8C=BA=E7=9A=84=E5=88=B7=E6=96=B0=E7=AD=96=E7=95=A5=20?= =?UTF-8?q?=E5=B9=B3=E6=BB=91=E7=9B=B8=E5=BA=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 2 + main.go | 228 +++++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 161 insertions(+), 70 deletions(-) diff --git a/go.mod b/go.mod index 3e60725..1326375 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/lmittmann/tint v1.1.2 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect diff --git a/go.sum b/go.sum index 0123a87..395f4eb 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w= +github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/main.go b/main.go index 9e88b6f..e676c6b 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main import ( + "bufio" + "errors" "flag" "fmt" "io" @@ -8,10 +10,13 @@ import ( "net/http" "net/url" "os" + "runtime/debug" "strings" "sync/atomic" + "time" "github.com/gin-gonic/gin" + "github.com/lmittmann/tint" ) // 全局请求计数器,使用原子操作确保线程安全 @@ -20,64 +25,54 @@ var requestCounter int64 func main() { + port := flag.Int("port", 8080, "代理服务器监听的端口") + debug := flag.Bool("debug", false, "是否启用调试模式") + logFile := flag.String("log", "", "日志文件路径,默认为控制台彩色输出") + flag.Parse() - port := flag.Int("port", 8080, "代理服务器监听的端口") - debug := flag.Bool("debug", false, "是否启用调试模式") - logFile := flag.String("log", "", "日志文件路径,默认为标准输出") - flag.Parse() + // 使用 tint + LevelVar + var levelVar = new(slog.LevelVar) + if *debug { + levelVar.Set(slog.LevelDebug) + } else { + levelVar.Set(slog.LevelInfo) + } - // 配置slog - var logger *slog.Logger - - // 根据调试模式设置日志级别 - var logLevel slog.Level - if *debug { - logLevel = slog.LevelDebug - } else { - logLevel = slog.LevelInfo - } - - // 创建处理器选项 - opts := &slog.HandlerOptions{ - Level: logLevel, - } - - if *logFile != "" { - file, err := os.OpenFile(*logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - slog.Error("无法打开日志文件", "error", err) - os.Exit(1) - } - logger = slog.New(slog.NewJSONHandler(file, opts)) - } else { - logger = slog.New(slog.NewJSONHandler(os.Stdout, opts)) - } - slog.SetDefault(logger) + // 组合输出 writer + var writer io.Writer = os.Stderr // 默认彩色输出到 stderr + if *logFile != "" { + f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + fmt.Fprintf(os.Stderr, "无法打开日志文件: %v\n", err) + os.Exit(1) + } + // 同时输出到彩色终端和文件(文件里不需要颜色,tint 会根据是否是终端决定) + writer = io.MultiWriter(os.Stderr, f) + } + handler := tint.NewHandler(writer, &tint.Options{ + AddSource: true, + Level: levelVar, + TimeFormat: "2006-01-02 15:04:05", + }) + slog.SetDefault(slog.New(handler)) - if *debug { - gin.SetMode(gin.DebugMode) // 启用调试模式 - } else { - gin.SetMode(gin.ReleaseMode) // 在调试时暂时注释掉 - } - - r := gin.Default() + if *debug { + gin.SetMode(gin.DebugMode) + } else { + gin.SetMode(gin.ReleaseMode) + } - // 处理根路径 + r := gin.New() // 不使用默认 Logger,改为自定义 slog 统一输出 + r.Use(SlogLogger(), SlogRecovery()) r.GET("/", HelloPage) - - // 使用 "catch-all" 路由来捕获所有代理请求 - // 这里我们使用 /proxy/* 前缀来避免与根路径冲突 - r.Any("/proxy/*proxyPath", proxyHandler) - - // 为了保持向后兼容,我们也可以处理直接的URL请求 - // 检查是否以协议开头的路径 - r.Any("/:protocol/*remainder", protocolHandler) + r.Any("/proxy/*proxyPath", proxyHandler) + r.Any(":protocol/*remainder", protocolHandler) - slog.Info("HTTP 代理服务器启动", "port", *port) - if err := r.Run(fmt.Sprintf(":%d", *port)); err != nil { - slog.Error("启动服务器失败", "error", err) - } + slog.Info("HTTP 代理服务器启动", "port", *port, "debug", *debug) + if err := r.Run(fmt.Sprintf(":%d", *port)); err != nil { + slog.Error("启动服务器失败", "error", err) + } } // normalizeURL 规范化URL格式,处理缺少斜杠的情况 @@ -188,9 +183,15 @@ func executeProxy(c *gin.Context, targetURLStr string) { "method", c.Request.Method, "uri", c.Request.RequestURI, "target", targetURLStr) - + + // 自定义 Transport,禁止自动压缩(避免 gzip 聚合导致 SSE 延迟) + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DisableCompression: true, + } + client := &http.Client{Transport: transport} + // 创建到目标服务器的请求 - // 注意:我们直接将原始请求的 Body 传递过去 proxyReq, err := http.NewRequest(c.Request.Method, targetURLStr, c.Request.Body) if err != nil { slog.Error("创建代理请求失败", "reqID", reqID, "error", err) @@ -198,11 +199,11 @@ func executeProxy(c *gin.Context, targetURLStr string) { return } - // 复制原始请求的 Headers - proxyReq.Header = c.Request.Header + // 复制原始请求的 Headers (Clone 避免引用共享) + proxyReq.Header = c.Request.Header.Clone() + // 禁止上游压缩,保证事件粒度 + proxyReq.Header.Del("Accept-Encoding") - // 发送代理请求 - client := &http.Client{} resp, err := client.Do(proxyReq) if err != nil { slog.Error("请求目标服务器失败", "reqID", reqID, "error", err) @@ -211,28 +212,70 @@ func executeProxy(c *gin.Context, targetURLStr string) { } defer resp.Body.Close() - slog.Debug("收到响应", - "reqID", reqID, - "status_code", resp.StatusCode, - "status", resp.Status) - - // 复制目标服务器响应的 Headers 到原始响应 + contentType := resp.Header.Get("Content-Type") + isSSE := strings.HasPrefix(contentType, "text/event-stream") + + slog.Debug("收到响应", "reqID", reqID, "status_code", resp.StatusCode, "status", resp.Status, "isSSE", isSSE) + + // 复制响应头 for key, values := range resp.Header { for _, value := range values { c.Header(key, value) } } + // SSE 需要去掉不合适的头并设置必要头 + if isSSE { + c.Writer.Header().Del("Content-Length") + c.Writer.Header().Del("Transfer-Encoding") + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") // 防止某些反向代理缓冲 + } - // 将目标服务器的响应状态码设置到原始响应 + // 设置状态码 c.Status(resp.StatusCode) - // 将目标服务器的响应 Body 直接流式传输到客户端 - // 使用 io.Copy 更高效,并能处理各种编码(如 chunked) - bytesCopied, err := io.Copy(c.Writer, resp.Body) - if err != nil { - slog.Error("写入响应 Body 时出错", "reqID", reqID, "error", err) + // 立即 flush 头部,尤其是 SSE + if flusher, ok := c.Writer.(http.Flusher); ok { + flusher.Flush() + } + + if !isSSE { + // 普通请求直接复制主体 + bytesCopied, err := io.Copy(c.Writer, resp.Body) + if err != nil { + slog.Error("写入响应 Body 时出错", "reqID", reqID, "error", err) + } + slog.Debug("响应写入完成", "reqID", reqID, "bytes_copied", bytesCopied) + return + } + + // SSE 模式:逐行读取并 flush,保持事件实时性 + reader := bufio.NewReader(resp.Body) + w := c.Writer + flusher, _ := w.(http.Flusher) + + for { + line, err := reader.ReadBytes('\n') + if len(line) > 0 { + if _, werr := w.Write(line); werr != nil { + slog.Warn("SSE 写失败", "reqID", reqID, "error", werr) + return + } + if flusher != nil { + flusher.Flush() + } + } + if err != nil { + if errors.Is(err, io.EOF) { + slog.Debug("SSE 结束(EOF)", "reqID", reqID) + } else { + slog.Error("读取 SSE 失败", "reqID", reqID, "error", err) + } + return + } } - slog.Debug("响应写入完成", "reqID", reqID, "bytes_copied", bytesCopied) } @@ -248,4 +291,49 @@ func HelloPage(c *gin.Context) { str += " 目标URL: https://example.com --> 代理URL: http://AnyproxyIP/proxy/https://example.com\n\n" str += "目标URL必须以 https:// 或 http:// 开头。\n\n" c.String(200, str) +} + +// SlogLogger 统一请求日志中间件 +func SlogLogger() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.Request.URL.Path + rawQuery := c.Request.URL.RawQuery + c.Next() + latency := time.Since(start) + status := c.Writer.Status() + size := c.Writer.Size() + method := c.Request.Method + ip := c.ClientIP() + if rawQuery != "" { + path = path + "?" + rawQuery + } + slog.Log(c, slog.LevelInfo, "HTTP 请求", + slog.String("method", method), + slog.String("path", path), + slog.Int("status", status), + slog.Duration("latency", latency), + slog.Int("size", size), + slog.String("ip", ip), + slog.String("ua", c.GetHeader("User-Agent")), + ) + } +} + +// SlogRecovery 捕获 panic,输出堆栈 +func SlogRecovery() gin.HandlerFunc { + return func(c *gin.Context) { + defer func() { + if rcv := recover(); rcv != nil { + stack := debug.Stack() + slog.Error("发生 panic", + "error", rcv, + "stack", string(stack), + "path", c.Request.URL.Path, + ) + c.AbortWithStatus(http.StatusInternalServerError) + } + }() + c.Next() + } } \ No newline at end of file