两种更优写法:
方法一:保留你原来的 []chan Obj 结构,完善关闭
package main
import (
	"fmt"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
func fn(index int, str string, ch chan Obj) {
	defer close(ch) // 保证函数结束时关闭 channel
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	channels := make([]chan Obj, len(arr))
	for i, s := range arr {
		channels[i] = make(chan Obj, 1) // ✅ 改成缓冲 1,避免阻塞
		go fn(i, s, channels[i])
	}
	list := []Obj{}
	for _, channel := range channels {
		if a, ok := <-channel; ok { // ✅ 安全读,避免因关闭导致 panic
			list = append(list, a)
		}
	}
	fmt.Println(list)
}
- ✅ 每个 goroutine 写完自动 
close(ch)。 - ✅ 使用 
make(chan Obj, 1)避免 goroutine 阻塞。 - ✅ 读时用 
ok := <-ch防止意外 panic。 
方法二:统一一个结果 channel(推荐 ⚡)
写法更简洁,不需要维护一堆 chan,只用一个 channel + WaitGroup。
package main
import (
	"fmt"
	"sync"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
func fn(index int, str string, ch chan Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	ch := make(chan Obj, len(arr)) // ✅ 一个总结果 channel
	var wg sync.WaitGroup
	for i, s := range arr {
		wg.Add(1)
		go fn(i, s, ch, &wg)
	}
	// 另起 goroutine 等待所有任务完成再关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	list := []Obj{}
	for obj := range ch { // ✅ 自动收集所有结果,直到 channel 被关闭
		list = append(list, obj)
	}
	fmt.Println(list)
}
- ✅ 不会产生泄漏,channel 最终一定关闭。
 - ✅ 支持并发收集结果,代码更简洁。
 - ✅ 不存在 panic 风险。
 
📌 总结
- 你的原始写法不会立刻 panic,但存在 阻塞风险、channel 未关闭、扩展性差。
 - 最推荐的方法是 方法二(一个结果 channel + WaitGroup),既安全又简洁。
 
封装
直接把 方法二 封装成一个可复用的并发收集工具函数。
以后你只需要传入任务数组,它会自动并发执行、收集结果,返回一个 []Obj。
🔧 封装后的工具函数
package main
import (
	"fmt"
	"sync"
	"time"
)
// Obj 是结果类型
type Obj struct {
	Index int
	Str   string
}
// worker 是实际执行的函数逻辑
// 这里我写了一个例子(Sleep 模拟耗时任务)
// 你可以把具体逻辑替换掉
func worker(index int, str string) Obj {
	time.Sleep(time.Second)
	return Obj{Index: index, Str: str}
}
// RunJobs 接收字符串数组并发执行,返回结果切片
func RunJobs(arr []string) []Obj {
	ch := make(chan Obj, len(arr)) // 收集所有结果
	var wg sync.WaitGroup
	for i, s := range arr {
		wg.Add(1)
		go func(i int, s string) {
			defer wg.Done()
			result := worker(i, s)
			ch <- result
		}(i, s)
	}
	// 等待所有任务完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集所有结果
	results := make([]Obj, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}
// 示例测试
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	list := RunJobs(arr)
	fmt.Println(list)
}
✅ 特点
- 可复用:
RunJobs可以接收任意[]string并发处理。 - 安全:使用 
WaitGroup+close(ch),不会造成 goroutine 泄漏。 - 不阻塞:结果 channel 有缓冲,写入不会卡死。
 - 简洁:主逻辑只需要调用 
RunJobs(arr)就能直接拿到结果。 
要不要我再帮你升级一下,把 worker 也做成 可传入的函数参数?这样你可以传不同的业务逻辑进去(比如查数据库、调接口),而不仅限于字符串→Obj。
👌 那我帮你把 worker 也抽象成参数传入,这样你就能传不同的逻辑进去,而 RunJobs 就是一个通用的并发收集函数了。
🔧 通用并发收集工具函数(函数作为参数)
package main
import (
	"fmt"
	"sync"
	"time"
)
// Obj 是示例结果类型
type Obj struct {
	Index int
	Str   string
}
// RunJobs 并发执行任务
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (index, value),返回结果 T
// 返回所有结果切片
func RunJobs[T any](arr []string, worker func(index int, value string) T) []T {
	ch := make(chan T, len(arr))
	var wg sync.WaitGroup
	for i, s := range arr {
		wg.Add(1)
		go func(i int, s string) {
			defer wg.Done()
			ch <- worker(i, s) // 运行外部传入逻辑
		}(i, s)
	}
	// 等待所有任务完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集所有结果
	results := make([]T, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}
// 示例 worker
func exampleWorker(index int, str string) Obj {
	time.Sleep(time.Second) // 模拟耗时
	return Obj{Index: index, Str: str}
}
// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	// 使用 exampleWorker
	results := RunJobs(arr, exampleWorker)
	fmt.Println(results)
}
✅ 特点
- 
泛型支持:你可以返回任何类型,不限于
Obj。RunJobs返回的是[]T,只要你传的worker返回T就行。
 - 
灵活传入逻辑:
// 直接用匿名函数 results := RunJobs(arr, func(i int, s string) Obj { return Obj{Index: i, Str: s + "_ok"} }) - 
线程安全:所有 goroutine 用
WaitGroup管理,channel 最终关闭。 - 
不会阻塞:结果 channel 带缓冲,避免 goroutine 卡死。
 
要不要我帮你再写一个 带并发数限制(worker pool) 的版本?这样可以避免任务过多时开太多 goroutine(比如上千个任务)。
这样就不会因为任务太多(几千上万个)而开出几千个 goroutine,能够 控制最大并发数,既安全又高效。
🔧 带并发数限制的 RunJobsPool
package main
import (
	"fmt"
	"sync"
	"time"
)
// Obj 示例结果类型
type Obj struct {
	Index int
	Str   string
}
// RunJobsPool 并发执行任务(带并发数限制)
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (index, value),返回结果 T
// maxWorkers: 最大并发数
func RunJobsPool[T any](arr []string, worker func(index int, value string) T, maxWorkers int) []T {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}
	ch := make(chan T, len(arr)) // 结果收集
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr)) // 任务队列
	// 把任务丢进 jobCh
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)
	var wg sync.WaitGroup
	// 启动固定数量的 worker goroutine
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				ch <- worker(job.index, job.value)
			}
		}()
	}
	// 等待所有 worker 完成后关闭结果 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集结果
	results := make([]T, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}
