feat: 添加指标统计功能,支持QPS和QPM监控
This commit is contained in:
65
internal/metrics/metrics.go
Normal file
65
internal/metrics/metrics.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 环形秒级窗口,用于计算 QPS / QPM。
|
||||
// 只针对转发请求调用 Inc。
|
||||
|
||||
type bucket struct {
|
||||
second int64 // Unix 秒
|
||||
count int64
|
||||
}
|
||||
|
||||
var (
|
||||
buckets [60]bucket
|
||||
mu sync.Mutex
|
||||
total atomic.Int64
|
||||
)
|
||||
|
||||
// Inc 增加一次请求计数
|
||||
func Inc() {
|
||||
now := time.Now().Unix()
|
||||
idx := now % 60
|
||||
mu.Lock()
|
||||
b := &buckets[idx]
|
||||
if b.second != now { // 该槽位属于旧秒,重置
|
||||
b.second = now
|
||||
b.count = 0
|
||||
}
|
||||
b.count++
|
||||
mu.Unlock()
|
||||
total.Add(1)
|
||||
}
|
||||
|
||||
// QPS 返回当前秒内的请求数
|
||||
func QPS() int64 {
|
||||
now := time.Now().Unix()
|
||||
idx := now % 60
|
||||
mu.Lock()
|
||||
b := buckets[idx]
|
||||
mu.Unlock()
|
||||
if b.second == now { return b.count }
|
||||
return 0
|
||||
}
|
||||
|
||||
// QPM 返回最近 60 秒内的请求总数
|
||||
func QPM() int64 {
|
||||
now := time.Now().Unix()
|
||||
var sum int64
|
||||
mu.Lock()
|
||||
for i := 0; i < 60; i++ {
|
||||
b := buckets[i]
|
||||
if now-b.second < 60 { // 在窗口内
|
||||
sum += b.count
|
||||
}
|
||||
}
|
||||
mu.Unlock()
|
||||
return sum
|
||||
}
|
||||
|
||||
// Total 返回累计转发请求数
|
||||
func Total() int64 { return total.Load() }
|
||||
127
internal/middleware/metrics.go
Normal file
127
internal/middleware/metrics.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// 简易 QPS / QPM 统计:使用滑动窗口环形数组按秒/按分钟聚合
|
||||
// secondBuckets: 最近 60 秒每秒的请求计数
|
||||
// minuteBuckets: 最近 60 分钟每分钟的请求计数
|
||||
|
||||
var (
|
||||
secondBuckets [60]atomic.Int64
|
||||
minuteBuckets [60]atomic.Int64
|
||||
lastSecond int64
|
||||
lastMinute int64
|
||||
// 总请求数 (复用可选)
|
||||
totalRequests atomic.Int64
|
||||
)
|
||||
|
||||
func init() {
|
||||
now := time.Now()
|
||||
lastSecond = now.Unix()
|
||||
lastMinute = now.Unix() / 60
|
||||
}
|
||||
|
||||
// AddRequest 在收到一个请求时调用,通常在请求完成后计数
|
||||
func AddRequest() {
|
||||
now := time.Now()
|
||||
sec := now.Unix()
|
||||
min := sec / 60
|
||||
|
||||
// 处理秒级 bucket
|
||||
oldSec := atomic.LoadInt64(&lastSecond)
|
||||
if sec != oldSec {
|
||||
// 跨秒:清理可能跨越多个秒的间隙
|
||||
if atomic.CompareAndSwapInt64(&lastSecond, oldSec, sec) {
|
||||
steps := int(sec - oldSec)
|
||||
if steps > 60 { steps = 60 }
|
||||
for i := 1; i <= steps; i++ {
|
||||
idx := int((oldSec+int64(i)) % 60)
|
||||
secondBuckets[idx].Store(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
secIdx := int(sec % 60)
|
||||
secondBuckets[secIdx].Add(1)
|
||||
|
||||
// 处理分钟级 bucket
|
||||
oldMin := atomic.LoadInt64(&lastMinute)
|
||||
if min != oldMin {
|
||||
if atomic.CompareAndSwapInt64(&lastMinute, oldMin, min) {
|
||||
steps := int(min - oldMin)
|
||||
if steps > 60 { steps = 60 }
|
||||
for i := 1; i <= steps; i++ {
|
||||
idx := int((oldMin+int64(i)) % 60)
|
||||
minuteBuckets[idx].Store(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
minIdx := int(min % 60)
|
||||
minuteBuckets[minIdx].Add(1)
|
||||
|
||||
totalRequests.Add(1)
|
||||
}
|
||||
|
||||
// CurrentQPS 返回最近 1 秒(当前秒)的请求数
|
||||
func CurrentQPS() int64 {
|
||||
sec := time.Now().Unix()
|
||||
if sec != atomic.LoadInt64(&lastSecond) { return 0 }
|
||||
return secondBuckets[sec%60].Load()
|
||||
}
|
||||
|
||||
// AvgQPSRecent60 返回最近 60 秒平均 QPS
|
||||
func AvgQPSRecent60() float64 {
|
||||
sec := time.Now().Unix()
|
||||
total := int64(0)
|
||||
last := atomic.LoadInt64(&lastSecond)
|
||||
for i := 0; i < 60; i++ {
|
||||
// 只统计在窗口内(未被清零)的 bucket
|
||||
bucketSec := sec - int64(i)
|
||||
if bucketSec <= last && last-bucketSec < 60 {
|
||||
idx := bucketSec % 60
|
||||
total += secondBuckets[idx].Load()
|
||||
}
|
||||
}
|
||||
return float64(total) / 60.0
|
||||
}
|
||||
|
||||
// CurrentQPM 返回当前分钟的请求数
|
||||
func CurrentQPM() int64 {
|
||||
min := time.Now().Unix() / 60
|
||||
if min != atomic.LoadInt64(&lastMinute) { return 0 }
|
||||
return minuteBuckets[min%60].Load()
|
||||
}
|
||||
|
||||
// AvgQPMRecent60 返回最近 60 分钟的平均 QPM
|
||||
func AvgQPMRecent60() float64 {
|
||||
min := time.Now().Unix() / 60
|
||||
total := int64(0)
|
||||
last := atomic.LoadInt64(&lastMinute)
|
||||
for i := 0; i < 60; i++ {
|
||||
bucketMin := min - int64(i)
|
||||
if bucketMin <= last && last-bucketMin < 60 {
|
||||
idx := bucketMin % 60
|
||||
total += minuteBuckets[idx].Load()
|
||||
}
|
||||
}
|
||||
return float64(total) / 60.0
|
||||
}
|
||||
|
||||
// TotalRequests 返回总请求量(从进程启动以来)
|
||||
func TotalRequests() int64 { return totalRequests.Load() }
|
||||
|
||||
// MetricsHandler 输出当前指标
|
||||
func MetricsHandler(c *gin.Context) {
|
||||
c.JSON(200, gin.H{
|
||||
"qps_current": CurrentQPS(),
|
||||
"qps_avg_60s": AvgQPSRecent60(),
|
||||
"qpm_current": CurrentQPM(),
|
||||
"qpm_avg_60m": AvgQPMRecent60(),
|
||||
"total": TotalRequests(),
|
||||
"timestamp": time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"anyproxy/internal/metrics"
|
||||
"anyproxy/internal/middleware"
|
||||
)
|
||||
|
||||
@@ -83,6 +84,9 @@ func (p *Proxy) forward(c *gin.Context, target string) {
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 仅在真正进行了一次上游转发并得到响应后计数
|
||||
metrics.Inc()
|
||||
|
||||
mediaType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type"))
|
||||
isSSE := mediaType == "text/event-stream"
|
||||
|
||||
@@ -137,7 +141,9 @@ func (p *Proxy) forward(c *gin.Context, target string) {
|
||||
|
||||
// HelloPage 返回简单状态页面
|
||||
func HelloPage(c *gin.Context) {
|
||||
count := totalForwarded.Load()
|
||||
count := metrics.Total()
|
||||
qps := metrics.QPS()
|
||||
qpm := metrics.QPM()
|
||||
|
||||
// 推断外部可见协议与主机(支持反向代理常见头)
|
||||
scheme := "http"
|
||||
@@ -152,7 +158,7 @@ func HelloPage(c *gin.Context) {
|
||||
}
|
||||
base := scheme + "://" + host
|
||||
|
||||
str := fmt.Sprintf("AnyProxy 服务器正在运行... 已转发 %d 个请求", count)
|
||||
str := fmt.Sprintf("AnyProxy 服务器正在运行...\n累计转发(不含本页): %d\n当前QPS: %d\n最近1分钟QPM: %d", count, qps, qpm)
|
||||
str += "\n\n使用方法:\n"
|
||||
str += "方式1 - 直接协议路径: \n"
|
||||
str += fmt.Sprintf(" 目标URL: https://example.com/path --> 代理URL: %s/https/example.com/path\n", base)
|
||||
|
||||
1
main.go
1
main.go
@@ -61,6 +61,7 @@ func main() {
|
||||
r.Use(middleware.Recovery(logger), middleware.RequestID(), middleware.Logger(logger))
|
||||
|
||||
r.GET("/", proxy.HelloPage) // 欢迎页面
|
||||
r.GET("/metrics", middleware.MetricsHandler) // 指标接口
|
||||
r.Any("/proxy/*proxyPath", p.HandleProxyPath) // 处理 /proxy/*path 形式的请求
|
||||
r.Any(":protocol/*remainder", p.HandleProtocol) // 处理 /:protocol/*remainder 形式的请求
|
||||
|
||||
|
||||
Reference in New Issue
Block a user