diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000..105637d --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,65 @@ +package metrics + +import ( + "sync" + "sync/atomic" + "time" +) + +// 环形秒级窗口,用于计算 QPS / QPM。 +// 只针对转发请求调用 Inc。 + +type bucket struct { + second int64 // Unix 秒 + count int64 +} + +var ( + buckets [60]bucket + mu sync.Mutex + total atomic.Int64 +) + +// Inc 增加一次请求计数 +func Inc() { + now := time.Now().Unix() + idx := now % 60 + mu.Lock() + b := &buckets[idx] + if b.second != now { // 该槽位属于旧秒,重置 + b.second = now + b.count = 0 + } + b.count++ + mu.Unlock() + total.Add(1) +} + +// QPS 返回当前秒内的请求数 +func QPS() int64 { + now := time.Now().Unix() + idx := now % 60 + mu.Lock() + b := buckets[idx] + mu.Unlock() + if b.second == now { return b.count } + return 0 +} + +// QPM 返回最近 60 秒内的请求总数 +func QPM() int64 { + now := time.Now().Unix() + var sum int64 + mu.Lock() + for i := 0; i < 60; i++ { + b := buckets[i] + if now-b.second < 60 { // 在窗口内 + sum += b.count + } + } + mu.Unlock() + return sum +} + +// Total 返回累计转发请求数 +func Total() int64 { return total.Load() } diff --git a/internal/middleware/metrics.go b/internal/middleware/metrics.go new file mode 100644 index 0000000..1db8e69 --- /dev/null +++ b/internal/middleware/metrics.go @@ -0,0 +1,127 @@ +package middleware + +import ( + "sync/atomic" + "time" + + "github.com/gin-gonic/gin" +) + +// 简易 QPS / QPM 统计:使用滑动窗口环形数组按秒/按分钟聚合 +// secondBuckets: 最近 60 秒每秒的请求计数 +// minuteBuckets: 最近 60 分钟每分钟的请求计数 + +var ( + secondBuckets [60]atomic.Int64 + minuteBuckets [60]atomic.Int64 + lastSecond int64 + lastMinute int64 + // 总请求数 (复用可选) + totalRequests atomic.Int64 +) + +func init() { + now := time.Now() + lastSecond = now.Unix() + lastMinute = now.Unix() / 60 +} + +// AddRequest 在收到一个请求时调用,通常在请求完成后计数 +func AddRequest() { + now := time.Now() + sec := now.Unix() + min := sec / 60 + + // 处理秒级 bucket + oldSec := atomic.LoadInt64(&lastSecond) + if sec != oldSec { + // 跨秒:清理可能跨越多个秒的间隙 + if atomic.CompareAndSwapInt64(&lastSecond, oldSec, sec) { + steps := int(sec - oldSec) + if steps > 60 { steps = 60 } + for i := 1; i <= steps; i++ { + idx := int((oldSec+int64(i)) % 60) + secondBuckets[idx].Store(0) + } + } + } + secIdx := int(sec % 60) + secondBuckets[secIdx].Add(1) + + // 处理分钟级 bucket + oldMin := atomic.LoadInt64(&lastMinute) + if min != oldMin { + if atomic.CompareAndSwapInt64(&lastMinute, oldMin, min) { + steps := int(min - oldMin) + if steps > 60 { steps = 60 } + for i := 1; i <= steps; i++ { + idx := int((oldMin+int64(i)) % 60) + minuteBuckets[idx].Store(0) + } + } + } + minIdx := int(min % 60) + minuteBuckets[minIdx].Add(1) + + totalRequests.Add(1) +} + +// CurrentQPS 返回最近 1 秒(当前秒)的请求数 +func CurrentQPS() int64 { + sec := time.Now().Unix() + if sec != atomic.LoadInt64(&lastSecond) { return 0 } + return secondBuckets[sec%60].Load() +} + +// AvgQPSRecent60 返回最近 60 秒平均 QPS +func AvgQPSRecent60() float64 { + sec := time.Now().Unix() + total := int64(0) + last := atomic.LoadInt64(&lastSecond) + for i := 0; i < 60; i++ { + // 只统计在窗口内(未被清零)的 bucket + bucketSec := sec - int64(i) + if bucketSec <= last && last-bucketSec < 60 { + idx := bucketSec % 60 + total += secondBuckets[idx].Load() + } + } + return float64(total) / 60.0 +} + +// CurrentQPM 返回当前分钟的请求数 +func CurrentQPM() int64 { + min := time.Now().Unix() / 60 + if min != atomic.LoadInt64(&lastMinute) { return 0 } + return minuteBuckets[min%60].Load() +} + +// AvgQPMRecent60 返回最近 60 分钟的平均 QPM +func AvgQPMRecent60() float64 { + min := time.Now().Unix() / 60 + total := int64(0) + last := atomic.LoadInt64(&lastMinute) + for i := 0; i < 60; i++ { + bucketMin := min - int64(i) + if bucketMin <= last && last-bucketMin < 60 { + idx := bucketMin % 60 + total += minuteBuckets[idx].Load() + } + } + return float64(total) / 60.0 +} + +// TotalRequests 返回总请求量(从进程启动以来) +func TotalRequests() int64 { return totalRequests.Load() } + +// MetricsHandler 输出当前指标 +func MetricsHandler(c *gin.Context) { + c.JSON(200, gin.H{ + "qps_current": CurrentQPS(), + "qps_avg_60s": AvgQPSRecent60(), + "qpm_current": CurrentQPM(), + "qpm_avg_60m": AvgQPMRecent60(), + "total": TotalRequests(), + "timestamp": time.Now().Unix(), + }) +} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 83c3870..8f1bde3 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -13,6 +13,7 @@ import ( "github.com/gin-gonic/gin" + "anyproxy/internal/metrics" "anyproxy/internal/middleware" ) @@ -83,6 +84,9 @@ func (p *Proxy) forward(c *gin.Context, target string) { } defer resp.Body.Close() + // 仅在真正进行了一次上游转发并得到响应后计数 + metrics.Inc() + mediaType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")) isSSE := mediaType == "text/event-stream" @@ -137,7 +141,9 @@ func (p *Proxy) forward(c *gin.Context, target string) { // HelloPage 返回简单状态页面 func HelloPage(c *gin.Context) { - count := totalForwarded.Load() + count := metrics.Total() + qps := metrics.QPS() + qpm := metrics.QPM() // 推断外部可见协议与主机(支持反向代理常见头) scheme := "http" @@ -152,7 +158,7 @@ func HelloPage(c *gin.Context) { } base := scheme + "://" + host - str := fmt.Sprintf("AnyProxy 服务器正在运行... 已转发 %d 个请求", count) + str := fmt.Sprintf("AnyProxy 服务器正在运行...\n累计转发(不含本页): %d\n当前QPS: %d\n最近1分钟QPM: %d", count, qps, qpm) str += "\n\n使用方法:\n" str += "方式1 - 直接协议路径: \n" str += fmt.Sprintf(" 目标URL: https://example.com/path --> 代理URL: %s/https/example.com/path\n", base) diff --git a/main.go b/main.go index ff5d63a..e65cc5b 100644 --- a/main.go +++ b/main.go @@ -61,6 +61,7 @@ func main() { r.Use(middleware.Recovery(logger), middleware.RequestID(), middleware.Logger(logger)) r.GET("/", proxy.HelloPage) // 欢迎页面 + r.GET("/metrics", middleware.MetricsHandler) // 指标接口 r.Any("/proxy/*proxyPath", p.HandleProxyPath) // 处理 /proxy/*path 形式的请求 r.Any(":protocol/*remainder", p.HandleProtocol) // 处理 /:protocol/*remainder 形式的请求