fix incorrect charts while rps<1

update readme.md
optimize timeout interruption
This commit is contained in:
six-ddc 2021-06-18 23:26:03 +08:00
parent 08afb6e761
commit 25ded301fa
5 changed files with 194 additions and 95 deletions

110
README.md
View File

@ -1,5 +1,113 @@
# plow # plow
A high-performance HTTP benchmarking tool with real-time web UI and terminal displaying
Plow is a HTTP(S) benchmarking tool, written in Golang. It uses excellent [fasthttp](https://github.com/valyala/fasthttp#http-client-comparison-with-nethttp) instead of Go's default net/http due to its lightning fast performance.
Plow runs at a specified connections(`-c`) concurrently and **real-time** records a summary statistics, histogram of execution time and calculates percentiles to display on Web UI and terminal. It can run for a set duration(`-d`), for a fixed number of requests(`-n`), or until Ctrl-C interrupted.
The implementation of real-time computing Histograms and Quantiles using stream-based algorithms inspired by [prometheus](https://github.com/prometheus/client_golang) with low memory and CPU bounds. so it's almost no additional performance overhead for benchmarking.
![](https://github.com/six-ddc/plow/blob/main/demo.gif?raw=true) ![](https://github.com/six-ddc/plow/blob/main/demo.gif?raw=true)
```text
./plow http://127.0.0.1:8080/hello -c 20
Benchmarking http://127.0.0.1:8080/hello using 20 connection(s).
> Real-time charts is listening on http://127.0.0.1:18888/
Summary:
Elapsed 8.6s
Count 969657
2xx 776392
4xx 193265
RPS 112741.713
Reads 10.192MB/s
Writes 6.774MB/s
Statistics Min Mean StdDev Max
Latency 32µs 176µs 37µs 1.839ms
RPS 108558.4 112818.12 2456.63 115949.98
Latency Percentile:
P50 P75 P90 P95 P99 P99.9 P99.99
173µs 198µs 222µs 238µs 274µs 352µs 498µs
Latency Histogram:
141µs 273028 ■■■■■■■■■■■■■■■■■■■■■■■■
177µs 458955 ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
209µs 204717 ■■■■■■■■■■■■■■■■■■
235µs 26146 ■■
269µs 6029 ■
320µs 721
403µs 58
524µs 3
```
## Installation
Binary and image distributions are available through the [releases](https://github.com/six-ddc/plow/releases) assets page.
### Via Go
```bash
go get github.com/six-ddc/plow
```
### Via Homebrew
*Coming soon*
## Usage
### Options
```bash
usage: plow [<flags>] <url>
A high-performance HTTP benchmarking tool with real-time web UI and terminal displaying
Example:
plow http://127.0.0.1:8080/ -c 20 -n 100000
plow https://httpbin.org/post -c 20 -d 5m --body @file.json -T 'application/json' -m POST
Flags:
--help Show context-sensitive help.
-c, --concurrency=1 Number of connections to run concurrently
-n, --requests=-1 Number of requests to run
-d, --duration=DURATION Duration of test, examples: -d 10s -d 3m
-i, --interval=200ms Print snapshot result every interval, use 0 to print once at the end
--seconds Use seconds as time unit to print
--body=BODY HTTP request body, if start the body with @, the rest should be a filename to read
--stream Specify whether to stream file specified by '--body @file' using chunked encoding or to read into memory
-m, --method="GET" HTTP method
-H, --header=K:V ... Custom HTTP headers
--host=HOST Host header
-T, --content=CONTENT Content-Type header
--listen=":18888" Listen addr to serve Web UI
--link="127.0.0.1:18888" Link addr used for show Web html and request backend server
--timeout=DURATION Timeout for each http request
--dial-timeout=DURATION Timeout for dial addr
--req-timeout=DURATION Timeout for full request writing
--resp-timeout=DURATION Timeout for full response reading
--socks5=ip:port Socks5 proxy
--version Show application version.
Args:
<url> request url
```
### Examples
Basic usage:
```bash
plow http://127.0.0.1:8080/ -c 20 -n 10000 -d 10s
```
POST a json file:
```bash
plow http://127.0.0.1:8080/ -c 20 --body @file.json -T 'application/json' -m POST
```
### License
See [LICENSE](https://github.com/six-ddc/plow/blob/master/LICENSE).

View File

@ -12,27 +12,10 @@ import (
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"net" "net"
"strings" "strings"
"sync"
"text/template" "text/template"
"time" "time"
) )
func init() {
templates.PageTpl = `
{{- define "page" }}
<!DOCTYPE html>
<html>
{{- template "header" . }}
<body>
<p align="center">🚀 <a href="https://github.com/six-ddc/plow"><b>plow</b></a> <em>is a high-performance HTTP benchmarking tool with real-time web UI and terminal displaying</em></p>
<style> .box { justify-content:center; display:flex; flex-wrap:wrap } </style>
<div class="box"> {{- range .Charts }} {{ template "base" . }} {{- end }} </div>
</body>
</html>
{{ end }}
`
}
var ( var (
assertsPath = "/echarts/statics/" assertsPath = "/echarts/statics/"
apiPath = "/data" apiPath = "/data"
@ -43,7 +26,7 @@ var (
) )
const ( const (
DefaultTemplate string = ` ViewTpl = `
$(function () { setInterval({{ .ViewID }}_sync, {{ .Interval }}); }); $(function () { setInterval({{ .ViewID }}_sync, {{ .Interval }}); });
function {{ .ViewID }}_sync() { function {{ .ViewID }}_sync() {
$.ajax({ $.ajax({
@ -64,10 +47,23 @@ function {{ .ViewID }}_sync() {
} }
}); });
}` }`
PageTpl = `
{{- define "page" }}
<!DOCTYPE html>
<html>
{{- template "header" . }}
<body>
<p align="center">🚀 <a href="https://github.com/six-ddc/plow"><b>Plow</b></a> %s</p>
<style> .box { justify-content:center; display:flex; flex-wrap:wrap } </style>
<div class="box"> {{- range .Charts }} {{ template "base" . }} {{- end }} </div>
</body>
</html>
{{ end }}
`
) )
func (c *Charts) genViewTemplate(vid, route string) string { func (c *Charts) genViewTemplate(vid, route string) string {
tpl, err := template.New("view").Parse(DefaultTemplate) tpl, err := template.New("view").Parse(ViewTpl)
if err != nil { if err != nil {
panic("failed to parse template " + err.Error()) panic("failed to parse template " + err.Error())
} }
@ -88,7 +84,7 @@ func (c *Charts) genViewTemplate(vid, route string) string {
buf := bytes.Buffer{} buf := bytes.Buffer{}
if err := tpl.Execute(&buf, d); err != nil { if err := tpl.Execute(&buf, d); err != nil {
panic("statsview: failed to execute template " + err.Error()) panic("failed to execute template " + err.Error())
} }
return buf.String() return buf.String()
@ -137,7 +133,7 @@ func (c *Charts) newRPSView() components.Charter {
} }
type Metrics struct { type Metrics struct {
Values []float64 `json:"values"` Values []interface{} `json:"values"`
Time string `json:"time"` Time string `json:"time"`
} }
@ -146,12 +142,11 @@ type Charts struct {
linkAddr string linkAddr string
page *components.Page page *components.Page
ln net.Listener ln net.Listener
lock sync.Mutex
reportData ChartsReport
dataFunc func() *ChartsReport dataFunc func() *ChartsReport
} }
func NewCharts(listenAddr string, linkAddr string, dataFunc func() *ChartsReport) (*Charts, error) { func NewCharts(listenAddr string, linkAddr string, dataFunc func() *ChartsReport, desc string) (*Charts, error) {
templates.PageTpl = fmt.Sprintf(PageTpl, desc)
ln, err := net.Listen("tcp4", listenAddr) ln, err := net.Listen("tcp4", listenAddr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -180,17 +175,24 @@ func (c *Charts) Handler(ctx *fasthttp.RequestCtx) {
default: default:
if strings.HasPrefix(path, apiPath) { if strings.HasPrefix(path, apiPath) {
view := path[len(apiPath)+1:] view := path[len(apiPath)+1:]
var values []float64 var values []interface{}
c.lock.Lock() reportData := c.dataFunc()
switch view { switch view {
case latencyView: case latencyView:
values = append(values, c.dataFunc().Latency.min/1e6) if reportData != nil {
values = append(values, c.dataFunc().Latency.Mean()/1e6) values = append(values, reportData.Latency.min/1e6)
values = append(values, c.dataFunc().Latency.max/1e6) values = append(values, reportData.Latency.Mean()/1e6)
case rpsView: values = append(values, reportData.Latency.max/1e6)
values = append(values, c.dataFunc().RPS) } else {
values = append(values, nil, nil, nil)
}
case rpsView:
if reportData != nil {
values = append(values, reportData.RPS)
} else {
values = append(values, nil)
}
} }
c.lock.Unlock()
metrics := &Metrics{ metrics := &Metrics{
Time: time.Now().Format(timeFormat), Time: time.Now().Format(timeFormat),
Values: values, Values: values,
@ -203,17 +205,6 @@ func (c *Charts) Handler(ctx *fasthttp.RequestCtx) {
} }
func (c *Charts) Serve() { func (c *Charts) Serve() {
go func() {
ticker := time.NewTicker(refreshInterval)
for {
select {
case <-ticker.C:
c.lock.Lock()
c.reportData = *c.dataFunc()
c.lock.Unlock()
}
}
}()
server := fasthttp.Server{ server := fasthttp.Server{
Handler: cors.DefaultHandler().CorsMiddleware(c.Handler), Handler: cors.DefaultHandler().CorsMiddleware(c.Handler),
} }

18
main.go
View File

@ -2,10 +2,11 @@ package main
import ( import (
"fmt" "fmt"
"gopkg.in/alecthomas/kingpin.v3-unstable"
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
"gopkg.in/alecthomas/kingpin.v3-unstable"
) )
var ( var (
@ -22,7 +23,7 @@ var (
host = kingpin.Flag("host", "Host header").String() host = kingpin.Flag("host", "Host header").String()
contentType = kingpin.Flag("content", "Content-Type header").Short('T').String() contentType = kingpin.Flag("content", "Content-Type header").Short('T').String()
chartsListenAddr = kingpin.Flag("listen", "Listen addr to serve Web UI").Default("127.0.0.1:18888").String() chartsListenAddr = kingpin.Flag("listen", "Listen addr to serve Web UI").Default(":18888").String()
chartsLinkAddr = kingpin.Flag("link", "Link addr used for show Web html and request backend server").Default("127.0.0.1:18888").String() chartsLinkAddr = kingpin.Flag("link", "Link addr used for show Web html and request backend server").Default("127.0.0.1:18888").String()
timeout = kingpin.Flag("timeout", "Timeout for each http request").PlaceHolder("DURATION").Duration() timeout = kingpin.Flag("timeout", "Timeout for each http request").PlaceHolder("DURATION").Duration()
dialTimeout = kingpin.Flag("dial-timeout", "Timeout for dial addr").PlaceHolder("DURATION").Duration() dialTimeout = kingpin.Flag("dial-timeout", "Timeout for dial addr").PlaceHolder("DURATION").Duration()
@ -91,14 +92,17 @@ Example:
host: *host, host: *host,
} }
fmt.Printf("Benchmarking %s", *url) var desc string
desc = fmt.Sprintf("Benchmarking %s", *url)
if *requests > 0 { if *requests > 0 {
fmt.Printf(" with %d request(s)", *requests) desc += fmt.Sprintf(" with %d request(s)", *requests)
} }
if *duration > 0 { if *duration > 0 {
fmt.Printf(" for %s", duration.String()) desc += fmt.Sprintf(" for %s", duration.String())
} }
fmt.Printf(" using %d connection(s)\n", *concurrency) desc += fmt.Sprintf(" using %d connection(s).", *concurrency)
fmt.Println(desc)
if *chartsListenAddr != "" { if *chartsListenAddr != "" {
fmt.Printf("> Real-time charts is listening on http://%s/\n", *chartsLinkAddr) fmt.Printf("> Real-time charts is listening on http://%s/\n", *chartsLinkAddr)
} }
@ -115,7 +119,7 @@ Example:
go report.Collect(requester.RecordChan()) go report.Collect(requester.RecordChan())
if *chartsListenAddr != "" { if *chartsListenAddr != "" {
charts, err := NewCharts(*chartsListenAddr, *chartsLinkAddr, report.Charts) charts, err := NewCharts(*chartsListenAddr, *chartsLinkAddr, report.Charts, desc)
if err != nil { if err != nil {
errAndExit(err.Error()) errAndExit(err.Error())
return return

View File

@ -76,6 +76,7 @@ type StreamReport struct {
latencyWithinSec *Stats latencyWithinSec *Stats
rpsWithinSec float64 rpsWithinSec float64
noDateWithinSec bool
readBytes int64 readBytes int64
writeBytes int64 writeBytes int64
@ -131,6 +132,9 @@ func (s *StreamReport) Collect(records <-chan *ReportRecord) {
*s.latencyWithinSec = *latencyWithinSecTemp *s.latencyWithinSec = *latencyWithinSecTemp
s.rpsWithinSec = rps s.rpsWithinSec = rps
latencyWithinSecTemp.Reset() latencyWithinSecTemp.Reset()
s.noDateWithinSec = false
} else {
s.noDateWithinSec = true
} }
s.lock.Unlock() s.lock.Unlock()
case <-s.doneChan: case <-s.doneChan:
@ -271,10 +275,15 @@ type ChartsReport struct {
func (s *StreamReport) Charts() *ChartsReport { func (s *StreamReport) Charts() *ChartsReport {
s.lock.Lock() s.lock.Lock()
cr := &ChartsReport{ var cr *ChartsReport
if s.noDateWithinSec {
cr = nil
} else {
cr = &ChartsReport{
RPS: s.rpsWithinSec, RPS: s.rpsWithinSec,
Latency: *s.latencyWithinSec, Latency: *s.latencyWithinSec,
} }
}
s.lock.Unlock() s.lock.Unlock()
return cr return cr
} }

View File

@ -8,8 +8,6 @@ import (
"go.uber.org/automaxprocs/maxprocs" "go.uber.org/automaxprocs/maxprocs"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http"
_ "net/http/pprof"
url2 "net/url" url2 "net/url"
"os" "os"
"os/signal" "os/signal"
@ -23,6 +21,7 @@ import (
var ( var (
startTime = time.Now() startTime = time.Now()
sendOnCloseError interface{}
) )
type ReportRecord struct { type ReportRecord struct {
@ -38,10 +37,16 @@ var recordPool = sync.Pool{
} }
func init() { func init() {
go func() { // Honoring env GOMAXPROCS
http.ListenAndServe("0.0.0.0:6060", nil)
}()
_, _ = maxprocs.Set() _, _ = maxprocs.Set()
defer func() {
sendOnCloseError = recover()
}()
func() {
cc := make(chan struct{}, 1)
close(cc)
cc <- struct{}{}
}()
} }
type MyConn struct { type MyConn struct {
@ -91,6 +96,7 @@ type Requester struct {
httpHeader *fasthttp.RequestHeader httpHeader *fasthttp.RequestHeader
recordChan chan *ReportRecord recordChan chan *ReportRecord
closeOnce sync.Once
report *StreamReport report *StreamReport
errCount int64 errCount int64
wg sync.WaitGroup wg sync.WaitGroup
@ -206,37 +212,10 @@ func (r *Requester) RecordChan() <-chan *ReportRecord {
return r.recordChan return r.recordChan
} }
func getErrorType(err error) string { func (r *Requester) closeRecord() {
switch err { r.closeOnce.Do(func() {
case fasthttp.ErrTimeout: close(r.recordChan)
return "Timeout" })
case fasthttp.ErrNoFreeConns:
return "NoFreeConns"
case fasthttp.ErrConnectionClosed:
return "ConnClosed"
case fasthttp.ErrDialTimeout:
return "DialTimeout"
default:
if opErr, ok := err.(*net.OpError); ok {
err = opErr.Err
}
switch t := err.(type) {
case *net.DNSError:
return "DNS"
case *os.SyscallError:
if errno, ok := t.Err.(syscall.Errno); ok {
switch errno {
case syscall.ECONNREFUSED:
return "ConnRefused"
case syscall.ETIMEDOUT:
return "Timeout"
case syscall.EADDRNOTAVAIL:
return "AddrNotAvail"
}
}
}
}
return "Unknown"
} }
func (r *Requester) DoRequest(req *fasthttp.Request, resp *fasthttp.Response, rr *ReportRecord) { func (r *Requester) DoRequest(req *fasthttp.Request, resp *fasthttp.Response, rr *ReportRecord) {
@ -291,20 +270,28 @@ func (r *Requester) Run() {
r.cancel = cancelFunc r.cancel = cancelFunc
go func() { go func() {
<-sigs <-sigs
r.closeRecord()
cancelFunc() cancelFunc()
}() }()
startTime = time.Now()
if r.duration > 0 { if r.duration > 0 {
time.AfterFunc(r.duration, func() { time.AfterFunc(r.duration, func() {
r.closeRecord()
cancelFunc() cancelFunc()
}) })
} }
startTime = time.Now()
semaphore := r.requests semaphore := r.requests
for i := 0; i < r.concurrency; i++ { for i := 0; i < r.concurrency; i++ {
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
defer r.wg.Done() defer func() {
r.wg.Done()
v := recover()
if v != nil && v != sendOnCloseError {
panic(v)
}
}()
req := &fasthttp.Request{} req := &fasthttp.Request{}
resp := &fasthttp.Response{} resp := &fasthttp.Response{}
r.httpHeader.CopyTo(&req.Header) r.httpHeader.CopyTo(&req.Header)
@ -351,5 +338,5 @@ func (r *Requester) Run() {
} }
r.wg.Wait() r.wg.Wait()
close(r.recordChan) r.closeRecord()
} }