Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch CRDs with origin labels #596

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 103 additions & 53 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

v2 "github.com/fluxcd/helm-controller/api/v2beta1"
)

var accessor = meta.NewAccessor()

type ActionError struct {
Err error
CapturedLogs string
Expand Down Expand Up @@ -95,7 +98,7 @@ func postRenderers(hr v2.HelmRelease) (postrender.PostRenderer, error) {
return &combinedRenderer, nil
}

// Install runs an Helm install action for the given v2beta1.HelmRelease.
// Install runs a Helm install action for the given v2beta1.HelmRelease.
func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil.Values) (*release.Release, error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -110,33 +113,31 @@ func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil
install.DisableHooks = hr.Spec.GetInstall().DisableHooks
install.DisableOpenAPIValidation = hr.Spec.GetInstall().DisableOpenAPIValidation
install.Replace = hr.Spec.GetInstall().Replace
var legacyCRDsPolicy = v2.Create
if hr.Spec.GetInstall().SkipCRDs {
legacyCRDsPolicy = v2.Skip
install.SkipCRDs = true
install.Devel = true

if hr.Spec.TargetNamespace != "" {
install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace
}
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy)

renderer, err := postRenderers(hr)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy == v2.Skip || cRDsPolicy == v2.CreateReplace {
install.SkipCRDs = true
install.PostRenderer = renderer

// If user opted-in to install (or replace) CRDs, install them first.
var legacyCRDsPolicy = v2.Create
if hr.Spec.GetInstall().SkipCRDs {
legacyCRDsPolicy = v2.Skip
}
install.Devel = true
renderer, err := postRenderers(hr)
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
install.PostRenderer = renderer
if hr.Spec.TargetNamespace != "" {
install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace
}

if cRDsPolicy == v2.CreateReplace {
crds := chart.CRDObjects()
if len(crds) > 0 {
if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 {
if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
}

Expand All @@ -163,24 +164,24 @@ func (r *Runner) Upgrade(hr v2.HelmRelease, chart *chart.Chart, values chartutil
upgrade.Force = hr.Spec.GetUpgrade().Force
upgrade.CleanupOnFail = hr.Spec.GetUpgrade().CleanupOnFail
upgrade.Devel = true

renderer, err := postRenderers(hr)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
upgrade.PostRenderer = renderer

// If user opted-in to upgrade CRDs, upgrade them first.
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetUpgrade().CRDs, v2.Skip)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip {
crds := chart.CRDObjects()
if len(crds) > 0 {
if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 {
if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
}

rel, err := upgrade.Run(hr.GetReleaseName(), chart, values.AsMap())
return rel, wrapActionErr(r.logBuffer, err)
}
Expand All @@ -196,7 +197,7 @@ func (r *Runner) validateCRDsPolicy(policy v2.CRDsPolicy, defaultValue v2.CRDsPo
case v2.CreateReplace:
break
default:
return policy, fmt.Errorf("invalid CRD upgrade policy '%s' defined in field upgradeCRDs, valid values are '%s', '%s' or '%s'",
return policy, fmt.Errorf("invalid CRD policy '%s' defined in field CRDsPolicy, valid values are '%s', '%s' or '%s'",
policy, v2.Skip, v2.Create, v2.CreateReplace,
)
}
Expand All @@ -210,76 +211,83 @@ func (*rootScoped) Name() meta.RESTScopeName {
}

// This has been adapted from https://github.com/helm/helm/blob/v3.5.4/pkg/action/install.go#L127
func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart.Chart) error {
cfg := r.config
cfg.Log("apply CRDs with policy %s", policy)
func (r *Runner) applyCRDs(policy v2.CRDsPolicy, chart *chart.Chart, visitorFunc ...resource.VisitorFunc) error {
r.config.Log("apply CRDs with policy %s", policy)

// Collect all CRDs from all files in `crds` directory.
allCrds := make(kube.ResourceList, 0)
for _, obj := range chart.CRDObjects() {
// Read in the resources
res, err := cfg.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false)
res, err := r.config.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false)
if err != nil {
cfg.Log("failed to parse CRDs from %s: %s", obj.Name, err)
r.config.Log("failed to parse CRDs from %s: %s", obj.Name, err)
return errors.New(fmt.Sprintf("failed to parse CRDs from %s: %s", obj.Name, err))
}
allCrds = append(allCrds, res...)
}
totalItems := []*resource.Info{}

// Visit CRDs with any provided visitor functions.
for _, visitor := range visitorFunc {
if err := allCrds.Visit(visitor); err != nil {
return err
}
}

var totalItems []*resource.Info
switch policy {
case v2.Skip:
break
case v2.Create:
for i := range allCrds {
if rr, err := cfg.KubeClient.Create(allCrds[i : i+1]); err != nil {
if rr, err := r.config.KubeClient.Create(allCrds[i : i+1]); err != nil {
crdName := allCrds[i].Name
// If the error is CRD already exists, continue.
if apierrors.IsAlreadyExists(err) {
cfg.Log("CRD %s is already present. Skipping.", crdName)
r.config.Log("CRD %s is already present. Skipping.", crdName)
if rr != nil && rr.Created != nil {
totalItems = append(totalItems, rr.Created...)
}
continue
}
cfg.Log("failed to create CRD %s: %s", crdName, err)
r.config.Log("failed to create CRD %s: %s", crdName, err)
return errors.New(fmt.Sprintf("failed to create CRD %s: %s", crdName, err))
} else {
if rr != nil && rr.Created != nil {
totalItems = append(totalItems, rr.Created...)
}
}
}
break
case v2.CreateReplace:
config, err := r.config.RESTClientGetter.ToRESTConfig()
if err != nil {
r.logBuffer.Log("Error while creating Kubernetes client config: %s", err)
r.config.Log("Error while creating Kubernetes client config: %s", err)
return err
}
clientset, err := apiextension.NewForConfig(config)
if err != nil {
r.logBuffer.Log("Error while creating Kubernetes clientset for apiextension: %s", err)
r.config.Log("Error while creating Kubernetes clientset for apiextension: %s", err)
return err
}
client := clientset.ApiextensionsV1().CustomResourceDefinitions()
original := make(kube.ResourceList, 0)
// Note, we build the originals from the current set of CRDs
// and therefore this upgrade will never delete CRDs that existed in the former release
// but no longer exist in the current release.
for _, r := range allCrds {
if o, err := client.Get(context.TODO(), r.Name, v1.GetOptions{}); err == nil && o != nil {
for _, res := range allCrds {
if o, err := client.Get(context.TODO(), res.Name, metav1.GetOptions{}); err == nil && o != nil {
o.GetResourceVersion()
original = append(original, &resource.Info{
Client: clientset.ApiextensionsV1().RESTClient(),
Mapping: &meta.RESTMapping{
Resource: schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: r.Mapping.GroupVersionKind.Version,
Version: res.Mapping.GroupVersionKind.Version,
Resource: "customresourcedefinition",
},
GroupVersionKind: schema.GroupVersionKind{
Kind: "CustomResourceDefinition",
Group: "apiextensions.k8s.io",
Version: r.Mapping.GroupVersionKind.Version,
Version: res.Mapping.GroupVersionKind.Version,
},
Scope: &rootScoped{},
},
Expand All @@ -289,13 +297,13 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart
ResourceVersion: o.ObjectMeta.ResourceVersion,
})
} else if !apierrors.IsNotFound(err) {
cfg.Log("failed to get CRD %s: %s", r.Name, err)
r.config.Log("failed to get CRD %s: %s", res.Name, err)
return err
}
}
// Send them to Kube
if rr, err := cfg.KubeClient.Update(original, allCrds, true); err != nil {
cfg.Log("failed to apply CRD %s", err)
if rr, err := r.config.KubeClient.Update(original, allCrds, true); err != nil {
r.config.Log("failed to apply CRD %s", err)
return errors.New(fmt.Sprintf("failed to apply CRD %s", err))
} else {
if rr != nil {
Expand All @@ -310,21 +318,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart
}
}
}
break
}

if len(totalItems) > 0 {
// Invalidate the local cache, since it will not have the new CRDs
// present.
discoveryClient, err := cfg.RESTClientGetter.ToDiscoveryClient()
discoveryClient, err := r.config.RESTClientGetter.ToDiscoveryClient()
if err != nil {
cfg.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err)
r.config.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err)
return err
}
cfg.Log("Clearing discovery cache")
r.config.Log("Clearing discovery cache")
discoveryClient.Invalidate()
// Give time for the CRD to be recognized.
if err := cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
cfg.Log("Error waiting for items: %s", err)
if err := r.config.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
r.config.Log("Error waiting for items: %s", err)
return err
}
// Make sure to force a rebuild of the cache.
Expand Down Expand Up @@ -402,3 +410,45 @@ func wrapActionErr(log *LogBuffer, err error) error {
}
return err
}

func setOriginVisitor(namespace, name string) resource.VisitorFunc {
return func(info *resource.Info, err error) error {
if err != nil {
return err
}
if err = mergeLabels(info.Object, originLabels(namespace, name)); err != nil {
return fmt.Errorf(
"%s origin labels could not be updated: %s",
resourceString(info), err,
)
}
return nil
}
}

func mergeLabels(obj runtime.Object, labels map[string]string) error {
current, err := accessor.Labels(obj)
if err != nil {
return err
}
return accessor.SetLabels(obj, mergeStrStrMaps(current, labels))
}

func resourceString(info *resource.Info) string {
_, k := info.Mapping.GroupVersionKind.ToAPIVersionAndKind()
return fmt.Sprintf(
"%s %q in namespace %q",
k, info.Name, info.Namespace,
)
}

func mergeStrStrMaps(current, desired map[string]string) map[string]string {
result := make(map[string]string)
for k, v := range current {
result[k] = v
}
for k, desiredVal := range desired {
result[k] = desiredVal
}
return result
}