Skip to content
This repository was archived by the owner on Nov 8, 2022. It is now read-only.

Commit 1b47aa4

Browse files
committed
Introduced tests for hanging pods in kubernetes
Implemented test checking if removing unwanted pods in k8s works. Reworked the code responsible for the removal of the pods. Signed-off-by: Maciej Patelczyk <maciej.patelczyk@intel.com>
1 parent 997858b commit 1b47aa4

File tree

5 files changed

+223
-58
lines changed

5 files changed

+223
-58
lines changed

integration_tests/pkg/kubernetes/kubernetes_test.go

+93
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"regexp"
2222
"testing"
2323
"time"
24+
"flag"
2425

2526
"github.com/Sirupsen/logrus"
2627
"github.com/intelsdi-x/swan/integration_tests/test_helpers"
28+
"github.com/intelsdi-x/swan/pkg/conf"
2729
"github.com/intelsdi-x/swan/pkg/executor"
2830
"github.com/intelsdi-x/swan/pkg/kubernetes"
2931
. "github.com/smartystreets/goconvey/convey"
@@ -115,3 +117,94 @@ func TestLocalKubernetesPodExecution(t *testing.T) {
115117
})
116118
})
117119
}
120+
121+
// Please see `pkg/kubernetes/README.md` for prerequisites for this test.
122+
func TestLocalKubernetesPodBrokenExecution(t *testing.T) {
123+
124+
flag.Set("kubernetes_cluster_clean_left_pods_on_startup", "true")
125+
126+
conf.ParseFlags()
127+
128+
podName := "swan-testpod"
129+
hyperkubeBinPath := testhelpers.AssertFileExists("hyperkube")
130+
131+
logrus.SetLevel(logrus.ErrorLevel)
132+
Convey("While having local executor", t, func() {
133+
134+
local := executor.NewLocal()
135+
136+
Convey("We are able to launch kubernetes cluster on one node", func() {
137+
config := kubernetes.DefaultConfig()
138+
139+
kubernetesAddress := fmt.Sprintf("http://127.0.0.1:%d", config.KubeAPIPort)
140+
141+
k8sLauncher := kubernetes.New(local, local, config)
142+
So(k8sLauncher, ShouldNotBeNil)
143+
144+
k8sHandle, err := k8sLauncher.Launch()
145+
So(err, ShouldBeNil)
146+
147+
defer executor.StopAndEraseOutput(k8sHandle)
148+
149+
Convey("And kubectl shows that local host is in Ready state", func() {
150+
terminated, err := k8sHandle.Wait(100 * time.Millisecond)
151+
So(err, ShouldBeNil)
152+
So(terminated, ShouldBeFalse)
153+
154+
output, err := exec.Command(hyperkubeBinPath, "kubectl", "-s", kubernetesAddress, "get", "nodes").Output()
155+
So(err, ShouldBeNil)
156+
157+
host, err := os.Hostname()
158+
So(err, ShouldBeNil)
159+
160+
re, err := regexp.Compile(fmt.Sprintf("%s.*?Ready", host))
161+
So(err, ShouldBeNil)
162+
163+
match := re.Find(output)
164+
So(match, ShouldNotBeNil)
165+
166+
Convey("And we are able to create pod", func() {
167+
output, err := exec.Command(hyperkubeBinPath, "kubectl", "-s", kubernetesAddress, "run", podName, "--image=intelsdi/swan", "--restart=Never", "--", "sleep", "inf").Output()
168+
So(err, ShouldBeNil)
169+
170+
re, err := regexp.Compile("created")
171+
So(err, ShouldBeNil)
172+
match := re.Find(output)
173+
So(match, ShouldNotBeNil)
174+
175+
time.Sleep(10 * time.Second)
176+
177+
Convey("If kubernetes is stopped pod shall be alive", func() {
178+
_ = executor.StopAndEraseOutput(k8sHandle)
179+
180+
output, err := exec.Command("sudo", "docker", "ps").Output()
181+
So(err, ShouldBeNil)
182+
So(string(output), ShouldContainSubstring, podName)
183+
184+
Convey("After starting again kubernetes old pod shall be removed", func() {
185+
k8sHandle, err = k8sLauncher.Launch()
186+
So(err, ShouldBeNil)
187+
188+
output, err := exec.Command("sudo", "docker", "ps").Output()
189+
So(err, ShouldBeNil)
190+
So(string(output), ShouldNotContainSubstring, podName)
191+
192+
Convey("kubernetes launcher shall start pod without error", func() {
193+
config := executor.DefaultKubernetesConfig()
194+
config.Address = kubernetesAddress
195+
config.ContainerImage = "intelsdi/swan"
196+
config.PodName = podName
197+
k8sExecutor, err := executor.NewKubernetes(config)
198+
So(err, ShouldBeNil)
199+
200+
podHandle1, err := k8sExecutor.Execute("sleep inf")
201+
So(err, ShouldBeNil)
202+
defer executor.StopAndEraseOutput(podHandle1)
203+
})
204+
})
205+
})
206+
})
207+
})
208+
})
209+
})
210+
}

