Skip to content

Commit 01e72ec

Browse files
committed
Add the discovery flag and method, along with model package
We want to be able to output schemas when we run in discovery mode so this PR adds that flag along with the associated method to do so. It just goes through the streams and calls Schema() along with adding a few other fields. In the future we will also need to generate metadata as part of that (and consume the file back in). The other change here is the start of pulling out the models we use to generate schemas and serialize data into their own package. This PR just pulls out the schema itself - but a followup will add a bunch of models representing entities in our API that we expose. Will just follow a simple pattern of having a schema / serialize method on them.
1 parent cbc6956 commit 01e72ec

File tree

8 files changed

+176
-53
lines changed

8 files changed

+176
-53
lines changed

cmd/tap-incident/cmd/app.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ var (
2828
app = kingpin.New("tap-incident", "Extract data from incident.io for use with Singer").Version(versionStanza())
2929

3030
// Global flags
31-
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
32-
configFile = app.Flag("config", "Configuration file").ExistingFile()
33-
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
31+
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
32+
configFile = app.Flag("config", "Configuration file").ExistingFile()
33+
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
34+
discoveryMode = app.Flag("discover", "If set, only outputs the catalog and exits").Default("false").Bool()
3435
)
3536

3637
func Run(ctx context.Context) (err error) {
@@ -79,7 +80,11 @@ func Run(ctx context.Context) (err error) {
7980
// can be streamed separately.
8081
ol := tap.NewOutputLogger(os.Stdout)
8182

82-
err = tap.Run(ctx, logger, ol, cl)
83+
if *discoveryMode {
84+
err = tap.Discover(ctx, logger, ol)
85+
} else {
86+
err = tap.Sync(ctx, logger, ol, cl)
87+
}
8388
if err != nil {
8489
return err
8590
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.21
55
require (
66
github.com/alecthomas/kingpin/v2 v2.3.2
77
github.com/deepmap/oapi-codegen v1.12.4
8+
github.com/fatih/structs v1.1.0
89
github.com/ghodss/yaml v1.0.0
910
github.com/go-kit/log v0.2.0
1011
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ github.com/deepmap/oapi-codegen v1.12.4/go.mod h1:3lgHGMu6myQ2vqbbTXH2H1o4eXFTGn
2020
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
2121
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
2222
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
23+
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
24+
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
2325
github.com/getkin/kin-openapi v0.107.0 h1:bxhL6QArW7BXQj8NjXfIJQy680NsMKd25nwhvpCXchg=
2426
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
2527
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=

model/schema.go

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package model
2+
3+
import (
4+
"strings"
5+
6+
"github.com/fatih/structs"
7+
)
8+
9+
// Schema is a JSON schema for a stream.
10+
type Schema struct {
11+
// Type is the type of the schema, e.g. "object" - for some reason singer docs
12+
// have this as an array and often nullable eg: `"type": ["null", "object"]`
13+
Type []string `json:"type"`
14+
// HasAdditionalProperties indicates whether the schema allows additional properties
15+
// not defined in the schema.
16+
HasAdditionalProperties bool `json:"additionalProperties"`
17+
// Properties is a map of property names to their schema.
18+
Properties map[string]Property `json:"properties"`
19+
}
20+
21+
// Property is a property in a JSON schema.
22+
type Property struct {
23+
// Types is a list of types that this property can be, e.g. "string" or "integer".
24+
Types []string `json:"type"`
25+
// CustomFormat is a custom format for this property, e.g. "date-time".
26+
CustomFormat string `json:"format,omitempty"`
27+
// For nested structures a property can have its own properties.
28+
Properties map[string]Property `json:"properties,omitempty"`
29+
// For array structures we define the type of the items in the array
30+
Items *ArrayItem `json:"items,omitempty"`
31+
}
32+
33+
// ArrayItem is the type and properties of an item in an array.
34+
type ArrayItem struct {
35+
Type string `json:"type"`
36+
Properties map[string]Property `json:"properties,omitempty"`
37+
}
38+
39+
func (s Property) IsBoolean() bool {
40+
return s.hasType("boolean")
41+
}
42+
43+
func (s Property) IsNumber() bool {
44+
return s.hasType("number")
45+
}
46+
47+
func (s Property) IsInteger() bool {
48+
return s.hasType("integer")
49+
}
50+
51+
func (s Property) hasType(typeName string) bool {
52+
for _, t := range s.Types {
53+
if strings.EqualFold(t, typeName) {
54+
return true
55+
}
56+
}
57+
return false
58+
}
59+
60+
func (s Property) IsDateTime() bool {
61+
return s.CustomFormat == "date-time"
62+
}
63+
64+
// As a shortcut for simple leaf nodes we can just dump everything (we can also dump everything higher level probably too)
65+
// If we're just going to dump everything why bother with serialisers? Good question.
66+
// a) Initial thoughts were that we want some control on the fields we output potentially - for example
67+
// ignoring deprecated fields.
68+
// b) We also might need to be cleverer when it comes to catalog config that enables / disables optional fields
69+
// in the output.
70+
//
71+
// Keeping this as a single callsite so it's easy to find where we're doing this in future.
72+
func DumpToMap(input interface{}) map[string]any {
73+
structs.DefaultTagName = "json"
74+
return structs.Map(input)
75+
}
76+
77+
func AsOptional(p Property) Property {
78+
p.Types = append(p.Types, "null")
79+
return p
80+
}
81+
82+
func AsArray(p Property) Property {
83+
return Property{
84+
Types: []string{"array"},
85+
Items: &ArrayItem{
86+
Type: "object",
87+
Properties: p.Properties,
88+
},
89+
}
90+
}

tap/catalog.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package tap
2+
3+
import "github.com/incident-io/singer-tap/model"
4+
5+
// A catalog can contain several streams or "entries"
6+
type CatalogEntry struct {
7+
// Name of the stream
8+
Stream string `json:"stream"`
9+
10+
// Unique identifier for this stream
11+
// Allows for multiple sources that have duplicate stream names
12+
TapStreamID string `json:"tap_stream_id"`
13+
14+
// The given schema for this stream
15+
Schema model.Schema `json:"schema"`
16+
17+
// Optional metadata for this stream
18+
// Metadata *[]Metadata `json:"metadata,omitempty"`
19+
}
20+
21+
// Actual catalog that we export
22+
// contains an array of all our streams
23+
type Catalog struct {
24+
Streams []CatalogEntry `json:"streams"`
25+
}
26+
27+
func NewCatalog(streams map[string]Stream) *Catalog {
28+
entries := []CatalogEntry{}
29+
30+
for name, stream := range streams {
31+
catalogEntry := CatalogEntry{
32+
Stream: name,
33+
TapStreamID: name,
34+
Schema: *stream.Output().Schema,
35+
}
36+
37+
entries = append(entries, catalogEntry)
38+
}
39+
40+
return &Catalog{
41+
Streams: entries,
42+
}
43+
}

tap/output.go

+17-46
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import (
44
"encoding/json"
55
"fmt"
66
"io"
7-
"strings"
7+
8+
"github.com/incident-io/singer-tap/model"
89
)
910

1011
// OutputType is the type of Singer tap output for each message.
@@ -29,7 +30,7 @@ type Output struct {
2930
// Stream is the name of the stream, e.g. "users"
3031
Stream string `json:"stream,omitempty"`
3132
// Schema is the schema of the stream, if Type == "SCHEMA", in JSON schema format.
32-
Schema *Schema `json:"schema,omitempty"`
33+
Schema *model.Schema `json:"schema,omitempty"`
3334
// Record is a record from the stream, if Type == "RECORD".
3435
Record map[string]any `json:"record,omitempty"`
3536
// TimeExtracted is the time that this record was extracted, if Type == "RECORD".
@@ -46,50 +47,6 @@ type Output struct {
4647
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
4748
}
4849

49-
// Schema is a JSON schema for a stream.
50-
type Schema struct {
51-
// Type is the type of the schema, e.g. "object"
52-
Type []string `json:"type"`
53-
// HasAdditionalProperties indicates whether the schema allows additional properties
54-
// not defined in the schema.
55-
HasAdditionalProperties bool `json:"additionalProperties"`
56-
// Properties is a map of property names to their schema.
57-
Properties map[string]Property `json:"properties"`
58-
}
59-
60-
// Property is a property in a JSON schema.
61-
type Property struct {
62-
// Types is a list of types that this property can be, e.g. "string" or "integer".
63-
Types []string `json:"type"`
64-
// CustomFormat is a custom format for this property, e.g. "date-time".
65-
CustomFormat string `json:"format,omitempty"`
66-
}
67-
68-
func (s Property) IsBoolean() bool {
69-
return s.hasType("boolean")
70-
}
71-
72-
func (s Property) IsNumber() bool {
73-
return s.hasType("number")
74-
}
75-
76-
func (s Property) IsInteger() bool {
77-
return s.hasType("integer")
78-
}
79-
80-
func (s Property) hasType(typeName string) bool {
81-
for _, t := range s.Types {
82-
if strings.EqualFold(t, typeName) {
83-
return true
84-
}
85-
}
86-
return false
87-
}
88-
89-
func (s Property) IsDateTime() bool {
90-
return s.CustomFormat == "date-time"
91-
}
92-
9350
// OutputLogger is a logger that logs to STDOUT in the format expected by the downstream
9451
// Singer target.
9552
type OutputLogger struct {
@@ -113,3 +70,17 @@ func (o *OutputLogger) Log(op *Output) error {
11370

11471
return nil
11572
}
73+
74+
func (o *OutputLogger) CataLog(catalog *Catalog) error {
75+
data, err := json.Marshal(catalog)
76+
if err != nil {
77+
return err
78+
}
79+
80+
_, err = fmt.Fprintln(o.w, string(data))
81+
if err != nil {
82+
return err
83+
}
84+
85+
return nil
86+
}

tap/stream_incidents.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
kitlog "github.com/go-kit/log"
77
"github.com/incident-io/singer-tap/client"
8+
"github.com/incident-io/singer-tap/model"
89
"github.com/pkg/errors"
910
"github.com/samber/lo"
1011
)
@@ -20,9 +21,9 @@ func (s *StreamIncidents) Output() *Output {
2021
return &Output{
2122
Type: OutputTypeSchema,
2223
Stream: "incidents",
23-
Schema: &Schema{
24+
Schema: &model.Schema{
2425
Type: []string{"object"},
25-
Properties: map[string]Property{
26+
Properties: map[string]model.Property{
2627
"id": {
2728
Types: []string{"string"},
2829
},

tap/tap.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/incident-io/singer-tap/client"
99
)
1010

11-
func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
11+
func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
1212
for name, stream := range streams {
1313
logger := kitlog.With(logger, "stream", name)
1414

@@ -40,3 +40,13 @@ func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client
4040

4141
return nil
4242
}
43+
44+
func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error {
45+
catalog := NewCatalog(streams)
46+
47+
if err := ol.CataLog(catalog); err != nil {
48+
return err
49+
}
50+
51+
return nil
52+
}

0 commit comments

Comments
 (0)