Skip to content

Commit 52ef012

Browse files
authored
Merge pull request #262 from christian-olsen/master
feat: Support HTTP basic authentication for schema registry connections
2 parents 556bed0 + d34d114 commit 52ef012

File tree

5 files changed

+61
-10
lines changed

5 files changed

+61
-10
lines changed

cmd/kaf/consume.go

+2
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ func withConsumerGroup(ctx context.Context, client sarama.Client, topic, group s
162162
errorExit("Failed to create consumer group: %v", err)
163163
}
164164

165+
schemaCache = getSchemaCache()
166+
165167
err = cg.Consume(ctx, []string{topic}, &g{})
166168
if err != nil {
167169
errorExit("Error on consume: %v", err)

cmd/kaf/kaf.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ func onInit() {
207207
// Any set flags override the configuration
208208
if schemaRegistryURL != "" {
209209
currentCluster.SchemaRegistryURL = schemaRegistryURL
210+
currentCluster.SchemaRegistryCredentials = nil
210211
}
211212

212213
if brokersFlag != nil {
@@ -247,7 +248,12 @@ func getSchemaCache() (cache *avro.SchemaCache) {
247248
if currentCluster.SchemaRegistryURL == "" {
248249
return nil
249250
}
250-
cache, err := avro.NewSchemaCache(currentCluster.SchemaRegistryURL)
251+
var username, password string
252+
if creds := currentCluster.SchemaRegistryCredentials; creds != nil {
253+
username = creds.Username
254+
password = creds.Password
255+
}
256+
cache, err := avro.NewSchemaCache(currentCluster.SchemaRegistryURL, username, password)
251257
if err != nil {
252258
errorExit("Unable to get schema cache :%v\n", err)
253259
}
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
clusters:
2+
- name: local
3+
brokers:
4+
- localhost:9092
5+
SASL: null
6+
TLS: null
7+
security-protocol: ""
8+
version: "1.0.0"
9+
schema-registry-url: https://schema.registry.url
10+
schema-registry-credentials:
11+
username: httpbasicauthuser
12+
password: mypasswordisnotsobasic

pkg/avro/schema.go

+27-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package avro
22

33
import (
4+
"encoding/base64"
45
"encoding/binary"
6+
"net/http"
57
"sync"
68

79
schemaregistry "github.com/Landoop/schema-registry"
@@ -23,9 +25,32 @@ type SchemaCache struct {
2325
codecsBySchemaID map[int]*cachedCodec
2426
}
2527

28+
type transport struct {
29+
underlyingTransport http.RoundTripper
30+
encodedCredentials string
31+
}
32+
33+
// RoundTrip wraps the underlying transport's RoundTripper and injects a
34+
// HTTP Basic authentication header if credentials are provided.
35+
func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
36+
if t.encodedCredentials != "" {
37+
req.Header.Add("Authorization", "Basic "+t.encodedCredentials)
38+
}
39+
return t.underlyingTransport.RoundTrip(req)
40+
}
41+
2642
// NewSchemaCache returns a new Cache instance
27-
func NewSchemaCache(url string) (*SchemaCache, error) {
28-
client, err := schemaregistry.NewClient(url)
43+
func NewSchemaCache(url string, username string, password string) (*SchemaCache, error) {
44+
var encodedCredentials string
45+
if username != "" {
46+
encodedCredentials = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
47+
}
48+
httpClient := &http.Client{Transport: &transport{
49+
underlyingTransport: http.DefaultTransport,
50+
encodedCredentials: encodedCredentials,
51+
}}
52+
53+
client, err := schemaregistry.NewClient(url, schemaregistry.UsingClient(httpClient))
2954
if err != nil {
3055
return nil, err
3156
}

pkg/config/config.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@ type TLS struct {
2626
Insecure bool
2727
}
2828

29+
type SchemaRegistryCredentials struct {
30+
Username string `yaml:"username"`
31+
Password string `yaml:"password"`
32+
}
33+
2934
type Cluster struct {
30-
Name string
31-
Version string `yaml:"version"`
32-
Brokers []string `yaml:"brokers"`
33-
SASL *SASL `yaml:"SASL"`
34-
TLS *TLS `yaml:"TLS"`
35-
SecurityProtocol string `yaml:"security-protocol"`
36-
SchemaRegistryURL string `yaml:"schema-registry-url"`
35+
Name string
36+
Version string `yaml:"version"`
37+
Brokers []string `yaml:"brokers"`
38+
SASL *SASL `yaml:"SASL"`
39+
TLS *TLS `yaml:"TLS"`
40+
SecurityProtocol string `yaml:"security-protocol"`
41+
SchemaRegistryURL string `yaml:"schema-registry-url"`
42+
SchemaRegistryCredentials *SchemaRegistryCredentials `yaml:"schema-registry-credentials"`
3743
}
3844

3945
type Config struct {

0 commit comments

Comments
 (0)