跳到主要内容

消息队列技术

消息队列是分布式系统中重要的中间件,用于实现系统解耦、异步处理和削峰填谷。

🎯 学习目标

掌握主流消息队列技术的原理、特性和应用场景,学会构建可靠的异步消息系统。

消息队列的价值

系统解耦

  • 服务独立 - 生产者和消费者独立部署
  • 技术栈无关 - 不同语言和框架间通信
  • 故障隔离 - 单个服务故障不影响整体

异步处理

  • 响应速度 - 提升用户体验
  • 并发处理 - 提高系统吞吐量
  • 批量处理 - 优化资源利用

🚀 主流消息队列

RabbitMQ

import pika

# 建立连接
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='order_queue', durable=True)

# 发送消息
channel.basic_publish(
exchange='',
routing_key='order_queue',
body='Order processed',
properties=pika.BasicProperties(delivery_mode=2) # 持久化
)

print("消息已发送")
connection.close()

Apache Kafka

// Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record =
new ProducerRecord<>("user-events", "user-123", "login");

producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("消息发送成功: " + metadata.offset());
}
});

producer.close();

📊 消息模式

点对点模式

# RabbitMQ工作队列配置
queue:
name: task_queue
durable: true
exclusive: false
auto_delete: false

consumer:
prefetch_count: 1 # 公平分发
ack_mode: manual # 手动确认

发布订阅模式

// Redis发布订阅
const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();

// 发布消息
publisher.publish('news', JSON.stringify({
title: '系统维护通知',
content: '今晚22:00-24:00系统维护',
timestamp: Date.now()
}));

// 订阅消息
subscriber.subscribe('news');
subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);
console.log(`收到${channel}频道消息:`, data);
});

🏗️ 高可用架构

Kafka集群部署

# Kafka集群配置
# server-1.properties
broker.id=1
listeners=PLAINTEXT://kafka1:9092
log.dirs=/var/kafka-logs
num.partitions=3
default.replication.factor=3

# server-2.properties
broker.id=2
listeners=PLAINTEXT://kafka2:9092
log.dirs=/var/kafka-logs

# server-3.properties
broker.id=3
listeners=PLAINTEXT://kafka3:9092
log.dirs=/var/kafka-logs

RabbitMQ集群

# 启动RabbitMQ集群
# 节点1
rabbitmq-server -detached

# 节点2
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 节点3
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 查看集群状态
rabbitmqctl cluster_status

🔍 监控和运维

Kafka监控

# 查看主题信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic user-events

# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user-service

# 性能测试
kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=localhost:9092

下一步: 深入学习消息队列性能优化、事务消息和流处理技术。