Commit 62079c5

refactor: rewrite realloc update process (#248)
1 parent 28aab81 commit 62079c5

5 files changed: +49 −51 lines

.github/workflows/goreleaser.yml

−3

@@ -18,9 +18,6 @@ jobs:
       run: |
         echo "::set-env name=VERSION::$(git describe --tags $(git rev-list --tags --max-count=1))"
 
-    - name: Unshallow
-      run: git fetch --prune --unshallow
-
     - name: Set up Go
       uses: actions/setup-go@v2
       with:

cluster/calcium/realloc.go

+34 −29

@@ -233,37 +233,42 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
 }
 
 func (c *Calcium) updateResource(ctx context.Context, node *types.Node, container *types.Container, newResource *enginetypes.VirtualizationResource) error {
-	updateResourceErr := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource)
-	if updateResourceErr == nil {
-		oldVolumeSize := container.Volumes.TotalSize()
-		container.CPU = newResource.CPU
-		container.Quota = newResource.Quota
-		container.Memory = newResource.Memory
-		container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
-		container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
-		container.Storage += container.Volumes.TotalSize() - oldVolumeSize
-	} else {
-		log.Errorf("[updateResource] When Realloc container, VirtualizationUpdateResource %s failed %v", container.ID, updateResourceErr)
-	}
-	// node usage has to be adjusted whether the engine update succeeded or failed
-	// on success, node usage reflects the new resources
-	// on failure, node usage reflects the old resources
-	node.CPU.Sub(container.CPU)
-	node.SetCPUUsed(container.Quota, types.IncrUsage)
-	node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
-	node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
-	node.StorageCap -= container.Storage
-	node.MemCap -= container.Memory
-	if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
-		node.DecrNUMANodeMemory(nodeID, container.Memory)
-	}
-	// update container metadata
-	// since we don't rollback VirutalUpdateResource, client can't interrupt
-	if err := c.store.UpdateContainer(context.Background(), container); err != nil {
-		log.Errorf("[updateResource] Realloc finish but update container %s failed %v", container.ID, err)
+	var updateErr error
+	if err := utils.Txn(
+		ctx,
+		func(ctx context.Context) error {
+			if updateErr = node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); updateErr == nil {
+				oldVolumeSize := container.Volumes.TotalSize()
+				container.CPU = newResource.CPU
+				container.Quota = newResource.Quota
+				container.Memory = newResource.Memory
+				container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
+				container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
+				container.Storage += container.Volumes.TotalSize() - oldVolumeSize
+			}
+			return nil
+		},
+		func(ctx context.Context) error {
+			// node usage has to be adjusted whether the engine update succeeded or failed
+			// on success, node usage reflects the new resources
+			// on failure, node usage reflects the old resources
+			node.CPU.Sub(container.CPU)
+			node.SetCPUUsed(container.Quota, types.IncrUsage)
+			node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
+			node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
+			node.StorageCap -= container.Storage
+			node.MemCap -= container.Memory
+			if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
+				node.DecrNUMANodeMemory(nodeID, container.Memory)
+			}
+			return c.store.UpdateContainer(ctx, container)
+		},
+		nil,
+		c.config.GlobalTimeout,
+	); err != nil {
 		return err
 	}
-	return updateResourceErr
+	return updateErr
 }
 
 func (c *Calcium) reallocVolume(node *types.Node, containers []*types.Container, vbs types.VolumeBindings) (plans map[*types.Container]types.VolumePlan, err error) {
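The core of the rewrite: the engine call and the node/store bookkeeping now run as two phases of utils.Txn, with the engine error parked in the updateErr closure variable so the bookkeeping phase still executes and the caller still sees the original failure. Below is a minimal, self-contained sketch of that error-splitting pattern; runSteps is a hypothetical stand-in, not the real utils.Txn.

package main

import (
	"context"
	"errors"
	"fmt"
)

// runSteps is a tiny stand-in for utils.Txn: it runs each step in order and
// only reports errors from the steps themselves.
func runSteps(ctx context.Context, steps ...func(context.Context) error) error {
	for _, step := range steps {
		if err := step(ctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// updateErr captures the engine failure inside the first step so the
	// bookkeeping step still runs and the caller can return updateErr later.
	var updateErr error
	if err := runSteps(context.Background(),
		func(ctx context.Context) error {
			updateErr = errors.New("VirtualizationUpdateResource failed")
			return nil // deliberately swallowed: keep going to the bookkeeping step
		},
		func(ctx context.Context) error {
			fmt.Println("adjust node usage and container metadata regardless")
			return nil
		},
	); err != nil {
		fmt.Println("transaction error:", err)
		return
	}
	fmt.Println("returned to caller:", updateErr)
}

Returning nil from the first step is deliberate: a failed engine update must not skip the node-usage correction, which is exactly what the comments kept in the diff above spell out.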

cluster/calcium/service.go

+5 −5

@@ -17,14 +17,14 @@ type serviceWatcher struct {
 	subs sync.Map
 }
 
-func (w *serviceWatcher) Start(s store.Store, pushInterval time.Duration) {
+func (w *serviceWatcher) Start(ctx context.Context, s store.Store, pushInterval time.Duration) {
 	w.once.Do(func() {
-		w.start(s, pushInterval)
+		w.start(ctx, s, pushInterval)
 	})
 }
 
-func (w *serviceWatcher) start(s store.Store, pushInterval time.Duration) {
-	ch, err := s.ServiceStatusStream(context.Background())
+func (w *serviceWatcher) start(ctx context.Context, s store.Store, pushInterval time.Duration) {
+	ch, err := s.ServiceStatusStream(ctx)
 	if err != nil {
 		log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
 		return
@@ -84,7 +84,7 @@ func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
 // WatchServiceStatus returns chan of available service address
 func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
 	ch := make(chan types.ServiceStatus)
-	c.watcher.Start(c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
+	c.watcher.Start(ctx, c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
 	id := c.watcher.Subscribe(ch)
 	go func() {
 		<-ctx.Done()
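The service watcher change only threads the caller's context down to ServiceStatusStream instead of using context.Background(). A rough illustration of the once-guarded start pattern (the types here are illustrative, not the real store.Store wiring):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// watcher starts a background loop exactly once, bound to the first caller's context.
type watcher struct {
	once sync.Once
}

func (w *watcher) Start(ctx context.Context) {
	w.once.Do(func() { go w.loop(ctx) })
}

func (w *watcher) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("watcher stopped:", ctx.Err())
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("push service status")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	w := &watcher{}
	w.Start(ctx) // starts the loop
	w.Start(ctx) // no-op: sync.Once guarantees a single loop

	<-ctx.Done()
	time.Sleep(50 * time.Millisecond) // give the loop a moment to print its stop message
}

Worth noting, though outside the scope of this commit: because sync.Once binds the background loop to whichever context reaches Start first, cancelling that first subscriber's context would appear to end the stream for later subscribers too; that trade-off is inherent to the pattern.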

core.go

+9 −11

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"net/http"
@@ -46,7 +45,7 @@ func setupLog(l string) error {
 	return nil
 }
 
-func serve() {
+func serve(c *cli.Context) error {
 	config, err := utils.LoadConfig(configPath)
 	if err != nil {
 		log.Fatalf("[main] %v", err)
@@ -70,7 +69,8 @@ func serve() {
 	vibranium := rpc.New(cluster, config, rpcch)
 	s, err := net.Listen("tcp", config.Bind)
 	if err != nil {
-		log.Fatalf("[main] %v", err)
+		log.Errorf("[main] %v", err)
+		return err
 	}
 
 	opts := []grpc.ServerOption{
@@ -90,9 +90,10 @@ func serve() {
 	pb.RegisterCoreRPCServer(grpcServer, vibranium)
 	go func() {
 		if err := grpcServer.Serve(s); err != nil {
-			log.Fatalf("[main] start grpc failed %v", err)
+			log.Errorf("[main] start grpc failed %v", err)
 		}
 	}()
+
 	if config.Profile != "" {
 		http.Handle("/metrics", metrics.Client.ResourceMiddleware(cluster)(promhttp.Handler()))
 		go func() {
@@ -102,10 +103,10 @@ func serve() {
 		}()
 	}
 
-	unregisterService, err := cluster.RegisterService(context.Background())
+	unregisterService, err := cluster.RegisterService(c.Context)
 	if err != nil {
 		log.Errorf("[main] failed to register service: %v", err)
-		return
+		return err
 	}
 	log.Info("[main] Cluster started successfully.")
 
@@ -122,6 +123,7 @@ func serve() {
 	log.Info("[main] Check if cluster still have running tasks.")
 	vibranium.Wait()
 	log.Info("[main] cluster gracefully stopped.")
+	return nil
 }
 
 func main() {
@@ -147,10 +149,6 @@ func main() {
 			Destination: &embeddedStorage,
 		},
 	}
-	app.Action = func(c *cli.Context) error {
-		serve()
-		return nil
-	}
-
+	app.Action = serve
 	_ = app.Run(os.Args)
 }
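core.go turns serve into a cli action that returns errors instead of calling log.Fatalf, so app.Action can point at it directly and the runner's context (c.Context) replaces context.Background() for RegisterService. A stripped-down sketch of that wiring, assuming urfave/cli v2 (which the c.Context field implies):

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// serve is the action itself: it returns an error instead of calling
// log.Fatalf, so the cli runner decides how the process exits.
func serve(c *cli.Context) error {
	select {
	case <-c.Context.Done(): // the runner's context, reusable for service registration etc.
		return c.Context.Err()
	default:
	}
	fmt.Println("core started")
	return nil
}

func main() {
	app := &cli.App{
		Name:   "core-sketch",
		Action: serve, // serve already matches cli.ActionFunc, no wrapper closure needed
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}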

utils/transaction.go

+1 −3

@@ -40,9 +40,7 @@ func Txn(ctx context.Context, cond contextFunc, then contextFunc, rollback conte
 			thenCtx, thenCancel = context.WithTimeout(context.Background(), ttl)
 			defer thenCancel()
 		}
-		if then != nil {
-			tnxErr = then(thenCtx)
-		}
+		tnxErr = then(thenCtx)
 	}
 
 	return tnxErr
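utils/transaction.go drops the nil guard around then, which is safe as long as every caller passes a then func, as the realloc rewrite above now does. A sketch of the cond/then/rollback shape this helper appears to implement (txnSketch is a simplified stand-in, not the actual utils.Txn):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type contextFunc func(context.Context) error

// txnSketch mirrors the shape of utils.Txn as far as this diff shows it:
// cond runs first; then runs under its own ttl-bound context (and is now
// assumed non-nil, matching the removed guard); rollback runs when cond fails.
func txnSketch(ctx context.Context, cond, then, rollback contextFunc, ttl time.Duration) error {
	if err := cond(ctx); err != nil {
		if rollback != nil {
			_ = rollback(ctx)
		}
		return err
	}
	// the "then" phase gets a fresh timeout so a cancelled caller context
	// cannot leave the follow-up work half-applied
	thenCtx, cancel := context.WithTimeout(context.Background(), ttl)
	defer cancel()
	return then(thenCtx)
}

func main() {
	err := txnSketch(context.Background(),
		func(ctx context.Context) error { return nil },                          // cond
		func(ctx context.Context) error { fmt.Println("then ran"); return nil }, // then: mandatory now
		func(ctx context.Context) error { return errors.New("rollback") },       // rollback
		time.Second,
	)
	fmt.Println("err:", err)
}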

0 commit comments
