Skip to content

Commit 5fdafb0

Browse files
authored
Merge pull request #16 from incident-io/rob/emit-and-parse-column-level-metadata
Emit column level metadata in catalog and consume it back in
2 parents e2ac5bf + 333dfa2 commit 5fdafb0

File tree

6 files changed

+117
-15
lines changed

6 files changed

+117
-15
lines changed

go.mod

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ require (
66
github.com/alecthomas/kingpin/v2 v2.3.2
77
github.com/deepmap/oapi-codegen v1.12.4
88
github.com/fatih/structs v1.1.0
9-
github.com/ghodss/yaml v1.0.0
109
github.com/go-kit/log v0.2.0
1110
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
12-
github.com/google/go-jsonnet v0.20.0
1311
github.com/hashicorp/go-cleanhttp v0.5.2
1412
github.com/hashicorp/go-retryablehttp v0.7.2
1513
github.com/onsi/ginkgo/v2 v2.13.0
@@ -22,6 +20,7 @@ require (
2220
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
2321
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
2422
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 // indirect
23+
github.com/fatih/color v1.12.0 // indirect
2524
github.com/getkin/kin-openapi v0.107.0 // indirect
2625
github.com/go-logfmt/logfmt v0.5.1 // indirect
2726
github.com/go-logr/logr v1.2.4 // indirect
@@ -52,5 +51,4 @@ require (
5251
golang.org/x/tools v0.12.0 // indirect
5352
gopkg.in/yaml.v2 v2.4.0 // indirect
5453
gopkg.in/yaml.v3 v3.0.1 // indirect
55-
sigs.k8s.io/yaml v1.1.0 // indirect
5654
)

go.sum

+4-8
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
2424
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
2525
github.com/getkin/kin-openapi v0.107.0 h1:bxhL6QArW7BXQj8NjXfIJQy680NsMKd25nwhvpCXchg=
2626
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
27-
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
28-
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
2927
github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
3028
github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
3129
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
@@ -47,8 +45,6 @@ github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC
4745
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
4846
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
4947
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
50-
github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g=
51-
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
5248
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
5349
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
5450
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -82,11 +78,13 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
8278
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
8379
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
8480
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
81+
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
8582
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
8683
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
8784
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
8885
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
8986
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
87+
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
9088
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
9189
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
9290
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
@@ -105,8 +103,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
105103
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
106104
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
107105
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
108-
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
109-
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
110106
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
111107
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
112108
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -135,6 +131,8 @@ golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
135131
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
136132
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
137133
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
134+
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
135+
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
138136
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
139137
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
140138
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -160,5 +158,3 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
160158
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
161159
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
162160
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
163-
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
164-
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=

tap/catalog.go

+29
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,35 @@ func (c *Catalog) GetEnabledStreams() []CatalogEntry {
5858
return enabledStreams
5959
}
6060

61+
func (c *CatalogEntry) GetDisabledFields() map[string]bool {
62+
// Just something to enable quick lookups of fields by name
63+
var disabledFields = map[string]bool{}
64+
65+
// For the given stream, get the enabled fields
66+
// For this catalog entry, get the metadata, and build a list of all the enabled fields
67+
for _, metadata := range *c.Metadata {
68+
// Ignore the top level metadata
69+
if len(metadata.Breadcrumb) == 0 {
70+
continue
71+
}
72+
73+
// Check if the metadata has the user input "selected" bool
74+
if metadata.Metadata.Selected != nil {
75+
// If so, check its set to false!
76+
if !*metadata.Metadata.Selected {
77+
disabledFields[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
78+
}
79+
} else {
80+
// There's no selected key, so check if WE have set the selected by default
81+
if !metadata.Metadata.SelectedByDefault {
82+
disabledFields[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
83+
}
84+
}
85+
}
86+
87+
return disabledFields
88+
}
89+
6190
func NewDefaultCatalog(streams map[string]Stream) *Catalog {
6291
entries := []CatalogEntry{}
6392

tap/metadata.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tap
22

3-
import "github.com/incident-io/singer-tap/model"
3+
import (
4+
"github.com/incident-io/singer-tap/model"
5+
)
46

57
type Metadata struct {
68
// Pointer to where in the schmea this metadata applies
@@ -44,10 +46,10 @@ type MetadataFields struct {
4446
// SelectedByDefault: If the user doesn't specify should we
4547
// emit this field by default
4648
// This really only applies to available inclusion setting
47-
SelectedByDefault bool `json:"selected-by-default"`
49+
SelectedByDefault bool `json:"selected-by-default,omitempty"`
4850

4951
// ForcedReplicateMethod: we will set to FULL_TABLE for our tap
50-
ForcedReplicationMethod string `json:"forced-replication-method"`
52+
ForcedReplicationMethod string `json:"forced-replication-method,omitempty"`
5153
}
5254

5355
func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata {
@@ -64,5 +66,19 @@ func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata {
6466
},
6567
}
6668

69+
// For columns we want to set the inclusion to available for everything - but we set
70+
// selected by default to true as well (so unless the user speficially says no, we'll include it)
71+
// We might want to get more intelligent later - as this way people could stop themselves
72+
// from getting key data by accident
73+
for name := range schema.Properties {
74+
metadata = append(metadata, Metadata{
75+
Breadcrumb: []string{"properties", name},
76+
Metadata: MetadataFields{
77+
Inclusion: "available",
78+
SelectedByDefault: true,
79+
},
80+
})
81+
}
82+
6783
return metadata
6884
}

tap/stream_filter.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package tap
2+
3+
import (
4+
"context"
5+
6+
kitlog "github.com/go-kit/log"
7+
"github.com/incident-io/singer-tap/client"
8+
"github.com/incident-io/singer-tap/model"
9+
)
10+
11+
type Filter struct {
12+
Stream Stream
13+
CatalogEntry CatalogEntry
14+
}
15+
16+
func (s *Filter) Output() *Output {
17+
output := s.Stream.Output()
18+
19+
// // We need to filter the schema based on the catalog entry we have
20+
output.Schema.Properties = s.filterProperties(output.Schema.Properties, s.CatalogEntry)
21+
22+
return output
23+
}
24+
25+
func (s *Filter) GetRecords(ctx context.Context, logger kitlog.Logger, cl *client.ClientWithResponses) ([]map[string]any, error) {
26+
records, err := s.Stream.GetRecords(ctx, logger, cl)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
disabledFields := s.CatalogEntry.GetDisabledFields()
32+
33+
// Filter out the disabled fields from each record (ew)
34+
for _, record := range records {
35+
for fieldName := range disabledFields {
36+
delete(record, fieldName)
37+
}
38+
}
39+
40+
return records, nil
41+
}
42+
43+
func (s *Filter) filterProperties(properties map[string]model.Property, catalogEntry CatalogEntry) map[string]model.Property {
44+
filteredProperties := map[string]model.Property{}
45+
disabledFields := catalogEntry.GetDisabledFields()
46+
47+
for propertyName, property := range properties {
48+
// If the field is disabled, skip it
49+
if _, ok := disabledFields[propertyName]; ok {
50+
continue
51+
}
52+
53+
filteredProperties[propertyName] = property
54+
}
55+
56+
return filteredProperties
57+
}

tap/tap.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
1818
enabledStreams := catalog.GetEnabledStreams()
1919

2020
for _, catalogEntry := range enabledStreams {
21-
stream := streams[catalogEntry.Stream]
21+
// Use a filter to ensure we only output the fields we want
22+
stream := Filter{
23+
Stream: streams[catalogEntry.Stream],
24+
CatalogEntry: catalogEntry,
25+
}
26+
2227
logger := kitlog.With(logger, "stream", catalogEntry.Stream)
2328

2429
logger.Log("msg", "outputting schema")
@@ -28,6 +33,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
2833

2934
timeExtracted := time.Now().Format(time.RFC3339)
3035
logger.Log("msg", "loading records", "time_extracted", timeExtracted)
36+
3137
records, err := stream.GetRecords(ctx, logger, cl)
3238
if err != nil {
3339
return err

0 commit comments

Comments
 (0)