Skip to content
This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

add cluster_name to influxdb sink #1635

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type InfluxdbConfig struct {
WithFields bool
InsecureSsl bool
RetentionPolicy string
ClusterName string
}

func NewClient(c InfluxdbConfig) (InfluxdbClient, error) {
Expand Down Expand Up @@ -79,6 +80,7 @@ func BuildConfig(uri *url.URL) (*InfluxdbConfig, error) {
WithFields: false,
InsecureSsl: false,
RetentionPolicy: "0",
ClusterName: "default",
}

if len(uri.Host) > 0 {
Expand Down Expand Up @@ -121,5 +123,9 @@ func BuildConfig(uri *url.URL) (*InfluxdbConfig, error) {
config.InsecureSsl = val
}

if len(opts["cluster_name"]) >= 1 {
config.ClusterName = opts["cluster_name"][0]
}

return &config, nil
}
1 change: 1 addition & 0 deletions docs/sink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The following options are available:
* `secure` - Connect securely to InfluxDB (default: `false`)
* `insecuressl` - Ignore SSL certificate validity (default: `false`)
* `withfields` - Use [InfluxDB fields](storage-schema.md#using-fields) (default: `false`)
* `cluster_name` - cluster name for different Kubernetes clusters. (default: `default`)

### Google Cloud Monitoring
This sink supports monitoring metrics only.
Expand Down
3 changes: 3 additions & 0 deletions events/sinks/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (sink *influxdbSink) ExportEvents(eventBatch *core.EventBatch) {
if err != nil {
glog.Warningf("Failed to convert event to point: %v", err)
}

point.Tags["cluster_name"] = sink.c.ClusterName

dataPoints = append(dataPoints, *point)
if len(dataPoints) >= maxSendBatchSize {
sink.sendData(dataPoints)
Expand Down
5 changes: 5 additions & 0 deletions metrics/sinks/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) {
point.Tags[key] = value
}
}

point.Tags["cluster_name"] = sink.c.ClusterName

dataPoints = append(dataPoints, point)
if len(dataPoints) >= maxSendBatchSize {
sink.sendData(dataPoints)
Expand Down Expand Up @@ -131,6 +134,7 @@ func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) {
},
Time: dataBatch.Timestamp.UTC(),
}

for key, value := range metricSet.Labels {
if value != "" {
point.Tags[key] = value
Expand All @@ -141,6 +145,7 @@ func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) {
point.Tags[key] = value
}
}
point.Tags["cluster_name"] = sink.c.ClusterName

dataPoints = append(dataPoints, point)
if len(dataPoints) >= maxSendBatchSize {
Expand Down