adjust func seq #548

Merged (1 commit) on Feb 7, 2022
46 changes: 23 additions & 23 deletions client/clientpool.go
@@ -31,19 +31,6 @@ type Pool struct {
rpcClients []*clientWithStatus
}

func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duration) bool {
var err error
utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
_, err = rpc.client.Info(ctx, &pb.Empty{})
})
if err != nil {
log.Errorf("[ClientPool] connect to %s failed, err: %s", rpc.addr, err)
return false
}
log.Debugf("[ClientPool] connect to %s success", rpc.addr)
return true
}

// NewCoreRPCClientPool .
func NewCoreRPCClientPool(ctx context.Context, config *PoolConfig) (*Pool, error) {
if len(config.EruAddrs) == 0 {
@@ -95,6 +82,29 @@ func NewCoreRPCClientPool(ctx context.Context, config *PoolConfig) (*Pool, error
return c, nil
}

// GetClient finds the first *client.Client instance with an active connection. If all connections are dead, returns the first one.
func (c *Pool) GetClient() pb.CoreRPCClient {
for _, rpc := range c.rpcClients {
if rpc.alive {
return rpc.client
}
}
return c.rpcClients[0].client
}

func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duration) bool {
var err error
utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
_, err = rpc.client.Info(ctx, &pb.Empty{})
})
if err != nil {
log.Errorf("[ClientPool] connect to %s failed, err: %s", rpc.addr, err)
return false
}
log.Debugf("[ClientPool] connect to %s success", rpc.addr)
return true
}

func (c *Pool) updateClientsStatus(ctx context.Context, timeout time.Duration) {
wg := &sync.WaitGroup{}
for _, rpc := range c.rpcClients {
@@ -106,13 +116,3 @@ func (c *Pool) updateClientsStatus(ctx context.Context, timeout time.Duration) {
}
wg.Wait()
}

// GetClient finds the first *client.Client instance with an active connection. If all connections are dead, returns the first one.
func (c *Pool) GetClient() pb.CoreRPCClient {
for _, rpc := range c.rpcClients {
if rpc.alive {
return rpc.client
}
}
return c.rpcClients[0].client
}
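
For reference, a minimal usage sketch of the pool after this change. This is not part of the diff: the package name, any PoolConfig fields beyond EruAddrs, and the address value are assumptions, and imports are omitted because they depend on the repo's module layout.

	ctx := context.Background()
	// Build a pool against one or more eru-core endpoints (hypothetical address).
	pool, err := client.NewCoreRPCClientPool(ctx, &client.PoolConfig{
		EruAddrs: []string{"127.0.0.1:5001"},
	})
	if err != nil {
		panic(err)
	}
	// GetClient returns the first alive client; if none are alive it falls back to the first one.
	info, err := pool.GetClient().Info(ctx, &pb.Empty{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("core info: %+v\n", info)
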
2 changes: 1 addition & 1 deletion client/interceptor/retry.go
@@ -32,7 +32,7 @@ func NewStreamRetry(retryOpts RetryOptions) grpc.StreamClientInterceptor {
if _, ok := RPCNeedRetry[method]; !ok {
return stream, err
}
log.Debugf(ctx, "[NewStreamRetry] return retryStreawm for method %s", method)
log.Debugf(ctx, "[NewStreamRetry] return retryStream for method %s", method)
return &retryStream{
ctx: ctx,
ClientStream: stream,
68 changes: 34 additions & 34 deletions cluster/calcium/node.go
@@ -84,40 +84,6 @@ func (c *Calcium) GetNode(ctx context.Context, nodename string) (*types.Node, er
return node, logger.Err(ctx, errors.WithStack(err))
}

func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string) {
workloads, err := c.store.ListNodeWorkloads(ctx, nodename, nil)
if err != nil {
log.Errorf(ctx, "[setAllWorkloadsOnNodeDown] failed to list node workloads, node %v, err: %v", nodename, errors.WithStack(err))
return
}

for _, workload := range workloads {
appname, entrypoint, _, err := utils.ParseWorkloadName(workload.Name)
if err != nil {
log.Errorf(ctx, "[setAllWorkloadsOnNodeDown] Set workload %s on node %s as inactive failed %v", workload.ID, nodename, err)
continue
}

if workload.StatusMeta == nil {
workload.StatusMeta = &types.StatusMeta{ID: workload.ID}
}
workload.StatusMeta.Running = false
workload.StatusMeta.Healthy = false

// Set these attributes to set workload status
workload.StatusMeta.Appname = appname
workload.StatusMeta.Nodename = workload.Nodename
workload.StatusMeta.Entrypoint = entrypoint

// mark workload which belongs to this node as unhealthy
if err = c.store.SetWorkloadStatus(ctx, workload.StatusMeta, 0); err != nil {
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, nodename, errors.WithStack(err))
} else {
log.Infof(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive", workload.ID, nodename)
}
}
}

// SetNode set node available or not
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
logger := log.WithField("Calcium", "SetNode").WithField("opts", opts)
@@ -221,6 +187,40 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
})
}

func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string) {
workloads, err := c.store.ListNodeWorkloads(ctx, nodename, nil)
if err != nil {
log.Errorf(ctx, "[setAllWorkloadsOnNodeDown] failed to list node workloads, node %v, err: %v", nodename, errors.WithStack(err))
return
}

for _, workload := range workloads {
appname, entrypoint, _, err := utils.ParseWorkloadName(workload.Name)
if err != nil {
log.Errorf(ctx, "[setAllWorkloadsOnNodeDown] Set workload %s on node %s as inactive failed %v", workload.ID, nodename, err)
continue
}

if workload.StatusMeta == nil {
workload.StatusMeta = &types.StatusMeta{ID: workload.ID}
}
workload.StatusMeta.Running = false
workload.StatusMeta.Healthy = false

// Set these attributes to set workload status
workload.StatusMeta.Appname = appname
workload.StatusMeta.Nodename = workload.Nodename
workload.StatusMeta.Entrypoint = entrypoint

// mark workload which belongs to this node as unhealthy
if err = c.store.SetWorkloadStatus(ctx, workload.StatusMeta, 0); err != nil {
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, nodename, errors.WithStack(err))
} else {
log.Infof(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive", workload.ID, nodename)
}
}
}

// filterNodes filters nodes using NodeFilter nf
// the filtering logic is introduced along with NodeFilter
// NOTE: when nf.Includes is set, they don't need to belong to podname
20 changes: 10 additions & 10 deletions lock/redis/lock.go
@@ -61,16 +61,6 @@ func (r *RedisLock) TryLock(ctx context.Context) (context.Context, error) {
return r.lock(ctx, nil)
}

func (r *RedisLock) lock(ctx context.Context, opts *redislock.Options) (context.Context, error) {
l, err := r.lc.Obtain(ctx, r.key, r.timeout, r.ttl, opts)
if err != nil {
return nil, err
}

r.l = l
return context.TODO(), nil // no need wrapped, not like etcd
}

// Unlock releases the lock
// if the lock is not acquired, will return ErrLockNotHeld
func (r *RedisLock) Unlock(ctx context.Context) error {
@@ -82,3 +72,13 @@ func (r *RedisLock) Unlock(ctx context.Context) error {
defer cancel()
return r.l.Release(lockCtx)
}

func (r *RedisLock) lock(ctx context.Context, opts *redislock.Options) (context.Context, error) {
l, err := r.lc.Obtain(ctx, r.key, r.timeout, r.ttl, opts)
if err != nil {
return nil, err
}

r.l = l
return context.TODO(), nil // no need wrapped, not like etcd
}
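
For reference, the call pattern this lock type exposes — a sketch only, since the RedisLock constructor and its fields are not part of this diff; assume r is an already-built *RedisLock and ctx an incoming context.

	// TryLock passes nil options to Obtain, so it should fail fast rather than retry.
	lockCtx, err := r.TryLock(ctx)
	if err != nil {
		return err
	}
	// Unlock returns ErrLockNotHeld if the lock was never acquired (per the doc comment above).
	defer r.Unlock(lockCtx)
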
66 changes: 33 additions & 33 deletions metrics/metrics.go
@@ -39,39 +39,6 @@ type Metrics struct {
DeployCount *prometheus.CounterVec
}

// Lazy connect
func (m *Metrics) checkConn() error {
if m.statsdClient != nil {
return nil
}
var err error
// We needn't try to renew/reconnect because of only supporting UDP protocol now
// We should add an `errorCount` to reconnect when implementing TCP protocol
if m.statsdClient, err = statsdlib.New(m.StatsdAddr, statsdlib.WithErrorHandler(func(err error) {
log.Errorf(nil, "[statsd] Sending statsd failed: %v", err) //nolint
})); err != nil {
log.Errorf(nil, "[statsd] Connect statsd failed: %v", err) //nolint
return err
}
return nil
}

func (m *Metrics) gauge(key string, value float64) error {
if err := m.checkConn(); err != nil {
return err
}
m.statsdClient.Gauge(key, value)
return nil
}

func (m *Metrics) count(key string, n int, rate float32) error {
if err := m.checkConn(); err != nil {
return err
}
m.statsdClient.Count(key, n, rate)
return nil
}

// SendNodeInfo update node resource capacity
func (m *Metrics) SendNodeInfo(nm *types.NodeMetrics) {
nodename := nm.Name
@@ -144,6 +111,39 @@ func (m *Metrics) SendNodeInfo(nm *types.NodeMetrics) {
}
}

// Lazy connect
func (m *Metrics) checkConn() error {
if m.statsdClient != nil {
return nil
}
var err error
// We needn't try to renew/reconnect because of only supporting UDP protocol now
// We should add an `errorCount` to reconnect when implementing TCP protocol
if m.statsdClient, err = statsdlib.New(m.StatsdAddr, statsdlib.WithErrorHandler(func(err error) {
log.Errorf(nil, "[statsd] Sending statsd failed: %v", err) //nolint
})); err != nil {
log.Errorf(nil, "[statsd] Connect statsd failed: %v", err) //nolint
return err
}
return nil
}

func (m *Metrics) gauge(key string, value float64) error {
if err := m.checkConn(); err != nil {
return err
}
m.statsdClient.Gauge(key, value)
return nil
}

func (m *Metrics) count(key string, n int, rate float32) error {
if err := m.checkConn(); err != nil {
return err
}
m.statsdClient.Count(key, n, rate)
return nil
}

// SendDeployCount update deploy counter
func (m *Metrics) SendDeployCount(n int) {
log.Info("[Metrics] Update deploy counter")