RabbitMQ 入门指南 PHP 版系列文章: RabbitMQ 入门指南 PHP 版(一) RabbitMQ 入门指南 PHP 版(二)
主题交换机 Topic exchange
为进行更为复杂和高级的路由操作,可通过使用主题交换机来实现。
主题交换机的路由键必须是一个由 .
分隔开的词语列表。绑定键也必须拥有同样的格式。携带特定路由键的消息将被主题交换机投递给绑定键与之相匹配的队列。
绑定键和路由键有两种应用方式:
- * 号表示一个单词
- # 号表示任意数量(零个或多个)单词
举个栗子:所有消息均描述动物。消息所携带的路由键由三个被 .
分隔开的单词组成,第一个单词描述动物的敏捷度,第二个单词描述动物颜色,第三个单词描述物种。格式如:<敏捷度>.<颜色>.<物种>
给定三个绑定: Q1 的绑定键为 *.orange.*,该绑定针对的是所有的桔黄色动物的信息。 Q2 的绑定键为 *.*.rabbit 和 lazy.#,该绑定针对的是所有兔子和所有懒惰的动物。
携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列; 携带着 lazy.orange.elephant 的消息会被分别投递给这两个队列; 携带有 quick.orange.fox 的消息会投递给第一个队列; 携带有 lazy.brown.fox 的消息会投递给第二个队列; 携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列,即使它同时匹配第二个队列的两个绑定。 携带有 quick.brown.fox 的消息不会投递给任意队列。 携带有一个单词或者四个单词如 orange 或 quick.orange.male.rabbit 的消息时,消息将不被投递,且会被丢弃。 携带有 lazy.orange.male.rabbit 的消息会匹配最后一个绑定,并被投递到第二个队列中。 当一个队列的绑定键为 # 时,该队列将忽略消息的路由键,接收所有消息。
运行代码
源代码:
emit_log_topic.php
receive_logs_topic.php
若需接收所有日志:
$ php receive_logs_topic.php "#"
接收来自 kern
设备的日志:
$ php receive_logs_topic.php "kern.*"
只接收严重程度为 critical
的日志:
$ php receive_logs_topic.php "*.critical"
建立多个绑定:
$ php receive_logs_topic.php "kern.*" "*.critical"
发送路由键为 kern.critical
的日志:
$ php emit_log_topic.php "kern.critical" "A critical kernel error"
远程过程调用 Remote Procedure Call
若要将一个函数运行在远程终端上并且等待从远程终端获取结果,可通过远程过程调用(Remote Procedure Call, RPC) 实现。
客户端接口
创建一个简单的客户端类,用以展示 RPC 服务。它通过 “call” 方法发送 RPC 请求,并在收到回应前保持阻塞。
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
回调队列
客户端发送请求信息后,服务端将其应用到一个回复信息中。客户端在发送请求时同时发送一个回调队列(callback queue)的地址用以接收回复信息。
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$msg = new AMQPMessage(
$payload,
array('reply_to' => $queue_name));
$channel->basic_publish($msg, '', 'rpc_queue');
## ... then code to read a response message from the callback_queue ...
消息属性
AMQP 协议为消息预定义了 14 个属性。其中常用的有以下几个:
- delivery_mode(投递模式):标记消息为持久的(值为 2)或暂存的(除了 2 之外的其他任何值)。
- content_type(内容类型): 描述编码的 mime-type。
- reply_to(回复目标):命名回调队列。
- correlation_id(关联标识):关联 RPC 的响应和请求。
关联标识 Correlation Id
为使消息投递更高效,可为每个客户端只建立一个独立的回调队列,并用关联标识来辨别该响应所属的请求。如果消息的关联标识为未知,则忽略消息。
运行代码
RPC 的工作原理:
- 客户端启动时创建一个匿名独占回调队列。
- 客户端发送的消息带有 reply_to 和 correlation_id 属性。
- 请求将发送到一个 rpc_queue 队列中。
- 服务器等待接收请求。请求出现时,它将带有执行结果的消息发送给 reply_to 字段指定的队列。
- 客户端等待回调队列里的结果。消息出现时,它将 correlation_id 属性与请求匹配的消息返回给程序。
RPC 的优势:
- 服务器运行过慢时,可运行另外一个服务器端来扩展
- 客户端的 RPC 请求只发送或接收一条消息。无需 queue_declare 声明队列异步调用。单个请求只需往返一次。
构建一个 RPC 系统:包含一个客户端和一个 RPC 服务器,并创建一个模拟 RPC 服务来返回斐波那契数列。
启动服务器端:
$ php rpc_server.php
[x] Awaiting RPC requests
启动客户端,请求一个 fibonacci 队列
$ php rpc_client.php
[x] Requesting fib(30)
相关文章 RabbitMQ - RabbitMQ tutorial - Topics RabbitMQ - RabbitMQ tutorial - Remote procedure call (RPC) RabbitMQ 能为你做些什么? - 为什么需要主题交换机? RabbitMQ 能为你做些什么? - 远程过程调用