collector_streaming.go
package ftdc

import (
	"io"

	"github.com/pkg/errors"
)
type streamingCollector struct {
output io.Writer
maxSamples int
count int
Collector
}
// NewStreamingCollector wraps the underlying collector, writing the
// data to the underlying writer after the underlying collector is
// filled. This is similar to the batch collector, but allows the
// collector to drop FTDC data from memory. Chunks are flushed to the
// writer when the collector has collected "maxSamples" samples
// during the Add operation.
func NewStreamingCollector(maxSamples int, writer io.Writer) Collector {
return newStreamingCollector(maxSamples, writer)
}
func newStreamingCollector(maxSamples int, writer io.Writer) *streamingCollector {
return &streamingCollector{
maxSamples: maxSamples,
output: writer,
Collector: &betterCollector{
maxDeltas: maxSamples,
},
}
}
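
// A minimal usage sketch (not part of the original file): it assumes
// a bytes.Buffer as the destination and that Add accepts map-typed
// documents (Add takes interface{}; the exact accepted types depend
// on readDocument). The collector flushes full chunks on its own;
// FlushCollector persists whatever is still buffered at the end.
//
//	buf := &bytes.Buffer{}
//	collector := NewStreamingCollector(100, buf)
//	for i := 0; i < 250; i++ {
//		if err := collector.Add(map[string]interface{}{"counter": i}); err != nil {
//			return errors.Wrap(err, "adding sample")
//		}
//	}
//	if err := FlushCollector(collector, buf); err != nil {
//		return errors.Wrap(err, "flushing buffered samples")
//	}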
func (c *streamingCollector) Reset() { c.count = 0; c.Collector.Reset() }
func (c *streamingCollector) Add(in interface{}) error {
if c.count-1 >= c.maxSamples {
if err := FlushCollector(c, c.output); err != nil {
return errors.Wrap(err, "problem flushing collector contents")
}
}
if err := c.Collector.Add(in); err != nil {
return errors.Wrapf(err, "adding sample #%d", c.count+1)
}
c.count++
return nil
}
// FlushCollector writes the contents of a collector out to an
// io.Writer. This is useful in the context of any collector, but is
// particularly useful in the context of streaming collectors, which
// flush data periodically and may have cached data.
func FlushCollector(c Collector, writer io.Writer) error {
if writer == nil {
return errors.New("invalid writer")
}
if c.Info().SampleCount == 0 {
return nil
}
payload, err := c.Resolve()
if err != nil {
return errors.WithStack(err)
}
n, err := writer.Write(payload)
if err != nil {
return errors.WithStack(err)
}
if n != len(payload) {
return errors.New("problem flushing data")
}
c.Reset()
return nil
}
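
// A hedged sketch of the typical call pattern (not part of the
// original file): because the streaming collectors flush full chunks
// on their own during Add, callers usually only need FlushCollector
// once, at shutdown, to persist any partially filled chunk. The file
// name, the "os" usage, and the "samples" slice below are
// illustrative assumptions.
//
//	file, err := os.Create("metrics.ftdc")
//	if err != nil {
//		return errors.Wrap(err, "creating output file")
//	}
//	defer file.Close()
//
//	collector := NewStreamingCollector(300, file)
//	for _, sample := range samples {
//		if err := collector.Add(sample); err != nil {
//			return errors.Wrap(err, "adding sample")
//		}
//	}
//	return errors.Wrap(FlushCollector(collector, file), "final flush")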
type streamingDynamicCollector struct {
output io.Writer
hash string
metricCount int
*streamingCollector
}
// NewStreamingDynamicCollector has the same semantics as the dynamic
// collector but wraps the streaming collector rather than the batch
// collector. Chunks are flushed during the Add() operation when the
// schema changes or the chunk is full.
func NewStreamingDynamicCollector(max int, writer io.Writer) Collector {
return &streamingDynamicCollector{
output: writer,
streamingCollector: newStreamingCollector(max, writer),
}
}
func (c *streamingDynamicCollector) Reset() {
c.streamingCollector = newStreamingCollector(c.streamingCollector.maxSamples, c.output)
c.metricCount = 0
c.hash = ""
}
func (c *streamingDynamicCollector) Add(in interface{}) error {
doc, err := readDocument(in)
if err != nil {
return errors.WithStack(err)
}
docHash, num := metricKeyHash(doc)
if c.hash == "" {
c.hash = docHash
c.metricCount = num
if c.streamingCollector.count > 0 {
if err := FlushCollector(c, c.output); err != nil {
return errors.WithStack(err)
}
}
return errors.WithStack(c.streamingCollector.Add(doc))
}
if c.metricCount != num || c.hash != docHash {
if err := FlushCollector(c, c.output); err != nil {
return errors.WithStack(err)
}
}
return errors.WithStack(c.streamingCollector.Add(doc))
}
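
// A brief sketch (not part of the original file) of why the dynamic
// variant exists: when the shape of the incoming documents changes
// between Add calls, the current chunk is flushed before the new
// schema is recorded, so mixed-schema streams stay valid. The
// document shapes below are illustrative assumptions.
//
//	buf := &bytes.Buffer{}
//	collector := NewStreamingDynamicCollector(100, buf)
//
//	// Samples with one schema...
//	if err := collector.Add(map[string]interface{}{"a": 1}); err != nil {
//		return errors.Wrap(err, "adding sample")
//	}
//
//	// ...followed by a different schema: the previous chunk is
//	// flushed automatically before this sample is recorded.
//	if err := collector.Add(map[string]interface{}{"a": 1, "b": 2}); err != nil {
//		return errors.Wrap(err, "adding sample")
//	}
//
//	if err := FlushCollector(collector, buf); err != nil {
//		return errors.Wrap(err, "final flush")
//	}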