侧边栏壁纸
博主头像
SeaDream乄造梦

Dream,Don't stop a day of hard and don't give up a little hope。 ——不停止一日努力&&不放弃一点希望。

  • 累计撰写 97 篇文章
  • 累计创建 21 个标签
  • 累计收到 15 条评论

目 录CONTENT

文章目录

go channel应用

SeaDream乄造梦
2025-09-04 / 0 评论 / 0 点赞 / 3 阅读 / 18,664 字
温馨提示:
亲爱的,如果觉得博主很有趣就留下你的足迹,并收藏下链接在走叭

两种更优写法:

方法一:保留你原来的 []chan Obj 结构,完善关闭

package main

import (
	"fmt"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

func fn(index int, str string, ch chan Obj) {
	defer close(ch) // 保证函数结束时关闭 channel
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	channels := make([]chan Obj, len(arr))

	for i, s := range arr {
		channels[i] = make(chan Obj, 1) // ✅ 改成缓冲 1,避免阻塞
		go fn(i, s, channels[i])
	}

	list := []Obj{}
	for _, channel := range channels {
		if a, ok := <-channel; ok { // ✅ 安全读,避免因关闭导致 panic
			list = append(list, a)
		}
	}
	fmt.Println(list)
}
  • ✅ 每个 goroutine 写完自动 close(ch)
  • ✅ 使用 make(chan Obj, 1) 避免 goroutine 阻塞。
  • ✅ 读时用 ok := <-ch 防止意外 panic。

方法二:统一一个结果 channel(推荐 ⚡)

写法更简洁,不需要维护一堆 chan,只用一个 channel + WaitGroup

package main

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

func fn(index int, str string, ch chan Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	ch := make(chan Obj, len(arr)) // ✅ 一个总结果 channel
	var wg sync.WaitGroup

	for i, s := range arr {
		wg.Add(1)
		go fn(i, s, ch, &wg)
	}

	// 另起 goroutine 等待所有任务完成再关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	list := []Obj{}
	for obj := range ch { // ✅ 自动收集所有结果,直到 channel 被关闭
		list = append(list, obj)
	}
	fmt.Println(list)
}
  • ✅ 不会产生泄漏,channel 最终一定关闭。
  • ✅ 支持并发收集结果,代码更简洁。
  • ✅ 不存在 panic 风险。

📌 总结

  • 你的原始写法不会立刻 panic,但存在 阻塞风险channel 未关闭扩展性差
  • 最推荐的方法是 方法二(一个结果 channel + WaitGroup),既安全又简洁。

封装

直接把 方法二 封装成一个可复用的并发收集工具函数。
以后你只需要传入任务数组,它会自动并发执行、收集结果,返回一个 []Obj


🔧 封装后的工具函数

package main

import (
	"fmt"
	"sync"
	"time"
)

// Obj 是结果类型
type Obj struct {
	Index int
	Str   string
}

// worker 是实际执行的函数逻辑
// 这里我写了一个例子(Sleep 模拟耗时任务)
// 你可以把具体逻辑替换掉
func worker(index int, str string) Obj {
	time.Sleep(time.Second)
	return Obj{Index: index, Str: str}
}

// RunJobs 接收字符串数组并发执行,返回结果切片
func RunJobs(arr []string) []Obj {
	ch := make(chan Obj, len(arr)) // 收集所有结果
	var wg sync.WaitGroup

	for i, s := range arr {
		wg.Add(1)
		go func(i int, s string) {
			defer wg.Done()
			result := worker(i, s)
			ch <- result
		}(i, s)
	}

	// 等待所有任务完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集所有结果
	results := make([]Obj, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}

// 示例测试
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	list := RunJobs(arr)
	fmt.Println(list)
}

✅ 特点

  • 可复用RunJobs 可以接收任意 []string 并发处理。
  • 安全:使用 WaitGroup + close(ch),不会造成 goroutine 泄漏。
  • 不阻塞:结果 channel 有缓冲,写入不会卡死。
  • 简洁:主逻辑只需要调用 RunJobs(arr) 就能直接拿到结果。

要不要我再帮你升级一下,把 worker 也做成 可传入的函数参数?这样你可以传不同的业务逻辑进去(比如查数据库、调接口),而不仅限于字符串→Obj
👌 那我帮你把 worker 也抽象成参数传入,这样你就能传不同的逻辑进去,而 RunJobs 就是一个通用的并发收集函数了。


🔧 通用并发收集工具函数(函数作为参数)

package main

import (
	"fmt"
	"sync"
	"time"
)

// Obj 是示例结果类型
type Obj struct {
	Index int
	Str   string
}

// RunJobs 并发执行任务
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (index, value),返回结果 T
// 返回所有结果切片
func RunJobs[T any](arr []string, worker func(index int, value string) T) []T {
	ch := make(chan T, len(arr))
	var wg sync.WaitGroup

	for i, s := range arr {
		wg.Add(1)
		go func(i int, s string) {
			defer wg.Done()
			ch <- worker(i, s) // 运行外部传入逻辑
		}(i, s)
	}

	// 等待所有任务完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集所有结果
	results := make([]T, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}

// 示例 worker
func exampleWorker(index int, str string) Obj {
	time.Sleep(time.Second) // 模拟耗时
	return Obj{Index: index, Str: str}
}

// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	// 使用 exampleWorker
	results := RunJobs(arr, exampleWorker)

	fmt.Println(results)
}

✅ 特点

  1. 泛型支持:你可以返回任何类型,不限于 Obj

    • RunJobs 返回的是 []T,只要你传的 worker 返回 T 就行。
  2. 灵活传入逻辑

    // 直接用匿名函数
    results := RunJobs(arr, func(i int, s string) Obj {
        return Obj{Index: i, Str: s + "_ok"}
    })
    
  3. 线程安全:所有 goroutine 用 WaitGroup 管理,channel 最终关闭。

  4. 不会阻塞:结果 channel 带缓冲,避免 goroutine 卡死。


要不要我帮你再写一个 带并发数限制(worker pool) 的版本?这样可以避免任务过多时开太多 goroutine(比如上千个任务)。

这样就不会因为任务太多(几千上万个)而开出几千个 goroutine,能够 控制最大并发数,既安全又高效。


🔧 带并发数限制的 RunJobsPool

package main

import (
	"fmt"
	"sync"
	"time"
)

// Obj 示例结果类型
type Obj struct {
	Index int
	Str   string
}

// RunJobsPool 并发执行任务(带并发数限制)
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (index, value),返回结果 T
// maxWorkers: 最大并发数
func RunJobsPool[T any](arr []string, worker func(index int, value string) T, maxWorkers int) []T {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}
	ch := make(chan T, len(arr)) // 结果收集
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr)) // 任务队列

	// 把任务丢进 jobCh
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)

	var wg sync.WaitGroup

	// 启动固定数量的 worker goroutine
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				ch <- worker(job.index, job.value)
			}
		}()
	}

	// 等待所有 worker 完成后关闭结果 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集结果
	results := make([]T, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}

