Skip to content
This repository was archived by the owner on Nov 8, 2022. It is now read-only.

Clean left pods on kubernetes cluster startup #675

Merged
merged 3 commits into from
May 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions integration_tests/pkg/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"regexp"
"testing"
"time"
"flag"

"github.com/Sirupsen/logrus"
"github.com/intelsdi-x/swan/integration_tests/test_helpers"
"github.com/intelsdi-x/swan/pkg/conf"
"github.com/intelsdi-x/swan/pkg/executor"
"github.com/intelsdi-x/swan/pkg/kubernetes"
. "github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -115,3 +117,94 @@ func TestLocalKubernetesPodExecution(t *testing.T) {
})
})
}

// Please see `pkg/kubernetes/README.md` for prerequisites for this test.
func TestLocalKubernetesPodBrokenExecution(t *testing.T) {

flag.Set("kubernetes_cluster_clean_left_pods_on_startup", "true")

conf.ParseFlags()

podName := "swan-testpod"
hyperkubeBinPath := testhelpers.AssertFileExists("hyperkube")

logrus.SetLevel(logrus.ErrorLevel)
Convey("While having local executor", t, func() {

local := executor.NewLocal()

Convey("We are able to launch kubernetes cluster on one node", func() {
config := kubernetes.DefaultConfig()

kubernetesAddress := fmt.Sprintf("http://127.0.0.1:%d", config.KubeAPIPort)

k8sLauncher := kubernetes.New(local, local, config)
So(k8sLauncher, ShouldNotBeNil)

k8sHandle, err := k8sLauncher.Launch()
So(err, ShouldBeNil)

defer executor.StopAndEraseOutput(k8sHandle)

Convey("And kubectl shows that local host is in Ready state", func() {
terminated, err := k8sHandle.Wait(100 * time.Millisecond)
So(err, ShouldBeNil)
So(terminated, ShouldBeFalse)

output, err := exec.Command(hyperkubeBinPath, "kubectl", "-s", kubernetesAddress, "get", "nodes").Output()
So(err, ShouldBeNil)

host, err := os.Hostname()
So(err, ShouldBeNil)

re, err := regexp.Compile(fmt.Sprintf("%s.*?Ready", host))
So(err, ShouldBeNil)

match := re.Find(output)
So(match, ShouldNotBeNil)

Convey("And we are able to create pod", func() {
output, err := exec.Command(hyperkubeBinPath, "kubectl", "-s", kubernetesAddress, "run", podName, "--image=intelsdi/swan", "--restart=Never", "--", "sleep", "inf").Output()
So(err, ShouldBeNil)

re, err := regexp.Compile("created")
So(err, ShouldBeNil)
match := re.Find(output)
So(match, ShouldNotBeNil)

time.Sleep(10 * time.Second)

Convey("If kubernetes is stopped pod shall be alive", func() {
_ = executor.StopAndEraseOutput(k8sHandle)

output, err := exec.Command("sudo", "docker", "ps").Output()
So(err, ShouldBeNil)
So(string(output), ShouldContainSubstring, podName)

Convey("After starting again kubernetes old pod shall be removed", func() {
k8sHandle, err = k8sLauncher.Launch()
So(err, ShouldBeNil)

output, err := exec.Command("sudo", "docker", "ps").Output()
So(err, ShouldBeNil)
So(string(output), ShouldNotContainSubstring, podName)

Convey("kubernetes launcher shall start pod without error", func() {
config := executor.DefaultKubernetesConfig()
config.Address = kubernetesAddress
config.ContainerImage = "intelsdi/swan"
config.PodName = podName
k8sExecutor, err := executor.NewKubernetes(config)
So(err, ShouldBeNil)

podHandle1, err := k8sExecutor.Execute("sleep inf")
So(err, ShouldBeNil)
defer executor.StopAndEraseOutput(podHandle1)
})
})
})
})
})
})
})
}
71 changes: 53 additions & 18 deletions pkg/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (

//KubernetesMasterFlag indicates where Kubernetes control plane will be launched.
KubernetesMasterFlag = conf.NewStringFlag("kubernetes_cluster_run_control_plane_on_host", "Address of a host where Kubernetes control plane will be run (when using -kubernetes and not connecting to existing cluster).", "127.0.0.1")

kubeCleanLeftPods = conf.NewBoolFlag("kubernetes_cluster_clean_left_pods_on_startup", "Delete all pods which are detected during cluster startup. Usefull after dirty shutdown when some pods may not be properly deleted.", false)
)

