Skip to content

Commit 7b7d9b3

Browse files
committed
refactor all txn methods, seems safe for all APIs now
1 parent d35e7f5 commit 7b7d9b3

24 files changed

+671
-556
lines changed

cluster/calcium/build.go

+18-14
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"os"
10+
"time"
1011

1112
enginetypes "github.com/projecteru2/core/engine/types"
1213
"github.com/projecteru2/core/types"
@@ -100,7 +101,7 @@ func (c *Calcium) buildFromExist(ctx context.Context, ref, existID string) (chan
100101
ch <- buildErrMsg(err)
101102
return
102103
}
103-
104+
go cleanupNodeImages(node, []string{imageID}, c.config.GlobalTimeout)
104105
ch <- &types.BuildImageMessage{ID: imageID}
105106
}), nil
106107
}
@@ -153,21 +154,9 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
153154
// 无论如何都删掉build机器的
154155
// 事实上他不会跟cached pod一样
155156
// 一样就砍死
156-
go func(tag string) {
157-
// context 这里的不应该受到 client 的影响
158-
_, err := node.Engine.ImageRemove(context.Background(), tag, false, true)
159-
if err != nil {
160-
log.Errorf("[BuildImage] Remove image error: %s", err)
161-
}
162-
spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true)
163-
if err != nil {
164-
log.Errorf("[BuildImage] Remove build image cache error: %s", err)
165-
}
166-
log.Infof("[BuildImage] Clean cached image and release space %d", spaceReclaimed)
167-
}(tag)
168-
169157
ch <- &types.BuildImageMessage{Stream: fmt.Sprintf("finished %s\n", tag), Status: "finished", Progress: tag}
170158
}
159+
go cleanupNodeImages(node, tags, c.config.GlobalTimeout)
171160
}), nil
172161

173162
}
@@ -180,3 +169,18 @@ func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.Bu
180169
}()
181170
return ch
182171
}
172+
173+
func cleanupNodeImages(node *types.Node, IDs []string, ttl time.Duration) {
174+
ctx, cancel := context.WithTimeout(context.Background(), ttl)
175+
defer cancel()
176+
for _, ID := range IDs {
177+
if _, err := node.Engine.ImageRemove(ctx, ID, false, true); err != nil {
178+
log.Errorf("[BuildImage] Remove image error: %s", err)
179+
}
180+
}
181+
if spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true); err != nil {
182+
log.Errorf("[BuildImage] Remove build image cache error: %s", err)
183+
} else {
184+
log.Infof("[BuildImage] Clean cached image and release space %d", spaceReclaimed)
185+
}
186+
}

cluster/calcium/create.go

+119-91
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,39 @@ func (c *Calcium) doCreateContainer(ctx context.Context, opts *types.DeployOptio
6363
for _, nodeInfo := range nodesInfo {
6464
go metrics.Client.SendDeployCount(nodeInfo.Deploy)
6565
go func(nodeInfo types.NodeInfo, index int) {
66-
defer func() {
67-
if err := c.store.DeleteProcessing(context.Background(), opts, nodeInfo); err != nil {
68-
log.Errorf("[doCreateContainer] remove processing status failed %v", err)
69-
}
70-
wg.Done()
71-
}()
72-
73-
messages := c.doCreateContainerOnNode(ctx, nodeInfo, opts, index)
74-
for i, m := range messages {
75-
ch <- m
76-
// decr processing count
77-
if err := c.store.UpdateProcessing(context.Background(), opts, nodeInfo.Name, nodeInfo.Deploy-i-1); err != nil {
78-
log.Warnf("[doCreateContainer] Update processing count failed %v", err)
79-
}
80-
}
66+
_ = utils.Txn(
67+
ctx,
68+
func(ctx context.Context) error {
69+
for i, m := range c.doCreateContainerOnNode(ctx, nodeInfo, opts, index) {
70+
_ = utils.Txn(
71+
ctx,
72+
func(ctx context.Context) error {
73+
ch <- m
74+
return nil
75+
},
76+
func(ctx context.Context) error {
77+
// decr processing count
78+
if err := c.store.UpdateProcessing(ctx, opts, nodeInfo.Name, nodeInfo.Deploy-i-1); err != nil {
79+
log.Warnf("[doCreateContainer] Update processing count failed %v", err)
80+
}
81+
return nil
82+
},
83+
nil,
84+
c.config.GlobalTimeout,
85+
)
86+
}
87+
return nil
88+
},
89+
func(ctx context.Context) error {
90+
if err := c.store.DeleteProcessing(ctx, opts, nodeInfo); err != nil {
91+
log.Errorf("[doCreateContainer] remove processing status failed %v", err)
92+
}
93+
wg.Done()
94+
return nil
95+
},
96+
nil,
97+
c.config.GlobalTimeout,
98+
)
8199
}(nodeInfo, index)
82100
index += nodeInfo.Deploy
83101
}
@@ -101,7 +119,7 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
101119
}
102120

