Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the race condition by confirming creation/deletion of machine objects #316

Merged
merged 1 commit into from
Jun 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 53 additions & 25 deletions pkg/controller/machineset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"github.com/kubernetes-incubator/apiserver-builder/pkg/builders"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,14 +35,18 @@ import (
clusterapiclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset"
listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
"sigs.k8s.io/cluster-api/pkg/controller/sharedinformers"
"sigs.k8s.io/cluster-api/util"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineSet")

// reconcileMutexSleepSec is the duration to sleep before releasing the mutex lock that is held for reconcilation.
// See https://github.com/kubernetes-sigs/cluster-api/issues/245
var reconcileMutexSleepSec = time.Second
// stateConfirmationTimeout is the amount of time allowed to wait for desired state.
var stateConfirmationTimeout = 10 * time.Second

// stateConfirmationInterval is the amount of time between polling for the desired state.
// The polling is against a local memory cache.
var stateConfirmationInterval = 100 * time.Millisecond
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this too fast?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is against an local internal cache, it shouldnt be an issue.
i can raise the interval if it is a concern


// +controller:group=cluster,version=v1alpha1,kind=MachineSet,resource=machinesets
type MachineSetControllerImpl struct {
Expand All @@ -59,9 +64,6 @@ type MachineSetControllerImpl struct {
machineLister listers.MachineLister

informers *sharedinformers.SharedInformers

// msKeyMuxMap holds a mutex lock for reconcilation keyed on the machineset key
msKeyMuxMap map[string]sync.Mutex
}

// Init initializes the controller and is called by the generated code
Expand All @@ -85,8 +87,6 @@ func (c *MachineSetControllerImpl) Init(arguments sharedinformers.ControllerInit

c.informers = arguments.GetSharedInformers()

c.msKeyMuxMap = make(map[string]sync.Mutex)

c.waitForCacheSync()
}

Expand All @@ -110,20 +110,6 @@ func (c *MachineSetControllerImpl) waitForCacheSync() {
// note that the current state of the cluster is calculated based on the number of machines
// that are owned by the given machineSet (key).
func (c *MachineSetControllerImpl) Reconcile(machineSet *v1alpha1.MachineSet) error {
key, err := cache.MetaNamespaceKeyFunc(machineSet)
if err != nil {
glog.Errorf("Couldn't get key for object %+v.", machineSet)
return err
}

// Lock on Reconcile, this is to avoid the change of a machine object to cause the same machineset to Reconcile
// during the creation/deletion of machines, causing the incorrect number of machines to created/deleted
// TODO: Find a less heavy handed approach to avoid concurrent machineset reconcilation.
mux := c.msKeyMuxMap[key]
mux.Lock()
defer mux.Unlock()
defer time.Sleep(reconcileMutexSleepSec)

glog.V(4).Infof("Reconcile machineset %v", machineSet.Name)
allMachines, err := c.machineLister.Machines(machineSet.Namespace).List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -191,26 +177,29 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
return fmt.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed.", ms.Name)
}
diff := len(machines) - int(*(ms.Spec.Replicas))

if diff < 0 {
diff *= -1
glog.Infof("Too few replicas for %v %s/%s, need %d, creating %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)

var machineList []*v1alpha1.Machine
var errstrings []string
for i := 0; i < diff; i++ {
glog.Infof("creating machine %d of %d, ( spec.replicas(%d) > currentMachineCount(%d) )", i+1, diff, *(ms.Spec.Replicas), len(machines))
machine := c.createMachine(ms)
_, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
newMachine, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
if err != nil {
glog.Errorf("unable to create a machine = %s, due to %v", machine.Name, err)
errstrings = append(errstrings, err.Error())
continue
}
machineList = append(machineList, newMachine)
}

if len(errstrings) > 0 {
return fmt.Errorf(strings.Join(errstrings, "; "))
}

return nil
return c.waitForMachineCreation(machineList)
} else if diff > 0 {
glog.Infof("Too many replicas for %v %s/%s, need %d, deleting %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)

Expand Down Expand Up @@ -241,6 +230,7 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
}
default:
}
return c.waitForMachineDeletion(machinesToDelete)
}

return nil
Expand Down Expand Up @@ -342,3 +332,41 @@ func getMachinesToDelete(filteredMachines []*v1alpha1.Machine, diff int) []*v1al
// see: https://github.com/kubernetes/kube-deploy/issues/625
return filteredMachines[:diff]
}

func (c *MachineSetControllerImpl) waitForMachineCreation(machineList []*v1alpha1.Machine) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitForMachineCreation and Deletion probably have some common code with deployer/clusterctl. Maybe have a separate PR in the future to abstract out to put it into util.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree.
will clean up in future PR.

for _, machine := range machineList {
pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) {
_, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name)
glog.Error(err)
if err == nil {
return true, nil
}
if errors.IsNotFound(err) {
return false, nil
}
return false, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please log all errors explicitly for debugging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error from the retry loop goes to pollErr which is always logged.

})
if pollErr != nil {
glog.Error(pollErr)
return fmt.Errorf("failed waiting for machine object to be created. %v", pollErr)
}
}
return nil
}

func (c *MachineSetControllerImpl) waitForMachineDeletion(machineList []*v1alpha1.Machine) error {
for _, machine := range machineList {
pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) {
m, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name)
if errors.IsNotFound(err) || !m.DeletionTimestamp.IsZero() {
return true, nil
}
return false, err
})
if pollErr != nil {
glog.Error(pollErr)
return fmt.Errorf("failed waiting for machine object to be deleted. %v", pollErr)
}
}
return nil
}
124 changes: 119 additions & 5 deletions pkg/controller/machineset/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
clienttesting "k8s.io/client-go/testing"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
"sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/fake"
v1alpha1listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
Expand All @@ -36,15 +40,60 @@ const (
labelKey = "type"
)

