Skip to content

Commit 333dfa2

Browse files
committed
Add a stream filter for ... filtering streams
This implements the same interface as every stream and acts as a filter on the stream it wraps allowing us to nicely filter out disabled fields from both schemas and their results. We could probably get more clever on the result generation to save having to loop over again but for now this works yay!
1 parent d7fc34e commit 333dfa2

File tree

6 files changed

+77
-39
lines changed

6 files changed

+77
-39
lines changed

go.mod

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ require (
66
github.com/alecthomas/kingpin/v2 v2.3.2
77
github.com/deepmap/oapi-codegen v1.12.4
88
github.com/fatih/structs v1.1.0
9-
github.com/ghodss/yaml v1.0.0
109
github.com/go-kit/log v0.2.0
1110
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
12-
github.com/google/go-jsonnet v0.20.0
1311
github.com/hashicorp/go-cleanhttp v0.5.2
1412
github.com/hashicorp/go-retryablehttp v0.7.2
1513
github.com/onsi/ginkgo/v2 v2.13.0
@@ -22,6 +20,7 @@ require (
2220
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
2321
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
2422
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 // indirect
23+
github.com/fatih/color v1.12.0 // indirect
2524
github.com/getkin/kin-openapi v0.107.0 // indirect
2625
github.com/go-logfmt/logfmt v0.5.1 // indirect
2726
github.com/go-logr/logr v1.2.4 // indirect
@@ -52,5 +51,4 @@ require (
5251
golang.org/x/tools v0.12.0 // indirect
5352
gopkg.in/yaml.v2 v2.4.0 // indirect
5453
gopkg.in/yaml.v3 v3.0.1 // indirect
55-
sigs.k8s.io/yaml v1.1.0 // indirect
5654
)

go.sum

+4-8
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
2424
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
2525
github.com/getkin/kin-openapi v0.107.0 h1:bxhL6QArW7BXQj8NjXfIJQy680NsMKd25nwhvpCXchg=
2626
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
27-
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
28-
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
2927
github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
3028
github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
3129
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
@@ -47,8 +45,6 @@ github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC
4745
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
4846
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
4947
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
50-
github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g=
51-
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
5248
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
5349
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
5450
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -82,11 +78,13 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
8278
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
8379
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
8480
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
81+
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
8582
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
8683
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
8784
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
8885
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
8986
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
87+
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
9088
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
9189
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
9290
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
@@ -105,8 +103,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
105103
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
106104
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
107105
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
108-
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
109-
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
110106
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
111107
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
112108
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -135,6 +131,8 @@ golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
135131
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
136132
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
137133
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
134+
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
135+
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
138136
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
139137
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
140138
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -160,5 +158,3 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
160158
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
161159
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
162160
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
163-
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
164-
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=

tap/catalog.go

+6-13
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package tap
22

33
import (
44
"github.com/incident-io/singer-tap/model"
5-
"github.com/pkg/errors"
6-
"github.com/samber/lo"
75
)
86

97
// A catalog can contain several streams or "entries"
@@ -60,18 +58,13 @@ func (c *Catalog) GetEnabledStreams() []CatalogEntry {
6058
return enabledStreams
6159
}
6260

63-
func (c *Catalog) GetDisabledFields(streamName string) (map[string]bool, error) {
61+
func (c *CatalogEntry) GetDisabledFields() map[string]bool {
6462
// Just something to enable quick lookups of fields by name
65-
var disabledField = map[string]bool{}
63+
var disabledFields = map[string]bool{}
6664

6765
// For the given stream, get the enabled fields
68-
catalogEntry, found := lo.Find(c.Streams, func(stream CatalogEntry) bool { return stream.Stream == streamName })
69-
if !found {
70-
return nil, errors.Errorf("stream %s not found", streamName)
71-
}
72-
7366
// For this catalog entry, get the metadata, and build a list of all the enabled fields
74-
for _, metadata := range *catalogEntry.Metadata {
67+
for _, metadata := range *c.Metadata {
7568
// Ignore the top level metadata
7669
if len(metadata.Breadcrumb) == 0 {
7770
continue
@@ -81,17 +74,17 @@ func (c *Catalog) GetDisabledFields(streamName string) (map[string]bool, error)
8174
if metadata.Metadata.Selected != nil {
8275
// If so, check its set to false!
8376
if !*metadata.Metadata.Selected {
84-
disabledField[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
77+
disabledFields[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
8578
}
8679
} else {
8780
// There's no selected key, so check if WE have set the selected by default
8881
if !metadata.Metadata.SelectedByDefault {
89-
disabledField[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
82+
disabledFields[metadata.Breadcrumb[len(metadata.Breadcrumb)-1]] = true
9083
}
9184
}
9285
}
9386

94-
return disabledField, nil
87+
return disabledFields
9588
}
9689

9790
func NewDefaultCatalog(streams map[string]Stream) *Catalog {

tap/metadata.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tap
22

3-
import "github.com/incident-io/singer-tap/model"
3+
import (
4+
"github.com/incident-io/singer-tap/model"
5+
)
46

57
type Metadata struct {
68
// Pointer to where in the schmea this metadata applies

tap/stream_filter.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package tap
2+
3+
import (
4+
"context"
5+
6+
kitlog "github.com/go-kit/log"
7+
"github.com/incident-io/singer-tap/client"
8+
"github.com/incident-io/singer-tap/model"
9+
)
10+
11+
type Filter struct {
12+
Stream Stream
13+
CatalogEntry CatalogEntry
14+
}
15+
16+
func (s *Filter) Output() *Output {
17+
output := s.Stream.Output()
18+
19+
// // We need to filter the schema based on the catalog entry we have
20+
output.Schema.Properties = s.filterProperties(output.Schema.Properties, s.CatalogEntry)
21+
22+
return output
23+
}
24+
25+
func (s *Filter) GetRecords(ctx context.Context, logger kitlog.Logger, cl *client.ClientWithResponses) ([]map[string]any, error) {
26+
records, err := s.Stream.GetRecords(ctx, logger, cl)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
disabledFields := s.CatalogEntry.GetDisabledFields()
32+
33+
// Filter out the disabled fields from each record (ew)
34+
for _, record := range records {
35+
for fieldName := range disabledFields {
36+
delete(record, fieldName)
37+
}
38+
}
39+
40+
return records, nil
41+
}
42+
43+
func (s *Filter) filterProperties(properties map[string]model.Property, catalogEntry CatalogEntry) map[string]model.Property {
44+
filteredProperties := map[string]model.Property{}
45+
disabledFields := catalogEntry.GetDisabledFields()
46+
47+
for propertyName, property := range properties {
48+
// If the field is disabled, skip it
49+
if _, ok := disabledFields[propertyName]; ok {
50+
continue
51+
}
52+
53+
filteredProperties[propertyName] = property
54+
}
55+
56+
return filteredProperties
57+
}

tap/tap.go

+6-14
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
1818
enabledStreams := catalog.GetEnabledStreams()
1919

2020
for _, catalogEntry := range enabledStreams {
21-
stream := streams[catalogEntry.Stream]
21+
// Use a filter to ensure we only output the fields we want
22+
stream := Filter{
23+
Stream: streams[catalogEntry.Stream],
24+
CatalogEntry: catalogEntry,
25+
}
26+
2227
logger := kitlog.With(logger, "stream", catalogEntry.Stream)
2328

2429
logger.Log("msg", "outputting schema")
@@ -34,19 +39,6 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
3439
return err
3540
}
3641

37-
// Get the enabled fields for this stream
38-
disabledFields, err := catalog.GetDisabledFields(catalogEntry.Stream)
39-
if err != nil {
40-
return err
41-
}
42-
43-
// Filter out the disabled fields from each record (ew)
44-
for _, record := range records {
45-
for fieldName := range disabledFields {
46-
delete(record, fieldName)
47-
}
48-
}
49-
5042
logger.Log("msg", "outputting records", "count", len(records))
5143
for _, record := range records {
5244
op := &Output{

0 commit comments

Comments
 (0)