Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recify logging system #349

Merged
merged 1 commit into from
Mar 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions cluster/calcium/build.go
Original file line number Diff line number Diff line change
@@ -3,62 +3,65 @@ package calcium
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"time"

"github.com/pkg/errors"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
)

// BuildImage will build image
func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (chan *types.BuildImageMessage, error) {
func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch chan *types.BuildImageMessage, err error) {
logger := log.WithField("Calcium", "BuildImage").WithField("opts", opts)
// Disable build API if scm not set
if c.source == nil {
return nil, types.ErrSCMNotSet
return nil, logger.Err(errors.WithStack(types.ErrSCMNotSet))
}
// select nodes
node, err := c.selectBuildNode(ctx)
if err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}
log.Infof("[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)
// get refs
refs := node.Engine.BuildRefs(ctx, opts.Name, opts.Tags)

switch opts.BuildMethod {
case types.BuildFromSCM:
return c.buildFromSCM(ctx, node, refs, opts)
ch, err = c.buildFromSCM(ctx, node, refs, opts)
case types.BuildFromRaw:
return c.buildFromContent(ctx, node, refs, opts.Tar)
ch, err = c.buildFromContent(ctx, node, refs, opts.Tar)
case types.BuildFromExist:
return c.buildFromExist(ctx, refs[0], opts.ExistID)
ch, err = c.buildFromExist(ctx, refs[0], opts.ExistID)
default:
return nil, errors.New("unknown build type")
return nil, logger.Err(errors.WithStack(errors.New("unknown build type")))
}
return ch, logger.Err(errors.WithStack(err))
}

func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
// get pod from config
// TODO can choose multiple pod here for other engine support
if c.config.Docker.BuildPod == "" {
return nil, types.ErrNoBuildPod
return nil, errors.WithStack(types.ErrNoBuildPod)
}

// get node by scheduler
nodes, err := c.ListPodNodes(ctx, c.config.Docker.BuildPod, nil, false)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
if len(nodes) == 0 {
return nil, types.ErrInsufficientNodes
return nil, errors.WithStack(types.ErrInsufficientNodes)
}
// get idle max node
return c.scheduler.MaxIdleNode(nodes)
node, err := c.scheduler.MaxIdleNode(nodes)
return node, errors.WithStack(err)
}

func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, refs []string, opts *types.BuildOptions) (chan *types.BuildImageMessage, error) {
@@ -70,38 +73,42 @@ func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, refs []str
path, content, err := node.Engine.BuildContent(ctx, c.source, buildContentOpts)
defer os.RemoveAll(path)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return c.buildFromContent(ctx, node, refs, content)
ch, err := c.buildFromContent(ctx, node, refs, content)
return ch, errors.WithStack(err)
}

func (c *Calcium) buildFromContent(ctx context.Context, node *types.Node, refs []string, content io.Reader) (chan *types.BuildImageMessage, error) {
resp, err := node.Engine.ImageBuild(ctx, content, refs)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return c.pushImage(ctx, resp, node, refs)
ch, err := c.pushImage(ctx, resp, node, refs)
return ch, errors.WithStack(err)
}

func (c *Calcium) buildFromExist(ctx context.Context, ref, existID string) (chan *types.BuildImageMessage, error) {
func (c *Calcium) buildFromExist(ctx context.Context, ref, existID string) (chan *types.BuildImageMessage, error) { // nolint:unparam
logger := log.WithField("Calcium", "buildFromExist").WithField("ref", ref).WithField("existID", existID)
return withImageBuiltChannel(func(ch chan *types.BuildImageMessage) {
node, err := c.getWorkloadNode(ctx, existID)
if err != nil {
ch <- buildErrMsg(err)
ch <- buildErrMsg(logger.Err(err))
return
}

imageID, err := node.Engine.ImageBuildFromExist(ctx, existID, ref)
if err != nil {
ch <- buildErrMsg(err)
ch <- buildErrMsg(logger.Err(err))
return
}
go cleanupNodeImages(node, []string{imageID}, c.config.GlobalTimeout)
ch <- &types.BuildImageMessage{ID: imageID}
}), nil
}

func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types.Node, tags []string) (chan *types.BuildImageMessage, error) {
func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types.Node, tags []string) (chan *types.BuildImageMessage, error) { // nolint:unparam
logger := log.WithField("Calcium", "pushImage").WithField("node", node).WithField("tags", tags)
return withImageBuiltChannel(func(ch chan *types.BuildImageMessage) {
defer resp.Close()
decoder := json.NewDecoder(resp)
@@ -119,7 +126,7 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
break
}
malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check
log.Errorf("[BuildImage] Decode build image message failed %v, buffered: %v", err, malformed)
logger.Errorf("[BuildImage] Decode build image message failed %+v, buffered: %v", err, malformed)
return
}
ch <- message
@@ -137,7 +144,7 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
log.Infof("[BuildImage] Push image %s", tag)
rc, err := node.Engine.ImagePush(ctx, tag)
if err != nil {
ch <- &types.BuildImageMessage{Error: err.Error()}
ch <- &types.BuildImageMessage{Error: logger.Err(err).Error()}
continue
}

@@ -165,15 +172,16 @@ func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.Bu
}