type fakeMachineLister struct {
indexer cache.Indexer
}

// List lists all Machines in the indexer.
func (s *fakeMachineLister) List(selector labels.Selector) (ret []*v1alpha1.Machine, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.Machine))
})
return ret, err
}

// Machines returns an object that can list and get Machines.
func (s *fakeMachineLister) Machines(namespace string) v1alpha1listers.MachineNamespaceLister {
return fakeMachineNamespaceLister{indexer: s.indexer, namespace: namespace}
}

type fakeMachineNamespaceLister struct {
indexer cache.Indexer
namespace string
}

func (s fakeMachineNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Machine, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.Machine))
})
return ret, err
}

func (s fakeMachineNamespaceLister) Get(name string) (*v1alpha1.Machine, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1alpha1.Resource("machine"), name)
}
return obj.(*v1alpha1.Machine), nil
}

func TestMachineSetControllerReconcileHandler(t *testing.T) {
now := time.Now()

tests := []struct {
name string
startingMachineSets []*v1alpha1.MachineSet
startingMachines []*v1alpha1.Machine
machineSetToSync string
namespaceToSync string
confirmationTimeout *time.Duration
deletionTimestamp *time.Time
expectedMachine *v1alpha1.Machine
expectedActions []string
expectedError bool
}{
{
name: "scenario 1: the current state of the cluster is empty, thus a machine is created.",
Expand Down Expand Up @@ -95,6 +144,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedActions: []string{"create"},
expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"),
},
{
name: "scenario 7: the current machine is missing owner refs, machine should be adopted.",
Expand All @@ -112,6 +162,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedActions: []string{"create"},
expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"),
},
{
name: "scenario 9: the current machine is being deleted, thus a machine is created.",
Expand All @@ -120,6 +171,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedActions: []string{"create"},
expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"),
},
{
name: "scenario 10: the current machine has no controller refs, owner refs preserved, machine should be adopted.",
Expand All @@ -130,11 +182,37 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
expectedActions: []string{"update"},
expectedMachine: machineWithMultipleOwnerRefs(createMachineSet(1, "foo", "bar2", "acme"), "bar1"),
},
{
name: "scenario 11: create confirmation timed out, err.",
startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(1, "foo", "bar1", "acme")},
startingMachines: nil,
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedError: true,
},
{
name: "scenario 12: delete confirmation timed out, err.",
startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(0, "foo", "bar2", "acme")},
startingMachines: []*v1alpha1.Machine{machineFromMachineSet(createMachineSet(1, "foo", "bar1", "acme"), "bar1")},
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedError: true,
},
{
name: "scenario 13: delete confirmation accepts delete non-zero timestamp.",
startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(0, "foo", "bar2", "acme")},
startingMachines: []*v1alpha1.Machine{machineFromMachineSet(createMachineSet(1, "foo", "bar1", "acme"), "bar1")},
machineSetToSync: "foo",
namespaceToSync: "acme",
deletionTimestamp: &now,
expectedActions: []string{"delete"},
},
}

reconcileMutexSleepSec = 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
stateConfirmationTimeout = 1 * time.Millisecond

// setup the test scenario
rObjects := []runtime.Object{}
machinesIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
Expand All @@ -154,21 +232,57 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
rObjects = append(rObjects, amachineset)
}
fakeClient := fake.NewSimpleClientset(rObjects...)
machineLister := v1alpha1listers.NewMachineLister(machinesIndexer)

machineLister := fakeMachineLister{indexer: machinesIndexer}
machineSetLister := v1alpha1listers.NewMachineSetLister(machineSetIndexer)
target := &MachineSetControllerImpl{}
target.clusterAPIClient = fakeClient
target.machineSetLister = machineSetLister
target.machineLister = machineLister
target.machineLister = &machineLister

fakeClient.PrependReactor("create", "machines", func(action core.Action) (bool, runtime.Object, error) {
if test.expectedError {
return true, nil, errors.NewNotFound(v1alpha1.Resource("machine"), "somemachine")
}
if test.expectedMachine != nil {
machineLister.indexer.Add(test.expectedMachine)
}
return true, test.expectedMachine, nil
})

fakeClient.PrependReactor("delete", "machines", func(action core.Action) (bool, runtime.Object, error) {
if test.deletionTimestamp != nil {
machineLister.indexer.Delete(test.startingMachines[0])
m := test.startingMachines[0].DeepCopy()
timestamp := metav1.NewTime(*test.deletionTimestamp)
m.ObjectMeta.DeletionTimestamp = &timestamp
machineLister.indexer.Add(m)
return true, nil, nil
}
if test.expectedError {
return false, nil, nil
}
for i, action := range test.expectedActions {
if action == "delete" {
machineLister.indexer.Delete(test.startingMachines[i])
}
}
return true, nil, nil
})

// act
machineSetToTest, err := target.Get(test.namespaceToSync, test.machineSetToSync)
if err != nil {
t.Fatal(err)
}
err = target.Reconcile(machineSetToTest)
if err != nil {
t.Fatal(err)

if test.expectedError != (err != nil) {
t.Fatalf("Unexpected reconcile err: got %v, expected %v. %v", (err != nil), test.expectedError, err)
return
}
if test.expectedError {
return
}

// validate
Expand Down