Skip to content

Commit 5f3bdd3

Browse files
committed
Support for incidents
1 parent 3157850 commit 5f3bdd3

File tree

7 files changed

+261
-4
lines changed

7 files changed

+261
-4
lines changed

cmd/incident-tap/cmd/app.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/alecthomas/kingpin/v2"
1717
kitlog "github.com/go-kit/log"
1818
"github.com/go-kit/log/level"
19+
"github.com/incident-io/singer-tap/client"
1920
"github.com/incident-io/singer-tap/config"
2021
"github.com/incident-io/singer-tap/tap"
2122
"github.com/pkg/errors"
@@ -45,7 +46,6 @@ func Run(ctx context.Context) (err error) {
4546
logger = level.NewFilter(logger, level.AllowInfo())
4647
}
4748
logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC, "caller", kitlog.DefaultCaller)
48-
logger = level.Debug(logger) // by default, logger is debug only
4949
stdlog.SetOutput(kitlog.NewStdlibAdapter(logger))
5050

5151
// Root context to the application.
@@ -67,7 +67,19 @@ func Run(ctx context.Context) (err error) {
6767
return err
6868
}
6969

70-
err = tap.Run(ctx, logger, cfg)
70+
if cfg.Endpoint == "" {
71+
cfg.Endpoint = "https://api.incident.io"
72+
}
73+
cl, err := client.New(ctx, cfg.APIKey, cfg.Endpoint, Version())
74+
if err != nil {
75+
return err
76+
}
77+
78+
// Singer requires taps to output to STDOUT. We log to STDERR so the debug log output
79+
// can be streamed separately.
80+
ol := tap.NewOutputLogger(os.Stdout)
81+
82+
err = tap.Run(ctx, logger, ol, cl)
7183
if err != nil {
7284
return err
7385
}

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ require (
3838
github.com/mattn/go-colorable v0.1.13 // indirect
3939
github.com/mattn/go-isatty v0.0.17 // indirect
4040
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
41+
github.com/samber/lo v1.38.1 // indirect
4142
github.com/valyala/bytebufferpool v1.0.0 // indirect
4243
github.com/valyala/fasttemplate v1.2.2 // indirect
4344
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
4445
golang.org/x/crypto v0.12.0 // indirect
46+
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
4547
golang.org/x/mod v0.12.0 // indirect
4648
golang.org/x/net v0.14.0 // indirect
4749
golang.org/x/sys v0.12.0 // indirect

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
101101
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
102102
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
103103
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
104+
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
105+
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
104106
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
105107
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
106108
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
@@ -122,6 +124,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8
122124
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
123125
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
124126
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
127+
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
128+
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
125129
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
126130
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
127131
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=

tap/output.go

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package tap
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"strings"
8+
)
9+
10+
// OutputType is the type of Singer tap output for each message.
11+
type OutputType string
12+
13+
var (
14+
OutputTypeSchema OutputType = "SCHEMA"
15+
OutputTypeRecord OutputType = "RECORD"
16+
)
17+
18+
// Output is what we log to STDOUT as a message provided to the downstream Singer target.
19+
//
20+
// This tap supports type types of output:
21+
//
22+
// - SCHEMA: Specifies the schema of this stream in JSON schema format.
23+
// - RECORD: A record from the stream.
24+
//
25+
// We (currently) do not support the other types of output such as STATE.
26+
type Output struct {
27+
// Type is the type of the stream, e.g. "SCHEMA" or "RECORD"
28+
Type OutputType `json:"type,omitempty"`
29+
// Stream is the name of the stream, e.g. "users"
30+
Stream string `json:"stream,omitempty"`
31+
// Schema is the schema of the stream, if Type == "SCHEMA", in JSON schema format.
32+
Schema *Schema `json:"schema,omitempty"`
33+
// Record is a record from the stream, if Type == "RECORD".
34+
Record map[string]any `json:"record,omitempty"`
35+
// KeyProperties is a list of strings indicating which properties make up the primary
36+
// key for this stream.
37+
//
38+
// Each item in the list must be the name of a top-level property defined in the schema.
39+
// A value for key_properties must be provided, but it may be an empty list to indicate
40+
// that there is no primary key.
41+
KeyProperties []string `json:"key_properties,omitempty"`
42+
// BookmarkProperties is an optional list of strings indicating which properties
43+
// should be used to bookmark the stream, such as "last_updated_at".
44+
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
45+
}
46+
47+
// Schema is a JSON schema for a stream.
48+
type Schema struct {
49+
// Type is the type of the schema, e.g. "object"
50+
Type []string `json:"type"`
51+
// HasAdditionalProperties indicates whether the schema allows additional properties
52+
// not defined in the schema.
53+
HasAdditionalProperties bool `json:"additionalProperties"`
54+
// Properties is a map of property names to their schema.
55+
Properties map[string]Property `json:"properties"`
56+
}
57+
58+
// Property is a property in a JSON schema.
59+
type Property struct {
60+
// Types is a list of types that this property can be, e.g. "string" or "integer".
61+
Types []string `json:"type"`
62+
// CustomFormat is a custom format for this property, e.g. "date-time".
63+
CustomFormat string `json:"format,omitempty"`
64+
}
65+
66+
func (s Property) IsBoolean() bool {
67+
return s.hasType("boolean")
68+
}
69+
70+
func (s Property) IsNumber() bool {
71+
return s.hasType("number")
72+
}
73+
74+
func (s Property) IsInteger() bool {
75+
return s.hasType("integer")
76+
}
77+
78+
func (s Property) hasType(typeName string) bool {
79+
for _, t := range s.Types {
80+
if strings.EqualFold(t, typeName) {
81+
return true
82+
}
83+
}
84+
return false
85+
}
86+
87+
func (s Property) IsDateTime() bool {
88+
return s.CustomFormat == "date-time"
89+
}
90+
91+
// OutputLogger is a logger that logs to STDOUT in the format expected by the downstream
92+
// Singer target.
93+
type OutputLogger struct {
94+
w io.Writer
95+
}
96+
97+
func NewOutputLogger(w io.Writer) *OutputLogger {
98+
return &OutputLogger{w: w}
99+
}
100+
101+
func (o *OutputLogger) Log(op *Output) error {
102+
data, err := json.Marshal(op)
103+
if err != nil {
104+
return err
105+
}
106+
107+
_, err = fmt.Fprintln(o.w, string(data))
108+
if err != nil {
109+
return err
110+
}
111+
112+
return nil
113+
}

