Skip to content

Commit 36ee24f

Browse files
authored
fix: improve error handling and increase timeouts in GetCollectionIDs (#114)
1 parent 47b146f commit 36ee24f

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

couchbase/client.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type Client interface {
4242
GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
4343
OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error
4444
CloseStream(vbID uint16) error
45-
GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string
45+
GetCollectionIDs(scopeName string, collectionNames []string) (map[uint32]string, error)
4646
GetAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error)
4747
GetDcpAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error)
4848
GetAgentQueues() []*models.AgentQueue
@@ -469,6 +469,7 @@ func (s *client) DcpClose() {
469469
logger.Log.Info("dcp connection closed %s", s.config.Hosts)
470470
}
471471

472+
//nolint:funlen
472473
func (s *client) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
473474
snapshot, err := s.GetDcpAgentConfigSnapshot()
474475
if err != nil {
@@ -486,7 +487,10 @@ func (s *client) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwis
486487

487488
hasCollectionSupport := awareCollection && s.dcpAgent.HasCollectionsSupport()
488489

489-
cIds := s.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames)
490+
cIds, err := s.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames)
491+
if err != nil {
492+
return nil, err
493+
}
490494
collectionIDs := make([]uint32, 0, len(cIds))
491495
for collectionID := range cIds {
492496
collectionIDs = append(collectionIDs, collectionID)
@@ -759,7 +763,7 @@ func (s *client) getCollectionID(ctx context.Context, scopeName string, collecti
759763
scopeName,
760764
collectionName,
761765
gocbcore.GetCollectionIDOptions{
762-
Deadline: time.Now().Add(time.Second * 5),
766+
Deadline: time.Now().Add(time.Second * 30),
763767
RetryStrategy: gocbcore.NewBestEffortRetryStrategy(nil),
764768
},
765769
func(result *gocbcore.GetCollectionIDResult, err error) {
@@ -780,8 +784,8 @@ func (s *client) getCollectionID(ctx context.Context, scopeName string, collecti
780784
return collectionID, <-ch
781785
}
782786

783-
func (s *client) GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string {
784-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
787+
func (s *client) GetCollectionIDs(scopeName string, collectionNames []string) (map[uint32]string, error) {
788+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
785789
defer cancel()
786790

787791
collectionIDs := map[uint32]string{}
@@ -791,14 +795,14 @@ func (s *client) GetCollectionIDs(scopeName string, collectionNames []string) ma
791795
collectionID, err := s.getCollectionID(ctx, scopeName, collectionName)
792796
if err != nil {
793797
logger.Log.Error("error while get collection ids, err: %v", err)
794-
panic(err)
798+
return nil, err
795799
}
796800

797801
collectionIDs[collectionID] = collectionName
798802
}
799803
}
800804

801-
return collectionIDs
805+
return collectionIDs, nil
802806
}
803807

804808
func NewClient(config *config.Dcp) Client {

couchbase/healthcheck_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (m *mockClient) CloseStream(vbID uint16) error {
235235
panic("implement me")
236236
}
237237

238-
func (m *mockClient) GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string {
238+
func (m *mockClient) GetCollectionIDs(scopeName string, collectionNames []string) (map[uint32]string, error) {
239239
panic("implement me")
240240
}
241241

dcp.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,15 @@ func (s *dcp) Start() {
117117

118118
tc := tracing.NewTracerComponent()
119119

120+
collectionIDs, err := s.client.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames)
121+
if err != nil {
122+
logger.Log.Error("error while getting vBucket seqNos, err: %v", err)
123+
panic(err)
124+
}
125+
120126
s.stream = stream.NewStream(
121127
s.client, s.metadata, s.config, s.version, s.bucketInfo, s.vBucketDiscovery,
122-
s.listener, s.client.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames), s.stopCh, s.bus, s.eventHandler,
128+
s.listener, collectionIDs, s.stopCh, s.bus, s.eventHandler,
123129
tc,
124130
)
125131

@@ -147,7 +153,7 @@ func (s *dcp) Start() {
147153

148154
s.stream.Open()
149155

150-
err := s.bus.SubscribeAsync(helpers.MembershipChangedBusEventName, s.membershipChangedListener, true)
156+
err = s.bus.SubscribeAsync(helpers.MembershipChangedBusEventName, s.membershipChangedListener, true)
151157
if err != nil {
152158
logger.Log.Error("error while subscribe to membership changed event, err: %v", err)
153159
panic(err)

0 commit comments

Comments
 (0)