go实现的简单压测工具

1、参数概览

依赖github.com/ddliu/go-httpclient进行http接口请求
依赖github.com/antlabs/pcurl解析curl

输入参数:

  • -c,concurrency,并发数,启动协程数
  • -n, totalNumber,单个协程发送的请求个数
  • -u,curl字符串
  • -p,如果不使用-u,可以将curl地址放在文件中,使用-p传入curl文件地址
  • -e,expectCode,期望response返回的状态码

2、核心代码

1、网络请求server/dispose.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func init() {
// 注册verify校验器
verify.RegisterVerifyHttp(verify.GetVerifyKey("statusCode"), verify.VerifyHttpByStatusCode)
}

func Dispose(ctx context.Context, req *request.Request, concurrency, totalNumber uint64) {
ch := make(chan *response.ResponseResult, 1000)
wg := sync.WaitGroup{}
wgReceiving := sync.WaitGroup{}

wgReceiving.Add(1)

// 统计数据详情
go func() {
defer wgReceiving.Done()
statistics.HandleStatics(concurrency, ch)
}()

// 传递的-c参数,为每个协程创建-n次请求
for i := uint64(0); i < concurrency; i++ {
wg.Add(1)
chanId := i

go func() {
defer wg.Done()
serveHTTP(ctx, chanId, totalNumber, ch, req)
}()
}

wg.Wait()
time.Sleep(time.Millisecond)
close(ch)
wgReceiving.Wait()
}

// 真正发送请求的方法
// chanId 每个协程的身份Id
// ch 用于接受http接口响应数据
// req 根据curl解析出来的request结构体
func serveHTTP(ctx context.Context, chanId, totalNumber uint64, ch chan<- *response.ResponseResult, req *request.Request) {
for i := uint64(0); i < totalNumber; i++ {
if ctx.Err() != nil {
fmt.Printf("ctx.Err err: %v \n", ctx.Err())
break
}
header := make(map[string]string)
for k := range req.Header {
header[k] = req.Header.Get(k)
}

respStatusCode := constants.Success
isSucceed := false

start := time.Now()
resp, err := httpclient.Do(req.Method, req.URL.String(), header, nil)
cost := uint64(time.Since(start).Nanoseconds()) //统计耗时
if err != nil || resp == nil {
respStatusCode = constants.RequestFailed
} else {
// 校验response code与-e是否相同
respStatusCode, isSucceed = verify.GetVerify(verify.GetVerifyKey("statusCode"))(req, resp)
}

result := &response.ResponseResult{
Id: fmt.Sprintf("%d-%d", chanId, i),
ChanId: chanId,
Cost: cost,
IsSucceed: isSucceed,
StatusCode: respStatusCode,
}
// 写数据
ch <- result
}
}

2、校验器verify/verify.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Verify func(*request.Request, *httpclient.Response) (constants.ErrCode, bool)

var (
verifyMap = make(map[string]Verify)
mutex sync.RWMutex
)

func RegisterVerifyHttp(key string, verifyFunc Verify) {
mutex.Lock()
defer mutex.Unlock()
verifyMap[key] = verifyFunc
}

// request 解析curl所得
// response http请求结果
func VerifyHttpByStatusCode(request *request.Request, response *httpclient.Response) (constants.ErrCode, bool) {
responseCode := response.StatusCode
if responseCode == request.ExpectedCode {
return constants.ErrCode(responseCode), true
}
return constants.ErrCode(responseCode), false
}

func GetVerifyKey(t string) string {
return fmt.Sprintf("http.%s", t)
}

func GetVerify(key string) Verify {
verify, ok := verifyMap[key]
if !ok {
panic("verify方法不存在")
}
return verify
}

3、解析curl request/request.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Request struct {
*http.Request
ExpectedCode int //-e参数输入
}

func NewRequest(curl, path string, expectedCode int) (*Request, error) {
// 优先使用文件解析curl
if path != "" {
file, err := os.Open(path)
if err != nil {
fmt.Printf("open curl file %s failed, err: %+v\n", path, err)
return nil, err
}
defer file.Close()

buf, err := io.ReadAll(file)
if err != nil {
fmt.Printf("read curl file %s failed, err: %+v\n", path, err)
return nil, err
}
curl = string(buf)
}

req, err := pcurl.ParseAndRequest(curl)
if err != nil {
fmt.Printf("parse curl file %s failed, err: %+v\n", path, err)
return nil, err
}
return &Request{
Request: req,
ExpectedCode: expectedCode,
}, nil
}

