Skip to content

Commit f5a88a4

Browse files
authored
Merge pull request #5 from Versent/fix_capture_ts_before_iterators
fix(iterators) Remove difference between start time of iterators.
2 parents bbab2ca + 7bb6ee3 commit f5a88a4

File tree

6 files changed

+67
-38
lines changed

6 files changed

+67
-38
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ console.log
44
trace.out
55
dist
66
node_modules
7+
/bin

Makefile

+8-14
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@ GO ?= go
66

77
# Install all the build and lint dependencies
88
setup:
9-
go get -u github.com/alecthomas/gometalinter
10-
go get -u github.com/golang/dep/cmd/dep
11-
go get -u github.com/pierrre/gotestcover
12-
go get -u golang.org/x/tools/cmd/cover
13-
go get github.com/vektra/mockery/...
14-
gometalinter --install
15-
dep ensure
9+
@$(GO) get -u github.com/alecthomas/gometalinter
10+
@$(GO) get -u github.com/golang/dep/cmd/dep
11+
@$(GO) get -u github.com/axw/gocov/...
12+
@$(GO) get github.com/vektra/mockery/...
13+
@gometalinter --install
14+
@dep ensure
1615
.PHONY: setup
1716

1817
# Install from source.
@@ -23,17 +22,12 @@ install:
2322

2423
# Run all the tests
2524
test:
26-
@gotestcover $(TEST_OPTIONS) -covermode=atomic -coverprofile=coverage.txt $(SOURCE_FILES) -run $(TEST_PATTERN) -timeout=2m
25+
@gocov test -timeout=2m ./... | gocov report
2726
.PHONY: test
2827

29-
# Run all the tests and opens the coverage report
30-
cover: test
31-
@$(GO) tool cover -html=coverage.txt
32-
.PHONY: cover
33-
3428
# Run all the linters
3529
lint:
36-
gometalinter --vendor ./...
30+
gometalinter --deadline 300s --vendor ./...
3731
.PHONY: lint
3832

3933
# Run all the tests and code checks

cmd/kinesis-tail/main.go

+28-19
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/alecthomas/kingpin"
1414
"github.com/aws/aws-sdk-go/aws"
15+
"github.com/aws/aws-sdk-go/aws/credentials"
1516
"github.com/aws/aws-sdk-go/aws/session"
1617
"github.com/aws/aws-sdk-go/service/kinesis"
1718
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
@@ -30,6 +31,8 @@ var (
3031
tracing = kingpin.Flag("trace", "Enable trace mode.").Short('t').Bool()
3132
debug = kingpin.Flag("debug", "Enable debug logging.").Short('d').Bool()
3233
region = kingpin.Flag("region", "Configure the aws region.").Short('r').String()
34+
profile = kingpin.Flag("profile", "Configure the aws profile.").Short('p').String()
35+
timestamp = kingpin.Flag("timestamp", "Start time in epoch milliseconds.").Short('T').Int64()
3336
cwlogsCommand = kingpin.Command("cwlogs", "Process cloudwatch logs data from kinesis.")
3437
includes = cwlogsCommand.Flag("include", "Include anything in log group names which match the supplied string.").Strings()
3538
excludes = cwlogsCommand.Flag("exclude", "Exclude anything in log group names which match the supplied string.").Strings()
@@ -58,7 +61,6 @@ func main() {
5861
}
5962

6063
defer trace.Stop()
61-
6264
}
6365

6466
if *debug {
@@ -68,44 +70,35 @@ func main() {
6870
logger.SetLevel(logrus.DebugLevel)
6971
}
7072

71-
sess := session.Must(session.NewSession())
72-
73-
var svc kinesisiface.KinesisAPI
74-
75-
if *region == "" {
76-
svc = kinesis.New(sess)
77-
} else {
78-
// Create a Kinesis client with additional configuration
79-
svc = kinesis.New(sess, aws.NewConfig().WithRegion(*region))
80-
}
73+
svc := newKinesis(region, profile)
8174

8275
logger.Debug("built kinesis service")
8376

8477
switch subCommand {
8578
case "cwlogs":
86-
err := processLogData(svc, *cwlogsStream, *includes, *excludes)
79+
err := processLogData(svc, *cwlogsStream, *timestamp, *includes, *excludes)
8780
if err != nil {
8881
logger.WithError(err).Fatal("failed to process log data")
8982
}
9083
case "raw":
91-
err := processRawData(svc, *rawStream, *timeout, *count)
84+
err := processRawData(svc, *rawStream, *timeout, *timestamp, *count)
9285
if err != nil {
9386
logger.WithError(err).Fatal("failed to process log data")
9487
}
9588
}
9689

9790
}
9891

