Skip to content

Commit fb44089

Browse files
authored
call plugin in parallel (#613)
* call plugin in parallel * remove lock
1 parent a29fe9b commit fb44089

File tree

1 file changed

+27
-12
lines changed

1 file changed

+27
-12
lines changed

resource/cobalt/call.go

+27-12
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,41 @@ package cobalt
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/cockroachdb/errors"
78
"github.com/projecteru2/core/log"
89
"github.com/projecteru2/core/resource/plugins"
910
)
1011

1112
func call[T any](ctx context.Context, ps []plugins.Plugin, f func(plugins.Plugin) (T, error)) (map[plugins.Plugin]T, error) {
12-
// TODO 并行化,意义不大
13+
var wg sync.WaitGroup
1314
var combinedErr error
14-
results := map[plugins.Plugin]T{}
15-
15+
var results sync.Map
1616
for _, p := range ps {
17-
result, err := f(p)
18-
if err != nil {
19-
log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name())
20-
combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name()))
21-
continue
22-
}
23-
results[p] = result
24-
}
17+
wg.Add(1)
18+
go func(p plugins.Plugin) {
19+
defer wg.Done()
2520

26-
return results, combinedErr
21+
result, err := f(p)
22+
if err != nil {
23+
log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name())
24+
results.Store(p, err)
25+
return
26+
}
27+
results.Store(p, result)
28+
}(p)
29+
}
30+
wg.Wait()
31+
ans := make(map[plugins.Plugin]T)
32+
results.Range(func(key, value any) bool {
33+
switch vt := value.(type) {
34+
case error:
35+
combinedErr = errors.CombineErrors(combinedErr, vt)
36+
case T:
37+
ans[key.(plugins.Plugin)] = vt
38+
}
39+
return true
40+
})
41+
return ans, combinedErr
2742
}

0 commit comments

Comments
 (0)