RabbitMQ入门指南PHP版(一)

介绍

RabbitMQ 是一个消息代理。核心原理是接收和发送消息。

生产者

  • 生产 (Producing) 就是发送消息。发送消息的程序就是一个生产者 (producer)。用 "P" 表示

队列

  • 队列 (queue) 就是邮箱的名称。消息存储在一个队列(queue)中通过你的应用程序和 RabbitMQ 进行传输

消费者

  • 消费(Consuming)就是获取消息。等待获取消息的程序就是一个消费者(consumer)。用 "C" 表示

RabbitMQ 使用的是 AMQP 协议,对于 PHP 来说你可以选择 php-amqplib 作为 RabbitMQ 的客户端。

Hello World!

Hello World!

设计一个简单的流程:生产者(producer)把消息发送到一个名为 “hello” 的队列中。消费者(consumer)从这个队列中获取消息。

发送消息 Sending

发送消息

发送程序会发送一个消息到队列中。首先要做的事情就是建立一个到 RabbitMQ 服务器的连接。

send.php

<?php
	<!-- %1 -->
	require_once __DIR__ . '/vendor/autoload.php';
	use PhpAmqpLib\Connection\AMQPStreamConnection;
	use PhpAmqpLib\Message\AMQPMessage;

	<!-- %2 -->
	$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
	$channel = $connection->channel();
	$channel->queue_declare('hello', false, false, false, false);

	<!-- %3 -->
	$msg = new AMQPMessage('Hello World!');
	$channel->basic_publish($msg, '', 'hello');

	<!-- %4 -->
	echo " [x] Sent 'Hello World!'\n";

	<!-- %5 -->
	$channel->close();
	$connection->close();
?>
  1. 添加必要的库
  2. 建立一个到 RabbitMQ 服务器的连接;创建一个名为 hello 的队列,然后把消息发送到这个队列中。
  3. 发送的消息存入 $msg;发送的消息存入 $msg;在 RabbitMQ 中,消息是不能直接发送到队列,它需要发送到交换机(exchange)中,将 $msg 通过一个空字符串来标识的交换机投递到 hello 队列中。
  4. 提示发送成功信息。
  5. 断开连接。

获取消息 Receiving

获取消息

receive.php

<?php
	<!-- %1 -->
	require_once __DIR__ . '/vendor/autoload.php';
	use PhpAmqpLib\Connection\AMQPStreamConnection;

	<!-- %2 -->
	$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
	$channel = $connection->channel();
	$channel->queue_declare('hello', false, false, false, false);

	<!-- %3 -->
	echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

	<!-- %4 -->
	$callback = function($msg) {
	  echo " [x] Received ", $msg->body, "\n";
	};

	<!-- %5 -->
	$channel->basic_consume('hello', '', false, true, false, false, $callback);

	<!-- %6 -->
	while(count($channel->callbacks)) {
	    $channel->wait();
	}

	<!-- %7 -->
	$channel->close();
	$connection->close();
?>
  1. 添加必要的库
  2. 建立一个到 RabbitMQ 服务器的连接;创建一个名为 hello 的队列,然后把消息发送到这个队列中。在 receive.php 中重复声明队列防止程序运行顺序导致的错误。
  3. 提示中断程序信息。
  4. 为队列定义一个回调(callback)函数,使用回调函数提示发送成功信息。
  5. 使用回调函数从名为 hello 的队列中接收消息
  6. 输入一个用来等待消息数据并且在需要的时候运行回调函数的无限循环
  7. 断开连接。

运行代码

源代码:
send.php
receive.php

$ php send.php

在终端中运行程序发送消息,send 程序在每次运行之后就会停止。

$ php receive.php

在终端中运行程序获取消息,receive 程序并不会退出。它一直在准备获取消息。可通过 Ctrl-C 来中止。

工作队列 Work Queues

工作队列

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

准备 Preparation

这里使用 sleep() 函数来模拟复杂的任务,在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时 1 秒钟。比如 "Hello..." 就会耗时 3 秒钟。

发送消息

按照计划发送任意消息任务到工作队列中

修改 send.php 的第 2,3,4 部分得到 new_task.php,修改 receive.php 的第 2,4,5 部分得到 worker.php,使其按照计划发送任务到我们的工作队列中。

new_task.py