func cleanupNodeImages(node *types.Node, ids []string, ttl time.Duration) {
logger := log.WithField("Calcium", "cleanupNodeImages").WithField("node", node).WithField("ids", ids).WithField("ttl", ttl)
ctx, cancel := context.WithTimeout(context.Background(), ttl)
defer cancel()
for _, id := range ids {
if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
log.Errorf("[BuildImage] Remove image error: %s", err)
logger.Errorf("[BuildImage] Remove image error: %+v", errors.WithStack(err))
}
}
if spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true); err != nil {
log.Errorf("[BuildImage] Remove build image cache error: %s", err)
logger.Errorf("[BuildImage] Remove build image cache error: %+v", errors.WithStack(err))
} else {
log.Infof("[BuildImage] Clean cached image and release space %d", spaceReclaimed)
}
12 changes: 7 additions & 5 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"strings"

"github.com/pkg/errors"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/discovery"
"github.com/projecteru2/core/discovery/helium"
@@ -30,16 +31,17 @@ type Calcium struct {

// New returns a new cluster config
func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
logger := log.WithField("Calcium", "New").WithField("config", config)
// set store
store, err := etcdv3.New(config, embeddedStorage)
if err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}

// set scheduler
potassium, err := complexscheduler.New(config)
if err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}
scheduler.InitSchedulerV1(potassium)

@@ -55,16 +57,16 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
log.Warn("[Calcium] SCM not set, build API disabled")
}
if err != nil {
log.Errorf("[Calcium] SCAM failed: %v", err)
return nil, err
logger.Errorf("[Calcium] SCAM failed: %+v", err)
return nil, errors.WithStack(err)
}

// set watcher
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
return cal, err
return cal, logger.Err(errors.WithStack(err))
}

// DisasterRecover .
3 changes: 3 additions & 0 deletions cluster/calcium/capacity.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (

// CalculateCapacity calculates capacity
func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptions) (*types.CapacityMessage, error) {
logger := log.WithField("Calcium", "CalculateCapacity").WithField("opts", opts)
var err error
msg := &types.CapacityMessage{
Total: 0,
@@ -21,6 +22,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
return msg, c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, nil, false, func(ctx context.Context, nodeMap map[string]*types.Node) error {
if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf("[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
return errors.WithStack(err)
}

@@ -31,6 +33,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
var infos []strategy.Info
msg.Total, _, infos, err = c.doCalculateCapacity(nodeMap, opts)
if err != nil {
logger.Errorf("[Calcium.CalculateCapacity] doCalculateCapacity failed: %+v", err)
return errors.WithStack(err)
}
for _, info := range infos {
22 changes: 12 additions & 10 deletions cluster/calcium/control.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
@@ -13,6 +14,7 @@ import (

// ControlWorkload control workloads status
func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
logger := log.WithField("Calcium", "ControlWorkload").WithField("ids", ids).WithField("t", t).WithField("force", force)
ch := make(chan *types.ControlWorkloadMessage)

go func() {
@@ -28,20 +30,20 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
switch t {
case cluster.WorkloadStop:
message, err = c.doStopWorkload(ctx, workload, force)
return err
return errors.WithStack(err)
case cluster.WorkloadStart:
message, err = c.doStartWorkload(ctx, workload, force)
return err
return errors.WithStack(err)
case cluster.WorkloadRestart:
message, err = c.doStopWorkload(ctx, workload, force)
if err != nil {
return err
return errors.WithStack(err)
}
startHook, err := c.doStartWorkload(ctx, workload, force)
message = append(message, startHook...)
return err
return errors.WithStack(err)
}
return types.ErrUnknownControlType
return errors.WithStack(types.ErrUnknownControlType)
})
if err == nil {
log.Infof("[ControlWorkload] Workload %s %s", id, t)
@@ -50,7 +52,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
}
ch <- &types.ControlWorkloadMessage{
WorkloadID: id,
Error: err,
Error: logger.Err(err),
Hook: message,
}
}(id)
@@ -63,7 +65,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f

func (c *Calcium) doStartWorkload(ctx context.Context, workload *types.Workload, force bool) (message []*bytes.Buffer, err error) {
if err = workload.Start(ctx); err != nil {
return message, err
return message, errors.WithStack(err)
}
// TODO healthcheck first
if workload.Hook != nil && len(workload.Hook.AfterStart) > 0 {
@@ -75,7 +77,7 @@ func (c *Calcium) doStartWorkload(ctx context.Context, workload *types.Workload,
force, workload.Engine,
)
}
return message, err
return message, errors.WithStack(err)
}

func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload, force bool) (message []*bytes.Buffer, err error) {
@@ -88,7 +90,7 @@ func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload,
force, workload.Engine,
)
if err != nil {
return message, err
return message, errors.WithStack(err)
}
}

@@ -98,5 +100,5 @@ func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload,
if err = workload.Stop(ctx); err != nil {
message = append(message, bytes.NewBufferString(err.Error()))
}
return message, err
return message, errors.WithStack(err)
}
6 changes: 4 additions & 2 deletions cluster/calcium/copy.go
Original file line number Diff line number Diff line change
@@ -4,14 +4,16 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
)

// Copy uses VirtualizationCopyFrom cp to copy specified things and send to remote
func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *types.CopyMessage, error) {
logger := log.WithField("Calcium", "Copy").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}
ch := make(chan *types.CopyMessage)
go func() {
@@ -30,7 +32,7 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
}
return nil
}); err != nil {
ch <- makeCopyMessage(id, "", "", err, nil)
ch <- makeCopyMessage(id, "", "", logger.Err(err), nil)
}
}(id, paths)
}
Loading