Add Kubernetes service registration #8249
Merged
Commits (27)
- 2dc736e add kubernetes service registration (tyrannosaurus-becks)
- 4dc0d00 rename env vars (tyrannosaurus-becks)
- e75fe3b increase commentary around env vars (tyrannosaurus-becks)
- c211b87 improve err and comments (tyrannosaurus-becks)
- 708306b strip onShutdown method (tyrannosaurus-becks)
- f38381e use io.Reader in NewCertPool (tyrannosaurus-becks)
- c116c5b minor changes from feedback (tyrannosaurus-becks)
- 38723df Update serviceregistration/kubernetes/client/client.go (tyrannosaurus-becks)
- 27ec6f4 Update serviceregistration/kubernetes/retry_handler.go (tyrannosaurus-becks)
- 84cd8ec leave patchesToRetry nil (tyrannosaurus-becks)
- 74c438e simplify PatchOperation enum to string (tyrannosaurus-becks)
- b00a0c4 use retryable client for exp backoff and jitter (tyrannosaurus-becks)
- 283ba7f add ability to respond to application stops (tyrannosaurus-becks)
- e92f716 replace toString with FormatBool (tyrannosaurus-becks)
- 511de33 update vendored files (tyrannosaurus-becks)
- 03573a8 stop ticker in retryHandler (tyrannosaurus-becks)
- a89bc1f update how test fails (tyrannosaurus-becks)
- 662c272 strip req and resp body from error responses (tyrannosaurus-becks)
- 04bc995 use a map to de-dupe patches to same path (tyrannosaurus-becks)
- 630f180 use simpler context (tyrannosaurus-becks)
- 59b3db7 update certutil test (tyrannosaurus-becks)
- e2a0a7b fix env vars in manual testing doc (tyrannosaurus-becks)
- 255e113 Update serviceregistration/kubernetes/testing/README.md (tyrannosaurus-becks)
- aa1b492 Update serviceregistration/kubernetes/testing/README.md (tyrannosaurus-becks)
- 14ba35f retry initial state if it fails (tyrannosaurus-becks)
- 8b62a65 improve logging in retry handler (tyrannosaurus-becks)
- 5eb0567 protect against nil response (tyrannosaurus-becks)
serviceregistration/kubernetes/client/client.go (new file, +254 lines):

