Skip to content

Commit d7fc34e

Browse files
committed
Emit column level metadata in catalog and consume it back in
We want to allow users to configure which columns of their incident data are ingested and which are left out. This PR creates column-level metadata for each property in a schema (it doesn't handle nested properties). This lets us say yay or nay on a column by inspecting its metadata and checking the usual "selected" and "selected-by-default" properties we have there.
1 parent e2ac5bf commit d7fc34e

File tree

3 files changed

+66
-2
lines changed

3 files changed

+66
-2
lines changed

tap/catalog.go

+36
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package tap
22

33
import (
44
"github.com/incident-io/singer-tap/model"
5+
"github.com/pkg/errors"
6+
"github.com/samber/lo"
57
)
68

79
// A catalog can contain several streams or "entries"
@@ -58,6 +60,40 @@ func (c *Catalog) GetEnabledStreams() []CatalogEntry {
5860
return enabledStreams
5961
}
6062

63+
func (c *Catalog) GetDisabledFields(streamName string) (map[string]bool, error) {
64+
// Just something to enable quick lookups of fields by name
65+
var disabledField = map[string]bool{}
66+
67+
// For the given stream, get the enabled fields
68+
catalogEntry, found := lo.Find(c.Streams, func(stream CatalogEntry) bool { return stream.Stream == streamName })
69+
if !found {
70+
return nil, errors.Errorf("stream %s not found", streamName)
71+
}
72+
73+
// For this catalog entry, get the metadata, and build a list of all the enabled fields
74+
for _, metadata := range *catalogEntry.Metadata {
75+
// Ignore the top level metadata
76+
if len(metadata.Breadcrumb) == 0 {
77+
continue
78+
}
79+
80+
// Check if the metadata has the user input "selected" bool
81+
if metadata.Metadata.Selected != nil {
82+
// If so, check its set to false!
83+
if !*metadata.Metadata.Selected {
84+
disabledField[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
85+
}
86+
} else {
87+
// There's no selected key, so check if WE have set the selected by default
88+
if !metadata.Metadata.SelectedByDefault {
89+
disabledField[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
90+
}
91+
}
92+
}
93+
94+
return disabledField, nil
95+
}
96+
6197
func NewDefaultCatalog(streams map[string]Stream) *Catalog {
6298
entries := []CatalogEntry{}
6399

tap/metadata.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ type MetadataFields struct {
4444
// SelectedByDefault: If the user doesn't specify should we
4545
// emit this field by default
4646
// This really only applies to available inclusion setting
47-
SelectedByDefault bool `json:"selected-by-default"`
47+
SelectedByDefault bool `json:"selected-by-default,omitempty"`
4848

4949
// ForcedReplicateMethod: we will set to FULL_TABLE for our tap
50-
ForcedReplicationMethod string `json:"forced-replication-method"`
50+
ForcedReplicationMethod string `json:"forced-replication-method,omitempty"`
5151
}
5252

5353
func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata {
@@ -64,5 +64,19 @@ func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata {
6464
},
6565
}
6666

67+
// For columns we want to set the inclusion to available for everything - but we set
68+
// selected by default to true as well (so unless the user specifically says no, we'll include it)
69+
// We might want to get more intelligent later - as this way people could stop themselves
70+
// from getting key data by accident
71+
for name := range schema.Properties {
72+
metadata = append(metadata, Metadata{
73+
Breadcrumb: []string{"properties", name},
74+
Metadata: MetadataFields{
75+
Inclusion: "available",
76+
SelectedByDefault: true,
77+
},
78+
})
79+
}
80+
6781
return metadata
6882
}

tap/tap.go

+14
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,25 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
2828

2929
timeExtracted := time.Now().Format(time.RFC3339)
3030
logger.Log("msg", "loading records", "time_extracted", timeExtracted)
31+
3132
records, err := stream.GetRecords(ctx, logger, cl)
3233
if err != nil {
3334
return err
3435
}
3536

37+
// Get the disabled fields for this stream
38+
disabledFields, err := catalog.GetDisabledFields(catalogEntry.Stream)
39+
if err != nil {
40+
return err
41+
}
42+
43+
// Filter out the disabled fields from each record (ew)
44+
for _, record := range records {
45+
for fieldName := range disabledFields {
46+
delete(record, fieldName)
47+
}
48+
}
49+
3650
logger.Log("msg", "outputting records", "count", len(records))
3751
for _, record := range records {
3852
op := &Output{

0 commit comments

Comments
 (0)