C#队列学习笔记:RabbitMQ安装及使用

一、环境搭建

1.1、由于RabbitMQ是使用Erlang语言开发的,因此要安装Erlang运行时环境,下载地址:Erlang官网下载  CSDN分享下载

1.2、去RabbitMQ官网下载RabbitMQ Server服务端程序,选择合适的平台版本下载并安装。

RabbitMQ安装时,会自动在Windows服务中创建RabbitMQ服务,并自动启动。

1.3、开始->所有程序->RabbitMQ Server->RabbitMQ Command Prompt (sbin dir):

运行RabbitMQ Command Prompt与cmd下cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin的效果是一样的。

1.3.1、sbin目录下的rabbitmqctl.bat,是用来查看和控制服务端状态的。运行rabbitmqctl status检查RabbitMQ状态:

1.3.3、RabbitMQ Server上面也有用户概念,使用rabbitmqctl list_users命令,可以看到目前的用户:

可以看到,现在只有一个名为gues角色为administratort的用户,这个是RabbitMQ默认为我们创建的,它有RabbitMQ的所有权限。一般情况下,我们需要新建一个自己的用户,并设置密码及授予权限,同时设置为管理员。操作方法如下:

rabbitmqctl add_user hello world
rabbitmqctl set_permissions hello ".*" ".*" ".*"
rabbitmqctl set_user_tags hello administrator

上面的第一命令添加了一个名为hello的用户并设置了密码world;第二条命令为用户hello分别授予对所有消息队列的配置、读和写的权限;第三条命令将用户hello设置为管理员。

现在我们可以将默认的guest用户删掉,使用下面的命令即可:

rabbitmqctl delete_user guest

如果要修改密码,可以使用下面的命令:

rabbitmqctl change_password {username} {newpassowrd}

二、管理界面

RabbitMQ还有一个管理界面,是以插件形式提供的,通过该界面可以查看RabbitMQ Server当前的状态。启用命令如下:

rabbitmq-plugins enable rabbitmq_management

现在,在浏览器中输入 http://server-name:15672/ 即可。

注:server-name为计算机名或IP地址,如果是本地的,直接用localhost即可。登录界面,使用我们之前创建的hello用户登录。

三、开始使用

在.NET中使用RabbitMQ需要下载RabbitMQ客户端程序集,下载解压后在bin下找到RabbitMQ.Client.dll,并添加引用到项目中。

3.1、Hello World

为了展示RabbitMQ的基本使用,我们发送一个HelloWorld消息,然后接收并处理。

3.1.1、创建一个名为Send的客户端控制台程序,用来将消息发送到RabbitMQ消息队列中,代码如下:

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.构建byte消息数据包
                    string message = args.Length > 0 ? args[0] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message); //消息是以二进制数组的形式传输的
                    //6.发送数据包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

3.1.2、创建一个名为Receive的服务端控制台程序,用来接收RabbitMQ消息队列中的消息,代码如下:

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"Received {message}");
                    };
                    //7.启动消费者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

3.1.3、先运行消息接收端,再运行消息发送端,结果如下:

从上面的代码中可以看出,发送端和接收端的代码前4步都是一样的。主要的区别在于发送端调用channel.BasicPublish方法发送消息,而接收端需要实例化一个EventingBasicConsumer实例来进行消息处理。另外一点需要注意的是:消息接收端和发送端的队列名称(queue)必须保持一致,这里指定的队列名称为hello。

3.2、工作队列

工作队列(work queues,又称Task Queues)的主要思想是:为了避免立即执行一些实时性要求不高但是比较耗资源或时间的操作(如写日志),把任务当作消息发送到队列中,由一个运行在后台的工作者(worker)进程取出并处理。当有多个工作者(workers)运行时,任务会在它们之间共享。

现在发送一些字符串来模拟耗时的任务,在字符串中加上点号(.)来表示任务的复杂程度。一个点号将会耗时1秒钟,比如"Hello World..."就会耗时3秒钟。

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.构建byte消息数据包
                    string message = args.Length > 0 ? string.Join(" ", args) : "Hello World...";
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//设置消息是否持久化 1:非持久化 2:持久化
                    var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                    //6.发送数据包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                    };
                    //7.启动消费者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

3.3轮询分发

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

现在,我们先启动两个接收端,等待接受消息,然后启动一个发送端开始发送消息(cmd->send.exe所在的目录)。

上面发了10条信息,两个接收端各收到5条信息。

默认情况下,RabbitMQ会将每个消息按照顺序依次分发给下一个消费者,所以每个消费者接收到的消息个数大致是平均的。 这种消息分发的方式称之为轮询(round-robin)。

3.4、消息响应

当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息,如果一个工作者挂掉了,我们希望该消息能够重新发送给其它的工作者。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。如果消费者挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其它消费者。这样,即使工作者偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的。当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息响应默认是开启的。在之前的例子中使用了no_Ack=true标识把它关闭。是时候移除这个标识了,当工作者完成了任务,就会发送一个响应。

下面修改Receive.cs,主要改动的是:将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,但是后果很严重。当客户端退出时,待处理的消息就会被重新分发,但是RabitMQ会消耗越来越多的内存,因为这些没有被应答的消息不能够被释放。调试这种case,可以使用rabbitmqctl打印messages_unacknowledged字段。

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin>rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
hello   1       0

3.5、消息持久化

