@@ -5,191 +5,108 @@ import (
5
5
"sync"
6
6
)
7
7
8
- // SilentMode disable all error message
9
- var SilentMode = false
8
+ // Silent disable all error message
9
+ var Silent = false
10
10
11
11
// globalLimit limit all flow's concurrent work number.
12
12
// <= 0 means no limits.
13
- var globalLimit = 0
14
- var globalCurrent chan struct {}
15
-
16
13
var defaultPanicHandler = func (msg interface {}) {
17
14
say (msg , "panic" )
18
15
}
19
16
20
- type job func ()
21
-
22
- func (j job ) run () {
23
- if j == nil {
24
- say ("nil job" , "error" )
25
- return
26
- }
27
- j ()
28
- }
29
-
30
- type node struct {
31
- jobs []job
32
- }
33
-
34
- func (n * node ) reset () * node {
35
- n .jobs = nil
36
- return n
37
- }
38
-
39
17
// Flow is a sync model
40
18
type Flow struct {
41
- nodes [] * node
19
+ jobs [][] func ()
42
20
panicHandler func (interface {})
43
21
44
22
// concurrent limit number
45
- limit int
46
- current chan struct {}
23
+ limit int
47
24
48
- isNew bool
49
- }
50
-
51
- func (f * Flow ) reset () * Flow {
52
- f .isNew = true
53
- f .nodes = f .nodes [:0 ]
54
- f .panicHandler = nil
55
- f .limit = 0
56
- f .current = nil
57
- return f
58
- }
59
-
60
- var nodePool * sync.Pool
61
- var flowPool * sync.Pool
62
-
63
- func init () {
64
- nodePool = new (sync.Pool )
65
- nodePool .New = func () interface {} {
66
- return newNode ()
67
- }
68
- flowPool = new (sync.Pool )
69
- flowPool .New = func () interface {} {
70
- return newFlow ()
71
- }
25
+ runOnce * sync.Once
72
26
}
73
27
74
28
// New returns a flow instance
75
29
func New () * Flow {
76
- return getFlow ()
30
+ return & Flow {
31
+ jobs : [][]func (){},
32
+ panicHandler : defaultPanicHandler ,
33
+ limit : 10 ,
34
+ runOnce : new (sync.Once ),
35
+ }
77
36
}
78
37
79
- // Limit limit all flow's concurrent goroutines number
80
- func Limit (number int ) {
81
- if globalCurrent != nil {
82
- say ("limit can only set once" , "error" )
83
- return
84
- }
85
- if number <= 0 {
86
- say ("invalid limit number" , "error" )
87
- return
38
+ // SetLimit set the max concurrent goroutines number
39
+ func (f * Flow ) SetLimit (limit int ) {
40
+ if limit < 1 {
41
+ limit = 1
88
42
}
89
- globalLimit = number
90
- globalCurrent = make (chan struct {}, globalLimit )
43
+ f .limit = limit
91
44
}
92
45
93
46
// With add funcs in this level
94
47
// With: run f1, run f2, run f3 ... (random execute order)
95
48
func (f * Flow ) With (jobs ... func ()) * Flow {
96
- if len (f .nodes ) == 0 {
97
- f .nodes = append (f .nodes , getNode ())
98
- }
99
- n := f .nodes [len (f .nodes )- 1 ]
100
- for i := 0 ; i < len (jobs ); i ++ {
101
- n .jobs = append (n .jobs , job (jobs [i ]))
49
+ if len (f .jobs ) == 0 {
50
+ f .jobs = [][]func (){[]func (){}}
102
51
}
52
+ n := len (f .jobs )
53
+ f .jobs [n - 1 ] = append (f .jobs [n - 1 ], jobs ... )
103
54
return f
104
55
}
105
56
106
57
// Next add funcs in next level
107
58
// Next: wait level1(run f1, run f2, run f3...) ... wait level2(...)... (in order)
108
59
func (f * Flow ) Next (jobs ... func ()) * Flow {
109
- f .nodes = append (f .nodes , getNode ())
110
- f .With (jobs ... )
111
- return f
112
- }
113
-
114
- // OnPanic set panicHandler
115
- func (f * Flow ) OnPanic (panicHandler func (interface {})) * Flow {
116
- f .panicHandler = panicHandler
117
- return f
118
- }
119
-
120
- // Limit limit the flow's concurrent goroutines number
121
- func (f * Flow ) Limit (number int ) * Flow {
122
- if number <= 0 {
123
- say ("invalid limit number" , "error" )
124
- return f
125
- }
126
- f .limit = number
127
- f .current = make (chan struct {}, number )
60
+ f .jobs = append (f .jobs , jobs )
128
61
return f
129
62
}
130
63
131
64
// Run execute these funcs
132
65
func (f * Flow ) Run () {
133
- if f == nil || ! f .isNew {
134
- say ("invalid flow" , "error" )
135
- return
136
- }
137
- panicHandler := defaultPanicHandler
138
- if f .panicHandler != nil {
139
- panicHandler = f .panicHandler
140
- }
141
- wg := new (sync.WaitGroup )
142
- for i := 0 ; i < len (f .nodes ); i ++ {
143
- for j := 0 ; j < len (f .nodes [i ].jobs ); j ++ {
144
- if globalLimit > 0 {
145
- globalCurrent <- struct {}{}
146
- }
147
- if f .limit > 0 {
148
- f .current <- struct {}{}
149
- }
150
- wg .Add (1 )
151
- go func (i , j int ) {
152
- defer func () {
153
- if msg := recover (); msg != nil {
154
- panicHandler (msg )
155
- }
156
- if f .limit > 0 {
157
- <- f .current
158
- }
159
- if globalLimit > 0 {
160
- <- globalCurrent
66
+ f .runOnce .Do (func () {
67
+ taskCh := make (chan func ())
68
+ for range make ([]any , f .limit ) {
69
+ go func (taskCh chan func ()) {
70
+ for job := range taskCh {
71
+ if job == nil {
72
+ taskCh <- nil
73
+ return
161
74
}
162
- wg .Done ()
163
- }()
164
- f .nodes [i ].jobs [j ].run ()
165
- }(i , j )
75
+ job ()
76
+ }
77
+ }(taskCh )
166
78
}
167
- nodePool .Put (f .nodes [i ])
168
- wg .Wait ()
169
- }
170
- flowPool .Put (f )
171
- f .isNew = false
79
+ for _ , jobs := range f .jobs {
80
+ wg := new (sync.WaitGroup )
81
+ for _ , job := range jobs {
82
+ j := job
83
+ wg .Add (1 )
84
+ taskCh <- func () {
85
+ defer func () {
86
+ if msg := recover (); msg != nil {
87
+ f .panicHandler (msg )
88
+ }
89
+ wg .Done ()
90
+ }()
91
+
92
+ j ()
93
+ }
94
+ }
95
+ wg .Wait ()
96
+ }
97
+ taskCh <- nil
98
+ })
99
+ }
100
+
101
+ // OnPanic set panicHandler
102
+ func (f * Flow ) OnPanic (panicHandler func (interface {})) * Flow {
103
+ f .panicHandler = panicHandler
104
+ return f
172
105
}
173
106
174
107
func say (msg interface {}, level string ) {
175
- if SilentMode {
108
+ if Silent {
176
109
return
177
110
}
178
111
fmt .Printf ("%s %s: %v\n " , "flow" , level , msg )
179
112
}
180
-
181
- func getNode () * node {
182
- return nodePool .Get ().(* node ).reset ()
183
- }
184
-
185
- func getFlow () * Flow {
186
- return flowPool .Get ().(* Flow ).reset ()
187
- }
188
-
189
- func newNode () * node {
190
- return new (node )
191
- }
192
-
193
- func newFlow () * Flow {
194
- return new (Flow )
195
- }
0 commit comments