Cherry-pick add option skip-wait-for-delete-timeout changes
This PR moves code from two upstream PRs into this repo. It adds support for a context (used to cancel waitForDelete) and
allows ignoring pods whose DeletionTimestamp is older than a user-provided threshold.
See the PRs below for a more detailed description:

kubernetes/kubernetes#85577
kubernetes/kubernetes#85574
Alexander Demichev committed Jan 13, 2020
1 parent 463c63d commit 1546dd8
Showing 2 changed files with 125 additions and 10 deletions.
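
Taken together, both changes surface through DrainOptions: the caller supplies a context that bounds the whole drain, and SkipWaitForDeleteTimeoutSeconds tells the drain code to stop waiting on pods that have been terminating for longer than the given number of seconds. A minimal caller-side sketch (hypothetical usage, not part of this commit; the values and surrounding setup are assumptions):

// Hypothetical caller-side sketch of the new fields (values are illustrative;
// the import path for the drain package depends on this repository's layout).
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

options := &drain.DrainOptions{
	Ctx:                             ctx,  // cancels waitForDelete and the per-pod eviction loops
	Force:                           true, // also delete pods not managed by a controller
	SkipWaitForDeleteTimeoutSeconds: 60,   // ignore pods whose DeletionTimestamp is more than 60s old
}
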
101 changes: 92 additions & 9 deletions pkg/drain/drain.go
@@ -17,6 +17,7 @@ limitations under the License.
package drain

import (
"context"
"errors"
"fmt"
"math"
@@ -43,6 +44,8 @@ import (
)

type DrainOptions struct {
Ctx context.Context

// Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.
Force bool

@@ -72,6 +75,12 @@ type DrainOptions struct {

// Logger allows callers to plug in their preferred logger.
Logger golog.Logger

// SkipWaitForDeleteTimeoutSeconds ignores pods that have a
// DeletionTimestamp older than N seconds. It's up to the user to decide when this
// option is appropriate; an example is when the Node is unready and the pods
// won't drain otherwise.
SkipWaitForDeleteTimeoutSeconds int
}

// Takes a pod and returns a bool indicating whether or not to operate on the
@@ -94,8 +103,21 @@ const (
kLocalStorageWarning = "deleting pods with local storage"
kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use Force to override)"
kUnmanagedWarning = "deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet"
podSkipMsgTemplate = "pod %q has DeletionTimestamp older than %v seconds, skipping\n"
)

type waitForDeleteParams struct {
ctx context.Context
pods []corev1.Pod
interval time.Duration
timeout time.Duration
usingEviction bool
getPodFn func(string, string) (*corev1.Pod, error)
globalTimeout time.Duration
skipWaitForDeleteTimeoutSeconds int
logger golog.Logger
}

// GetNodes looks up the nodes (either given by name as arguments or
// by the Selector option).
func GetNodes(client typedcorev1.NodeInterface, nodes []string, selector string) (out []*corev1.Node, err error) {
@@ -418,10 +440,22 @@ func evictPods(client typedpolicyv1beta1.PolicyV1beta1Interface, pods []corev1.P
globalTimeout = options.Timeout
}

ctx, cancel := context.WithTimeout(options.getContext(), globalTimeout)
defer cancel()

for _, pod := range pods {
go func(pod corev1.Pod, returnCh chan error, stopCh chan struct{}) {
var err error
for {
logf(options.Logger, "evicting pod %q\n", pod.Name)
select {
case <-ctx.Done():
// return here or we'll leak a goroutine.
returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout)
return
default:
}

err = evictPod(client, pod, policyGroupVersion, options.GracePeriodSeconds)
if err == nil {
break
@@ -442,8 +476,20 @@ func evictPods(client typedpolicyv1beta1.PolicyV1beta1Interface, pods []corev1.P
return
}
}
podArray := []corev1.Pod{pod}
_, err = waitForDelete(podArray, 1*time.Second, time.Duration(globalTimeout), true, options.Logger, getPodFn)

params := waitForDeleteParams{
ctx: ctx,
pods: []corev1.Pod{pod},
interval: 1 * time.Second,
timeout: time.Duration(math.MaxInt64),
usingEviction: true,
getPodFn: getPodFn,
globalTimeout: globalTimeout,
skipWaitForDeleteTimeoutSeconds: options.SkipWaitForDeleteTimeoutSeconds,
logger: options.Logger,
}

_, err = waitForDelete(params)
if err == nil {
returnCh <- nil
} else {
@@ -486,7 +532,20 @@ func deletePods(client typedcorev1.CoreV1Interface, pods []corev1.Pod, options *
return err
}
}
_, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, options.Logger, getPodFn)

params := waitForDeleteParams{
ctx: options.getContext(),
pods: pods,
interval: 1 * time.Second,
timeout: time.Duration(math.MaxInt64),
usingEviction: false,
getPodFn: getPodFn,
globalTimeout: globalTimeout,
skipWaitForDeleteTimeoutSeconds: options.SkipWaitForDeleteTimeoutSeconds,
logger: options.Logger,
}

_, err := waitForDelete(params)
return err
}

@@ -502,36 +561,53 @@ func buildDeleteOptions(pod *corev1.Pod, gracePeriodSeconds int) *metav1.DeleteO
return deleteOptions
}

func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, logger golog.Logger, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) {
func waitForDelete(params waitForDeleteParams) ([]corev1.Pod, error) {
pods := params.pods

var verbStr string
if usingEviction {
if params.usingEviction {
verbStr = "evicted"
} else {
verbStr = "deleted"
}

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
err := wait.PollImmediate(params.interval, params.timeout, func() (bool, error) {
pendingPods := []corev1.Pod{}
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
p, err := params.getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
logf(logger, "pod %q removed (%s)", pod.Name, verbStr)
logf(params.logger, "pod %q removed (%s)", pod.Name, verbStr)
continue
} else if err != nil {
return false, err
} else {
if shouldSkipPod(*p, params.skipWaitForDeleteTimeoutSeconds) {
logf(params.logger, podSkipMsgTemplate, pod.Name, params.skipWaitForDeleteTimeoutSeconds)
continue
}
pendingPods = append(pendingPods, pods[i])
}
}
pods = pendingPods
if len(pendingPods) > 0 {
return false, nil
select {
case <-params.ctx.Done():
return false, fmt.Errorf("global timeout reached: %v", params.globalTimeout)
default:
return false, nil
}
}
return true, nil
})
return pods, err
}

func shouldSkipPod(pod corev1.Pod, skipDeletedTimeoutSeconds int) bool {
return skipDeletedTimeoutSeconds > 0 &&
!pod.ObjectMeta.DeletionTimestamp.IsZero() &&
int(time.Now().Sub(pod.ObjectMeta.GetDeletionTimestamp().Time).Seconds()) > skipDeletedTimeoutSeconds
}

// SupportEviction uses Discovery API to find out if the server
// supports the eviction subresource. If supported, it will return
// its groupVersion; otherwise it will return an empty string.
@@ -604,3 +680,10 @@ func logf(logger golog.Logger, format string, v ...interface{}) {
logger.Logf(format, v...)
}
}

func (o *DrainOptions) getContext() context.Context {
if o.Ctx != nil {
return o.Ctx
}
return context.Background()
}
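
Each eviction goroutine in the new evictPods code checks ctx.Done() at the top of every retry, so reaching the global timeout unblocks the goroutine (via returnCh) instead of leaking it, and waitForDelete performs the same check between polls. A standalone sketch of that retry-with-cancellation shape (simplified names, no Kubernetes client; illustrative only, not the committed code):

package main

import (
	"context"
	"fmt"
	"time"
)

// evictWithRetry mirrors the shape of the retry loop in evictPods: every
// iteration first checks whether the shared context was cancelled (global
// timeout reached) and only then retries. tryEvict stands in for the real API call.
func evictWithRetry(ctx context.Context, pod string, tryEvict func() error, results chan<- error) {
	for {
		select {
		case <-ctx.Done():
			// Send and return here, or the goroutine leaks once the caller gives up.
			results <- fmt.Errorf("error when evicting pod %q: global timeout reached", pod)
			return
		default:
		}
		if err := tryEvict(); err == nil {
			results <- nil
			return
		}
		time.Sleep(500 * time.Millisecond) // back off before retrying
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	results := make(chan error, 1)
	// An eviction that never succeeds (e.g. a PodDisruptionBudget keeps rejecting it).
	go evictWithRetry(ctx, "example-pod", func() error { return fmt.Errorf("rejected") }, results)
	fmt.Println(<-results) // global-timeout error after roughly two seconds
}
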
34 changes: 33 additions & 1 deletion pkg/drain/drain_test.go
@@ -18,10 +18,12 @@ package drain

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"regexp"
"strconv"
@@ -694,12 +696,42 @@ func TestDeletePods(t *testing.T) {
return nil, errors.New("This is a random error for testing")
},
},
{
description: "Skip Deleted Pod",
interval: 200 * time.Millisecond,
timeout: 3 * time.Second,
expectPendingPods: false,
expectError: false,
expectedError: nil,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
oldPodMap, _ := createPods(false)
if oldPod, found := oldPodMap[name]; found {
dTime := &metav1.Time{Time: time.Now().Add(time.Duration(100) * time.Second * -1)}
oldPod.ObjectMeta.SetDeletionTimestamp(dTime)
return &oldPod, nil
}
return nil, fmt.Errorf("%q: not found", name)
},
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
_, pods := createPods(false)
pendingPods, err := waitForDelete(pods, test.interval, test.timeout, false, nil, test.getPodFn)
ctx := context.Background()

params := waitForDeleteParams{
ctx: ctx,
pods: pods,
interval: test.interval,
timeout: test.timeout,
usingEviction: false,
getPodFn: test.getPodFn,
globalTimeout: time.Duration(math.MaxInt64),
skipWaitForDeleteTimeoutSeconds: 10,
}

pendingPods, err := waitForDelete(params)

if test.expectError {
if err == nil {
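
The new "Skip Deleted Pod" test case wires this together: getPodFn returns a pod whose DeletionTimestamp is 100 seconds in the past while waitForDelete runs with skipWaitForDeleteTimeoutSeconds set to 10, so shouldSkipPod reports true and the pod never lands in pendingPods. The same check reduced to plain time arithmetic (illustrative only, no apimachinery types):

// shouldSkipPod, boiled down: a pod deleted 100s ago exceeds a 10s threshold.
deletionTime := time.Now().Add(-100 * time.Second)
threshold := 10 // skipWaitForDeleteTimeoutSeconds
skip := threshold > 0 && int(time.Since(deletionTime).Seconds()) > threshold
fmt.Println(skip) // true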