4、数据统计statstics/statistics.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// HandleStatics 所有耗时变量均为纳秒
func HandleStatics(concurrency uint64, ch <-chan *response.ResponseResult) {
var (
requestCostTimeList []uint64 // 耗时数组
processingTime uint64 = 0 // processingTime 处理总耗时
requestCostTime uint64 = 0 // requestCostTime 请求总时间
maxTime uint64 = 0 // maxTime 至今为止单个请求最大耗时
minTime uint64 = 0 // minTime 至今为止单个请求最小耗时
successNum uint64 = 0
failureNum uint64 = 0
chanIdLen uint64 = 0 // chanIdLen 并发数
stopChan = make(chan bool)
mutex = sync.RWMutex{}
chanIds = make(map[int]bool)
)

startTime := uint64(time.Now().UnixNano())
respCodeMap := sync.Map{}
ticker := time.NewTicker(time.Second)
go func() {
for {
select {
case <-ticker.C:
endTime := uint64(time.Now().UnixNano())
mutex.Lock()
go calculateData(concurrency, processingTime, endTime-startTime, maxTime, minTime, successNum, failureNum, chanIdLen, &respCodeMap)
mutex.Unlock()
case <-stopChan:
return
}
}
}()
printHeader()
for respRes := range ch {
mutex.Lock()
processingTime = processingTime + respRes.Cost
if maxTime <= respRes.Cost {
maxTime = respRes.Cost
}
if minTime == 0 {
minTime = respRes.Cost
} else if minTime > respRes.Cost {
minTime = respRes.Cost
}
if respRes.IsSucceed {
successNum = successNum + 1
} else {
failureNum = failureNum + 1
}

// 统计response状态码
if value, ok := respCodeMap.Load(respRes.StatusCode); ok {
total, _ := value.(int)
respCodeMap.Store(respRes.StatusCode, total+1)
} else {
respCodeMap.Store(respRes.StatusCode, 1)
}

// 统计并发数
if _, ok := chanIds[int(respRes.ChanId)]; !ok {
chanIds[int(respRes.ChanId)] = true
chanIdLen = uint64(len(chanIds))
}
requestCostTimeList = append(requestCostTimeList, respRes.Cost)
mutex.Unlock()
}
// 数据全部接受完成,停止定时输出统计数据
stopChan <- true
endTime := uint64(time.Now().UnixNano())
requestCostTime = endTime - startTime
calculateData(concurrency, processingTime, requestCostTime, maxTime, minTime, successNum, failureNum, chanIdLen, &respCodeMap)

fmt.Printf("\n\n")
fmt.Println("************************* 结果 stat ****************************")
fmt.Println("处理协程数量:", concurrency)
fmt.Println("请求总数(并发数*请求数 -c * -n):", successNum+failureNum, "总请求时间:",
fmt.Sprintf("%.3f", float64(requestCostTime)/1e9),
"秒", "successNum:", successNum, "failureNum:", failureNum)
printTop(requestCostTimeList)
fmt.Println("************************* 结果 end ****************************")
fmt.Printf("\n\n")
}

func calculateData(concurrent, processingTime, costTime, maxTime, minTime, successNum, failureNum, chanIdLen uint64, respCodeMap *sync.Map) {
if processingTime == 0 || chanIdLen == 0 {
return
}

var qps, averageTime, maxTimeFloat, minTimeFloat, requestCostTimeFloat float64

// 平均 QPS 成功数*总协程数/总耗时 (每秒)
qps = float64(successNum*1e9*concurrent) / float64(processingTime)

// 平均耗时 总耗时/总请求数/并发数 纳秒=>毫秒
if successNum != 0 && concurrent != 0 {
averageTime = float64(processingTime) / float64(successNum*1e6)
}
maxTimeFloat = float64(maxTime) / 1e6
minTimeFloat = float64(minTime) / 1e6
requestCostTimeFloat = float64(costTime) / 1e9

result := fmt.Sprintf("%4.0fs│%7d│%7d│%7d│%8.2f│%11.2f│%11.2f│%11.2f│%v",
requestCostTimeFloat, chanIdLen, successNum, failureNum, qps, maxTimeFloat, minTimeFloat, averageTime, printMap(respCodeMap))
fmt.Println(result)
}

func printHeader() {
fmt.Printf("\n\n")
fmt.Println("─────┬───────┬───────┬───────┬────────┬───────────┬───────────┬───────────┬────────")
fmt.Println(" 耗时│ 并发数│ 成功数│ 失败数│ qps │最长耗时/ms│最短耗时/ms│平均耗时/ms│ 状态码")
fmt.Println("─────┼───────┼───────┼───────┼────────┼───────────┼───────────┼───────────┼────────")
return
}

// 打印响应状态码及数量, 如 200:5
func printMap(respCodeMap *sync.Map) (mapStr string) {
var mapArr []string

respCodeMap.Range(func(key, value interface{}) bool {
mapArr = append(mapArr, fmt.Sprintf("%v:%v", key, value))
return true
})
sort.Strings(mapArr)
mapStr = strings.Join(mapArr, ";")
return
}