pkg/kubernetes/kubernetes.go

+39-41
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ type k8s struct {
135135
master executor.Executor
136136
minion executor.Executor // Current single minion is strictly connected with getReadyNodes() function and expectedKubeletNodesCount const.
137137
config Config
138-
client *kubernetes.Clientset
138+
139+
k8sPodAPI // Private interface
139140

140141
isListening netutil.IsListeningFunction // For mocking purposes.
141142
getReadyNodes getReadyNodesFunc // For mocking purposes.
@@ -147,66 +148,39 @@ type k8s struct {
147148
// In case of the same executor they will be on the same host (high risk of interferences).
148149
// NOTE: Currently we support only single-kubelet (single-minion) kubernetes.
149150
func New(master executor.Executor, minion executor.Executor, config Config) executor.Launcher {
150-
client, err := kubernetes.NewForConfig(
151-
&rest.Config{
152-
Host: config.KubeAPIAddr,
153-
},
154-
)
155-
if err != nil {
156-
panic(err)
157-
}
158151

159-
return k8s{
152+
return &k8s{
160153
master: master,
161154
minion: minion,
162155
config: config,
163-
client: client,
156+
k8sPodAPI: newK8sPodAPI(config),
164157
isListening: netutil.IsListening,
165158
getReadyNodes: getReadyNodes,
166159
}
167160
}
168161

169162
// Name returns human readable name for job.
170-
func (m k8s) Name() string {
163+
func (m *k8s) Name() string {
171164
return "Kubernetes [single-kubelet]"
172165
}
173166

174167
// Launch starts the kubernetes cluster. It returns a cluster
175168
// represented as a Task Handle instance.
176169
// Error is returned when Launcher is unable to start a cluster.
177-
func (m k8s) Launch() (handle executor.TaskHandle, err error) {
170+
func (m *k8s) Launch() (handle executor.TaskHandle, err error) {
178171
for retry := uint64(0); retry <= m.config.RetryCount; retry++ {
179172
handle, err = m.tryLaunchCluster()
180173
if err != nil {
181174
log.Warningf("could not launch Kubernetes cluster: %q. Retry number: %d", err.Error(), retry)
182175
continue
183176
}
184-
185177
return handle, nil
186178
}
187-
188-
pods, err := m.getPodsFromNode(m.kubeletHost)
189-
if err != nil {
190-
log.Warnf("Could not check if there are dangling nodes on Kubelet: %s", err)
191-
} else {
192-
if len(pods) != 0 && kubeCleanLeftPods.Value() == false {
193-
log.Warnf("Kubelet on node %q has %d dangling nodes. Use `kubectl` to delete them or set %q flag to let Swan remove them", m.kubeletHost, len(pods), kubeCleanLeftPods.Name)
194-
} else if len(pods) != 0 && kubeCleanLeftPods.Value() == true {
195-
log.Infof("Kubelet on node %q has %d dangling nodes. Attempt to clean them", m.kubeletHost, len(pods))
196-
err = m.cleanNode(m.kubeletHost, pods)
197-
if err != nil {
198-
log.Errorf("Could not clean dangling pods: %s", err)
199-
} else {
200-
log.Infof("Dangling pods on node %q has been deleted", m.kubeletHost)
201-
}
202-
}
203-
}
204-
205179
log.Errorf("Could not launch Kubernetes cluster: %q", err.Error())
206180
return nil, err
207181
}
208182

209-
func (m k8s) tryLaunchCluster() (executor.TaskHandle, error) {
183+
func (m *k8s) tryLaunchCluster() (executor.TaskHandle, error) {
210184
handle, err := m.launchCluster()
211185
if err != nil {
212186
return nil, err
@@ -221,10 +195,34 @@ func (m k8s) tryLaunchCluster() (executor.TaskHandle, error) {
221195
}
222196
return nil, err
223197
}
198+
// Optional removal of the unwanted pods in swan's namespace
199+
pods, err := m.getPodsFromNode(m.kubeletHost)
200+
if err != nil {
201+
log.Warningf("Could not retrieve list of pods from host %s. Error: %s", m.kubeletHost, err)
202+
// If getPodsFromNode returns an error it means the cluster is not usable. Delete it.
203+
stopErr := handle.Stop()
204+
if stopErr != nil {
205+
log.Warningf("Errors while stopping k8s cluster: %v", stopErr)
206+
}
207+
return nil, err
208+
}
209+
if len(pods) != 0 {
210+
if kubeCleanLeftPods.Value() {
211+
log.Infof("Kubelet on node %q has %d dangling pods. Attempt to clean them", m.kubeletHost, len(pods))
212+
err = m.cleanNode(m.kubeletHost, pods)
213+
if err != nil {
214+
log.Errorf("Could not clean dangling pods: %s", err)
215+
} else {
216+
log.Infof("Dangling pods on node %q have been deleted", m.kubeletHost)
217+
}
218+
} else {
219+
log.Warnf("Kubelet on node %q has %d dangling pods. Use `kubectl` to delete them or set %q flag to let Swan remove them", m.kubeletHost, len(pods), kubeCleanLeftPods.Name)
220+
}
221+
}
224222
return handle, nil
225223
}
226224

227-
func (m k8s) launchCluster() (executor.TaskHandle, error) {
225+
func (m *k8s) launchCluster() (executor.TaskHandle, error) {
228226
// Launch apiserver using master executor.
229227
kubeAPIServer := m.getKubeAPIServerCommand()
230228
apiHandle, err := m.launchService(kubeAPIServer)
@@ -283,7 +281,7 @@ func (m k8s) launchCluster() (executor.TaskHandle, error) {
283281
}
284282

285283
// launchService executes service and check if it is listening on it's endpoint.
286-
func (m k8s) launchService(command kubeCommand) (executor.TaskHandle, error) {
284+
func (m *k8s) launchService(command kubeCommand) (executor.TaskHandle, error) {
287285
handle, err := command.exec.Execute(command.raw)
288286
if err != nil {
289287
return nil, errors.Wrapf(err, "execution of command %q on %q failed", command.raw, command.exec.Name())
@@ -303,7 +301,7 @@ func (m k8s) launchService(command kubeCommand) (executor.TaskHandle, error) {
303301
}
304302

305303
// getKubeAPIServerCommand returns command for apiserver.
306-
func (m k8s) getKubeAPIServerCommand() kubeCommand {
304+
func (m *k8s) getKubeAPIServerCommand() kubeCommand {
307305
return kubeCommand{m.master,
308306
fmt.Sprint(
309307
fmt.Sprintf("hyperkube apiserver"),
@@ -323,7 +321,7 @@ func (m k8s) getKubeAPIServerCommand() kubeCommand {
323321
}
324322

325323
// getKubeControllerCommand returns command for controller-manager.
326-
func (m k8s) getKubeControllerCommand() kubeCommand {
324+
func (m *k8s) getKubeControllerCommand() kubeCommand {
327325
return kubeCommand{m.master,
328326
fmt.Sprint(
329327
fmt.Sprintf("hyperkube controller-manager"),
@@ -335,7 +333,7 @@ func (m k8s) getKubeControllerCommand() kubeCommand {
335333
}
336334

337335
// getKubeSchedulerCommand returns command for scheduler.
338-
func (m k8s) getKubeSchedulerCommand() kubeCommand {
336+
func (m *k8s) getKubeSchedulerCommand() kubeCommand {
339337
return kubeCommand{m.master,
340338
fmt.Sprint(
341339
fmt.Sprintf("hyperkube scheduler"),
@@ -347,7 +345,7 @@ func (m k8s) getKubeSchedulerCommand() kubeCommand {
347345
}
348346

349347
// getKubeletCommand returns command for kubelet.
350-
func (m k8s) getKubeletCommand() kubeCommand {
348+
func (m *k8s) getKubeletCommand() kubeCommand {
351349
return kubeCommand{m.minion,
352350
fmt.Sprint(
353351
fmt.Sprintf("hyperkube kubelet"),
@@ -361,7 +359,7 @@ func (m k8s) getKubeletCommand() kubeCommand {
361359
}
362360

363361
// getKubeProxyCommand returns command for proxy.
364-
func (m k8s) getKubeProxyCommand() kubeCommand {
362+
func (m *k8s) getKubeProxyCommand() kubeCommand {
365363
return kubeCommand{m.minion,
366364
fmt.Sprint(
367365
fmt.Sprintf("hyperkube proxy"),
@@ -372,7 +370,7 @@ func (m k8s) getKubeProxyCommand() kubeCommand {
372370
), m.config.KubeProxyPort}
373371
}
374372

375-
func (m k8s) waitForReadyNode(apiServerAddress string) error {
373+
func (m *k8s) waitForReadyNode(apiServerAddress string) error {
376374
for idx := 0; idx < nodeCheckRetryCount; idx++ {
377375
nodes, err := m.getReadyNodes(apiServerAddress)
378376
if err != nil {

pkg/kubernetes/kubernetes_test.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func TestKubernetesLauncher(t *testing.T) {
6868
handle := getMockedTaskHandle(outputFile)
6969

7070
// Prepare Kubernetes Launcher
71-
var k8sLauncher k8s
72-
k8sLauncher = New(master, minion, config).(k8s)
71+
var k8sLauncher *k8s
72+
k8sLauncher = New(master, minion, config).(*k8s)
7373

7474
Convey("When configuration is passed to Kubernetes Launcher", func() {
7575
handle := &mocks.TaskHandle{}
@@ -129,6 +129,13 @@ func TestKubernetesLauncher(t *testing.T) {
129129
k8sLauncher.isListening = getIsListeningFunc(true)
130130
k8sLauncher.getReadyNodes = getNodeListFunc([]v1.Node{v1.Node{}}, nil)
131131

132+
mockAPI := &mockK8sPodAPI{}
133+
134+
mockAPI.On("getPodsFromNode", mock.AnythingOfType("string")).Return(nil, nil)
135+
mockAPI.On("killPods", mock.AnythingOfType("[]v1.Pod")).Return(nil)
136+
137+
k8sLauncher.k8sPodAPI = mockAPI
138+
132139
resultHandle, err := k8sLauncher.Launch()
133140
So(err, ShouldBeNil)
134141
So(resultHandle, ShouldNotBeNil)

pkg/kubernetes/mock_k8s_pod_api.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package kubernetes
2+
3+
import mock "github.com/stretchr/testify/mock"
4+
import v1 "k8s.io/client-go/pkg/api/v1"
5+
6+
// mockK8sPodAPI is an autogenerated mock type for the k8sPodAPI type
7+
type mockK8sPodAPI struct {
8+
mock.Mock
9+
}
10+
11+
// getPodsFromNode provides a mock function with given fields: nodeName
12+
func (_m *mockK8sPodAPI) getPodsFromNode(nodeName string) ([]v1.Pod, error) {
13+
ret := _m.Called(nodeName)
14+
15+
var r0 []v1.Pod
16+
if rf, ok := ret.Get(0).(func(string) []v1.Pod); ok {
17+
r0 = rf(nodeName)
18+
} else {
19+
if ret.Get(0) != nil {
20+
r0 = ret.Get(0).([]v1.Pod)
21+
}
22+
}
23+
24+
var r1 error
25+
if rf, ok := ret.Get(1).(func(string) error); ok {
26+
r1 = rf(nodeName)
27+
} else {
28+
r1 = ret.Error(1)
29+
}
30+
31+
return r0, r1
32+
}
33+
34+
// killPods provides a mock function with given fields: pods
35+
func (_m *mockK8sPodAPI) killPods(pods []v1.Pod) error {
36+
ret := _m.Called(pods)
37+
38+
var r0 error
39+
if rf, ok := ret.Get(0).(func([]v1.Pod) error); ok {
40+
r0 = rf(pods)
41+
} else {
42+
r0 = ret.Error(0)
43+
}
44+
45+
return r0
46+
}
47+
48+
var _ k8sPodAPI = (*mockK8sPodAPI)(nil)

0 commit comments

Comments
 (0)