消息队列
消息队列是微服务架构中实现异步通信和解耦的重要组件,它允许服务之间通过消息进行通信,提高系统的可扩展性和可靠性。
📋 学习目标
完成本教程后,你将能够:
- 理解消息队列的概念和优势
- 使用 RabbitMQ 实现消息队列
- 使用 Kafka 实现消息队列
- 实现消息的生产和消费
- 处理消息的持久化和可靠性
- 实现消息的路由和过滤
🎯 消息队列简介
什么是消息队列
消息队列是一种异步通信机制,允许服务之间通过消息进行通信,而无需直接调用。
为什么需要消息队列
优势:
- 解耦:服务之间松耦合
- 异步:提高系统响应速度
- 削峰:处理流量峰值
- 可靠性:消息持久化和重试
- 扩展性:易于水平扩展
消息队列架构
生产者 → 消息队列 → 消费者
↓ ↓ ↓
发送消息 存储消息 处理消息1
2
3
2
3
🔧 RabbitMQ 集成
什么是 RabbitMQ
RabbitMQ 是一个开源的消息代理软件,实现了 AMQP 协议。
安装 RabbitMQ
使用 Docker 运行 RabbitMQ:
bash
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:3-management1
2
3
4
5
6
2
3
4
5
6
访问管理界面:http://localhost:15672
Go 客户端集成
1. 安装依赖
bash
go get github.com/rabbitmq/amqp091-go1
2. 生产者示例
go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("连接 RabbitMQ 失败: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("创建通道失败: %v", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 持久化
false, // 自动删除
false, // 排他
false, // 无等待
nil, // 参数
)
if err != nil {
log.Fatalf("声明队列失败: %v", err)
}
// 发送消息
body := "Hello World!"
err = ch.Publish(
"", // 交换机
q.Name, // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("发送消息失败: %v", err)
}
log.Printf("发送消息: %s", body)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
3. 消费者示例
go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("连接 RabbitMQ 失败: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("创建通道失败: %v", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 持久化
false, // 自动删除
false, // 排他
false, // 无等待
nil, // 参数
)
if err != nil {
log.Fatalf("声明队列失败: %v", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者标签
true, // 自动确认
false, // 排他
false, // 无本地
false, // 无等待
nil, // 参数
)
if err != nil {
log.Fatalf("注册消费者失败: %v", err)
}
log.Println("等待消息...")
for d := range msgs {
log.Printf("收到消息: %s", d.Body)
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
🔧 Kafka 集成
什么是 Kafka
Kafka 是 Apache 开源的分布式流处理平台,用于构建实时数据管道和流应用。
安装 Kafka
使用 Docker Compose 运行 Kafka:
yaml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:90921
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Go 客户端集成
1. 安装依赖
bash
go get github.com/IBM/sarama1
2. 生产者示例
go
package main
import (
"log"
"github.com/IBM/sarama"
)
func main() {
// 配置生产者
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// 创建生产者
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("创建生产者失败: %v", err)
}
defer producer.Close()
// 发送消息
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalf("发送消息失败: %v", err)
}
log.Printf("消息已发送: partition=%d, offset=%d", partition, offset)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
3. 消费者示例
go
package main
import (
"log"
"github.com/IBM/sarama"
)
func main() {
// 配置消费者
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// 创建消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("创建消费者失败: %v", err)
}
defer consumer.Close()
// 创建分区消费者
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("创建分区消费者失败: %v", err)
}
defer partitionConsumer.Close()
log.Println("等待消息...")
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("收到消息: topic=%s, partition=%d, offset=%d, value=%s",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
case err := <-partitionConsumer.Errors():
log.Printf("消费错误: %v", err)
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
💡 最佳实践
1. 消息持久化
go
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化消息
ContentType: "text/plain",
Body: []byte(body),
}1
2
3
4
5
2
3
4
5
2. 消息确认
go
// 手动确认
ch.Consume(
q.Name,
"",
false, // 手动确认
false,
false,
false,
nil,
)
// 处理消息后确认
d.Ack(false)1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
3. 消息重试
go
// 失败后重新入队
d.Nack(false, true)1
2
2
🚀 总结
消息队列是微服务架构中实现异步通信的重要组件,通过 RabbitMQ 或 Kafka,我们可以:
- 解耦服务:服务之间松耦合
- 提高性能:异步处理提高响应速度
- 增强可靠性:消息持久化和重试
- 扩展系统:易于水平扩展
📚 扩展阅读
💻 代码示例
完整的代码示例请参考:
示例包括:
- RabbitMQ 生产者和消费者
- Kafka 生产者和消费者
- 消息确认和重试机制
⏭️ 下一步
- 服务网格 - 学习服务网格
🎉 恭喜! 你已经掌握了消息队列的基础知识。继续学习下一个主题,构建更强大的微服务系统!
