From 0140d2e9bf4d30631bc4b878928108a9f92a8aaf Mon Sep 17 00:00:00 2001
From: Paul Morie
Date: Thu, 6 Feb 2020 18:27:27 -0500
Subject: [PATCH 1/2] Move webhook onto sharedmain.WebHookMain

---
 cmd/webhook/main.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go
index 5ab906e54169..5f437109b641 100644
--- a/cmd/webhook/main.go
+++ b/cmd/webhook/main.go
@@ -216,7 +216,7 @@ func main() {
 		SecretName: "webhook-certs",
 	})
 
-	sharedmain.MainWithContext(ctx, "webhook",
+	sharedmain.WebhookMainWithContext(ctx, "webhook",
 		certificates.NewController,
 		NewDefaultingAdmissionController,
 		NewValidationAdmissionController,

From 9850f48d8e3916ec1389ecb2d743ca6ea34eeb4a Mon Sep 17 00:00:00 2001
From: Paul Morie
Date: Fri, 21 Feb 2020 10:31:49 -0500
Subject: [PATCH 2/2] Update dependency on pkg

---
 Gopkg.lock                           |   4 +-
 .../pkg/injection/sharedmain/main.go | 259 +++++++++++++-----
 2 files changed, 185 insertions(+), 78 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index 0cad78dc4bde..52dd535d9969 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1810,7 +1810,7 @@
 
 [[projects]]
   branch = "master"
-  digest = "1:f44b87ebe4aefb51530107b1d09dce6b81fc35c45117e4025f1dbcb33315a4a8"
+  digest = "1:0ef5900b861d1a580ec02438c991f7b241efa83151bf2d29db3055ff390a2b1f"
   name = "knative.dev/pkg"
   packages = [
     "apis",
@@ -1917,7 +1917,7 @@
     "websocket",
   ]
   pruneopts = "T"
-  revision = "d9a38f13e8b9aa736f714b793ee28788de1b30e0"
+  revision = "1d60dd6107fb4348f28e2a7b7f35e7d4299b33c6"
 
 [[projects]]
   branch = "master"
diff --git a/vendor/knative.dev/pkg/injection/sharedmain/main.go b/vendor/knative.dev/pkg/injection/sharedmain/main.go
index 0e97b1988acc..e25a066507ba 100644
--- a/vendor/knative.dev/pkg/injection/sharedmain/main.go
+++ b/vendor/knative.dev/pkg/injection/sharedmain/main.go
@@ -93,120 +93,113 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
 	return logging.NewConfigFromConfigMap(loggingConfigMap)
 }
 
+// Main runs the generic main flow for non-webhook controllers with a new
+// context. Use WebhookMainWith* if you need to serve webhooks.
 func Main(component string, ctors ...injection.ControllerConstructor) {
 	// Set up signals so we handle the first shutdown signal gracefully.
 	MainWithContext(signals.NewContext(), component, ctors...)
 }
 
+// MainWithContext runs the generic main flow for non-webhook controllers. Use
+// WebhookMainWithContext if you need to serve webhooks.
 func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
-	var (
-		masterURL = flag.String("master", "",
-			"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-		kubeconfig = flag.String("kubeconfig", "",
-			"Path to a kubeconfig. Only required if out-of-cluster.")
-	)
-	flag.Parse()
-
-	cfg, err := GetConfig(*masterURL, *kubeconfig)
-	if err != nil {
-		log.Fatalf("Error building kubeconfig: %v", err)
-	}
-	MainWithConfig(ctx, component, cfg, ctors...)
+	MainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
 }
 
+// MainWithConfig runs the generic main flow for non-webhook controllers. Use
+// WebhookMainWithConfig if you need to serve webhooks.
 func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
 	log.Printf("Registering %d clients", len(injection.Default.GetClients()))
 	log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
 	log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
 	log.Printf("Registering %d controllers", len(ctors))
 
-	// Report stats on Go memory usage every 30 seconds.
-	msp := metrics.NewMemStatsAll()
-	msp.Start(ctx, 30*time.Second)
-
-	if err := view.Register(msp.DefaultViews()...); err != nil {
-		log.Fatalf("Error exporting go memstats view: %v", err)
-	}
+	MemStatsOrDie(ctx)
 
-	// Adjust our client's rate limits based on the number of controller's we are running.
+	// Adjust our client's rate limits based on the number of controllers we are running.
 	cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
 	cfg.Burst = len(ctors) * rest.DefaultBurst
 	ctx, informers := injection.Default.SetupInformers(ctx, cfg)
 
-	// Set up our logger.
-	loggingConfig, err := GetLoggingConfig(ctx)
-	if err != nil {
-		log.Fatalf("Error reading/parsing logging configuration: %v", err)
-	}
-	logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component)
+	logger, atomicLevel := SetupLoggerOrDie(ctx, component)
 	defer flush(logger)
 	ctx = logging.WithLogger(ctx, logger)
+	profilingHandler := profiling.NewHandler(logger, false)
 
-	// Obtain K8s clientset.
-	kc := kubeclient.Get(ctx)
-	if err := version.CheckMinimumVersion(kc.Discovery()); err != nil {
-		logger.Fatalw("Version check failed", zap.Error(err))
-	}
+	CheckK8sClientMinimumVersionOrDie(ctx, logger)
+	cmw := SetupConfigMapWatchOrDie(ctx, logger)
+	controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
+	WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
+	WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)
 
-	// Create ConfigMaps watcher with optional label-based filter.
-	var cmLabelReqs []labels.Requirement
-	if cmLabel := system.ResourceLabel(); cmLabel != "" {
-		req, err := configmap.FilterConfigByLabelExists(cmLabel)
-		if err != nil {
-			logger.With(zap.Error(err)).Fatalf("Failed to generate requirement for label %q")
-		}
-		logger.Infof("Setting up ConfigMap watcher with label selector %q", req)
-		cmLabelReqs = append(cmLabelReqs, *req)
+	logger.Info("Starting configuration manager...")
+	if err := cmw.Start(ctx.Done()); err != nil {
+		logger.Fatalw("Failed to start configuration manager", zap.Error(err))
 	}
-	// TODO(mattmoor): This should itself take a context and be injection-based.
-	cmw := configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
 
+	logger.Info("Starting informers...")
+	if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
+		logger.Fatalw("Failed to start informers", zap.Error(err))
+	}
+	logger.Info("Starting controllers...")
+	go controller.StartAll(ctx.Done(), controllers...)
 
-	// Based on the reconcilers we have linked, build up the set of controllers to run.
-	controllers := make([]*controller.Impl, 0, len(ctors))
-	webhooks := make([]interface{}, 0)
-	for _, cf := range ctors {
-		ctrl := cf(ctx, cmw)
-		controllers = append(controllers, ctrl)
+	profilingServer := profiling.NewServer(profilingHandler)
+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.Go(profilingServer.ListenAndServe)
 
-		// Build a list of any reconcilers that implement webhook.AdmissionController
-		switch c := ctrl.Reconciler.(type) {
-		case webhook.AdmissionController, webhook.ConversionController:
-			webhooks = append(webhooks, c)
-		}
+	// This will block until either a signal arrives or one of the grouped functions
+	// returns an error.
+	<-egCtx.Done()
+
+	profilingServer.Shutdown(context.Background())
+	// Don't forward ErrServerClosed as that indicates we're already shutting down.
+	if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
+		logger.Errorw("Error while running server", zap.Error(err))
 	}
 }
 
-	profilingHandler := profiling.NewHandler(logger, false)
+// WebhookMainWithContext runs the generic main flow for controllers and
+// webhooks. Use MainWithContext if you do not need to serve webhooks.
+func WebhookMainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
+	WebhookMainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
+}
 
-	// Watch the logging config map and dynamically update logging levels.
-	if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(),
-		metav1.GetOptions{}); err == nil {
-		cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
-	} else if !apierrors.IsNotFound(err) {
-		logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", logging.ConfigMapName())
-	}
+// WebhookMainWithConfig runs the generic main flow for controllers and webhooks
+// with the given config. Use MainWithConfig if you do not need to serve
+// webhooks.
+func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
+	log.Printf("Registering %d clients", len(injection.Default.GetClients()))
+	log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
+	log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
+	log.Printf("Registering %d controllers", len(ctors))
 
-	// Watch the observability config map
-	if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(metrics.ConfigMapName(),
-		metav1.GetOptions{}); err == nil {
-		cmw.Watch(metrics.ConfigMapName(),
-			metrics.UpdateExporterFromConfigMap(component, logger),
-			profilingHandler.UpdateFromConfigMap)
-	} else if !apierrors.IsNotFound(err) {
-		logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", metrics.ConfigMapName())
-	}
+	MemStatsOrDie(ctx)
+
+	// Adjust our client's rate limits based on the number of controllers we are running.
+	cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
+	cfg.Burst = len(ctors) * rest.DefaultBurst
+	ctx, informers := injection.Default.SetupInformers(ctx, cfg)
+	logger, atomicLevel := SetupLoggerOrDie(ctx, component)
+	defer flush(logger)
+	ctx = logging.WithLogger(ctx, logger)
+	profilingHandler := profiling.NewHandler(logger, false)
+
+	CheckK8sClientMinimumVersionOrDie(ctx, logger)
+	cmw := SetupConfigMapWatchOrDie(ctx, logger)
+	controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
+	WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
+	WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)
+
+	logger.Info("Starting configuration manager...")
 	if err := cmw.Start(ctx.Done()); err != nil {
 		logger.Fatalw("Failed to start configuration manager", zap.Error(err))
 	}
-
-	// Start all of the informers and wait for them to sync.
-	logger.Info("Starting informers.")
+	logger.Info("Starting informers...")
 	if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
 		logger.Fatalw("Failed to start informers", zap.Error(err))
 	}
-
-	// Start all of the controllers.
 	logger.Info("Starting controllers...")
 	go controller.StartAll(ctx.Done(), controllers...)
@@ -245,3 +238,117 @@ func flush(logger *zap.SugaredLogger) {
 	logger.Sync()
 	metrics.FlushExporter()
 }
+
+// ParseAndGetConfigOrDie parses the rest config flags and creates a client or
+// dies by calling log.Fatalf.
+func ParseAndGetConfigOrDie() *rest.Config {
+	var (
+		masterURL = flag.String("master", "",
+			"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
+		kubeconfig = flag.String("kubeconfig", "",
+			"Path to a kubeconfig. Only required if out-of-cluster.")
+	)
+	flag.Parse()
+
+	cfg, err := GetConfig(*masterURL, *kubeconfig)
+	if err != nil {
+		log.Fatalf("Error building kubeconfig: %v", err)
+	}
+
+	return cfg
+}
+
+// MemStatsOrDie sets up reporting on Go memory usage every 30 seconds or dies
+// by calling log.Fatalf.
+func MemStatsOrDie(ctx context.Context) {
+	msp := metrics.NewMemStatsAll()
+	msp.Start(ctx, 30*time.Second)
+
+	if err := view.Register(msp.DefaultViews()...); err != nil {
+		log.Fatalf("Error exporting go memstats view: %v", err)
+	}
+}
+
+// SetupLoggerOrDie sets up the logger using the config from the given context
+// and returns a logger and atomic level, or dies by calling log.Fatalf.
+func SetupLoggerOrDie(ctx context.Context, component string) (*zap.SugaredLogger, zap.AtomicLevel) {
+	loggingConfig, err := GetLoggingConfig(ctx)
+	if err != nil {
+		log.Fatalf("Error reading/parsing logging configuration: %v", err)
+	}
+	return logging.NewLoggerFromConfig(loggingConfig, component)
+}
+
+// CheckK8sClientMinimumVersionOrDie checks that the hosting Kubernetes cluster
+// is at least the minimum allowable version or dies by calling log.Fatalf.
+func CheckK8sClientMinimumVersionOrDie(ctx context.Context, logger *zap.SugaredLogger) {
+	kc := kubeclient.Get(ctx)
+	if err := version.CheckMinimumVersion(kc.Discovery()); err != nil {
+		logger.Fatalw("Version check failed", zap.Error(err))
+	}
+}
+
+// SetupConfigMapWatchOrDie establishes a watch of the configmaps in the system
+// namespace that are labeled to be watched or dies by calling log.Fatalf.
+func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *configmap.InformedWatcher {
+	kc := kubeclient.Get(ctx)
+	// Create ConfigMaps watcher with optional label-based filter.
+	var cmLabelReqs []labels.Requirement
+	if cmLabel := system.ResourceLabel(); cmLabel != "" {
+		req, err := configmap.FilterConfigByLabelExists(cmLabel)
+		if err != nil {
+			logger.With(zap.Error(err)).Fatalf("Failed to generate requirement for label %q", cmLabel)
+		}
+		logger.Infof("Setting up ConfigMap watcher with label selector %q", req)
+		cmLabelReqs = append(cmLabelReqs, *req)
+	}
+	// TODO(mattmoor): This should itself take a context and be injection-based.
+	return configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
+}
+
+// WatchLoggingConfigOrDie establishes a watch of the logging config or dies by
+// calling log.Fatalf. Note, if the config does not exist, it will be defaulted
+// and this method will not die.
+func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) {
+	if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(),
+		metav1.GetOptions{}); err == nil {
+		cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
+	} else if !apierrors.IsNotFound(err) {
+		logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", logging.ConfigMapName())
+	}
+}
+
+// WatchObservabilityConfigOrDie establishes a watch of the observability config
+// or dies by calling log.Fatalf. Note, if the config does not exist, it will
+// be defaulted and this method will not die.
+func WatchObservabilityConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) {
+	if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(metrics.ConfigMapName(),
+		metav1.GetOptions{}); err == nil {
+		cmw.Watch(metrics.ConfigMapName(),
+			metrics.UpdateExporterFromConfigMap(component, logger),
+			profilingHandler.UpdateFromConfigMap)
+	} else if !apierrors.IsNotFound(err) {
+		logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", metrics.ConfigMapName())
+	}
+}
+
+// ControllersAndWebhooksFromCtors returns a list of the controllers and a list
+// of the webhooks created from the given constructors.
+func ControllersAndWebhooksFromCtors(ctx context.Context,
+	cmw *configmap.InformedWatcher,
+	ctors ...injection.ControllerConstructor) ([]*controller.Impl, []interface{}) {
+	controllers := make([]*controller.Impl, 0, len(ctors))
+	webhooks := make([]interface{}, 0)
+	for _, cf := range ctors {
+		ctrl := cf(ctx, cmw)
+		controllers = append(controllers, ctrl)
+
+		// Build a list of any reconcilers that implement webhook.AdmissionController or webhook.ConversionController.
+		switch c := ctrl.Reconciler.(type) {
+		case webhook.AdmissionController, webhook.ConversionController:
+			webhooks = append(webhooks, c)
+		}
+	}
+
+	return controllers, webhooks
+}
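
For illustration, here is a minimal sketch of a downstream webhook entry point once PATCH 1/2 is applied. This is not this repository's full cmd/webhook/main.go: the "webhook-certs" secret name mirrors the context lines in the hunk above, while the service name, port, and the single certificates.NewController constructor are assumed placeholders.

package main

import (
	"knative.dev/pkg/injection/sharedmain"
	"knative.dev/pkg/signals"
	"knative.dev/pkg/webhook"
	"knative.dev/pkg/webhook/certificates"
)

func main() {
	// Webhook serving options travel on the context. The secret name matches
	// the "webhook-certs" context line in the hunk above; the service name
	// and port are assumed values for this sketch.
	ctx := webhook.WithOptions(signals.NewContext(), webhook.Options{
		ServiceName: "webhook",
		Port:        8443,
		SecretName:  "webhook-certs",
	})

	// WebhookMainWithContext runs the same shared main flow as MainWithContext
	// and additionally serves any constructed reconcilers that implement
	// webhook.AdmissionController or webhook.ConversionController.
	sharedmain.WebhookMainWithContext(ctx, "webhook",
		certificates.NewController,
	)
}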
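
For non-webhook binaries the entry point is unchanged. The following sketch shows how the extracted ParseAndGetConfigOrDie helper composes with MainWithConfig; the "my-component" name is an assumed placeholder and no controller constructors are passed.

package main

import (
	"knative.dev/pkg/injection/sharedmain"
	"knative.dev/pkg/signals"
)

func main() {
	// ParseAndGetConfigOrDie now owns the -master/-kubeconfig flag definitions
	// and the flag.Parse() call that previously lived inline in MainWithContext.
	cfg := sharedmain.ParseAndGetConfigOrDie()

	// The trailing variadic arguments take injection.ControllerConstructor
	// values for your reconcilers; none are passed in this skeleton.
	sharedmain.MainWithConfig(signals.NewContext(), "my-component", cfg)
}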