首页
仓库
文档
nginx手册
Docker手册
workerman
Flask
PHP
python
RabbitMQ
其他
Linux
占位1
占位2
目录
###客户端安装 composer require php-amqplib/php-amqplib ##简单模式 ###简单模式 适合一个订阅者,逐一处理。 生产者 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; #发布者 use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');//创建到服务器的连接 $channel = $connection->channel(); //创建一个通道 $channel->queue_declare('hello', false, false, false, false); //声明一个队列 $msg = new AMQPMessage('Hello World!'); //创建一个消息 $channel->basic_publish($msg, '', 'hello'); //发送消息到队列 echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close(); ?> ``` 订阅者 ```php <? require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; ##订阅者 $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->getBody(), "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close(); ``` ###列队模式和持久消息 适合N个订阅,轮流取任务。 发布 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) #消息标记为持久 ); $channel->basic_publish($msg, '', 'task_queue'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close(); ``` 订阅 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->getBody(), "\n"; sleep(substr_count($msg->getBody(), '.')); echo " [x] Done\n"; $msg->ack(); ##手动通知 }; $channel->basic_qos(null, 1, false);#处理完才取下一条。 $channel->basic_consume('task_queue', '', false, false, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close(); ``` ##交换机模式 ###订阅发布 适合N个订阅者,同时处理同一条数据。 发布 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); ### 交换机名称,fanout 订阅/发布模式 $channel->exchange_declare('logs', 'fanout', false, false, false); $data = "info: Hello World!"; $msg = new AMQPMessage($data); ### 发布消息到交换机 $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close(); ``` 订阅 ```php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); ### 交换机名称,fanout 订阅/发布模式 $channel->exchange_declare('logs', 'fanout', false, false, false); ###创建一个非持久队列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); ###绑定列队到交换机 $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->getBody(), "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close(); ``` ###关键字模式 N给订阅者,分别实现不同的关键字订阅,发布者根据条件发送不同关键字。 发布 ```php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); ### 交换机名称 direct关键字模式 $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = '关键字' $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo ' [x] Sent ', $severity, ':', $data, "\n"; $channel->close(); $connection->close(); ``` 订阅 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'direct_logs', '关键字'); echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close(); ``` ###模糊关键字 同一个条消息,模糊匹配N给订阅者,分别做对应的处理。 例如: 推送`abc.abc` ,订阅 `abc.*`和`abc.abc`同时匹配执行 推送 `abc.a` ,`abc.*`匹配执行 发布 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); ###交换机名称 topic模糊关键字模式 $channel->exchange_declare('topic_logs', 'topic', false, false, false); $routing_key = 'anonymous.info'; $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo ' [x] Sent ', $routing_key, ':', $data, "\n"; $channel->close(); $connection->close(); ``` 订阅 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'topic_logs', 'anonymous.info'); echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close(); ```