Skip to content

Commit

Permalink
Introduce k8s API request executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Nizamski committed Jun 5, 2023
1 parent 8d86c42 commit 61d0489
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 12 deletions.
1 change: 1 addition & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

type Cache[K comparable, V any] interface {
Read(k K) V
ReadOptional(k K) (V, bool)
Write(k K, v V) V
Remove(k K)
Size() int
Expand Down
9 changes: 9 additions & 0 deletions internal/cache/in-memory-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func (c *InMemoryCache[K, V]) Read(k K) V {
return v
}

func (c *InMemoryCache[K, V]) ReadOptional(k K) (V, bool) {
c.RLock()
defer c.RUnlock()

v, ok := c.entries[k]

return v, ok
}

func (c *InMemoryCache[K, V]) Write(k K, v V) V {
c.Lock()
defer c.Unlock()
Expand Down
23 changes: 14 additions & 9 deletions internal/executors/executor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ type Context interface {
GetMap(k string) (map[string]string, error)
GetList(k string) ([]string, error)
GetBoolean(k string) (bool, error)
GetInput() cache.MapCache[string, string]
GetStore() cache.MapCache[string, string]
}

type ExecutorContext struct {
input map[string]string
input cache.MapCache[string, string]
store cache.MapCache[string, string]
}

Expand All @@ -34,25 +35,25 @@ var (

func NewExecutorContext(input map[string]string, store map[string]string) ExecutorContext {
return ExecutorContext{
input: input,
input: cache.NewInMemoryCache[string, string]().FromMap(input),
store: cache.NewInMemoryCache[string, string]().FromMap(store),
}
}

func (e *ExecutorContext) GetString(k string) string {
return e.input[k]
return e.input.Read(k)
}

func (e *ExecutorContext) GetRequiredString(k string) (string, error) {
if v, ok := e.input[k]; ok {
if v, ok := e.input.ReadOptional(k); ok {
return v, nil
}

return "", NewRequiredKeyValidationError(k)
}

func (e *ExecutorContext) GetNumber(k string) (uint64, error) {
s, ok := e.input[k]
s, ok := e.input.ReadOptional(k)
if !ok {
return 0, nil
}
Expand All @@ -66,7 +67,7 @@ func (e *ExecutorContext) GetNumber(k string) (uint64, error) {
}

func (e *ExecutorContext) GetRequiredNumber(k string) (uint64, error) {
if _, ok := e.input[k]; ok {
if _, ok := e.input.ReadOptional(k); ok {
return e.GetNumber(k)
}

Expand All @@ -75,7 +76,7 @@ func (e *ExecutorContext) GetRequiredNumber(k string) (uint64, error) {

func (e *ExecutorContext) GetMap(k string) (map[string]string, error) {
m := make(map[string]string)
s, ok := e.input[k]
s, ok := e.input.ReadOptional(k)
if !ok {
return m, nil
}
Expand All @@ -89,7 +90,7 @@ func (e *ExecutorContext) GetMap(k string) (map[string]string, error) {

func (e *ExecutorContext) GetList(k string) ([]string, error) {
l := make([]string, 0)
s, ok := e.input[k]
s, ok := e.input.ReadOptional(k)
if !ok {
return l, nil
}
Expand All @@ -102,7 +103,7 @@ func (e *ExecutorContext) GetList(k string) ([]string, error) {
}

func (e *ExecutorContext) GetBoolean(k string) (bool, error) {
s, ok := e.input[k]
s, ok := e.input.ReadOptional(k)
if !ok {
return false, nil
}
Expand All @@ -115,6 +116,10 @@ func (e *ExecutorContext) GetBoolean(k string) (bool, error) {
return b, nil
}

func (e *ExecutorContext) GetInput() cache.MapCache[string, string] {
return e.input
}

func (e *ExecutorContext) GetStore() cache.MapCache[string, string] {
return e.store
}
3 changes: 2 additions & 1 deletion internal/executors/executor_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ const (
ExecutorType_VOID
ExecutorType_HTTP
ExecutorType_SCRIPT
ExecutorType_KUBERNETES_API_REQUEST
)

var (
executorTypeNames = [...]string{"VOID", "HTTP"}
executorTypeNames = [...]string{"VOID", "HTTP", "KUBERNETES_API_REQUEST"}
)

func (t ExecutorType) String() string {
Expand Down
5 changes: 3 additions & 2 deletions internal/executors/factory/executor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type ExecutorFactory struct {
func newExecutorFactory() ExecutorFactory {
return ExecutorFactory{
generators: map[pb.TaskType]ExecutorGenerator{
pb.TaskType_TASK_TYPE_VOID: voidExecutorGenerator(),
pb.TaskType_TASK_TYPE_HTTP: httpRequestExecutorGenerator(),
pb.TaskType_TASK_TYPE_VOID: voidExecutorGenerator(),
pb.TaskType_TASK_TYPE_HTTP: httpRequestExecutorGenerator(),
pb.TaskType_TASK_TYPE_KUBERNETES_API_REQUEST: kubernetesApiRequestExecutorGenerator(),
},
}
}
Expand Down
7 changes: 7 additions & 0 deletions internal/executors/factory/executor_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package factory
import (
"github.com/SAP/remote-work-processor/internal/executors"
"github.com/SAP/remote-work-processor/internal/executors/http"
"github.com/SAP/remote-work-processor/internal/executors/kubernetes"
"github.com/SAP/remote-work-processor/internal/executors/void"
)

Expand All @@ -19,3 +20,9 @@ func httpRequestExecutorGenerator() ExecutorGenerator {
return &http.HttpRequestExecutor{}, nil
}
}

func kubernetesApiRequestExecutorGenerator() ExecutorGenerator {
return func() (executors.Executor, error) {
return &kubernetes.KubernetesApiRequestExecutor{}, nil
}
}
19 changes: 19 additions & 0 deletions internal/executors/http/bearer_authorization_header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package http

import (
"fmt"
)

type bearerAuthorizationHeader struct {
token string
}

func NewBearerAuthorizationHeader(t string) AuthorizationHeaderGenerator {
return &bearerAuthorizationHeader{
token: t,
}
}

func (h *bearerAuthorizationHeader) Generate() (AuthorizationHeader, error) {
return NewAuthorizationHeaderView(fmt.Sprintf("Bearer %s", h.token)), nil
}
217 changes: 217 additions & 0 deletions internal/executors/kubernetes/kubernetes_api_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package kubernetes

import (
"fmt"
"os"
"strings"

pb "github.com/SAP/remote-work-processor/build/proto/generated"
"github.com/SAP/remote-work-processor/internal/cache"
"github.com/SAP/remote-work-processor/internal/executors"
"github.com/SAP/remote-work-processor/internal/executors/http"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
API_VERSION string = "apiVersion"
NAMESPACE string = "namespace"
RESOURCE_TYPE string = "resourceType"
RESOURCE_NAME string = "resourceName"
PATH string = "path"
QUERY string = "query"
SHOULD_USE_LOCAL_DATA string = "shouldUseLocalData"
KUBECONFIG string = "kubeconfig"
CERT_AUTHORITY_DATA string = "certificateAuthorityData"
TOKEN string = "token"

RESPONSE_BODY string = "body"
RESPONSE_STATUS string = "status"

USER string = "user"
PASSWORD string = "password"
AUTHORIZATION_HEADER string = "authorizationHeader"
SERVER string = "server"
URL string = "url"
TRUSTED_CERTS string = "trustedCerts"

API_V1 string = "v1"
)

var API_V1_RESOURCE_TYPES = map[string]bool{"componentstatuses": true, "configmaps": true, "endpoints": true, "events": true, "limitranges": true, "namespaces": true, "persistentvolumeclaims": true, "pods": true, "podtemplates": true, "replicationcontrollers": true, "resourcequotas": true, "secrets": true, "serviceaccounts": true, "services": true, "nodes": true, "persistentvolumes": true}

type KubernetesApiRequestExecutor struct {
executors.Executor
httpExecutor http.HttpRequestExecutor
}

func (e *KubernetesApiRequestExecutor) Execute(ctx executors.ExecutorContext) *executors.ExecutorResult {
config, err := buildConfig(ctx)

if err != nil {
return err
}

if shouldUseKubeconfig(ctx) && config == nil {
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_RETRYABLE),
executors.ErrorString("Kubeconfig needed but could not be resolved"),
)
}

input := ctx.GetInput()

prepareBasicAuth(ctx, input, config)
prepareToken(ctx, config, input)
prepareUrl(ctx, config, input)
prepareTrustedCerts(ctx, config, input)

return buildOutput(e.httpExecutor.Execute(ctx))
}

func buildConfig(ctx executors.ExecutorContext) (*rest.Config, *executors.ExecutorResult) {
useLocalData, err := ctx.GetBoolean(SHOULD_USE_LOCAL_DATA)
// useLocalData = true

if err != nil {
return nil, nonRetriableExecutionResult(err)
}

if useLocalData {
return getLocalKubeConfig(), nil
} else {
return buildProvidedKubeConfig(ctx)
}
}

func getLocalKubeConfig() *rest.Config {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}

kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)

config, err := kubeConfig.ClientConfig()
if err != nil {
os.Exit(1)
}

return config
}

func buildProvidedKubeConfig(ctx executors.ExecutorContext) (*rest.Config, *executors.ExecutorResult) {
if ctx.GetString(KUBECONFIG) == "" {
return nil, nil
}

kubeconfig, err := clientcmd.NewClientConfigFromBytes([]byte(ctx.GetString(KUBECONFIG)))
if err != nil {
return nil, nonRetriableExecutionResult(err)
}

config, err := kubeconfig.ClientConfig()
if err != nil {
return nil, nonRetriableExecutionResult(err)
}

return config, nil
}

func shouldUseKubeconfig(ctx executors.ExecutorContext) bool {
return ctx.GetString(SERVER) == "" ||
(ctx.GetString(USER) == "" && ctx.GetString(TOKEN) == "") ||
(ctx.GetString(PASSWORD) == "" && ctx.GetString(TOKEN) == "")
}

func prepareBasicAuth(ctx executors.ExecutorContext, input cache.MapCache[string, string], config *rest.Config) {
user := ctx.GetString(USER)
if user == "" && config != nil {
user = config.Username
}

password := ctx.GetString(PASSWORD)
if password == "" && config != nil {
password = config.Password
}

input.Write(USER, user)
input.Write(PASSWORD, password)
}

func prepareToken(ctx executors.ExecutorContext, config *rest.Config, input cache.MapCache[string, string]) {
token := ctx.GetString(TOKEN)
if token == "" && config != nil {
token = config.BearerToken
}

if token != "" {
auth, _ := http.NewBearerAuthorizationHeader(token).Generate()
input.Write(AUTHORIZATION_HEADER, auth.GetValue())
}
}

func prepareUrl(ctx executors.ExecutorContext, config *rest.Config, input cache.MapCache[string, string]) {
server := ctx.GetString(SERVER)
if server == "" && config != nil {
server = config.Host
}

input.Write(URL, buildUrl(ctx, server))
}

func prepareTrustedCerts(ctx executors.ExecutorContext, config *rest.Config, input cache.MapCache[string, string]) {
trustedCerts := ctx.GetString(CERT_AUTHORITY_DATA)
if trustedCerts == "" && config != nil {
trustedCerts = string(config.TLSClientConfig.CAData)
}

input.Write(TRUSTED_CERTS, trustedCerts)
}

func buildUrl(ctx executors.ExecutorContext, server string) string {
var sb strings.Builder
var apiPathName string
apiVersion := ctx.GetString(API_VERSION)

if API_V1_RESOURCE_TYPES[ctx.GetString(RESOURCE_TYPE)] && apiVersion == API_V1 {
apiPathName = "api"
} else {
apiPathName = "apis"
}

sb.WriteString(fmt.Sprintf("%s/%s/%s", server, apiPathName, apiVersion))

appendOptional(&sb, ctx, NAMESPACE, "/namespaces/%s")
appendOptional(&sb, ctx, RESOURCE_TYPE, "/%s")
appendOptional(&sb, ctx, RESOURCE_NAME, "/%s")
appendOptional(&sb, ctx, PATH, "%s")
appendOptional(&sb, ctx, QUERY, "%s")

return sb.String()
}

func buildOutput(result *executors.ExecutorResult) *executors.ExecutorResult {
output := make(map[string]any)
output[RESPONSE_BODY] = result.Output[RESPONSE_BODY]
output[RESPONSE_STATUS] = result.Output[RESPONSE_STATUS]

return executors.NewExecutorResult(
executors.Output(output),
executors.Status(result.Status),
executors.ErrorString(result.Error),
)
}

func appendOptional(sb *strings.Builder, ctx executors.ExecutorContext, key string, valueFormat string) {
value := ctx.GetString(key)
if ctx.GetString(key) != "" {
sb.Write([]byte(fmt.Sprintf(valueFormat, value)))
}
}

func nonRetriableExecutionResult(err error) *executors.ExecutorResult {
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_RETRYABLE),
executors.Error(err),
)
}

0 comments on commit 61d0489

Please sign in to comment.