什么是 RabbitMQ?
RabbitMQ 是一个开源的消息代理。它支持 AMQP 协议,用于在分布式系统中发送和接收消息。它通过队列管理消息,支持多种通信方式(如点对点、发布/订阅)。它常用于微服务之间传递数据。
为什么在 ASP.NET Core 中使用 RabbitMQ?
在 ASP.NET Core 中使用 RabbitMQ 有以下好处
- o 异步通信:服务之间可以解耦,减少依赖。
- o 高可用性:即使消费者暂时下线,消息也不会丢失。
- o 可扩展性强:适合处理大量并发任务。
- o 平台无关:可在多个系统中运行,支持多种开发语言。
- o 容错机制:消息可以保存到磁盘,防止丢失。
常见用途包括
- o 微服务间的数据同步
- o 后台任务(如发邮件、写日志)
- o 分布式系统的事件通知
安装 RabbitMQ
本地安装步骤
- 1. 前往 RabbitMQ 官网 下载安装包。
- 2. 确保已安装 Erlang,因为 RabbitMQ 需要它运行。
- 3. 默认的管理界面地址是 http://localhost:15672,默认用户名和密码为 guest/guest。
使用 Docker 安装(推荐)
docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- o 5672 是 RabbitMQ 的主端口。
- o 15672 是管理界面的端口。
- o 使用 rabbitmq:3-management 镜像可以开启 Web 控制台。
验证是否安装成功
打开浏览器访问 http://localhost:15672,输入默认账号登录。查看是否有队列或交换机信息。
4. 创建 ASP.NET Core 项目
创建一个新项目
dotnet new webapi -n RabbitMQExample
cd RabbitMQExample
添加 RabbitMQ 客户端库
dotnet add package RabbitMQ.Client
如果想用更高级的功能,也可以安装 MassTransit,可以参考昨日发布的文章
5. 核心概念
- o 生产者:发送消息的应用程序。
- o 消费者:接收消息的应用程序。
- o 队列:存储消息的地方,按顺序处理。
- o 交换机:决定消息如何分发给队列。常用类型:
- o Direct:精确匹配路由键。
- o Topic:模糊匹配路由键。
- o Fanout:广播给所有绑定队列。
- o Headers:根据消息头来决定路由。
6. 实现 RabbitMQ
6.1 配置连接信息
在 appsettings.json 文件中添加如下内容:
{
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "guest",
"Password": "guest",
"QueueName": "my-queue"
}
}
6.2 创建连接服务
新建一个类 RabbitMQService.cs:
public interface IRabbitMQService
{
IConnection CreateConnection();
}
public class RabbitMQService : IRabbitMQService
{
private readonly IConfiguration _config;
public RabbitMQService(IConfiguration config)
{
_config = config;
}
public IConnection CreateConnection()
{
var factory = new ConnectionFactory
{
HostName = _config["RabbitMQ:HostName"],
Port = int.Parse(_config["RabbitMQ:Port"]),
UserName = _config["RabbitMQ:UserName"],
Password = _config["RabbitMQ:Password"]
};
return factory.CreateConnection();
}
}
注册服务:
builder.Services.AddSingleton<IRabbitMQService, RabbitMQService>();
6.3 发送消息(生产者)
创建一个类 ProducerService.cs:
public class ProducerService
{
private readonly IRabbitMQService _rabbitMQService;
private readonly string _queueName;
public ProducerService(IRabbitMQService service, IConfiguration config)
{
_rabbitMQService = service;
_queueName = config["RabbitMQ:QueueName"];
}
public void PublishMessage(string message)
{
using var connection = _rabbitMQService.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: _queueName, basicProperties: null, body: body);
Console.WriteLine(#34;发送消息:{message}");
}
}
注册服务:
builder.Services.AddSingleton<ProducerService>();
在控制器中调用:
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
private readonly ProducerService _producer;
public MessageController(ProducerService producer)
{
_producer = producer;
}
[HttpPost]
public IActionResult Post([FromBody] string message)
{
_producer.PublishMessage(message);
return Ok("消息已发送");
}
}
6.4 接收消息(消费者)
创建一个后台服务 ConsumerService.cs:
public class ConsumerService : BackgroundService
{
private readonly IRabbitMQService _rabbitMQService;
private readonly string _queueName;
public ConsumerService(IRabbitMQService service, IConfiguration config)
{
_rabbitMQService = service;
_queueName = config["RabbitMQ:QueueName"];
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var connection = _rabbitMQService.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(#34;收到消息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
await Task.CompletedTask;
}
}
注册服务:
builder.Services.AddHostedService<ConsumerService>();
·············· END ··············
如果你觉得这篇文章对你有帮助,欢迎点赞、收藏并分享给更多开发者!让我们一起学习,共同进步!