消息队列技术
消息队列是分布式系统中重要的中间件,用于实现系统解耦、异步处理和削峰填谷。
🎯 学习目标
掌握主流消息队列技术的原理、特性和应用场景,学会构建可靠的异步消息系统。
消息队列的价值
系统解耦
- 服务独立 - 生产者和消费者独立部署
- 技术栈无关 - 不同语言和框架间通信
- 故障隔离 - 单个服务故障不影响整体
异步处理
- 响应速度 - 提升用户体验
- 并发处理 - 提高系统吞吐量
- 批量处理 - 优化资源利用
🚀 主流消息队列
RabbitMQ
import pika
# 建立连接
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='order_queue', durable=True)
# 发送消息
channel.basic_publish(
exchange='',
routing_key='order_queue',
body='Order processed',
properties=pika.BasicProperties(delivery_mode=2) # 持久化
)
print("消息已发送")
connection.close()
Apache Kafka
// Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("user-events", "user-123", "login");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("消息发送成功: " + metadata.offset());
}
});
producer.close();
📊 消息模式
点对点模式
# RabbitMQ工作队列配置
queue:
name: task_queue
durable: true
exclusive: false
auto_delete: false
consumer:
prefetch_count: 1 # 公平分发
ack_mode: manual # 手动确认
发布订阅模式
// Redis发布订阅
const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();
// 发布消息
publisher.publish('news', JSON.stringify({
title: '系统维护通知',
content: '今晚22:00-24:00系统维护',
timestamp: Date.now()
}));
// 订阅消息
subscriber.subscribe('news');
subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);
console.log(`收到${channel}频道消息:`, data);
});
🏗️ 高可用架构
Kafka集群部署
# Kafka集群配置
# server-1.properties
broker.id=1
listeners=PLAINTEXT://kafka1:9092
log.dirs=/var/kafka-logs
num.partitions=3
default.replication.factor=3
# server-2.properties
broker.id=2
listeners=PLAINTEXT://kafka2:9092
log.dirs=/var/kafka-logs
# server-3.properties
broker.id=3
listeners=PLAINTEXT://kafka3:9092
log.dirs=/var/kafka-logs
RabbitMQ集群
# 启动RabbitMQ集群
# 节点1
rabbitmq-server -detached
# 节点2
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 节点3
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 查看集群状态
rabbitmqctl cluster_status
🔍 监控和运维
Kafka监控
# 查看主题信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic user-events
# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user-service
# 性能测试
kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=localhost:9092
下一步: 深入学习消息队列性能优化、事务消息和流处理技术。