Skip to content

Commit 413c5c4

Browse files
authored
Merge pull request #2 from incident-io/rob/add-discovery-mode-and-model-basics
Add the discovery flag and method, along with model package
2 parents cbc6956 + 01e72ec commit 413c5c4

File tree

8 files changed

+176
-53
lines changed

8 files changed

+176
-53
lines changed

cmd/tap-incident/cmd/app.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ var (
2828
app = kingpin.New("tap-incident", "Extract data from incident.io for use with Singer").Version(versionStanza())
2929

3030
// Global flags
31-
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
32-
configFile = app.Flag("config", "Configuration file").ExistingFile()
33-
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
31+
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
32+
configFile = app.Flag("config", "Configuration file").ExistingFile()
33+
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
34+
discoveryMode = app.Flag("discover", "If set, only outputs the catalog and exits").Default("false").Bool()
3435
)
3536

3637
func Run(ctx context.Context) (err error) {
@@ -79,7 +80,11 @@ func Run(ctx context.Context) (err error) {
7980
// can be streamed separately.
8081
ol := tap.NewOutputLogger(os.Stdout)
8182

82-
err = tap.Run(ctx, logger, ol, cl)
83+
if *discoveryMode {
84+
err = tap.Discover(ctx, logger, ol)
85+
} else {
86+
err = tap.Sync(ctx, logger, ol, cl)
87+
}
8388
if err != nil {
8489
return err
8590
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.21
55
require (
66
github.com/alecthomas/kingpin/v2 v2.3.2
77
github.com/deepmap/oapi-codegen v1.12.4
8+
github.com/fatih/structs v1.1.0
89
github.com/ghodss/yaml v1.0.0
910
github.com/go-kit/log v0.2.0
1011
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ github.com/deepmap/oapi-codegen v1.12.4/go.mod h1:3lgHGMu6myQ2vqbbTXH2H1o4eXFTGn
2020
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
2121
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
2222
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
23+
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
24+
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
2325
github.com/getkin/kin-openapi v0.107.0 h1:bxhL6QArW7BXQj8NjXfIJQy680NsMKd25nwhvpCXchg=
2426
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
2527
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=

model/schema.go

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package model
2+
3+
import (
4+
"strings"
5+
6+
"github.com/fatih/structs"
7+
)
8+
9+
// Schema is a JSON schema for a stream.
10+
type Schema struct {
11+
// Type is the type of the schema, e.g. "object" - for some reason singer docs
12+
// have this as an array and often nullable eg: `"type": ["null", "object"]`
13+
Type []string `json:"type"`
14+
// HasAdditionalProperties indicates whether the schema allows additional properties
15+
// not defined in the schema.
16+
HasAdditionalProperties bool `json:"additionalProperties"`
17+
// Properties is a map of property names to their schema.
18+
Properties map[string]Property `json:"properties"`
19+
}
20+
21+
// Property is a property in a JSON schema.
22+
type Property struct {
23+
// Types is a list of types that this property can be, e.g. "string" or "integer".
24+
Types []string `json:"type"`
25+
// CustomFormat is a custom format for this property, e.g. "date-time".
26+
CustomFormat string `json:"format,omitempty"`
27+
// For nested structures a property can have its own properties.
28+
Properties map[string]Property `json:"properties,omitempty"`
29+
// For array structures we define the type of the items in the array
30+
Items *ArrayItem `json:"items,omitempty"`
31+
}
32+
33+
// ArrayItem is the type and properties of an item in an array.
34+
type ArrayItem struct {
35+
Type string `json:"type"`
36+
Properties map[string]Property `json:"properties,omitempty"`
37+
}
38+
39+
func (s Property) IsBoolean() bool {
40+
return s.hasType("boolean")
41+
}
42+
43+
func (s Property) IsNumber() bool {
44+
return s.hasType("number")
45+
}
46+
47+
func (s Property) IsInteger() bool {
48+
return s.hasType("integer")
49+
}
50+
51+
func (s Property) hasType(typeName string) bool {
52+
for _, t := range s.Types {
53+
if strings.EqualFold(t, typeName) {
54+
return true
55+
}
56+
}
57+
return false
58+
}
59+
60+
func (s Property) IsDateTime() bool {
61+
return s.CustomFormat == "date-time"
62+
}
63+
64+
// As a shortcut for simple leaf nodes we can just dump everything (we can also dump everything higher level probably too)
65+
// If we're just going to dump everything why bother with serialisers? Good question.
66+
// a) Initial thoughts were that we want some control on the fields we output potentially - for example
67+
// ignoring deprecated fields.
68+
// b) We also might need to be cleverer when it comes to catalog config that enables / disables optional fields
69+
// in the output.
70+
//
71+
// Keeping this as a single callsite so it's easy to find where we're doing this in future.
72+
func DumpToMap(input interface{}) map[string]any {
73+
structs.DefaultTagName = "json"
74+
return structs.Map(input)
75+
}
76+
77+
func AsOptional(p Property) Property {
78+
p.Types = append(p.Types, "null")
79+
return p
80+
}
81+
82+
func AsArray(p Property) Property {
83+
return Property{
84+
Types: []string{"array"},
85+
Items: &ArrayItem{
86+
Type: "object",
87+
Properties: p.Properties,
88+
},
89+
}
90+
}

tap/catalog.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package tap
2+
3+
import "github.com/incident-io/singer-tap/model"
4+
5+
// A catalog can contain several streams or "entries"
6+
type CatalogEntry struct {
7+
// Name of the stream
8+
Stream string `json:"stream"`
9+
10+
// Unique identifier for this stream
11+
// Allows for multiple sources that have duplicate stream names
12+
TapStreamID string `json:"tap_stream_id"`
13+
14+
// The given schema for this stream
15+
Schema model.Schema `json:"schema"`
16+
17+
// Optional metadata for this stream
18+
// Metadata *[]Metadata `json:"metadata,omitempty"`
19+
}
20+
21+
// Actual catalog that we export
22+
// contains an array of all our streams
23+
type Catalog struct {
24+
Streams []CatalogEntry `json:"streams"`
25+
}
26+
27+
func NewCatalog(streams map[string]Stream) *Catalog {
28+
entries := []CatalogEntry{}
29+
30+
for name, stream := range streams {
31+
catalogEntry := CatalogEntry{
32+
Stream: name,
33+
TapStreamID: name,
34+
Schema: *stream.Output().Schema,
35+
}
36+
37+
entries = append(entries, catalogEntry)
38+
}
39+
40+
return &Catalog{
41+
Streams: entries,
42+
}
43+
}

tap/output.go

+17-46
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import (
44
"encoding/json"
55
"fmt"
66
"io"
7-
"strings"
7+
8+
"github.com/incident-io/singer-tap/model"
89
)
910

1011
// OutputType is the type of Singer tap output for each message.
@@ -29,7 +30,7 @@ type Output struct {
2930
// Stream is the name of the stream, e.g. "users"
3031
Stream string `json:"stream,omitempty"`
3132
// Schema is the schema of the stream, if Type == "SCHEMA", in JSON schema format.
32-
Schema *Schema `json:"schema,omitempty"`
33+
Schema *model.Schema `json:"schema,omitempty"`
3334
// Record is a record from the stream, if Type == "RECORD".
3435
Record map[string]any `json:"record,omitempty"`
3536
// TimeExtracted is the time that this record was extracted, if Type == "RECORD".
@@ -46,50 +47,6 @@ type Output struct {
4647
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
4748
}
4849

49-
// Schema is a JSON schema for a stream.
50-
type Schema struct {
51-
// Type is the type of the schema, e.g. "object"
52-
Type []string `json:"type"`
53-
// HasAdditionalProperties indicates whether the schema allows additional properties
54-
// not defined in the schema.
55-
HasAdditionalProperties bool `json:"additionalProperties"`
56-
// Properties is a map of property names to their schema.
57-
Properties map[string]Property `json:"properties"`
58-
}
59-
60-
// Property is a property in a JSON schema.
61-
type Property struct {
62-
// Types is a list of types that this property can be, e.g. "string" or "integer".
63-
Types []string `json:"type"`
64-
// CustomFormat is a custom format for this property, e.g. "date-time".
65-
CustomFormat string `json:"format,omitempty"`
66-
}
67-
68-
func (s Property) IsBoolean() bool {
69-
return s.hasType("boolean")
70-
}
71-
72-
func (s Property) IsNumber() bool {
73-
return s.hasType("number")
74-
}
75-
76-
func (s Property) IsInteger() bool {
77-
return s.hasType("integer")
78-
}
79-
80-
func (s Property) hasType(typeName string) bool {
81-
for _, t := range s.Types {
82-
if strings.EqualFold(t, typeName) {
83-
return true
84-
}
85-
}
86-
return false
87-
}
88-
89-
func (s Property) IsDateTime() bool {
90-
return s.CustomFormat == "date-time"
91-
}
92-
9350
// OutputLogger is a logger that logs to STDOUT in the format expected by the downstream
9451
// Singer target.
9552
type OutputLogger struct {
@@ -113,3 +70,17 @@ func (o *OutputLogger) Log(op *Output) error {
11370

11471
return nil
11572
}
73+
74+
func (o *OutputLogger) CataLog(catalog *Catalog) error {
75+
data, err := json.Marshal(catalog)
76+
if err != nil {
77+
return err
78+
}
79+
80+
_, err = fmt.Fprintln(o.w, string(data))
81+
if err != nil {
82+
return err
83+
}
84+
85+
return nil
86+
}

tap/stream_incidents.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
kitlog "github.com/go-kit/log"
77
"github.com/incident-io/singer-tap/client"
8+
"github.com/incident-io/singer-tap/model"
89
"github.com/pkg/errors"
910
"github.com/samber/lo"
1011
)
@@ -20,9 +21,9 @@ func (s *StreamIncidents) Output() *Output {
2021
return &Output{
2122
Type: OutputTypeSchema,
2223
Stream: "incidents",
23-
Schema: &Schema{
24+
Schema: &model.Schema{
2425
Type: []string{"object"},
25-
Properties: map[string]Property{
26+
Properties: map[string]model.Property{
2627
"id": {
2728
Types: []string{"string"},
2829
},

tap/tap.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/incident-io/singer-tap/client"
99
)
1010

11-
func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
11+
func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
1212
for name, stream := range streams {
1313
logger := kitlog.With(logger, "stream", name)
1414

@@ -40,3 +40,13 @@ func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client
4040

4141
return nil
4242
}
43+
44+
func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error {
45+
catalog := NewCatalog(streams)
46+
47+
if err := ol.CataLog(catalog); err != nil {
48+
return err
49+
}
50+
51+
return nil
52+
}

0 commit comments

Comments
 (0)