<?php
	<!-- %2 -->
	$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
	$channel = $connection->channel();
	$channel->queue_declare('task_queue', false, false, false, false);

	<!-- %3 -->
	$data = implode(' ', array_slice($argv, 1));
	if(empty($data)) $data = "Hello World!";
	$msg = new AMQPMessage($data,
                           array('delivery_mode' => 2)
	);
	$channel->basic_publish($msg, '', 'task_queue');

	<!-- %4 -->
	echo " [x] Sent ", $data, "\n";
?>

2.建立一个到 RabbitMQ 服务器的连接;创建一个名为 task_queue 的队列,然后把消息发送到这个队列中。
3.发送的单条消息作为发送单元存入 $data,并将其存入 $msg 作为队列发送。
4.提示每一条消息发送成功信息。

worker.php

<?php
	<!-- %2 -->
	$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
	$channel = $connection->channel();
	$channel->queue_declare('task_queue', false, true, false, false);

	<!-- %4 -->
	$callback = function($msg){
	echo " [x] Received ", $msg->body, "\n";
	sleep(substr_count($msg->body, '.'));
	echo " [x] Done", "\n";
	$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
	};

	<!-- %5 -->
	$channel->basic_qos(null, 1, null);
	$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
?>

2.建立一个到 RabbitMQ 服务器的连接;创建一个名为 task_queue 的队列,然后把消息发送到这个队列中。在 worker.php 中重复声明队列防止程序运行顺序导致的错误。
4.为队列定义一个回调(callback)函数,使用回调函数提示发送成功信息。
5.使用回调函数从名为 task_queue 的队列中接收消息。

轮询调度 Round-robin dispatching

三个终端,两个用来运行 worker.py 脚本,即两个消费者(consumers)—— C1 和 C2 接收工作者(workers)发送的消息。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询调度

workers:

$ php new_task.php First message.
$ php new_task.php Second message..
$ php new_task.php Third message...
$ php new_task.php Fourth message....
$ php new_task.php Fifth message.....

C1:

 $ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

C2:

 $ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

消息确认 Message acknowledgment

如果不对消息进行确认,已发送的消息将马上在内存中被移除。为了防止系统运行导致的任务消息的丢失,我们使用消息确认机制预防此类情况的发生。消费者会通过一个响应(ack),告诉 RabbitMQ 已经收到并处理了某条消息,然后 RabbitMQ 就会释放并删除这条消息。若 RabbitMQ 未收到响应,则会将消息重新发送给其他消费者(consumer)。在进程运行时使用 CTRL+C 终止进程,消息也不会丢失。
消息确认默认关闭。可使用 no_ack=True 标识将其开启:

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

忘记确认

未声明 basic_ack 标志位将导致无法释放未响应的消息,RabbitMQ 就会占用越来越多的内存。排除该错误可使用 rabbitmqctl 命令输出 messages_unacknowledged 字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久化 Message durability

在 RabbitMQ 退出或者崩溃时,将会丢失所有队列和消息。把「队列」和「消息」设为持久化可确保信息不会丢失。
在保证未声明同名的非持久化队列的情况下,在生产者(producer)和消费者(consumer)对应的代码中修改声明 task_queue 队列持久化:

$channel->queue_declare('hello', false, true, false, false);

将 delivery_mode 的属性设为 2,实现消息持久化:

$msg = new AMQPMessage($data,
                       array('delivery_mode' => 2)
       );

PS:RabbitMq 收到消息到保存之间存在一个很小的时间间隔,故消息持久化并不能保证真正的持久化,但已经足够应付我们的简单工作队列。若一定要保证持久化,需改写代码来支持事务(transaction)。

公平调度 Fair dispatch

公平调度

RabbitMQ 只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第 n-th 条消息发给第 n-th 个消费者。为实现 RabbitMQ 按需处理消息分发,可使用 basic.qos 方法,并设置 prefetch_count=1。这使得 RabbitMQ 同一时刻,不要发送超过 1 条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。

channel.basic_qos(prefetch_count=1)

运行代码

源代码:
new_task.php
worker.php


相关文章 RabbitMQ - RabbitMQ tutorial - "Hello World!" RabbitMQ - RabbitMQ tutorial - Work Queues RabbitMQ 能为你做些什么? - 介绍 RabbitMQ 能为你做些什么? - 工作队列

updatedupdated2023-09-272023-09-27