C#队列学习笔记:RabbitMQ优先级队列

一、引言

在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。

RabbitMQ优先级队列注意事项:

1)RabbitMQ3.5以后才支持优先级队列。

2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。

3)优先级取值范围在0~9之间,数值越大则优先级越高。

二、示例

2.1、发送端(生产端)

新建一个控制台项目Send,并添加一个类RabbitMQConfig。

class RabbitMQConfig
    {
        public static string Host { get; set; }

        public static string VirtualHost { get; set; }

        public static string UserName { get; set; }

        public static string Password { get; set; }

        public static int Port { get; set; }

        static RabbitMQConfig()
        {
            Host = "192.168.2.242";
            VirtualHost = "/";
            UserName = "hello";
            Password = "world";
            Port = 5672;
        }
    }

RabbitMQConfig.cs

class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("按任意键开始生产。");
            Console.ReadLine();
            PriorityMessagePublish();
            Console.ReadLine();
        }

        private static void PriorityMessagePublish()
        {
            const string MessagePrefix = "message_";
            const int PublishMessageCount = 6;
            byte messagePriority = 0;

            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //设置队列优先级,取值范围在0~255之间。
                    Dictionary<string, object> dict = new Dictionary<string, object>
                    {
                        { "x-max-priority", 255 }
                    };

                    //声明队列
                    channel.QueueDeclare(queue: "priority", durable: true, exclusive: false, autoDelete: false, arguments: dict);

                    //向该消息队列发送消息message
                    Random random = new Random();
                    for (int i = 0; i < PublishMessageCount; i++)
                    {
                        var properties = channel.CreateBasicProperties();
                        messagePriority = (byte)random.Next(0, 9);
                        properties.Priority = messagePriority;//设置消息优先级,取值范围在0~9之间。
                        var message = MessagePrefix + i.ToString();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "", routingKey: "priority", basicProperties: properties, body: body);
                        Console.WriteLine($"{DateTime.Now.ToString()} Send {message} , Priority {messagePriority}");
                    }
                }
            }
        }
    }

Program.cs

2.2、接收端(消费端)

新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。

class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("按任意键开始消费。");
            Console.ReadLine();
            PriorityMessageSubscribe();
        }

        public static void PriorityMessageSubscribe()
        {
            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += async (model, ea) =>
                    {
                        await Task.Run(() =>
                        {
                            var message = Encoding.UTF8.GetString(ea.Body);
                            Thread.Sleep(1000 * 2);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认
                            Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                        });
                    };
                    channel.BasicConsume(queue: "priority", noAck: false, consumer: consumer);//需要启用消息响应,否则priority无效。
                    Console.ReadKey();
                }
            }
        }
    }

Program.cs

2.3、运行结果

从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。

(0)

相关推荐