diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index e9279d39fcc07..f035a23d9b42a 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -103,6 +103,7 @@ type Elasticsearch struct { ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"` IndicesInclude []string `toml:"indices_include"` IndicesLevel string `toml:"indices_level"` + RemoteStoreStats bool `toml:"remote_store_stats"` NodeStats []string `toml:"node_stats"` Username string `toml:"username"` Password string `toml:"password"` @@ -248,6 +249,15 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } } + if e.RemoteStoreStats { + // Here we use e.IndicesInclude; you might adjust if needed. + for _, indexName := range e.IndicesInclude { + if err := e.gatherRemoteStoreStats(s+"/_remotestore/stats/"+indexName, indexName, acc); err != nil { + acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) + } + } + } + if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) { if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil { acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) @@ -622,6 +632,80 @@ func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now return nil } +func (e *Elasticsearch) gatherRemoteStoreStats(url string, indexName string, acc telegraf.Accumulator) error { + var remoteData map[string]interface{} + if err := e.gatherJSONData(url, &remoteData); err != nil { + return err + } + now := time.Now() + + if shards, ok := remoteData["_shards"].(map[string]interface{}); ok { + globalTags := map[string]string{"index_name": indexName} + acc.AddFields("elasticsearch_remotestore_global", shards, globalTags, now) + } + + indicesRaw, ok := remoteData["indices"].(map[string]interface{}) + if !ok { + return fmt.Errorf("remote store API 
response missing 'indices' field") + } + + idxRaw, exists := indicesRaw[indexName] + if !exists { + return fmt.Errorf("index %s not found in remote store stats", indexName) + } + + idxData, ok := idxRaw.(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected format for index %s data", indexName) + } + + shardsRaw, ok := idxData["shards"].(map[string]interface{}) + if !ok { + return fmt.Errorf("shards field missing or malformed for index %s", indexName) + } + + for shardID, shardEntries := range shardsRaw { + entries, ok := shardEntries.([]interface{}) + if !ok { + continue + } + // Process each shard entry (primary and replicas) + for _, entry := range entries { + f := jsonparser.JSONFlattener{} + if err := f.FullFlattenJSON("", entry, true, true); err != nil { + return err + } + + tags := map[string]string{ + "index_name": indexName, + "shard_id": shardID, + } + if entryMap, ok := entry.(map[string]interface{}); ok { + if routing, exists := entryMap["routing"].(map[string]interface{}); exists { + if state, ok := routing["state"].(string); ok { + tags["routing_state"] = state + } + if primary, ok := routing["primary"].(bool); ok { + if primary { + tags["shard_type"] = "primary" + } else { + tags["shard_type"] = "replica" + } + } + if node, ok := routing["node"].(string); ok { + tags["node_id"] = node + } + } + } + + delete(f.Fields, "routing") + acc.AddFields("elasticsearch_remotestore_stats_shards", f.Fields, tags, now) + } + } + + return nil +} + func (e *Elasticsearch) getCatMaster(url string) (string, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 1ed61e731ce1f..b374d5c37ecff 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -1,13 +1,12 @@ package elasticsearch import ( + "github.com/stretchr/testify/require" "io" "net/http" 
"strings" "testing" - "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf/testutil" ) @@ -352,6 +351,44 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) { replicaTags) } +func TestGatherRemoteStoreStats(t *testing.T) { + es := newElasticsearchWithClient() + es.RemoteStoreStats = true + es.Servers = []string{"http://example.com:9200"} + es.IndicesInclude = []string{"remote-index"} + es.client.Transport = newTransportMock(remoteStoreResponse) + var acc testutil.Accumulator + url := "http://example.com:9200/_remotestore/stats/remote-index" + err := es.gatherRemoteStoreStats(url, "remote-index", &acc) + require.NoError(t, err) + + globalTags := map[string]string{"index_name": "remote-index"} + expectedGlobalFields := map[string]interface{}{ + "total": float64(4), + "successful": float64(4), + "failed": float64(0), + } + acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_global", expectedGlobalFields, globalTags) + + primaryTags := map[string]string{ + "index_name": "remote-index", + "shard_id": "0", + "routing_state": "STARTED", + "shard_type": "primary", + "node_id": "q1VxWZnCTICrfRc2bRW3nw", + } + acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesPrimaryShardsExpected, primaryTags) + + replicaTags := map[string]string{ + "index_name": "remote-index", + "shard_id": "1", + "routing_state": "STARTED", + "shard_type": "replica", + "node_id": "q1VxWZnCTICrfRc2bRW3nw", + } + acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesReplicaShardsExpected, replicaTags) +} + func newElasticsearchWithClient() *Elasticsearch { es := NewElasticsearch() es.client = &http.Client{} diff --git a/plugins/inputs/elasticsearch/sample.conf b/plugins/inputs/elasticsearch/sample.conf index 1c4bff938c173..2ddf17c91757f 100644 --- a/plugins/inputs/elasticsearch/sample.conf +++ b/plugins/inputs/elasticsearch/sample.conf @@ -61,3 +61,9 @@ ## the wildcard. 
Metrics then are gathered for only the ## 'num_most_recent_indices' amount of most recent indices. # num_most_recent_indices = 0 + + ## Gather remote-backed store (e.g. S3) statistics for the indices listed + ## in 'indices_include'. Data is only available when the remote-backed + ## storage feature is enabled for the cluster itself. + ## This requires local = true to work. + # remote_store_stats = false \ No newline at end of file diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index 69224ea02f6b7..78db0be86e3cc 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -5653,6 +5653,211 @@ const clusterIndicesShardsResponse = ` } }` +const remoteStoreResponse = ` +{ + "_shards": { + "total": 4, + "successful": 4, + "failed": 0 + }, + "indices": { + "remote-index": { + "shards": { + "0": [{ + "routing": { + "state": "STARTED", + "primary": true, + "node": "q1VxWZnCTICrfRc2bRW3nw" + }, + "segment": { + "download": {}, + "upload": { + "local_refresh_timestamp_in_millis": 1694171634102, + "remote_refresh_timestamp_in_millis": 1694171634102, + "refresh_time_lag_in_millis": 0, + "refresh_lag": 0, + "bytes_lag": 0, + "backpressure_rejection_count": 0, + "consecutive_failure_count": 0, + "total_uploads": { + "started": 5, + "succeeded": 5, + "failed": 0 + }, + "total_upload_size": { + "started_bytes": 15342, + "succeeded_bytes": 15342, + "failed_bytes": 0 + }, + "remote_refresh_size_in_bytes": { + "last_successful": 0, + "moving_avg": 3068.4 + }, + "upload_speed_in_bytes_per_sec": { + "moving_avg": 99988.2 + }, + "remote_refresh_latency_in_millis": { + "moving_avg": 44.0 + } + } + }, + "translog": { + "upload": { + "last_successful_upload_timestamp": 1694171633644, + "total_uploads": { + "started": 6, + "failed": 0, + "succeeded": 6 + }, + "total_upload_size": { + "started_bytes": 1932, + "failed_bytes": 0, + "succeeded_bytes": 1932 + }, + "total_upload_time_in_millis": 21478, + 
"upload_size_in_bytes": { + "moving_avg": 322.0 + }, + "upload_speed_in_bytes_per_sec": { + "moving_avg": 2073.8333333333335 + }, + "upload_time_in_millis": { + "moving_avg": 3579.6666666666665 + } + }, + "download": {} + } + }, + { + "routing": { + "state": "STARTED", + "primary": false, + "node": "EZuen5Y5Sv-eDCLwh9gv-Q" + }, + "segment": { + "download": { + "last_sync_timestamp": 1694171634148, + "total_download_size": { + "started_bytes": 15112, + "succeeded_bytes": 15112, + "failed_bytes": 0 + }, + "download_size_in_bytes": { + "last_successful": 2910, + "moving_avg": 1259.3333333333333 + }, + "download_speed_in_bytes_per_sec": { + "moving_avg": 382387.3333333333 + } + }, + "upload": {} + }, + "translog": { + "upload": {}, + "download": {} + } + } + ], + "1": [{ + "routing": { + "state": "STARTED", + "primary": false, + "node": "q1VxWZnCTICrfRc2bRW3nw" + }, + "segment": { + "download": { + "last_sync_timestamp": 1694171633181, + "total_download_size": { + "started_bytes": 18978, + "succeeded_bytes": 18978, + "failed_bytes": 0 + }, + "download_size_in_bytes": { + "last_successful": 325, + "moving_avg": 1265.2 + }, + "download_speed_in_bytes_per_sec": { + "moving_avg": 456047.6666666667 + } + }, + "upload": {} + }, + "translog": { + "upload": {}, + "download": {} + } + }, + { + "routing": { + "state": "STARTED", + "primary": true, + "node": "EZuen5Y5Sv-eDCLwh9gv-Q" + }, + "segment": { + "download": {}, + "upload": { + "local_refresh_timestamp_in_millis": 1694171633122, + "remote_refresh_timestamp_in_millis": 1694171633122, + "refresh_time_lag_in_millis": 0, + "refresh_lag": 0, + "bytes_lag": 0, + "backpressure_rejection_count": 0, + "consecutive_failure_count": 0, + "total_uploads": { + "started": 6, + "succeeded": 6, + "failed": 0 + }, + "total_upload_size": { + "started_bytes": 19208, + "succeeded_bytes": 19208, + "failed_bytes": 0 + }, + "remote_refresh_size_in_bytes": { + "last_successful": 0, + "moving_avg": 3201.3333333333335 + }, + 
"upload_speed_in_bytes_per_sec": { + "moving_avg": 109612.0 + }, + "remote_refresh_latency_in_millis": { + "moving_avg": 25.333333333333332 + } + } + }, + "translog": { + "upload": { + "last_successful_upload_timestamp": 1694171633106, + "total_uploads": { + "started": 7, + "failed": 0, + "succeeded": 7 + }, + "total_upload_size": { + "started_bytes": 2405, + "failed_bytes": 0, + "succeeded_bytes": 2405 + }, + "total_upload_time_in_millis": 27748, + "upload_size_in_bytes": { + "moving_avg": 343.57142857142856 + }, + "upload_speed_in_bytes_per_sec": { + "moving_avg": 1445.857142857143 + }, + "upload_time_in_millis": { + "moving_avg": 3964.0 + } + }, + "download": {} + } + } + ] + } + } + } +}` + var clusterIndicesPrimaryShardsExpected = map[string]interface{}{ "commit_generation": float64(4), "commit_num_docs": float64(340), @@ -5848,3 +6053,50 @@ var clusterIndicesReplicaShardsExpected = map[string]interface{}{ "warmer_total": float64(3), "warmer_total_time_in_millis": float64(0), } + +var remoteStoreIndicesPrimaryShardsExpected = map[string]interface{}{ + "routing_state": "STARTED", + "routing_primary": true, + "segment_upload_remote_refresh_size_in_bytes_last_successful": float64(0), + "translog_upload_total_upload_size_started_bytes": float64(1932), + "segment_upload_refresh_time_lag_in_millis": float64(0), + "segment_upload_remote_refresh_timestamp_in_millis": float64(1694171634102), + "segment_upload_total_uploads_failed": float64(0), + "segment_upload_local_refresh_timestamp_in_millis": float64(1694171634102), + "translog_upload_total_upload_size_succeeded_bytes": float64(1932), + "segment_upload_remote_refresh_size_in_bytes_moving_avg": float64(3068.4), + "segment_upload_total_uploads_succeeded": float64(5), + "segment_upload_total_upload_size_succeeded_bytes": float64(15342), + "translog_upload_total_uploads_failed": float64(0), + "translog_upload_upload_size_in_bytes_moving_avg": float64(322), + "segment_upload_bytes_lag": float64(0), + 
"translog_upload_upload_time_in_millis_moving_avg": float64(3579.6666666666665), + "segment_upload_total_uploads_started": float64(5), + "translog_upload_total_uploads_succeeded": float64(6), + "translog_upload_total_uploads_started": float64(6), + "routing_node": "q1VxWZnCTICrfRc2bRW3nw", + "segment_upload_total_upload_size_failed_bytes": float64(0), + "segment_upload_backpressure_rejection_count": float64(0), + "segment_upload_consecutive_failure_count": float64(0), + "translog_upload_total_upload_time_in_millis": float64(21478), + "segment_upload_upload_speed_in_bytes_per_sec_moving_avg": float64(99988.2), + "translog_upload_upload_speed_in_bytes_per_sec_moving_avg": float64(2073.8333333333335), + "translog_upload_total_upload_size_failed_bytes": float64(0), + "segment_upload_total_upload_size_started_bytes": float64(15342), + "translog_upload_last_successful_upload_timestamp": float64(1694171633644), + "segment_upload_remote_refresh_latency_in_millis_moving_avg": float64(44), + "segment_upload_refresh_lag": float64(0), +} + +var remoteStoreIndicesReplicaShardsExpected = map[string]interface{}{ + "routing_state": "STARTED", + "routing_primary": false, + "routing_node": "q1VxWZnCTICrfRc2bRW3nw", + "segment_download_download_size_in_bytes_moving_avg": float64(1265.2), + "segment_download_last_sync_timestamp": float64(1694171633181), + "segment_download_total_download_size_started_bytes": float64(18978), + "segment_download_total_download_size_succeeded_bytes": float64(18978), + "segment_download_total_download_size_failed_bytes": float64(0), + "segment_download_download_size_in_bytes_last_successful": float64(325), + "segment_download_download_speed_in_bytes_per_sec_moving_avg": float64(456047.6666666667), +}