From b7b7998dddd6ce62794d756aa2a6751674469ac7 Mon Sep 17 00:00:00 2001 From: epipho Date: Thu, 25 Dec 2014 02:09:07 -0500 Subject: [PATCH 1/3] api: allow machine metadata to be updated via the http api --- api/machines.go | 74 ++++++++++++++++++-- api/machines_test.go | 156 ++++++++++++++++++++++++++++++++++++++++-- client/api.go | 2 + registry/fake.go | 18 +++++ registry/interface.go | 2 + registry/machine.go | 67 +++++++++++++++--- 6 files changed, 300 insertions(+), 19 deletions(-) diff --git a/api/machines.go b/api/machines.go index 990d7d80c..e8442ad4f 100644 --- a/api/machines.go +++ b/api/machines.go @@ -15,9 +15,11 @@ package api import ( - "fmt" + "encoding/json" + "errors" "net/http" "path" + "regexp" "github.com/coreos/fleet/client" "github.com/coreos/fleet/log" @@ -25,6 +27,10 @@ import ( "github.com/coreos/fleet/schema" ) +var ( + metadataPathRegex = regexp.MustCompile("^/([^/]+)/metadata/([A-Za-z0-9_.-]+$)") +) + func wireUpMachinesResource(mux *http.ServeMux, prefix string, cAPI client.API) { res := path.Join(prefix, "machines") mr := machinesResource{cAPI} @@ -35,12 +41,24 @@ type machinesResource struct { cAPI client.API } +type machineMetadataOp struct { + Operation string `json:"op"` + Path string + Value string +} + func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if req.Method != "GET" { - sendError(rw, http.StatusBadRequest, fmt.Errorf("only HTTP GET supported against this resource")) - return + switch req.Method { + case "GET": + mr.list(rw, req) + case "PATCH": + mr.patch(rw, req) + default: + sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET and PATCH supported against this resource")) } +} +func (mr *machinesResource) list(rw http.ResponseWriter, req *http.Request) { token, err := findNextPageToken(req.URL) if err != nil { sendError(rw, http.StatusBadRequest, err) @@ -62,6 +80,54 @@ func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) sendResponse(rw, http.StatusOK, page) } +func (mr *machinesResource) patch(rw http.ResponseWriter, req *http.Request) { + var ops []machineMetadataOp + dec := json.NewDecoder(req.Body) + if err := dec.Decode(&ops); err != nil { + sendError(rw, http.StatusBadRequest, err) + return + } + + for _, op := range ops { + if op.Operation != "add" && op.Operation != "remove" && op.Operation != "replace" { + sendError(rw, http.StatusBadRequest, errors.New("invalid op: expect add, remove, or replace")) + return + } + + if metadataPathRegex.FindStringSubmatch(op.Path) == nil { + sendError(rw, http.StatusBadRequest, errors.New("machine metadata path invalid")) + return + } + + if op.Operation != "remove" && len(op.Value) == 0 { + sendError(rw, http.StatusBadRequest, errors.New("invalid value: add and replace require a value")) + return + } + } + + for _, op := range ops { + // regex already validated above + s := metadataPathRegex.FindStringSubmatch(op.Path) + machID := s[1] + key := s[2] + + if op.Operation == "remove" { + err := mr.cAPI.DeleteMachineMetadata(machID, key) + if err != nil { + sendError(rw, http.StatusInternalServerError, err) + return + } + } else { + err := mr.cAPI.SetMachineMetadata(machID, key, op.Value) + if err != nil { + sendError(rw, http.StatusInternalServerError, err) + return + } + } + } + sendResponse(rw, http.StatusNoContent, "") +} + func getMachinePage(cAPI client.API, tok PageToken) (*schema.MachinePage, error) { all, err := cAPI.Machines() if err != nil { diff --git a/api/machines_test.go b/api/machines_test.go index 969b5c590..674680371 100644 --- a/api/machines_test.go +++ b/api/machines_test.go @@ -19,6 +19,7 @@ import ( "net/http/httptest" "reflect" "strconv" + "strings" "testing" "github.com/coreos/fleet/client" @@ -26,16 +27,22 @@ import ( "github.com/coreos/fleet/registry" ) -func TestMachinesList(t *testing.T) { +func fakeMachinesSetup() (*machinesResource, *httptest.ResponseRecorder) { fr := registry.NewFakeRegistry() fr.SetMachines([]machine.MachineState{ - {ID: "XXX", PublicIP: "", Metadata: nil}, + {ID: "XXX", PublicIP: "", Metadata: map[string]string{}}, {ID: "YYY", PublicIP: "1.2.3.4", Metadata: map[string]string{"ping": "pong"}}, }) fAPI := &client.RegistryClient{Registry: fr} resource := &machinesResource{cAPI: fAPI} rw := httptest.NewRecorder() - req, err := http.NewRequest("GET", "http://example.com", nil) + + return resource, rw +} + +func TestMachinesList(t *testing.T) { + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("GET", "http://example.com/machines", nil) if err != nil { t.Fatalf("Failed creating http.Request: %v", err) } @@ -63,11 +70,23 @@ func TestMachinesList(t *testing.T) { } } +func TestMachinesListBadMethod(t *testing.T) { + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("POST", "http://example.com/machines", nil) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + + err = assertErrorResponse(rw, http.StatusMethodNotAllowed) + if err != nil { + t.Error(err.Error()) + } +} + func TestMachinesListBadNextPageToken(t *testing.T) { - fr := registry.NewFakeRegistry() - fAPI := &client.RegistryClient{Registry: fr} - resource := &machinesResource{fAPI} - rw := httptest.NewRecorder() + resource, rw := fakeMachinesSetup() req, err := http.NewRequest("GET", "http://example.com/machines?nextPageToken=EwBMLg==", nil) if err != nil { t.Fatalf("Failed creating http.Request: %v", err) @@ -136,3 +155,126 @@ func TestExtractMachinePage(t *testing.T) { } } } + +func TestMachinesPatchAddModify(t *testing.T) { + reqBody := ` + [{"op": "add", "path": "/XXX/metadata/foo", "value": "bar"}, + {"op": "replace", "path": "/YYY/metadata/ping", "value": "splat"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusNoContent { + t.Errorf("Expected 204, got %d", rw.Code) + } + + // fetch machine to make sure data has been added + req, err = http.NewRequest("GET", "http://example.com/machines", nil) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + rw.Body.Reset() + resource.ServeHTTP(rw, req) + + if rw.Body == nil { + t.Error("Received nil response body") + } else { + body := rw.Body.String() + expected := `{"machines":[{"id":"XXX","metadata":{"foo":"bar"}},{"id":"YYY","metadata":{"ping":"splat"},"primaryIP":"1.2.3.4"}]}` + if body != expected { + t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body) + } + } +} + +func TestMachinesPatchDelete(t *testing.T) { + reqBody := ` + [{"op": "remove", "path": "/XXX/metadata/foo"}, + {"op": "remove", "path": "/YYY/metadata/ping"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusNoContent { + t.Errorf("Expected 204, got %d", rw.Code) + } + + // fetch machine to make sure data has been added + req, err = http.NewRequest("GET", "http://example.com/machines", nil) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + rw.Body.Reset() + resource.ServeHTTP(rw, req) + + if rw.Body == nil { + t.Error("Received nil response body") + } else { + body := rw.Body.String() + expected := `{"machines":[{"id":"XXX"},{"id":"YYY","primaryIP":"1.2.3.4"}]}` + if body != expected { + t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body) + } + } +} + +func TestMachinesPatchBadOp(t *testing.T) { + reqBody := ` + [{"op": "noop", "path": "/XXX/metadata/foo", "value": "bar"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusBadRequest { + t.Errorf("Expected 400, got %d", rw.Code) + } +} + +func TestMachinesPatchBadPath(t *testing.T) { + reqBody := ` + [{"op": "add", "path": "/XXX/foo", "value": "bar"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusBadRequest { + t.Errorf("Expected 400, got %d", rw.Code) + } +} + +func TestMachinesPatchBadValue(t *testing.T) { + reqBody := ` + [{"op": "add", "path": "/XXX/foo"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusBadRequest { + t.Errorf("Expected 400, got %d", rw.Code) + } +} diff --git a/client/api.go b/client/api.go index 470007dfe..10a146e1d 100644 --- a/client/api.go +++ b/client/api.go @@ -21,6 +21,8 @@ import ( type API interface { Machines() ([]machine.MachineState, error) + SetMachineMetadata(machID, key, value string) error + DeleteMachineMetadata(machID, key string) error Unit(string) (*schema.Unit, error) Units() ([]*schema.Unit, error) diff --git a/registry/fake.go b/registry/fake.go index 2a6c8bc5a..e6613ae87 100644 --- a/registry/fake.go +++ b/registry/fake.go @@ -279,6 +279,24 @@ func (f *FakeRegistry) UnitHeartbeat(name, machID string, ttl time.Duration) err func (f *FakeRegistry) ClearUnitHeartbeat(string) {} +func (f *FakeRegistry) SetMachineMetadata(machID string, key string, value string) error { + for _, mach := range f.machines { + if mach.ID == machID { + mach.Metadata[key] = value + } + } + return nil +} + +func (f *FakeRegistry) DeleteMachineMetadata(machID string, key string) error { + for _, mach := range f.machines { + if mach.ID == machID { + delete(mach.Metadata, key) + } + } + return nil +} + func NewFakeClusterRegistry(dVersion *semver.Version, eVersion int) *FakeClusterRegistry { return &FakeClusterRegistry{ dVersion: dVersion, diff --git a/registry/interface.go b/registry/interface.go index 270a2ef9d..13b3da642 100644 --- a/registry/interface.go +++ b/registry/interface.go @@ -37,6 +37,8 @@ type Registry interface { SetUnitTargetState(name string, state job.JobState) error SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) UnscheduleUnit(name, machID string) error + SetMachineMetadata(machID, key, value string) error + DeleteMachineMetadata(machID, key string) error UnitRegistry } diff --git a/registry/machine.go b/registry/machine.go index 3ed8a1812..46e438899 100644 --- a/registry/machine.go +++ b/registry/machine.go @@ -43,17 +43,26 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) { } for _, node := range resp.Node.Nodes { - for _, obj := range node.Nodes { - if !strings.HasSuffix(obj.Key, "/object") { - continue - } + var mach machine.MachineState + var metadata map[string]string - var mach machine.MachineState - err = unmarshal(obj.Value, &mach) - if err != nil { - return + for _, obj := range node.Nodes { + if strings.HasSuffix(obj.Key, "/object") { + err = unmarshal(obj.Value, &mach) + if err != nil { + return + } + } else if strings.HasSuffix(obj.Key, "/metadata") { + // Load metadata into a separate map to avoid stepping on it when deserializing the object key + metadata = make(map[string]string, len(obj.Nodes)) + for _, mdnode := range obj.Nodes { + metadata[path.Base(mdnode.Key)] = mdnode.Value + } } + } + if mach.ID != "" { + mach.Metadata = mergeMetadata(mach.Metadata, metadata) machines = append(machines, mach) } } @@ -94,6 +103,23 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio return resp.Node.ModifiedIndex, nil } +func (r *EtcdRegistry) SetMachineMetadata(machID string, key string, value string) error { + update := etcd.Set{ + Key: path.Join(r.keyPrefix, machinePrefix, machID, "metadata", key), + Value: value, + } + + _, err := r.etcd.Do(&update) + return err +} + +func (r *EtcdRegistry) DeleteMachineMetadata(machID string, key string) error { + // Deleting a key sets its value to "" to allow for intelligent merging + // between the machine-defined metadata and the dynamic metadata. + // See mergeMetadata for more detail. + return r.SetMachineMetadata(machID, key, "") +} + func (r *EtcdRegistry) RemoveMachineState(machID string) error { req := etcd.Delete{ Key: path.Join(r.keyPrefix, machinePrefix, machID, "object"), @@ -104,3 +130,28 @@ func (r *EtcdRegistry) RemoveMachineState(machID string) error { } return err } + +// mergeMetadata merges the machine-set metadata with the dynamic metadata to better facilitate +// machines leaving and rejoining a cluster. +// Merging metadata uses the following rules: +// - Any keys that are only in one collection are added as-is +// - Any keys that exist in both, the dynamic value takes precence +// - Any keys that have a zero-value string in the dynamic metadata are considered deleted +// and are not included in the final collection +func mergeMetadata(machineMetadata, dynamicMetadata map[string]string) map[string]string { + if dynamicMetadata == nil { + return machineMetadata + } + finalMetadata := make(map[string]string, len(dynamicMetadata)) + for k, v := range machineMetadata { + finalMetadata[k] = v + } + for k, v := range dynamicMetadata { + if v == "" { + delete(finalMetadata, k) + } else { + finalMetadata[k] = v + } + } + return finalMetadata +} From 5941e1f32e855b8fdf8233a8ffb7105872809656 Mon Sep 17 00:00:00 2001 From: epipho Date: Sat, 28 Feb 2015 20:23:06 -0500 Subject: [PATCH 2/3] Agent reconciler now uses machine state from registry when reconciling global units --- agent/reconcile.go | 8 +++++- agent/reconcile_test.go | 1 + api/machines.go | 2 +- registry/fake.go | 13 ++++++++++ registry/interface.go | 1 + registry/machine.go | 57 +++++++++++++++++++++++++++++------------ 6 files changed, 64 insertions(+), 18 deletions(-) diff --git a/agent/reconcile.go b/agent/reconcile.go index a81803418..07fd9cd65 100644 --- a/agent/reconcile.go +++ b/agent/reconcile.go @@ -127,7 +127,13 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) { return nil, err } - ms := a.Machine.State() + // fetch full machine state from registry instead of + // using the local version to allow for dynamic metadata + ms, err := reg.MachineState(a.Machine.State().ID) + if err != nil { + log.Errorf("Failed fetching machine state from Registry: %v", err) + return nil, err + } as := AgentState{ MState: &ms, Units: make(map[string]*job.Unit), diff --git a/agent/reconcile_test.go b/agent/reconcile_test.go index 159bf08d7..efdaf8b82 100644 --- a/agent/reconcile_test.go +++ b/agent/reconcile_test.go @@ -254,6 +254,7 @@ MachineMetadata=dog=woof`), reg := registry.NewFakeRegistry() reg.SetJobs(tt.regJobs) a := makeAgentWithMetadata(tt.metadata) + reg.SetMachines([]machine.MachineState{a.Machine.State()}) as, err := desiredAgentState(a, reg) if err != nil { t.Errorf("case %d: unexpected error: %v", i, err) diff --git a/api/machines.go b/api/machines.go index e8442ad4f..2beff4aa3 100644 --- a/api/machines.go +++ b/api/machines.go @@ -125,7 +125,7 @@ func (mr *machinesResource) patch(rw http.ResponseWriter, req *http.Request) { } } } - sendResponse(rw, http.StatusNoContent, "") + sendResponse(rw, http.StatusNoContent, nil) } func getMachinePage(cAPI client.API, tok PageToken) (*schema.MachinePage, error) { diff --git a/registry/fake.go b/registry/fake.go index e6613ae87..e99d7b0d4 100644 --- a/registry/fake.go +++ b/registry/fake.go @@ -297,6 +297,19 @@ func (f *FakeRegistry) DeleteMachineMetadata(machID string, key string) error { return nil } +func (f *FakeRegistry) MachineState(machID string) (machine.MachineState, error) { + f.RLock() + defer f.RUnlock() + + for _, mach := range f.machines { + if mach.ID == machID { + return mach, nil + } + } + + return machine.MachineState{}, errors.New("Machine state not found") +} + func NewFakeClusterRegistry(dVersion *semver.Version, eVersion int) *FakeClusterRegistry { return &FakeClusterRegistry{ dVersion: dVersion, diff --git a/registry/interface.go b/registry/interface.go index 13b3da642..2224b4d69 100644 --- a/registry/interface.go +++ b/registry/interface.go @@ -36,6 +36,7 @@ type Registry interface { ScheduleUnit(name, machID string) error SetUnitTargetState(name string, state job.JobState) error SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) + MachineState(machID string) (machine.MachineState, error) UnscheduleUnit(name, machID string) error SetMachineMetadata(machID, key, value string) error DeleteMachineMetadata(machID, key string) error diff --git a/registry/machine.go b/registry/machine.go index 46e438899..cb307a210 100644 --- a/registry/machine.go +++ b/registry/machine.go @@ -44,25 +44,12 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) { for _, node := range resp.Node.Nodes { var mach machine.MachineState - var metadata map[string]string - - for _, obj := range node.Nodes { - if strings.HasSuffix(obj.Key, "/object") { - err = unmarshal(obj.Value, &mach) - if err != nil { - return - } - } else if strings.HasSuffix(obj.Key, "/metadata") { - // Load metadata into a separate map to avoid stepping on it when deserializing the object key - metadata = make(map[string]string, len(obj.Nodes)) - for _, mdnode := range obj.Nodes { - metadata[path.Base(mdnode.Key)] = mdnode.Value - } - } + mach, err = readMachineState(&node) + if err != nil { + return } if mach.ID != "" { - mach.Metadata = mergeMetadata(mach.Metadata, metadata) machines = append(machines, mach) } } @@ -103,6 +90,21 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio return resp.Node.ModifiedIndex, nil } +func (r *EtcdRegistry) MachineState(machID string) (machine.MachineState, error) { + req := etcd.Get{ + Key: path.Join(r.keyPrefix, machinePrefix, machID), + Sorted: true, + Recursive: true, + } + + resp, err := r.etcd.Do(&req) + if err != nil { + return machine.MachineState{}, err + } + + return readMachineState(resp.Node) +} + func (r *EtcdRegistry) SetMachineMetadata(machID string, key string, value string) error { update := etcd.Set{ Key: path.Join(r.keyPrefix, machinePrefix, machID, "metadata", key), @@ -155,3 +157,26 @@ func mergeMetadata(machineMetadata, dynamicMetadata map[string]string) map[strin } return finalMetadata } + +// readMachineState reads machine state from an etcd node +func readMachineState(node *etcd.Node) (mach machine.MachineState, err error) { + var metadata map[string]string + + for _, obj := range node.Nodes { + if strings.HasSuffix(obj.Key, "/object") { + err = unmarshal(obj.Value, &mach) + if err != nil { + return + } + } else if strings.HasSuffix(obj.Key, "/metadata") { + // Load metadata into a separate map to avoid stepping on it when deserializing the object key + metadata = make(map[string]string, len(obj.Nodes)) + for _, mdnode := range obj.Nodes { + metadata[path.Base(mdnode.Key)] = mdnode.Value + } + } + } + + mach.Metadata = mergeMetadata(mach.Metadata, metadata) + return +} From 33542ef83d92217145338b53e3b353196de510b8 Mon Sep 17 00:00:00 2001 From: epipho Date: Sun, 15 Mar 2015 18:15:18 -0400 Subject: [PATCH 3/3] Adding documentation for dynamic metadata api --- Documentation/api-v1.md | 584 +++++++++++---------- Documentation/unit-files-and-scheduling.md | 320 +++++------ 2 files changed, 465 insertions(+), 439 deletions(-) diff --git a/Documentation/api-v1.md b/Documentation/api-v1.md index 996acdfd2..0b5b235d9 100644 --- a/Documentation/api-v1.md +++ b/Documentation/api-v1.md @@ -1,279 +1,305 @@ -# fleet API v1 - -The fleet API allows you to manage the state of the cluster using JSON over HTTP. - -## Managing Units - -Create and modify Unit entities to communicate to fleet the desired state of the cluster. -This simply declares what *should* be happening; the backend system still has to react to the changes in this desired state. -The actual state of the system is communicated with UnitState entities. - -### Unit Entity - -- **name**: (readonly) unique identifier of entity -- **options**: list of UnitOption entities -- **desiredState**: state the user wishes the Unit to be in ("inactive", "loaded", or "launched") -- **currentState**: (readonly) state the Unit is currently in (same possible values as desiredState) -- **machineID**: ID of machine to which the Unit is scheduled - -A UnitOption represents a single option in a systemd unit file. - -- **section**: name of section that contains the option (e.g. "Unit", "Service", "Socket") -- **name**: name of option (e.g. "BindsTo", "After", "ExecStart") -- **value**: value of option (e.g. "/usr/bin/docker run busybox /bin/sleep 1000") - -### Create a Unit - -#### Request - -Create a Unit by passing a partial Unit entity to the /units resource. -The options and desiredState fields are required, and all other Unit fields will be ignored. - -The base request looks like this: - -``` -PUT /units/ HTTP/1.1 - -{"desiredState": , "options": [