
Commit b55b1f7

dvaseekara, ctrlaltluc, amuraru, dobrerazvan, and cristianpetrache authored
End to end testing with KRaft cluster (#92)
* Fix flaky test by deleting nodeports explicitly (#67)
* Upgrade Kafka to 3.6.0 (#69)
* Upgrade dependencies
* Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70)
* Upgrade Kafka to 3.6.1 (#71) (Co-authored-by: Petruț™ <cpetrache@adobe.com>)
* Upgrade Kafka image to use Java v21 (#72)
* Added arm64 to docker build platforms (#73)
* Regenerated headers for 2024
* Upgrading Kafka to 3.7.0 (#77)
* Update codeql-analysis.yml (#78)
* [INTERNAL] Create unique leader ID per operator deployment (#76)
* [INTERNAL] Get watched namespaces from env variable (#75) (cherry picked from commit de6500b)
* [CORE-106517] Fix outdated config in the sample (#83)
* Cross-compile koperator for arm and intel (#84)
* Adding Contour Ingress support (#82)
* Allow property security-inter-broker-protocol (#85):
  * adding the ability to use security-inter-broker-protocol in koperator
  * updating util.go to remove _ from generated names
  * adding a replace-all for the external listener port name
  * fixing other places where the external listener name is used so it does not contain _
  * adding an alternative way to identify which port to use for Kafka administration and the CC connection
  * taking out comments for the PR push
  * fixing the Kafka CRD
  * setting omitempty so it will not be required
  * adding generated CRDs
  * adding comments with context for the new flag UsedForKafkaAdminCommunication
  * use the getBrokerReadOnlyConfig function to get properties and update the unit test security_inter_broker_protocol_Set
  * update CRDs to match the generated manifest
  (Co-authored-by: Cameron Wright <red82277@adobe.com>, Ha Van <red83362@adobe.com>)
* Revert "Allow concurrent broker restarts from same AZ (broker rack) (#62)" (reverts commit 514fa07)
* Fixed build issues
* Fix TestGenerateBrokerConfig
* Added LoadBalancer for Kind E2E test cluster
* Added watch namespaces
* Added tmate for debugging
* Enabled projectcontour helm install
* Enabled cloud-provider-kind
* Added ProjectContour cluster role
* Updated certificate name
* Run without SSL
* Removing Project Contour
* Adding cloud-provider-kind
* Removing cloud-provider, manually adding during test
* Trigger test
* Remove SnapshotClusterAndCompare
* Increased log length for Snapshot and Compare
* Re-add Snapshot and Compare
* Increased log length even more
* Add Uninstall Contour CRDs
* Re-add KafkaCluster_SSL tests
* Removing BanzaiCloud Helm chart from list of repos
* Pushing up latest go.sum
* Clean up merge
* Enabling tmate to debug the e2e test
* Revert cert changes
* Revert "Revert Cert Changes" (reverts commit 5c5b19c)
* Enable sslClientAuth
* WIP: Fix listener config
* Clean up test case results (tc-1, tc-2)
* Updated KRaft test cases
* Cleanup linting issues
* Remove tmate debugger
* Run KRaft cluster E2E
* Increase timeout to allow pod termination
* Added debugger
* Fix app labels for controllers
* Revert image update
* Revert "Fix App Labels for Controllers" (reverts commit a3cf8a5)
* Include broker/controller labels for headless SVC selector
* Logic for controller listener
* Add controller service
* Added headless-controller-SVC labels
* Fix controller addresses and labels for brokers
* Empty commit to trigger e2e
* Set up kafka-3 as controller-only for troubleshooting
* Use controller address for JMXTemplate
* Update uninstall timeout to 600s
* Fix lint
* Enable tmate debugger
* Trigger E2E
* Updated BrokerIdLabelKey
* Check for KRaft mode when setting the controller listener
* Disable tmate from e2e test
* Adding //nolint:unparam to the testProduceConsumeInternal func now that it is used twice
* Moving the //nolint:unparam to the non-SSL version
* Fixed BrokerLabel test
* Add additional test cases for TestGetBrokerLabels
* Commenting out broker-1 in the test to fix the KRaft test
* Adding a conditional to check if KRaft mode is enabled before selecting the expected results in the test

Co-authored-by: ctrlaltluc <96051211+ctrlaltluc@users.noreply.github.com>
Co-authored-by: Adi Muraru <amuraru@adobe.com>
Co-authored-by: Razvan Dobre <dobre@adobe.com>
Co-authored-by: Cristian-Petrut Petrache <cristianpetrache@gmail.com>
Co-authored-by: Petruț™ <cpetrache@adobe.com>
Co-authored-by: Adrian Muraru <adi.muraru@gmail.com>
Co-authored-by: Adrian <1664229+azun@users.noreply.github.com>
Co-authored-by: aguzovatii <guzovatii.anatolii@gmail.com>
Co-authored-by: cawright-rh <cawright@redhat.com>
Co-authored-by: Cameron Wright <red82277@adobe.com>
Co-authored-by: Ha Van <red83362@adobe.com>
Co-authored-by: Daniel Vaseekaran <red10447@adobe.com>
Co-authored-by: Ha Van <168012087+musubi7726@users.noreply.github.com>
1 parent 15f6634 commit b55b1f7

16 files changed (+338 −71 lines)

Makefile (+1 −1)

@@ -102,7 +102,7 @@ test: generate fmt vet bin/setup-envtest
 test-e2e:
 	IMG_E2E=${IMG_E2E} go test github.com/banzaicloud/koperator/tests/e2e \
 		-v \
-		-timeout 20m \
+		-timeout 45m \
 		-tags e2e \
 		--ginkgo.show-node-events \
 		--ginkgo.trace \

api/util/util.go (+8 −0)

@@ -40,6 +40,14 @@ func LabelsForKafka(name string) map[string]string {
 	return map[string]string{"app": "kafka", "kafka_cr": name}
 }
 
+func LabelsForBroker(name string) map[string]string {
+	return map[string]string{"isBrokerNode": "true", "app": "kafka", "kafka_cr": name}
+}
+
+func LabelsForController(name string) map[string]string {
+	return map[string]string{"isControllerNode": "true", "app": "kafka", "kafka_cr": name}
+}
+
 // StringSliceContains returns true if list contains s
 func StringSliceContains(list []string, s string) bool {
 	for _, v := range list {
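The two new helpers differ from the existing LabelsForKafka only by one role-marker key each. A minimal standalone sketch, using only what the diff above shows:

```go
package main

import "fmt"

// LabelsForKafka returns the base labels shared by every Kafka pod.
func LabelsForKafka(name string) map[string]string {
	return map[string]string{"app": "kafka", "kafka_cr": name}
}

// LabelsForBroker returns the base labels plus the broker role marker,
// so the headless broker service selector can match broker pods only.
func LabelsForBroker(name string) map[string]string {
	return map[string]string{"isBrokerNode": "true", "app": "kafka", "kafka_cr": name}
}

// LabelsForController returns the base labels plus the controller role marker.
func LabelsForController(name string) map[string]string {
	return map[string]string{"isControllerNode": "true", "app": "kafka", "kafka_cr": name}
}

func main() {
	fmt.Println(LabelsForBroker("my-cluster")["isBrokerNode"])  // true
	fmt.Println(LabelsForController("my-cluster")["kafka_cr"])  // my-cluster
}
```

A combined broker+controller node would simply carry both marker keys, which is why the diff keeps them as two separate labels rather than one role enum.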

api/v1beta1/kafkacluster_types.go (+16 −5)

@@ -45,6 +45,12 @@ const (
 	// ProcessRolesKey is used to identify which process roles the Kafka pod has
 	ProcessRolesKey = "processRoles"
 
+	// IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller
+	IsBrokerNodeKey = "isBrokerNode"
+
+	// IsControllerNodeKey is used to identify if the kafka pod is a controller or broker_controller
+	IsControllerNodeKey = "isControllerNode"
+
 	// DefaultCruiseControlImage is the default CC image used when users don't specify it in CruiseControlConfig.Image
 	DefaultCruiseControlImage = "ghcr.io/banzaicloud/cruise-control:2.5.123"

@@ -1095,14 +1101,19 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {
 }
 
 // GetBrokerLabels returns the labels that are applied to broker pods
-func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32) map[string]string {
+func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
+	kraftLabels := make(map[string]string, 0)
+	if kRaftMode {
+		kraftLabels = map[string]string{
+			ProcessRolesKey:     strings.Join(bConfig.Roles, "_"),
+			IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
+			IsBrokerNodeKey:     fmt.Sprintf("%t", bConfig.IsBrokerNode()),
+		}
+	}
 	return util.MergeLabels(
 		bConfig.BrokerLabels,
 		util.LabelsForKafka(kafkaClusterName),
-		map[string]string{
-			BrokerIdLabelKey: fmt.Sprintf("%d", brokerId),
-			ProcessRolesKey:  strings.Join(bConfig.Roles, "_"),
-		},
+		kraftLabels, map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
 	)
 }
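The change above makes the role labels conditional: in ZooKeeper mode the pod gets only the base and broker-id labels, while in KRaft mode the processRoles and node-marker labels are merged in as well. A standalone sketch of that merge order follows; mergeLabels is a stand-in for util.MergeLabels (later maps win), and the contains-based role checks are a simplification of the real BrokerConfig.IsBrokerNode/IsControllerNode methods:

```go
package main

import (
	"fmt"
	"strings"
)

// mergeLabels is a stand-in for util.MergeLabels: maps are applied
// left to right, so keys in later maps override earlier ones.
func mergeLabels(maps ...map[string]string) map[string]string {
	out := map[string]string{}
	for _, m := range maps {
		for k, v := range m {
			out[k] = v
		}
	}
	return out
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// getBrokerLabels mirrors the patched GetBrokerLabels: the KRaft role
// labels are only attached when kRaftMode is true.
func getBrokerLabels(userLabels map[string]string, clusterName string, brokerId int32, roles []string, kRaftMode bool) map[string]string {
	kraftLabels := map[string]string{}
	if kRaftMode {
		kraftLabels = map[string]string{
			"processRoles":     strings.Join(roles, "_"),
			"isControllerNode": fmt.Sprintf("%t", contains(roles, "controller")),
			"isBrokerNode":     fmt.Sprintf("%t", contains(roles, "broker")),
		}
	}
	return mergeLabels(
		userLabels,
		map[string]string{"app": "kafka", "kafka_cr": clusterName}, // base labels
		kraftLabels,
		map[string]string{"brokerId": fmt.Sprintf("%d", brokerId)}, // always last, so it cannot be overridden
	)
}

func main() {
	labels := getBrokerLabels(map[string]string{"team": "data"}, "kc", 0, []string{"controller", "broker"}, true)
	fmt.Println(labels["processRoles"]) // controller_broker
}
```

Note the design point the diff encodes: brokerId is moved into its own trailing map so user-supplied BrokerLabels can never shadow it, while the KRaft role labels are an independent map that is simply empty outside KRaft mode.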

api/v1beta1/kafkacluster_types_test.go (+118 −6)

@@ -436,13 +436,125 @@ func TestGetBrokerLabels(t *testing.T) {
 
 		expectedBrokerId = 0
 	)
+	testCases := []struct {
+		testName       string
+		brokerConfig   *BrokerConfig
+		expectedLabels map[string]string
+		kRaftMode      bool
+	}{
+		{
+			testName: "Labels in zookeeper mode",
+			expectedLabels: map[string]string{
+				AppLabelKey:      expectedDefaultLabelApp,
+				BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
+				KafkaCRLabelKey:  expectedKafkaCRName,
+				"test_label_key": "test_label_value",
+			},
+			brokerConfig: &BrokerConfig{
+				Roles: nil,
+				BrokerLabels: map[string]string{
+					AppLabelKey:      "test_app",
+					BrokerIdLabelKey: "test_id",
+					KafkaCRLabelKey:  "test_cr_name",
+					"test_label_key": "test_label_value",
+				},
+			},
+			kRaftMode: false,
+		},
+		{
+			testName: "Labels for broker in kraft mode",
+			expectedLabels: map[string]string{
+				AppLabelKey:         expectedDefaultLabelApp,
+				BrokerIdLabelKey:    strconv.Itoa(expectedBrokerId),
+				KafkaCRLabelKey:     expectedKafkaCRName,
+				"test_label_key":    "test_label_value",
+				ProcessRolesKey:     "broker",
+				IsBrokerNodeKey:     "true",
+				IsControllerNodeKey: "false",
+			},
+			brokerConfig: &BrokerConfig{
+				Roles: []string{"broker"},
+				BrokerLabels: map[string]string{
+					AppLabelKey:      "test_app",
+					BrokerIdLabelKey: "test_id",
+					KafkaCRLabelKey:  "test_cr_name",
+					"test_label_key": "test_label_value",
+				},
+			},
+			kRaftMode: true,
+		},
+		{
+			testName: "Labels for controller in kraft mode",
+			expectedLabels: map[string]string{
+				AppLabelKey:         expectedDefaultLabelApp,
+				BrokerIdLabelKey:    strconv.Itoa(expectedBrokerId),
+				KafkaCRLabelKey:     expectedKafkaCRName,
+				"test_label_key":    "test_label_value",
+				ProcessRolesKey:     "controller",
+				IsBrokerNodeKey:     "false",
+				IsControllerNodeKey: "true",
+			},
+			brokerConfig: &BrokerConfig{
+				Roles: []string{"controller"},
+				BrokerLabels: map[string]string{
+					AppLabelKey:      "test_app",
+					BrokerIdLabelKey: "test_id",
+					KafkaCRLabelKey:  "test_cr_name",
+					"test_label_key": "test_label_value",
+				},
+			},
+			kRaftMode: true,
+		},
+		{
+			testName: "Labels for controller/broker in kraft mode",
+			expectedLabels: map[string]string{
+				AppLabelKey:         expectedDefaultLabelApp,
+				BrokerIdLabelKey:    strconv.Itoa(expectedBrokerId),
+				KafkaCRLabelKey:     expectedKafkaCRName,
+				"test_label_key":    "test_label_value",
+				ProcessRolesKey:     "controller_broker",
+				IsBrokerNodeKey:     "true",
+				IsControllerNodeKey: "true",
+			},
+			brokerConfig: &BrokerConfig{
+				Roles: []string{"controller", "broker"},
+				BrokerLabels: map[string]string{
+					AppLabelKey:      "test_app",
+					BrokerIdLabelKey: "test_id",
+					KafkaCRLabelKey:  "test_cr_name",
+					"test_label_key": "test_label_value",
+				},
+			},
+			kRaftMode: true,
+		},
+	}
+
+	for _, test := range testCases {
+		t.Run(test.testName, func(t *testing.T) {
+			result := test.brokerConfig.GetBrokerLabels(expectedKafkaCRName, expectedBrokerId, test.kRaftMode)
+			if !reflect.DeepEqual(result, test.expectedLabels) {
+				t.Error("Expected:", test.expectedLabels, "Got:", result)
+			}
+		})
+	}
+}
+
+func TestGetBrokerLabelKraft(t *testing.T) {
+	const (
+		expectedDefaultLabelApp = "kafka"
+		expectedKafkaCRName     = "kafka"
+
+		expectedBrokerId = 0
+	)
 
 	expected := map[string]string{
-		AppLabelKey:      expectedDefaultLabelApp,
-		BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
-		KafkaCRLabelKey:  expectedKafkaCRName,
-		"test_label_key": "test_label_value",
-		ProcessRolesKey:  "broker",
+		AppLabelKey:         expectedDefaultLabelApp,
+		BrokerIdLabelKey:    strconv.Itoa(expectedBrokerId),
+		KafkaCRLabelKey:     expectedKafkaCRName,
+		"test_label_key":    "test_label_value",
+		ProcessRolesKey:     "broker",
+		IsBrokerNodeKey:     "true",
+		IsControllerNodeKey: "false",
 	}
 
 	brokerConfig := &BrokerConfig{

@@ -455,7 +567,7 @@ func TestGetBrokerLabels(t *testing.T) {
 		},
 	}
 
-	result := brokerConfig.GetBrokerLabels(expectedKafkaCRName, expectedBrokerId)
+	result := brokerConfig.GetBrokerLabels(expectedKafkaCRName, expectedBrokerId, true)
 
 	if !reflect.DeepEqual(result, expected) {
 		t.Error("Expected:", expected, "Got:", result)

config/samples/kraft/simplekafkacluster_kraft.yaml (+1 −1)

@@ -53,7 +53,7 @@ spec:
       brokerConfig:
         processRoles:
           - controller
-          - broker
+          # - broker
     - id: 4
       brokerConfigGroup: "default"
      brokerConfig:

controllers/tests/kafkacluster_controller_kafka_test.go (+78 −36)

@@ -525,48 +525,90 @@ func expectKafkaCRStatus(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster
 	Expect(kafkaCluster.Status.State).To(Equal(v1beta1.KafkaClusterRunning))
 	Expect(kafkaCluster.Status.AlertCount).To(Equal(0))
 
-	Expect(kafkaCluster.Status.ListenerStatuses).To(Equal(v1beta1.ListenerStatuses{
-		InternalListeners: map[string]v1beta1.ListenerStatusList{
-			"internal": {
-				{
-					Name:    "any-broker",
-					Address: fmt.Sprintf("%s-all-broker.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
-				},
-				{
-					Name:    "broker-0",
-					Address: fmt.Sprintf("%s-0.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
-				},
-				{
-					Name:    "broker-1",
-					Address: fmt.Sprintf("%s-1.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
-				},
-				{
-					Name:    "broker-2",
-					Address: fmt.Sprintf("%s-2.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
-				},
-			},
-		},
-		ExternalListeners: map[string]v1beta1.ListenerStatusList{
-			"test": {
-				{
-					Name:    "any-broker",
-					Address: "test.host.com:29092",
-				},
-				{
-					Name:    "broker-0",
-					Address: "test.host.com:19090",
-				},
-				{
-					Name:    "broker-1",
-					Address: "test.host.com:19091",
-				},
-				{
-					Name:    "broker-2",
-					Address: "test.host.com:19092",
-				},
-			},
-		},
-	}))
+	if kafkaCluster.Spec.KRaftMode == false {
+		Expect(kafkaCluster.Status.ListenerStatuses).To(Equal(v1beta1.ListenerStatuses{
+			InternalListeners: map[string]v1beta1.ListenerStatusList{
+				"internal": {
+					{
+						Name:    "any-broker",
+						Address: fmt.Sprintf("%s-all-broker.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+					{
+						Name:    "broker-0",
+						Address: fmt.Sprintf("%s-0.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+					{
+						Name:    "broker-1",
+						Address: fmt.Sprintf("%s-1.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+					{
+						Name:    "broker-2",
+						Address: fmt.Sprintf("%s-2.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+				},
+			},
+			ExternalListeners: map[string]v1beta1.ListenerStatusList{
+				"test": {
+					{
+						Name:    "any-broker",
+						Address: "test.host.com:29092",
+					},
+					{
+						Name:    "broker-0",
+						Address: "test.host.com:19090",
+					},
+					{
+						Name:    "broker-1",
+						Address: "test.host.com:19091",
+					},
+					{
+						Name:    "broker-2",
+						Address: "test.host.com:19092",
+					},
+				},
+			},
+		}))
+	}
+	if kafkaCluster.Spec.KRaftMode == true {
+		Expect(kafkaCluster.Status.ListenerStatuses).To(Equal(v1beta1.ListenerStatuses{
+			InternalListeners: map[string]v1beta1.ListenerStatusList{
+				"internal": {
+					{
+						Name:    "any-broker",
+						Address: fmt.Sprintf("%s-all-broker.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+					{
+						Name:    "broker-0",
+						Address: fmt.Sprintf("%s-0.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+					{
+						Name:    "broker-2",
+						Address: fmt.Sprintf("%s-2.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
+					},
+				},
+			},
+			ExternalListeners: map[string]v1beta1.ListenerStatusList{
+				"test": {
+					{
+						Name:    "any-broker",
+						Address: "test.host.com:29092",
+					},
+					{
+						Name:    "broker-0",
+						Address: "test.host.com:19090",
+					},
+					{
+						Name:    "broker-1",
+						Address: "test.host.com:19091",
+					},
+					{
+						Name:    "broker-2",
+						Address: "test.host.com:19092",
+					},
+				},
+			},
+		}))
+	}
 	for _, brokerState := range kafkaCluster.Status.BrokersState {
 		Expect(brokerState.Version).To(Equal("3.4.1"))
 		Expect(brokerState.Image).To(Equal(kafkaCluster.Spec.GetClusterImage()))

pkg/jmxextractor/extractor.go (+10 −4)

@@ -29,9 +29,10 @@ import (
 )
 
 const (
-	headlessServiceJMXTemplate = "http://%s-%d." + kafka.HeadlessServiceTemplate + ".%s.svc.%s:%d"
-	serviceJMXTemplate         = "http://%s-%d.%s.svc.%s:%d"
-	versionRegexGroup          = "version"
+	headlessServiceJMXTemplate           = "http://%s-%d." + kafka.HeadlessServiceTemplate + ".%s.svc.%s:%d"
+	headlessControllerServiceJMXTemplate = "http://%s-%d." + kafka.HeadlessControllerServiceTemplate + ".%s.svc.%s:%d"
+	serviceJMXTemplate                   = "http://%s-%d.%s.svc.%s:%d"
+	versionRegexGroup                    = "version"
 )
 
 var newJMXExtractor = createNewJMXExtractor

@@ -74,9 +75,14 @@ func NewMockJMXExtractor() {
 func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConfig *v1beta1.BrokerConfig,
 	clusterImage string, headlessServiceEnabled bool) (*v1beta1.KafkaVersion, error) {
 	var requestURL string
+
 	if headlessServiceEnabled {
+		var jmxTemplate = headlessServiceJMXTemplate
+		if brokerConfig.IsControllerNode() {
+			jmxTemplate = headlessControllerServiceJMXTemplate
+		}
 		requestURL =
-			fmt.Sprintf(headlessServiceJMXTemplate,
+			fmt.Sprintf(jmxTemplate,
 				exp.clusterName, brokerId, exp.clusterName, exp.clusterNamespace,
 				exp.kubernetesClusterDomain, 9020)
 	} else {
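The extractor change above picks the controller headless service for controller nodes, since those pods are no longer selected by the broker headless service. A standalone sketch of that selection; the two template strings here are hypothetical stand-ins (the real code builds them from kafka.HeadlessServiceTemplate and kafka.HeadlessControllerServiceTemplate, whose exact suffixes are not shown in the diff):

```go
package main

import "fmt"

// Hypothetical service-name templates; the "-headless" / "-controller-headless"
// suffixes are assumptions for illustration only.
const (
	headlessServiceJMXTemplate           = "http://%s-%d.%s-headless.%s.svc.%s:%d"
	headlessControllerServiceJMXTemplate = "http://%s-%d.%s-controller-headless.%s.svc.%s:%d"
)

// jmxURL mirrors the patched selection logic: controller nodes are reached
// through the controller headless service, all other nodes through the
// broker headless service, both on the fixed JMX exporter port 9020.
func jmxURL(clusterName string, brokerId int32, namespace, domain string, isController bool) string {
	tmpl := headlessServiceJMXTemplate
	if isController {
		tmpl = headlessControllerServiceJMXTemplate
	}
	return fmt.Sprintf(tmpl, clusterName, brokerId, clusterName, namespace, domain, 9020)
}

func main() {
	fmt.Println(jmxURL("kc", 3, "kafka", "cluster.local", true))
	// http://kc-3.kc-controller-headless.kafka.svc.cluster.local:9020
}
```

Without this branch, the extractor would build a DNS name under the broker headless service for a controller-only pod and the JMX scrape would fail to resolve, which is why the fallback stays the broker template and only confirmed controller nodes switch over.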