type kubeCommand struct {
Expand All @@ -69,6 +71,9 @@ type Config struct {
// Address range to use for services.
ServiceAddresses string

// Optional configuration option for cleaning
KubeletHost string

// Custom args to apiserver and kubelet.
KubeAPIArgs string
KubeControllerArgs string
Expand All @@ -84,7 +89,7 @@ type Config struct {
func DefaultConfig() Config {
return Config{
EtcdServers: kubeEtcdServersFlag.Value(),
EtcdPrefix: "/registry",
EtcdPrefix: "/swan",
LogLevel: 0,
AllowPrivileged: true,
KubeAPIAddr: KubernetesMasterFlag.Value(), // TODO(skonefal): This should not be part of config.
Expand Down Expand Up @@ -127,50 +132,55 @@ func UniqueConfig() Config {
type getReadyNodesFunc func(k8sAPIAddress string) ([]v1.Node, error)

type k8s struct {
master executor.Executor
minion executor.Executor // Current single minion is strictly connected with getReadyNodes() function and expectedKubeletNodesCount const.
config Config
master executor.Executor
minion executor.Executor // Current single minion is strictly connected with getReadyNodes() function and expectedKubeletNodesCount const.
config Config

k8sPodAPI // Private interface

isListening netutil.IsListeningFunction // For mocking purposes.
getReadyNodes getReadyNodesFunc // For mocking purposes.

kubeletHost string // Filled by Kubelet TaskHandle
}

// New returns a new Kubernetes launcher instance consists of one master and one minion.
// In case of the same executor they will be on the same host (high risk of interferences).
// NOTE: Currently we support only single-kubelet (single-minion) kubernetes.
func New(master executor.Executor, minion executor.Executor, config Config) executor.Launcher {
return k8s{

return &k8s{
master: master,
minion: minion,
config: config,
k8sPodAPI: newK8sPodAPI(config),
isListening: netutil.IsListening,
getReadyNodes: getReadyNodes,
}
}

// Name returns human readable name for job.
func (m k8s) Name() string {
func (m *k8s) Name() string {
return "Kubernetes [single-kubelet]"
}

// Launch starts the kubernetes cluster. It returns a cluster
// represented as a Task Handle instance.
// Error is returned when Launcher is unable to start a cluster.
func (m k8s) Launch() (handle executor.TaskHandle, err error) {
func (m *k8s) Launch() (handle executor.TaskHandle, err error) {
for retry := uint64(0); retry <= m.config.RetryCount; retry++ {
handle, err = m.tryLaunchCluster()
if err != nil {
log.Warningf("could not launch Kubernetes cluster: %q. Retry number: %d", err.Error(), retry)
continue
}

return handle, nil
}

log.Errorf("Could not launch Kubernetes cluster: %q", err.Error())
return nil, err
}

func (m k8s) tryLaunchCluster() (executor.TaskHandle, error) {
func (m *k8s) tryLaunchCluster() (executor.TaskHandle, error) {
handle, err := m.launchCluster()
if err != nil {
return nil, err
Expand All @@ -185,10 +195,34 @@ func (m k8s) tryLaunchCluster() (executor.TaskHandle, error) {
}
return nil, err
}
// Optional removal of the unwanted pods in swan's namespace
pods, err := m.getPodsFromNode(m.kubeletHost)
if err != nil {
log.Warningf("Could not retreive list of pods from host %s. Error: %s", m.kubeletHost, err)
// if getPodsFromNode returns error it means cluster is not useable. Delete it.
stopErr := handle.Stop()
if stopErr != nil {
log.Warningf("Errors while stopping k8s cluster: %v", stopErr)
}
return nil, err
}
if len(pods) != 0 {
if kubeCleanLeftPods.Value() {
log.Infof("Kubelet on node %q has %d dangling pods. Attempt to clean them", m.kubeletHost, len(pods))
err = m.cleanNode(m.kubeletHost, pods)
if err != nil {
log.Errorf("Could not clean dangling pods: %s", err)
} else {
log.Infof("Dangling pods on node %q has been deleted", m.kubeletHost)
}
} else {
log.Warnf("Kubelet on node %q has %d dangling pods. Use `kubectl` to delete them or set %q flag to let Swan remove them", m.kubeletHost, len(pods), kubeCleanLeftPods.Name)
}
}
return handle, nil
}

func (m k8s) launchCluster() (executor.TaskHandle, error) {
func (m *k8s) launchCluster() (executor.TaskHandle, error) {
// Launch apiserver using master executor.
kubeAPIServer := m.getKubeAPIServerCommand()
apiHandle, err := m.launchService(kubeAPIServer)
Expand Down Expand Up @@ -241,12 +275,13 @@ func (m k8s) launchCluster() (executor.TaskHandle, error) {
return nil, errors.Wrap(errCol.GetErrIfAny(), "cannot launch kubelet using minion executor")
}
clusterTaskHandle.AddAgent(kubeletHandle)
m.kubeletHost = kubeletHandle.Address()

return clusterTaskHandle, err
}

// launchService executes service and check if it is listening on it's endpoint.
func (m k8s) launchService(command kubeCommand) (executor.TaskHandle, error) {
func (m *k8s) launchService(command kubeCommand) (executor.TaskHandle, error) {
handle, err := command.exec.Execute(command.raw)
if err != nil {
return nil, errors.Wrapf(err, "execution of command %q on %q failed", command.raw, command.exec.Name())
Expand All @@ -266,7 +301,7 @@ func (m k8s) launchService(command kubeCommand) (executor.TaskHandle, error) {
}

// getKubeAPIServerCommand returns command for apiserver.
func (m k8s) getKubeAPIServerCommand() kubeCommand {
func (m *k8s) getKubeAPIServerCommand() kubeCommand {
return kubeCommand{m.master,
fmt.Sprint(
fmt.Sprintf("hyperkube apiserver"),
Expand All @@ -286,7 +321,7 @@ func (m k8s) getKubeAPIServerCommand() kubeCommand {
}

// getKubeControllerCommand returns command for controller-manager.
func (m k8s) getKubeControllerCommand() kubeCommand {
func (m *k8s) getKubeControllerCommand() kubeCommand {
return kubeCommand{m.master,
fmt.Sprint(
fmt.Sprintf("hyperkube controller-manager"),
Expand All @@ -298,7 +333,7 @@ func (m k8s) getKubeControllerCommand() kubeCommand {
}

// getKubeSchedulerCommand returns command for scheduler.
func (m k8s) getKubeSchedulerCommand() kubeCommand {
func (m *k8s) getKubeSchedulerCommand() kubeCommand {
return kubeCommand{m.master,
fmt.Sprint(
fmt.Sprintf("hyperkube scheduler"),
Expand All @@ -310,7 +345,7 @@ func (m k8s) getKubeSchedulerCommand() kubeCommand {
}

// getKubeletCommand returns command for kubelet.
func (m k8s) getKubeletCommand() kubeCommand {
func (m *k8s) getKubeletCommand() kubeCommand {
return kubeCommand{m.minion,
fmt.Sprint(
fmt.Sprintf("hyperkube kubelet"),
Expand All @@ -324,7 +359,7 @@ func (m k8s) getKubeletCommand() kubeCommand {
}

// getKubeProxyCommand returns command for proxy.
func (m k8s) getKubeProxyCommand() kubeCommand {
func (m *k8s) getKubeProxyCommand() kubeCommand {
return kubeCommand{m.minion,
fmt.Sprint(
fmt.Sprintf("hyperkube proxy"),
Expand All @@ -335,7 +370,7 @@ func (m k8s) getKubeProxyCommand() kubeCommand {
), m.config.KubeProxyPort}
}

func (m k8s) waitForReadyNode(apiServerAddress string) error {
func (m *k8s) waitForReadyNode(apiServerAddress string) error {
for idx := 0; idx < nodeCheckRetryCount; idx++ {
nodes, err := m.getReadyNodes(apiServerAddress)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func TestKubernetesLauncher(t *testing.T) {
handle := getMockedTaskHandle(outputFile)

// Prepare Kubernetes Launcher
var k8sLauncher k8s
k8sLauncher = New(master, minion, config).(k8s)
var k8sLauncher *k8s
k8sLauncher = New(master, minion, config).(*k8s)

Convey("When configuration is passed to Kubernetes Launcher", func() {
handle := &mocks.TaskHandle{}
Expand Down Expand Up @@ -129,6 +129,13 @@ func TestKubernetesLauncher(t *testing.T) {
k8sLauncher.isListening = getIsListeningFunc(true)
k8sLauncher.getReadyNodes = getNodeListFunc([]v1.Node{v1.Node{}}, nil)

mockAPI := &mockK8sPodAPI{}

mockAPI.On("getPodsFromNode", mock.AnythingOfType("string")).Return(nil, nil)
mockAPI.On("killPods", mock.AnythingOfType("[]v1.Pod")).Return(nil)

k8sLauncher.k8sPodAPI = mockAPI

resultHandle, err := k8sLauncher.Launch()
So(err, ShouldBeNil)
So(resultHandle, ShouldNotBeNil)
Expand Down
48 changes: 48 additions & 0 deletions pkg/kubernetes/mock_k8s_pod_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package kubernetes

import mock "github.com/stretchr/testify/mock"
import v1 "k8s.io/client-go/pkg/api/v1"

// mockK8sPodAPI is an autogenerated mock type for the k8sPodAPI type
type mockK8sPodAPI struct {
mock.Mock
}

// getPodsFromNode provides a mock function with given fields: nodeName
func (_m *mockK8sPodAPI) getPodsFromNode(nodeName string) ([]v1.Pod, error) {
ret := _m.Called(nodeName)

var r0 []v1.Pod
if rf, ok := ret.Get(0).(func(string) []v1.Pod); ok {
r0 = rf(nodeName)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]v1.Pod)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(nodeName)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// killPods provides a mock function with given fields: pods
func (_m *mockK8sPodAPI) killPods(pods []v1.Pod) error {
ret := _m.Called(pods)

var r0 error
if rf, ok := ret.Get(0).(func([]v1.Pod) error); ok {
r0 = rf(pods)
} else {
r0 = ret.Error(0)
}

return r0
}

var _ k8sPodAPI = (*mockK8sPodAPI)(nil)
Loading