Skip to content

Commit

Permalink
Merge pull request #3036 from redpanda-data/mihaitodor-fix-redpanda-m…
Browse files Browse the repository at this point in the history
…igrator-output-schema-registration

Fix `redpanda_migrator` output destination schema registration
  • Loading branch information
mihaitodor authored Nov 26, 2024
2 parents 244e40d + 9fdf9c6 commit 6ec36e7
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 26 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ All notable changes to this project will be documented in this file.

- Add support for `spanner` driver to SQL plugins. (@yufeng-deng)

### Changed

- The `redpanda_migrator` output now registers destination schemas with all the subjects associated with the source schema ID extracted from each message. (@mihaitodor)

## 4.41.0 - 2024-11-25

### Added
Expand Down
10 changes: 7 additions & 3 deletions internal/impl/confluent/sr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,13 @@ func (c *Client) GetSchemaByID(ctx context.Context, id int, includeDeleted bool)
return schema, nil
}

// GetSchemaByIDAndSubject gets a schema by its global identifier.
func (c *Client) GetSchemaByIDAndSubject(ctx context.Context, id int, subject string, includeDeleted bool) (sr.Schema, error) {
return c.GetSchemaByID(sr.WithParams(ctx, sr.Subject(subject)), id, includeDeleted)
// GetSubjectsBySchemaID returns the registered subjects for a given schema ID.
func (c *Client) GetSubjectsBySchemaID(ctx context.Context, id int, includeDeleted bool) ([]string, error) {
if includeDeleted {
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.SubjectsByID(ctx, id)
}

// GetLatestSchemaVersionForSchemaIDAndSubject gets the latest version of a schema by its global identifier scoped to the provided subject.
Expand Down
8 changes: 4 additions & 4 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,14 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa
for recordIdx, record := range records {
schemaID, _, err := ch.DecodeID(record.Value)
if err != nil {
return fmt.Errorf("failed to extract schema ID from message index %d for topic %q: %s", recordIdx, record.Topic, err)
return fmt.Errorf("failed to extract schema ID from message index %d: %s", recordIdx, err)
}

var destSchemaID int
if cachedID, ok := w.schemaIDCache.Load(schemaID); !ok {
destSchemaID, err = w.schemaRegistryOutput.GetDestinationSchemaID(ctx, schemaID, record.Topic)
destSchemaID, err = w.schemaRegistryOutput.GetDestinationSchemaID(ctx, schemaID)
if err != nil {
return fmt.Errorf("failed to fetch destination schema ID from message index %d for topic %q: %s", recordIdx, record.Topic, err)
return fmt.Errorf("failed to fetch destination schema ID from message index %d: %s", recordIdx, err)
}
w.schemaIDCache.Store(schemaID, destSchemaID)
} else {
Expand All @@ -290,7 +290,7 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa

err = sr.UpdateID(record.Value, destSchemaID)
if err != nil {
return fmt.Errorf("failed to extract schema ID from message index %d for topic %q: %s", recordIdx, record.Topic, err)
return fmt.Errorf("failed to update schema ID in message index %d: %s", recordIdx, err)
}
}
}
Expand Down
51 changes: 35 additions & 16 deletions internal/impl/kafka/enterprise/schema_registry_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,28 +239,47 @@ func (o *schemaRegistryOutput) Close(_ context.Context) error {

//------------------------------------------------------------------------------

// GetDestinationSchemaID attempts to fetch the schema ID for the provided source schema ID and subject. It will first
// migrate it to the destination Schema Registry if it doesn't exist there yet.
func (o *schemaRegistryOutput) GetDestinationSchemaID(ctx context.Context, id int, subject string) (int, error) {
schema, err := o.inputClient.GetSchemaByIDAndSubject(ctx, id, subject, false)
// GetDestinationSchemaID attempts to fetch the schema ID for the provided source schema ID. It will first migrate it to
// the destination Schema Registry if it doesn't exist there yet.
func (o *schemaRegistryOutput) GetDestinationSchemaID(ctx context.Context, id int) (int, error) {
schema, err := o.inputClient.GetSchemaByID(ctx, id, false)
if err != nil {
return -1, fmt.Errorf("failed to get schema for ID %d and subject %q: %s", id, subject, err)
return -1, fmt.Errorf("failed to get schema for ID %d: %s", id, err)
}

latestVersion, err := o.inputClient.GetLatestSchemaVersionForSchemaIDAndSubject(ctx, id, subject)
schemaSubjects, err := o.inputClient.GetSubjectsBySchemaID(ctx, id, false)
if err != nil {
return -1, fmt.Errorf("failed to get schema for ID %d and subject %q: %s", id, subject, err)
return -1, fmt.Errorf("failed to get subjects for schema ID %d: %s", id, err)
}

return o.getOrCreateSchemaID(
ctx,
franz_sr.SubjectSchema{
Subject: subject,
Version: latestVersion,
ID: id,
Schema: schema,
},
)
if len(schemaSubjects) == 0 {
return -1, fmt.Errorf("no subjects found for schema ID %d", id)
}

// Register the schema with all the subjects it's associated with in the source Schema Registry. Each call should
// return the same destination schema ID.
var destinationID int
for _, subject := range schemaSubjects {
latestVersion, err := o.inputClient.GetLatestSchemaVersionForSchemaIDAndSubject(ctx, id, subject)
if err != nil {
return -1, fmt.Errorf("failed to get schema for ID %d and subject %q: %s", id, subject, err)
}

destinationID, err = o.getOrCreateSchemaID(
ctx,
franz_sr.SubjectSchema{
Subject: subject,
Version: latestVersion,
ID: id,
Schema: schema,
},
)
if err != nil {
return -1, fmt.Errorf("failed to get destination schema ID for source schema ID %d, subject %q and version %d: %s", id, subject, latestVersion, err)
}
}

return destinationID, nil
}

// schemaLineageCacheKey is used as a lightweight key for the schema ID map cache so we don't store the full schemas in
Expand Down
9 changes: 6 additions & 3 deletions internal/impl/kafka/enterprise/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func TestSchemaRegistry(t *testing.T) {
output = dummySchema
case "/schemas/ids/2":
output = dummySchemaWithRef
case "/schemas/ids/1/subjects":
output = []string{"foo"}
case "/schemas/ids/2/subjects":
output = []string{"bar"}
case "/schemas/ids/1/versions":
output = []map[string]any{{"subject": "foo", "version": 1}}
case "/schemas/ids/2/versions":
Expand Down Expand Up @@ -97,7 +101,6 @@ func TestSchemaRegistry(t *testing.T) {

inputConf, err := schemaRegistryInputSpec().ParseYAML(fmt.Sprintf(`
url: %s
subject: foo
`, ts.URL), nil)
require.NoError(t, err)

Expand Down Expand Up @@ -139,10 +142,10 @@ subject: ${! @schema_registry_subject }

// Ensure that the written schemas are correctly returned.
// TODO: Use a secondary test server for the writer so we can check that they're actually written.
destID, err := writer.GetDestinationSchemaID(ctx, 1, "foo")
destID, err := writer.GetDestinationSchemaID(ctx, 1)
require.NoError(t, err)
assert.Equal(t, 1, destID)
destID, err = writer.GetDestinationSchemaID(ctx, 2, "bar")
destID, err = writer.GetDestinationSchemaID(ctx, 2)
require.NoError(t, err)
assert.Equal(t, 2, destID)
}

0 comments on commit 6ec36e7

Please sign in to comment.