Commit bb7c652

Log rewrite and refactor (#594)

* revise sentry report
* bind sentry and log
* fix log issue
* struct logs!
* remove useless code
* rewrite todo context

1 parent 3687d4a

112 files changed: +946, -784 lines

client/clientpool.go (+4 -3)

@@ -42,7 +42,7 @@ func NewCoreRPCClientPool(ctx context.Context, config *PoolConfig) (*Pool, error
 			rpc, err = NewClient(ctx, addr, config.Auth)
 		})
 		if err != nil {
-			log.Errorf(ctx, err, "[NewCoreRPCClientPool] connect to %s failed", addr)
+			log.WithFunc("client.NewCoreRPCClientPool").Errorf(ctx, err, "connect to %s failed", addr)
 			continue
 		}
 		rpcClient := rpc.GetRPCClient()
@@ -94,11 +94,12 @@ func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duratio
 	utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
 		_, err = rpc.client.Info(ctx, &pb.Empty{})
 	})
+	logger := log.WithFunc("client.checkAlive")
 	if err != nil {
-		log.Errorf(ctx, err, "[ClientPool] connect to %s failed", rpc.addr)
+		logger.Errorf(ctx, err, "connect to %s failed", rpc.addr)
 		return false
 	}
-	log.Debugf(ctx, "[ClientPool] connect to %s success", rpc.addr)
+	logger.Debugf(ctx, "connect to %s success", rpc.addr)
 	return true
 }
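
Every file below follows the same pattern: the old calls embedded a hand-written [Tag] prefix in each format string; the new calls bind the function name once via log.WithFunc and reuse the returned logger, so the name travels as a structured field instead of free text. A minimal sketch of the logger surface these call sites imply — names and internals here are illustrative assumptions, not the actual projecteru2/core/log implementation:

package log

import (
	"context"
	"fmt"
)

// Logger carries bound structured fields. This is a sketch of the surface
// implied by the call sites in this commit, not the real eru logger.
type Logger struct {
	fields map[string]interface{}
}

// WithFunc binds the calling function's name as a field, replacing the old
// hand-written "[Tag]" prefixes inside format strings.
func WithFunc(fn string) *Logger {
	return (&Logger{fields: map[string]interface{}{}}).WithField("func", fn)
}

// WithField returns a copy of the logger with one more field attached.
func (l *Logger) WithField(key string, value interface{}) *Logger {
	fields := make(map[string]interface{}, len(l.fields)+1)
	for k, v := range l.fields {
		fields[k] = v
	}
	fields[key] = value
	return &Logger{fields: fields}
}

// Errorf matches the call sites above: context first, then the error cause.
func (l *Logger) Errorf(_ context.Context, err error, format string, args ...interface{}) {
	fmt.Printf("ERROR %v %s: %v\n", l.fields, fmt.Sprintf(format, args...), err)
}

// Debugf emits a debug line carrying the bound fields.
func (l *Logger) Debugf(_ context.Context, format string, args ...interface{}) {
	fmt.Printf("DEBUG %v %s\n", l.fields, fmt.Sprintf(format, args...))
}

With that shape, a function hoists logger := log.WithFunc("client.checkAlive") once, and every later call carries the func field automatically.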

client/interceptor/retry.go (+6 -4)

@@ -2,8 +2,8 @@ package interceptor
 
 import (
 	"context"
-	"strings"
 
+	"github.com/cockroachdb/errors"
 	"github.com/projecteru2/core/log"
 
 	"github.com/cenkalti/backoff/v4"
@@ -32,7 +32,8 @@ func NewStreamRetry(retryOpts RetryOptions) grpc.StreamClientInterceptor {
 		if _, ok := RPCNeedRetry[method]; !ok {
 			return stream, err
 		}
-		log.Debugf(ctx, "[NewStreamRetry] return retryStream for method %s", method)
+		logger := log.WithFunc("client.NewStreamRetry")
+		logger.Debugf(ctx, "return retryStream for method %s", method)
 		return &retryStream{
 			ctx:          ctx,
 			ClientStream: stream,
@@ -52,12 +53,13 @@ func (s *retryStream) SendMsg(m interface{}) error {
 }
 
 func (s *retryStream) RecvMsg(m interface{}) (err error) {
-	if err = s.ClientStream.RecvMsg(m); err == nil || strings.Contains(err.Error(), "context canceled") {
+	if err = s.ClientStream.RecvMsg(m); err == nil || errors.Is(err, context.Canceled) {
		return
 	}
+	logger := log.WithFunc("client.RecvMsg")
 
 	return backoff.Retry(func() error {
-		log.Debug(s.ctx, "[retryStream] retry on new stream")
+		logger.Debug(s.ctx, "retry on new stream")
 		stream, err := s.newStream()
 		if err != nil {
 			// even io.EOF triggers retry, and it's what we want!
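
Besides the logger rewrite, RecvMsg stops matching on the error text and uses errors.Is, which follows wrapped causes (the diff imports github.com/cockroachdb/errors, whose errors.Is is drop-in compatible with the standard library's). A self-contained illustration of why the string check was fragile — rpcErr is invented for the demo:

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// rpcErr stands in for a transport error that wraps the cancellation
// cause but does not echo its text in Error().
type rpcErr struct{ cause error }

func (e rpcErr) Error() string { return "rpc stream closed" }
func (e rpcErr) Unwrap() error { return e.cause }

func main() {
	err := rpcErr{cause: context.Canceled}

	// Old check: depends on the exact message text, so it misses this case.
	fmt.Println(strings.Contains(err.Error(), "context canceled")) // false

	// New check: walks the Unwrap chain, so it still detects cancellation.
	fmt.Println(errors.Is(err, context.Canceled)) // true
}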

client/resolver/eru/builder.go (+3 -1)

@@ -1,6 +1,8 @@
 package eru
 
-import "google.golang.org/grpc/resolver"
+import (
+	"google.golang.org/grpc/resolver"
+)
 
 type eruResolverBuilder struct{}

client/resolver/eru/resolver.go (+7 -7)

@@ -31,7 +31,7 @@ func New(cc resolver.ClientConn, endpoint string, authority string) *Resolver {
 		discovery: servicediscovery.New(endpoint, authConfig),
 	}
 	cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: endpoint}}}) //nolint
-	go r.sync()
+	go r.sync(context.TODO())
 	return r
 }
 
@@ -43,25 +43,25 @@ func (r *Resolver) Close() {
 	r.cancel()
 }
 
-func (r *Resolver) sync() {
-	ctx := context.TODO()
+func (r *Resolver) sync(ctx context.Context) {
 	ctx, r.cancel = context.WithCancel(ctx)
 	defer r.cancel()
-	log.Debug(ctx, "[EruResolver] start sync service discovery")
+	logger := log.WithFunc("resolver.sync")
+	logger.Debug(ctx, "start sync service discovery")
 
 	ch, err := r.discovery.Watch(ctx)
 	if err != nil {
-		log.Error(ctx, err, "[EruResolver] failed to watch service status")
+		logger.Error(ctx, err, "failed to watch service status")
 		return
 	}
 	for {
 		select {
 		case <-ctx.Done():
-			log.Error(ctx, ctx.Err(), "[EruResolver] watch interrupted")
+			logger.Error(ctx, ctx.Err(), "watch interrupted")
 			return
 		case endpoints, ok := <-ch:
 			if !ok {
-				log.Info(ctx, nil, "[EruResolver] watch closed")
+				logger.Info(ctx, nil, "watch closed")
 				return
 			}
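
This hunk is the "rewrite todo context" item from the commit message: sync no longer mints context.TODO() internally; the caller passes the root context (still context.TODO() at the New call site) and sync derives the cancelable child it owns. A condensed, runnable mirror of that ownership split, with the watch loop reduced to waiting on ctx.Done():

package main

import (
	"context"
	"fmt"
	"time"
)

type Resolver struct{ cancel context.CancelFunc }

// sync receives its parent context from the caller and derives the
// cancelable child it owns, mirroring the diff above.
func (r *Resolver) sync(ctx context.Context) {
	ctx, r.cancel = context.WithCancel(ctx)
	defer r.cancel()
	fmt.Println("start sync service discovery")
	<-ctx.Done() // stands in for the service discovery watch loop
	fmt.Println("watch interrupted:", ctx.Err())
}

// Close stops the sync loop via the CancelFunc that sync registered.
func (r *Resolver) Close() { r.cancel() }

func main() {
	r := &Resolver{}
	go r.sync(context.TODO()) // the caller now chooses the root context
	time.Sleep(50 * time.Millisecond)
	r.Close()
	time.Sleep(50 * time.Millisecond)
}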

client/servicediscovery/eru_service_discovery.go (+5 -4)

@@ -33,8 +33,9 @@ func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery {
 // Watch .
 func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
 	cc, err := w.dial(ctx, w.endpoint, w.authConfig)
+	logger := log.WithFunc("servicediscovery.Watch").WithField("endpoint", w.endpoint)
 	if err != nil {
-		log.Error(ctx, err, "[EruServiceWatch] dial failed")
+		logger.Error(ctx, err, "dial failed")
 		return
 	}
 	client := pb.NewCoreRPCClient(cc)
@@ -48,8 +49,8 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
 		watchCtx, cancelWatch := context.WithCancel(ctx)
 		stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{})
 		if err != nil {
-			log.Error(ctx, err, "[EruServiceWatch] watch failed, try later")
-			time.Sleep(10 * time.Second)
+			logger.Error(ctx, err, "watch failed, try later")
+			time.Sleep(10 * time.Second) // TODO can config
 			continue
 		}
 		expectedInterval := time.Duration(math.MaxInt64) / time.Second
@@ -69,7 +70,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
 			status, err := stream.Recv()
 			close(cancelTimer)
 			if err != nil {
-				log.Error(ctx, err, "[EruServiceWatch] recv failed")
+				logger.Error(ctx, err, "recv failed")
 				break
 			}
 			expectedInterval = time.Duration(status.GetIntervalInSecond())
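
Stripped of the gRPC details, Watch's outer loop is: open the stream; on failure log, sleep a fixed 10 seconds (now marked TODO to make configurable), and retry; on success consume Recv until it errors, then re-dial. A self-contained sketch of that retry skeleton, with connect standing in for client.WatchServiceStatus:

package main

import (
	"errors"
	"fmt"
	"time"
)

// watch retries stream setup forever, sleeping a fixed interval between
// failures — the shape of EruServiceDiscovery.Watch above.
func watch(connect func() (string, error)) {
	const retryInterval = 50 * time.Millisecond // 10s in the real code; TODO can config
	for {
		stream, err := connect()
		if err != nil {
			fmt.Println("watch failed, try later:", err)
			time.Sleep(retryInterval)
			continue
		}
		fmt.Println("watching:", stream)
		return // the real loop consumes stream.Recv() and re-dials on error
	}
}

func main() {
	attempts := 0
	watch(func() (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("connection refused")
		}
		return "service status stream", nil
	})
}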

client/utils/servicepusher.go (+9 -8)

@@ -44,20 +44,20 @@ func (p *EndpointPusher) Push(ctx context.Context, endpoints []string) {
 func (p *EndpointPusher) delOutdated(ctx context.Context, endpoints []string) {
 	p.Lock()
 	defer p.Unlock()
-
+	logger := log.WithFunc("utils.EndpointPusher.delOutdated")
 	p.pendingEndpoints.Range(func(endpoint string, cancel context.CancelFunc) bool {
 		if !slices.Contains(endpoints, endpoint) {
 			cancel()
 			p.pendingEndpoints.Del(endpoint)
-			log.Debugf(ctx, "[EruResolver] pending endpoint deleted: %s", endpoint)
+			logger.Debugf(ctx, "pending endpoint deleted: %s", endpoint)
 		}
 		return true
 	})
 
 	p.availableEndpoints.Range(func(endpoint string, _ struct{}) bool {
 		if !slices.Contains(endpoints, endpoint) {
 			p.availableEndpoints.Del(endpoint)
-			log.Debugf(ctx, "[EruResolver] available endpoint deleted: %s", endpoint)
+			logger.Debugf(ctx, "available endpoint deleted: %s", endpoint)
 		}
 		return true
 	})
@@ -75,14 +75,15 @@ func (p *EndpointPusher) addCheck(ctx context.Context, endpoints []string) {
 		ctx, cancel := context.WithCancel(ctx)
 		p.pendingEndpoints.Set(endpoint, cancel)
 		go p.pollReachability(ctx, endpoint)
-		log.Debugf(ctx, "[EruResolver] pending endpoint added: %s", endpoint)
+		log.WithFunc("utils.EndpointPusher.addCheck").Debugf(ctx, "pending endpoint added: %s", endpoint)
 	}
 }
 
 func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string) {
+	logger := log.WithFunc("utils.EndpointPusher.pollReachability")
 	parts := strings.Split(endpoint, ":")
 	if len(parts) != 2 {
-		log.Errorf(ctx, types.ErrInvaildCoreEndpointType, "[EruResolver] wrong format of endpoint: %s", endpoint)
+		logger.Errorf(ctx, types.ErrInvaildCoreEndpointType, "wrong format of endpoint: %s", endpoint)
 		return
 	}
 
@@ -91,7 +92,7 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
 	for {
 		select {
 		case <-ctx.Done():
-			log.Debugf(ctx, "[EruResolver] reachability goroutine ends: %s", endpoint)
+			logger.Debugf(ctx, "reachability goroutine ends: %s", endpoint)
 			return
 		case <-ticker.C:
 			p.Lock()
@@ -102,7 +103,7 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
 			p.pendingEndpoints.Del(endpoint)
 			p.availableEndpoints.Set(endpoint, struct{}{})
 			p.pushEndpoints()
-			log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
+			logger.Debugf(ctx, "available endpoint added: %s", endpoint)
 			return
 		}
 	}
@@ -111,7 +112,7 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
 func (p *EndpointPusher) checkReachability(ctx context.Context, host string) (err error) {
 	pinger, err := ping.NewPinger(host)
 	if err != nil {
-		log.Error(ctx, err, "[EruResolver] failed to create pinger")
+		log.WithFunc("utils.EndpointPusher.checkReachability").Error(ctx, err, "failed to create pinger")
 		return
 	}
 	pinger.SetPrivileged(os.Getuid() == 0)
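
For context on the hunks above: pollReachability is the goroutine behind the pending/available split — an endpoint stays pending until a ping succeeds, is then promoted exactly once, and the goroutine exits. A runnable reduction of that state machine, with the ICMP pinger replaced by a stub check:

package main

import (
	"context"
	"fmt"
	"time"
)

// pollReachability polls until the endpoint answers, then promotes it and
// stops — the shape of the goroutine above. check stands in for the ping.
func pollReachability(ctx context.Context, endpoint string, check func(string) error, promote func(string)) {
	ticker := time.NewTicker(20 * time.Millisecond) // stands in for the real interval
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("reachability goroutine ends:", endpoint)
			return
		case <-ticker.C:
			if check(endpoint) != nil {
				continue // still unreachable, stays pending
			}
			promote(endpoint)
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	tries := 0
	pollReachability(ctx, "10.0.0.1:5001", func(string) error {
		if tries++; tries < 3 {
			return fmt.Errorf("unreachable")
		}
		return nil
	}, func(ep string) { fmt.Println("available endpoint added:", ep) })
}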

cluster/calcium/build.go (+18 -18)

@@ -18,7 +18,7 @@ import (
 
 // BuildImage will build image
 func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch chan *types.BuildImageMessage, err error) {
-	logger := log.WithField("Calcium", "BuildImage").WithField("opts", opts)
+	logger := log.WithFunc("calcium.BuildImage").WithField("opts", opts)
 	// Disable build API if scm not set
 	if c.source == nil {
 		return nil, types.ErrNoSCMSetting
@@ -30,7 +30,7 @@ func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch
 		return nil, err
 	}
 
-	log.Infof(ctx, "[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)
+	logger.Infof(ctx, "Building image at pod %s node %s", node.Podname, node.Name)
 
 	var (
 		refs []string
@@ -131,8 +131,8 @@ func (c *Calcium) buildFromExist(ctx context.Context, opts *types.BuildOptions)
 }
 
 func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, node *types.Node, tags []string) (chan *types.BuildImageMessage, error) { //nolint:unparam
-	logger := log.WithField("Calcium", "pushImage").WithField("node", node).WithField("tags", tags)
-	log.Infof(ctx, "[BuildImage] Pushing image at pod %s node %s", node.Podname, node.Name)
+	logger := log.WithFunc("calcium.pushImageAndClean").WithField("node", node).WithField("tags", tags)
+	logger.Infof(ctx, "Pushing image at pod %s node %s", node.Podname, node.Name)
 	return c.withImageBuiltChannel(func(ch chan *types.BuildImageMessage) {
 		defer resp.Close()
 		decoder := json.NewDecoder(resp)
@@ -144,29 +144,29 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod
 					break
 				}
 				if err == context.Canceled || err == context.DeadlineExceeded {
-					log.Error(ctx, err, "[BuildImage] context timeout")
+					logger.Error(ctx, err, "context timeout")
 					lastMessage.ErrorDetail.Code = -1
 					lastMessage.ErrorDetail.Message = err.Error()
 					lastMessage.Error = err.Error()
 					break
 				}
 				malformed, _ := io.ReadAll(decoder.Buffered()) // TODO err check
-				logger.Errorf(ctx, err, "[BuildImage] Decode build image message failed, buffered: %+v", malformed)
+				logger.Errorf(ctx, err, "Decode build image message failed, buffered: %+v", malformed)
 				return
 			}
 			ch <- message
 			lastMessage = message
 		}
 
 		if lastMessage.Error != "" {
-			logger.Errorf(ctx, errors.New(lastMessage.Error), "[BuildImage] Build image failed %+v", lastMessage.ErrorDetail.Message)
+			logger.Errorf(ctx, errors.New(lastMessage.Error), "Build image failed %+v", lastMessage.ErrorDetail.Message)
 			return
 		}
 
 		// push and clean
 		for i := range tags {
 			tag := tags[i]
-			log.Infof(ctx, "[BuildImage] Push image %s", tag)
+			logger.Infof(ctx, "Push image %s", tag)
 			rc, err := node.Engine.ImagePush(ctx, tag)
 			if err != nil {
 				logger.Error(ctx, err)
@@ -190,8 +190,8 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod
 
 }
 
-func (c *Calcium) getWorkloadNode(ctx context.Context, id string) (*types.Node, error) {
-	w, err := c.store.GetWorkload(ctx, id)
+func (c *Calcium) getWorkloadNode(ctx context.Context, ID string) (*types.Node, error) {
+	w, err := c.store.GetWorkload(ctx, ID)
 	if err != nil {
 		return nil, err
 	}
@@ -208,19 +208,19 @@ func (c *Calcium) withImageBuiltChannel(f func(chan *types.BuildImageMessage)) c
 	return ch
 }
 
-func cleanupNodeImages(ctx context.Context, node *types.Node, ids []string, ttl time.Duration) {
-	logger := log.WithField("Calcium", "cleanupNodeImages").WithField("node", node).WithField("ids", ids).WithField("ttl", ttl)
-	ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), ttl)
+func cleanupNodeImages(ctx context.Context, node *types.Node, IDs []string, ttl time.Duration) {
+	logger := log.WithFunc("calcium.cleanupNodeImages").WithField("node", node).WithField("IDs", IDs).WithField("ttl", ttl)
+	ctx, cancel := context.WithTimeout(utils.NewInheritCtx(ctx), ttl)
 	defer cancel()
-	for _, id := range ids {
-		if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
-			logger.Error(ctx, err, "[BuildImage] Remove image error")
+	for _, ID := range IDs {
+		if _, err := node.Engine.ImageRemove(ctx, ID, false, true); err != nil {
+			logger.Error(ctx, err, "Remove image error")
 		}
 	}
 	if spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true); err != nil {
-		logger.Error(ctx, err, "[BuildImage] Remove build image cache error")
+		logger.Error(ctx, err, "Remove build image cache error")
 	} else {
-		logger.Infof(ctx, "[BuildImage] Clean cached image and release space %d", spaceReclaimed)
+		logger.Infof(ctx, "Clean cached image and release space %d", spaceReclaimed)
 	}
 }
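
The other change in cleanupNodeImages is utils.NewInheritCtx(ctx) replacing utils.InheritTracingInfo(ctx, context.TODO()). Its internals aren't in this diff; judging from the call it replaces, the intent is a context that inherits the request's values (tracing info) but not its cancellation, so image cleanup can outlive the request under its own TTL. The following is only a plausible sketch of that detach-but-inherit idea — Go 1.21's context.WithoutCancel provides the same semantics:

package main

import (
	"context"
	"fmt"
	"time"
)

// detachedCtx keeps the parent's values but drops its deadline and
// cancellation — a hypothetical stand-in for what NewInheritCtx may do.
type detachedCtx struct{ context.Context }

func (detachedCtx) Deadline() (time.Time, bool) { return time.Time{}, false }
func (detachedCtx) Done() <-chan struct{}       { return nil }
func (detachedCtx) Err() error                  { return nil }

func main() {
	type key struct{}
	parent, cancel := context.WithCancel(context.WithValue(context.Background(), key{}, "trace-id-123"))
	cancel() // the request is over

	d := detachedCtx{parent}
	fmt.Println(d.Value(key{})) // trace-id-123: values (tracing info) survive
	fmt.Println(d.Err())        // <nil>: the cancellation does not
}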

cluster/calcium/calcium.go (+10 -9)

@@ -36,10 +36,11 @@ type Calcium struct {
 
 // New returns a new cluster config
 func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, error) {
+	logger := log.WithFunc("calcium.New")
 	// set store
 	store, err := store.NewStore(config, t)
 	if err != nil {
-		log.Error(ctx, err)
+		logger.Error(ctx, err)
 		return nil, err
 	}
 
@@ -52,32 +53,32 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
 	case cluster.Github:
 		scm, err = github.New(config)
 	default:
-		log.Warn(ctx, "[Calcium] SCM not set, build API disabled")
+		logger.Warn(ctx, "SCM not set, build API disabled")
 	}
 	if err != nil {
-		log.Error(ctx, err, "[Calcium] SCM failed")
+		logger.Error(ctx, err, "SCM failed")
 		return nil, err
 	}
 
 	// set watcher
-	watcher := helium.New(config.GRPCConfig, store)
+	watcher := helium.New(ctx, config.GRPCConfig, store)
 
 	// set resource plugin manager
 	rmgr, err := resources.NewPluginsManager(config)
 	if err != nil {
-		log.Error(ctx, err)
+		logger.Error(ctx, err)
 		return nil, err
 	}
 
 	// load internal plugins
 	cpumem, err := cpumem.NewPlugin(config)
 	if err != nil {
-		log.Error(ctx, err, "[NewPluginManager] new cpumem plugin error")
+		logger.Error(ctx, err, "new cpumem plugin error")
 		return nil, err
 	}
 	volume, err := volume.NewPlugin(config)
 	if err != nil {
-		log.Error(ctx, err, "[NewPluginManager] new volume plugin error")
+		logger.Error(ctx, err, "new volume plugin error")
 		return nil, err
 	}
 	rmgr.AddPlugins(cpumem, volume)
@@ -96,13 +97,13 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
 
 	cal.wal, err = enableWAL(config, cal, store)
 	if err != nil {
-		log.Error(ctx, err)
+		logger.Error(ctx, err)
 		return nil, err
 	}
 
 	cal.identifier, err = config.Identifier()
 	if err != nil {
-		log.Error(ctx, err)
+		logger.Error(ctx, err)
 		return nil, err
 	}