// printTop 排序后计算 top 90 95 99
func printTop(requestCostTimeList []uint64) {
if len(requestCostTimeList) == 0 {
return
}
all := uint64Array{}
all = requestCostTimeList
sort.Sort(all)
fmt.Println("tp90:", fmt.Sprintf("%.3fms", float64(all[int(float64(len(all))*0.90)]/1e6)))
fmt.Println("tp95:", fmt.Sprintf("%.3fms", float64(all[int(float64(len(all))*0.95)]/1e6)))
fmt.Println("tp99:", fmt.Sprintf("%.3fms", float64(all[int(float64(len(all))*0.99)]/1e6)))
}

type uint64Array []uint64

func (array uint64Array) Len() int { return len(array) }
func (array uint64Array) Swap(i, j int) { array[i], array[j] = array[j], array[i] }
func (array uint64Array) Less(i, j int) bool { return array[i] < array[j] }

5、main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
var (
concurrency uint64 = 1 // 并发数
totalNumber uint64 = 1 // 请求个数
curl = ""
curlPath = "" // curl文件路径
expectedCode = 200
)

func init() {
flag.Uint64Var(&concurrency, "c", concurrency, "并发数")
flag.Uint64Var(&totalNumber, "n", totalNumber, "请求数(单个并发)")
flag.StringVar(&curl, "u", curl, "压测地址")
flag.StringVar(&curlPath, "p", curlPath, "curl文件地址")
flag.IntVar(&expectedCode, "e", expectedCode, "期望请求结果的状态码")

flag.Parse()
}

func main() {
runtime.GOMAXPROCS(getCPUNum())

if len(curl) == 0 && len(curlPath) == 0 {
fmt.Printf("示例: go run main.go -c 1 -n 1 -u https://www.baidu.com/ \n")
return
}

req, err := request.NewRequest(curl, curlPath, expectedCode)
if err != nil {
fmt.Println(err)
return
}
ctx := context.Background()
server.Dispose(ctx, req, concurrency, totalNumber)
}
func getCPUNum() int {
if runtime.NumCPU()/4 < 1 {
return 1
}
return runtime.NumCPU() / 4
}

3、验证猜想

  • 启动
    1
    go run main.go -c 1000 -n 5000 -p D:\go\go-demo\gostress\test-stress.curl 
  • qps、耗时等统计如下
    qps
    耗时

为验证工具统计正确性,配置prometheus进行对照
普罗米修斯监控指标
可以看到prometheus在http server端统计到的数据qps、tp99、tp90、tp95基本上是符合的,由此验证工具正确性

4、工具http接口&监控

1、server端监控代码monitor/monitor.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// 统计qps
var HttpRequestCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_request_count",
Help: "http request count",
},
[]string{"endpoint", "port"},
)

var Histogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "histogram_showcase_metric",
Buckets: []float64{40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 250, 260, 270, 280, 290, 300}, // 根据场景需求配置bucket的范围
})

2、main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func init() {
prometheus.MustRegister(monitor.HttpRequestCount)
prometheus.MustRegister(monitor.Histogram)
}

func main() {
r := gin.Default()
r.GET("/metrics", PromHandler(promhttp.Handler()))
r.GET("/stress", func(c *gin.Context) {
start := time.Now()
c.JSON(http.StatusOK, "gin demo")
monitor.HttpRequestCount.WithLabelValues(c.Request.URL.Path, "8888").Inc()

n := rand.Intn(100)
if n >= 95 {
time.Sleep(100 * time.Millisecond)
} else {
time.Sleep(50 * time.Millisecond)
}

monitor.Histogram.Observe((float64)(time.Since(start) / time.Millisecond))
})

r.Run(":8888")
}

func PromHandler(handler http.Handler) gin.HandlerFunc {
return func(c *gin.Context) {
handler.ServeHTTP(c.Writer, c.Request)
}
}

5、prometheus与grafana在windows安装步骤

  • prometheus安装
    直接google,改配置如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    - job_name: "prometheus"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ["localhost:9090"]
    # 此次测试监控
    - job_name: "go-stress"
    static_configs:
    - targets: ["localhost:8888"]
    - job_name: "nginx"
    static_configs:
    - targets: ["localhost:8889"]
    # 监控windows
    - job_name: "windows"
    static_configs:
    - targets: ["localhost:9182"]

    启动后打开localhost:9090查看

  • grafana安装
    官网下载zip安装包,解压启动即可

  • grafana面板配置
    qps

    1
    sum(rate(http_request_duration_count{}[1m])) by (endpoint)

    TP90

    1
    histogram_quantile(0.90, rate(histogram_showcase_metric_bucket{instance="localhost:8888"}[1m]))

    TP99、TP95修改对应值即可