浮头导航网

专注编程技术分享的开发者社区

ASP.NET Core 中使用 RabbitMQ 的基础指南

什么是 RabbitMQ?

RabbitMQ 是一个开源的消息代理。它支持 AMQP 协议,用于在分布式系统中发送和接收消息。它通过队列管理消息,支持多种通信方式(如点对点、发布/订阅)。它常用于微服务之间传递数据。

为什么在 ASP.NET Core 中使用 RabbitMQ?

在 ASP.NET Core 中使用 RabbitMQ 有以下好处

  • o 异步通信:服务之间可以解耦,减少依赖。
  • o 高可用性:即使消费者暂时下线,消息也不会丢失。
  • o 可扩展性强:适合处理大量并发任务。
  • o 平台无关:可在多个系统中运行,支持多种开发语言。
  • o 容错机制:消息可以保存到磁盘,防止丢失。

常见用途包括

  • o 微服务间的数据同步
  • o 后台任务(如发邮件、写日志)
  • o 分布式系统的事件通知

安装 RabbitMQ

本地安装步骤

  1. 1. 前往 RabbitMQ 官网 下载安装包。
  2. 2. 确保已安装 Erlang,因为 RabbitMQ 需要它运行。
  3. 3. 默认的管理界面地址是 http://localhost:15672,默认用户名和密码为 guest/guest

使用 Docker 安装(推荐)

docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
  • o 5672 是 RabbitMQ 的主端口。
  • o 15672 是管理界面的端口。
  • o 使用 rabbitmq:3-management 镜像可以开启 Web 控制台。

验证是否安装成功

打开浏览器访问 http://localhost:15672,输入默认账号登录。查看是否有队列或交换机信息。

4. 创建 ASP.NET Core 项目

创建一个新项目

dotnet new webapi -n RabbitMQExample
cd RabbitMQExample

添加 RabbitMQ 客户端库

dotnet add package RabbitMQ.Client

如果想用更高级的功能,也可以安装 MassTransit,可以参考昨日发布的文章

5. 核心概念

  • o 生产者:发送消息的应用程序。
  • o 消费者:接收消息的应用程序。
  • o 队列:存储消息的地方,按顺序处理。
  • o 交换机:决定消息如何分发给队列。常用类型:
    • o Direct:精确匹配路由键。
    • o Topic:模糊匹配路由键。
    • o Fanout:广播给所有绑定队列。
    • o Headers:根据消息头来决定路由。

6. 实现 RabbitMQ

6.1 配置连接信息

appsettings.json 文件中添加如下内容:

{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest",
    "QueueName": "my-queue"
  }
}

6.2 创建连接服务

新建一个类 RabbitMQService.cs

public interface IRabbitMQService
{
    IConnection CreateConnection();
}

public class RabbitMQService : IRabbitMQService
{
    private readonly IConfiguration _config;

    public RabbitMQService(IConfiguration config)
    {
        _config = config;
    }

    public IConnection CreateConnection()
    {
        var factory = new ConnectionFactory
        {
            HostName = _config["RabbitMQ:HostName"],
            Port = int.Parse(_config["RabbitMQ:Port"]),
            UserName = _config["RabbitMQ:UserName"],
            Password = _config["RabbitMQ:Password"]
        };
        return factory.CreateConnection();
    }
}

注册服务:

builder.Services.AddSingleton<IRabbitMQService, RabbitMQService>();

6.3 发送消息(生产者)

创建一个类 ProducerService.cs

public class ProducerService
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly string _queueName;

    public ProducerService(IRabbitMQService service, IConfiguration config)
    {
        _rabbitMQService = service;
        _queueName = config["RabbitMQ:QueueName"];
    }

    public void PublishMessage(string message)
    {
        using var connection = _rabbitMQService.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "", routingKey: _queueName, basicProperties: null, body: body);
        Console.WriteLine(#34;发送消息:{message}");
    }
}

注册服务:

builder.Services.AddSingleton<ProducerService>();

在控制器中调用:

[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
    private readonly ProducerService _producer;

    public MessageController(ProducerService producer)
    {
        _producer = producer;
    }

    [HttpPost]
    public IActionResult Post([FromBody] string message)
    {
        _producer.PublishMessage(message);
        return Ok("消息已发送");
    }
}

6.4 接收消息(消费者)

创建一个后台服务 ConsumerService.cs

public class ConsumerService : BackgroundService
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly string _queueName;

    public ConsumerService(IRabbitMQService service, IConfiguration config)
    {
        _rabbitMQService = service;
        _queueName = config["RabbitMQ:QueueName"];
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var connection = _rabbitMQService.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(#34;收到消息:{message}");
            channel.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

        await Task.CompletedTask;
    }
}

注册服务:

builder.Services.AddHostedService<ConsumerService>();


·············· END ··············


如果你觉得这篇文章对你有帮助,欢迎点赞、收藏并分享给更多开发者!让我们一起学习,共同进步!

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言