Skip to content

Commit f29c7a9

Browse files
feat: improve healtcheck implementation (#111)
1 parent 7770345 commit f29c7a9

File tree

2 files changed

+319
-33
lines changed

2 files changed

+319
-33
lines changed

couchbase/healthcheck.go

+67-33
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package couchbase
22

33
import (
4+
"context"
5+
"sync"
46
"time"
57

6-
"github.com/Trendyol/go-dcp/logger"
7-
88
"github.com/Trendyol/go-dcp/config"
9+
"github.com/Trendyol/go-dcp/logger"
910
)
1011

1112
type HealthCheck interface {
@@ -14,47 +15,80 @@ type HealthCheck interface {
1415
}
1516

1617
type healthCheck struct {
17-
config *config.HealthCheck
18-
client Client
19-
running bool
18+
config *config.HealthCheck
19+
client Client
20+
cancelFunc context.CancelFunc
21+
wg sync.WaitGroup
22+
startOnce sync.Once
23+
stopOnce sync.Once
2024
}
2125

22-
func (h *healthCheck) Start() {
23-
h.running = true
26+
func NewHealthCheck(config *config.HealthCheck, client Client) HealthCheck {
27+
return &healthCheck{
28+
config: config,
29+
client: client,
30+
}
31+
}
2432

25-
go func() {
26-
for h.running {
27-
time.Sleep(h.config.Interval)
33+
func (h *healthCheck) Start() {
34+
h.startOnce.Do(func() {
35+
ctx, cancel := context.WithCancel(context.Background())
36+
h.cancelFunc = cancel
37+
h.wg.Add(1)
38+
go h.run(ctx)
39+
})
40+
}
2841

29-
retry := 5
42+
func (h *healthCheck) Stop() {
43+
h.stopOnce.Do(func() {
44+
if h.cancelFunc != nil {
45+
h.cancelFunc()
46+
}
47+
h.wg.Wait()
48+
})
49+
}
3050

31-
for {
32-
_, err := h.client.Ping()
33-
if err == nil {
34-
break
35-
} else {
36-
logger.Log.Warn("cannot health check, err: %v", err)
37-
}
51+
func (h *healthCheck) run(ctx context.Context) {
52+
defer h.wg.Done()
3853

39-
retry--
40-
if retry == 0 {
41-
logger.Log.Error("error while health check: %v", err)
42-
panic(err)
43-
}
54+
ticker := time.NewTicker(h.config.Interval)
55+
defer ticker.Stop()
4456

45-
time.Sleep(time.Second)
46-
}
57+
for {
58+
select {
59+
case <-ctx.Done():
60+
logger.Log.Info("Health check stopped.")
61+
return
62+
case <-ticker.C:
63+
h.performHealthCheck(ctx)
4764
}
48-
}()
65+
}
4966
}
5067

51-
func (h *healthCheck) Stop() {
52-
h.running = false
53-
}
68+
func (h *healthCheck) performHealthCheck(ctx context.Context) {
69+
const maxRetries = 5
70+
retryInterval := time.Second
5471

55-
func NewHealthCheck(config *config.HealthCheck, client Client) HealthCheck {
56-
return &healthCheck{
57-
config: config,
58-
client: client,
72+
for attempt := 1; attempt <= maxRetries; attempt++ {
73+
_, err := h.client.Ping()
74+
if err == nil {
75+
logger.Log.Trace("Health check success")
76+
return
77+
}
78+
79+
logger.Log.Warn("Health check attempt %d/%d failed: %v", attempt, maxRetries, err)
80+
81+
if attempt < maxRetries {
82+
select {
83+
case <-ctx.Done():
84+
logger.Log.Info("Health check canceled during retry.")
85+
return
86+
case <-time.After(retryInterval):
87+
// Retry after waiting.
88+
}
89+
} else {
90+
logger.Log.Error("Health check failed after %d attempts: %v", maxRetries, err)
91+
panic(err)
92+
}
5993
}
6094
}

couchbase/healthcheck_test.go

+252
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
package couchbase
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/Trendyol/go-dcp/config"
10+
"github.com/Trendyol/go-dcp/logger"
11+
"github.com/Trendyol/go-dcp/models"
12+
"github.com/Trendyol/go-dcp/wrapper"
13+
"github.com/couchbase/gocbcore/v10"
14+
)
15+
16+
func init() {
17+
logger.InitDefaultLogger("info")
18+
}
19+
20+
func TestHealthCheck_Start_Stop(t *testing.T) {
21+
// Arrange
22+
cfg := &config.HealthCheck{
23+
Interval: 100 * time.Millisecond,
24+
}
25+
26+
pingCh := make(chan struct{}, 3)
27+
28+
mc := &mockClient{}
29+
mc.PingFunc = func() (*models.PingResult, error) {
30+
mc.PingCallCount++
31+
32+
// Non-blocking send to avoid deadlocks if channel is full
33+
select {
34+
case pingCh <- struct{}{}:
35+
default:
36+
}
37+
38+
return &models.PingResult{}, nil
39+
}
40+
41+
sut := NewHealthCheck(cfg, mc)
42+
43+
// Act
44+
sut.Start()
45+
46+
expectedPings := 3
47+
48+
// Assert
49+
50+
// Use a timeout to prevent the test from hanging indefinitely
51+
timeout := time.After(2 * time.Second)
52+
53+
for i := 0; i < expectedPings; i++ {
54+
select {
55+
case <-pingCh:
56+
// Ping was called, continue to wait for the next one
57+
case <-timeout:
58+
// Timeout occurred before receiving all expected Ping calls
59+
sut.Stop()
60+
t.Fatalf("Timed out waiting for %d Ping calls, only received %d", expectedPings, i)
61+
}
62+
}
63+
64+
sut.Stop()
65+
66+
// Assert
67+
if mc.PingCallCount < expectedPings {
68+
t.Fatalf("Ping should have been called at least %d times, but it was called %d times", expectedPings, mc.PingCallCount)
69+
}
70+
}
71+
72+
func TestHealthCheck_PingFailure(t *testing.T) {
73+
// Arrange
74+
cfg := &config.HealthCheck{
75+
Interval: 100 * time.Millisecond,
76+
}
77+
78+
mc := &mockClient{}
79+
80+
pingCh := make(chan struct{}, 3)
81+
82+
// Define the behavior for Ping: fail the first two times, then succeed.
83+
mc.PingFunc = func() (*models.PingResult, error) {
84+
mc.PingCallCount++
85+
86+
select {
87+
case pingCh <- struct{}{}:
88+
default:
89+
}
90+
91+
if mc.PingCallCount <= 2 {
92+
return nil, errors.New("ping failed")
93+
}
94+
return &models.PingResult{}, nil
95+
}
96+
97+
sut := NewHealthCheck(cfg, mc)
98+
99+
// Act
100+
sut.Start()
101+
102+
expectedPings := 3
103+
104+
// Use a timeout to prevent the test from hanging indefinitely
105+
timeout := time.After(10 * time.Second)
106+
107+
// Wait for the expected number of Ping calls
108+
for i := 0; i < expectedPings; i++ {
109+
select {
110+
case <-pingCh:
111+
// Ping was called, continue to wait for the next one
112+
case <-timeout:
113+
// Timeout occurred before receiving all expected Ping calls
114+
sut.Stop()
115+
t.Fatalf("Timed out waiting for %d Ping calls, only received %d", expectedPings, i)
116+
}
117+
}
118+
119+
// Stop the health check after receiving the expected number of Ping calls
120+
sut.Stop()
121+
122+
// Assert
123+
if mc.PingCallCount < expectedPings {
124+
t.Fatalf("Ping should have been called at least %d times, but it was called %d times", expectedPings, mc.PingCallCount)
125+
}
126+
}
127+
128+
func TestHealthCheck_PanicFailure(t *testing.T) {
129+
// Arrange
130+
cfg := &config.HealthCheck{
131+
Interval: 100 * time.Millisecond,
132+
}
133+
134+
mc := &mockClient{}
135+
136+
pingCh := make(chan struct{}, 5)
137+
138+
mc.PingFunc = func() (*models.PingResult, error) {
139+
mc.PingCallCount++
140+
pingCh <- struct{}{}
141+
return nil, errors.New("ping failed")
142+
}
143+
144+
sut := healthCheck{
145+
config: cfg,
146+
client: mc,
147+
}
148+
149+
// Act
150+
sut.wg.Add(1)
151+
go func() {
152+
defer func() {
153+
if r := recover(); r == nil {
154+
t.Error("test should be panic!")
155+
}
156+
}()
157+
158+
sut.run(context.Background())
159+
}()
160+
161+
// Assert
162+
expectedPings := 5
163+
timeout := time.After(10 * time.Second)
164+
165+
for i := 0; i < expectedPings; i++ {
166+
select {
167+
case <-pingCh:
168+
case <-timeout:
169+
t.Fatalf("Timed out waiting for Ping call %d", i+1)
170+
}
171+
}
172+
173+
if mc.PingCallCount < expectedPings {
174+
t.Fatalf("Ping should have been called at least %d times, but it was called %d times", expectedPings, mc.PingCallCount)
175+
}
176+
}
177+
178+
type mockClient struct {
179+
PingFunc func() (*models.PingResult, error)
180+
PingCallCount int
181+
}
182+
183+
var _ Client = (*mockClient)(nil)
184+
185+
func (m *mockClient) Ping() (*models.PingResult, error) {
186+
if m.PingFunc != nil {
187+
return m.PingFunc()
188+
}
189+
190+
m.PingCallCount++
191+
return &models.PingResult{}, nil
192+
}
193+
194+
func (m *mockClient) GetAgent() *gocbcore.Agent {
195+
panic("implement me")
196+
}
197+
198+
func (m *mockClient) GetMetaAgent() *gocbcore.Agent {
199+
panic("implement me")
200+
}
201+
202+
func (m *mockClient) Connect() error {
203+
panic("implement me")
204+
}
205+
206+
func (m *mockClient) Close() {
207+
panic("implement me")
208+
}
209+
210+
func (m *mockClient) DcpConnect(useExpiryOpcode bool, useChangeStreams bool) error {
211+
panic("implement me")
212+
}
213+
214+
func (m *mockClient) DcpClose() {
215+
panic("implement me")
216+
}
217+
218+
func (m *mockClient) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
219+
panic("implement me")
220+
}
221+
222+
func (m *mockClient) GetNumVBuckets() int {
223+
panic("implement me")
224+
}
225+
226+
func (m *mockClient) GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error) {
227+
panic("implement me")
228+
}
229+
230+
func (m *mockClient) OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error {
231+
panic("implement me")
232+
}
233+
234+
func (m *mockClient) CloseStream(vbID uint16) error {
235+
panic("implement me")
236+
}
237+
238+
func (m *mockClient) GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string {
239+
panic("implement me")
240+
}
241+
242+
func (m *mockClient) GetAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
243+
panic("implement me")
244+
}
245+
246+
func (m *mockClient) GetDcpAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
247+
panic("implement me")
248+
}
249+
250+
func (m *mockClient) GetAgentQueues() []*models.AgentQueue {
251+
panic("implement me")
252+
}

0 commit comments

Comments
 (0)