// 示例 worker
func exampleWorker(index int, str string) Obj {
	time.Sleep(500 * time.Millisecond) // 模拟耗时任务
	return Obj{Index: index, Str: str}
}

// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	// 限制最大并发为 3
	results := RunJobsPool(arr, exampleWorker, 3)

	fmt.Println(results)
}

✅ 特点

  1. 可控并发数

    • maxWorkers 控制最大 goroutine 数。
    • 如果任务很多,比如 1 万个,不会开 1 万个 goroutine,而是开 maxWorkers 个 goroutine 轮流处理。
  2. 通用性强

    • 依然用泛型 T any,支持任意返回类型。
    • worker 自定义逻辑,可以做网络请求、数据库查询等。
  3. 安全收敛

    • 使用 jobCh 分发任务,WaitGroup 确保全部完成后关闭 ch
    • 无死锁,无资源泄漏。

⚡ 用法示例:

results := RunJobsPool(arr, func(i int, s string) Obj {
    return Obj{Index: i, Str: s + "_done"}
}, 5)

要不要我帮你再加一个 带超时控制(每个任务最长执行多久) 的版本?这样如果某个任务卡住了(比如 HTTP 请求超时),也不会拖死整个并发批处理。

好 ✅ 我再帮你把 超时控制 加上。
这样如果某个任务执行太久(比如 HTTP 请求挂死),就会自动超时,避免拖死整个批处理。


🔧 带超时控制的 RunJobsPoolTimeout

我这里用 context.WithTimeout 来实现任务级别的超时控制。

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Obj 示例结果类型
type Obj struct {
	Index int
	Str   string
	Err   error
}