// 示例 worker
func exampleWorker(index int, str string) Obj {
	time.Sleep(500 * time.Millisecond) // 模拟耗时任务
	return Obj{Index: index, Str: str}
}
// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	// 限制最大并发为 3
	results := RunJobsPool(arr, exampleWorker, 3)
	fmt.Println(results)
}
✅ 特点
- 
可控并发数
maxWorkers控制最大 goroutine 数。- 如果任务很多,比如 1 万个,不会开 1 万个 goroutine,而是开 
maxWorkers个 goroutine 轮流处理。 
 - 
通用性强
- 依然用泛型 
T any,支持任意返回类型。 worker自定义逻辑,可以做网络请求、数据库查询等。
 - 依然用泛型 
 - 
安全收敛
- 使用 
jobCh分发任务,WaitGroup确保全部完成后关闭ch。 - 无死锁,无资源泄漏。
 
 - 使用 
 
⚡ 用法示例:
results := RunJobsPool(arr, func(i int, s string) Obj {
    return Obj{Index: i, Str: s + "_done"}
}, 5)
要不要我帮你再加一个 带超时控制(每个任务最长执行多久) 的版本?这样如果某个任务卡住了(比如 HTTP 请求超时),也不会拖死整个并发批处理。
好 ✅ 我再帮你把 超时控制 加上。
这样如果某个任务执行太久(比如 HTTP 请求挂死),就会自动超时,避免拖死整个批处理。
🔧 带超时控制的 RunJobsPoolTimeout
我这里用 context.WithTimeout 来实现任务级别的超时控制。
package main
import (
	"context"
	"fmt"
	"sync"
	"time"
)
// Obj 示例结果类型
type Obj struct {
	Index int
	Str   string
	Err   error
}
// RunJobsPoolTimeout 并发执行任务(带并发数限制 + 每个任务超时控制)
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (ctx, index, value),返回结果 T
// maxWorkers: 最大并发数
// timeout: 每个任务的超时时间
func RunJobsPoolTimeout[T any](
	arr []string,
	worker func(ctx context.Context, index int, value string) (T, error),
	maxWorkers int,
	timeout time.Duration,
) []T {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}
	ch := make(chan T, len(arr)) // 结果收集
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr)) // 任务队列
	// 任务入队
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)
	var wg sync.WaitGroup
	// worker pool
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				ctx, cancel := context.WithTimeout(context.Background(), timeout)
				result, err := worker(ctx, job.index, job.value)
				cancel()
				// 如果 worker 返回错误,可以选择丢弃或收集错误
				if err == nil {
					ch <- result
				} else {
					var zero T
					ch <- zero // 保证结果长度一致
				}
			}
		}()
	}
	// 等待所有 worker 完成后关闭结果 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集结果
	results := make([]T, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}
