两种更优写法:
方法一:保留你原来的 []chan Obj
结构,完善关闭
package main
import (
"fmt"
"testing"
"time"
)
type Obj struct {
index int
str string
}
func fn(index int, str string, ch chan Obj) {
defer close(ch) // 保证函数结束时关闭 channel
time.Sleep(time.Second)
ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
channels := make([]chan Obj, len(arr))
for i, s := range arr {
channels[i] = make(chan Obj, 1) // ✅ 改成缓冲 1,避免阻塞
go fn(i, s, channels[i])
}
list := []Obj{}
for _, channel := range channels {
if a, ok := <-channel; ok { // ✅ 安全读,避免因关闭导致 panic
list = append(list, a)
}
}
fmt.Println(list)
}
- ✅ 每个 goroutine 写完自动
close(ch)
。 - ✅ 使用
make(chan Obj, 1)
避免 goroutine 阻塞。 - ✅ 读时用
ok := <-ch
防止意外 panic。
方法二:统一一个结果 channel(推荐 ⚡)
写法更简洁,不需要维护一堆 chan
,只用一个 channel + WaitGroup
。
package main
import (
"fmt"
"sync"
"testing"
"time"
)
type Obj struct {
index int
str string
}
func fn(index int, str string, ch chan Obj, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
ch := make(chan Obj, len(arr)) // ✅ 一个总结果 channel
var wg sync.WaitGroup
for i, s := range arr {
wg.Add(1)
go fn(i, s, ch, &wg)
}
// 另起 goroutine 等待所有任务完成再关闭 channel
go func() {
wg.Wait()
close(ch)
}()
list := []Obj{}
for obj := range ch { // ✅ 自动收集所有结果,直到 channel 被关闭
list = append(list, obj)
}
fmt.Println(list)
}
- ✅ 不会产生泄漏,channel 最终一定关闭。
- ✅ 支持并发收集结果,代码更简洁。
- ✅ 不存在 panic 风险。
📌 总结
- 你的原始写法不会立刻 panic,但存在 阻塞风险、channel 未关闭、扩展性差。
- 最推荐的方法是 方法二(一个结果 channel + WaitGroup),既安全又简洁。
封装
直接把 方法二 封装成一个可复用的并发收集工具函数。
以后你只需要传入任务数组,它会自动并发执行、收集结果,返回一个 []Obj
。
🔧 封装后的工具函数
package main
import (
"fmt"
"sync"
"time"
)
// Obj 是结果类型
type Obj struct {
Index int
Str string
}
// worker 是实际执行的函数逻辑
// 这里我写了一个例子(Sleep 模拟耗时任务)
// 你可以把具体逻辑替换掉
func worker(index int, str string) Obj {
time.Sleep(time.Second)
return Obj{Index: index, Str: str}
}
// RunJobs 接收字符串数组并发执行,返回结果切片
func RunJobs(arr []string) []Obj {
ch := make(chan Obj, len(arr)) // 收集所有结果
var wg sync.WaitGroup
for i, s := range arr {
wg.Add(1)
go func(i int, s string) {
defer wg.Done()
result := worker(i, s)
ch <- result
}(i, s)
}
// 等待所有任务完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集所有结果
results := make([]Obj, 0, len(arr))
for obj := range ch {
results = append(results, obj)
}
return results
}
// 示例测试
func main() {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
list := RunJobs(arr)
fmt.Println(list)
}
✅ 特点
- 可复用:
RunJobs
可以接收任意[]string
并发处理。 - 安全:使用
WaitGroup
+close(ch)
,不会造成 goroutine 泄漏。 - 不阻塞:结果 channel 有缓冲,写入不会卡死。
- 简洁:主逻辑只需要调用
RunJobs(arr)
就能直接拿到结果。
要不要我再帮你升级一下,把 worker
也做成 可传入的函数参数?这样你可以传不同的业务逻辑进去(比如查数据库、调接口),而不仅限于字符串→Obj
。
👌 那我帮你把 worker
也抽象成参数传入,这样你就能传不同的逻辑进去,而 RunJobs
就是一个通用的并发收集函数了。
🔧 通用并发收集工具函数(函数作为参数)
package main
import (
"fmt"
"sync"
"time"
)
// Obj 是示例结果类型
type Obj struct {
Index int
Str string
}
// RunJobs 并发执行任务
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (index, value),返回结果 T
// 返回所有结果切片
func RunJobs[T any](arr []string, worker func(index int, value string) T) []T {
ch := make(chan T, len(arr))
var wg sync.WaitGroup
for i, s := range arr {
wg.Add(1)
go func(i int, s string) {
defer wg.Done()
ch <- worker(i, s) // 运行外部传入逻辑
}(i, s)
}
// 等待所有任务完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集所有结果
results := make([]T, 0, len(arr))
for obj := range ch {
results = append(results, obj)
}
return results
}
// 示例 worker
func exampleWorker(index int, str string) Obj {
time.Sleep(time.Second) // 模拟耗时
return Obj{Index: index, Str: str}
}
// 使用示例
func main() {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
// 使用 exampleWorker
results := RunJobs(arr, exampleWorker)
fmt.Println(results)
}
✅ 特点
-
泛型支持:你可以返回任何类型,不限于
Obj
。RunJobs
返回的是[]T
,只要你传的worker
返回T
就行。
-
灵活传入逻辑:
// 直接用匿名函数 results := RunJobs(arr, func(i int, s string) Obj { return Obj{Index: i, Str: s + "_ok"} })
-
线程安全:所有 goroutine 用
WaitGroup
管理,channel 最终关闭。 -
不会阻塞:结果 channel 带缓冲,避免 goroutine 卡死。
要不要我帮你再写一个 带并发数限制(worker pool) 的版本?这样可以避免任务过多时开太多 goroutine(比如上千个任务)。
这样就不会因为任务太多(几千上万个)而开出几千个 goroutine,能够 控制最大并发数,既安全又高效。
🔧 带并发数限制的 RunJobsPool
package main
import (
"fmt"
"sync"
"time"
)
// Obj 示例结果类型
type Obj struct {
Index int
Str string
}
// RunJobsPool 并发执行任务(带并发数限制)
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (index, value),返回结果 T
// maxWorkers: 最大并发数
func RunJobsPool[T any](arr []string, worker func(index int, value string) T, maxWorkers int) []T {
if maxWorkers <= 0 {
maxWorkers = 1
}
ch := make(chan T, len(arr)) // 结果收集
jobCh := make(chan struct {
index int
value string
}, len(arr)) // 任务队列
// 把任务丢进 jobCh
for i, s := range arr {
jobCh <- struct {
index int
value string
}{i, s}
}
close(jobCh)
var wg sync.WaitGroup
// 启动固定数量的 worker goroutine
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobCh {
ch <- worker(job.index, job.value)
}
}()
}
// 等待所有 worker 完成后关闭结果 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集结果
results := make([]T, 0, len(arr))
for obj := range ch {
results = append(results, obj)
}
return results
}
// 示例 worker
func exampleWorker(index int, str string) Obj {
time.Sleep(500 * time.Millisecond) // 模拟耗时任务
return Obj{Index: index, Str: str}
}
// 使用示例
func main() {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
// 限制最大并发为 3
results := RunJobsPool(arr, exampleWorker, 3)
fmt.Println(results)
}
✅ 特点
-
可控并发数
maxWorkers
控制最大 goroutine 数。- 如果任务很多,比如 1 万个,不会开 1 万个 goroutine,而是开
maxWorkers
个 goroutine 轮流处理。
-
通用性强
- 依然用泛型
T any
,支持任意返回类型。 worker
自定义逻辑,可以做网络请求、数据库查询等。
- 依然用泛型
-
安全收敛
- 使用
jobCh
分发任务,WaitGroup
确保全部完成后关闭ch
。 - 无死锁,无资源泄漏。
- 使用
⚡ 用法示例:
results := RunJobsPool(arr, func(i int, s string) Obj {
return Obj{Index: i, Str: s + "_done"}
}, 5)
要不要我帮你再加一个 带超时控制(每个任务最长执行多久) 的版本?这样如果某个任务卡住了(比如 HTTP 请求超时),也不会拖死整个并发批处理。
好 ✅ 我再帮你把 超时控制 加上。
这样如果某个任务执行太久(比如 HTTP 请求挂死),就会自动超时,避免拖死整个批处理。
🔧 带超时控制的 RunJobsPoolTimeout
我这里用 context.WithTimeout
来实现任务级别的超时控制。
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Obj 示例结果类型
type Obj struct {
Index int
Str string
Err error
}
// RunJobsPoolTimeout 并发执行任务(带并发数限制 + 每个任务超时控制)
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (ctx, index, value),返回结果 T
// maxWorkers: 最大并发数
// timeout: 每个任务的超时时间
func RunJobsPoolTimeout[T any](
arr []string,
worker func(ctx context.Context, index int, value string) (T, error),
maxWorkers int,
timeout time.Duration,
) []T {
if maxWorkers <= 0 {
maxWorkers = 1
}
ch := make(chan T, len(arr)) // 结果收集
jobCh := make(chan struct {
index int
value string
}, len(arr)) // 任务队列
// 任务入队
for i, s := range arr {
jobCh <- struct {
index int
value string
}{i, s}
}
close(jobCh)
var wg sync.WaitGroup
// worker pool
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobCh {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
result, err := worker(ctx, job.index, job.value)
cancel()
// 如果 worker 返回错误,可以选择丢弃或收集错误
if err == nil {
ch <- result
} else {
var zero T
ch <- zero // 保证结果长度一致
}
}
}()
}
// 等待所有 worker 完成后关闭结果 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集结果
results := make([]T, 0, len(arr))
for obj := range ch {
results = append(results, obj)
}
return results
}
// 示例 worker:模拟可能超时的任务
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
select {
case <-time.After(500 * time.Millisecond): // 模拟任务耗时
return Obj{Index: index, Str: str, Err: nil}, nil
case <-ctx.Done(): // 超时 / 被取消
return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
}
}
// 使用示例
func main() {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
// 限制最大并发为 3,每个任务最多执行 300ms
results := RunJobsPoolTimeout(arr, exampleWorker, 3, 300*time.Millisecond)
fmt.Println(results)
}
✅ 特点
-
并发数限制
maxWorkers
控制最大 goroutine 数。
-
任务超时控制
- 每个任务会创建一个
context.WithTimeout
。 - 如果超过
timeout
,worker 自动返回超时错误。
- 每个任务会创建一个
-
错误处理
- worker 返回
(T, error)
,你可以在结果里带上错误信息。 - 我现在的实现是:错误时塞一个
zero T
,但你也可以选择返回T
+Err
。
- worker 返回
⚡ 用法示例:
results := RunJobsPoolTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
// 模拟请求接口
time.Sleep(100 * time.Millisecond)
return Obj{Index: i, Str: s}, nil
}, 5, 200*time.Millisecond)
这样:
- 最大 5 个任务并发
- 每个任务最多执行 200ms,超时会返回
context.DeadlineExceeded
要不要我帮你再改进成 支持整个批次的总超时(比如所有任务必须在 5 秒内完成,不管有多少任务)?这样可以做整体限时批处理。
👌 好,那我们把逻辑再升级一下:
之前是 每个任务有自己的超时,现在加一个 整体批次超时(例如:10 秒内必须完成所有任务,否则直接结束)。
🔧 带 整体批次超时 的 RunJobsBatchTimeout
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Obj 示例结果
type Obj struct {
Index int
Str string
Err error
}
// RunJobsBatchTimeout 并发执行任务(带并发数限制 + 整体批次超时)
// arr: 输入任务数组
// worker: 任务逻辑函数 (ctx, index, value) -> (T, error)
// maxWorkers: 最大并发数
// batchTimeout: 整个批次的超时时间
func RunJobsBatchTimeout[T any](
arr []string,
worker func(ctx context.Context, index int, value string) (T, error),
maxWorkers int,
batchTimeout time.Duration,
) ([]T, error) {
if maxWorkers <= 0 {
maxWorkers = 1
}
ctx, cancel := context.WithTimeout(context.Background(), batchTimeout)
defer cancel()
ch := make(chan T, len(arr)) // 收集结果
jobCh := make(chan struct {
index int
value string
}, len(arr))
// 投递任务
for i, s := range arr {
jobCh <- struct {
index int
value string
}{i, s}
}
close(jobCh)
var wg sync.WaitGroup
// 启动 worker pool
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobCh {
// 每个任务继承批次 ctx
select {
case <-ctx.Done(): // 整体超时或被取消
return
default:
result, err := worker(ctx, job.index, job.value)
if err == nil {
ch <- result
} else {
var zero T
ch <- zero
}
}
}
}()
}
// 等待 worker 完成后关闭结果通道
go func() {
wg.Wait()
close(ch)
}()
// 收集结果
results := make([]T, 0, len(arr))
for {
select {
case <-ctx.Done():
return results, ctx.Err() // 整体超时
case obj, ok := <-ch:
if !ok {
return results, nil // 全部完成
}
results = append(results, obj)
}
}
}
// 示例 worker:模拟耗时任务
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
select {
case <-time.After(500 * time.Millisecond): // 模拟执行 500ms
return Obj{Index: index, Str: str, Err: nil}, nil
case <-ctx.Done():
return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
}
}
// 使用示例
func main() {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
// 限制并发为 3,整体批次超时 2 秒
results, err := RunJobsBatchTimeout(arr, exampleWorker, 3, 2*time.Second)
fmt.Println("Results:", results)
fmt.Println("Error:", err)
}
✅ 功能总结
-
整体批次超时
context.WithTimeout
包裹整个批次,超时后所有 worker 会收到ctx.Done()
,自动退出。- 返回的
error
会是context.DeadlineExceeded
。
-
并发数限制
maxWorkers
控制 goroutine 数量,避免爆炸性创建。
-
安全收敛
WaitGroup
确保 worker 都退出。close(ch)
后,range 自动结束,安全收集。
-
结果部分保留
- 如果超时,返回已经完成的部分结果 + 超时错误。
- 如果正常完成,返回完整结果 +
nil
错误。
⚡ 用法示例:
results, err := RunJobsBatchTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
time.Sleep(300 * time.Millisecond)
return Obj{Index: i, Str: s}, nil
}, 5, 1*time.Second)
if err != nil {
fmt.Println("任务超时:", err)
}
fmt.Println("已完成结果:", results)
要不要我帮你再加一个 混合模式:既支持「单任务超时」,又支持「整体批次超时」?这样你可以灵活控制。
好 ✅ 我帮你写一个 混合模式:
- 单任务超时:每个任务都不能超过
taskTimeout
。 - 整体批次超时:整个批处理必须在
batchTimeout
内完成。
这样就能同时控制「个别任务卡住」和「批次拖太久」。
🔧 RunJobsHybridTimeout(混合模式)
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Obj 示例结果
type Obj struct {
Index int
Str string
Err error
}
// RunJobsHybridTimeout 并发执行任务(混合模式:整体批次超时 + 单任务超时)
// arr: 输入任务数组
// worker: 任务逻辑函数 (ctx, index, value) -> (T, error)
// maxWorkers: 最大并发数
// batchTimeout: 整个批次的超时时间
// taskTimeout: 每个任务的超时时间
func RunJobsHybridTimeout[T any](
arr []string,
worker func(ctx context.Context, index int, value string) (T, error),
maxWorkers int,
batchTimeout time.Duration,
taskTimeout time.Duration,
) ([]T, error) {
if maxWorkers <= 0 {
maxWorkers = 1
}
// 批次级 context
batchCtx, batchCancel := context.WithTimeout(context.Background(), batchTimeout)
defer batchCancel()
ch := make(chan T, len(arr)) // 收集结果
jobCh := make(chan struct {
index int
value string
}, len(arr))
// 投递任务
for i, s := range arr {
jobCh <- struct {
index int
value string
}{i, s}
}
close(jobCh)
var wg sync.WaitGroup
// worker pool
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobCh {
// 每个任务有单独的超时 + 继承批次 ctx
taskCtx, taskCancel := context.WithTimeout(batchCtx, taskTimeout)
result, err := worker(taskCtx, job.index, job.value)
taskCancel()
if err == nil {
ch <- result
} else {
var zero T
ch <- zero
}
}
}()
}
// 等待所有 worker 完成后关闭结果通道
go func() {
wg.Wait()
close(ch)
}()
// 收集结果
results := make([]T, 0, len(arr))
for {
select {
case <-batchCtx.Done():
return results, batchCtx.Err() // 整体批次超时
case obj, ok := <-ch:
if !ok {
return results, nil // 全部完成
}
results = append(results, obj)
}
}
}
// 示例 worker:模拟任务可能超时
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
select {
case <-time.After(500 * time.Millisecond): // 模拟任务耗时
return Obj{Index: index, Str: str, Err: nil}, nil
case <-ctx.Done():
return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
}
}
// 使用示例
func main() {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
// 并发 3,批次超时 2s,每个任务最多 300ms
results, err := RunJobsHybridTimeout(arr, exampleWorker, 3, 2*time.Second, 300*time.Millisecond)
fmt.Println("Results:", results)
fmt.Println("Error:", err)
}
✅ 功能总结
-
单任务超时
- 每个任务继承批次
context
,并套一层taskTimeout
。 - 如果单个任务超时,返回
ctx.Err()
。
- 每个任务继承批次
-
整体批次超时
batchCtx
控制整个批处理生命周期。- 超过
batchTimeout
后,所有任务立即取消。
-
并发数限制
maxWorkers
控制并发任务数。
-
结果部分返回
- 如果单任务超时:该任务结果会是
zero T
。 - 如果整体批次超时:返回已完成的部分结果 +
context.DeadlineExceeded
。
- 如果单任务超时:该任务结果会是
⚡ 用法示例:
results, err := RunJobsHybridTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
// 模拟请求接口
time.Sleep(200 * time.Millisecond)
return Obj{Index: i, Str: s}, nil
}, 5, 2*time.Second, 300*time.Millisecond)
要不要我帮你再封装一层,直接返回 成功结果 + 错误结果分开(两个 slice),这样就能更清楚哪些任务成功,哪些失败/超时?
明白了 ⚡,我给你一个完善后的最终版本,同时逐条解释为什么这样写,确保没有 panic,也不会出现 未关闭的 channel。
你的原始问题
- 可能的 panic:如果在
close(ch)
后再次向 channel 写入,会 panic。 - 未关闭 channel:当前代码里
ch <- aa
后,channel 一直没关闭。虽然读取一次没问题,但会造成 资源泄漏。 - 扩展性差:如果以后改为多次读取,会更容易出 bug。
完善后的代码
package main
import (
"fmt"
"sync"
"testing"
"time"
)
type Obj struct {
index int
str string
}
func fn(index int, str string, ch chan Obj, wg *sync.WaitGroup) {
defer wg.Done()
defer close(ch) // ✅ 保证每个 channel 在写完后关闭
time.Sleep(time.Second)
ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
channels := make([]chan Obj, len(arr))
var wg sync.WaitGroup
wg.Add(len(arr))
for i, s := range arr {
channels[i] = make(chan Obj, 1) // ✅ 设置缓冲避免 goroutine 阻塞
go fn(i, s, channels[i], &wg)
}
// ✅ 等待所有 goroutine 完成
wg.Wait()
list := make([]Obj, 0, len(arr))
for _, ch := range channels {
// ✅ 从已关闭的 channel 安全读取
for obj := range ch {
list = append(list, obj)
}
}
fmt.Println(list)
}
改进点解释
-
defer close(ch)
- 确保每个 goroutine 在写入后关闭自己持有的 channel。
- 避免资源泄漏。
- 防止 panic:只允许写一次,
close
保证不会再写。
-
channel 设置为缓冲
1
- 避免 goroutine 在
ch <- obj
时阻塞等待主协程读取。 - 主协程可以稍后统一收集。
- 避免 goroutine 在
-
sync.WaitGroup
- 确保所有 goroutine 写完再读取。
- 避免还没写入就被主协程读取,造成错误或数据缺失。
-
for obj := range ch
- 读取时使用
range
,因为 channel 已经关闭,读取会自动退出。 - 避免死锁。
- 读取时使用
输出示例
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
👉 这样写之后:
- 不会 panic。
- 所有 channel 都会关闭,不会泄漏。
- 逻辑清晰,即使以后要改成多次写入/多次读取,也能扩展。
要不要我帮你改成 用一个 channel 统一收集结果(比开 10 个 channel 更优雅)?
好嘞 ✅
我给你改成 一个 channel 统一收集结果 的版本,这样更简洁、性能更好,而且不用维护一堆 channel。
完整优化代码
package main
import (
"fmt"
"sync"
"testing"
"time"
)
type Obj struct {
index int
str string
}
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
// ✅ 用一个 channel 统一收集
ch := make(chan Obj, len(arr))
var wg sync.WaitGroup
wg.Add(len(arr))
for i, s := range arr {
go fn(i, s, ch, &wg)
}
// ✅ 等待所有 goroutine 完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
// ✅ 从一个 channel 统一收集结果
list := make([]Obj, 0, len(arr))
for obj := range ch {
list = append(list, obj)
}
fmt.Println(list)
}
改进点
-
一个 channel
- 不再维护
channels := make([]chan Obj, 10)
。 - 只要一个 channel,goroutine 全部往里面写,主协程统一收集即可。
- 不再维护
-
缓冲区大小 = len(arr)
- 保证不会出现阻塞。
- goroutine 可以无等待写入。
-
关闭 channel 的时机
- 使用
go func() { wg.Wait(); close(ch) }()
,等所有 goroutine 完成后再关闭 channel。 - 保证主协程安全地
range
,不会 panic、不会死锁。
- 使用
输出示例
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
👉 这样相比之前版本:
- 代码更简洁(不用 10 个 channel)。
- 不会 panic(关闭时机完全可控)。
- 无资源泄漏(channel 用完即关闭)。
- 可扩展,以后改成几百个 goroutine 也一样安全。
要不要我再给你写个 顺序收集结果 的版本(比如按照 index 排序,不受 goroutine 完成顺序影响)?
明白 ✅
目前的版本用一个 channel 收集结果,结果顺序取决于 goroutine 完成的快慢,可能乱序。
如果你想要 最终结果严格按照 index 顺序排列,可以在收集后排序,或者干脆用固定位置存储。
我给你两个方式:
方式一:收集后排序
(适合数据量大,goroutine 并发无序执行,最后统一排序)
package main
import (
"fmt"
"sort"
"sync"
"testing"
"time"
)
type Obj struct {
index int
str string
}
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
ch := make(chan Obj, len(arr))
var wg sync.WaitGroup
wg.Add(len(arr))
for i, s := range arr {
go fn(i, s, ch, &wg)
}
// 关闭 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集
list := make([]Obj, 0, len(arr))
for obj := range ch {
list = append(list, obj)
}
// ✅ 排序,保证 index 顺序
sort.Slice(list, func(i, j int) bool {
return list[i].index < list[j].index
})
fmt.Println(list)
}
输出示例(顺序固定):
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
方式二:直接按 index 存储
(更高效,不用排序)
package main
import (
"fmt"
"sync"
"testing"
"time"
)
type Obj struct {
index int
str string
}
func fn(index int, str string, res []Obj, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
res[index] = Obj{index: index, str: str} // ✅ 直接放到对应下标
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
res := make([]Obj, len(arr)) // ✅ 结果数组,顺序固定
var wg sync.WaitGroup
wg.Add(len(arr))
for i, s := range arr {
go fn(i, s, res, &wg)
}
wg.Wait()
fmt.Println(res)
}
输出同样是顺序的:
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
总结
- 方式一:通用方案,用 channel 收集,最后排序,适合复杂场景。
- 方式二:最高效,直接用数组下标写入,保证结果天然有序。
👉 你要不要我帮你再写一个 混合版:用 channel 收集(方便扩展,比如多数据源),但最终输出时自动排序,兼顾灵活性和有序性?
好 ✅
那我给你写一个 混合版:
- 仍然用 一个 channel 来收集结果(方便扩展、支持多来源/多批次任务)。
- 最终输出时 自动排序,保证顺序和稳定性。
混合版代码
package main
import (
"fmt"
"sort"
"sync"
"testing"
"time"
)
type Obj struct {
index int
str string
}
// goroutine 执行单元
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second) // 模拟耗时任务
ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
ch := make(chan Obj, len(arr)) // 一个 channel 收集全部结果
var wg sync.WaitGroup
wg.Add(len(arr))
for i, s := range arr {
go fn(i, s, ch, &wg)
}
// 等待 goroutine 完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集所有结果
list := make([]Obj, 0, len(arr))
for obj := range ch {
list = append(list, obj)
}
// ✅ 自动排序,确保按 index 输出
sort.Slice(list, func(i, j int) bool {
return list[i].index < list[j].index
})
fmt.Println(list)
}
特点
-
扩展性:
- 仍然使用一个 channel,支持任意数量的 goroutine 并发写入。
- 如果以后要接多个来源(比如多数据源、批量请求),只需把结果都丢进 channel。
-
安全性:
WaitGroup + close(ch)
保证不会 panic / 死锁。- channel 用完即关,无资源泄漏。
-
稳定性:
- 最终排序,保证输出顺序严格按照
index
。 - 不受 goroutine 完成快慢影响。
- 最终排序,保证输出顺序严格按照
输出示例
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
👉 这个版本既有 方式一(channel 收集)的灵活性,又有 方式二(顺序固定)的稳定性。
要不要我再帮你做个 benchmark 对比(排序版 vs 直接按 index 存储),看哪种在你的场景更高效?
评论区