Skip to content

Commit

Permalink
Add WithMessageProducer option (#284)
Browse files Browse the repository at this point in the history
* add message producer to control output

* add WithMessageProducer options

* join import groups

* add MessageProducer tests

* fix typo, remove go 1.7 checks
  • Loading branch information
tinogoehlert authored Apr 8, 2020
1 parent e49f804 commit 06f6482
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 115 deletions.
17 changes: 7 additions & 10 deletions logging/logrus/client_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand All @@ -19,7 +20,8 @@ func UnaryClientInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryClien
fields := newClientLoggerFields(ctx, method)
startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
logFinalClientLine(o, entry.WithFields(fields), startTime, err, "finished client unary call")
newCtx := ctxlogrus.ToContext(ctx, entry.WithFields(fields))
logFinalClientLine(newCtx, o, startTime, err, "finished client unary call")
return err
}
}
Expand All @@ -31,26 +33,21 @@ func StreamClientInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamCli
fields := newClientLoggerFields(ctx, method)
startTime := time.Now()
clientStream, err := streamer(ctx, desc, cc, method, opts...)
logFinalClientLine(o, entry.WithFields(fields), startTime, err, "finished client streaming call")
newCtx := ctxlogrus.ToContext(ctx, entry.WithFields(fields))
logFinalClientLine(newCtx, o, startTime, err, "finished client streaming call")
return clientStream, err
}
}