// 示例 worker:模拟可能超时的任务
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
	select {
	case <-time.After(500 * time.Millisecond): // 模拟任务耗时
		return Obj{Index: index, Str: str, Err: nil}, nil
	case <-ctx.Done(): // 超时 / 被取消
		return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
	}
}
// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	// 限制最大并发为 3,每个任务最多执行 300ms
	results := RunJobsPoolTimeout(arr, exampleWorker, 3, 300*time.Millisecond)
	fmt.Println(results)
}
✅ 特点
- 
并发数限制
maxWorkers控制最大 goroutine 数。
 - 
任务超时控制
- 每个任务会创建一个 
context.WithTimeout。 - 如果超过 
timeout,worker 自动返回超时错误。 
 - 每个任务会创建一个 
 - 
错误处理
- worker 返回 
(T, error),你可以在结果里带上错误信息。 - 我现在的实现是:错误时塞一个 
zero T,但你也可以选择返回T+Err。 
 - worker 返回 
 
⚡ 用法示例:
results := RunJobsPoolTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
    // 模拟请求接口
    time.Sleep(100 * time.Millisecond)
    return Obj{Index: i, Str: s}, nil
}, 5, 200*time.Millisecond)
这样:
- 最大 5 个任务并发
 - 每个任务最多执行 200ms,超时会返回 
context.DeadlineExceeded 
要不要我帮你再改进成 支持整个批次的总超时(比如所有任务必须在 5 秒内完成,不管有多少任务)?这样可以做整体限时批处理。
👌 好,那我们把逻辑再升级一下:
之前是 每个任务有自己的超时,现在加一个 整体批次超时(例如:10 秒内必须完成所有任务,否则直接结束)。
🔧 带 整体批次超时 的 RunJobsBatchTimeout
package main
import (
	"context"
	"fmt"
	"sync"
	"time"
)
// Obj 示例结果
type Obj struct {
	Index int
	Str   string
	Err   error
}
// RunJobsBatchTimeout 并发执行任务(带并发数限制 + 整体批次超时)
// arr: 输入任务数组
// worker: 任务逻辑函数 (ctx, index, value) -> (T, error)
// maxWorkers: 最大并发数
// batchTimeout: 整个批次的超时时间
func RunJobsBatchTimeout[T any](
	arr []string,
	worker func(ctx context.Context, index int, value string) (T, error),
	maxWorkers int,
	batchTimeout time.Duration,
) ([]T, error) {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}
	ctx, cancel := context.WithTimeout(context.Background(), batchTimeout)
	defer cancel()
	ch := make(chan T, len(arr)) // 收集结果
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr))
	// 投递任务
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)
	var wg sync.WaitGroup
	// 启动 worker pool
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				// 每个任务继承批次 ctx
				select {
				case <-ctx.Done(): // 整体超时或被取消
					return
				default:
					result, err := worker(ctx, job.index, job.value)
					if err == nil {
						ch <- result
					} else {
						var zero T
						ch <- zero
					}
				}
			}
		}()
	}
	// 等待 worker 完成后关闭结果通道
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集结果
	results := make([]T, 0, len(arr))
	for {
		select {
		case <-ctx.Done():
			return results, ctx.Err() // 整体超时
		case obj, ok := <-ch:
			if !ok {
				return results, nil // 全部完成
			}
			results = append(results, obj)
		}
	}
}
// 示例 worker:模拟耗时任务
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
	select {
	case <-time.After(500 * time.Millisecond): // 模拟执行 500ms
		return Obj{Index: index, Str: str, Err: nil}, nil
	case <-ctx.Done():
		return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
	}
}
// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	// 限制并发为 3,整体批次超时 2 秒
	results, err := RunJobsBatchTimeout(arr, exampleWorker, 3, 2*time.Second)
	fmt.Println("Results:", results)
	fmt.Println("Error:", err)
}
✅ 功能总结
- 
整体批次超时
context.WithTimeout包裹整个批次,超时后所有 worker 会收到ctx.Done(),自动退出。- 返回的 
error会是context.DeadlineExceeded。 
 - 