消息确认确保了即使消费端异常,消息也不会丢失能够被重新分发处理。但是如果RabbitMQ服务端异常,消息依然会丢失。除非我们指定durable:true,否则当RabbitMQ退出或崩溃时,消息将依然会丢失。通过指定durable:true(队列),并指定Persistent=true(消息),来告知RabbitMQ将消息持久化。一句话概括:需要保证队列和消息都是持久化的。

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    string message = args.Length > 0 ? args[0] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                    //6.发送数据包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

将消息标记为持久性不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存时​​,仍然有一个很短的时间窗口。RabbitMQ可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化不是一定能够保证的,但是对于一个简单任务队列来说已经足够。

如果需要确保消息队列的持久化,可以使用publisher confirms

3.6、公平分发

你可能会注意到,消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重但是偶数个消息任务比较轻时,奇数个工作者始终处于忙碌状态,而偶数个工作者始终处于空闲状态,但是RabbitMQ并不知道这些,它仍然会平均依次地分发消息。

为了改变这一状态,我们可以使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息。换句话说,在一个工作者还在处理消息并且没有响应消息之前,不要给它分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的工作者。

//Receive.cs
//4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
//channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare("hello", true, false, false, null);
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.BasicQos(0, 1, false);

3.7完整实例

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    string message = args.Length > 0 ? args[0] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                    //6.发送数据包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

四、Exchange

上面的示例,生产者和消费者直接是通过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者创建消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。

那消费者如何才能发送消息到多个消息队列呢?

RabbitMQ提供了Exchange,它类似于路由器的功能,对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但是Exchange是如何知道将消息附加到哪个队列或者直接忽略的呢?这些其实是由Exchange Type来定义的。关于Exchange的图文介绍,请看上一篇《C#队列学习笔记:RabbitMQ基础知识》,此处仅提供示例代码。

4.1、fanout

class Program
    {
        static void Main(string[] args)
        {
            #region fanout exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用fanout exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                        //6.发送数据包(指定exchange;fanout类型无需指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region fanout exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用fanout exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
                    //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName;
                    //绑定队列到指定fanout类型exchange,fanout类型无需指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: "");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

4.2、direct

class Program
    {
        static void Main(string[] args)
        {
            #region direct exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用direct exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                        //6.发送数据包(指定exchange;direct类型必须指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region direct exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用direct exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct");
                    //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName;
                    //绑定队列到指定direct类型exchange,direct类型必须指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

4.3、topic

class Program
    {
        static void Main(string[] args)
        {
            #region topic exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用topic exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                        //6.发送数据包(指定exchange;topic类型必须指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region topic exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用topic exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
                    //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName;
                    //绑定队列到指定topic类型exchange,topic类型必须指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }

Receive.cs

五、RPC

RPC--Remote Procedure Call,远程过程调用。RabbitMQ是如何进行远程调用的呢?示意图如下:

第一步:主要是进行远程调用的客户端需要指定接收远程回调的队列,并声明消费者监听此队列。

第二步:远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听结果的队列中去。

class Program
    {
        static void Main(string[] args)
        {
            #region rpc
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明唯一guid用来标识此次发送的远程调用请求
                    var correlationId = Guid.NewGuid().ToString();
                    //5.声明需要监听的回调队列
                    var replyQueue = channel.QueueDeclare().QueueName;
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;//将消息标记为持久性
                    properties.ReplyTo = replyQueue;//指定回调队列
                    properties.CorrelationId = correlationId;//指定消息唯一标识
                    //6.构建byte消息数据包
                    string number = args.Length > 0 ? args[0] : "30";
                    var body = Encoding.UTF8.GetBytes(number);
                    //7.发送数据包
                    channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
                    Console.WriteLine($"Request fib({number})");
                    //8.创建消费者用于处理消息回调(远程调用返回结果)
                    var callbackConsumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: replyQueue, noAck: false, consumer: callbackConsumer);
                    callbackConsumer.Received += (model, ea) =>
                    {
                        //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
                        if (ea.BasicProperties.CorrelationId == correlationId)
                        {
                            var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
                            Console.WriteLine($"{responseMsg}");
                        }
                    };
                    Console.Read();
                }
            }
            #endregion
        }
    }

Send.cs

class Program
    {
        static void Main(string[] args)
        {
            #region rpc
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列接收远程调用请求
                    channel.QueueDeclare(queue: "rpc_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    Console.WriteLine("Waiting for message.");
                    //5.请求处理逻辑
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int n = int.Parse(message);
                        Console.WriteLine($"Receive request of Fib({n})");
                        int result = Fib(n);
                        //6.从请求的参数中获取请求的唯一标识,在消息回传时同样绑定。
                        var properties = ea.BasicProperties;
                        var replyProerties = channel.CreateBasicProperties();
                        replyProerties.CorrelationId = properties.CorrelationId;
                        //7.将远程调用结果发送到客户端监听的队列上
                        channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
                            basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
                        //8.手动发回消息确认
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine($"Return result: Fib({n})= {result}");
                    };
                    channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }

            int Fib (int n)
            {
                if (n <= 2)
                    return 1;
                else
                    return Fib(n - 1) + Fib(n - 2);
            }
            #endregion
        }
    }

Receive.cs

六、总结

本文介绍了RabbitMQ消息代理在Windows上的安装以及在.NET中的使用。消息队列在构建分布式系统、提高系统的可扩展性及响应性方面,有着很重要的作用。

参考自:

https://www.cnblogs.com/yangecnu/p/Introduce-RabbitMQ.html#!comments

https://www.cnblogs.com/sheng-jie/p/7192690.html

(0)

相关推荐