Skip to content

Commit

Permalink
telegraf(elasticsearch): Ass stats for the Remote Backed API, when Op…
Browse files Browse the repository at this point in the history
…ensearch with remote store is enabled
  • Loading branch information
Mmuzaf committed Feb 4, 2025
1 parent d8adb1e commit 40aae45
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 2 deletions.
84 changes: 84 additions & 0 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Elasticsearch struct {
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
IndicesInclude []string `toml:"indices_include"`
IndicesLevel string `toml:"indices_level"`
RemoteStoreStats bool `toml:"remote_store_stats"`
NodeStats []string `toml:"node_stats"`
Username string `toml:"username"`
Password string `toml:"password"`
Expand Down Expand Up @@ -248,6 +249,15 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}

if e.RemoteStoreStats {
// Here we use e.IndicesInclude; you might adjust if needed.
for _, indexName := range e.IndicesInclude {
if err := e.gatherRemoteStoreStats(s+"/_remotestore/stats/"+indexName, indexName, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
}
}
}

if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
Expand Down Expand Up @@ -622,6 +632,80 @@ func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now
return nil
}

func (e *Elasticsearch) gatherRemoteStoreStats(url string, indexName string, acc telegraf.Accumulator) error {
var remoteData map[string]interface{}
if err := e.gatherJSONData(url, &remoteData); err != nil {
return err
}
now := time.Now()

if shards, ok := remoteData["_shards"].(map[string]interface{}); ok {
globalTags := map[string]string{"index_name": indexName}
acc.AddFields("elasticsearch_remotestore_global", shards, globalTags, now)
}

indicesRaw, ok := remoteData["indices"].(map[string]interface{})
if !ok {
return fmt.Errorf("remote store API response missing 'indices' field")
}

idxRaw, exists := indicesRaw[indexName]
if !exists {
return fmt.Errorf("index %s not found in remote store stats", indexName)
}

idxData, ok := idxRaw.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected format for index %s data", indexName)
}

shardsRaw, ok := idxData["shards"].(map[string]interface{})
if !ok {
return fmt.Errorf("shards field missing or malformed for index %s", indexName)
}

for shardID, shardEntries := range shardsRaw {
entries, ok := shardEntries.([]interface{})
if !ok {
continue
}
// Process each shard entry (primary and replicas)
for _, entry := range entries {
f := jsonparser.JSONFlattener{}
if err := f.FullFlattenJSON("", entry, true, true); err != nil {
return err
}

tags := map[string]string{
"index_name": indexName,
"shard_id": shardID,
}
if entryMap, ok := entry.(map[string]interface{}); ok {
if routing, exists := entryMap["routing"].(map[string]interface{}); exists {
if state, ok := routing["state"].(string); ok {
tags["routing_state"] = state
}
if primary, ok := routing["primary"].(bool); ok {
if primary {
tags["shard_type"] = "primary"
} else {
tags["shard_type"] = "replica"
}
}
if node, ok := routing["node"].(string); ok {
tags["node_id"] = node
}
}
}

delete(f.Fields, "routing")
acc.AddFields("elasticsearch_remotestore_stats_shards", f.Fields, tags, now)
}
}

return nil
}

func (e *Elasticsearch) getCatMaster(url string) (string, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
Expand Down
41 changes: 39 additions & 2 deletions plugins/inputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package elasticsearch

import (
"github.com/stretchr/testify/require"
"io"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -352,6 +351,44 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) {
replicaTags)
}

func TestGatherRemoteStoreStats(t *testing.T) {
es := newElasticsearchWithClient()
es.RemoteStoreStats = true
es.Servers = []string{"http://example.com:9200"}
es.IndicesInclude = []string{"remote-index"}
es.client.Transport = newTransportMock(remoteStoreResponse)
var acc testutil.Accumulator
url := "http://example.com:9200/_remotestore/stats/remote-index"
err := es.gatherRemoteStoreStats(url, "remote-index", &acc)
require.NoError(t, err)

globalTags := map[string]string{"index_name": "remote-index"}
expectedGlobalFields := map[string]interface{}{
"total": float64(4),
"successful": float64(4),
"failed": float64(0),
}
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_global", expectedGlobalFields, globalTags)

primaryTags := map[string]string{
"index_name": "remote-index",
"shard_id": "0",
"routing_state": "STARTED",
"shard_type": "primary",
"node_id": "q1VxWZnCTICrfRc2bRW3nw",
}
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesPrimaryShardsExpected, primaryTags)

replicaTags := map[string]string{
"index_name": "remote-index",
"shard_id": "1",
"routing_state": "STARTED",
"shard_type": "replica",
"node_id": "q1VxWZnCTICrfRc2bRW3nw",
}
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesReplicaShardsExpected, replicaTags)
}

func newElasticsearchWithClient() *Elasticsearch {
es := NewElasticsearch()
es.client = &http.Client{}
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/elasticsearch/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@
## the wildcard. Metrics then are gathered for only the
## 'num_most_recent_indices' amount of most recent indices.
# num_most_recent_indices = 0

## With the remote-backed store (s3) enabled feature. This will
## allow the plugin to fetch the stats from the remote s3 store
## when the feature is enabled for the cluster itself.
## To work this require local = true
remote_store_stats = false
Loading

0 comments on commit 40aae45

Please sign in to comment.