Skip to content

Commit 22cbe6f

Browse files
authoredJul 4, 2024··
zerome: retry until nothing is found (#10)
* zerome: retry until nothing is found Signed-off-by: Seena Fallah <seenafallah@gmail.com> * ci: update golangci-lint to 1.59.1 Signed-off-by: Seena Fallah <seenafallah@gmail.com> --------- Signed-off-by: Seena Fallah <seenafallah@gmail.com>
1 parent 994e673 commit 22cbe6f

File tree

3 files changed

+37
-18
lines changed

3 files changed

+37
-18
lines changed
 

‎.github/workflows/go.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,6 @@ jobs:
6464
- name: Golangci-lint
6565
uses: golangci/golangci-lint-action@v4
6666
with:
67-
version: v1.59.0
67+
version: v1.59.1
6868
args: --timeout 3m
6969
skip-pkg-cache: true

‎internal/zerome/zerome.go

+24-12
Original file line numberDiff line numberDiff line change
@@ -21,32 +21,42 @@ func New(metrics []Metric) *Client {
2121
}
2222
}
2323

24-
func (c *Client) Run(ctx context.Context, wg *sync.WaitGroup) {
24+
func (c *Client) Run(ctx context.Context, outerWG *sync.WaitGroup) {
2525
for _, metric := range c.metrics {
26-
go func(metric Metric, wg *sync.WaitGroup) {
27-
defer wg.Done()
26+
go func(metric Metric, outerWG *sync.WaitGroup) {
27+
defer outerWG.Done()
28+
29+
var wg sync.WaitGroup
2830

2931
for {
3032
select {
3133
case <-ctx.Done():
34+
wg.Wait()
35+
3236
return
3337
case <-time.After(metric.Interval):
34-
err := c.ZeroMe(ctx, metric)
35-
if err != nil {
36-
slog.ErrorContext(ctx, "Failed to zero metric", "metric", metric.Name, "error", err)
37-
}
38+
wg.Add(1)
39+
40+
go func() {
41+
defer wg.Done()
42+
43+
err := c.ZeroMe(ctx, time.Now(), metric)
44+
if err != nil {
45+
slog.ErrorContext(ctx, "Failed to zero metric", "metric", metric.Name, "error", err)
46+
}
47+
}()
3848
}
3949
}
40-
}(metric, wg)
50+
}(metric, outerWG)
4151
}
4252
}
4353

44-
func (c *Client) ZeroMe(ctx context.Context, metric Metric) error {
54+
func (c *Client) ZeroMe(ctx context.Context, nowTime time.Time, metric Metric) error {
4555
// Query twice the interval to ensure that the metric has a missing data point in the past.
4656
queryInterval := metric.Interval * 2 //nolint:gomnd,mnd
4757

4858
// Add query interval as a delay to cover exporter scrape failures.
49-
ts := time.Now().Add(-queryInterval)
59+
ts := nowTime.Add(-queryInterval)
5060

5161
vector, err := metric.querier.Query(ctx, ts, metric.Name, queryInterval, metric.UpLabels)
5262
if err != nil {
@@ -67,9 +77,11 @@ func (c *Client) ZeroMe(ctx context.Context, metric Metric) error {
6777
return err
6878
}
6979

70-
slog.InfoContext(ctx, "Zeroed metric", "metric", metric.Name, "vector", vector)
80+
for _, s := range vector {
81+
slog.InfoContext(ctx, "Zeroed sample", "metric", metric.Name, "sample", s.String())
82+
}
7183

72-
return nil
84+
return c.ZeroMe(ctx, nowTime, metric) // Retry until nothing to zero
7385
}
7486

7587
func (c *Client) zeroTimeSeries(metric Metric, vector model.Vector) []prompb.TimeSeries {

‎internal/zerome/zerome_test.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,14 @@ func TestZeroMe(t *testing.T) {
7474
// setup querier mock
7575
mockQuerier := mock.NewMockAPI(ctrl)
7676
querier.SetV1API(mockQuerier)
77-
mockQuerier.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(queryResult, nil, nil)
77+
mockQuerier.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
78+
func(_, _, _ any, _ ...any) (model.Vector, *v1.Warnings, error) {
79+
r := queryResult
80+
queryResult = model.Vector{} // return empty result on next call so it will exit the loop
81+
82+
return r, nil, nil
83+
},
84+
).Times(2)
7885

7986
// setup writer mock
8087
{
@@ -91,7 +98,7 @@ func TestZeroMe(t *testing.T) {
9198

9299
client := New([]Metric{metric})
93100

94-
err := client.ZeroMe(context.Background(), metric)
101+
err := client.ZeroMe(context.Background(), nowTime, metric)
95102
require.NoError(t, err)
96103
}
97104

@@ -126,7 +133,7 @@ func TestZeroMe_QueryError(t *testing.T) {
126133

127134
client := New([]Metric{metric})
128135

129-
err := client.ZeroMe(context.Background(), metric)
136+
err := client.ZeroMe(context.Background(), time.Now(), metric)
130137
require.ErrorIs(t, err, queryErr)
131138
}
132139

@@ -160,7 +167,7 @@ func TestZeroMe_EmptyResult(t *testing.T) {
160167

161168
client := New([]Metric{metric})
162169

163-
err := client.ZeroMe(context.Background(), metric)
170+
err := client.ZeroMe(context.Background(), time.Now(), metric)
164171
require.NoError(t, err)
165172
}
166173

@@ -207,7 +214,7 @@ func TestZeroMe_WriteError(t *testing.T) {
207214

208215
client := New([]Metric{metric})
209216

210-
err := client.ZeroMe(context.Background(), metric)
217+
err := client.ZeroMe(context.Background(), nowTime, metric)
211218
require.ErrorIs(t, err, writeErr)
212219
}
213220

0 commit comments

Comments
 (0)
Please sign in to comment.