Skip to content

Commit

Permalink
Improve the stability of the operator in big clusters (#283)
Browse files Browse the repository at this point in the history
This change improves the stability of the operator for big clusters. When used
to create big clusters (> 40 nodes), the number of things it had to do per pod
starts adding up. Calls to 'kubectl exec' to get various state (cmds package)
can take a while. There is also a lot of memory pressure when 'kubectl exec' is
called frequently -- we send the command to the REST endpoint that uses gzip to
compress/uncompress the input/output. The compression library, which we don't
have any control over, uses a lot of memory. Go is a garbage-collected
language, so the memory is eventually freed, but the GC can take some time to
kick in.

The approach taken in this change is to batch as many 'kubectl exec' commands
as we can. This change impacts the podfacts collector, installer reconciler and
create db reconciler. The end result is that there is less memory pressure and
it is quicker for a reconcile iteration to get through its various state
checks.

This change also refactors the main.go module. Most of the command line argument handling was
moved to a separate package called opcfg. This was changed because I wanted to
flow down the command line arguments into the controllers so that we can log
things only when in dev mode. So, we need to put the arguments in an exported
struct from a new package that the controllers would have access to.

Included in this is a change to make it easier to run the operator locally. A
new script was added that handles proper setup and teardown.
  • Loading branch information
spilchen authored Nov 9, 2022
1 parent 77588c9 commit efb566b
Show file tree
Hide file tree
Showing 38 changed files with 810 additions and 716 deletions.
4 changes: 2 additions & 2 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ This method runs the operator synchronously in your shell. It is the fastest way
Enter the following command:

```shell
make install run ENABLE_WEBHOOKS=false
make install run
```

Press **Ctrl+C** to stop the operator.
Expand All @@ -186,7 +186,7 @@ Press **Ctrl+C** to stop the operator.

This disables the webhook from running too, as running the webhook requires TLS certs to be available.

This will monitor VerticaDB CR's across all namespaces. So, make sure that when you run in this mode that you don't have any old VerticaDB's lying around.
This will monitor VerticaDB CR's in the current namespace only. You need to update your kubectl config and change the namespace if you want to monitor a different one.

### Option 2: Kubernetes Deployment

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ build: generate fmt vet ## Build manager binary.
go build -o bin/manager cmd/operator/main.go

run: manifests generate fmt vet ## Run a controller from your host.
go run cmd/operator/main.go -enable-profiler
scripts/run-operator.sh

docker-build-operator: manifests generate fmt vet ## Build operator docker image with the manager.
docker build -t ${OPERATOR_IMG} -f docker-operator/Dockerfile .
Expand Down
5 changes: 5 additions & 0 deletions changes/unreleased/Fixed-20221107-103657.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: Fixed
body: Improve the stability of the operator in big clusters
time: 2022-11-07T10:36:57.957077201-04:00
custom:
Issue: "283"
185 changes: 22 additions & 163 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@ import (
"log"
"os"
"strconv"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
"net/http"
_ "net/http/pprof" // nolint:gosec

"github.com/go-logr/zapr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
lumberjack "gopkg.in/natefinch/lumberjack.v2"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -46,92 +42,27 @@ import (
"github.com/vertica/vertica-kubernetes/pkg/builder"
"github.com/vertica/vertica-kubernetes/pkg/controllers/vas"
"github.com/vertica/vertica-kubernetes/pkg/controllers/vdb"
"github.com/vertica/vertica-kubernetes/pkg/opcfg"
"github.com/vertica/vertica-kubernetes/pkg/security"
//+kubebuilder:scaffold:imports
)

const (
DefaultMaxFileSize = 500
DefaultMaxFileAge = 7
DefaultMaxFileRotation = 3
DefaultLevel = "info"
DefaultDevMode = true
DefaultZapcoreLevel = zapcore.InfoLevel
First = 100
ThereAfter = 100
CertDir = "/tmp/k8s-webhook-server/serving-certs"
CertDir = "/tmp/k8s-webhook-server/serving-certs"
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

// FlagConfig holds all of the command line arguments accepted by the
// operator. It is populated by setFlagArgs followed by flag.Parse.
type FlagConfig struct {
	MetricsAddr          string // The address the metric endpoint binds to
	EnableLeaderElection bool   // Ensures only one active controller manager when true
	ProbeAddr            string // The address the probe endpoint binds to
	EnableProfiler       bool   // Enables runtime profiling collection on port 6060 when true
	ServiceAccountName   string // The name of the serviceAccount to use
	PrefixName           string // Prefix of the name of all objects created when the operator was deployed
	WebhookCertSecret    string // when this is empty we will generate the webhook cert
	LogArgs              *Logging // Logging related command line arguments
}

// Logging holds the command line arguments that control operator logging.
type Logging struct {
	FilePath        string // Path to the log file; if empty, all logging is written to stdout
	Level           string // Minimum logging level: debug, info, warn, or error
	MaxFileSize     int    // Maximum size in megabytes of the log file before it gets rotated
	MaxFileAge      int    // Maximum number of days to retain old log files
	MaxFileRotation int    // Maximum number of files kept in rotation before old ones are removed
	DevMode         bool   // Development mode if true, production mode otherwise
}

// init registers the client-go and VerticaDB API types with the global
// runtime scheme so the manager's client can serialize them. The
// kubebuilder scaffold marker below is used by code generation tooling
// and must be left in place.
func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

	utilruntime.Must(verticacomv1beta1.AddToScheme(scheme))
	//+kubebuilder:scaffold:scheme
}

// setLoggingFlagArgs registers the logging related command line flags
// along with their default values.
func (l *Logging) setLoggingFlagArgs() {
	flag.StringVar(&l.Level, "level", DefaultLevel,
		"The minimum logging level. Valid values are: debug, info, warn, and error.")
	flag.BoolVar(&l.DevMode, "dev", DefaultDevMode,
		"Enables development mode if true and production mode otherwise.")
	flag.StringVar(&l.FilePath, "filepath", "",
		"The path to the log file. If omitted, all logging will be written to stdout.")
	flag.IntVar(&l.MaxFileSize, "maxfilesize", DefaultMaxFileSize,
		"The maximum size in megabytes of the log file before it gets rotated.")
	flag.IntVar(&l.MaxFileAge, "maxfileage", DefaultMaxFileAge,
		"The maximum number of days to retain old log files based on the timestamp encoded in the file.")
	flag.IntVar(&l.MaxFileRotation, "maxfilerotation", DefaultMaxFileRotation,
		"The maximum number of files that are kept in rotation before the old ones are removed.")
}

// setFlagArgs registers the operator's command line flags with their
// default values. The logging flags are registered through a nested
// Logging struct that this function allocates.
func (fc *FlagConfig) setFlagArgs() {
	flag.StringVar(&fc.MetricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	flag.StringVar(&fc.ProbeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.StringVar(&fc.ServiceAccountName, "service-account-name", "verticadb-operator-controller-manager",
		"The name of the serviceAccount to use.")
	flag.StringVar(&fc.PrefixName, "prefix-name", "verticadb-operator",
		"The common prefix for all objects created during the operator deployment")
	flag.StringVar(&fc.WebhookCertSecret, "webhook-cert-secret", "",
		"Specifies the secret that contains the webhook cert. If this option is omitted, "+
			"then the operator will generate the certificate.")
	flag.BoolVar(&fc.EnableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
	flag.BoolVar(&fc.EnableProfiler, "enable-profiler", false,
		"Enables runtime profiling collection. The profiling data can be inspected by connecting to port 6060 "+
			"with the path /debug/pprof. See https://golang.org/pkg/net/http/pprof/ for more info.")
	fc.LogArgs = &Logging{}
	fc.LogArgs.setLoggingFlagArgs()
}

// getWatchNamespace returns the Namespace the operator should be watching for changes
func getWatchNamespace() (string, error) {
// WatchNamespaceEnvVar is the constant for env variable WATCH_NAMESPACE
Expand Down Expand Up @@ -163,91 +94,19 @@ func getIsWebhookEnabled() bool {
return enabled
}

// getEncoderConfig returns the zapcore encoder configuration to use.
// Development mode uses zap's development defaults. Production mode
// starts from the production defaults and overrides the level and time
// encoders to get capitalized levels and ISO8601 timestamps.
func getEncoderConfig(devMode bool) zapcore.EncoderConfig {
	if devMode {
		return zap.NewDevelopmentEncoderConfig()
	}
	cfg := zap.NewProductionEncoderConfig()
	cfg.EncodeLevel = zapcore.CapitalLevelEncoder
	cfg.EncodeTime = zapcore.ISO8601TimeEncoder
	return cfg
}

// getLogWriter builds a lumberjack rolling-file logger from the logging
// arguments and wraps it in a zapcore.WriteSyncer so zap can write to it.
func getLogWriter(logArgs Logging) zapcore.WriteSyncer {
	return zapcore.AddSync(&lumberjack.Logger{
		Filename:   logArgs.FilePath,
		MaxSize:    logArgs.MaxFileSize, // megabytes
		MaxBackups: logArgs.MaxFileRotation,
		MaxAge:     logArgs.MaxFileAge, // days
	})
}

// getZapcoreLevel parses a level name into the corresponding
// zapcore.Level. An unrecognized name logs a warning and falls back to
// the default level.
func getZapcoreLevel(lvl string) zapcore.Level {
	var parsed zapcore.Level
	if err := parsed.UnmarshalText([]byte(lvl)); err != nil {
		log.Printf("unrecognized level, %s level will be used instead", DefaultLevel)
		return DefaultZapcoreLevel
	}
	return parsed
}

// getStackTrace returns a zap option that records stack traces. In dev
// mode, traces are captured starting at the warn level; in production,
// only at the error level.
func getStackTrace(devMode bool) zap.Option {
	if devMode {
		return zap.AddStacktrace(zapcore.LevelEnabler(zapcore.WarnLevel))
	}
	return zap.AddStacktrace(zapcore.LevelEnabler(zapcore.ErrorLevel))
}

// getLogger assembles the zap logger from the parsed logging arguments.
// It writes to a rolling file when a file path is given, to stdout when
// there is no file path or dev mode is on (possibly both), and enables
// log sampling only in production mode.
func getLogger(logArgs Logging) *zap.Logger {
	syncers := make([]zapcore.WriteSyncer, 0, 2)
	if logArgs.FilePath != "" {
		syncers = append(syncers, getLogWriter(logArgs))
	}
	if logArgs.FilePath == "" || logArgs.DevMode {
		syncers = append(syncers, zapcore.AddSync(os.Stdout))
	}
	var core zapcore.Core = zapcore.NewCore(
		zapcore.NewConsoleEncoder(getEncoderConfig(logArgs.DevMode)),
		zapcore.NewMultiWriteSyncer(syncers...),
		zap.NewAtomicLevelAt(getZapcoreLevel(logArgs.Level)),
	)
	if !logArgs.DevMode {
		// This enables sampling only in prod
		core = zapcore.NewSamplerWithOptions(core, time.Second, First, ThereAfter)
	}
	return zap.New(core, getStackTrace(logArgs.DevMode))
}

// addReconcilersToManager will add a controller for each CR that this operator
// handles. If any failure occurs, if will exit the program.
func addReconcilersToManager(mgr manager.Manager, restCfg *rest.Config, flagArgs *FlagConfig) {
func addReconcilersToManager(mgr manager.Manager, restCfg *rest.Config, oc *opcfg.OperatorConfig) {
if err := (&vdb.VerticaDBReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("VerticaDB"),
Scheme: mgr.GetScheme(),
Cfg: restCfg,
EVRec: mgr.GetEventRecorderFor(builder.OperatorName),
OpCfg: *oc,
DeploymentNames: builder.DeploymentNames{
ServiceAccountName: flagArgs.ServiceAccountName,
PrefixName: flagArgs.PrefixName,
ServiceAccountName: oc.ServiceAccountName,
PrefixName: oc.PrefixName,
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VerticaDB")
Expand Down Expand Up @@ -289,7 +148,7 @@ func addWebhooksToManager(mgr manager.Manager) {
}

// setupWebhook will setup the webhook in the manager if enabled
func setupWebhook(ctx context.Context, mgr manager.Manager, restCfg *rest.Config, flagArgs *FlagConfig) error {
func setupWebhook(ctx context.Context, mgr manager.Manager, restCfg *rest.Config, oc *opcfg.OperatorConfig) error {
if getIsWebhookEnabled() {
watchNamespace, err := getWatchNamespace()
if err != nil {
Expand All @@ -298,13 +157,13 @@ func setupWebhook(ctx context.Context, mgr manager.Manager, restCfg *rest.Config
setupLog.Info("Disabling webhook since we are not watching a single namespace")
return nil
}
if flagArgs.WebhookCertSecret == "" {
if err := security.GenerateWebhookCert(ctx, &setupLog, restCfg, CertDir, flagArgs.PrefixName, watchNamespace); err != nil {
if oc.WebhookCertSecret == "" {
if err := security.GenerateWebhookCert(ctx, &setupLog, restCfg, CertDir, oc.PrefixName, watchNamespace); err != nil {
return err
}
} else {
if err := security.PatchWebhookCABundleFromSecret(ctx, &setupLog, restCfg, flagArgs.WebhookCertSecret,
flagArgs.PrefixName, watchNamespace); err != nil {
if err := security.PatchWebhookCABundleFromSecret(ctx, &setupLog, restCfg, oc.WebhookCertSecret,
oc.PrefixName, watchNamespace); err != nil {
return err
}
}
Expand All @@ -324,18 +183,18 @@ func getReadinessProbeCallback(mgr ctrl.Manager) healthz.Checker {
}

func main() {
flagArgs := &FlagConfig{}
flagArgs.setFlagArgs()
oc := &opcfg.OperatorConfig{}
oc.SetFlagArgs()
flag.Parse()

logger := getLogger(*flagArgs.LogArgs)
if flagArgs.LogArgs.FilePath != "" {
log.Printf("Now logging in file %s", flagArgs.LogArgs.FilePath)
logger := oc.GetLogger()
if oc.FilePath != "" {
log.Printf("Now logging in file %s", oc.FilePath)
}

ctrl.SetLogger(zapr.NewLogger(logger))

if flagArgs.EnableProfiler {
if oc.EnableProfiler {
go func() {
addr := "localhost:6060"
setupLog.Info("Opening profiling port", "addr", addr)
Expand All @@ -355,10 +214,10 @@ func main() {

mgr, err := ctrl.NewManager(restCfg, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: flagArgs.MetricsAddr,
MetricsBindAddress: oc.MetricsAddr,
Port: 9443,
HealthProbeBindAddress: flagArgs.ProbeAddr,
LeaderElection: flagArgs.EnableLeaderElection,
HealthProbeBindAddress: oc.ProbeAddr,
LeaderElection: oc.EnableLeaderElection,
LeaderElectionID: "5c1e6227.vertica.com",
Namespace: watchNamespace,
CertDir: CertDir,
Expand All @@ -368,9 +227,9 @@ func main() {
os.Exit(1)
}

addReconcilersToManager(mgr, restCfg, flagArgs)
addReconcilersToManager(mgr, restCfg, oc)
ctx := ctrl.SetupSignalHandler()
if err := setupWebhook(ctx, mgr, restCfg, flagArgs); err != nil {
if err := setupWebhook(ctx, mgr, restCfg, oc); err != nil {
setupLog.Error(err, "unable to setup webhook")
os.Exit(1)
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ require (
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
sigs.k8s.io/controller-runtime v0.12.2
yunion.io/x/pkg v0.0.0-20210218105412-13a69f60034c
)

require (
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -987,5 +987,3 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZa
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
yunion.io/x/pkg v0.0.0-20210218105412-13a69f60034c h1:J/joqbA1N2mAlOl0Uqd4LpAq3+DK5aoFMdz+p9Ld7pQ=
yunion.io/x/pkg v0.0.0-20210218105412-13a69f60034c/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E=
18 changes: 14 additions & 4 deletions pkg/cmds/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type PodRunner interface {
ExecVSQL(ctx context.Context, podName types.NamespacedName, contName string, command ...string) (string, string, error)
ExecAdmintools(ctx context.Context, podName types.NamespacedName, contName string, command ...string) (string, string, error)
CopyToPod(ctx context.Context, podName types.NamespacedName, contName string, sourceFile string,
destFile string) (stdout, stderr string, err error)
destFile string, executeCmd ...string) (stdout, stderr string, err error)
}

type ClusterPodRunner struct {
Expand Down Expand Up @@ -103,16 +103,26 @@ func (c *ClusterPodRunner) ExecInPod(ctx context.Context, podName types.Namespac
return execOut.String(), execErr.String(), err
}

// CopyToPod copies a file into a container's pod
// CopyToPod copies a file into a container's pod. Optionally, it can also run a
// command after the copy has finished.
func (c *ClusterPodRunner) CopyToPod(ctx context.Context, podName types.NamespacedName,
contName string, sourceFile string, destFile string) (stdout, stderr string, err error) {
contName string, sourceFile string, destFile string, executeCmd ...string) (stdout, stderr string, err error) {
var (
execOut bytes.Buffer
execErr bytes.Buffer
)

// Copying a file is simply a cat of the contents from stdin
command := []string{"sh", "-c", fmt.Sprintf("cat > %s", destFile)}
var sb strings.Builder
sb.WriteString("cat > ")
sb.WriteString(destFile)
// If an execute command was given, we tack this on the end as something
// that will run after the file has been copied in.
if executeCmd != nil {
sb.WriteString(" && ")
sb.WriteString(strings.Join(executeCmd, " "))
}
command := []string{"sh", "-c", sb.String()}

inFile, err := os.Open(sourceFile)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/cmds/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,13 @@ func (f *FakePodRunner) ExecVSQL(ctx context.Context, podName types.NamespacedNa

// CopyToPod will mimic a real copy file into a pod
func (f *FakePodRunner) CopyToPod(ctx context.Context, podName types.NamespacedName,
contName string, sourceFile string, destFile string) (stdout, stderr string, err error) {
contName string, sourceFile string, destFile string, executeCmd ...string) (stdout, stderr string, err error) {
command := []string{"sh", "-c", fmt.Sprintf("cat > %s", destFile)}
return f.ExecInPod(ctx, podName, contName, command...)
sout, serr, err := f.ExecInPod(ctx, podName, contName, command...)
if executeCmd == nil {
return sout, serr, err
}
return f.ExecInPod(ctx, podName, contName, executeCmd...)
}

// FindCommands will search through the command history for any command that
Expand Down
Loading

0 comments on commit efb566b

Please sign in to comment.