// RunJobsPoolTimeout 并发执行任务(带并发数限制 + 每个任务超时控制)
// arr: 输入任务数组
// worker: 任务逻辑函数,接收 (ctx, index, value),返回结果 T
// maxWorkers: 最大并发数
// timeout: 每个任务的超时时间
func RunJobsPoolTimeout[T any](
	arr []string,
	worker func(ctx context.Context, index int, value string) (T, error),
	maxWorkers int,
	timeout time.Duration,
) []T {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}
	ch := make(chan T, len(arr)) // 结果收集
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr)) // 任务队列

	// 任务入队
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)

	var wg sync.WaitGroup

	// worker pool
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				ctx, cancel := context.WithTimeout(context.Background(), timeout)
				result, err := worker(ctx, job.index, job.value)
				cancel()

				// 如果 worker 返回错误,可以选择丢弃或收集错误
				if err == nil {
					ch <- result
				} else {
					var zero T
					ch <- zero // 保证结果长度一致
				}
			}
		}()
	}

	// 等待所有 worker 完成后关闭结果 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集结果
	results := make([]T, 0, len(arr))
	for obj := range ch {
		results = append(results, obj)
	}
	return results
}

// 示例 worker:模拟可能超时的任务
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
	select {
	case <-time.After(500 * time.Millisecond): // 模拟任务耗时
		return Obj{Index: index, Str: str, Err: nil}, nil
	case <-ctx.Done(): // 超时 / 被取消
		return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
	}
}

// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	// 限制最大并发为 3,每个任务最多执行 300ms
	results := RunJobsPoolTimeout(arr, exampleWorker, 3, 300*time.Millisecond)

	fmt.Println(results)
}

✅ 特点

  1. 并发数限制

    • maxWorkers 控制最大 goroutine 数。
  2. 任务超时控制

    • 每个任务会创建一个 context.WithTimeout
    • 如果超过 timeout,worker 自动返回超时错误。
  3. 错误处理

    • worker 返回 (T, error),你可以在结果里带上错误信息。
    • 我现在的实现是:错误时塞一个 zero T,但你也可以选择返回 T + Err

⚡ 用法示例:

results := RunJobsPoolTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
    // 模拟请求接口
    time.Sleep(100 * time.Millisecond)
    return Obj{Index: i, Str: s}, nil
}, 5, 200*time.Millisecond)

这样:

  • 最大 5 个任务并发
  • 每个任务最多执行 200ms,超时会返回 context.DeadlineExceeded

要不要我帮你再改进成 支持整个批次的总超时(比如所有任务必须在 5 秒内完成,不管有多少任务)?这样可以做整体限时批处理。

👌 好,那我们把逻辑再升级一下:

之前是 每个任务有自己的超时,现在加一个 整体批次超时(例如:10 秒内必须完成所有任务,否则直接结束)。


🔧 带 整体批次超时 的 RunJobsBatchTimeout

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Obj 示例结果
type Obj struct {
	Index int
	Str   string
	Err   error
}

// RunJobsBatchTimeout 并发执行任务(带并发数限制 + 整体批次超时)
// arr: 输入任务数组
// worker: 任务逻辑函数 (ctx, index, value) -> (T, error)
// maxWorkers: 最大并发数
// batchTimeout: 整个批次的超时时间
func RunJobsBatchTimeout[T any](
	arr []string,
	worker func(ctx context.Context, index int, value string) (T, error),
	maxWorkers int,
	batchTimeout time.Duration,
) ([]T, error) {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}

	ctx, cancel := context.WithTimeout(context.Background(), batchTimeout)
	defer cancel()

	ch := make(chan T, len(arr)) // 收集结果
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr))

	// 投递任务
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)

	var wg sync.WaitGroup

	// 启动 worker pool
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				// 每个任务继承批次 ctx
				select {
				case <-ctx.Done(): // 整体超时或被取消
					return
				default:
					result, err := worker(ctx, job.index, job.value)
					if err == nil {
						ch <- result
					} else {
						var zero T
						ch <- zero
					}
				}
			}
		}()
	}

	// 等待 worker 完成后关闭结果通道
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集结果
	results := make([]T, 0, len(arr))
	for {
		select {
		case <-ctx.Done():
			return results, ctx.Err() // 整体超时
		case obj, ok := <-ch:
			if !ok {
				return results, nil // 全部完成
			}
			results = append(results, obj)
		}
	}
}

// 示例 worker:模拟耗时任务
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
	select {
	case <-time.After(500 * time.Millisecond): // 模拟执行 500ms
		return Obj{Index: index, Str: str, Err: nil}, nil
	case <-ctx.Done():
		return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
	}
}

// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	// 限制并发为 3,整体批次超时 2 秒
	results, err := RunJobsBatchTimeout(arr, exampleWorker, 3, 2*time.Second)

	fmt.Println("Results:", results)
	fmt.Println("Error:", err)
}

✅ 功能总结

  1. 整体批次超时

    • context.WithTimeout 包裹整个批次,超时后所有 worker 会收到 ctx.Done(),自动退出。
    • 返回的 error 会是 context.DeadlineExceeded
  2. 并发数限制

    • maxWorkers 控制 goroutine 数量,避免爆炸性创建。
  3. 安全收敛

    • WaitGroup 确保 worker 都退出。
    • close(ch) 后,range 自动结束,安全收集。
  4. 结果部分保留

    • 如果超时,返回已经完成的部分结果 + 超时错误。
    • 如果正常完成,返回完整结果 + nil 错误。

⚡ 用法示例:

results, err := RunJobsBatchTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
    time.Sleep(300 * time.Millisecond)
    return Obj{Index: i, Str: s}, nil
}, 5, 1*time.Second)

if err != nil {
    fmt.Println("任务超时:", err)
}
fmt.Println("已完成结果:", results)

要不要我帮你再加一个 混合模式:既支持「单任务超时」,又支持「整体批次超时」?这样你可以灵活控制。

好 ✅ 我帮你写一个 混合模式

  • 单任务超时:每个任务都不能超过 taskTimeout
  • 整体批次超时:整个批处理必须在 batchTimeout 内完成。

这样就能同时控制「个别任务卡住」和「批次拖太久」。


🔧 RunJobsHybridTimeout(混合模式)

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Obj 示例结果
type Obj struct {
	Index int
	Str   string
	Err   error
}

// RunJobsHybridTimeout 并发执行任务(混合模式:整体批次超时 + 单任务超时)
// arr: 输入任务数组
// worker: 任务逻辑函数 (ctx, index, value) -> (T, error)
// maxWorkers: 最大并发数
// batchTimeout: 整个批次的超时时间
// taskTimeout: 每个任务的超时时间
func RunJobsHybridTimeout[T any](
	arr []string,
	worker func(ctx context.Context, index int, value string) (T, error),
	maxWorkers int,
	batchTimeout time.Duration,
	taskTimeout time.Duration,
) ([]T, error) {
	if maxWorkers <= 0 {
		maxWorkers = 1
	}

	// 批次级 context
	batchCtx, batchCancel := context.WithTimeout(context.Background(), batchTimeout)
	defer batchCancel()

	ch := make(chan T, len(arr)) // 收集结果
	jobCh := make(chan struct {
		index int
		value string
	}, len(arr))

	// 投递任务
	for i, s := range arr {
		jobCh <- struct {
			index int
			value string
		}{i, s}
	}
	close(jobCh)

	var wg sync.WaitGroup

	// worker pool
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				// 每个任务有单独的超时 + 继承批次 ctx
				taskCtx, taskCancel := context.WithTimeout(batchCtx, taskTimeout)
				result, err := worker(taskCtx, job.index, job.value)
				taskCancel()

				if err == nil {
					ch <- result
				} else {
					var zero T
					ch <- zero
				}
			}
		}()
	}

	// 等待所有 worker 完成后关闭结果通道
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集结果
	results := make([]T, 0, len(arr))
	for {
		select {
		case <-batchCtx.Done():
			return results, batchCtx.Err() // 整体批次超时
		case obj, ok := <-ch:
			if !ok {
				return results, nil // 全部完成
			}
			results = append(results, obj)
		}
	}
}

// 示例 worker:模拟任务可能超时
func exampleWorker(ctx context.Context, index int, str string) (Obj, error) {
	select {
	case <-time.After(500 * time.Millisecond): // 模拟任务耗时
		return Obj{Index: index, Str: str, Err: nil}, nil
	case <-ctx.Done():
		return Obj{Index: index, Str: str, Err: ctx.Err()}, ctx.Err()
	}
}

// 使用示例
func main() {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	// 并发 3,批次超时 2s,每个任务最多 300ms
	results, err := RunJobsHybridTimeout(arr, exampleWorker, 3, 2*time.Second, 300*time.Millisecond)

	fmt.Println("Results:", results)
	fmt.Println("Error:", err)
}