tap/stream.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package tap
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
kitlog "github.com/go-kit/log"
8+
"github.com/incident-io/singer-tap/client"
9+
)
10+
11+
var streams = map[string]Stream{}
12+
13+
func register(s Stream) {
14+
op := s.Output()
15+
if _, ok := streams[op.Stream]; ok {
16+
panic(fmt.Sprintf("stream already registered: %s", op.Stream))
17+
}
18+
19+
streams[op.Stream] = s
20+
}
21+
22+
// Stream is a data model from the incident.io API that we want to represent as a Singer
23+
// tap stream.
24+
type Stream interface {
25+
// Output is the schema of the stream, in JSON schema format.
26+
Output() *Output
27+
// GetRecords returns a slice of entries in the stream. People will eventually ask for
28+
// this to be a channel, but we're going simple and loading everything for now.
29+
GetRecords(ctx context.Context, logger kitlog.Logger, cl *client.ClientWithResponses) ([]map[string]any, error)
30+
}

tap/stream_incidents.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package tap
2+
3+
import (
4+
"context"
5+
6+
kitlog "github.com/go-kit/log"
7+
"github.com/incident-io/singer-tap/client"
8+
"github.com/pkg/errors"
9+
"github.com/samber/lo"
10+
)
11+
12+
func init() {
13+
register(&StreamIncidents{})
14+
}
15+
16+
type StreamIncidents struct {
17+
}
18+
19+
func (s *StreamIncidents) Output() *Output {
20+
return &Output{
21+
Type: OutputTypeSchema,
22+
Stream: "incidents",
23+
Schema: &Schema{
24+
Type: []string{"object"},
25+
Properties: map[string]Property{
26+
"id": {
27+
Types: []string{"string"},
28+
},
29+
"name": {
30+
Types: []string{"string"},
31+
},
32+
},
33+
},
34+
KeyProperties: []string{"id"},
35+
BookmarkProperties: []string{},
36+
}
37+
}
38+
39+
func (s *StreamIncidents) GetRecords(ctx context.Context, logger kitlog.Logger, cl *client.ClientWithResponses) ([]map[string]any, error) {
40+
var (
41+
after *string
42+
pageSize = int64(250)
43+
results = []map[string]any{}
44+
)
45+
46+
for {
47+
logger.Log("msg", "loading page", "page_size", pageSize, "after", after)
48+
page, err := cl.IncidentsV2ListWithResponse(ctx, &client.IncidentsV2ListParams{
49+
PageSize: &pageSize,
50+
After: after,
51+
})
52+
if err != nil {
53+
return nil, errors.Wrap(err, "listing incidents")
54+
}
55+
56+
for _, element := range page.JSON200.Incidents {
57+
results = append(results, map[string]any{
58+
"id": element.Id,
59+
"name": element.Name,
60+
})
61+
}
62+
if count := len(page.JSON200.Incidents); count == 0 {
63+
return results, nil // end pagination
64+
} else {
65+
after = lo.ToPtr(page.JSON200.Incidents[count-1].Id)
66+
}
67+
}
68+
}

tap/tap.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,39 @@ package tap
22

33
import (
44
"context"
5+
"time"
56

67
kitlog "github.com/go-kit/log"
7-
"github.com/incident-io/singer-tap/config"
8+
"github.com/incident-io/singer-tap/client"
89
)
910

10-
func Run(ctx context.Context, logger kitlog.Logger, cfg *config.Config) error {
11+
func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
12+
for name, stream := range streams {
13+
logger := kitlog.With(logger, "stream", name)
14+
15+
logger.Log("msg", "outputting schema")
16+
if err := ol.Log(stream.Output()); err != nil {
17+
return err
18+
}
19+
20+
start := time.Now()
21+
logger.Log("msg", "loading records", "start", start.Format(time.RFC3339))
22+
records, err := stream.GetRecords(ctx, logger, cl)
23+
if err != nil {
24+
return err
25+
}
26+
27+
logger.Log("msg", "outputting records", "count", len(records))
28+
for _, record := range records {
29+
op := &Output{
30+
Type: OutputTypeRecord,
31+
Record: record,
32+
}
33+
if err := ol.Log(op); err != nil {
34+
return err
35+
}
36+
}
37+
}
38+
1139
return nil
1240
}

0 commit comments

Comments
 (0)