103121
node := &types.Node{}
104-
if err := c.Transaction(
122+
if err := utils.Txn(
105123
ctx,
106124
// if
107125
func(ctx context.Context) (err error) {
@@ -121,17 +139,18 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
121139
// rollback, will use background context
122140
func(ctx context.Context) (err error) {
123141
log.Errorf("[doCreateContainerOnNode] Error when create and start a container, %v", ms[i].Error)
124-
if ms[i].ContainerID == "" {
125-
if err = c.withNodeLocked(ctx, nodeInfo.Name, func(node *types.Node) error {
126-
return c.store.UpdateNodeResource(ctx, node, cpu, opts.CPUQuota, opts.Memory, opts.Storage, volumePlan.IntoVolumeMap(), store.ActionIncr)
127-
}); err != nil {
128-
log.Errorf("[doCreateContainer] Reset node %s failed %v", nodeInfo.Name, err)
129-
}
130-
} else {
142+
if ms[i].ContainerID != "" {
131143
log.Warnf("[doCreateContainer] Create container failed %v, and container %s not removed", ms[i].Error, ms[i].ContainerID)
144+
return
145+
}
146+
if err = c.withNodeLocked(ctx, nodeInfo.Name, func(node *types.Node) error {
147+
return c.store.UpdateNodeResource(ctx, node, cpu, opts.CPUQuota, opts.Memory, opts.Storage, volumePlan.IntoVolumeMap(), store.ActionIncr)
148+
}); err != nil {
149+
log.Errorf("[doCreateContainer] Reset node resource %s failed %v", nodeInfo.Name, err)
132150
}
133151
return
134152
},
153+
c.config.GlobalTimeout,
135154
); err != nil {
136155
continue
137156
}
@@ -185,82 +204,91 @@ func (c *Calcium) doCreateAndStartContainer(
185204
Publish: map[string][]string{},
186205
}
187206
var err error
188-
189-
defer func() {
190-
createContainerMessage.Error = err
191-
if err != nil && container.ID != "" {
192-
if err := c.doRemoveContainer(context.Background(), container, true); err != nil {
193-
log.Errorf("[doCreateAndStartContainer] create and start container failed, and remove it failed also, %s, %v", container.ID, err)
194-
return
195-
}
196-
createContainerMessage.ContainerID = ""
197-
}
198-
}()
199-
200-
// get config
201-
config := c.doMakeContainerOptions(no, cpu, volumePlan, opts, node)
202-
container.Name = config.Name
203-
container.Labels = config.Labels
204-
createContainerMessage.ContainerName = container.Name
205-
206-
// create container
207207
var containerCreated *enginetypes.VirtualizationCreated
208-
containerCreated, err = node.Engine.VirtualizationCreate(ctx, config)
209-
if err != nil {
210-
return createContainerMessage
211-
}
212-
container.ID = containerCreated.ID
213208

214-
// Copy data to container
215-
if len(opts.Data) > 0 {
216-
for dst, src := range opts.Data {
217-
if _, err = src.Seek(0, io.SeekStart); err != nil {
218-
return createContainerMessage
219-
}
220-
if err = c.doSendFileToContainer(ctx, node.Engine, containerCreated.ID, dst, src, true, true); err != nil {
221-
return createContainerMessage
209+
_ = utils.Txn(
210+
ctx,
211+
func(ctx context.Context) error {
212+
// get config
213+
config := c.doMakeContainerOptions(no, cpu, volumePlan, opts, node)
214+
container.Name = config.Name
215+
container.Labels = config.Labels
216+
createContainerMessage.ContainerName = container.Name
217+
218+
// create container
219+
containerCreated, err = node.Engine.VirtualizationCreate(ctx, config)
220+
if err != nil {
221+
return err
222222
}
223-
}
224-
}
223+
container.ID = containerCreated.ID
225224

226-
// deal with hook
227-
if len(opts.AfterCreate) > 0 && container.Hook != nil {
228-
container.Hook = &types.Hook{
229-
AfterStart: append(opts.AfterCreate, container.Hook.AfterStart...),
230-
Force: container.Hook.Force,
231-
}
232-
}
225+
// Copy data to container
226+
if len(opts.Data) > 0 {
227+
for dst, src := range opts.Data {
228+
if _, err = src.Seek(0, io.SeekStart); err != nil {
229+
return err
230+
}
231+
if err = c.doSendFileToContainer(ctx, node.Engine, container.ID, dst, src, true, true); err != nil {
232+
return err
233+
}
234+
}
235+
}
233236

234-
// start first
235-
createContainerMessage.Hook, err = c.doStartContainer(ctx, container, opts.IgnoreHook)
236-
if err != nil {
237-
return createContainerMessage
238-
}
237+
// deal with hook
238+
if len(opts.AfterCreate) > 0 && container.Hook != nil {
239+
container.Hook = &types.Hook{
240+
AfterStart: append(opts.AfterCreate, container.Hook.AfterStart...),
241+
Force: container.Hook.Force,
242+
}
243+
}
239244

240-
// inspect real meta
241-
var containerInfo *enginetypes.VirtualizationInfo
242-
containerInfo, err = container.Inspect(ctx) // 补充静态元数据
243-
if err != nil {
244-
return createContainerMessage
245-
}
245+
// start first
246+
createContainerMessage.Hook, err = c.doStartContainer(ctx, container, opts.IgnoreHook)
247+
if err != nil {
248+
return err
249+
}
246250

247-
// update meta
248-
if containerInfo.Networks != nil {
249-
createContainerMessage.Publish = utils.MakePublishInfo(containerInfo.Networks, opts.Entrypoint.Publish)
250-
}
251-
// reset users
252-
if containerInfo.User != container.User {
253-
container.User = containerInfo.User
254-
}
255-
// reset container.hook
256-
container.Hook = opts.Entrypoint.Hook
251+
// inspect real meta
252+
var containerInfo *enginetypes.VirtualizationInfo
253+
containerInfo, err = container.Inspect(ctx) // 补充静态元数据
254+
if err != nil {
255+
return err
256+
}
257257

258-
// store eru container
259-
if err = c.store.AddContainer(ctx, container); err != nil {
260-
return createContainerMessage
261-
}
262-
// non-empty message.ContainerID signals that "core saves metadata of this container"
263-
createContainerMessage.ContainerID = containerCreated.ID
258+
// update meta
259+
if containerInfo.Networks != nil {
260+
createContainerMessage.Publish = utils.MakePublishInfo(containerInfo.Networks, opts.Entrypoint.Publish)
261+
}
262+
// reset users
263+
if containerInfo.User != container.User {
264+
container.User = containerInfo.User
265+
}
266+
// reset container.hook
267+
container.Hook = opts.Entrypoint.Hook
268+
return nil
269+
},
270+
func(ctx context.Context) error {
271+
// store eru container
272+
if err = c.store.AddContainer(ctx, container); err != nil {
273+
return err
274+
}
275+
// non-empty message.ContainerID means "core saves metadata of this container"
276+
createContainerMessage.ContainerID = container.ID
277+
return nil
278+
},
279+
func(ctx context.Context) error {
280+
createContainerMessage.Error = err
281+
if err != nil && container.ID != "" {
282+
if err := c.doRemoveContainer(ctx, container, true); err != nil {
283+
log.Errorf("[doCreateAndStartContainer] create and start container failed, and remove it failed also, %s, %v", container.ID, err)
284+
return err
285+
}
286+
createContainerMessage.ContainerID = ""
287+
}
288+
return nil
289+
},
290+
c.config.GlobalTimeout,
291+
)
264292

265293
return createContainerMessage
266294
}

cluster/calcium/dissociate.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/projecteru2/core/store"
77
"github.com/projecteru2/core/types"
8+
"github.com/projecteru2/core/utils"
89
log "github.com/sirupsen/logrus"
910
)
1011

@@ -16,7 +17,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
1617
for _, ID := range IDs {
1718
err := c.withContainerLocked(ctx, ID, func(container *types.Container) error {
1819
return c.withNodeLocked(ctx, container.Nodename, func(node *types.Node) (err error) {
19-
return c.Transaction(
20+
return utils.Txn(
2021
ctx,
2122
// if
2223
func(ctx context.Context) error {
@@ -29,6 +30,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
2930
},
3031
// rollback
3132
nil,
33+
c.config.GlobalTimeout,
3234
)
3335
})
3436
})

cluster/calcium/image.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func (c *Calcium) RemoveImage(ctx context.Context, podname, nodename string, ima
1616
step = 1
1717
}
1818

19-
nodes, err := c.GetNodes(ctx, podname, nodename, nil, false)
19+
nodes, err := c.getNodes(ctx, podname, nodename, nil, false)
2020
if err != nil {
2121
return ch, err
2222
}
@@ -76,7 +76,7 @@ func (c *Calcium) CacheImage(ctx context.Context, podname, nodename string, imag
7676
step = 1
7777
}
7878

79-
nodes, err := c.GetNodes(ctx, podname, nodename, nil, false)
79+
nodes, err := c.getNodes(ctx, podname, nodename, nil, false)
8080
if err != nil {
8181
return ch, err
8282
}

cluster/calcium/lock.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(no
5555
func (c *Calcium) withContainersLocked(ctx context.Context, IDs []string, f func(containers map[string]*types.Container) error) error {
5656
containers := map[string]*types.Container{}
5757
locks := map[string]lock.DistributedLock{}
58-
defer func() { c.doUnlockAll(context.Background(), locks) }()
58+
defer func() { c.doUnlockAll(ctx, locks) }()
5959
cs, err := c.GetContainers(ctx, IDs)
6060
if err != nil {
6161
return err
@@ -74,8 +74,8 @@ func (c *Calcium) withContainersLocked(ctx context.Context, IDs []string, f func
7474
func (c *Calcium) withNodesLocked(ctx context.Context, podname, nodename string, labels map[string]string, all bool, f func(nodes map[string]*types.Node) error) error {
7575
nodes := map[string]*types.Node{}
7676
locks := map[string]lock.DistributedLock{}
77-
defer func() { c.doUnlockAll(context.Background(), locks) }()
78-
ns, err := c.GetNodes(ctx, podname, nodename, labels, all)
77+
defer func() { c.doUnlockAll(ctx, locks) }()
78+
ns, err := c.getNodes(ctx, podname, nodename, labels, all)
7979
if err != nil {
8080
return err
8181
}

0 commit comments

Comments (0)