plow/report.go

282 lines
5.5 KiB
Go

package main
import (
"github.com/beorn7/perks/histogram"
"github.com/beorn7/perks/quantile"
"math"
"sync"
"time"
)
var quantiles = []float64{0.50, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}
var quantilesTarget = map[float64]float64{
0.50: 0.01,
0.75: 0.01,
0.90: 0.001,
0.95: 0.001,
0.99: 0.001,
0.999: 0.0001,
0.9999: 0.00001,
}
type Stats struct {
count int64
sum float64
sumSq float64
min float64
max float64
}
func (s *Stats) Update(v float64) {
s.count++
s.sum += v
s.sumSq += v * v
if v < s.min || s.count == 1 {
s.min = v
}
if v > s.max || s.count == 1 {
s.max = v
}
}
func (s *Stats) Stddev() float64 {
num := (float64(s.count) * s.sumSq) - math.Pow(s.sum, 2)
div := float64(s.count * (s.count - 1))
if div == 0 {
return 0
}
return math.Sqrt(num / div)
}
func (s *Stats) Mean() float64 {
if s.count == 0 {
return 0
}
return s.sum / float64(s.count)
}
func (s *Stats) Reset() {
s.count = 0
s.sum = 0
s.sumSq = 0
s.min = 0
s.max = 0
}
type StreamReport struct {
lock sync.Mutex
latencyStats *Stats
rpsStats *Stats
latencyQuantile *quantile.Stream
latencyHistogram *histogram.Histogram
codes map[string]int64
errors map[string]int64
latencyWithinSec *Stats
rpsWithinSec float64
noDateWithinSec bool
readBytes int64
writeBytes int64
doneChan chan struct{}
}
func NewStreamReport() *StreamReport {
return &StreamReport{
latencyQuantile: quantile.NewTargeted(quantilesTarget),
latencyHistogram: histogram.New(8),
codes: make(map[string]int64, 1),
errors: make(map[string]int64, 1),
doneChan: make(chan struct{}, 1),
latencyStats: &Stats{},
rpsStats: &Stats{},
latencyWithinSec: &Stats{},
}
}
func (s *StreamReport) insert(v float64) {
s.latencyQuantile.Insert(v)
s.latencyHistogram.Insert(v)
s.latencyStats.Update(v)
}
func (s *StreamReport) Collect(records <-chan *ReportRecord) {
latencyWithinSecTemp := &Stats{}
go func() {
ticker := time.NewTicker(time.Second)
lastCount := int64(0)
lastTime := startTime
for {
select {
case <-ticker.C:
s.lock.Lock()
dc := s.latencyStats.count - lastCount
if dc > 0 {
rps := float64(dc) / time.Since(lastTime).Seconds()
s.rpsStats.Update(rps)
lastCount = s.latencyStats.count
lastTime = time.Now()
*s.latencyWithinSec = *latencyWithinSecTemp
s.rpsWithinSec = rps
latencyWithinSecTemp.Reset()
s.noDateWithinSec = false
} else {
s.noDateWithinSec = true
}
s.lock.Unlock()
case <-s.doneChan:
return
}
}
}()
for {
r, ok := <-records
if !ok {
close(s.doneChan)
break
}
s.lock.Lock()
latencyWithinSecTemp.Update(float64(r.cost))
s.insert(float64(r.cost))
if r.code != "" {
s.codes[r.code] ++
}
if r.error != "" {
s.errors[r.error] ++
}
s.readBytes = r.readBytes
s.writeBytes = r.writeBytes
s.lock.Unlock()
recordPool.Put(r)
}
}
type SnapshotReport struct {
Elapsed time.Duration
Count int64
Codes map[string]int64
Errors map[string]int64
RPS float64
ReadThroughput float64
WriteThroughput float64
Stats *struct {
Min time.Duration
Mean time.Duration
StdDev time.Duration
Max time.Duration
}
RpsStats *struct {
Min float64
Mean float64
StdDev float64
Max float64
}
Percentiles []*struct {
Percentile float64
Latency time.Duration
}
Histograms []*struct {
Mean time.Duration
Count int
}
}
func (s *StreamReport) Snapshot() *SnapshotReport {
s.lock.Lock()
rs := &SnapshotReport{
Elapsed: time.Since(startTime),
Count: s.latencyStats.count,
Stats: &struct {
Min time.Duration
Mean time.Duration
StdDev time.Duration
Max time.Duration
}{time.Duration(s.latencyStats.min), time.Duration(s.latencyStats.Mean()),
time.Duration(s.latencyStats.Stddev()), time.Duration(s.latencyStats.max)},
}
if s.rpsStats.count > 0 {
rs.RpsStats = &struct {
Min float64
Mean float64
StdDev float64
Max float64
}{s.rpsStats.min, s.rpsStats.Mean(),
s.rpsStats.Stddev(), s.rpsStats.max}
}
elapseInSec := rs.Elapsed.Seconds()
rs.RPS = float64(rs.Count) / elapseInSec
rs.ReadThroughput = float64(s.readBytes) / 1024.0 / 1024.0 / elapseInSec
rs.WriteThroughput = float64(s.writeBytes) / 1024.0 / 1024.0 / elapseInSec
rs.Codes = make(map[string]int64, len(s.codes))
for k, v := range s.codes {
rs.Codes[k] = v
}
rs.Errors = make(map[string]int64, len(s.errors))
for k, v := range s.errors {
rs.Errors[k] = v
}
rs.Percentiles = make([]*struct {
Percentile float64
Latency time.Duration
}, len(quantiles))
for i, p := range quantiles {
rs.Percentiles[i] = &struct {
Percentile float64
Latency time.Duration
}{p, time.Duration(s.latencyQuantile.Query(p))}
}
hisBins := s.latencyHistogram.Bins()
rs.Histograms = make([]*struct {
Mean time.Duration
Count int
}, len(hisBins))
for i, b := range hisBins {
rs.Histograms[i] = &struct {
Mean time.Duration
Count int
}{time.Duration(b.Mean()), b.Count}
}
s.lock.Unlock()
return rs
}
func (s *StreamReport) Done() <-chan struct{} {
return s.doneChan
}
type ChartsReport struct {
RPS float64
Latency Stats
}
func (s *StreamReport) Charts() *ChartsReport {
s.lock.Lock()
var cr *ChartsReport
if s.noDateWithinSec {
cr = nil
} else {
cr = &ChartsReport{
RPS: s.rpsWithinSec,
Latency: *s.latencyWithinSec,
}
}
s.lock.Unlock()
return cr
}