func logFinalClientLine(o *options, entry *logrus.Entry, startTime time.Time, err error, msg string) {
func logFinalClientLine(ctx context.Context, o *options, startTime time.Time, err error, msg string) {
code := o.codeFunc(err)
level := o.levelFunc(code)
durField, durVal := o.durationFunc(time.Now().Sub(startTime))
fields := logrus.Fields{
"grpc.code": code.String(),
durField: durVal,
}
if err != nil {
fields[logrus.ErrorKey] = err
}
levelLogf(
entry.WithFields(fields),
level,
msg)
o.messageFunc(ctx, msg, level, code, err, fields)
}

func newClientLoggerFields(ctx context.Context, fullMethodString string) logrus.Fields {
Expand Down
45 changes: 34 additions & 11 deletions logging/logrus/client_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package grpc_logrus_test

import (
"io"
"runtime"
"strings"
"testing"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -27,10 +25,6 @@ func customClientCodeToLevel(c codes.Code) logrus.Level {
}

func TestLogrusClientSuite(t *testing.T) {
if strings.HasPrefix(runtime.Version(), "go1.7") {
t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customClientCodeToLevel),
}
Expand Down Expand Up @@ -130,10 +124,6 @@ func (s *logrusClientSuite) TestPingError_WithCustomLevels() {
}

func TestLogrusClientOverrideSuite(t *testing.T) {
if strings.HasPrefix(runtime.Version(), "go1.7") {
t.Skip("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_logrus.Option{
grpc_logrus.WithDurationField(grpc_logrus.DurationToDurationField),
}
Expand Down Expand Up @@ -187,3 +177,36 @@ func (s *logrusClientOverrideSuite) TestPingList_HasOverrides() {
assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "message must not contain default duration")
assert.Contains(s.T(), msgs[0], "grpc.duration", "message must contain overridden duration")
}

func TestZapLoggingClientMessageProducerSuite(t *testing.T) {
opts := []grpc_logrus.Option{
grpc_logrus.WithMessageProducer(StubMessageProducer),
}
b := newLogrusBaseSuite(t)
b.logger.Level = logrus.DebugLevel // a lot of our stuff is on debug level by default
b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_logrus.UnaryClientInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc.WithStreamInterceptor(grpc_logrus.StreamClientInterceptor(logrus.NewEntry(b.logger), opts...)),
}
suite.Run(t, &logrusClientMessageProducerSuite{b})
}

type logrusClientMessageProducerSuite struct {
*logrusBaseSuite
}

func (s *logrusClientMessageProducerSuite) TestPing_HasOverriddenMessageProducer() {
_, err := s.Client.Ping(s.SimpleCtx(), goodPing)
assert.NoError(s.T(), err, "there must be not be an on a successful call")

msgs := s.getOutputJSONs()
require.Len(s.T(), msgs, 1, "one log statement should be logged")

assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name")
assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain the correct method name")
assert.Equal(s.T(), msgs[0]["msg"], "custom message", "handler's message must contain the correct message")
assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)")
assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.")

assert.Contains(s.T(), msgs[0], "grpc.time_ms", "interceptor log statement should contain execution time (duration in ms)")
}
38 changes: 37 additions & 1 deletion logging/logrus/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
package grpc_logrus

import (
"context"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/logging"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
)
Expand All @@ -17,6 +19,7 @@ var (
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
messageFunc: DefaultMessageProducer,
}
)

Expand All @@ -25,6 +28,7 @@ type options struct {
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
messageFunc MessageProducer
}

func evaluateServerOpt(opts []Option) *options {
Expand Down Expand Up @@ -83,6 +87,13 @@ func WithDurationField(f DurationToField) Option {
}
}

// WithMessageProducer customizes the function for message formation.
func WithMessageProducer(f MessageProducer) Option {
return func(o *options) {
o.messageFunc = f
}
}

// DefaultCodeToLevel is the default implementation of gRPC return codes to log levels for server side.
func DefaultCodeToLevel(code codes.Code) logrus.Level {
switch code {
Expand Down Expand Up @@ -183,3 +194,28 @@ func DurationToDurationField(duration time.Duration) (key string, value interfac
func durationToMilliseconds(duration time.Duration) float32 {
return float32(duration.Nanoseconds()/1000) / 1000
}

// MessageProducer produces a user defined log message
type MessageProducer func(ctx context.Context, format string, level logrus.Level, code codes.Code, err error, fields logrus.Fields)

// DefaultMessageProducer writes the default message
func DefaultMessageProducer(ctx context.Context, format string, level logrus.Level, code codes.Code, err error, fields logrus.Fields) {
if err != nil {
fields[logrus.ErrorKey] = err
}
entry := ctxlogrus.Extract(ctx).WithContext(ctx).WithFields(fields)
switch level {
case logrus.DebugLevel:
entry.Debugf(format)
case logrus.InfoLevel:
entry.Infof(format)
case logrus.WarnLevel:
entry.Warningf(format)
case logrus.ErrorLevel:
entry.Errorf(format)
case logrus.FatalLevel:
entry.Fatalf(format)
case logrus.PanicLevel:
entry.Panicf(format)
}
}
36 changes: 3 additions & 33 deletions logging/logrus/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"path"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand Down Expand Up @@ -44,12 +44,7 @@ func UnaryServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryServe
fields[logrus.ErrorKey] = err
}

levelLogf(
// re-extract logger from newCtx, as it may have extra fields that changed in the holder.
ctxlogrus.Extract(newCtx).WithContext(newCtx).WithFields(fields),
level,
"finished unary call with code "+code.String())

o.messageFunc(newCtx, "finished unary call with code "+code.String(), level, code, err, fields)
return resp, err
}
}
Expand All @@ -75,37 +70,12 @@ func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamSer
"grpc.code": code.String(),
durField: durVal,
}
if err != nil {
fields[logrus.ErrorKey] = err
}

levelLogf(
// re-extract logger from newCtx, as it may have extra fields that changed in the holder.
ctxlogrus.Extract(newCtx).WithContext(newCtx).WithFields(fields),
level,
"finished streaming call with code "+code.String())

o.messageFunc(newCtx, "finished streaming call with code "+code.String(), level, code, err, fields)
return err
}
}

func levelLogf(entry *logrus.Entry, level logrus.Level, format string, args ...interface{}) {
switch level {
case logrus.DebugLevel:
entry.Debugf(format, args...)
case logrus.InfoLevel:
entry.Infof(format, args...)
case logrus.WarnLevel:
entry.Warningf(format, args...)
case logrus.ErrorLevel:
entry.Errorf(format, args...)
case logrus.FatalLevel:
entry.Fatalf(format, args...)
case logrus.PanicLevel:
entry.Panicf(format, args...)
}
}

