From 362a27173e31bc36565a331ee8f2d6eb657f1e5c Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Tue, 31 Jan 2023 14:50:08 +0000 Subject: [PATCH] Patch CRDs with origin labels This allows the applied CRDs to be traced using the same labels as currently applied to resources using a Kustomize post-render. Kustomize is not used here as the apply logic for CRDs is different from the approach used during releasing, where we inject the labels in such a way that they are written back to the Helm storage in the rendered manifest. This to match Helm's logic from which our present code is already derived (buth with support for policies). This also moves the full responsibility of dealing with the install of CRDs to ourselves, as we no longer fall back to Helm's logic when `Create` is configured as a policy during a Helm install. As this would not allow us to add the labels. Signed-off-by: Hidde Beydals --- internal/runner/runner.go | 156 +++++++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 53 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index e829c9f4f..da2dc8116 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -38,12 +38,15 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" v2 "github.com/fluxcd/helm-controller/api/v2beta1" ) +var accessor = meta.NewAccessor() + type ActionError struct { Err error CapturedLogs string @@ -95,7 +98,7 @@ func postRenderers(hr v2.HelmRelease) (postrender.PostRenderer, error) { return &combinedRenderer, nil } -// Install runs an Helm install action for the given v2beta1.HelmRelease. +// Install runs a Helm install action for the given v2beta1.HelmRelease. func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil.Values) (*release.Release, error) { r.mu.Lock() defer r.mu.Unlock() @@ -110,33 +113,31 @@ func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil install.DisableHooks = hr.Spec.GetInstall().DisableHooks install.DisableOpenAPIValidation = hr.Spec.GetInstall().DisableOpenAPIValidation install.Replace = hr.Spec.GetInstall().Replace - var legacyCRDsPolicy = v2.Create - if hr.Spec.GetInstall().SkipCRDs { - legacyCRDsPolicy = v2.Skip + install.SkipCRDs = true + install.Devel = true + + if hr.Spec.TargetNamespace != "" { + install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace } - cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy) + + renderer, err := postRenderers(hr) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } - if cRDsPolicy == v2.Skip || cRDsPolicy == v2.CreateReplace { - install.SkipCRDs = true + install.PostRenderer = renderer + + // If user opted-in to install (or replace) CRDs, install them first. + var legacyCRDsPolicy = v2.Create + if hr.Spec.GetInstall().SkipCRDs { + legacyCRDsPolicy = v2.Skip } - install.Devel = true - renderer, err := postRenderers(hr) + cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } - install.PostRenderer = renderer - if hr.Spec.TargetNamespace != "" { - install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace - } - - if cRDsPolicy == v2.CreateReplace { - crds := chart.CRDObjects() - if len(crds) > 0 { - if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil { - return nil, wrapActionErr(r.logBuffer, err) - } + if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 { + if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil { + return nil, wrapActionErr(r.logBuffer, err) } } @@ -163,24 +164,24 @@ func (r *Runner) Upgrade(hr v2.HelmRelease, chart *chart.Chart, values chartutil upgrade.Force = hr.Spec.GetUpgrade().Force upgrade.CleanupOnFail = hr.Spec.GetUpgrade().CleanupOnFail upgrade.Devel = true + renderer, err := postRenderers(hr) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } upgrade.PostRenderer = renderer + // If user opted-in to upgrade CRDs, upgrade them first. cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetUpgrade().CRDs, v2.Skip) if err != nil { return nil, wrapActionErr(r.logBuffer, err) } - if cRDsPolicy != v2.Skip { - crds := chart.CRDObjects() - if len(crds) > 0 { - if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil { - return nil, wrapActionErr(r.logBuffer, err) - } + if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 { + if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil { + return nil, wrapActionErr(r.logBuffer, err) } } + rel, err := upgrade.Run(hr.GetReleaseName(), chart, values.AsMap()) return rel, wrapActionErr(r.logBuffer, err) } @@ -196,7 +197,7 @@ func (r *Runner) validateCRDsPolicy(policy v2.CRDsPolicy, defaultValue v2.CRDsPo case v2.CreateReplace: break default: - return policy, fmt.Errorf("invalid CRD upgrade policy '%s' defined in field upgradeCRDs, valid values are '%s', '%s' or '%s'", + return policy, fmt.Errorf("invalid CRD policy '%s' defined in field CRDsPolicy, valid values are '%s', '%s' or '%s'", policy, v2.Skip, v2.Create, v2.CreateReplace, ) } @@ -210,37 +211,45 @@ func (*rootScoped) Name() meta.RESTScopeName { } // This has been adapted from https://github.com/helm/helm/blob/v3.5.4/pkg/action/install.go#L127 -func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart.Chart) error { - cfg := r.config - cfg.Log("apply CRDs with policy %s", policy) +func (r *Runner) applyCRDs(policy v2.CRDsPolicy, chart *chart.Chart, visitorFunc ...resource.VisitorFunc) error { + r.config.Log("apply CRDs with policy %s", policy) + // Collect all CRDs from all files in `crds` directory. allCrds := make(kube.ResourceList, 0) for _, obj := range chart.CRDObjects() { // Read in the resources - res, err := cfg.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false) + res, err := r.config.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false) if err != nil { - cfg.Log("failed to parse CRDs from %s: %s", obj.Name, err) + r.config.Log("failed to parse CRDs from %s: %s", obj.Name, err) return errors.New(fmt.Sprintf("failed to parse CRDs from %s: %s", obj.Name, err)) } allCrds = append(allCrds, res...) } - totalItems := []*resource.Info{} + + // Visit CRDs with any provided visitor functions. + for _, visitor := range visitorFunc { + if err := allCrds.Visit(visitor); err != nil { + return err + } + } + + var totalItems []*resource.Info switch policy { case v2.Skip: break case v2.Create: for i := range allCrds { - if rr, err := cfg.KubeClient.Create(allCrds[i : i+1]); err != nil { + if rr, err := r.config.KubeClient.Create(allCrds[i : i+1]); err != nil { crdName := allCrds[i].Name // If the error is CRD already exists, continue. if apierrors.IsAlreadyExists(err) { - cfg.Log("CRD %s is already present. Skipping.", crdName) + r.config.Log("CRD %s is already present. Skipping.", crdName) if rr != nil && rr.Created != nil { totalItems = append(totalItems, rr.Created...) } continue } - cfg.Log("failed to create CRD %s: %s", crdName, err) + r.config.Log("failed to create CRD %s: %s", crdName, err) return errors.New(fmt.Sprintf("failed to create CRD %s: %s", crdName, err)) } else { if rr != nil && rr.Created != nil { @@ -248,16 +257,15 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart } } } - break case v2.CreateReplace: config, err := r.config.RESTClientGetter.ToRESTConfig() if err != nil { - r.logBuffer.Log("Error while creating Kubernetes client config: %s", err) + r.config.Log("Error while creating Kubernetes client config: %s", err) return err } clientset, err := apiextension.NewForConfig(config) if err != nil { - r.logBuffer.Log("Error while creating Kubernetes clientset for apiextension: %s", err) + r.config.Log("Error while creating Kubernetes clientset for apiextension: %s", err) return err } client := clientset.ApiextensionsV1().CustomResourceDefinitions() @@ -265,21 +273,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart // Note, we build the originals from the current set of CRDs // and therefore this upgrade will never delete CRDs that existed in the former release // but no longer exist in the current release. - for _, r := range allCrds { - if o, err := client.Get(context.TODO(), r.Name, v1.GetOptions{}); err == nil && o != nil { + for _, res := range allCrds { + if o, err := client.Get(context.TODO(), res.Name, metav1.GetOptions{}); err == nil && o != nil { o.GetResourceVersion() original = append(original, &resource.Info{ Client: clientset.ApiextensionsV1().RESTClient(), Mapping: &meta.RESTMapping{ Resource: schema.GroupVersionResource{ Group: "apiextensions.k8s.io", - Version: r.Mapping.GroupVersionKind.Version, + Version: res.Mapping.GroupVersionKind.Version, Resource: "customresourcedefinition", }, GroupVersionKind: schema.GroupVersionKind{ Kind: "CustomResourceDefinition", Group: "apiextensions.k8s.io", - Version: r.Mapping.GroupVersionKind.Version, + Version: res.Mapping.GroupVersionKind.Version, }, Scope: &rootScoped{}, }, @@ -289,13 +297,13 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart ResourceVersion: o.ObjectMeta.ResourceVersion, }) } else if !apierrors.IsNotFound(err) { - cfg.Log("failed to get CRD %s: %s", r.Name, err) + r.config.Log("failed to get CRD %s: %s", res.Name, err) return err } } // Send them to Kube - if rr, err := cfg.KubeClient.Update(original, allCrds, true); err != nil { - cfg.Log("failed to apply CRD %s", err) + if rr, err := r.config.KubeClient.Update(original, allCrds, true); err != nil { + r.config.Log("failed to apply CRD %s", err) return errors.New(fmt.Sprintf("failed to apply CRD %s", err)) } else { if rr != nil { @@ -310,21 +318,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart } } } - break } + if len(totalItems) > 0 { // Invalidate the local cache, since it will not have the new CRDs // present. - discoveryClient, err := cfg.RESTClientGetter.ToDiscoveryClient() + discoveryClient, err := r.config.RESTClientGetter.ToDiscoveryClient() if err != nil { - cfg.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err) + r.config.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err) return err } - cfg.Log("Clearing discovery cache") + r.config.Log("Clearing discovery cache") discoveryClient.Invalidate() // Give time for the CRD to be recognized. - if err := cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil { - cfg.Log("Error waiting for items: %s", err) + if err := r.config.KubeClient.Wait(totalItems, 60*time.Second); err != nil { + r.config.Log("Error waiting for items: %s", err) return err } // Make sure to force a rebuild of the cache. @@ -402,3 +410,45 @@ func wrapActionErr(log *LogBuffer, err error) error { } return err } + +func setOriginVisitor(namespace, name string) resource.VisitorFunc { + return func(info *resource.Info, err error) error { + if err != nil { + return err + } + if err = mergeLabels(info.Object, originLabels(namespace, name)); err != nil { + return fmt.Errorf( + "%s origin labels could not be updated: %s", + resourceString(info), err, + ) + } + return nil + } +} + +func mergeLabels(obj runtime.Object, labels map[string]string) error { + current, err := accessor.Labels(obj) + if err != nil { + return err + } + return accessor.SetLabels(obj, mergeStrStrMaps(current, labels)) +} + +func resourceString(info *resource.Info) string { + _, k := info.Mapping.GroupVersionKind.ToAPIVersionAndKind() + return fmt.Sprintf( + "%s %q in namespace %q", + k, info.Name, info.Namespace, + ) +} + +func mergeStrStrMaps(current, desired map[string]string) map[string]string { + result := make(map[string]string) + for k, v := range current { + result[k] = v + } + for k, desiredVal := range desired { + result[k] = desiredVal + } + return result +}