// Package informer helps you manage your informers/watches and gracefully start and stop them.
// It checks their health and ensures that only one informer runs per resource for each CRD.
package informer
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"truefoundry/elasti/operator/internal/prom"
"github.com/truefoundry/elasti/pkg/values"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/dynamic"
ctrl "sigs.k8s.io/controller-runtime"
)
const (
// TODO: Move to configMap
resolverNamespace = "elasti"
resolverDeploymentName = "elasti-resolver"
resolverServiceName = "elasti-resolver-service"
resolverPort = 8012
)
type (
// Manager helps manage the lifecycle of informers
Manager struct {
client *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
logger *zap.Logger
informers sync.Map
resolver info
resyncPeriod time.Duration
healthCheckDuration time.Duration
healthCheckStopChan chan struct{}
}
info struct {
Informer cache.SharedInformer
StopCh chan struct{}
Req *RequestWatch
}
// RequestWatch describes the watch request passed to the informer manager
RequestWatch struct {
Req ctrl.Request
ResourceName string
ResourceNamespace string
GroupVersionResource *schema.GroupVersionResource
Handlers cache.ResourceEventHandlerFuncs
}
)
// NewInformerManager creates a new instance of the Informer Manager
func NewInformerManager(logger *zap.Logger, kConfig *rest.Config) *Manager {
clientSet, err := kubernetes.NewForConfig(kConfig)
if err != nil {
logger.Fatal("Error connecting with kubernetes", zap.Error(err))
}
dynamicClient, err := dynamic.NewForConfig(kConfig)
if err != nil {
logger.Fatal("Error connecting with kubernetes", zap.Error(err))
}
return &Manager{
client: clientSet,
dynamicClient: dynamicClient,
logger: logger.Named("InformerManager"),
// ResyncPeriod is the proactive resync we do, even when no events are received by the informer.
resyncPeriod: 5 * time.Minute,
healthCheckDuration: 5 * time.Second,
healthCheckStopChan: make(chan struct{}),
}
}
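// A minimal usage sketch (illustrative only, not part of this package): an operator's
// main might wire the manager up roughly like this, assuming a zap logger and the
// controller-runtime kubeconfig helper are available:
//
//	logger, _ := zap.NewProduction()
//	manager := informer.NewInformerManager(logger, ctrl.GetConfigOrDie())
//	manager.Start()
//	defer manager.Stop()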
func (m *Manager) InitializeResolverInformer(handlers cache.ResourceEventHandlerFuncs) error {
deploymentGVR := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}
m.resolver.Informer = cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(_ metav1.ListOptions) (kRuntime.Object, error) {
return m.dynamicClient.Resource(deploymentGVR).Namespace(resolverNamespace).List(context.Background(), metav1.ListOptions{
FieldSelector: "metadata.name=" + resolverDeploymentName,
})
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
return m.dynamicClient.Resource(deploymentGVR).Namespace(resolverNamespace).Watch(context.Background(), metav1.ListOptions{
FieldSelector: "metadata.name=" + resolverDeploymentName,
})
},
},
&unstructured.Unstructured{},
m.resyncPeriod,
)
_, err := m.resolver.Informer.AddEventHandler(handlers)
if err != nil {
m.logger.Error("Failed to add event handler", zap.Error(err))
return fmt.Errorf("failed to add event handler: %w", err)
}
m.resolver.StopCh = make(chan struct{})
go m.resolver.Informer.Run(m.resolver.StopCh)
if !cache.WaitForCacheSync(m.resolver.StopCh, m.resolver.Informer.HasSynced) {
m.logger.Error("Failed to sync informer", zap.String("key", m.getKeyFromRequestWatch(m.resolver.Req)))
return errors.New("failed to sync resolver informer")
}
m.logger.Info("Resolver informer started")
return nil
}
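// A hedged sketch of the handlers a caller might pass to InitializeResolverInformer
// (the manager and logger variables here are illustrative, not defined in this package):
//
//	handlers := cache.ResourceEventHandlerFuncs{
//		UpdateFunc: func(oldObj, newObj interface{}) {
//			// react to changes in the resolver deployment, e.g. re-read its replica count
//		},
//	}
//	if err := manager.InitializeResolverInformer(handlers); err != nil {
//		logger.Fatal("failed to start resolver informer", zap.Error(err))
//	}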
// Start initiates a health check on all the running informers
// It uses HasSynced to check whether an informer is synced; if not, it restarts the informer
func (m *Manager) Start() {
m.logger.Info("Starting InformerManager")
go wait.Until(m.monitorInformers, m.healthCheckDuration, m.healthCheckStopChan)
}
// Stop closes all the active informers and stops the health monitor
func (m *Manager) Stop() {
m.logger.Info("Stopping InformerManager")
// Loop through all the informers and stop them
m.informers.Range(func(_, value interface{}) bool {
info, ok := value.(info)
if ok {
err := m.StopInformer(m.getKeyFromRequestWatch(info.Req))
if err != nil {
m.logger.Error("failed to stop informer", zap.Error(err))
}
}
return true
})
// Stop the health watch
close(m.healthCheckStopChan)
m.logger.Info("InformerManager stopped")
}
// StopForCRD closes all the active informers for a particular CRD
func (m *Manager) StopForCRD(crdName string) {
// Loop through all the informers and stop them
var wg sync.WaitGroup
m.informers.Range(func(key, value interface{}) bool {
wg.Add(1)
go func() {
defer wg.Done()
// Check if the key starts with the crdName
if strings.HasPrefix(key.(string), crdName) {
info, ok := value.(info)
if ok {
if err := m.StopInformer(m.getKeyFromRequestWatch(info.Req)); err != nil {
m.logger.Error("Failed to stop informer", zap.Error(err))
}
m.logger.Info("Stopped informer", zap.String("key", key.(string)))
}
}
}()
return true
})
wg.Wait()
}
// StopInformer stops the informer for a resource
// It closes the informer's stop channel and deletes it from the map
func (m *Manager) StopInformer(key string) (err error) {
defer func() {
errStr := values.Success
if err != nil {
errStr = err.Error()
}
prom.InformerCounter.WithLabelValues(key, "stop", errStr).Inc()
}()
value, ok := m.informers.Load(key)
if !ok {
return fmt.Errorf("informer not found, already stopped for key: %s", key)
}
// Verify the stored value is a valid info struct
informerInfo, ok := value.(info)
if !ok {
return fmt.Errorf("failed to cast WatchInfo for key: %s", key)
}
// Close the informer, delete it from the map
close(informerInfo.StopCh)
m.informers.Delete(key)
prom.InformerGauge.WithLabelValues(key).Dec()
return nil
}
func (m *Manager) monitorInformers() {
m.informers.Range(func(key, value interface{}) bool {
info, ok := value.(info)
if ok {
if !info.Informer.HasSynced() {
m.logger.Info("Informer not synced", zap.String("key", key.(string)))
err := m.StopInformer(m.getKeyFromRequestWatch(info.Req))
if err != nil {
m.logger.Error("Error in stopping informer", zap.Error(err))
}
err = m.enableInformer(info.Req)
if err != nil {
m.logger.Error("Error in enabling informer", zap.Error(err))
}
}
}
return true
})
}
// WatchDeployment adds a watch on a deployment
func (m *Manager) WatchDeployment(req ctrl.Request, deploymentName, namespace string, handlers cache.ResourceEventHandlerFuncs) error {
request := &RequestWatch{
Req: req,
ResourceName: deploymentName,
ResourceNamespace: namespace,
GroupVersionResource: &schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
},
Handlers: handlers,
}
return m.Add(request)
}
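// For illustration, a reconciler might register a watch on its target deployment
// like this (req, targetDeployment, and targetNamespace are hypothetical values
// from the reconcile loop, not defined here):
//
//	if err := manager.WatchDeployment(req, targetDeployment, targetNamespace, cache.ResourceEventHandlerFuncs{
//		UpdateFunc: func(_, newObj interface{}) {
//			// e.g. check the deployment's readiness and update the owning CRD's status
//		},
//	}); err != nil {
//		return fmt.Errorf("failed to watch deployment: %w", err)
//	}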
// Add adds a watch on a resource
func (m *Manager) Add(req *RequestWatch) (err error) {
key := m.getKeyFromRequestWatch(req)
defer func() {
errStr := values.Success
if err != nil {
errStr = err.Error()
}
prom.InformerCounter.WithLabelValues(key, "add", errStr).Inc()
}()
m.logger.Info("Adding informer",
zap.String("group", req.GroupVersionResource.Group),
zap.String("version", req.GroupVersionResource.Version),
zap.String("resource", req.GroupVersionResource.Resource),
zap.String("resourceName", req.ResourceName),
zap.String("resourceNamespace", req.ResourceNamespace),
zap.String("crd", req.Req.String()),
)
// Proceed only if the informer is not already running; we verify this by checking the map
if _, ok := m.informers.Load(key); ok {
m.logger.Info("Informer already running", zap.String("key", key))
return nil
}
// Verify the target resource exists before starting the informer
if err = m.verifyTargetExist(req); err != nil {
return fmt.Errorf("target not found: %w", err)
}
if err = m.enableInformer(req); err != nil {
return fmt.Errorf("failed to enable to informer: %w", err)
}
prom.InformerGauge.WithLabelValues(key).Inc()
return nil
}
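// Add is not limited to Deployments: since the watch is built from a
// GroupVersionResource, any resource reachable through the dynamic client works.
// A hedged sketch for a hypothetical Argo Rollouts target (GVR and names assumed,
// verify against your cluster):
//
//	err := manager.Add(&RequestWatch{
//		Req:               req,
//		ResourceName:      "my-rollout",
//		ResourceNamespace: "default",
//		GroupVersionResource: &schema.GroupVersionResource{
//			Group:    "argoproj.io",
//			Version:  "v1alpha1",
//			Resource: "rollouts",
//		},
//		Handlers: handlers,
//	})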
// enableInformer creates, registers, and starts the informer for a resource
func (m *Manager) enableInformer(req *RequestWatch) error {
ctx := context.Background()
// Create an informer for the resource
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(_ metav1.ListOptions) (kRuntime.Object, error) {
return m.dynamicClient.Resource(*req.GroupVersionResource).Namespace(req.ResourceNamespace).List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + req.ResourceName,
})
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
return m.dynamicClient.Resource(*req.GroupVersionResource).Namespace(req.ResourceNamespace).Watch(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + req.ResourceName,
})
},
},
&unstructured.Unstructured{},
m.resyncPeriod,
)
// We pass the handlers we received as a parameter
_, err := informer.AddEventHandler(req.Handlers)
if err != nil {
m.logger.Error("Error creating informer handler", zap.Error(err))
return fmt.Errorf("enableInformer: %w", err)
}
// This channel is used to stop the informer
// We store it in the informers map so we can stop the informer when required
informerStop := make(chan struct{})
go informer.Run(informerStop)
// Store the informer in the map so we can manage its lifecycle:
// recover it if it stops syncing (which is why we also store the request with its handlers),
// and stop it when the CRD or the operator is deleted
key := m.getKeyFromRequestWatch(req)
m.informers.Store(key, info{
Informer: informer,
StopCh: informerStop,
Req: req,
})
// Wait for the cache to sync
if !cache.WaitForCacheSync(informerStop, informer.HasSynced) {
m.logger.Error("Failed to sync informer", zap.String("key", key))
return errors.New("failed to sync informer")
}
m.logger.Info("Informer started", zap.String("key", key))
return nil
}
// getKeyFromRequestWatch builds the key for the informer map from the request,
// in the form crdName/namespace/resource/resourceName
func (m *Manager) getKeyFromRequestWatch(req *RequestWatch) string {
return fmt.Sprintf("%s/%s/%s/%s",
strings.ToLower(req.Req.Name), // CRD Name
strings.ToLower(req.ResourceNamespace), // Namespace
strings.ToLower(req.GroupVersionResource.Resource), // Resource Type
strings.ToLower(req.ResourceName)) // Resource Name
}
type KeyParams struct {
Namespace string
CRDName string
Resource string
ResourceName string
}
// GetKey builds the key for the informer map from the given params, in the same
// crdName/namespace/resource/resourceName form as getKeyFromRequestWatch
func (m *Manager) GetKey(param KeyParams) string {
return fmt.Sprintf("%s/%s/%s/%s",
strings.ToLower(param.CRDName), // CRD Name
strings.ToLower(param.Namespace), // Namespace
strings.ToLower(param.Resource), // Resource Type
strings.ToLower(param.ResourceName)) // Resource Name
}
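// For example (values purely illustrative):
//
//	key := manager.GetKey(KeyParams{
//		CRDName:      "my-elasti-service",
//		Namespace:    "default",
//		Resource:     "deployments",
//		ResourceName: "my-app",
//	})
//	// key == "my-elasti-service/default/deployments/my-app"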
// verifyTargetExist verifies that the target resource exists
func (m *Manager) verifyTargetExist(req *RequestWatch) error {
if _, err := m.dynamicClient.Resource(*req.GroupVersionResource).Namespace(req.ResourceNamespace).Get(context.Background(), req.ResourceName, metav1.GetOptions{}); err != nil {
return fmt.Errorf("resource doesn't exist: %w | resource name: %v | resource type: %v | resource namespace: %v", err, req.ResourceName, req.GroupVersionResource.Resource, req.ResourceNamespace)
}
return nil
}