Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move webhooks onto new sharedmain methods #6824

Merged
merged 2 commits into from
Feb 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func main() {
SecretName: "webhook-certs",
})

sharedmain.MainWithContext(ctx, "webhook",
sharedmain.WebhookMainWithContext(ctx, "webhook",
certificates.NewController,
NewDefaultingAdmissionController,
NewValidationAdmissionController,
Expand Down
259 changes: 183 additions & 76 deletions vendor/knative.dev/pkg/injection/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,120 +93,113 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
return logging.NewConfigFromConfigMap(loggingConfigMap)
}

// Main runs the generic main flow for non-webhook controllers with a new
// context. Use WebhookMainWith* if you need to serve webhooks.
func Main(component string, ctors ...injection.ControllerConstructor) {
// Set up signals so we handle the first shutdown signal gracefully.
MainWithContext(signals.NewContext(), component, ctors...)
}

// MainWithContext runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithContext if you need to serve webhooks.
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
var (
masterURL = flag.String("master", "",
"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "",
"Path to a kubeconfig. Only required if out-of-cluster.")
)
flag.Parse()

cfg, err := GetConfig(*masterURL, *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
MainWithConfig(ctx, component, cfg, ctors...)
MainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
}

// MainWithConfig runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithConfig if you need to serve webhooks.
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))

// Report stats on Go memory usage every 30 seconds.
msp := metrics.NewMemStatsAll()
msp.Start(ctx, 30*time.Second)

if err := view.Register(msp.DefaultViews()...); err != nil {
log.Fatalf("Error exporting go memstats view: %v", err)
}
MemStatsOrDie(ctx)

// Adjust our client's rate limits based on the number of controller's we are running.
// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.Burst = len(ctors) * rest.DefaultBurst

ctx, informers := injection.Default.SetupInformers(ctx, cfg)

// Set up our logger.
loggingConfig, err := GetLoggingConfig(ctx)
if err != nil {
log.Fatalf("Error reading/parsing logging configuration: %v", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component)
logger, atomicLevel := SetupLoggerOrDie(ctx, component)
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
profilingHandler := profiling.NewHandler(logger, false)

// Obtain K8s clientset.
kc := kubeclient.Get(ctx)
if err := version.CheckMinimumVersion(kc.Discovery()); err != nil {
logger.Fatalw("Version check failed", zap.Error(err))
}
CheckK8sClientMinimumVersionOrDie(ctx, logger)
cmw := SetupConfigMapWatchOrDie(ctx, logger)
controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)

// Create ConfigMaps watcher with optional label-based filter.
var cmLabelReqs []labels.Requirement
if cmLabel := system.ResourceLabel(); cmLabel != "" {
req, err := configmap.FilterConfigByLabelExists(cmLabel)
if err != nil {
logger.With(zap.Error(err)).Fatalf("Failed to generate requirement for label %q")
}
logger.Infof("Setting up ConfigMap watcher with label selector %q", req)
cmLabelReqs = append(cmLabelReqs, *req)
logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
// TODO(mattmoor): This should itself take a context and be injection-based.
cmw := configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
logger.Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
}
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)

// Based on the reconcilers we have linked, build up the set of controllers to run.
controllers := make([]*controller.Impl, 0, len(ctors))
webhooks := make([]interface{}, 0)
for _, cf := range ctors {
ctrl := cf(ctx, cmw)
controllers = append(controllers, ctrl)
profilingServer := profiling.NewServer(profilingHandler)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)

// Build a list of any reconcilers that implement webhook.AdmissionController
switch c := ctrl.Reconciler.(type) {
case webhook.AdmissionController, webhook.ConversionController:
webhooks = append(webhooks, c)
}
// This will block until either a signal arrives or one of the grouped functions
// returns an error.
<-egCtx.Done()

profilingServer.Shutdown(context.Background())
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
logger.Errorw("Error while running server", zap.Error(err))
}
}

profilingHandler := profiling.NewHandler(logger, false)
// WebhookMainWithContext runs the generic main flow for controllers and
// webhooks. Use MainWithContext if you do not need to serve webhooks.
func WebhookMainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
WebhookMainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
}

// Watch the logging config map and dynamically update logging levels.
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
} else if !apierrors.IsNotFound(err) {
logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", logging.ConfigMapName())
}
// WebhookMainWithConfig runs the generic main flow for controllers and webhooks
// with the given config. Use MainWithConfig if you do not need to serve
// webhooks.
func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))

// Watch the observability config map
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(metrics.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(metrics.ConfigMapName(),
metrics.UpdateExporterFromConfigMap(component, logger),
profilingHandler.UpdateFromConfigMap)
} else if !apierrors.IsNotFound(err) {
logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", metrics.ConfigMapName())
}
MemStatsOrDie(ctx)

// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.Burst = len(ctors) * rest.DefaultBurst
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

