From 0937a58ad78c3d1f21a580651d2331115be172ff Mon Sep 17 00:00:00 2001 From: Becca Petrin Date: Thu, 13 Feb 2020 09:56:29 -0800 Subject: [PATCH] Add Kubernetes service registration (#8249) --- command/commands.go | 4 +- command/server.go | 2 +- go.mod | 1 + sdk/helper/certutil/certutil_test.go | 25 + sdk/helper/certutil/helpers.go | 49 ++ .../kubernetes/client/client.go | 254 ++++++++++ .../kubernetes/client/client_test.go | 95 ++++ .../kubernetes/client/cmd/kubeclient/main.go | 80 ++++ .../kubernetes/client/config.go | 98 ++++ .../kubernetes/retry_handler.go | 131 +++++ .../kubernetes/retry_handler_test.go | 450 ++++++++++++++++++ .../kubernetes/service_registration.go | 185 +++++++ .../kubernetes/service_registration_test.go | 130 +++++ .../kubernetes/testing/README.md | 54 +++ serviceregistration/kubernetes/testing/ca.crt | 18 + .../kubernetes/testing/resp-get-pod.json | 120 +++++ .../kubernetes/testing/resp-not-found.json | 13 + .../kubernetes/testing/resp-update-pod.json | 123 +++++ .../kubernetes/testing/testserver.go | 243 ++++++++++ serviceregistration/kubernetes/testing/token | 1 + .../vault/sdk/helper/certutil/helpers.go | 49 ++ 21 files changed, 2123 insertions(+), 2 deletions(-) create mode 100644 serviceregistration/kubernetes/client/client.go create mode 100644 serviceregistration/kubernetes/client/client_test.go create mode 100644 serviceregistration/kubernetes/client/cmd/kubeclient/main.go create mode 100644 serviceregistration/kubernetes/client/config.go create mode 100644 serviceregistration/kubernetes/retry_handler.go create mode 100644 serviceregistration/kubernetes/retry_handler_test.go create mode 100644 serviceregistration/kubernetes/service_registration.go create mode 100644 serviceregistration/kubernetes/service_registration_test.go create mode 100644 serviceregistration/kubernetes/testing/README.md create mode 100644 serviceregistration/kubernetes/testing/ca.crt create mode 100644 serviceregistration/kubernetes/testing/resp-get-pod.json create mode 100644 serviceregistration/kubernetes/testing/resp-not-found.json create mode 100644 serviceregistration/kubernetes/testing/resp-update-pod.json create mode 100644 serviceregistration/kubernetes/testing/testserver.go create mode 100644 serviceregistration/kubernetes/testing/token diff --git a/command/commands.go b/command/commands.go index ec48232bbfd7..49d9c2339403 100644 --- a/command/commands.go +++ b/command/commands.go @@ -66,6 +66,7 @@ import ( sr "github.com/hashicorp/vault/serviceregistration" csr "github.com/hashicorp/vault/serviceregistration/consul" + ksr "github.com/hashicorp/vault/serviceregistration/kubernetes" ) const ( @@ -161,7 +162,8 @@ var ( } serviceRegistrations = map[string]sr.Factory{ - "consul": csr.NewServiceRegistration, + "consul": csr.NewServiceRegistration, + "kubernetes": ksr.NewServiceRegistration, } ) diff --git a/command/server.go b/command/server.go index 704fd5ba9230..2c191c8948e7 100644 --- a/command/server.go +++ b/command/server.go @@ -1279,7 +1279,7 @@ CLUSTER_SYNTHESIS_COMPLETE: // If ServiceRegistration is configured, then the backend must support HA isBackendHA := coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() - if (coreConfig.ServiceRegistration != nil) && !isBackendHA { + if !c.flagDev && (coreConfig.ServiceRegistration != nil) && !isBackendHA { c.UI.Output("service_registration is configured, but storage does not support HA") return 1 } diff --git a/go.mod b/go.mod index 69b30dc500c3..f99f8ee28673 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( 
github.com/aws/aws-sdk-go v1.25.41 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible github.com/chrismalek/oktasdk-go v0.0.0-20181212195951-3430665dfaa0 github.com/cockroachdb/apd v1.1.0 // indirect github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c diff --git a/sdk/helper/certutil/certutil_test.go b/sdk/helper/certutil/certutil_test.go index 1de9c4ee886a..b42837214125 100644 --- a/sdk/helper/certutil/certutil_test.go +++ b/sdk/helper/certutil/certutil_test.go @@ -403,6 +403,31 @@ func TestTLSConfig(t *testing.T) { } } +func TestNewCertPool(t *testing.T) { + caExample := `-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p +a3ViZUNBMB4XDTE5MTIxMDIzMDUxOVoXDTI5MTIwODIzMDUxOVowFTETMBEGA1UE +AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANFi +/RIdMHd865X6JygTb9riX01DA3QnR+RoXDXNnj8D3LziLG2n8ItXMJvWbU3sxxyy +nX9HxJ0SIeexj1cYzdQBtJDjO1/PeuKc4CZ7zCukCAtHz8mC7BDPOU7F7pggpcQ0 +/t/pa2m22hmCu8aDF9WlUYHtJpYATnI/A5vz/VFLR9daxmkl59Qo3oHITj7vAzSx +/75r9cibpQyJ+FhiHOZHQWYY2JYw2g4v5hm5hg5SFM9yFcZ75ISI9ebyFFIl9iBY +zAk9jqv1mXvLr0Q39AVwMTamvGuap1oocjM9NIhQvaFL/DNqF1ouDQjCf5u2imLc +TraO1/2KO8fqwOZCOrMCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW +MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4IBAQBtVZCwCPqUUUpIClAlE9nc2fo2bTs9gsjXRmqdQ5oaSomSLE93 +aJWYFuAhxPXtlApbLYZfW2m1sM3mTVQN60y0uE4e1jdSN1ErYQ9slJdYDAMaEmOh +iSexj+Nd1scUiMHV9lf3ps5J8sYeCpwZX3sPmw7lqZojTS12pANBDcigsaj5RRyN +9GyP3WkSQUsTpWlDb9Fd+KNdkCVw7nClIpBPA2KW4BQKw/rNSvOFD61mbzc89lo0 +Q9IFGQFFF8jO18lbyWqnRBGXcS4/G7jQ3S7C121d14YLUeAYOM7pJykI1g4CLx9y +vitin0L6nprauWkKO38XgM4T75qKZpqtiOcT +-----END CERTIFICATE----- +` + if _, err := NewCertPool(bytes.NewReader([]byte(caExample))); err != nil { + t.Fatal(err) + } +} + func refreshRSA8CertBundle() *CertBundle { initTest.Do(setCerts) return &CertBundle{ diff --git a/sdk/helper/certutil/helpers.go b/sdk/helper/certutil/helpers.go index 4a35f88dca5e..3bf6ae502b67 100644 --- a/sdk/helper/certutil/helpers.go +++ b/sdk/helper/certutil/helpers.go @@ -14,6 +14,8 @@ import ( "encoding/pem" "errors" "fmt" + "io" + "io/ioutil" "math/big" "net" "net/url" @@ -804,3 +806,50 @@ func SignCertificate(data *CreationBundle) (*ParsedCertBundle, error) { return result, nil } + +func NewCertPool(reader io.Reader) (*x509.CertPool, error) { + pemBlock, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + certs, err := parseCertsPEM(pemBlock) + if err != nil { + return nil, fmt.Errorf("error reading certs: %s", err) + } + pool := x509.NewCertPool() + for _, cert := range certs { + pool.AddCert(cert) + } + return pool, nil +} + +// parseCertsPEM returns the x509.Certificates contained in the given PEM-encoded byte array +// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates +func parseCertsPEM(pemCerts []byte) ([]*x509.Certificate, error) { + ok := false + certs := []*x509.Certificate{} + for len(pemCerts) > 0 { + var block *pem.Block + block, pemCerts = pem.Decode(pemCerts) + if block == nil { + break + } + // Only use PEM "CERTIFICATE" blocks without extra headers + if block.Type != "CERTIFICATE" || len(block.Headers) != 0 { + continue + } + + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return certs, err + } + + certs = append(certs, cert) + ok = true + } + + if !ok { + return certs, 
errors.New("data does not contain any valid RSA or ECDSA certificates") + } + return certs, nil +} diff --git a/serviceregistration/kubernetes/client/client.go b/serviceregistration/kubernetes/client/client.go new file mode 100644 index 000000000000..3c5bbf587862 --- /dev/null +++ b/serviceregistration/kubernetes/client/client.go @@ -0,0 +1,254 @@ +package client + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-retryablehttp" +) + +var ( + // Retry configuration + RetryWaitMin = 500 * time.Millisecond + RetryWaitMax = 30 * time.Second + RetryMax = 10 + + // Standard errs + ErrNamespaceUnset = errors.New(`"namespace" is unset`) + ErrPodNameUnset = errors.New(`"podName" is unset`) + ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined") +) + +// New instantiates a Client. The stopCh is used for exiting retry loops +// when closed. +func New(logger hclog.Logger, stopCh <-chan struct{}) (*Client, error) { + config, err := inClusterConfig() + if err != nil { + return nil, err + } + return &Client{ + logger: logger, + config: config, + stopCh: stopCh, + }, nil +} + +// Client is a minimal Kubernetes client. We rolled our own because the existing +// Kubernetes client-go library available externally has a high number of dependencies +// and we thought it wasn't worth it for only two API calls. If at some point they break +// the client into smaller modules, or if we add quite a few methods to this client, it may +// be worthwhile to revisit that decision. +type Client struct { + logger hclog.Logger + config *Config + stopCh <-chan struct{} +} + +// GetPod gets a pod from the Kubernetes API. +func (c *Client) GetPod(namespace, podName string) (*Pod, error) { + endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) + method := http.MethodGet + + // Validate that we received required parameters. + if namespace == "" { + return nil, ErrNamespaceUnset + } + if podName == "" { + return nil, ErrPodNameUnset + } + + req, err := http.NewRequest(method, c.config.Host+endpoint, nil) + if err != nil { + return nil, err + } + pod := &Pod{} + if err := c.do(req, pod); err != nil { + return nil, err + } + return pod, nil +} + +// PatchPod updates the pod's tags to the given ones. +// It does so non-destructively, or in other words, without tearing down +// the pod. +func (c *Client) PatchPod(namespace, podName string, patches ...*Patch) error { + endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) + method := http.MethodPatch + + // Validate that we received required parameters. + if namespace == "" { + return ErrNamespaceUnset + } + if podName == "" { + return ErrPodNameUnset + } + if len(patches) == 0 { + // No work to perform. 
+ return nil + } + + var jsonPatches []map[string]interface{} + for _, patch := range patches { + if patch.Operation == Unset { + return errors.New("patch operation must be set") + } + jsonPatches = append(jsonPatches, map[string]interface{}{ + "op": patch.Operation, + "path": patch.Path, + "value": patch.Value, + }) + } + body, err := json.Marshal(jsonPatches) + if err != nil { + return err + } + req, err := http.NewRequest(method, c.config.Host+endpoint, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json-patch+json") + return c.do(req, nil) +} + +// do executes the given request, retrying if necessary. +func (c *Client) do(req *http.Request, ptrToReturnObj interface{}) error { + // Finish setting up a valid request. + retryableReq, err := retryablehttp.FromRequest(req) + if err != nil { + return err + } + + // Build a context that will call the cancelFunc when we receive + // a stop from our stopChan. This allows us to exit from our retry + // loop during a shutdown, rather than hanging. + ctx, cancelFunc := context.WithCancel(context.Background()) + go func(stopCh <-chan struct{}) { + <-stopCh + cancelFunc() + }(c.stopCh) + retryableReq.WithContext(ctx) + + retryableReq.Header.Set("Authorization", "Bearer "+c.config.BearerToken) + retryableReq.Header.Set("Accept", "application/json") + + client := &retryablehttp.Client{ + HTTPClient: cleanhttp.DefaultClient(), + RetryWaitMin: RetryWaitMin, + RetryWaitMax: RetryWaitMax, + RetryMax: RetryMax, + CheckRetry: c.getCheckRetry(req), + Backoff: retryablehttp.DefaultBackoff, + } + client.HTTPClient.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: c.config.CACertPool, + }, + } + + // Execute and retry the request. This client comes with exponential backoff and + // jitter already rolled in. + resp, err := client.Do(retryableReq) + if err != nil { + return err + } + defer func() { + if err := resp.Body.Close(); err != nil { + if c.logger.IsWarn() { + // Failing to close response bodies can present as a memory leak so it's + // important to surface it. + c.logger.Warn(fmt.Sprintf("unable to close response body: %s", err)) + } + } + }() + + // If we're not supposed to read out the body, we have nothing further + // to do here. + if ptrToReturnObj == nil { + return nil + } + + // Attempt to read out the body into the given return object. + return json.NewDecoder(resp.Body).Decode(ptrToReturnObj) +} + +func (c *Client) getCheckRetry(req *http.Request) retryablehttp.CheckRetry { + return func(ctx context.Context, resp *http.Response, err error) (bool, error) { + if resp == nil { + return true, fmt.Errorf("nil response: %s", req.URL.RequestURI()) + } + switch resp.StatusCode { + case 200, 201, 202, 204: + // Success. + return false, nil + case 401, 403: + // Perhaps the token from our bearer token file has been refreshed. + config, err := inClusterConfig() + if err != nil { + return false, err + } + if config.BearerToken == c.config.BearerToken { + // It's the same token. + return false, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) + } + c.config = config + // Continue to try again, but return the error too in case the caller would rather read it out. + return true, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) + case 404: + return false, &ErrNotFound{debuggingInfo: sanitizedDebuggingInfo(req, resp.StatusCode)} + case 500, 502, 503, 504: + // Could be transient. 
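+			// A 5xx here usually means the API server (or a proxy in front of it) is
+			// briefly unavailable, for example during a control-plane restart, so we
+			// signal retryablehttp to back off and try again rather than failing
+			// immediately; other unexpected codes below are returned without retrying.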
+ return true, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) + } + // Unexpected. + return false, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) + } +} + +type Pod struct { + Metadata *Metadata `json:"metadata,omitempty"` +} + +type Metadata struct { + Name string `json:"name,omitempty"` + + // This map will be nil if no "labels" key was provided. + // It will be populated but have a length of zero if the + // key was provided, but no values. + Labels map[string]string `json:"labels,omitempty"` +} + +type PatchOperation string + +const ( + Unset PatchOperation = "unset" + Add = "add" + Replace = "replace" +) + +type Patch struct { + Operation PatchOperation + Path string + Value interface{} +} + +type ErrNotFound struct { + debuggingInfo string +} + +func (e *ErrNotFound) Error() string { + return e.debuggingInfo +} + +// sanitizedDebuggingInfo provides a returnable string that can be used for debugging. This is intentionally somewhat vague +// because we don't want to leak secrets that may be in a request or response body. +func sanitizedDebuggingInfo(req *http.Request, respStatus int) string { + return fmt.Sprintf("req method: %s, req url: %s, resp statuscode: %d", req.Method, req.URL, respStatus) +} diff --git a/serviceregistration/kubernetes/client/client_test.go b/serviceregistration/kubernetes/client/client_test.go new file mode 100644 index 000000000000..7196baaa8c7e --- /dev/null +++ b/serviceregistration/kubernetes/client/client_test.go @@ -0,0 +1,95 @@ +package client + +import ( + "os" + "testing" + + "github.com/hashicorp/go-hclog" + kubetest "github.com/hashicorp/vault/serviceregistration/kubernetes/testing" +) + +func TestClient(t *testing.T) { + testState, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + + Scheme = testConf.ClientScheme + TokenFile = testConf.PathToTokenFile + RootCAFile = testConf.PathToRootCAFile + if err := os.Setenv(EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + client, err := New(hclog.Default(), make(chan struct{})) + if err != nil { + t.Fatal(err) + } + e := &env{ + client: client, + testState: testState, + } + e.TestGetPod(t) + e.TestGetPodNotFound(t) + e.TestUpdatePodTags(t) + e.TestUpdatePodTagsNotFound(t) +} + +type env struct { + client *Client + testState *kubetest.State +} + +func (e *env) TestGetPod(t *testing.T) { + pod, err := e.client.GetPod(kubetest.ExpectedNamespace, kubetest.ExpectedPodName) + if err != nil { + t.Fatal(err) + } + if pod.Metadata.Name != "shell-demo" { + t.Fatalf("expected %q but received %q", "shell-demo", pod.Metadata.Name) + } +} + +func (e *env) TestGetPodNotFound(t *testing.T) { + _, err := e.client.GetPod(kubetest.ExpectedNamespace, "no-exist") + if err == nil { + t.Fatal("expected error because pod is unfound") + } + if _, ok := err.(*ErrNotFound); !ok { + t.Fatalf("expected *ErrNotFound but received %T", err) + } +} + +func (e *env) TestUpdatePodTags(t *testing.T) { + if err := e.client.PatchPod(kubetest.ExpectedNamespace, kubetest.ExpectedPodName, &Patch{ + Operation: Add, + Path: "/metadata/labels/fizz", + Value: "buzz", + }); err != nil { + t.Fatal(err) + } + if e.testState.NumPatches() != 1 { + t.Fatalf("expected 1 label but received %+v", e.testState) + } + if e.testState.Get("/metadata/labels/fizz")["value"] != "buzz" { + t.Fatalf("expected buzz but received %q", 
e.testState.Get("fizz")["value"]) + } + if e.testState.Get("/metadata/labels/fizz")["op"] != "add" { + t.Fatalf("expected add but received %q", e.testState.Get("fizz")["op"]) + } +} + +func (e *env) TestUpdatePodTagsNotFound(t *testing.T) { + err := e.client.PatchPod(kubetest.ExpectedNamespace, "no-exist", &Patch{ + Operation: Add, + Path: "/metadata/labels/fizz", + Value: "buzz", + }) + if err == nil { + t.Fatal("expected error because pod is unfound") + } + if _, ok := err.(*ErrNotFound); !ok { + t.Fatalf("expected *ErrNotFound but received %T", err) + } +} diff --git a/serviceregistration/kubernetes/client/cmd/kubeclient/main.go b/serviceregistration/kubernetes/client/cmd/kubeclient/main.go new file mode 100644 index 000000000000..536b63f22ee4 --- /dev/null +++ b/serviceregistration/kubernetes/client/cmd/kubeclient/main.go @@ -0,0 +1,80 @@ +package main + +// This code builds a minimal binary of the lightweight kubernetes +// client and exposes it for manual testing. +// The intention is that the binary can be built and dropped into +// a Kube environment like this: +// https://kubernetes.io/docs/tasks/debug-application-cluster/get-shell-running-container/ +// Then, commands can be run to test its API calls. +// The above commands are intended to be run inside an instance of +// minikube that has been started. +// After building this binary, place it in the container like this: +// $ kubectl cp kubeclient /shell-demo:/ +// At first you may get 403's, which can be resolved using this: +// https://github.com/fabric8io/fabric8/issues/6840#issuecomment-307560275 +// +// Example calls: +// ./kubeclient -call='get-pod' -namespace='default' -pod-name='shell-demo' +// ./kubeclient -call='patch-pod' -namespace='default' -pod-name='shell-demo' -patches='/metadata/labels/fizz:buzz,/metadata/labels/foo:bar' + +import ( + "encoding/json" + "flag" + "fmt" + "strings" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/serviceregistration/kubernetes/client" +) + +var callToMake string +var patchesToAdd string +var namespace string +var podName string + +func init() { + flag.StringVar(&callToMake, "call", "", `the call to make: 'get-pod' or 'patch-pod'`) + flag.StringVar(&patchesToAdd, "patches", "", `if call is "patch-pod", the patches to do like so: "/metadata/labels/fizz:buzz,/metadata/labels/foo:bar"`) + flag.StringVar(&namespace, "namespace", "", "the namespace to use") + flag.StringVar(&podName, "pod-name", "", "the pod name to use") +} + +func main() { + flag.Parse() + + c, err := client.New(hclog.Default(), make(chan struct{})) + if err != nil { + panic(err) + } + + switch callToMake { + case "get-pod": + pod, err := c.GetPod(namespace, podName) + if err != nil { + panic(err) + } + b, _ := json.Marshal(pod) + fmt.Printf("pod: %s\n", b) + return + case "patch-pod": + patchPairs := strings.Split(patchesToAdd, ",") + var patches []*client.Patch + for _, patchPair := range patchPairs { + fields := strings.Split(patchPair, ":") + if len(fields) != 2 { + panic(fmt.Errorf("unable to split %s from selectors provided of %s", fields, patchesToAdd)) + } + patches = append(patches, &client.Patch{ + Operation: client.Replace, + Path: fields[0], + Value: fields[1], + }) + } + if err := c.PatchPod(namespace, podName, patches...); err != nil { + panic(err) + } + return + default: + panic(fmt.Errorf(`unsupported call provided: %q`, callToMake)) + } +} diff --git a/serviceregistration/kubernetes/client/config.go b/serviceregistration/kubernetes/client/config.go new file mode 100644 index 
000000000000..4e6a0f45848c --- /dev/null +++ b/serviceregistration/kubernetes/client/config.go @@ -0,0 +1,98 @@ +package client + +import ( + "bytes" + "crypto/x509" + "io/ioutil" + "net" + "os" + + "github.com/hashicorp/vault/sdk/helper/certutil" +) + +const ( + // These environment variables aren't set by default. + // Vault may read them in if set through these environment variables. + // Example here: + // https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/ + // The client itself does nothing directly with these variables, it's + // up to the caller. However, they live here so they'll be consistently + // named should the client ever be reused. + // We generally recommend preferring environmental settings over configured + // ones, allowing settings from the Downward API to override hard-coded + // ones. + EnvVarKubernetesNamespace = "VAULT_K8S_NAMESPACE" + EnvVarKubernetesPodName = "VAULT_K8S_POD_NAME" + + // The service host and port environment variables are + // set by default inside a Kubernetes environment. + EnvVarKubernetesServiceHost = "KUBERNETES_SERVICE_HOST" + EnvVarKubernetesServicePort = "KUBERNETES_SERVICE_PORT" +) + +var ( + // These are presented as variables so they can be updated + // to point at test fixtures if needed. They aren't passed + // into inClusterConfig to avoid dependency injection. + Scheme = "https://" + TokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" + RootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" +) + +// inClusterConfig returns a config object which uses the service account +// kubernetes gives to services. It's intended for clients that expect to be +// running inside a service running on kubernetes. It will return ErrNotInCluster +// if called from a process not running in a kubernetes environment. +// inClusterConfig is based on this: +// https://github.com/kubernetes/client-go/blob/a56922badea0f2a91771411eaa1173c9e9243908/rest/config.go#L451 +func inClusterConfig() (*Config, error) { + host, port := os.Getenv(EnvVarKubernetesServiceHost), os.Getenv(EnvVarKubernetesServicePort) + if len(host) == 0 || len(port) == 0 { + return nil, ErrNotInCluster + } + + token, err := ioutil.ReadFile(TokenFile) + if err != nil { + return nil, err + } + + caBytes, err := ioutil.ReadFile(RootCAFile) + if err != nil { + return nil, err + } + pool, err := certutil.NewCertPool(bytes.NewReader(caBytes)) + if err != nil { + return nil, err + } + return &Config{ + Host: Scheme + net.JoinHostPort(host, port), + CACertPool: pool, + BearerToken: string(token), + BearerTokenFile: TokenFile, + }, nil +} + +// This config is based on the one returned here: +// https://github.com/kubernetes/client-go/blob/a56922badea0f2a91771411eaa1173c9e9243908/rest/config.go#L451 +// It is pared down to the absolute minimum fields used by this code. +// The CACertPool is promoted to the top level from being originally on the TLSClientConfig +// because it is the only parameter of the TLSClientConfig used by this code. +// Also, it made more sense to simply reuse the pool rather than holding raw values +// and parsing it repeatedly. +type Config struct { + CACertPool *x509.CertPool + + // Host must be a host string, a host:port pair, or a URL to the base of the apiserver. + // If a URL is given then the (optional) Path of that URL represents a prefix that must + // be appended to all request URIs used to access the apiserver. 
This allows a frontend + // proxy to easily relocate all of the apiserver endpoints. + Host string + + // Server requires Bearer authentication. This client will not attempt to use + // refresh tokens for an OAuth2 flow. + BearerToken string + + // Path to a file containing a BearerToken. + // If set, checks for a new token in the case of authorization errors. + BearerTokenFile string +} diff --git a/serviceregistration/kubernetes/retry_handler.go b/serviceregistration/kubernetes/retry_handler.go new file mode 100644 index 000000000000..67381bc734d5 --- /dev/null +++ b/serviceregistration/kubernetes/retry_handler.go @@ -0,0 +1,131 @@ +package kubernetes + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/serviceregistration/kubernetes/client" +) + +// How often to retry sending a state update if it fails. +var retryFreq = 5 * time.Second + +// retryHandler executes retries. +// It is thread-safe. +type retryHandler struct { + // These don't need a mutex because they're never mutated. + logger hclog.Logger + namespace, podName string + + // To synchronize setInitialState and patchesToRetry. + lock sync.Mutex + + // setInitialState will be nil if this has been done successfully. + // It must be done before any patches are retried. + setInitialState func() error + + // The map holds the path to the label being updated. It will only either + // not hold a particular label, or hold _the last_ state we were aware of. + // These should only be updated after initial state has been set. + patchesToRetry map[string]*client.Patch +} + +func (r *retryHandler) SetInitialState(setInitialState func() error) { + r.lock.Lock() + defer r.lock.Unlock() + if err := setInitialState(); err != nil { + if r.logger.IsWarn() { + r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error())) + } + r.setInitialState = setInitialState + } +} + +// Run must be called for retries to be started. +func (r *retryHandler) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup, c *client.Client) { + // Run this in a go func so this call doesn't block. + go func() { + // Make sure Vault will give us time to finish up here. + wait.Add(1) + defer wait.Done() + + retry := time.NewTicker(retryFreq) + defer retry.Stop() + for { + select { + case <-shutdownCh: + return + case <-retry.C: + r.retry(c) + } + } + }() +} + +// Notify adds a patch to be retried until it's either completed without +// error, or no longer needed. +func (r *retryHandler) Notify(c *client.Client, patch *client.Patch) { + r.lock.Lock() + defer r.lock.Unlock() + + // Initial state must be set first, or subsequent notifications we've + // received could get smashed by a late-arriving initial state. + // We will store this to retry it when appropriate. + if r.setInitialState != nil { + if r.logger.IsWarn() { + r.logger.Warn(fmt.Sprintf("cannot notify of present state for %s because initial state is unset", patch.Path)) + } + r.patchesToRetry[patch.Path] = patch + return + } + + // Initial state has been sent, so it's OK to attempt a patch immediately. 
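+	// If this immediate attempt fails, the patch is stored below keyed by its
+	// path, so only the most recent value for each label is retried by the
+	// ticker started in Run.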
+ if err := c.PatchPod(r.namespace, r.podName, patch); err != nil { + if r.logger.IsWarn() { + r.logger.Warn(fmt.Sprintf("unable to update state for %s due to %s, will retry", patch.Path, err.Error())) + } + r.patchesToRetry[patch.Path] = patch + } +} + +func (r *retryHandler) retry(c *client.Client) { + r.lock.Lock() + defer r.lock.Unlock() + + // Initial state must be set first, or subsequent notifications we've + // received could get smashed by a late-arriving initial state. + if r.setInitialState != nil { + if err := r.setInitialState(); err != nil { + if r.logger.IsWarn() { + r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error())) + } + // On failure, we leave the initial state func populated for + // the next retry. + return + } + // On success, we set it to nil and allow the logic to continue. + r.setInitialState = nil + } + + if len(r.patchesToRetry) == 0 { + // Nothing further to do here. + return + } + + patches := make([]*client.Patch, len(r.patchesToRetry)) + i := 0 + for _, patch := range r.patchesToRetry { + patches[i] = patch + i++ + } + + if err := c.PatchPod(r.namespace, r.podName, patches...); err != nil { + if r.logger.IsWarn() { + r.logger.Warn(fmt.Sprintf("unable to update state for due to %s, will retry", err.Error())) + } + return + } + r.patchesToRetry = make(map[string]*client.Patch) +} diff --git a/serviceregistration/kubernetes/retry_handler_test.go b/serviceregistration/kubernetes/retry_handler_test.go new file mode 100644 index 000000000000..441cbaf14935 --- /dev/null +++ b/serviceregistration/kubernetes/retry_handler_test.go @@ -0,0 +1,450 @@ +package kubernetes + +import ( + "os" + "sync" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + sr "github.com/hashicorp/vault/serviceregistration" + "github.com/hashicorp/vault/serviceregistration/kubernetes/client" + kubetest "github.com/hashicorp/vault/serviceregistration/kubernetes/testing" +) + +func TestRetryHandlerSimple(t *testing.T) { + if testing.Short() { + t.Skip("skipping because this test takes 10-15 seconds") + } + + testState, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + + client.Scheme = testConf.ClientScheme + client.TokenFile = testConf.PathToTokenFile + client.RootCAFile = testConf.PathToRootCAFile + if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + logger := hclog.NewNullLogger() + shutdownCh := make(chan struct{}) + wait := &sync.WaitGroup{} + testPatch := &client.Patch{ + Operation: client.Add, + Path: "patch-path", + Value: "true", + } + + c, err := client.New(logger, shutdownCh) + if err != nil { + t.Fatal(err) + } + + r := &retryHandler{ + logger: logger, + namespace: kubetest.ExpectedNamespace, + podName: kubetest.ExpectedPodName, + patchesToRetry: make(map[string]*client.Patch), + } + r.Run(shutdownCh, wait, c) + + if testState.NumPatches() != 0 { + t.Fatal("expected no current patches") + } + r.Notify(c, testPatch) + // Wait ample until the next try should have occurred. 
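+	// (retryFreq is 5 seconds, so waiting 2*retryFreq guarantees the retry
+	// ticker has fired at least once.)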
+ <-time.NewTimer(retryFreq * 2).C + if testState.NumPatches() != 1 { + t.Fatal("expected 1 patch") + } +} + +func TestRetryHandlerAdd(t *testing.T) { + _, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + + client.Scheme = testConf.ClientScheme + client.TokenFile = testConf.PathToTokenFile + client.RootCAFile = testConf.PathToRootCAFile + if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + logger := hclog.NewNullLogger() + shutdownCh := make(chan struct{}) + c, err := client.New(logger, shutdownCh) + if err != nil { + t.Fatal(err) + } + + r := &retryHandler{ + logger: hclog.NewNullLogger(), + namespace: "some-namespace", + podName: "some-pod-name", + patchesToRetry: make(map[string]*client.Patch), + } + + testPatch1 := &client.Patch{ + Operation: client.Add, + Path: "one", + Value: "true", + } + testPatch2 := &client.Patch{ + Operation: client.Add, + Path: "two", + Value: "true", + } + testPatch3 := &client.Patch{ + Operation: client.Add, + Path: "three", + Value: "true", + } + testPatch4 := &client.Patch{ + Operation: client.Add, + Path: "four", + Value: "true", + } + + // Should be able to add all 4 patches. + r.Notify(c, testPatch1) + if len(r.patchesToRetry) != 1 { + t.Fatal("expected 1 patch") + } + + r.Notify(c, testPatch2) + if len(r.patchesToRetry) != 2 { + t.Fatal("expected 2 patches") + } + + r.Notify(c, testPatch3) + if len(r.patchesToRetry) != 3 { + t.Fatal("expected 3 patches") + } + + r.Notify(c, testPatch4) + if len(r.patchesToRetry) != 4 { + t.Fatal("expected 4 patches") + } + + // Adding a dupe should result in no change. + r.Notify(c, testPatch4) + if len(r.patchesToRetry) != 4 { + t.Fatal("expected 4 patches") + } + + // Adding a reversion should result in its twin being subtracted. + r.Notify(c, &client.Patch{ + Operation: client.Add, + Path: "four", + Value: "false", + }) + if len(r.patchesToRetry) != 4 { + t.Fatal("expected 4 patches") + } + + r.Notify(c, &client.Patch{ + Operation: client.Add, + Path: "three", + Value: "false", + }) + if len(r.patchesToRetry) != 4 { + t.Fatal("expected 4 patches") + } + + r.Notify(c, &client.Patch{ + Operation: client.Add, + Path: "two", + Value: "false", + }) + if len(r.patchesToRetry) != 4 { + t.Fatal("expected 4 patches") + } + + r.Notify(c, &client.Patch{ + Operation: client.Add, + Path: "one", + Value: "false", + }) + if len(r.patchesToRetry) != 4 { + t.Fatal("expected 4 patches") + } +} + +// This is meant to be run with the -race flag on. 
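+// For example, assuming the standard repo layout, something like:
+//   go test -race -run TestRetryHandlerRacesAndDeadlocks ./serviceregistration/kubernetes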
+func TestRetryHandlerRacesAndDeadlocks(t *testing.T) { + _, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + + client.Scheme = testConf.ClientScheme + client.TokenFile = testConf.PathToTokenFile + client.RootCAFile = testConf.PathToRootCAFile + if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + logger := hclog.NewNullLogger() + shutdownCh := make(chan struct{}) + wait := &sync.WaitGroup{} + testPatch := &client.Patch{ + Operation: client.Add, + Path: "patch-path", + Value: "true", + } + + c, err := client.New(logger, shutdownCh) + if err != nil { + t.Fatal(err) + } + + r := &retryHandler{ + logger: logger, + namespace: kubetest.ExpectedNamespace, + podName: kubetest.ExpectedPodName, + patchesToRetry: make(map[string]*client.Patch), + } + + // Now hit it as quickly as possible to see if we can produce + // races or deadlocks. + start := make(chan struct{}) + done := make(chan bool) + numRoutines := 100 + for i := 0; i < numRoutines; i++ { + go func() { + <-start + r.Notify(c, testPatch) + done <- true + }() + go func() { + <-start + r.Run(shutdownCh, wait, c) + done <- true + }() + go func() { + <-start + r.SetInitialState(func() error { + c.GetPod(kubetest.ExpectedNamespace, kubetest.ExpectedPodName) + return nil + }) + done <- true + }() + } + close(start) + + // Allow up to 5 seconds for everything to finish. + timer := time.NewTimer(5 * time.Second) + for i := 0; i < numRoutines*3; i++ { + select { + case <-timer.C: + t.Fatal("test took too long to complete, check for deadlock") + case <-done: + } + } +} + +// In this test, the API server sends bad responses for 5 seconds, +// then sends good responses, and we make sure we get the expected behavior. +func TestRetryHandlerAPIConnectivityProblemsInitialState(t *testing.T) { + if testing.Short() { + t.Skip() + } + + testState, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + kubetest.ReturnGatewayTimeouts.Store(true) + + client.Scheme = testConf.ClientScheme + client.TokenFile = testConf.PathToTokenFile + client.RootCAFile = testConf.PathToRootCAFile + client.RetryMax = 0 + if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + shutdownCh := make(chan struct{}) + wait := &sync.WaitGroup{} + reg, err := NewServiceRegistration(map[string]string{ + "namespace": kubetest.ExpectedNamespace, + "pod_name": kubetest.ExpectedPodName, + }, hclog.NewNullLogger(), sr.State{ + VaultVersion: "vault-version", + IsInitialized: true, + IsSealed: true, + IsActive: true, + IsPerformanceStandby: true, + }, "") + if err != nil { + t.Fatal(err) + } + if err := reg.Run(shutdownCh, wait); err != nil { + t.Fatal(err) + } + + // At this point, since the initial state can't be set, + // remotely we should have false for all these labels. 
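+	// ("false" here means the labels have not been written at all: none of the
+	// patches should have reached the test server yet, so each Get below is
+	// expected to return nil.)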
+ patch := testState.Get(pathToLabels + labelVaultVersion) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelActive) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelSealed) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelPerfStandby) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelInitialized) + if patch != nil { + t.Fatal("expected no value") + } + + kubetest.ReturnGatewayTimeouts.Store(false) + + // Now we need to wait to give the retry handler + // a chance to update these values. + time.Sleep(retryFreq + time.Second) + val := testState.Get(pathToLabels + labelVaultVersion)["value"] + if val != "vault-version" { + t.Fatal("expected vault-version") + } + val = testState.Get(pathToLabels + labelActive)["value"] + if val != "true" { + t.Fatal("expected true") + } + val = testState.Get(pathToLabels + labelSealed)["value"] + if val != "true" { + t.Fatal("expected true") + } + val = testState.Get(pathToLabels + labelPerfStandby)["value"] + if val != "true" { + t.Fatal("expected true") + } + val = testState.Get(pathToLabels + labelInitialized)["value"] + if val != "true" { + t.Fatal("expected true") + } +} + +// In this test, the API server sends bad responses for 5 seconds, +// then sends good responses, and we make sure we get the expected behavior. +func TestRetryHandlerAPIConnectivityProblemsNotifications(t *testing.T) { + if testing.Short() { + t.Skip() + } + + testState, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + kubetest.ReturnGatewayTimeouts.Store(true) + + client.Scheme = testConf.ClientScheme + client.TokenFile = testConf.PathToTokenFile + client.RootCAFile = testConf.PathToRootCAFile + client.RetryMax = 0 + if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + shutdownCh := make(chan struct{}) + wait := &sync.WaitGroup{} + reg, err := NewServiceRegistration(map[string]string{ + "namespace": kubetest.ExpectedNamespace, + "pod_name": kubetest.ExpectedPodName, + }, hclog.NewNullLogger(), sr.State{ + VaultVersion: "vault-version", + IsInitialized: false, + IsSealed: false, + IsActive: false, + IsPerformanceStandby: false, + }, "") + if err != nil { + t.Fatal(err) + } + if err := reg.Run(shutdownCh, wait); err != nil { + t.Fatal(err) + } + + if err := reg.NotifyActiveStateChange(true); err != nil { + t.Fatal(err) + } + if err := reg.NotifyInitializedStateChange(true); err != nil { + t.Fatal(err) + } + if err := reg.NotifyPerformanceStandbyStateChange(true); err != nil { + t.Fatal(err) + } + if err := reg.NotifySealedStateChange(true); err != nil { + t.Fatal(err) + } + + // At this point, since the initial state can't be set, + // remotely we should have false for all these labels. 
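+	// (As above, nothing should have been written yet: both the initial state
+	// and the notifications above are queued in the retry handler, so each Get
+	// below is expected to return nil.)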
+ patch := testState.Get(pathToLabels + labelVaultVersion) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelActive) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelSealed) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelPerfStandby) + if patch != nil { + t.Fatal("expected no value") + } + patch = testState.Get(pathToLabels + labelInitialized) + if patch != nil { + t.Fatal("expected no value") + } + + kubetest.ReturnGatewayTimeouts.Store(false) + + // Now we need to wait to give the retry handler + // a chance to update these values. + time.Sleep(retryFreq + time.Second) + + // They should be "true" if the Notifications were set after the + // initial state. + val := testState.Get(pathToLabels + labelVaultVersion)["value"] + if val != "vault-version" { + t.Fatal("expected vault-version") + } + val = testState.Get(pathToLabels + labelActive)["value"] + if val != "true" { + t.Fatal("expected true") + } + val = testState.Get(pathToLabels + labelSealed)["value"] + if val != "true" { + t.Fatal("expected true") + } + val = testState.Get(pathToLabels + labelPerfStandby)["value"] + if val != "true" { + t.Fatal("expected true") + } + val = testState.Get(pathToLabels + labelInitialized)["value"] + if val != "true" { + t.Fatal("expected true") + } +} diff --git a/serviceregistration/kubernetes/service_registration.go b/serviceregistration/kubernetes/service_registration.go new file mode 100644 index 000000000000..0fbdd659897f --- /dev/null +++ b/serviceregistration/kubernetes/service_registration.go @@ -0,0 +1,185 @@ +package kubernetes + +import ( + "fmt" + "os" + "strconv" + "sync" + + "github.com/hashicorp/go-hclog" + sr "github.com/hashicorp/vault/serviceregistration" + "github.com/hashicorp/vault/serviceregistration/kubernetes/client" +) + +const ( + // Labels are placed in a pod's metadata. + labelVaultVersion = "vault-version" + labelActive = "vault-active" + labelSealed = "vault-sealed" + labelPerfStandby = "vault-perf-standby" + labelInitialized = "vault-initialized" + + // This is the path to where these labels are applied. + pathToLabels = "/metadata/labels/" +) + +func NewServiceRegistration(config map[string]string, logger hclog.Logger, state sr.State, _ string) (sr.ServiceRegistration, error) { + namespace, err := getRequiredField(logger, config, client.EnvVarKubernetesNamespace, "namespace") + if err != nil { + return nil, err + } + podName, err := getRequiredField(logger, config, client.EnvVarKubernetesPodName, "pod_name") + if err != nil { + return nil, err + } + return &serviceRegistration{ + logger: logger, + namespace: namespace, + podName: podName, + initialState: state, + retryHandler: &retryHandler{ + logger: logger, + namespace: namespace, + podName: podName, + patchesToRetry: make(map[string]*client.Patch), + }, + }, nil +} + +type serviceRegistration struct { + logger hclog.Logger + namespace, podName string + client *client.Client + initialState sr.State + retryHandler *retryHandler +} + +func (r *serviceRegistration) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup) error { + c, err := client.New(r.logger, shutdownCh) + if err != nil { + return err + } + r.client = c + + // Now that we've populated the client, we can begin using the retry handler. 
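+	// SetInitialState attempts to write the initial labels once and, if that
+	// first attempt fails, stores the function for the retry loop; Run then
+	// starts the background ticker that retries both the initial state and any
+	// queued patches.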
+ r.retryHandler.SetInitialState(r.setInitialState) + r.retryHandler.Run(shutdownCh, wait, c) + return nil +} + +func (r *serviceRegistration) setInitialState() error { + // Verify that the pod exists and our configuration looks good. + pod, err := r.client.GetPod(r.namespace, r.podName) + if err != nil { + return err + } + + // Now to initially label our pod. + if pod.Metadata == nil { + // This should never happen IRL, just being defensive. + return fmt.Errorf("no pod metadata on %+v", pod) + } + if pod.Metadata.Labels == nil { + // Notify the labels field, and the labels as part of that one call. + // The reason we must take a different approach to adding them is discussed here: + // https://stackoverflow.com/questions/57480205/error-while-applying-json-patch-to-kubernetes-custom-resource + if err := r.client.PatchPod(r.namespace, r.podName, &client.Patch{ + Operation: client.Add, + Path: "/metadata/labels", + Value: map[string]string{ + labelVaultVersion: r.initialState.VaultVersion, + labelActive: strconv.FormatBool(r.initialState.IsActive), + labelSealed: strconv.FormatBool(r.initialState.IsSealed), + labelPerfStandby: strconv.FormatBool(r.initialState.IsPerformanceStandby), + labelInitialized: strconv.FormatBool(r.initialState.IsInitialized), + }, + }); err != nil { + return err + } + } else { + // Create the labels through a patch to each individual field. + patches := []*client.Patch{ + { + Operation: client.Replace, + Path: pathToLabels + labelVaultVersion, + Value: r.initialState.VaultVersion, + }, + { + Operation: client.Replace, + Path: pathToLabels + labelActive, + Value: strconv.FormatBool(r.initialState.IsActive), + }, + { + Operation: client.Replace, + Path: pathToLabels + labelSealed, + Value: strconv.FormatBool(r.initialState.IsSealed), + }, + { + Operation: client.Replace, + Path: pathToLabels + labelPerfStandby, + Value: strconv.FormatBool(r.initialState.IsPerformanceStandby), + }, + { + Operation: client.Replace, + Path: pathToLabels + labelInitialized, + Value: strconv.FormatBool(r.initialState.IsInitialized), + }, + } + if err := r.client.PatchPod(r.namespace, r.podName, patches...); err != nil { + return err + } + } + return nil +} + +func (r *serviceRegistration) NotifyActiveStateChange(isActive bool) error { + r.retryHandler.Notify(r.client, &client.Patch{ + Operation: client.Replace, + Path: pathToLabels + labelActive, + Value: strconv.FormatBool(isActive), + }) + return nil +} + +func (r *serviceRegistration) NotifySealedStateChange(isSealed bool) error { + r.retryHandler.Notify(r.client, &client.Patch{ + Operation: client.Replace, + Path: pathToLabels + labelSealed, + Value: strconv.FormatBool(isSealed), + }) + return nil +} + +func (r *serviceRegistration) NotifyPerformanceStandbyStateChange(isStandby bool) error { + r.retryHandler.Notify(r.client, &client.Patch{ + Operation: client.Replace, + Path: pathToLabels + labelPerfStandby, + Value: strconv.FormatBool(isStandby), + }) + return nil +} + +func (r *serviceRegistration) NotifyInitializedStateChange(isInitialized bool) error { + r.retryHandler.Notify(r.client, &client.Patch{ + Operation: client.Replace, + Path: pathToLabels + labelInitialized, + Value: strconv.FormatBool(isInitialized), + }) + return nil +} + +func getRequiredField(logger hclog.Logger, config map[string]string, envVar, configParam string) (string, error) { + value := "" + switch { + case os.Getenv(envVar) != "": + value = os.Getenv(envVar) + case config[configParam] != "": + value = config[configParam] + default: + return "", 
fmt.Errorf(`%s must be provided via %q or the %q config parameter`, configParam, envVar, configParam) + } + if logger.IsDebug() { + logger.Debug(fmt.Sprintf("%q: %q", configParam, value)) + } + return value, nil +} diff --git a/serviceregistration/kubernetes/service_registration_test.go b/serviceregistration/kubernetes/service_registration_test.go new file mode 100644 index 000000000000..da6d35b0fd41 --- /dev/null +++ b/serviceregistration/kubernetes/service_registration_test.go @@ -0,0 +1,130 @@ +package kubernetes + +import ( + "os" + "strconv" + "sync" + "testing" + + "github.com/hashicorp/go-hclog" + sr "github.com/hashicorp/vault/serviceregistration" + "github.com/hashicorp/vault/serviceregistration/kubernetes/client" + kubetest "github.com/hashicorp/vault/serviceregistration/kubernetes/testing" +) + +var testVersion = "version 1" + +func TestServiceRegistration(t *testing.T) { + testState, testConf, closeFunc := kubetest.Server(t) + defer closeFunc() + + client.Scheme = testConf.ClientScheme + client.TokenFile = testConf.PathToTokenFile + client.RootCAFile = testConf.PathToRootCAFile + if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { + t.Fatal(err) + } + if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { + t.Fatal(err) + } + + if testState.NumPatches() != 0 { + t.Fatalf("expected 0 patches but have %d: %+v", testState.NumPatches(), testState) + } + shutdownCh := make(chan struct{}) + config := map[string]string{ + "namespace": kubetest.ExpectedNamespace, + "pod_name": kubetest.ExpectedPodName, + } + logger := hclog.NewNullLogger() + state := sr.State{ + VaultVersion: testVersion, + IsInitialized: true, + IsSealed: true, + IsActive: true, + IsPerformanceStandby: true, + } + reg, err := NewServiceRegistration(config, logger, state, "") + if err != nil { + t.Fatal(err) + } + if err := reg.Run(shutdownCh, &sync.WaitGroup{}); err != nil { + t.Fatal(err) + } + + // Test initial state. + if testState.NumPatches() != 5 { + t.Fatalf("expected 5 current labels but have %d: %+v", testState.NumPatches(), testState) + } + if testState.Get(pathToLabels + labelVaultVersion)["value"] != testVersion { + t.Fatalf("expected %q but received %q", testVersion, testState.Get(pathToLabels + labelVaultVersion)["value"]) + } + if testState.Get(pathToLabels + labelActive)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelActive)["value"]) + } + if testState.Get(pathToLabels + labelSealed)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelSealed)["value"]) + } + if testState.Get(pathToLabels + labelPerfStandby)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelPerfStandby)["value"]) + } + if testState.Get(pathToLabels + labelInitialized)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelInitialized)["value"]) + } + + // Test NotifyActiveStateChange. 
+ if err := reg.NotifyActiveStateChange(false); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelActive)["value"] != strconv.FormatBool(false) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(false), testState.Get(pathToLabels + labelActive)["value"]) + } + if err := reg.NotifyActiveStateChange(true); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelActive)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelActive)["value"]) + } + + // Test NotifySealedStateChange. + if err := reg.NotifySealedStateChange(false); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelSealed)["value"] != strconv.FormatBool(false) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(false), testState.Get(pathToLabels + labelSealed)["value"]) + } + if err := reg.NotifySealedStateChange(true); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelSealed)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelSealed)["value"]) + } + + // Test NotifyPerformanceStandbyStateChange. + if err := reg.NotifyPerformanceStandbyStateChange(false); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelPerfStandby)["value"] != strconv.FormatBool(false) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(false), testState.Get(pathToLabels + labelPerfStandby)["value"]) + } + if err := reg.NotifyPerformanceStandbyStateChange(true); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelPerfStandby)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelPerfStandby)["value"]) + } + + // Test NotifyInitializedStateChange. 
+ if err := reg.NotifyInitializedStateChange(false); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelInitialized)["value"] != strconv.FormatBool(false) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(false), testState.Get(pathToLabels + labelInitialized)["value"]) + } + if err := reg.NotifyInitializedStateChange(true); err != nil { + t.Fatal(err) + } + if testState.Get(pathToLabels + labelInitialized)["value"] != strconv.FormatBool(true) { + t.Fatalf("expected %q but received %q", strconv.FormatBool(true), testState.Get(pathToLabels + labelInitialized)["value"]) + } +} diff --git a/serviceregistration/kubernetes/testing/README.md b/serviceregistration/kubernetes/testing/README.md new file mode 100644 index 000000000000..940415b6b436 --- /dev/null +++ b/serviceregistration/kubernetes/testing/README.md @@ -0,0 +1,54 @@ +# How to Test Manually + +- `$ minikube start` +- In the Vault folder, `$ make dev XC_ARCH=amd64 XC_OS=linux XC_OSARCH=linux/amd64` +- Create a file called `vault-test.yaml` with the following contents: + +``` +apiVersion: v1 +kind: Pod +metadata: + name: vault +spec: + containers: + - name: nginx + image: nginx + command: [ "sh", "-c"] + args: + - while true; do + echo -en '\n'; + printenv VAULT_K8S_POD_NAME VAULT_K8S_NAMESPACE; + sleep 10; + done; + env: + - name: VAULT_K8S_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: VAULT_K8S_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + restartPolicy: Never +``` + +- Create the pod: `$ kubectl apply -f vault-test.yaml` +- View the full initial state of the pod: `$ kubectl get pod vault -o=yaml > initialstate.txt` +- Drop the Vault binary into the pod: `$ kubectl cp bin/vault /vault:/` +- Drop to the shell within the pod: `$ kubectl exec -it vault -- /bin/bash` +- Install a text editor: `$ apt-get update`, `$ apt-get install nano` +- Write a test Vault config to `vault.config` like: + +``` +storage "inmem" {} +service_registration "kubernetes" {} +disable_mlock = true +ui = true +api_addr = "http://127.0.0.1:8200" +log_level = "debug" +``` + +- Run Vault: `$ ./vault server -config=vault.config -dev -dev-root-token-id=root` +- If 403's are received, you may need to grant RBAC, example here: https://github.com/fabric8io/fabric8/issues/6840#issuecomment-307560275 +- In a separate window outside the pod, view the resulting state of the pod: `$ kubectl get pod vault -o=yaml > currentstate.txt` +- View the differences: `$ diff initialstate.txt currentstate.txt` diff --git a/serviceregistration/kubernetes/testing/ca.crt b/serviceregistration/kubernetes/testing/ca.crt new file mode 100644 index 000000000000..077f48b86308 --- /dev/null +++ b/serviceregistration/kubernetes/testing/ca.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p +a3ViZUNBMB4XDTE5MTIxMDIzMDUxOVoXDTI5MTIwODIzMDUxOVowFTETMBEGA1UE +AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANFi +/RIdMHd865X6JygTb9riX01DA3QnR+RoXDXNnj8D3LziLG2n8ItXMJvWbU3sxxyy +nX9HxJ0SIeexj1cYzdQBtJDjO1/PeuKc4CZ7zCukCAtHz8mC7BDPOU7F7pggpcQ0 +/t/pa2m22hmCu8aDF9WlUYHtJpYATnI/A5vz/VFLR9daxmkl59Qo3oHITj7vAzSx +/75r9cibpQyJ+FhiHOZHQWYY2JYw2g4v5hm5hg5SFM9yFcZ75ISI9ebyFFIl9iBY +zAk9jqv1mXvLr0Q39AVwMTamvGuap1oocjM9NIhQvaFL/DNqF1ouDQjCf5u2imLc +TraO1/2KO8fqwOZCOrMCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW +MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4IBAQBtVZCwCPqUUUpIClAlE9nc2fo2bTs9gsjXRmqdQ5oaSomSLE93 
+aJWYFuAhxPXtlApbLYZfW2m1sM3mTVQN60y0uE4e1jdSN1ErYQ9slJdYDAMaEmOh +iSexj+Nd1scUiMHV9lf3ps5J8sYeCpwZX3sPmw7lqZojTS12pANBDcigsaj5RRyN +9GyP3WkSQUsTpWlDb9Fd+KNdkCVw7nClIpBPA2KW4BQKw/rNSvOFD61mbzc89lo0 +Q9IFGQFFF8jO18lbyWqnRBGXcS4/G7jQ3S7C121d14YLUeAYOM7pJykI1g4CLx9y +vitin0L6nprauWkKO38XgM4T75qKZpqtiOcT +-----END CERTIFICATE----- diff --git a/serviceregistration/kubernetes/testing/resp-get-pod.json b/serviceregistration/kubernetes/testing/resp-get-pod.json new file mode 100644 index 000000000000..229eb1fbd3b2 --- /dev/null +++ b/serviceregistration/kubernetes/testing/resp-get-pod.json @@ -0,0 +1,120 @@ +{ + "kind": "Pod", + "apiVersion": "v1", + "metadata": { + "name": "shell-demo", + "labels": {"fizz": "buzz"}, + "namespace": "default", + "selfLink": "/api/v1/namespaces/default/pods/shell-demo", + "uid": "7ecb93ff-aa64-426d-b330-2c0b2c0957a2", + "resourceVersion": "87798", + "creationTimestamp": "2020-01-10T19:22:40Z", + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"v1\",\"kind\":\"Pod\",\"metadata\":{\"annotations\":{},\"name\":\"shell-demo\",\"namespace\":\"default\"},\"spec\":{\"containers\":[{\"image\":\"nginx\",\"name\":\"nginx\",\"volumeMounts\":[{\"mountPath\":\"/usr/share/nginx/html\",\"name\":\"shared-data\"}]}],\"dnsPolicy\":\"Default\",\"hostNetwork\":true,\"volumes\":[{\"emptyDir\":{},\"name\":\"shared-data\"}]}}\n" + } + }, + "spec": { + "volumes": [{ + "name": "shared-data", + "emptyDir": {} + }, { + "name": "default-token-5fjt9", + "secret": { + "secretName": "default-token-5fjt9", + "defaultMode": 420 + } + }], + "containers": [{ + "name": "nginx", + "image": "nginx", + "resources": {}, + "volumeMounts": [{ + "name": "shared-data", + "mountPath": "/usr/share/nginx/html" + }, { + "name": "default-token-5fjt9", + "readOnly": true, + "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount" + }], + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "imagePullPolicy": "Always" + }], + "restartPolicy": "Always", + "terminationGracePeriodSeconds": 30, + "dnsPolicy": "Default", + "serviceAccountName": "default", + "serviceAccount": "default", + "nodeName": "minikube", + "hostNetwork": true, + "securityContext": {}, + "schedulerName": "default-scheduler", + "tolerations": [{ + "key": "node.kubernetes.io/not-ready", + "operator": "Exists", + "effect": "NoExecute", + "tolerationSeconds": 300 + }, { + "key": "node.kubernetes.io/unreachable", + "operator": "Exists", + "effect": "NoExecute", + "tolerationSeconds": 300 + }], + "priority": 0, + "enableServiceLinks": true + }, + "status": { + "phase": "Running", + "conditions": [{ + "type": "Initialized", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T19:22:40Z" + }, { + "type": "Ready", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T20:20:55Z" + }, { + "type": "ContainersReady", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T20:20:55Z" + }, { + "type": "PodScheduled", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T19:22:40Z" + }], + "hostIP": "192.168.99.100", + "podIP": "192.168.99.100", + "podIPs": [{ + "ip": "192.168.99.100" + }], + "startTime": "2020-01-10T19:22:40Z", + "containerStatuses": [{ + "name": "nginx", + "state": { + "running": { + "startedAt": "2020-01-10T20:20:55Z" + } + }, + "lastState": { + "terminated": { + "exitCode": 0, + "reason": "Completed", + "startedAt": "2020-01-10T19:22:53Z", + "finishedAt": 
"2020-01-10T20:12:03Z", + "containerID": "docker://ed8bc068cd313ea5adb72780e8015ab09ecb61ea077e39304b4a3fe581f471c4" + } + }, + "ready": true, + "restartCount": 1, + "image": "nginx:latest", + "imageID": "docker-pullable://nginx@sha256:8aa7f6a9585d908a63e5e418dc5d14ae7467d2e36e1ab4f0d8f9d059a3d071ce", + "containerID": "docker://a8ee34466791bc6f082f271f40cdfc43625cea81831b1029b1e90b4f6949f6df", + "started": true + }], + "qosClass": "BestEffort" + } +} diff --git a/serviceregistration/kubernetes/testing/resp-not-found.json b/serviceregistration/kubernetes/testing/resp-not-found.json new file mode 100644 index 000000000000..800a9624f04f --- /dev/null +++ b/serviceregistration/kubernetes/testing/resp-not-found.json @@ -0,0 +1,13 @@ +{ + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "message": "pods \"shell-dem\" not found", + "reason": "NotFound", + "details": { + "name": "shell-dem", + "kind": "pods" + }, + "code": 404 +} diff --git a/serviceregistration/kubernetes/testing/resp-update-pod.json b/serviceregistration/kubernetes/testing/resp-update-pod.json new file mode 100644 index 000000000000..e808691f92f5 --- /dev/null +++ b/serviceregistration/kubernetes/testing/resp-update-pod.json @@ -0,0 +1,123 @@ +{ + "kind": "Pod", + "apiVersion": "v1", + "metadata": { + "name": "shell-demo", + "namespace": "default", + "selfLink": "/api/v1/namespaces/default/pods/shell-demo", + "uid": "7ecb93ff-aa64-426d-b330-2c0b2c0957a2", + "resourceVersion": "96433", + "creationTimestamp": "2020-01-10T19:22:40Z", + "labels": { + "fizz": "buzz", + "foo": "bar" + }, + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"v1\",\"kind\":\"Pod\",\"metadata\":{\"annotations\":{},\"name\":\"shell-demo\",\"namespace\":\"default\"},\"spec\":{\"containers\":[{\"image\":\"nginx\",\"name\":\"nginx\",\"volumeMounts\":[{\"mountPath\":\"/usr/share/nginx/html\",\"name\":\"shared-data\"}]}],\"dnsPolicy\":\"Default\",\"hostNetwork\":true,\"volumes\":[{\"emptyDir\":{},\"name\":\"shared-data\"}]}}\n" + } + }, + "spec": { + "volumes": [{ + "name": "shared-data", + "emptyDir": {} + }, { + "name": "default-token-5fjt9", + "secret": { + "secretName": "default-token-5fjt9", + "defaultMode": 420 + } + }], + "containers": [{ + "name": "nginx", + "image": "nginx", + "resources": {}, + "volumeMounts": [{ + "name": "shared-data", + "mountPath": "/usr/share/nginx/html" + }, { + "name": "default-token-5fjt9", + "readOnly": true, + "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount" + }], + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "imagePullPolicy": "Always" + }], + "restartPolicy": "Always", + "terminationGracePeriodSeconds": 30, + "dnsPolicy": "Default", + "serviceAccountName": "default", + "serviceAccount": "default", + "nodeName": "minikube", + "hostNetwork": true, + "securityContext": {}, + "schedulerName": "default-scheduler", + "tolerations": [{ + "key": "node.kubernetes.io/not-ready", + "operator": "Exists", + "effect": "NoExecute", + "tolerationSeconds": 300 + }, { + "key": "node.kubernetes.io/unreachable", + "operator": "Exists", + "effect": "NoExecute", + "tolerationSeconds": 300 + }], + "priority": 0, + "enableServiceLinks": true + }, + "status": { + "phase": "Running", + "conditions": [{ + "type": "Initialized", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T19:22:40Z" + }, { + "type": "Ready", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": 
"2020-01-10T20:20:55Z" + }, { + "type": "ContainersReady", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T20:20:55Z" + }, { + "type": "PodScheduled", + "status": "True", + "lastProbeTime": null, + "lastTransitionTime": "2020-01-10T19:22:40Z" + }], + "hostIP": "192.168.99.100", + "podIP": "192.168.99.100", + "podIPs": [{ + "ip": "192.168.99.100" + }], + "startTime": "2020-01-10T19:22:40Z", + "containerStatuses": [{ + "name": "nginx", + "state": { + "running": { + "startedAt": "2020-01-10T20:20:55Z" + } + }, + "lastState": { + "terminated": { + "exitCode": 0, + "reason": "Completed", + "startedAt": "2020-01-10T19:22:53Z", + "finishedAt": "2020-01-10T20:12:03Z", + "containerID": "docker://ed8bc068cd313ea5adb72780e8015ab09ecb61ea077e39304b4a3fe581f471c4" + } + }, + "ready": true, + "restartCount": 1, + "image": "nginx:latest", + "imageID": "docker-pullable://nginx@sha256:8aa7f6a9585d908a63e5e418dc5d14ae7467d2e36e1ab4f0d8f9d059a3d071ce", + "containerID": "docker://a8ee34466791bc6f082f271f40cdfc43625cea81831b1029b1e90b4f6949f6df", + "started": true + }], + "qosClass": "BestEffort" + } +} diff --git a/serviceregistration/kubernetes/testing/testserver.go b/serviceregistration/kubernetes/testing/testserver.go new file mode 100644 index 000000000000..0955afed86da --- /dev/null +++ b/serviceregistration/kubernetes/testing/testserver.go @@ -0,0 +1,243 @@ +package testing + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path" + "strings" + "sync" + "testing" + + "go.uber.org/atomic" +) + +const ( + ExpectedNamespace = "default" + ExpectedPodName = "shell-demo" + + // File names of samples pulled from real life. + caCrtFile = "ca.crt" + respGetPod = "resp-get-pod.json" + respNotFound = "resp-not-found.json" + respUpdatePod = "resp-update-pod.json" + tokenFile = "token" +) + +var ( + // ReturnGatewayTimeouts toggles whether the test server should return, + // well, gateway timeouts... + ReturnGatewayTimeouts = atomic.NewBool(false) + + pathToFiles = func() string { + wd, _ := os.Getwd() + pathParts := strings.Split(wd, "vault") + return pathParts[0] + "vault/serviceregistration/kubernetes/testing/" + }() +) + +// Conf returns the info needed to configure the client to point at +// the test server. This must be done by the caller to avoid an import +// cycle between the client and the testserver. Example usage: +// +// client.Scheme = testConf.ClientScheme +// client.TokenFile = testConf.PathToTokenFile +// client.RootCAFile = testConf.PathToRootCAFile +// if err := os.Setenv(client.EnvVarKubernetesServiceHost, testConf.ServiceHost); err != nil { +// t.Fatal(err) +// } +// if err := os.Setenv(client.EnvVarKubernetesServicePort, testConf.ServicePort); err != nil { +// t.Fatal(err) +// } +type Conf struct { + ClientScheme, PathToTokenFile, PathToRootCAFile, ServiceHost, ServicePort string +} + +// Server returns an http test server that can be used to test +// Kubernetes client code. It also retains the current state, +// and a func to close the server and to clean up any temporary +// files. +func Server(t *testing.T) (testState *State, testConf *Conf, closeFunc func()) { + testState = &State{m: &sync.Map{}} + testConf = &Conf{ + ClientScheme: "http://", + } + + // We're going to have multiple close funcs to call. + var closers []func() + closeFunc = func() { + for _, closer := range closers { + closer() + } + } + + // Read in our sample files. 
+ token, err := readFile(tokenFile) + if err != nil { + t.Fatal(err) + } + caCrt, err := readFile(caCrtFile) + if err != nil { + t.Fatal(err) + } + notFoundResponse, err := readFile(respNotFound) + if err != nil { + t.Fatal(err) + } + getPodResponse, err := readFile(respGetPod) + if err != nil { + t.Fatal(err) + } + updatePodTagsResponse, err := readFile(respUpdatePod) + if err != nil { + t.Fatal(err) + } + + // Plant our token in a place where it can be read for the config. + tmpToken, err := ioutil.TempFile("", "token") + if err != nil { + t.Fatal(err) + } + closers = append(closers, func() { + os.Remove(tmpToken.Name()) + }) + if _, err = tmpToken.WriteString(token); err != nil { + closeFunc() + t.Fatal(err) + } + if err := tmpToken.Close(); err != nil { + closeFunc() + t.Fatal(err) + } + testConf.PathToTokenFile = tmpToken.Name() + + tmpCACrt, err := ioutil.TempFile("", "ca.crt") + if err != nil { + closeFunc() + t.Fatal(err) + } + closers = append(closers, func() { + os.Remove(tmpCACrt.Name()) + }) + if _, err = tmpCACrt.WriteString(caCrt); err != nil { + closeFunc() + t.Fatal(err) + } + if err := tmpCACrt.Close(); err != nil { + closeFunc() + t.Fatal(err) + } + testConf.PathToRootCAFile = tmpCACrt.Name() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if ReturnGatewayTimeouts.Load() { + w.WriteHeader(504) + return + } + namespace, podName, err := parsePath(r.URL.Path) + if err != nil { + w.WriteHeader(400) + w.Write([]byte(fmt.Sprintf("unable to parse %s: %s", r.URL.Path, err.Error()))) + return + } + + switch { + case namespace != ExpectedNamespace, podName != ExpectedPodName: + w.WriteHeader(404) + w.Write([]byte(notFoundResponse)) + return + case r.Method == http.MethodGet: + w.WriteHeader(200) + w.Write([]byte(getPodResponse)) + return + case r.Method == http.MethodPatch: + var patches []interface{} + if err := json.NewDecoder(r.Body).Decode(&patches); err != nil { + w.WriteHeader(400) + w.Write([]byte(fmt.Sprintf("unable to decode patches %s: %s", r.URL.Path, err.Error()))) + return + } + for _, patch := range patches { + patchMap := patch.(map[string]interface{}) + p := patchMap["path"].(string) + testState.store(p, patchMap) + } + w.WriteHeader(200) + w.Write([]byte(updatePodTagsResponse)) + return + default: + w.WriteHeader(400) + w.Write([]byte(fmt.Sprintf("unexpected request method: %s", r.Method))) + } + })) + closers = append(closers, ts.Close) + + // ts.URL example: http://127.0.0.1:35681 + urlFields := strings.Split(ts.URL, "://") + if len(urlFields) != 2 { + closeFunc() + t.Fatal("received unexpected test url: " + ts.URL) + } + urlFields = strings.Split(urlFields[1], ":") + if len(urlFields) != 2 { + closeFunc() + t.Fatal("received unexpected test url: " + ts.URL) + } + testConf.ServiceHost = urlFields[0] + testConf.ServicePort = urlFields[1] + return testState, testConf, closeFunc +} + +type State struct { + m *sync.Map +} + +func (s *State) NumPatches() int { + l := 0 + f := func(key, value interface{}) bool { + l++ + return true + } + s.m.Range(f) + return l +} + +func (s *State) Get(key string) map[string]interface{} { + v, ok := s.m.Load(key) + if !ok { + return nil + } + patch, ok := v.(map[string]interface{}) + if !ok { + return nil + } + return patch +} + +func (s *State) store(k string, p map[string]interface{}) { + s.m.Store(k, p) +} + +// The path should be formatted like this: +// fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) +func parsePath(urlPath string) (namespace, podName string, err 
error) { + original := urlPath + podName = path.Base(urlPath) + urlPath = strings.TrimSuffix(urlPath, "/pods/"+podName) + namespace = path.Base(urlPath) + if original != fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) { + return "", "", fmt.Errorf("received unexpected path: %s", original) + } + return namespace, podName, nil +} + +func readFile(fileName string) (string, error) { + b, err := ioutil.ReadFile(pathToFiles + fileName) + if err != nil { + return "", err + } + return string(b), nil +} diff --git a/serviceregistration/kubernetes/testing/token b/serviceregistration/kubernetes/testing/token new file mode 100644 index 000000000000..42d4949f112f --- /dev/null +++ b/serviceregistration/kubernetes/testing/token @@ -0,0 +1 @@ +eyJhbGciOiJSUzI1NiIsImtpZCI6IjZVQU91ckJYcTZKRHQtWHpaOExib2EyUlFZQWZObms2d25mY3ZtVm1NNUUifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6ImRlZmF1bHQtdG9rZW4tNWZqdDkiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGVmYXVsdCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImY0NGUyMDIxLTU2YWItNDEzNC1hMjMxLTBlMDJmNjhmNzJhNiIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmRlZmF1bHQifQ.hgMbuT0hlxG04fDvI_Iyxtbwc8M-i3q3K7CqIGC_jYSjVlyezHN_0BeIB3rE0_M2xvbIs6chsWFZVsK_8Pj6ho7VT0x5PWy5n6KsqTBz8LPpjWpsaxpYQos0RzgA3KLnuzZE8Cl-v-PwWQK57jgbS4AdlXujQXdtLXJNwNAKI0pvCASA6UXP55_X845EsJkyT1J-bURSS3Le3g9A4pDoQ_MUv7hqa-p7yQEtFfYCkq1KKrUJZMRjmS4qda1rg-Em-dw9RFvQtPodRYF0DKT7A7qgmLUfIkuky3NnsQtvaUo8ZVtUiwIEfRdqw1oQIY4CSYz-wUl2xZa7n2QQBROE7w \ No newline at end of file diff --git a/vendor/github.com/hashicorp/vault/sdk/helper/certutil/helpers.go b/vendor/github.com/hashicorp/vault/sdk/helper/certutil/helpers.go index 4a35f88dca5e..3bf6ae502b67 100644 --- a/vendor/github.com/hashicorp/vault/sdk/helper/certutil/helpers.go +++ b/vendor/github.com/hashicorp/vault/sdk/helper/certutil/helpers.go @@ -14,6 +14,8 @@ import ( "encoding/pem" "errors" "fmt" + "io" + "io/ioutil" "math/big" "net" "net/url" @@ -804,3 +806,50 @@ func SignCertificate(data *CreationBundle) (*ParsedCertBundle, error) { return result, nil } + +func NewCertPool(reader io.Reader) (*x509.CertPool, error) { + pemBlock, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + certs, err := parseCertsPEM(pemBlock) + if err != nil { + return nil, fmt.Errorf("error reading certs: %s", err) + } + pool := x509.NewCertPool() + for _, cert := range certs { + pool.AddCert(cert) + } + return pool, nil +} + +// parseCertsPEM returns the x509.Certificates contained in the given PEM-encoded byte array +// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates +func parseCertsPEM(pemCerts []byte) ([]*x509.Certificate, error) { + ok := false + certs := []*x509.Certificate{} + for len(pemCerts) > 0 { + var block *pem.Block + block, pemCerts = pem.Decode(pemCerts) + if block == nil { + break + } + // Only use PEM "CERTIFICATE" blocks without extra headers + if block.Type != "CERTIFICATE" || len(block.Headers) != 0 { + continue + } + + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return certs, err + } + + certs = append(certs, cert) + ok = true + } + + if !ok { + return certs, errors.New("data does not contain any valid RSA or ECDSA certificates") + } + return certs, nil +}
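
The new `certutil.NewCertPool` helper reads a PEM bundle from any `io.Reader` and returns an `*x509.CertPool`, failing if no certificate blocks parse. Below is a minimal sketch of how a caller might use it to trust a CA bundle such as the service account CA that Kubernetes mounts at `/var/run/secrets/kubernetes.io/serviceaccount/ca.crt`; the wiring is illustrative only and is not the client package's actual implementation.

```
package main

import (
	"crypto/tls"
	"log"
	"net/http"
	"os"

	"github.com/hashicorp/vault/sdk/helper/certutil"
)

func main() {
	// Conventional in-cluster service account CA location; adjust for your environment.
	f, err := os.Open("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// NewCertPool parses every PEM "CERTIFICATE" block in the reader and
	// returns a pool containing them, or an error if none parse.
	pool, err := certutil.NewCertPool(f)
	if err != nil {
		log.Fatal(err)
	}

	// Use the pool as the set of root CAs for an HTTPS client.
	client := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: pool},
		},
	}
	_ = client // use the client to call the Kubernetes API, etc.
}
```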
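
The mock API server in `testserver.go` can also be exercised directly with `net/http`, which makes its contract visible without going through the client package: a GET for the expected namespace and pod returns the canned `resp-get-pod.json`, any other pod receives the 404 body, and a JSON Patch array is decoded and recorded in the returned `State` keyed by each patch's `path`. The sketch below shows that interaction; the test name and package are illustrative, and it assumes the working directory lies under the Vault repo so the harness can locate its fixture files.

```
package kubernetes_test

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	kubetest "github.com/hashicorp/vault/serviceregistration/kubernetes/testing"
)

func TestMockServerContract(t *testing.T) {
	testState, testConf, closeFunc := kubetest.Server(t)
	defer closeFunc()

	base := fmt.Sprintf("%s%s:%s", testConf.ClientScheme, testConf.ServiceHost, testConf.ServicePort)
	podURL := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s", base,
		kubetest.ExpectedNamespace, kubetest.ExpectedPodName)

	// GETs for the expected pod return the canned pod JSON with a 200.
	resp, err := http.Get(podURL)
	if err != nil {
		t.Fatal(err)
	}
	resp.Body.Close()
	if resp.StatusCode != 200 {
		t.Fatalf("expected 200, received %d", resp.StatusCode)
	}

	// PATCHes are decoded as a JSON array and recorded in State by "path".
	body := bytes.NewBufferString(`[{"op": "add", "path": "/metadata/labels/fizz", "value": "buzz"}]`)
	req, err := http.NewRequest(http.MethodPatch, podURL, body)
	if err != nil {
		t.Fatal(err)
	}
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	resp.Body.Close()

	if testState.NumPatches() != 1 {
		t.Fatalf("expected 1 recorded patch, received %d", testState.NumPatches())
	}
	if testState.Get("/metadata/labels/fizz")["value"] != "buzz" {
		t.Fatal("patch was not recorded under its path")
	}
}
```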