99-
func processLogData(svc kinesisiface.KinesisAPI, stream string, includes []string, excludes []string) error {
92+
func processLogData(svc kinesisiface.KinesisAPI, stream string, timestamp int64, includes []string, excludes []string) error {
10093

10194
helper := ktail.New(svc, logger)
10295

103-
iterators, err := helper.GetStreamIterators(stream)
96+
iterators, err := helper.GetStreamIterators(stream, timestamp)
10497
if err != nil {
10598
return errors.Wrap(err, "get iterators failed")
10699
}
107100

108-
kstream := streamer.New(svc, iterators, 5000)
101+
kstream := streamer.New(svc, iterators, 5000, logger)
109102
ch := kstream.StartGetRecords()
110103

111104
messageSorter := sorter.New(os.Stdout, len(iterators), formatLogsMsg)
@@ -136,16 +129,16 @@ func processLogData(svc kinesisiface.KinesisAPI, stream string, includes []strin
136129
return nil
137130
}
138131

139-
func processRawData(svc kinesisiface.KinesisAPI, stream string, timeout int64, count int) error {
132+
func processRawData(svc kinesisiface.KinesisAPI, stream string, timeout int64, timestamp int64, count int) error {
140133

141134
helper := ktail.New(svc, logger)
142135

143-
iterators, err := helper.GetStreamIterators(stream)
136+
iterators, err := helper.GetStreamIterators(stream, timestamp)
144137
if err != nil {
145138
return errors.Wrap(err, "get iterators failed")
146139
}
147140

148-
kstream := streamer.New(svc, iterators, 5000)
141+
kstream := streamer.New(svc, iterators, 5000, logger)
149142
ch := kstream.StartGetRecords()
150143

151144
messageSorter := sorter.New(os.Stdout, len(iterators), formatRawMsg)
@@ -217,3 +210,19 @@ func formatLogsMsg(wr io.Writer, msg *ktail.LogMessage) {
217210
logger.WithError(err).Fatal("failed to create trace file")
218211
}
219212
}
213+
214+
func newKinesis(region, profile *string) kinesisiface.KinesisAPI {
215+
sess := session.Must(session.NewSession())
216+
217+
cfg := aws.NewConfig()
218+
219+
if aws.StringValue(region) != "" {
220+
cfg = cfg.WithRegion(*region)
221+
}
222+
223+
if aws.StringValue(profile) != "" {
224+
cfg = cfg.WithCredentials(credentials.NewSharedCredentials("", *profile))
225+
}
226+
227+
return kinesis.New(sess, cfg)
228+
}

pkg/ktail/kinesis.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ktail
22

33
import (
4+
"time"
5+
46
"github.com/aws/aws-sdk-go/aws"
57
"github.com/aws/aws-sdk-go/service/kinesis"
68
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
@@ -28,7 +30,19 @@ func New(svc kinesisiface.KinesisAPI, logger *logrus.Logger) *KinesisHelper {
2830
}
2931

3032
// GetStreamIterators build a list of iterators for the stream
31-
func (kh *KinesisHelper) GetStreamIterators(streamName string) (map[string]*string, error) {
33+
func (kh *KinesisHelper) GetStreamIterators(streamName string, timestamp int64) (map[string]*string, error) {
34+
35+
var ts time.Time
36+
37+
if timestamp > 0 {
38+
tsec := timestamp / 1000
39+
tnano := (timestamp % 1000) * 1000
40+
ts = time.Unix(tsec, tnano)
41+
} else {
42+
ts = time.Now()
43+
}
44+
45+
kh.logger.WithField("ts", ts.Unix()).Info("starting stream")
3246

3347
respDesc, err := kh.svc.DescribeStream(&kinesis.DescribeStreamInput{
3448
StreamName: aws.String(streamName),
@@ -42,7 +56,7 @@ func (kh *KinesisHelper) GetStreamIterators(streamName string) (map[string]*stri
4256
iterators := map[string]*string{}
4357

4458
for _, shard := range respDesc.StreamDescription.Shards {
45-
go kh.asyncGetShardIterator(streamName, aws.StringValue(shard.ShardId), ch)
59+
go kh.asyncGetShardIterator(streamName, aws.StringValue(shard.ShardId), ts, ch)
4660
}
4761

4862
for range respDesc.StreamDescription.Shards {
@@ -54,13 +68,14 @@ func (kh *KinesisHelper) GetStreamIterators(streamName string) (map[string]*stri
5468
return iterators, nil
5569
}
5670

57-
func (kh *KinesisHelper) asyncGetShardIterator(streamName, shardID string, ch chan *iteratorResult) {
71+
func (kh *KinesisHelper) asyncGetShardIterator(streamName, shardID string, ts time.Time, ch chan *iteratorResult) {
5872
kh.logger.WithField("shard", shardID).Debug("get shard iterator")
5973

6074
respShard, err := kh.svc.GetShardIterator(&kinesis.GetShardIteratorInput{
6175
StreamName: aws.String(streamName),
6276
ShardIteratorType: aws.String(kinesis.ShardIteratorTypeLatest),
6377
ShardId: aws.String(shardID),
78+
Timestamp: aws.Time(ts),
6479
})
6580
if err != nil {
6681
kh.logger.WithError(err).Fatal("get shard iterator failed")

pkg/ktail/kinesis_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package ktail

pkg/streamer/kinesis.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"sync"
55
"time"
66

7+
"github.com/sirupsen/logrus"
8+
79
"github.com/aws/aws-sdk-go/service/kinesis"
810
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
911
"github.com/pkg/errors"
@@ -15,6 +17,7 @@ type KinesisStreamer struct {
1517
iterators map[string]*string
1618
iteratorMutex *sync.Mutex
1719
pollFreqMs int64
20+
logger *logrus.Logger
1821
}
1922

2023
// GetRecordsEntry returns the results of the last get records request
@@ -26,12 +29,13 @@ type GetRecordsEntry struct {
2629
}
2730

2831
// New return a new configured streamer
29-
func New(svc kinesisiface.KinesisAPI, iterators map[string]*string, pollFreqMs int64) *KinesisStreamer {
32+
func New(svc kinesisiface.KinesisAPI, iterators map[string]*string, pollFreqMs int64, logger *logrus.Logger) *KinesisStreamer {
3033
return &KinesisStreamer{
3134
svc: svc,
3235
iterators: iterators,
3336
pollFreqMs: pollFreqMs,
3437
iteratorMutex: &sync.Mutex{},
38+
logger: logger,
3539
}
3640
}
3741

@@ -53,14 +57,19 @@ func (ks *KinesisStreamer) asyncGetRecords(shard string, ch chan *GetRecordsEntr
5357

5458
for now := range c {
5559

60+
if ks.iterators[shard] == nil {
61+
ks.logger.Debug("nil iterator for shard as it is CLOSED: %s", shard)
62+
continue
63+
}
64+
5665
resp, err := ks.svc.GetRecords(&kinesis.GetRecordsInput{
5766
ShardIterator: ks.iterators[shard],
5867
})
5968
if err != nil {
6069
ch <- &GetRecordsEntry{Created: now, Shard: shard, Err: errors.Wrap(err, "get records failed")}
6170
}
6271

63-
//logger.WithField("iterator", aws.StringValue(iterator)).Info("get records shard")
72+
ks.logger.WithField("iterator", resp).Debug("get records shard")
6473

6574
ch <- &GetRecordsEntry{Created: now, Shard: shard, Records: resp.Records}
6675

0 commit comments

Comments
 (0)