并发数限制
maxWorkers控制 goroutine 数量,避免爆炸性创建。
 - 
安全收敛
WaitGroup确保 worker 都退出。close(ch)后,range 自动结束,安全收集。
 - 
结果部分保留
- 如果超时,返回已经完成的部分结果 + 超时错误。
 - 如果正常完成,返回完整结果 + 
nil错误。 
 
⚡ 用法示例:
results, err := RunJobsBatchTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
    time.Sleep(300 * time.Millisecond)
    return Obj{Index: i, Str: s}, nil
}, 5, 1*time.Second)
if err != nil {
    fmt.Println("任务超时:", err)
}
fmt.Println("已完成结果:", results)
要不要我帮你再加一个 混合模式:既支持「单任务超时」,又支持「整体批次超时」?这样你可以灵活控制。
好 ✅ 我帮你写一个 混合模式:
- 单任务超时:每个任务都不能超过 
taskTimeout。 - 整体批次超时:整个批处理必须在 
batchTimeout内完成。 
这样就能同时控制「个别任务卡住」和「批次拖太久」。
🔧 RunJobsHybridTimeout(混合模式)
package main
import (
	"context"
	"fmt"
	"sync"
	"time"
)
// Obj 示例结果
type Obj struct {
	Index int
	Str   string
	Err   error
}
// RunJobsHybridTimeout 并发执行任务(混合模式:整体批次超时 + 单任务超时)
// arr: 输入任务数组
// worker: 任务逻辑函数 (ctx, index, value) -> (T, error)
// maxWorkers: 最大并发数
// batchTimeout: 整个批次的超时时间
// taskTimeout: 每个任务的超时时间
func RunJobsHybridTimeout[T any](
	arr []string,
	worker func(ctx context.Context, index int, value string) (T, error),
	maxWorkers int,
	batchTimeout time.Duration,
	taskTimeout time.Duration,
) ([]T, error) {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}
	// 批次级 context
	batchCtx, batchCancel := context.WithTimeout(context.Background(), batchTimeout)
	defer batchCancel()
	ch := make(chan T, len(arr)) // 收集结果
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr))
	// 投递任务
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)
	var wg sync.WaitGroup
	// worker pool
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				// 每个任务有单独的超时 + 继承批次 ctx
				taskCtx, taskCancel := context.WithTimeout(batchCtx, taskTimeout)
				result, err := worker(taskCtx, job.index, job.value)
				taskCancel()
				if err == nil {
					ch <- result
				} else {
					var zero T
					ch <- zero
				}
			}
		}()
	}
	// 等待所有 worker 完成后关闭结果通道
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集结果
	results := make([]T, 0, len(arr))
	for {
		select {
		case <-batchCtx.Done():
			return results, batchCtx.Err() // 整体批次超时
		case obj, ok := <-ch:
			if !ok {
				return results, nil // 全部完成
			}
			results = append(results, obj)
		}
	}
}
// 示例 worker:模拟任务可能超时
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
	select {
	case <-time.After(500 * time.Millisecond): // 模拟任务耗时
		return Obj{Index: index, Str: str, Err: nil}, nil
	case <-ctx.Done():
		return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
	}
}
// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	// 并发 3,批次超时 2s,每个任务最多 300ms
	results, err := RunJobsHybridTimeout(arr, exampleWorker, 3, 2*time.Second, 300*time.Millisecond)
	fmt.Println("Results:", results)
	fmt.Println("Error:", err)
}
✅ 功能总结
- 
单任务超时
- 每个任务继承批次 
context,并套一层taskTimeout。 - 如果单个任务超时,返回 
ctx.Err()。 
 - 每个任务继承批次 
 - 
