我司最近将消息中间件改为了 mqtt,因为 mqtt 的特性(同一个 client_id 不能进行多次连接),导致客户端与 mqtt 服务器之间必须使用一个长连接。一开始我是用队列 while 循环拉取来实现发消息这个操作的。后来闲的蛋疼加上想学点新的东西,就想用 swoole 写个 tcp 服务器,然后就入了坑了.....
现在,我在每个 task 进程中都启动了一个 mqtt 连接,在压测时,这个 mqtt 连接经常会出现 errno=11 (资源不可用)的错误,我查了下相关资料,这个错误出现的原因是因为 mqtt 连接的 socket 写缓冲区满了,消息无法写进写缓冲区。关于这块我很疑惑,task 进程不应该是个单线程的进程么?这样 mqtt 连接一次只处理一个消息,那为什么还会出现写缓冲区满的错误呢? 另外一个问题就是在测试性能时,我用 swoole 写的这个服务器性能竟然和 while 循环拉取队列的性能差不多,差不多都是每秒发送 380 条消息左右。我都要疯了,究竟是我什么地方写的不对,我现在都怀疑人生了!!!!
<?php
namespace swoole;
//如果以守护进程启动后,必须使用绝对地址
include_once __DIR__."/mqtt/MessageId.php";
include_once __DIR__."/mqtt/phpMQTT.php";
class TcpServer
{
private $serv;
private $mqtt_config;
private $mqtt_connect;
const PROCESS_NAME = 'swoole';
public function __construct()
{
//创建 Server 对象,监听 127.0.0.1:9501 端口
$this->serv = new \swoole_server('0.0.0.0', 9501);
//设置属性
$this->serv->set(array(
//守护进程化。设置 daemonize => 1 时,程序将转入后台作为守护进程运行。长时间运行的服务器端程序必须启用此项。
//启用守护进程后,CWD (当前目录)环境变量的值会发生变更,相对路径的文件读写会出错。PHP 程序中必须使用绝对路径
'daemonize' => true,
'worker_num' => 2, //异步非阻塞代码一般设为 CPU 的 1-4 倍。
'max_request' => 10000, //一个 worker 进程在处理完超过此数值的任务后将自动退出,主要作用是解决 PHP 进程内存溢出问题
'max_conn' => 1024, //进程最大连接数
'task_worker_num' => 20, //Task 进程最大值不得超过 cpu_num*1000,该进程是同步阻塞的,里面不得调用异步 IO 函数
'task_ipc_mode' => 3, //worker 进程与 task 进程之间的通信模式,3 为队列通信并且设置为了争抢模式,使用消息队列通信,如果 Task 进程处理能力低于投递速度,可能会引起 Worker 进程阻塞。
'message_queue_key' => 0x72000100, //指定消息队列的 key
'task_max_request' => 10000, //task 进程最大任务数
'log_file' => '/tmp/swoole.log', //日志文件
'log_level' => 4, //需要记录的错误级别
'tcp_fastopen' => true, //开启 TCP 快连接
));
//监听服务启动事件
$this->serv->on('Start', array($this, 'onStart'));
//监听管理进程启动事件
$this->serv->on('ManagerStart', array($this, 'onManagerStart'));
//监听工作进程启动事件
$this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
//监听工作进程异常退出事件
$this->serv->on('WorkerError', array($this, 'onWorkerError'));
//监听工作检测停止事件
$this->serv->on('WorkerStop', array($this, 'onWorkerStop'));
//监听连接进入事件
$this->serv->on('Connect', array($this, 'onConnect'));
//监听数据接受事件
$this->serv->on('Receive', array($this, 'onReceive'));
//监听连接关闭事件
$this->serv->on('Close', array($this, 'onClose'));
//监听 task 进程接收任务事件
$this->serv->on('Task', array($this, 'onTask'));
//监听 Task 进程完成任务事件
$this->serv->on('Finish', array($this, 'onFinish'));
//启动服务器
$this->serv->start();
}
//onStart 回调中,仅允许 echo、打印 Log、修改进程名称。不得执行其他操作
public function onStart($serv)
{
swoole_set_process_name(self::PROCESS_NAME.'_master');
}
//在这个回调函数中可以修改管理进程的名称
public function onManagerStart($serv)
{
swoole_set_process_name(self::PROCESS_NAME.'_manager');
}
//此事件在 Worker 进程 /Task 进程启动时发生,这里创建的对象可以在进程生命周期内使用
public function onWorkerStart($serv, $worker_id)
{
//引入常用函数文件,由于可能会发送更改,所以在 worker 进程开始时引用
include_once __DIR__.'/mqtt/__cron.php';
$jobType = $serv->taskworker ? 'Tasker' : 'Worker';
swoole_set_process_name(self::PROCESS_NAME.'_'.$jobType.'_'.$worker_id);
//在 task 进程中启动 mqtt 连接
if ($serv->taskworker) {
echo "Task 进程({$worker_id})启动\r\n";
//获取配置
$this->mqtt_config = get_mqtt_config($worker_id);
//连接服务器(这里为了以后能加入多个 mqtt 实例,所以我们将连接放入一个数组中)
foreach ($this->mqtt_config as $key=>$item) {
$mqtt = get_mqtt($item, 'o2o_mqtt', []);
//如果没有连接上 mqtt 服务器,关闭当前进程
if (!$mqtt) {
$serv->stop($worker_id, true);
}
$mqtt_arr[$key] = $mqtt;
}
$this->mqtt_connect = $mqtt_arr;
//30S 发送一次心跳包
$serv->tick(30000, function () use ($serv, $worker_id) {
//发送心跳包
foreach ($this->mqtt_config as $key=>$item) {
if (!$this->mqtt_connect[$key]['obj']->ping()) {
//如果 ping 失败就重新连接
echo "{$item['addr']} ping 失败,退出当前 task 进程($worker_id)\r\n";
$mqtt = get_mqtt($item, 'o2o_mqtt', []);
if (!$mqtt) {
$serv->stop($worker_id, true);
}
$this->mqtt_connect[$key] = $mqtt;
}
}
});
}
//在 worker 进程判断文件是否更新
if (!$serv->taskworker) {
//清除文件状态缓存,这个是为了防止下面 filemtime 从缓存中读取
clearstatcache();
$filemtime = filemtime(__FILE__);
//30S 检测一次文件更新
$serv->tick(30000, function () use ($serv, $worker_id, $filemtime) {
//检查文件更新
clearstatcache();
//如果文件变化,则重启所有的 work 进程
if ($filemtime != filemtime(__FILE__)) {
echo "文件更新,重启所有 woker/task 进程\r\n";
$serv->reload();
}
});
}
}
public function onWorkerError($serv, $worker_id, $worker_pid, $exit_code, $signal)
{
echo "{$worker_id} Error\r\n";
}
//此函数在 Worker 进程中执行
public function onWorkerStop($serv,$worker_id)
{
//zend_opcache 的 opcache 清理函数,防止某些服务器开启了 opcache
opcache_reset();
}
//此函数在 Worker 进程中执行
public function onConnect($serv, $fd)
{
//echo "Client: connect.\n";
}
//此函数在 Worker 进程中执行
public function onReceive($serv, $fd, $from_id, $data)
{
$param['data'] = json_decode($data,true);
$param['fd'] = $fd;
//向 task 进程投递任务
$serv->task(json_encode($param));
}
//此函数在 Task 进程中执行
public function onTask($serv, $task_id, $src_worker_id, $data)
{
$st = microtime(true);
$param = json_decode($data, true);
$data = $param['data'];
$fd = $param['fd'];
$return = ['code' => 2, 'msg' => 'mqtt 消息发送失败'];
foreach ($this->mqtt_connect as $key=>$value) {
if ($value['minDevNo'] < $data['device_id'] && $value['maxDevNo'] > $data['device_id']) {
$res = send_message($value['obj'], $data['mqtt_topic'], $data['message']);
if ($res) {
$return['code'] = 1;
$return['msg'] = 'mqtt 消息发送成功';
echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."成功".time()."\r\n";
}else{
//断线重连
echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."失败,重新连接 mqtt\r\n";
foreach ($this->mqtt_config as $k=>$item) {
$mqtt = get_mqtt($item, 'o2o_mqtt', []);
if (!$mqtt) {
$serv->stop($serv->worker_id, true);
}
$mqtt_arr[$k] = $mqtt;
}
$this->mqtt_connect = $mqtt_arr;
}
}
}
$res = json_encode($return);
$serv->send($fd, $res);
$et = microtime(true);
echo "任务{$src_worker_id}-{$serv->worker_id}-{$task_id}完成,花费时间".($et-$st)."S\r\n";
return $res;
}
//此函数在 worker 进程中执行
public function onFinish($serv, $task_id, $data)
{
//echo "{$task_id}回调完成\r\n";
}
public function onClose($server, $fd, $reactorId)
{
//echo "Client: close.\n";
}
}
$serv = new TcpServer();
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.