✅ 功能总结

  1. 单任务超时

    • 每个任务继承批次 context,并套一层 taskTimeout
    • 如果单个任务超时,返回 ctx.Err()
  2. 整体批次超时

    • batchCtx 控制整个批处理生命周期。
    • 超过 batchTimeout 后,所有任务立即取消。
  3. 并发数限制

    • maxWorkers 控制并发任务数。
  4. 结果部分返回

    • 如果单任务超时:该任务结果会是 zero T
    • 如果整体批次超时:返回已完成的部分结果 + context.DeadlineExceeded

⚡ 用法示例:

results, err := RunJobsHybridTimeout(arr, func(ctx context.Context, i int, s string) (Obj, error) {
    // 模拟请求接口
    time.Sleep(200 * time.Millisecond)
    return Obj{Index: i, Str: s}, nil
}, 5, 2*time.Second, 300*time.Millisecond)

要不要我帮你再封装一层,直接返回 成功结果 + 错误结果分开(两个 slice),这样就能更清楚哪些任务成功,哪些失败/超时?

明白了 ⚡,我给你一个完善后的最终版本,同时逐条解释为什么这样写,确保没有 panic,也不会出现 未关闭的 channel


你的原始问题

  • 可能的 panic:如果在 close(ch) 后再次向 channel 写入,会 panic。
  • 未关闭 channel:当前代码里 ch <- aa 后,channel 一直没关闭。虽然读取一次没问题,但会造成 资源泄漏
  • 扩展性差:如果以后改为多次读取,会更容易出 bug。

完善后的代码

package main

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

func fn(index int, str string, ch chan Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(ch) // ✅ 保证每个 channel 在写完后关闭
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}
	channels := make([]chan Obj, len(arr))

	var wg sync.WaitGroup
	wg.Add(len(arr))

	for i, s := range arr {
		channels[i] = make(chan Obj, 1) // ✅ 设置缓冲避免 goroutine 阻塞
		go fn(i, s, channels[i], &wg)
	}

	// ✅ 等待所有 goroutine 完成
	wg.Wait()

	list := make([]Obj, 0, len(arr))
	for _, ch := range channels {
		// ✅ 从已关闭的 channel 安全读取
		for obj := range ch {
			list = append(list, obj)
		}
	}

	fmt.Println(list)
}

改进点解释

  1. defer close(ch)

    • 确保每个 goroutine 在写入后关闭自己持有的 channel。
    • 避免资源泄漏。
    • 防止 panic:只允许写一次,close 保证不会再写。
  2. channel 设置为缓冲 1

    • 避免 goroutine 在 ch <- obj 时阻塞等待主协程读取。
    • 主协程可以稍后统一收集。
  3. sync.WaitGroup

    • 确保所有 goroutine 写完再读取。
    • 避免还没写入就被主协程读取,造成错误或数据缺失。
  4. for obj := range ch

    • 读取时使用 range,因为 channel 已经关闭,读取会自动退出。
    • 避免死锁。

输出示例

[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]

👉 这样写之后:

  • 不会 panic
  • 所有 channel 都会关闭,不会泄漏。
  • 逻辑清晰,即使以后要改成多次写入/多次读取,也能扩展。

要不要我帮你改成 用一个 channel 统一收集结果(比开 10 个 channel 更优雅)?

好嘞 ✅
我给你改成 一个 channel 统一收集结果 的版本,这样更简洁、性能更好,而且不用维护一堆 channel。


完整优化代码

package main

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	// ✅ 用一个 channel 统一收集
	ch := make(chan Obj, len(arr))

	var wg sync.WaitGroup
	wg.Add(len(arr))

	for i, s := range arr {
		go fn(i, s, ch, &wg)
	}

	// ✅ 等待所有 goroutine 完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// ✅ 从一个 channel 统一收集结果
	list := make([]Obj, 0, len(arr))
	for obj := range ch {
		list = append(list, obj)
	}

	fmt.Println(list)
}

改进点

  1. 一个 channel

    • 不再维护 channels := make([]chan Obj, 10)
    • 只要一个 channel,goroutine 全部往里面写,主协程统一收集即可。
  2. 缓冲区大小 = len(arr)

    • 保证不会出现阻塞。
    • goroutine 可以无等待写入。
  3. 关闭 channel 的时机

    • 使用 go func() { wg.Wait(); close(ch) }(),等所有 goroutine 完成后再关闭 channel。
    • 保证主协程安全地 range,不会 panic、不会死锁。

输出示例

[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]