整体批次超时
batchCtx控制整个批处理生命周期。- 超过 
batchTimeout后,所有任务立即取消。 
 - 
并发数限制
maxWorkers控制并发任务数。
 - 
结果部分返回
- 如果单任务超时:该任务结果会是 
zero T。 - 如果整体批次超时:返回已完成的部分结果 + 
context.DeadlineExceeded。 
 - 如果单任务超时:该任务结果会是 
 
⚡ 用法示例:
results, err := RunJobsHybridTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
    // 模拟请求接口
    time.Sleep(200 * time.Millisecond)
    return Obj{Index: i, Str: s}, nil
}, 5, 2*time.Second, 300*time.Millisecond)
要不要我帮你再封装一层,直接返回 成功结果 + 错误结果分开(两个 slice),这样就能更清楚哪些任务成功,哪些失败/超时?
明白了 ⚡,我给你一个完善后的最终版本,同时逐条解释为什么这样写,确保没有 panic,也不会出现 未关闭的 channel。
你的原始问题
- 可能的 panic:如果在 
close(ch)后再次向 channel 写入,会 panic。 - 未关闭 channel:当前代码里 
ch <- aa后,channel 一直没关闭。虽然读取一次没问题,但会造成 资源泄漏。 - 扩展性差:如果以后改为多次读取,会更容易出 bug。
 
