RabbitMQ入门指南PHP版(二)

RabbitMQ 入门指南 PHP 版系列文章: RabbitMQ 入门指南 PHP 版(一)

发布/订阅 Publish/Subscribe

分发一个消息给多个消费者(consumers)的模式称为「发布 / 订阅」。若要构建一个简单的日志系统用于发送日志消息和获取消息并输出内容,可利用发布 / 订阅模式实现。

交换机 Exchanges

RabbitMQ 中完整的消息模型:

  • 生产者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

RabbitMQ 消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列,其只需要把消息发送给一个交换机(exchange)代为处理。消息处理规则通过交换机类型(exchange type)来定义。

交换机类型:

  • 直连交换机(direct)
  • 主题交换机(topic)
  • 头交换机(headers)
  • 扇型交换机(fanout)

交换机

先创建一个名为 logs 的扇型交换机,它将把消息发送给它所知道的所有队列:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

其中 basic_publish() 的第一个变量为发送消息的目标队列。
其中 basic_publish() 的第二个变量为交换机设置 routing_key。

交换机列表

使用 rabbitmqctl 命令可列出服务器上所有的交换机:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

其中名为 amq.* 的交换机是 RabbitMQ 默认创建的。

匿名的交换机

在介绍发送消息 Sending 时举例的 send.php 中有如下代码:

$channel->basic_publish($msg, '', 'hello');

在该例中我们使用空字符串 '' 设置使用默认交换机。

临时队列 Temporary queues

调用 queue_declare 方法时,函数参数 queue 的值为空可使 RabbitMQ 创建一个随机命名的队列:

list($queue_name, ,) = $channel->queue_declare("");

若要使其与消费者(consumer)断开连接时被立即删除,可声明该队列为 exclusive。

绑定 Bindings

绑定

交换机和队列之间的联系我们称之为绑定(binding),绑定时可添加 routing_key 参数。为避免与 basic_publish 的 routing_key 参数混淆,将此处的 routing_key 称为绑定键(binding key)。绑定键将会影响交换机对消息进行的路由操作。扇型交换机会忽略该值。

不带绑定键的绑定:

$channel->queue_bind($queue_name, 'logs');

带绑定键为 black 的绑定:

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

绑定列表

使用 rabbitmqctl 命令可列出所有现存的绑定。

$ sudo rabbitmqctl list_bindings

运行代码

emit_log.php
receive_logs.php

由于发布消息要求确定存在的交换机,故在 RabbitMQ 连接成功后声明一个交换机。日志消息发布程序把消息发送给 logs 交换机而不是匿名交换机。若未绑定队列到交换机,消息将丢失。若没有消费者监听,则消息会被忽略。执行下列命令将实现 logs 交换机把数据发送给两个系统命名的队列。

$ php receive_logs.php > logs_from_rabbit.log               //将日志存入文件
$ php receive_logs.php                                     //查看日志
$ php emit_log.php                                         //发送日志
$ sudo rabbitmqctl list_bindings                          //确认已创建的队列绑定
Listing bindings ...
 ...
logs    amq.gen-TJWkez28YpImbWdRKMa8sg==                []
logs    amq.gen-x0kymA4yPzAT6BoC/YP+zw==                []
...done.

路由 Routing

直连交换机 Direct exchange

直连交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
举个栗子:

直连交换机

上图中直连交换机 X 和 Q1,Q2 两个队列进行了绑定。 第一个队列有一个绑定,绑定键为 orange。
第二个队列有两个绑定,绑定键为 black 和 green。
当路由键为 orange 的消息发布到交换机,会被路由到队列 Q1。
当路由键为 black 者 green 的消息发布到交换机,会路由到 Q2。
其他的所有消息都将会被丢弃。

多重绑定 Multiple bindings

多重绑定

可使用相同的绑定键绑定多个队列。使用 black 绑定键添加 X 和 Q1、Q2 之间的绑定,则带有 black 路由键的消息会同时发送到 Q1 和 Q2。

情景应用

如需把严重的错误日志信息写入日志文件(存储到磁盘),同时仍把所有日志信息输出到控制台中,可利用路由功能实现:将消息发送到直连交换机,以日志级别作为路由键。日志级别 severity 的值是 info、warning、error。接收日志的脚本可根据日志级别来选择其要处理的日志。

发送日志

创建一个交换机(exchange):

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

发送消息:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

订阅

根据严重级别分别创建绑定:

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

运行代码

源代码:
emit_log_direct.php
receive_logs_direct.php

若只需保存 warning 和 error 级别的日志到磁盘:

$ php receive_logs_direct.php warning error > logs_from_rabbit.log

在新终端中输出所有日志信息:

$ php receive_logs_direct.php info warning error
 [*] Waiting for logs. To exit press CTRL+C

输出 error 级别的日志:

$ php emit_log_direct.php error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

相关文章 RabbitMQ - RabbitMQ tutorial - Publish/Subscribe RabbitMQ - RabbitMQ tutorial - Routing RabbitMQ 能为你做些什么? - 发布/订阅 RabbitMQ 能为你做些什么? - 路由

updatedupdated2023-09-272023-09-27