-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix the race condition by confirming creation/deletion of machine objects #316
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import ( | |
"github.com/golang/glog" | ||
"github.com/kubernetes-incubator/apiserver-builder/pkg/builders" | ||
|
||
"k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/labels" | ||
"k8s.io/client-go/kubernetes" | ||
|
@@ -34,14 +35,18 @@ import ( | |
clusterapiclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset" | ||
listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1" | ||
"sigs.k8s.io/cluster-api/pkg/controller/sharedinformers" | ||
"sigs.k8s.io/cluster-api/util" | ||
) | ||
|
||
// controllerKind contains the schema.GroupVersionKind for this controller type. | ||
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineSet") | ||
|
||
// reconcileMutexSleepSec is the duration to sleep before releasing the mutex lock that is held for reconcilation. | ||
// See https://github.com/kubernetes-sigs/cluster-api/issues/245 | ||
var reconcileMutexSleepSec = time.Second | ||
// stateConfirmationTimeout is the amount of time allowed to wait for desired state. | ||
var stateConfirmationTimeout = 10 * time.Second | ||
|
||
// stateConfirmationInterval is the amount of time between polling for the desired state. | ||
// The polling is against a local memory cache. | ||
var stateConfirmationInterval = 100 * time.Millisecond | ||
|
||
// +controller:group=cluster,version=v1alpha1,kind=MachineSet,resource=machinesets | ||
type MachineSetControllerImpl struct { | ||
|
@@ -59,9 +64,6 @@ type MachineSetControllerImpl struct { | |
machineLister listers.MachineLister | ||
|
||
informers *sharedinformers.SharedInformers | ||
|
||
// msKeyMuxMap holds a mutex lock for reconcilation keyed on the machineset key | ||
msKeyMuxMap map[string]sync.Mutex | ||
} | ||
|
||
// Init initializes the controller and is called by the generated code | ||
|
@@ -85,8 +87,6 @@ func (c *MachineSetControllerImpl) Init(arguments sharedinformers.ControllerInit | |
|
||
c.informers = arguments.GetSharedInformers() | ||
|
||
c.msKeyMuxMap = make(map[string]sync.Mutex) | ||
|
||
c.waitForCacheSync() | ||
} | ||
|
||
|
@@ -110,20 +110,6 @@ func (c *MachineSetControllerImpl) waitForCacheSync() { | |
// note that the current state of the cluster is calculated based on the number of machines | ||
// that are owned by the given machineSet (key). | ||
func (c *MachineSetControllerImpl) Reconcile(machineSet *v1alpha1.MachineSet) error { | ||
key, err := cache.MetaNamespaceKeyFunc(machineSet) | ||
if err != nil { | ||
glog.Errorf("Couldn't get key for object %+v.", machineSet) | ||
return err | ||
} | ||
|
||
// Lock on Reconcile, this is to avoid the change of a machine object to cause the same machineset to Reconcile | ||
// during the creation/deletion of machines, causing the incorrect number of machines to created/deleted | ||
// TODO: Find a less heavy handed approach to avoid concurrent machineset reconcilation. | ||
mux := c.msKeyMuxMap[key] | ||
mux.Lock() | ||
defer mux.Unlock() | ||
defer time.Sleep(reconcileMutexSleepSec) | ||
|
||
glog.V(4).Infof("Reconcile machineset %v", machineSet.Name) | ||
allMachines, err := c.machineLister.Machines(machineSet.Namespace).List(labels.Everything()) | ||
if err != nil { | ||
|
@@ -191,26 +177,29 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine | |
return fmt.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed.", ms.Name) | ||
} | ||
diff := len(machines) - int(*(ms.Spec.Replicas)) | ||
|
||
if diff < 0 { | ||
diff *= -1 | ||
glog.Infof("Too few replicas for %v %s/%s, need %d, creating %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff) | ||
|
||
var machineList []*v1alpha1.Machine | ||
var errstrings []string | ||
for i := 0; i < diff; i++ { | ||
glog.Infof("creating machine %d of %d, ( spec.replicas(%d) > currentMachineCount(%d) )", i+1, diff, *(ms.Spec.Replicas), len(machines)) | ||
machine := c.createMachine(ms) | ||
_, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine) | ||
newMachine, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine) | ||
if err != nil { | ||
glog.Errorf("unable to create a machine = %s, due to %v", machine.Name, err) | ||
errstrings = append(errstrings, err.Error()) | ||
continue | ||
} | ||
machineList = append(machineList, newMachine) | ||
} | ||
|
||
if len(errstrings) > 0 { | ||
return fmt.Errorf(strings.Join(errstrings, "; ")) | ||
} | ||
|
||
return nil | ||
return c.waitForMachineCreation(machineList) | ||
} else if diff > 0 { | ||
glog.Infof("Too many replicas for %v %s/%s, need %d, deleting %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff) | ||
|
||
|
@@ -241,6 +230,7 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine | |
} | ||
default: | ||
} | ||
return c.waitForMachineDeletion(machinesToDelete) | ||
} | ||
|
||
return nil | ||
|
@@ -342,3 +332,41 @@ func getMachinesToDelete(filteredMachines []*v1alpha1.Machine, diff int) []*v1al | |
// see: https://github.com/kubernetes/kube-deploy/issues/625 | ||
return filteredMachines[:diff] | ||
} | ||
|
||
func (c *MachineSetControllerImpl) waitForMachineCreation(machineList []*v1alpha1.Machine) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. waitForMachineCreation and Deletion probably have some common code with deployer/clusterctl. Maybe have a separate PR in the future to abstract out to put it into util. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree. |
||
for _, machine := range machineList { | ||
pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) { | ||
_, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name) | ||
glog.Error(err) | ||
if err == nil { | ||
return true, nil | ||
} | ||
if errors.IsNotFound(err) { | ||
return false, nil | ||
} | ||
return false, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please log all errors explicitly for debugging There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. error from the retry loop goes to pollErr which is always logged. |
||
}) | ||
if pollErr != nil { | ||
glog.Error(pollErr) | ||
return fmt.Errorf("failed waiting for machine object to be created. %v", pollErr) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (c *MachineSetControllerImpl) waitForMachineDeletion(machineList []*v1alpha1.Machine) error { | ||
for _, machine := range machineList { | ||
pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) { | ||
m, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name) | ||
if errors.IsNotFound(err) || !m.DeletionTimestamp.IsZero() { | ||
return true, nil | ||
} | ||
return false, err | ||
}) | ||
if pollErr != nil { | ||
glog.Error(pollErr) | ||
return fmt.Errorf("failed waiting for machine object to be deleted. %v", pollErr) | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this too fast?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is against an local internal cache, it shouldnt be an issue.
i can raise the interval if it is a concern