Skip to content

Commit 6148549

Browse files
committed
feat: refactor AllCompleted and ParallelCompleted.
1 parent 90fbf2c commit 6148549

File tree

2 files changed

+17
-98
lines changed

2 files changed

+17
-98
lines changed

all.go

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package async
22

33
import (
44
"context"
5-
"sync"
6-
"sync/atomic"
75
)
86

97
// All executes the functions asynchronously until all functions have been finished. If some
@@ -98,44 +96,14 @@ func allCompleted(
9896
parent context.Context,
9997
funcs ...AsyncFn,
10098
) ([][]any, error) {
101-
validateAsyncFuncs(funcs...)
102-
103-
out := make([][]any, len(funcs))
104-
if len(funcs) == 0 {
105-
return out, nil
106-
}
107-
108-
parent = getContext(parent)
109-
110-
errs := make([]error, len(funcs))
111-
errNum := atomic.Int32{}
112-
113-
wg := sync.WaitGroup{}
114-
wg.Add(len(funcs))
115-
116-
for i := 0; i < len(funcs); i++ {
117-
go func(n int) {
118-
fn := funcs[n]
119-
120-
childCtx, childCanFunc := context.WithCancel(parent)
121-
defer childCanFunc()
122-
defer wg.Done()
123-
124-
ret, err := invokeAsyncFn(fn, childCtx, nil)
125-
if err != nil {
126-
errNum.Add(1)
127-
errs[n] = err
128-
}
129-
out[n] = ret
130-
}(i)
131-
}
132-
133-
wg.Wait()
134-
if errNum.Load() == 0 {
135-
return out, nil
136-
}
99+
paralleler := builtinPool.Get().(*Paralleler)
100+
defer func() {
101+
builtinPool.Put(paralleler)
102+
}()
137103

138-
err := convertErrorListToExecutionErrors(errs, int(errNum.Load()))
104+
paralleler.
105+
WithContext(parent).
106+
Add(funcs...)
139107

140-
return out, err
108+
return paralleler.RunCompleted()
141109
}

parallel.go

Lines changed: 9 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package async
22

33
import (
44
"context"
5-
"sync"
6-
"sync/atomic"
75
)
86

97
// Parallel runs the functions asynchronously with the specified concurrency limitation. It will
@@ -95,62 +93,15 @@ func parallelCompleted(
9593
concurrency int,
9694
funcs ...AsyncFn,
9795
) ([][]any, error) {
98-
// the number of concurrency should be 0 (no limitation) or greater than 0.
99-
if concurrency < 0 {
100-
panic(ErrInvalidConcurrency)
101-
}
102-
validateAsyncFuncs(funcs...)
103-
104-
out := make([][]any, len(funcs))
105-
106-
if len(funcs) == 0 {
107-
return out, nil
108-
}
109-
110-
ctx := getContext(parent)
111-
errs := make([]error, len(funcs))
112-
errNum := atomic.Int32{}
113-
114-
wg := sync.WaitGroup{}
115-
wg.Add(len(funcs))
116-
117-
var conch chan empty // channel for concurrency limit
118-
// no concurrency limitation if the value of the number is 0
119-
if concurrency > 0 {
120-
conch = make(chan empty, concurrency)
121-
}
122-
123-
for i := 0; i < len(funcs); i++ {
124-
if conch != nil {
125-
conch <- empty{}
126-
}
127-
128-
go func(n int) {
129-
defer wg.Done()
130-
131-
fn := funcs[n]
132-
childCtx, childCanFunc := context.WithCancel(ctx)
133-
defer childCanFunc()
134-
135-
ret, err := invokeAsyncFn(fn, childCtx, nil)
136-
if err != nil {
137-
errs[n] = err
138-
errNum.Add(1)
139-
}
140-
out[n] = ret
141-
142-
if conch != nil {
143-
<-conch
144-
}
145-
}(i)
146-
}
147-
148-
wg.Wait()
149-
if errNum.Load() == 0 {
150-
return out, nil
151-
}
96+
paralleler := builtinPool.Get().(*Paralleler)
97+
defer func() {
98+
builtinPool.Put(paralleler)
99+
}()
152100

153-
err := convertErrorListToExecutionErrors(errs, int(errNum.Load()))
101+
paralleler.
102+
WithContext(parent).
103+
WithConcurrency(concurrency).
104+
Add(funcs...)
154105

155-
return out, err
106+
return paralleler.RunCompleted()
156107
}

0 commit comments

Comments
 (0)