完善后的代码
package main
import (
	"fmt"
	"sync"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
func fn(index int, str string, ch chan Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(ch) // ✅ 保证每个 channel 在写完后关闭
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	channels := make([]chan Obj, len(arr))
	var wg sync.WaitGroup
	wg.Add(len(arr))
	for i, s := range arr {
		channels[i] = make(chan Obj, 1) // ✅ 设置缓冲避免 goroutine 阻塞
		go fn(i, s, channels[i], &wg)
	}
	// ✅ 等待所有 goroutine 完成
	wg.Wait()
	list := make([]Obj, 0, len(arr))
	for _, ch := range channels {
		// ✅ 从已关闭的 channel 安全读取
		for obj := range ch {
			list = append(list, obj)
		}
	}
	fmt.Println(list)
}
改进点解释
- 
defer close(ch)- 确保每个 goroutine 在写入后关闭自己持有的 channel。
 - 避免资源泄漏。
 - 防止 panic:只允许写一次,
close保证不会再写。 
 - 
channel 设置为缓冲
1- 避免 goroutine 在 
ch <- obj时阻塞等待主协程读取。 - 主协程可以稍后统一收集。
 
 - 避免 goroutine 在 
 - 
sync.WaitGroup- 确保所有 goroutine 写完再读取。
 - 避免还没写入就被主协程读取,造成错误或数据缺失。
 
 - 
for obj := range ch- 读取时使用 
range,因为 channel 已经关闭,读取会自动退出。 - 避免死锁。
 
 - 读取时使用 
 
输出示例
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
👉 这样写之后:
- 不会 panic。
 - 所有 channel 都会关闭,不会泄漏。
 - 逻辑清晰,即使以后要改成多次写入/多次读取,也能扩展。
 
要不要我帮你改成 用一个 channel 统一收集结果(比开 10 个 channel 更优雅)?
好嘞 ✅
我给你改成 一个 channel 统一收集结果 的版本,这样更简洁、性能更好,而且不用维护一堆 channel。
完整优化代码
package main
import (
	"fmt"
	"sync"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	// ✅ 用一个 channel 统一收集
	ch := make(chan Obj, len(arr))
	var wg sync.WaitGroup
	wg.Add(len(arr))
	for i, s := range arr {
		go fn(i, s, ch, &wg)
	}
	// ✅ 等待所有 goroutine 完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// ✅ 从一个 channel 统一收集结果
	list := make([]Obj, 0, len(arr))
	for obj := range ch {
		list = append(list, obj)
	}
	fmt.Println(list)
}
改进点
- 
一个 channel
- 不再维护 
channels := make([]chan Obj, 10)。 - 只要一个 channel,goroutine 全部往里面写,主协程统一收集即可。
 
 - 不再维护 
 - 
缓冲区大小 = len(arr)
- 保证不会出现阻塞。
 - goroutine 可以无等待写入。
 
 - 
关闭 channel 的时机
- 使用 
go func() { wg.Wait(); close(ch) }(),等所有 goroutine 完成后再关闭 channel。 - 保证主协程安全地 
range,不会 panic、不会死锁。 
 - 使用 
 
输出示例
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
👉 这样相比之前版本:
- 代码更简洁(不用 10 个 channel)。
 - 不会 panic(关闭时机完全可控)。
 - 无资源泄漏(channel 用完即关闭)。
 - 可扩展,以后改成几百个 goroutine 也一样安全。
 
要不要我再给你写个 顺序收集结果 的版本(比如按照 index 排序,不受 goroutine 完成顺序影响)?
明白 ✅
目前的版本用一个 channel 收集结果,结果顺序取决于 goroutine 完成的快慢,可能乱序。
如果你想要 最终结果严格按照 index 顺序排列,可以在收集后排序,或者干脆用固定位置存储。
我给你两个方式:
方式一:收集后排序
(适合数据量大,goroutine 并发无序执行,最后统一排序)
package main
import (
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	ch := make(chan Obj, len(arr))
	var wg sync.WaitGroup
	wg.Add(len(arr))
	for i, s := range arr {
		go fn(i, s, ch, &wg)
	}
	// 关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集
	list := make([]Obj, 0, len(arr))
	for obj := range ch {
		list = append(list, obj)
	}
	// ✅ 排序,保证 index 顺序
	sort.Slice(list, func(i, j int) bool {
		return list[i].index < list[j].index
	})
	fmt.Println(list)
}
输出示例(顺序固定):
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
方式二:直接按 index 存储
(更高效,不用排序)
package main
import (
	"fmt"
	"sync"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
func fn(index int, str string, res []Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	res[index] = Obj{index: index, str: str} // ✅ 直接放到对应下标
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	res := make([]Obj, len(arr)) // ✅ 结果数组,顺序固定
	var wg sync.WaitGroup
	wg.Add(len(arr))
	for i, s := range arr {
		go fn(i, s, res, &wg)
	}
	wg.Wait()
	fmt.Println(res)
}
输出同样是顺序的:
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
总结
- 方式一:通用方案,用 channel 收集,最后排序,适合复杂场景。
 - 方式二:最高效,直接用数组下标写入,保证结果天然有序。
 
👉 你要不要我帮你再写一个 混合版:用 channel 收集(方便扩展,比如多数据源),但最终输出时自动排序,兼顾灵活性和有序性?
好 ✅
那我给你写一个 混合版:
- 仍然用 一个 channel 来收集结果(方便扩展、支持多来源/多批次任务)。
 - 最终输出时 自动排序,保证顺序和稳定性。
 
混合版代码
package main
import (
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"
)
type Obj struct {
	index int
	str   string
}
// goroutine 执行单元
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second) // 模拟耗时任务
	ch <- Obj{index: index, str: str}
}
func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	ch := make(chan Obj, len(arr)) // 一个 channel 收集全部结果
	var wg sync.WaitGroup
	wg.Add(len(arr))
	for i, s := range arr {
		go fn(i, s, ch, &wg)
	}
	// 等待 goroutine 完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 收集所有结果
	list := make([]Obj, 0, len(arr))
	for obj := range ch {
		list = append(list, obj)
	}
	// ✅ 自动排序,确保按 index 输出
	sort.Slice(list, func(i, j int) bool {
		return list[i].index < list[j].index
	})
	fmt.Println(list)
}
特点
- 
扩展性:
- 仍然使用一个 channel,支持任意数量的 goroutine 并发写入。
 - 如果以后要接多个来源(比如多数据源、批量请求),只需把结果都丢进 channel。
 
 - 
安全性:
WaitGroup + close(ch)保证不会 panic / 死锁。- channel 用完即关,无资源泄漏。
 
 - 
稳定性:
- 最终排序,保证输出顺序严格按照 
index。 - 不受 goroutine 完成快慢影响。
 
 - 最终排序,保证输出顺序严格按照 
 
输出示例
[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]
👉 这个版本既有 方式一(channel 收集)的灵活性,又有 方式二(顺序固定)的稳定性。
要不要我再帮你做个 benchmark 对比(排序版 vs 直接按 index 存储),看哪种在你的场景更高效?
      
        
      
评论区