👉 这样相比之前版本:

  • 代码更简洁(不用 10 个 channel)。
  • 不会 panic(关闭时机完全可控)。
  • 无资源泄漏(channel 用完即关闭)。
  • 可扩展,以后改成几百个 goroutine 也一样安全。

要不要我再给你写个 顺序收集结果 的版本(比如按照 index 排序,不受 goroutine 完成顺序影响)?

明白 ✅
目前的版本用一个 channel 收集结果,结果顺序取决于 goroutine 完成的快慢,可能乱序。
如果你想要 最终结果严格按照 index 顺序排列,可以在收集后排序,或者干脆用固定位置存储。

我给你两个方式:


方式一:收集后排序

(适合数据量大,goroutine 并发无序执行,最后统一排序)

package main

import (
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	ch <- Obj{index: index, str: str}
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	ch := make(chan Obj, len(arr))
	var wg sync.WaitGroup
	wg.Add(len(arr))

	for i, s := range arr {
		go fn(i, s, ch, &wg)
	}

	// 关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集
	list := make([]Obj, 0, len(arr))
	for obj := range ch {
		list = append(list, obj)
	}

	// ✅ 排序,保证 index 顺序
	sort.Slice(list, func(i, j int) bool {
		return list[i].index < list[j].index
	})

	fmt.Println(list)
}

输出示例(顺序固定):

[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]

方式二:直接按 index 存储

(更高效,不用排序)

package main

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

func fn(index int, str string, res []Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second)
	res[index] = Obj{index: index, str: str} // ✅ 直接放到对应下标
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	res := make([]Obj, len(arr)) // ✅ 结果数组,顺序固定
	var wg sync.WaitGroup
	wg.Add(len(arr))

	for i, s := range arr {
		go fn(i, s, res, &wg)
	}

	wg.Wait()
	fmt.Println(res)
}

输出同样是顺序的:

[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]

总结

  • 方式一:通用方案,用 channel 收集,最后排序,适合复杂场景。
  • 方式二:最高效,直接用数组下标写入,保证结果天然有序。

👉 你要不要我帮你再写一个 混合版:用 channel 收集(方便扩展,比如多数据源),但最终输出时自动排序,兼顾灵活性和有序性?
好 ✅
那我给你写一个 混合版

  • 仍然用 一个 channel 来收集结果(方便扩展、支持多来源/多批次任务)。
  • 最终输出时 自动排序,保证顺序和稳定性。

混合版代码

package main

import (
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"
)

type Obj struct {
	index int
	str   string
}

// goroutine 执行单元
func fn(index int, str string, ch chan<- Obj, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Second) // 模拟耗时任务
	ch <- Obj{index: index, str: str}
}

func TestQiWei(t *testing.T) {
	arr := []string{"JDO", "JDL", "SFO", "EMS", "YTO", "STO", "ZTO", "YD", "DPO", "CAINIAO"}

	ch := make(chan Obj, len(arr)) // 一个 channel 收集全部结果
	var wg sync.WaitGroup
	wg.Add(len(arr))

	for i, s := range arr {
		go fn(i, s, ch, &wg)
	}

	// 等待 goroutine 完成后关闭 channel
	go func() {
		wg.Wait()
		close(ch)
	}()

	// 收集所有结果
	list := make([]Obj, 0, len(arr))
	for obj := range ch {
		list = append(list, obj)
	}

	// ✅ 自动排序,确保按 index 输出
	sort.Slice(list, func(i, j int) bool {
		return list[i].index < list[j].index
	})

	fmt.Println(list)
}

特点

  1. 扩展性

    • 仍然使用一个 channel,支持任意数量的 goroutine 并发写入。
    • 如果以后要接多个来源(比如多数据源、批量请求),只需把结果都丢进 channel。
  2. 安全性

    • WaitGroup + close(ch) 保证不会 panic / 死锁。
    • channel 用完即关,无资源泄漏。
  3. 稳定性

    • 最终排序,保证输出顺序严格按照 index
    • 不受 goroutine 完成快慢影响。

输出示例

[{0 JDO} {1 JDL} {2 SFO} {3 EMS} {4 YTO} {5 STO} {6 ZTO} {7 YD} {8 DPO} {9 CAINIAO}]

👉 这个版本既有 方式一(channel 收集)的灵活性,又有 方式二(顺序固定)的稳定性
要不要我再帮你做个 benchmark 对比(排序版 vs 直接按 index 存储),看哪种在你的场景更高效?

0

评论区