```go
package client

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/hashicorp/go-cleanhttp"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-retryablehttp"
)

var (
	// Retry configuration
	RetryWaitMin = 500 * time.Millisecond
	RetryWaitMax = 30 * time.Second
	RetryMax     = 10

	// Standard errs
	ErrNamespaceUnset = errors.New(`"namespace" is unset`)
	ErrPodNameUnset   = errors.New(`"podName" is unset`)
	ErrNotInCluster   = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
)

// New instantiates a Client. The stopCh is used for exiting retry loops
// when closed.
func New(logger hclog.Logger, stopCh <-chan struct{}) (*Client, error) {
	config, err := inClusterConfig()
	if err != nil {
		return nil, err
	}
	return &Client{
		logger: logger,
		config: config,
		stopCh: stopCh,
	}, nil
}

// Client is a minimal Kubernetes client. We rolled our own because the existing
// Kubernetes client-go library available externally has a high number of dependencies
// and we thought it wasn't worth it for only two API calls. If at some point they break
// the client into smaller modules, or if we add quite a few methods to this client, it may
// be worthwhile to revisit that decision.
type Client struct {
	logger hclog.Logger
	config *Config
	stopCh <-chan struct{}
}

// GetPod gets a pod from the Kubernetes API.
func (c *Client) GetPod(namespace, podName string) (*Pod, error) {
	endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName)
	method := http.MethodGet

	// Validate that we received required parameters.
	if namespace == "" {
		return nil, ErrNamespaceUnset
	}
	if podName == "" {
		return nil, ErrPodNameUnset
	}

	req, err := http.NewRequest(method, c.config.Host+endpoint, nil)
	if err != nil {
		return nil, err
	}
	pod := &Pod{}
	if err := c.do(req, pod); err != nil {
		return nil, err
	}
	return pod, nil
}

// PatchPod updates the pod's tags to the given ones.
// It does so non-destructively, or in other words, without tearing down
// the pod.
func (c *Client) PatchPod(namespace, podName string, patches ...*Patch) error {
	endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName)
	method := http.MethodPatch

	// Validate that we received required parameters.
	if namespace == "" {
		return ErrNamespaceUnset
	}
	if podName == "" {
		return ErrPodNameUnset
	}
	if len(patches) == 0 {
		// No work to perform.
		return nil
	}

	var jsonPatches []map[string]interface{}
	for _, patch := range patches {
		if patch.Operation == Unset {
			return errors.New("patch operation must be set")
		}
		jsonPatches = append(jsonPatches, map[string]interface{}{
			"op":    patch.Operation,
			"path":  patch.Path,
			"value": patch.Value,
		})
	}
	body, err := json.Marshal(jsonPatches)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(method, c.config.Host+endpoint, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json-patch+json")
	return c.do(req, nil)
}

// do executes the given request, retrying if necessary.
func (c *Client) do(req *http.Request, ptrToReturnObj interface{}) error {
	// Finish setting up a valid request.
	retryableReq, err := retryablehttp.FromRequest(req)
	if err != nil {
		return err
	}

	// Build a context that will call the cancelFunc when we receive
	// a stop from our stopChan. This allows us to exit from our retry
	// loop during a shutdown, rather than hanging.
	ctx, cancelFunc := context.WithCancel(context.Background())
	go func(stopCh <-chan struct{}) {
		<-stopCh
		cancelFunc()
	}(c.stopCh)
	retryableReq.WithContext(ctx)

	retryableReq.Header.Set("Authorization", "Bearer "+c.config.BearerToken)
	retryableReq.Header.Set("Accept", "application/json")

	client := &retryablehttp.Client{
		HTTPClient:   cleanhttp.DefaultClient(),
		RetryWaitMin: RetryWaitMin,
		RetryWaitMax: RetryWaitMax,
		RetryMax:     RetryMax,
		CheckRetry:   c.getCheckRetry(req),
		Backoff:      retryablehttp.DefaultBackoff,
	}
	client.HTTPClient.Transport = &http.Transport{
		TLSClientConfig: &tls.Config{
			RootCAs: c.config.CACertPool,
		},
	}

	// Execute and retry the request. This client comes with exponential backoff and
	// jitter already rolled in.
	resp, err := client.Do(retryableReq)
	if err != nil {
		return err
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			if c.logger.IsWarn() {
				// Failing to close response bodies can present as a memory leak so it's
				// important to surface it.
				c.logger.Warn(fmt.Sprintf("unable to close response body: %s", err))
			}
		}
	}()

	// If we're not supposed to read out the body, we have nothing further
	// to do here.
	if ptrToReturnObj == nil {
		return nil
	}

	// Attempt to read out the body into the given return object.
	return json.NewDecoder(resp.Body).Decode(ptrToReturnObj)
}
```
```go
func (c *Client) getCheckRetry(req *http.Request) retryablehttp.CheckRetry {
	return func(ctx context.Context, resp *http.Response, err error) (bool, error) {
		if resp == nil {
			return true, fmt.Errorf("nil response: %s", req.URL.RequestURI())
		}
		switch resp.StatusCode {
		case 200, 201, 202, 204:
			// Success.
			return false, nil
		case 401, 403:
			// Perhaps the token from our bearer token file has been refreshed.
			config, err := inClusterConfig()
			if err != nil {
				return false, err
			}
			if config.BearerToken == c.config.BearerToken {
				// It's the same token.
				return false, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
			}
			c.config = config
			// Continue to try again, but return the error too in case the caller would rather read it out.
			return true, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
		case 404:
			return false, &ErrNotFound{debuggingInfo: sanitizedDebuggingInfo(req, resp.StatusCode)}
		case 500, 502, 503, 504:
			// Could be transient.
			return true, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
		}
		// Unexpected.
		return false, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
	}
}

type Pod struct {
	Metadata *Metadata `json:"metadata,omitempty"`
}

type Metadata struct {
	Name string `json:"name,omitempty"`

	// This map will be nil if no "labels" key was provided.
	// It will be populated but have a length of zero if the
	// key was provided, but no values.
	Labels map[string]string `json:"labels,omitempty"`
}

type PatchOperation string

const (
	Unset PatchOperation = "unset"
	Add                  = "add"
	Replace              = "replace"
)

type Patch struct {
	Operation PatchOperation
	Path      string
	Value     interface{}
}

type ErrNotFound struct {
	debuggingInfo string
}

func (e *ErrNotFound) Error() string {
	return e.debuggingInfo
}

// sanitizedDebuggingInfo provides a returnable string that can be used for debugging. This is intentionally somewhat vague
// because we don't want to leak secrets that may be in a request or response body.
func sanitizedDebuggingInfo(req *http.Request, respStatus int) string {
	return fmt.Sprintf("req method: %s, req url: %s, resp statuscode: %d", req.Method, req.URL, respStatus)
}
```
Is there a technical reason we don't support dev mode? Do we have any other configurations that don't work in dev mode?
@mjarmy or @briankassouf do you happen to know the answer to this?
@jasonodonnell, I don't know the original reason for this, but I do know that Consul service registration also exists today and is presently only available in HA mode. Right here, it builds how it's going to advertise itself based on whether it's a performance standby. If we were going to make service registration available in wider circumstances, I'm not sure whether that could negatively impact Consul from a backwards-compatibility standpoint.
Actually, re-reading this, I see the question is about dev mode, not HA. I'd just been thinking about HA because we'd talked about it previously.
In response to the actual question :-), I'd bet that we originally only allowed service registration in HA mode because some of the service registration notifications only work with HA. However, that made it difficult to do quick little tests that didn't pertain to the HA notifications. So I added a flag to allow us to test service registration if the `-dev` flag is included, just to make my life a little easier. With this exception added here, I can do a quick test without running in clustered mode, or even if I've dropped a single Vault instance into a container. I figured that since it doesn't affect end users, it should be fine.