Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for comma-separated URLs #1364

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion schemaregistry/internal/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// ClientConfig is used to pass multiple configuration options to the Schema Registry client.
type ClientConfig struct {
// SchemaRegistryURL determines the URL of Schema Registry.
// SchemaRegistryURL is a comma-space separated list of URLs for the Schema Registry.
SchemaRegistryURL string

// BasicAuthUserInfo specifies the user info in the form of {username}:{password}.
Expand Down
83 changes: 51 additions & 32 deletions schemaregistry/internal/rest_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewRequest(method string, endpoint string, body interface{}, arguments ...i

// RestService represents a REST client
type RestService struct {
url *url.URL
urls []*url.URL
headers http.Header
maxRetries int
retriesWaitMs int
Expand All @@ -124,21 +124,22 @@ type RestService struct {
// NewRestService returns a new REST client for the Confluent Schema Registry
func NewRestService(conf *ClientConfig) (*RestService, error) {
urlConf := conf.SchemaRegistryURL
u, err := url.Parse(urlConf)

if err != nil {
return nil, err
urlStrs := strings.Split(urlConf, ",")
urls := make([]*url.URL, len(urlStrs))
for i, urlStr := range urlStrs {
u, err := url.Parse(strings.TrimSpace(urlStr))
if err != nil {
return nil, err
}
urls[i] = u
}

headers, err := NewAuthHeader(u, conf)
headers, err := NewAuthHeader(urls[0], conf)
if err != nil {
return nil, err
}

headers.Add("Content-Type", "application/vnd.schemaregistry.v1+json")
if err != nil {
return nil, err
}

if conf.HTTPClient == nil {
transport, err := configureTransport(conf)
Expand All @@ -155,7 +156,7 @@ func NewRestService(conf *ClientConfig) (*RestService, error) {
}

return &RestService{
url: u,
urls: urls,
headers: headers,
maxRetries: conf.MaxRetries,
retriesWaitMs: conf.RetriesWaitMs,
Expand Down Expand Up @@ -337,19 +338,51 @@ func NewAuthHeader(service *url.URL, conf *ClientConfig) (http.Header, error) {
return header, err
}

// HandleRequest sends a HTTP(S) request to the Schema Registry, placing results into the response object
// HandleRequest sends a request to the Schema Registry, iterating over the list of URLs
func (rs *RestService) HandleRequest(request *API, response interface{}) error {
urlPath := path.Join(rs.url.Path, fmt.Sprintf(request.endpoint, request.arguments...))
endpoint, err := rs.url.Parse(urlPath)
if err != nil {
var resp *http.Response
var err error
for i, u := range rs.urls {
resp, err = rs.HandleHTTPRequest(u, request)
if err != nil {
if i == len(rs.urls)-1 {
return err
}
continue
}
if isSuccess(resp.StatusCode) || !isRetriable(resp.StatusCode) || i >= rs.maxRetries {
break
}
}
defer resp.Body.Close()
if isSuccess(resp.StatusCode) {
if err = json.NewDecoder(resp.Body).Decode(response); err != nil {
return err
}
return nil
}

var failure rest.Error
if err = json.NewDecoder(resp.Body).Decode(&failure); err != nil {
return err
}

return &failure
}

// HandleHTTPRequest sends a HTTP(S) request to the Schema Registry, placing results into the response object
func (rs *RestService) HandleHTTPRequest(url *url.URL, request *API) (*http.Response, error) {
urlPath := path.Join(url.Path, fmt.Sprintf(request.endpoint, request.arguments...))
endpoint, err := url.Parse(urlPath)
if err != nil {
return nil, err
}

var readCloser io.ReadCloser
if request.body != nil {
outbuf, err := json.Marshal(request.body)
if err != nil {
return err
return nil, err
}
readCloser = ioutil.NopCloser(bytes.NewBuffer(outbuf))
}
Expand All @@ -365,30 +398,16 @@ func (rs *RestService) HandleRequest(request *API, response interface{}) error {
for i := 0; i < rs.maxRetries+1; i++ {
resp, err = rs.Do(req)
if err != nil {
return err
return nil, err
}

if isSuccess(resp.StatusCode) || !isRetriable(resp.StatusCode) || i >= rs.maxRetries {
break
return resp, nil
}

time.Sleep(rs.fullJitter(i))
}

defer resp.Body.Close()
if resp.StatusCode == 200 {
if err = json.NewDecoder(resp.Body).Decode(response); err != nil {
return err
}
return nil
}

var failure rest.Error
if err := json.NewDecoder(resp.Body).Decode(&failure); err != nil {
return err
}

return &failure
return nil, fmt.Errorf("failed to send request after %d retries", rs.maxRetries)
}

func (rs *RestService) fullJitter(retriesAttempted int) time.Duration {
Expand Down
4 changes: 3 additions & 1 deletion schemaregistry/serde/avrov2/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,8 @@ func TestAvroSerdeWithCELFieldTransformDisable(t *testing.T) {
OnFailure: nil,
Disabled: &[]bool{true}[0],
})
ser.RuleRegistry = &registry

id, err := client.Register("topic1-value", info, false)
serde.MaybeFail("Schema registration", err)
if id <= 0 {
Expand All @@ -960,7 +962,7 @@ func TestAvroSerdeWithCELFieldTransformDisable(t *testing.T) {
deser.MessageFactory = testMessageFactory

newobj, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj))
serde.MaybeFail("deserialization", err, serde.Expect(newobj.(*DemoSchema).StringField, "hi"))
}

func TestAvroSerdeWithCELFieldTransform(t *testing.T) {
Expand Down