Update dependency on pkg
pmorie committed Feb 21, 2020
1 parent 0140d2e commit 9850f48
Showing 2 changed files with 185 additions and 78 deletions.
Gopkg.lock: 4 changes (2 additions, 2 deletions)

Some generated files are not rendered by default.

vendor/knative.dev/pkg/injection/sharedmain/main.go: 259 changes (183 additions, 76 deletions)
@@ -93,120 +93,113 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
return logging.NewConfigFromConfigMap(loggingConfigMap)
}

// Main runs the generic main flow for non-webhook controllers with a new
// context. Use WebhookMainWith* if you need to serve webhooks.
func Main(component string, ctors ...injection.ControllerConstructor) {
// Set up signals so we handle the first shutdown signal gracefully.
MainWithContext(signals.NewContext(), component, ctors...)
}
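For context, a downstream controller binary enters this flow by calling Main from its own main package. A minimal sketch, assuming a hypothetical newController constructor (not part of this commit):

package main

import (
    "context"

    "knative.dev/pkg/configmap"
    "knative.dev/pkg/controller"
    "knative.dev/pkg/injection/sharedmain"
)

// newController is a hypothetical injection.ControllerConstructor, i.e. a
// func(context.Context, configmap.Watcher) *controller.Impl.
func newController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
    // A real constructor builds a reconciler and returns controller.NewImpl(...).
    return nil // placeholder for illustration only
}

func main() {
    // Main installs graceful signal handling and blocks until shutdown.
    sharedmain.Main("sample-controller", newController)
}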

// MainWithContext runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithContext if you need to serve webhooks.
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
-var (
-masterURL = flag.String("master", "",
-"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-kubeconfig = flag.String("kubeconfig", "",
-"Path to a kubeconfig. Only required if out-of-cluster.")
-)
-flag.Parse()
-
-cfg, err := GetConfig(*masterURL, *kubeconfig)
-if err != nil {
-log.Fatalf("Error building kubeconfig: %v", err)
-}
-MainWithConfig(ctx, component, cfg, ctors...)
+MainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
}

// MainWithConfig runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithConfig if you need to serve webhooks.
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))

-// Report stats on Go memory usage every 30 seconds.
-msp := metrics.NewMemStatsAll()
-msp.Start(ctx, 30*time.Second)
-
-if err := view.Register(msp.DefaultViews()...); err != nil {
-log.Fatalf("Error exporting go memstats view: %v", err)
-}
+MemStatsOrDie(ctx)

-// Adjust our client's rate limits based on the number of controller's we are running.
+// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.Burst = len(ctors) * rest.DefaultBurst

ctx, informers := injection.Default.SetupInformers(ctx, cfg)

-// Set up our logger.
-loggingConfig, err := GetLoggingConfig(ctx)
-if err != nil {
-log.Fatalf("Error reading/parsing logging configuration: %v", err)
-}
-logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component)
+logger, atomicLevel := SetupLoggerOrDie(ctx, component)
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
+profilingHandler := profiling.NewHandler(logger, false)

-// Obtain K8s clientset.
-kc := kubeclient.Get(ctx)
-if err := version.CheckMinimumVersion(kc.Discovery()); err != nil {
-logger.Fatalw("Version check failed", zap.Error(err))
-}
+CheckK8sClientMinimumVersionOrDie(ctx, logger)
+cmw := SetupConfigMapWatchOrDie(ctx, logger)
+controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
+WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
+WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)

-// Create ConfigMaps watcher with optional label-based filter.
-var cmLabelReqs []labels.Requirement
-if cmLabel := system.ResourceLabel(); cmLabel != "" {
-req, err := configmap.FilterConfigByLabelExists(cmLabel)
-if err != nil {
-logger.With(zap.Error(err)).Fatalf("Failed to generate requirement for label %q")
-}
-logger.Infof("Setting up ConfigMap watcher with label selector %q", req)
-cmLabelReqs = append(cmLabelReqs, *req)
+logger.Info("Starting configuration manager...")
+if err := cmw.Start(ctx.Done()); err != nil {
+logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
-// TODO(mattmoor): This should itself take a context and be injection-based.
-cmw := configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
+logger.Info("Starting informers...")
+if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
+logger.Fatalw("Failed to start informers", zap.Error(err))
+}
+logger.Info("Starting controllers...")
+go controller.StartAll(ctx.Done(), controllers...)

-// Based on the reconcilers we have linked, build up the set of controllers to run.
-controllers := make([]*controller.Impl, 0, len(ctors))
-webhooks := make([]interface{}, 0)
-for _, cf := range ctors {
-ctrl := cf(ctx, cmw)
-controllers = append(controllers, ctrl)
+profilingServer := profiling.NewServer(profilingHandler)
+eg, egCtx := errgroup.WithContext(ctx)
+eg.Go(profilingServer.ListenAndServe)

-// Build a list of any reconcilers that implement webhook.AdmissionController
-switch c := ctrl.Reconciler.(type) {
-case webhook.AdmissionController, webhook.ConversionController:
-webhooks = append(webhooks, c)
-}
+// This will block until either a signal arrives or one of the grouped functions
+// returns an error.
+<-egCtx.Done()

+profilingServer.Shutdown(context.Background())
+// Don't forward ErrServerClosed as that indicates we're already shutting down.
+if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
+logger.Errorw("Error while running server", zap.Error(err))
+}
}
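The errgroup usage above is worth reading in isolation: start the profiling server in a group, block until the group's context is cancelled, then drain. A standalone sketch of the same start/block/drain pattern, using only net/http and golang.org/x/sync/errgroup (the :8008 address is illustrative):

package main

import (
    "context"
    "log"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    ctx := context.Background() // in sharedmain this context carries signal handling
    srv := &http.Server{Addr: ":8008"}

    eg, egCtx := errgroup.WithContext(ctx)
    eg.Go(srv.ListenAndServe)

    // Block until a grouped function returns an error (or ctx is cancelled).
    <-egCtx.Done()

    srv.Shutdown(context.Background())
    // ErrServerClosed only signals that Shutdown was called; not a failure.
    if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
        log.Printf("Error while running server: %v", err)
    }
}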

-profilingHandler := profiling.NewHandler(logger, false)

+// WebhookMainWithContext runs the generic main flow for controllers and
+// webhooks. Use MainWithContext if you do not need to serve webhooks.
+func WebhookMainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
+WebhookMainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
+}

-// Watch the logging config map and dynamically update logging levels.
-if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(),
-metav1.GetOptions{}); err == nil {
-cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
-} else if !apierrors.IsNotFound(err) {
-logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", logging.ConfigMapName())
-}

+// WebhookMainWithConfig runs the generic main flow for controllers and webhooks
+// with the given config. Use MainWithConfig if you do not need to serve
+// webhooks.
+func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
+log.Printf("Registering %d clients", len(injection.Default.GetClients()))
+log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
+log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
+log.Printf("Registering %d controllers", len(ctors))

-// Watch the observability config map
-if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(metrics.ConfigMapName(),
-metav1.GetOptions{}); err == nil {
-cmw.Watch(metrics.ConfigMapName(),
-metrics.UpdateExporterFromConfigMap(component, logger),
-profilingHandler.UpdateFromConfigMap)
-} else if !apierrors.IsNotFound(err) {
-logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", metrics.ConfigMapName())
-}
+MemStatsOrDie(ctx)

+// Adjust our client's rate limits based on the number of controllers we are running.
+cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
+cfg.Burst = len(ctors) * rest.DefaultBurst
+ctx, informers := injection.Default.SetupInformers(ctx, cfg)

+logger, atomicLevel := SetupLoggerOrDie(ctx, component)
+defer flush(logger)
+ctx = logging.WithLogger(ctx, logger)
+profilingHandler := profiling.NewHandler(logger, false)

+CheckK8sClientMinimumVersionOrDie(ctx, logger)
+cmw := SetupConfigMapWatchOrDie(ctx, logger)
+controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
+WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
+WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)

logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}

-// Start all of the informers and wait for them to sync.
-logger.Info("Starting informers.")
+logger.Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
}

-// Start all of the controllers.
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)

@@ -245,3 +238,117 @@ func flush(logger *zap.SugaredLogger) {
logger.Sync()
metrics.FlushExporter()
}

// ParseAndGetConfigOrDie parses the rest config flags and returns the resulting
// *rest.Config, or dies by calling log.Fatalf.
func ParseAndGetConfigOrDie() *rest.Config {
var (
masterURL = flag.String("master", "",
"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "",
"Path to a kubeconfig. Only required if out-of-cluster.")
)
flag.Parse()

cfg, err := GetConfig(*masterURL, *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}

return cfg
}
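This helper is also usable on its own, outside of MainWithContext. A minimal sketch that feeds the parsed config straight into a client-go clientset:

package main

import (
    "k8s.io/client-go/kubernetes"

    "knative.dev/pkg/injection/sharedmain"
)

func main() {
    // Parses the -master and -kubeconfig flags defined above.
    cfg := sharedmain.ParseAndGetConfigOrDie()
    kc := kubernetes.NewForConfigOrDie(cfg)
    _ = kc // use the clientset from here
}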

// MemStatsOrDie sets up reporting on Go memory usage every 30 seconds or dies
// by calling log.Fatalf.
func MemStatsOrDie(ctx context.Context) {
msp := metrics.NewMemStatsAll()
msp.Start(ctx, 30*time.Second)

if err := view.Register(msp.DefaultViews()...); err != nil {
log.Fatalf("Error exporting go memstats view: %v", err)
}
}

// SetupLoggerOrDie sets up the logger using the config from the given context
// and returns a logger and atomic level, or dies by calling log.Fatalf.
func SetupLoggerOrDie(ctx context.Context, component string) (*zap.SugaredLogger, zap.AtomicLevel) {
loggingConfig, err := GetLoggingConfig(ctx)
if err != nil {
log.Fatalf("Error reading/parsing logging configuration: %v", err)
}
return logging.NewLoggerFromConfig(loggingConfig, component)
}
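The returned zap.AtomicLevel is the handle that WatchLoggingConfigOrDie later drives. A sketch of the underlying zap mechanism, independent of this package:

package main

import "go.uber.org/zap"

func main() {
    atomicLevel := zap.NewAtomicLevelAt(zap.InfoLevel)
    cfg := zap.NewProductionConfig()
    cfg.Level = atomicLevel
    logger, _ := cfg.Build()

    logger.Debug("dropped: level is info")
    atomicLevel.SetLevel(zap.DebugLevel) // e.g. driven by a ConfigMap update
    logger.Debug("now visible")
}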

// CheckK8sClientMinimumVersionOrDie checks that the hosting Kubernetes cluster
// is at least the minimum allowable version or dies by calling log.Fatalf.
func CheckK8sClientMinimumVersionOrDie(ctx context.Context, logger *zap.SugaredLogger) {
kc := kubeclient.Get(ctx)
if err := version.CheckMinimumVersion(kc.Discovery()); err != nil {
logger.Fatalw("Version check failed", zap.Error(err))
}
}

// SetupConfigMapWatchOrDie establishes a watch of the configmaps in the system
// namespace that are labeled to be watched or dies by calling log.Fatalf.
func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *configmap.InformedWatcher {
kc := kubeclient.Get(ctx)
// Create ConfigMaps watcher with optional label-based filter.
var cmLabelReqs []labels.Requirement
if cmLabel := system.ResourceLabel(); cmLabel != "" {
req, err := configmap.FilterConfigByLabelExists(cmLabel)
if err != nil {
logger.With(zap.Error(err)).Fatalf("Failed to generate requirement for label %q")
}
logger.Infof("Setting up ConfigMap watcher with label selector %q", req)
cmLabelReqs = append(cmLabelReqs, *req)
}
// TODO(mattmoor): This should itself take a context and be injection-based.
return configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
}
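Outside the injection flow, the same watcher can be constructed and driven directly. A sketch, where the "demo" namespace and "config-foo" ConfigMap name are purely illustrative:

package main

import (
    "log"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"

    "knative.dev/pkg/configmap"
    "knative.dev/pkg/injection/sharedmain"
    "knative.dev/pkg/signals"
)

func main() {
    ctx := signals.NewContext()
    kc := kubernetes.NewForConfigOrDie(sharedmain.ParseAndGetConfigOrDie())

    // Observers fire on add and update of the named ConfigMap.
    cmw := configmap.NewInformedWatcher(kc, "demo")
    cmw.Watch("config-foo", func(cm *corev1.ConfigMap) {
        log.Printf("config-foo changed: %d keys", len(cm.Data))
    })
    if err := cmw.Start(ctx.Done()); err != nil {
        log.Fatalf("Failed to start watcher: %v", err)
    }
    <-ctx.Done()
}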

// WatchLoggingConfigOrDie establishes a watch of the logging config or dies by
// calling log.Fatalf. Note that if the config does not exist, it will be
// defaulted and this method will not die.
func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
} else if !apierrors.IsNotFound(err) {
logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", logging.ConfigMapName())
}
}

// WatchObservabilityConfigOrDie establishes a watch of the observability config
// or dies by calling log.Fatalf. Note that if the config does not exist, it
// will be defaulted and this method will not die.
func WatchObservabilityConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(metrics.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(metrics.ConfigMapName(),
metrics.UpdateExporterFromConfigMap(component, logger),
profilingHandler.UpdateFromConfigMap)
} else if !apierrors.IsNotFound(err) {
logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", metrics.ConfigMapName())
}
}

// ControllersAndWebhooksFromCtors returns a list of the controllers and a list
// of the webhooks created from the given constructors.
func ControllersAndWebhooksFromCtors(ctx context.Context,
cmw *configmap.InformedWatcher,
ctors ...injection.ControllerConstructor) ([]*controller.Impl, []interface{}) {
controllers := make([]*controller.Impl, 0, len(ctors))
webhooks := make([]interface{}, 0)
for _, cf := range ctors {
ctrl := cf(ctx, cmw)
controllers = append(controllers, ctrl)

// Build a list of any reconcilers that implement webhook.AdmissionController
switch c := ctrl.Reconciler.(type) {
case webhook.AdmissionController, webhook.ConversionController:
webhooks = append(webhooks, c)
}
}

return controllers, webhooks
}
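End to end, a webhook-serving component differs from a plain controller binary only in its entrypoint. A minimal sketch (certController is hypothetical; any constructor whose Reconciler implements webhook.AdmissionController or webhook.ConversionController is picked up and served):

package main

import (
    "knative.dev/pkg/injection/sharedmain"
    "knative.dev/pkg/signals"
)

func main() {
    // Handle the first shutdown signal gracefully, as Main does.
    ctx := signals.NewContext()
    sharedmain.WebhookMainWithContext(ctx, "sample-webhook" /*, certController */)
}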
