首页
仓库
文档
nginx手册
Docker手册
workerman
Flask
PHP
python
RabbitMQ
其他
Linux
占位1
占位2
目录
```php <?php /** * 原生PHP向MQTT主题发布数组数据(修复连接响应异常问题) * PHP 7.4+ 原生实现,支持MQTT 3.1.1认证 * * @param array $data 要发布的数组数据 * @param string $topic MQTT主题名称 * @param array $mqttConfig MQTT服务器配置(支持用户名密码认证) * @return array 执行结果(success:是否成功, msg:提示信息, debug:调试信息) */ function publishArrayToMqttTopic(array $data, string $topic, array $mqttConfig = []): array { // 默认MQTT配置(增加用户名密码默认值) $config = array_merge([ 'host' => '127.0.0.1', // MQTT服务器地址 'port' => 1883, // MQTT端口(默认1883) 'clientId' => 'php_mqtt_' . uniqid(), // 唯一客户端ID 'qos' => 0, // QoS等级(0/1/2,默认0) 'timeout' => 5, // 连接超时时间(秒,延长至5秒) 'username' => '', // MQTT认证用户名 'password' => '', // MQTT认证密码 'keepalive'=> 60 // 保持连接时间(秒) ], $mqttConfig); // 1. 校验输入参数 if (empty($topic)) { return ['success' => false, 'msg' => 'MQTT主题名称不能为空', 'debug' => '']; } if (empty($data)) { return ['success' => false, 'msg' => '发布的数组数据不能为空', 'debug' => '']; } // 2. 数组转JSON字符串(确保中文不转义) $message = json_encode($data, JSON_UNESCAPED_UNICODE); if (json_last_error() !== JSON_ERROR_NONE) { return [ 'success' => false, 'msg' => '数组转JSON失败:' . json_last_error_msg(), 'debug' => '' ]; } // 3. 创建TCP Socket连接 $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === false) { $errorMsg = socket_strerror(socket_last_error()); return [ 'success' => false, 'msg' => '创建Socket失败:' . $errorMsg, 'debug' => 'Socket错误码:' . socket_last_error() ]; } // 设置超时时间(更精细的超时配置) $timeout = ['sec' => $config['timeout'], 'usec' => 0]; socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout); socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout); socket_set_block($socket); // 设为阻塞模式,确保读取到响应 // 4. 连接MQTT服务器 $connectResult = socket_connect($socket, $config['host'], $config['port']); if ($connectResult === false) { $errorMsg = socket_strerror(socket_last_error($socket)); socket_close($socket); return [ 'success' => false, 'msg' => '连接MQTT服务器失败:' . $errorMsg, 'debug' => '连接地址:' . $config['host'] . ':' . $config['port'] . ',错误码:' . socket_last_error($socket) ]; } $debugInfo = "已成功连接MQTT服务器:{$config['host']}:{$config['port']}\n"; try { // 5. 构建MQTT CONNECT报文(支持用户名密码认证) $connectPacket = buildMqttConnectPacket( $config['clientId'], $config['username'], $config['password'], $config['keepalive'] ); $debugInfo .= "CONNECT报文长度:" . strlen($connectPacket) . "字节\n"; // 发送连接报文 $sendLen = socket_write($socket, $connectPacket, strlen($connectPacket)); if ($sendLen === false) { throw new Exception('发送CONNECT报文失败:' . socket_strerror(socket_last_error($socket))); } $debugInfo .= "已发送CONNECT报文,发送字节数:{$sendLen}\n"; // 读取连接响应(优化:循环读取直到获取完整响应) $response = ''; $startTime = microtime(true); // 循环读取,直到获取至少4字节(CONNACK报文最小长度)或超时 while (strlen($response) < 4 && (microtime(true) - $startTime) < $config['timeout']) { $buffer = socket_read($socket, 1024); if ($buffer !== false) { $response .= $buffer; } usleep(10000); // 休眠10ms,避免过度占用CPU } $debugInfo .= "读取到响应报文:" . bin2hex($response) . "(长度:" . strlen($response) . "字节)\n"; // 校验CONNACK响应(放宽判断条件,仅校验报文类型) if (empty($response)) { throw new Exception('未读取到MQTT服务器响应,可能是服务器无响应或超时'); } $packetType = ord($response[0]) & 0xF0; // 提取高4位(报文类型) if ($packetType !== 0x20) { // 0x20 = CONNACK报文类型 throw new Exception("收到非CONNACK响应,报文类型:0x" . dechex($packetType) . "(预期0x20)"); } // 解析CONNACK返回码(判断连接是否成功) if (strlen($response) >= 4) { $connAckCode = ord($response[3]); // 第4字节是返回码 $connAckMsg = getMqttConnAckMsg($connAckCode); $debugInfo .= "CONNACK返回码:{$connAckCode}({$connAckMsg})\n"; if ($connAckCode !== 0) { throw new Exception("MQTT服务器拒绝连接:{$connAckMsg}(返回码:{$connAckCode})"); } } // 6. 构建MQTT PUBLISH报文(发布消息) $publishPacket = buildMqttPublishPacket($topic, $message, $config['qos']); $sendLen = socket_write($socket, $publishPacket, strlen($publishPacket)); if ($sendLen === false) { throw new Exception('发送PUBLISH报文失败:' . socket_strerror(socket_last_error($socket))); } $debugInfo .= "已发送PUBLISH报文,发送字节数:{$sendLen},主题:{$topic}\n"; // 7. 断开连接(发送DISCONNECT报文) $disconnectPacket = chr(0xE0) . chr(0x00); socket_write($socket, $disconnectPacket, strlen($disconnectPacket)); $debugInfo .= "已发送DISCONNECT报文,连接已断开\n"; return [ 'success' => true, 'msg' => "数组数据已成功发布到MQTT主题:{$topic}", 'debug' => $debugInfo ]; } catch (Exception $e) { return [ 'success' => false, 'msg' => '发布失败:' . $e->getMessage(), 'debug' => $debugInfo . "异常信息:" . $e->getMessage() ]; } finally { // 确保关闭Socket连接 socket_close($socket); } } /** * 构建MQTT CONNECT报文(支持用户名密码认证) * @param string $clientId 客户端ID * @param string $username 用户名(空则不添加) * @param string $password 密码(空则不添加) * @param int $keepalive 保持连接时间(秒) * @return string MQTT连接报文 */ function buildMqttConnectPacket(string $clientId, string $username = '', string $password = '', int $keepalive = 60): string { // 可变报头:协议名 + 协议级别 + 连接标志 + 保持连接时间 $protocolName = chr(0x00) . chr(0x04) . 'MQTT'; // 长度(0x0004) + 协议名 $protocolLevel = chr(0x04); // MQTT 3.1.1 $connectFlags = 0x02; // 基础标志:clean session开启 // 如果有用户名/密码,设置对应的标志位 if (!empty($password)) { $connectFlags |= 0x40; // 密码标志位(bit6) } if (!empty($username)) { $connectFlags |= 0x80; // 用户名标志位(bit7) } $connectFlags = chr($connectFlags); // 保持连接时间(2字节大端序) $keepaliveHi = chr($keepalive >> 8); $keepaliveLo = chr($keepalive & 0xFF); $keepAlive = $keepaliveHi . $keepaliveLo; // 有效载荷:客户端ID + 用户名(可选) + 密码(可选) // 客户端ID $clientIdLen = chr(strlen($clientId) >> 8) . chr(strlen($clientId) & 0xFF); $payload = $clientIdLen . $clientId; // 用户名(如果有) if (!empty($username)) { $usernameLen = chr(strlen($username) >> 8) . chr(strlen($username) & 0xFF); $payload .= $usernameLen . $username; } // 密码(如果有) if (!empty($password)) { $passwordLen = chr(strlen($password) >> 8) . chr(strlen($password) & 0xFF); $payload .= $passwordLen . $password; } // 固定报头:CONNECT类型(0x10) + 剩余长度 $variableHeader = $protocolName . $protocolLevel . $connectFlags . $keepAlive; $remainingLength = strlen($variableHeader) + strlen($payload); $fixedHeader = chr(0x10) . encodeMqttRemainingLength($remainingLength); return $fixedHeader . $variableHeader . $payload; } /** * 构建MQTT PUBLISH报文 * @param string $topic 主题名称 * @param string $message 消息内容 * @param int $qos QoS等级(0/1/2) * @return string MQTT发布报文 */ function buildMqttPublishPacket(string $topic, string $message, int $qos = 0): string { // 固定报头:PUBLISH类型(0x30) + QoS标志 + 剩余长度 $publishType = 0x30 | ($qos << 1); // QoS左移1位(bit1-bit2) $fixedHeaderFirstByte = chr($publishType); // 可变报头:主题(UTF-8) + 报文ID(QoS>0时需要,这里QoS=0省略) $topicLen = chr(strlen($topic) >> 8) . chr(strlen($topic) & 0xFF); $variableHeader = $topicLen . $topic; // 有效载荷:消息内容 $payload = $message; // 计算剩余长度并编码 $remainingLength = strlen($variableHeader) + strlen($payload); $remainingLengthEncoded = encodeMqttRemainingLength($remainingLength); return $fixedHeaderFirstByte . $remainingLengthEncoded . $variableHeader . $payload; } /** * MQTT剩余长度编码 * @param int $length 剩余长度 * @return string 编码后的剩余长度 */ function encodeMqttRemainingLength(int $length): string { $encoded = ''; do { $byte = $length % 128; $length = (int)($length / 128); if ($length > 0) { $byte |= 0x80; } $encoded .= chr($byte); } while ($length > 0); return $encoded; } /** * 获取MQTT CONNACK返回码对应的说明 * @param int $code 返回码 * @return string 说明文字 */ function getMqttConnAckMsg(int $code): string { $msgMap = [ 0 => '连接成功', 1 => '不支持的协议版本', 2 => '客户端ID无效', 3 => '服务器不可用', 4 => '用户名或密码错误', 5 => '未授权连接', ]; return $msgMap[$code] ?? "未知返回码({$code})"; } // -------------------------- 测试示例(修改为你的实际配置) -------------------------- // 1. 准备要发布的数组数据 $testData = [ 'id' => 1001, 'title' => '测试消息', 'content' => '这是修复后的PHP原生MQTT发布消息', 'timestamp' => time() ]; // 2. 指定MQTT主题 $mqttTopic = 'asdf'; // 3. 配置MQTT服务器(重点:填写你的实际用户名/密码) $mqttConfig = [ 'host' => '127.0.0.1', // 替换为你的MQTT Broker地址 'port' => 1111, // 替换为你的MQTT端口(SSL用8883) 'username' => '', // 替换为你的MQTT用户名(无则留空) 'password' => '', // 替换为你的MQTT密码(无则留空) 'timeout' => 5, 'keepalive'=> 60 ]; // 4. 调用函数发布消息 $result = publishArrayToMqttTopic($testData, $mqttTopic, $mqttConfig); // 5. 输出结果(包含调试信息) echo "执行结果:\n"; var_dump($result['success']); echo "提示信息:{$result['msg']}\n"; echo "调试信息:\n{$result['debug']}\n"; ?> ```