首页
仓库
文档
nginx手册
Docker手册
workerman
Flask
PHP
python
RabbitMQ
其他
Linux
占位1
占位2
目录
###关键字模式 根据我的需求,模拟搭建一个demo 我的需求,不同的关键字,代表不同的业务需求。 发送端,会发送推送,日志,订单等。 关键字定义 日志log 订单order 这样一条订单就,同时推送给2个业务模块。缺点是单一执行。执行完才取下一条任务。 推送 ts.php ```php <? require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; #建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); #建立通道 $channel = $connection->channel(); ### 交换机名称 direct关键字模式 持久模式 $channel->exchange_declare('demo2024', 'direct', false, true, false); $sn=rand(111111,999999); #通知订单系统------ $severity = 'order'; $data = "sn:".$sn; $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) #消息标记为持久 ); $channel->basic_publish($msg, 'demo2024', $severity); echo ' [x] Sent ', $severity, ':', $data, "<br>"; # 通知日志系统------------ $severity = 'log'; $data = "sn:".$sn; $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) #消息标记为持久 ); $channel->basic_publish($msg, 'demo2024', $severity); echo ' [x] Sent ', $severity, ':', $data, "<br>"; $channel->close(); $connection->close(); ``` 订阅(多个订阅者) ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; #订单处理流程 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');#建立链接 $channel = $connection->channel(); #建立通道 $channel->exchange_declare('demo2024', 'direct', false, true, false);#创建交换机,交换机类型 持久模式 list($queue_name, ,) = $channel->queue_declare("", false, true, false, false); $channel->queue_bind($queue_name, 'demo2024','order');#绑定的列队 ,交换机名称,关键字 $channel->basic_qos(null, 1, false); // 一次只处理一个消息,直到它被确认 echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { try { // 处理消息 echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n"; file_put_contents('order_log.txt',' [x] '. $msg->getRoutingKey(). ':'. $msg->getBody().'|'.date("Y-n-j G:i:s")."\r\n",FILE_APPEND); sleep(5); $msg->ack(); // 如果处理成功,确认消息 } catch (Exception $e) { // 如果处理失败,拒绝消息并决定是否重新入队 $msg->nack(false, true); // 第二个参数为true表示重新入队 // 或者 $msg->nack(false, false); // 第二个参数为false表示不重新入队 //本身不提供,失败N次不重新排队,自己记录错误次数,选择重新入队还是不重新入队 } }; $channel->basic_consume($queue_name, '', false, false, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo 'ERROR'.$exception->getMessage().'!!!'; } $channel->close(); $connection->close(); ``` ###队列模式 如果是单一功能,可以多开订阅提高轮询执行速度。 推送 ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', true, false, false, false); for($i=1;$i<=100;$i++){ $data = "no:$i!"; $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) #消息标记为持久 ); $channel->basic_publish($msg, '', 'task_queue'); } echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close(); ``` 订阅(可以启动多个轮询执行任务,提高速度) ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', true, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { try { echo ' [x] Received ', $msg->getBody(), "\n"; echo " [x] Done\n"; sleep(5); $msg->ack(); ##手动通知 } catch (Exception $e) { // 如果处理失败,拒绝消息并决定是否重新入队 $msg->nack(false, true); // 第二个参数为true表示重新入队 // 或者 $msg->nack(false, false); // 第二个参数为false表示不重新入队 //本身不提供,失败N次不重新排队,自己记录错误次数,选择重新入队还是不重新入队 } }; $channel->basic_qos(null, 1, false);#处理完才取下一条。 $channel->basic_consume('task_queue', '', true, false, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close(); ``` ###后台运行 nohup php t1.php & 后台运行php文件 ps -ef|grep t1.php 查看进程id kill -9 进程id 关闭进程