logger, atomicLevel := SetupLoggerOrDie(ctx, component)
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
profilingHandler := profiling.NewHandler(logger, false)

CheckK8sClientMinimumVersionOrDie(ctx, logger)
cmw := SetupConfigMapWatchOrDie(ctx, logger)
controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)

logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}

// Start all of the informers and wait for them to sync.
logger.Info("Starting informers.")
logger.Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
}

// Start all of the controllers.
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)

Expand Down Expand Up @@ -245,3 +238,117 @@ func flush(logger *zap.SugaredLogger) {
logger.Sync()
metrics.FlushExporter()
}

// ParseAndGetConfigOrDie parses the rest config flags and creates a client or
// dies by calling log.Fatalf.
func ParseAndGetConfigOrDie() *rest.Config {
var (
masterURL = flag.String("master", "",
"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "",
"Path to a kubeconfig. Only required if out-of-cluster.")
)
flag.Parse()

cfg, err := GetConfig(*masterURL, *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}

return cfg
}

// MemStatsOrDie sets up reporting on Go memory usage every 30 seconds or dies
// by calling log.Fatalf.
func MemStatsOrDie(ctx context.Context) {
msp := metrics.NewMemStatsAll()
msp.Start(ctx, 30*time.Second)

if err := view.Register(msp.DefaultViews()...); err != nil {
log.Fatalf("Error exporting go memstats view: %v", err)
}
}

// SetupLoggerOrDie sets up the logger using the config from the given context
// and returns a logger and atomic level, or dies by calling log.Fatalf.
func SetupLoggerOrDie(ctx context.Context, component string) (*zap.SugaredLogger, zap.AtomicLevel) {
loggingConfig, err := GetLoggingConfig(ctx)
if err != nil {
log.Fatalf("Error reading/parsing logging configuration: %v", err)
}
return logging.NewLoggerFromConfig(loggingConfig, component)
}

// CheckK8sClientMinimumVersionOrDie checks that the hosting Kubernetes cluster
// is at least the minimum allowable version or dies by calling log.Fatalf.
func CheckK8sClientMinimumVersionOrDie(ctx context.Context, logger *zap.SugaredLogger) {
kc := kubeclient.Get(ctx)
if err := version.CheckMinimumVersion(kc.Discovery()); err != nil {
logger.Fatalw("Version check failed", zap.Error(err))
}
}

// SetupConfigMapWatchOrDie establishes a watch of the configmaps in the system
// namespace that are labeled to be watched or dies by calling log.Fatalf.
func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *configmap.InformedWatcher {
kc := kubeclient.Get(ctx)
// Create ConfigMaps watcher with optional label-based filter.
var cmLabelReqs []labels.Requirement
if cmLabel := system.ResourceLabel(); cmLabel != "" {
req, err := configmap.FilterConfigByLabelExists(cmLabel)
if err != nil {
logger.With(zap.Error(err)).Fatalf("Failed to generate requirement for label %q")
}
logger.Infof("Setting up ConfigMap watcher with label selector %q", req)
cmLabelReqs = append(cmLabelReqs, *req)
}
// TODO(mattmoor): This should itself take a context and be injection-based.
return configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
}

// WatchLoggingConfigOrDie establishes a watch of the logging config or dies by
// calling log.Fatalf. Note, if the config does not exist, it will be defaulted
// and this method will not die.
func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
} else if !apierrors.IsNotFound(err) {
logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", logging.ConfigMapName())
}
}

// WatchObservabilityConfigOrDie establishes a watch of the logging config or
// dies by calling log.Fatalf. Note, if the config does not exist, it will be
// defaulted and this method will not die.
func WatchObservabilityConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(metrics.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(metrics.ConfigMapName(),
metrics.UpdateExporterFromConfigMap(component, logger),
profilingHandler.UpdateFromConfigMap)
} else if !apierrors.IsNotFound(err) {
logger.With(zap.Error(err)).Fatalf("Error reading ConfigMap %q", metrics.ConfigMapName())
}
}

// ControllersAndWebhooksFromCtors returns a list of the controllers and a list
// of the webhooks created from the given constructors.
func ControllersAndWebhooksFromCtors(ctx context.Context,
cmw *configmap.InformedWatcher,
ctors ...injection.ControllerConstructor) ([]*controller.Impl, []interface{}) {
controllers := make([]*controller.Impl, 0, len(ctors))
webhooks := make([]interface{}, 0)
for _, cf := range ctors {
ctrl := cf(ctx, cmw)
controllers = append(controllers, ctrl)

// Build a list of any reconcilers that implement webhook.AdmissionController
switch c := ctrl.Reconciler.(type) {
case webhook.AdmissionController, webhook.ConversionController:
webhooks = append(webhooks, c)
}
}

return controllers, webhooks
}