Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to v1.32.0 #144

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions AIVEN_CHANGES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Reasons for this fork

## Input Plugins

### ClickHouse

* Add extra metrics to monitor the replication queue

### Elasticsearch

* add cross cluster replication metrics ( they dont work for elasticsearch but its a first step until we have an opensearch plugin )

### Aiven Procstat

* basically a clone of procstat containing incompatible changes that are likely not upstreamable
* needed a way to parse multiple unit files in invocation of `systemctl` for performance Reasons
* the way that telegraf provides ( globbing ) does not fit our systemd unit structure
* we need to check units inside of containers

### MySQL

* added aggregated IOPerf Stats ( probably upstreamable )

## Output Plugins

### Aiven Postgresql

* added postgresql output plugin from scratch to work with timescaledb ( probably upstreamable, although influxdata is not keen on supporting timescaledb as it seems )
* predates the upstream postgresql plugin and was subsequently moved to the aiven prefix

### Prometheus Client

* added incompatible metric name replacements ( not sure exactely why it was needed, but its now our api and we have to keep it )

## Serializers

### Prometheus and Prometheus Remote Write

* changes to make `Plugins.Prometheus Client` work for the same reasons as stated there
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (a *Agent) testRunInputs(
// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
case "cpu", "mongodb", "procstat", "aiven-procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(getPrecision(precision, interval))
if err := input.Input.Gather(nulAcc); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/aiven-procstat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Aiven Procstat Input Plugin

Was copied from the procstat input. Divergences:

* add 'systemd_units' configuration parameter. A list that specifies the units to fetch the pids from
* to that end it parses the output from `systemctl status` in one go instead of invoking `systemctl status [...]` for every unit
* it is not possible to use the globbing feature of the original `procstat` input for several reasons, one being that the tags are not expanded with the glob, the other is that the units we are targeting are not named glob friendly
9 changes: 9 additions & 0 deletions plugins/inputs/aiven-procstat/dev/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[agent]
interval="1s"
flush_interval="1s"

[[inputs.procstat]]
exe = "telegraf"

[[outputs.file]]
files = ["stdout"]
96 changes: 96 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package aiven_procstat

import (
"fmt"
"io/ioutil"
"regexp"
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/process"
)

//NativeFinder uses gopsutil to find processes
type NativeFinder struct {
}

//NewNativeFinder ...
func NewNativeFinder() (PIDFinder, error) {
return &NativeFinder{}, nil
}

//Uid will return all pids for the given user
func (pg *NativeFinder) Uid(user string) ([]PID, error) {
var dst []PID
procs, err := process.Processes()
if err != nil {
return dst, err
}
for _, p := range procs {
username, err := p.Username()
if err != nil {
//skip, this can happen if we don't have permissions or
//the pid no longer exists
continue
}
if username == user {
dst = append(dst, PID(p.Pid))
}
}
return dst, nil
}

//PidFile returns the pid from the pid file given.
func (pg *NativeFinder) PidFile(path string) ([]PID, error) {
var pids []PID
pidString, err := ioutil.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'",
path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
return pids, nil

}

//FullPattern matches on the command line when the process was executed
func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
var pids []PID
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
cmd, err := p.Cmdline()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if regxPattern.MatchString(cmd) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}

func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) {
pids, err := process.Pids()
if err != nil {
return nil, err
}

result := make([]*process.Process, len(pids))
for i, pid := range pids {
result[i] = &process.Process{Pid: pid}
}
return result, nil
}
32 changes: 32 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_notwindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// +build !windows

package aiven_procstat

import (
"regexp"
)

//Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
name, err := p.Exe()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if regxPattern.MatchString(name) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}
29 changes: 29 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package aiven_procstat

import (
"testing"

"github.com/stretchr/testify/require"
)

func BenchmarkPattern(b *testing.B) {
f, err := NewNativeFinder()
require.NoError(b, err)
for n := 0; n < b.N; n++ {
_, err := f.Pattern(".*")
if err != nil {
panic(err)
}
}
}

func BenchmarkFullPattern(b *testing.B) {
f, err := NewNativeFinder()
require.NoError(b, err)
for n := 0; n < b.N; n++ {
_, err := f.FullPattern(".*")
if err != nil {
panic(err)
}
}
}
30 changes: 30 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package aiven_procstat

import (
"regexp"
)

// Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
name, err := p.Name()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if regxPattern.MatchString(name) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}
49 changes: 49 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package aiven_procstat

import (
"fmt"
"testing"

"os/user"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGather_RealPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.Pattern(`procstat`)
require.NoError(t, err)
fmt.Println(pids)
assert.Equal(t, len(pids) > 0, true)
}

func TestGather_RealFullPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.FullPattern(`%procstat%`)
require.NoError(t, err)
fmt.Println(pids)
assert.Equal(t, len(pids) > 0, true)
}

func TestGather_RealUser(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
user, err := user.Current()
require.NoError(t, err)
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.Uid(user.Username)
require.NoError(t, err)
fmt.Println(pids)
assert.Equal(t, len(pids) > 0, true)
}
90 changes: 90 additions & 0 deletions plugins/inputs/aiven-procstat/pgrep.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package aiven_procstat

import (
"fmt"
"io/ioutil"
"os/exec"
"strconv"
"strings"

"github.com/influxdata/telegraf/internal"
)

// Implementation of PIDGatherer that execs pgrep to find processes
type Pgrep struct {
path string
}

func NewPgrep() (PIDFinder, error) {
path, err := exec.LookPath("pgrep")
if err != nil {
return nil, fmt.Errorf("Could not find pgrep binary: %s", err)
}
return &Pgrep{path}, nil
}

func (pg *Pgrep) PidFile(path string) ([]PID, error) {
var pids []PID
pidString, err := ioutil.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'",
path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
return pids, nil
}

func (pg *Pgrep) Pattern(pattern string) ([]PID, error) {
args := []string{pattern}
return find(pg.path, args)
}

func (pg *Pgrep) Uid(user string) ([]PID, error) {
args := []string{"-u", user}
return find(pg.path, args)
}

func (pg *Pgrep) FullPattern(pattern string) ([]PID, error) {
args := []string{"-f", pattern}
return find(pg.path, args)
}

func find(path string, args []string) ([]PID, error) {
out, err := run(path, args)
if err != nil {
return nil, err
}

return parseOutput(out)
}

func run(path string, args []string) (string, error) {
out, err := exec.Command(path, args...).Output()

//if exit code 1, ie no processes found, do not return error
if i, _ := internal.ExitStatus(err); i == 1 {
return "", nil
}

if err != nil {
return "", fmt.Errorf("Error running %s: %s", path, err)
}
return string(out), err
}

func parseOutput(out string) ([]PID, error) {
pids := []PID{}
fields := strings.Fields(out)
for _, field := range fields {
pid, err := strconv.ParseInt(field, 10, 32)
if err != nil {
return nil, err
}
pids = append(pids, PID(pid))
}
return pids, nil
}
Loading
Loading