在 go 中,一个 channel 无法被多个 goroutine 同时“接收”同一消息;默认行为是竞争式消费。要实现“一个事件通知所有监听者”,需通过 fan-out 模式手动广播——即从源 channel 读取一次,再分别写入多个目标 channel。
Go 的 channel 是点对点通信原语,不具备内置广播能力。当你将同一个 incoming channel 同时传给 processEmail 和 processPagerDuty,两个 goroutine 实际上在竞争接收——每次仅有一个能成功读到事件,这正是你观察到“只有第一个 goroutine 收到事件”的根本原因。
要实现真正的“一对多”事件分发(即每个监听者都收到同一份事件副本),必须引入显式广播逻辑。推荐采用经典的 fan-out 模式:由一个中央分发 goroutine 从源 channel 读取事件,然后并发地、独立地将该事件发送至多个专用 consumer channel。以下是改造后的完整、可运行示例:
package main
import (
"fmt"
"time"
)
type Event struct {
Host string
Command string
Output string
}
// 全局事件源(只供写入)
var incoming = make(chan Event, 10)
// 各服务专属接收 channel(缓冲避免阻塞分发器)
var (
emailChan = make(chan Event, 10)
pagerDutyChan = make(chan Event, 10)
)
// 【关键】广播分发器:读取一次,发给所有订阅者
func broadcast() {
for e := range incoming {
// 并发发送,确保各 consumer 独立接收(不相互阻塞)
go func(event Event) {
select {
case emailChan <- event:
default:
fmt.Println("⚠️ emailChan full, dropped event")
}
}(e)
go func(event Event) {
select {
case pagerDutyChan <- event:
default:
fmt.Println("⚠️ pagerDutyChan full, dropped event")
}
}(e)
}
}
func processEmail(ticker *time.Ticker) {
for {
select {
case t := <-ticker.C:
fmt.Println("? Email Tick at", t)
case e := <-emailChan:
fmt.Println("? EMAIL GOT AN EVENT!")
fmt.Printf("%+v\n", e)
}
}
}
func processPagerDuty(ticker *time.Ticker) {
for {
select {
case t := <-ticker.C:
fmt.
Println("? PagerDuty Tick at", t)
case e := <-pagerDutyChan:
fmt.Println("? PAGERDUTY GOT AN EVENT!")
fmt.Printf("%+v\n", e)
}
}
}
func eventAdd() {
e := Event{
Host: "web01-east.domain.com",
Command: "foo",
Output: "bar",
}
incoming <- e // 写入源 channel,触发广播
}
func main() {
// 启动广播器(必须在任何写入前启动)
go broadcast()
// 启动各处理器
emailTicker := time.NewTicker(10 * time.Second)
go processEmail(emailTicker)
pdTicker := time.NewTicker(1 * time.Second)
go processPagerDuty(pdTicker)
// 模拟 API 调用
time.AfterFunc(2*time.Second, eventAdd)
time.AfterFunc(5*time.Second, eventAdd)
// 保持主 goroutine 运行
select {}
}✅ 关键设计要点说明:
⚠️ 注意事项:
通过此模式,你既能保持 Go channel 的简洁性,又能精准实现事件广播语义——每个监听者都获得完整、独立的事件副本,真正达成“一个事件,多方响应”。