func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString string, start time.Time) context.Context {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
Expand Down
53 changes: 36 additions & 17 deletions logging/logrus/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package grpc_logrus_test

import (
"io"
"runtime"
"strings"
"testing"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand All @@ -20,10 +18,6 @@ import (
)

func TestLogrusServerSuite(t *testing.T) {
if strings.HasPrefix(runtime.Version(), "go1.7") {
t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customCodeToLevel),
}
Expand Down Expand Up @@ -167,10 +161,6 @@ func (s *logrusServerSuite) TestPingList_WithCustomTags() {
}

func TestLogrusServerOverrideSuite(t *testing.T) {
if strings.HasPrefix(runtime.Version(), "go1.7") {
t.Skip("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_logrus.Option{
grpc_logrus.WithDurationField(grpc_logrus.DurationToDurationField),
}
Expand Down Expand Up @@ -240,10 +230,6 @@ func (s *logrusServerOverrideSuite) TestPingList_HasOverriddenDuration() {
}

func TestLogrusServerOverrideDeciderSuite(t *testing.T) {
if strings.HasPrefix(runtime.Version(), "go1.7") {
t.Skip("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_logrus.Option{
grpc_logrus.WithDecider(func(method string, err error) bool {
if err != nil && method == "/mwitkow.testproto.TestService/PingError" {
Expand Down Expand Up @@ -320,3 +306,36 @@ func (s *logrusServerOverrideDeciderSuite) TestPingList_HasOverriddenDecider() {
assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "handler's message must not contain default duration")
assert.NotContains(s.T(), msgs[0], "grpc.duration", "handler's message must not contain overridden duration")
}

func TestLogrusServerMessageProducerSuite(t *testing.T) {
opts := []grpc_logrus.Option{
grpc_logrus.WithMessageProducer(StubMessageProducer),
}
b := newLogrusBaseSuite(t)
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(),
grpc_logrus.StreamServerInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(b.logger), opts...)),
}
suite.Run(t, &logrusServerMessageProducerSuite{b})
}

type logrusServerMessageProducerSuite struct {
*logrusBaseSuite
}

func (s *logrusServerMessageProducerSuite) TestPing_HasMessageProducer() {
_, err := s.Client.Ping(s.SimpleCtx(), goodPing)
require.NoError(s.T(), err, "there must be not be an error on a successful call")

msgs := s.getOutputJSONs()
require.Len(s.T(), msgs, 2, "single log statements should be logged")
assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name")
assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain method name")
assert.Equal(s.T(), msgs[1]["msg"], "custom message", "user defined message producer must be used")

assert.Equal(s.T(), msgs[0]["msg"], "some ping", "handler's message must contain user message")
}
28 changes: 25 additions & 3 deletions logging/logrus/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"io"
"testing"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/grpc-ecosystem/go-grpc-middleware/testing"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_testing "github.com/grpc-ecosystem/go-grpc-middleware/testing"
pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -103,3 +103,25 @@ func (s *logrusBaseSuite) getOutputJSONs() []map[string]interface{} {

return ret
}

func StubMessageProducer(ctx context.Context, format string, level logrus.Level, code codes.Code, err error, fields logrus.Fields) {
if err != nil {
fields[logrus.ErrorKey] = err
}
format = "custom message"
entry := ctxlogrus.Extract(ctx).WithContext(ctx).WithFields(fields)
switch level {
case logrus.DebugLevel:
entry.Debugf(format)
case logrus.InfoLevel:
entry.Infof(format)
case logrus.WarnLevel:
entry.Warningf(format)
case logrus.ErrorLevel:
entry.Errorf(format)
case logrus.FatalLevel:
entry.Fatalf(format)
case logrus.PanicLevel:
entry.Panicf(format)
}
}
Loading

0 comments on commit 06f6482

Please sign in to comment.