住房建设部官方网站,辽宁省建设厅网站中级职称公示,最新网游排行榜2024,黄山网站网站建设背景
公司准备引入swoole和rabbitmq来处理公司业务。因此，我引入hyperf框架，想用swoole的多进程来实现。
自定义启动服务封装
?php
/*** 进程启动服务【manager】*/
declare(strict_types1);namespace App\Command;use Swoole;
use Swoole\Proce…
背景
公司准备引入swoole和rabbitmq来处理公司业务。因此，我引入hyperf框架，想用swoole的多进程来实现。
自定义启动服务封装
<?php

declare(strict_types=1);

namespace App\Command;

use App\Process\BaseProcess;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;

/**
 * Manager command for the custom task processes.
 *
 * Usage: php bin/hyperf.php task start|stop|restart|status [-d]
 *
 * The command reads the class list from config('processes'), keeps the
 * enabled BaseProcess subclasses and runs each of them `nums` times as a
 * worker of a single Swoole\Process\Pool. The pid of the manager (the pool
 * master) is stored in MANAGER_PROCESS_PID_PATH so that stop/status can
 * find it again.
 */
#[Command]
class TaskProcessCommand extends HyperfCommand
{
    /** File that stores the pid of the running manager process. */
    public const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';

    /**
     * @var ContainerInterface
     */
    protected $container;

    /**
     * The command itself must not run inside a coroutine: the pool is
     * started from plain process context.
     */
    protected $coroutine = false;

    /**
     * Registry read by getProcess()/getAllProcess(), keyed by pid.
     * Nothing in this class fills it; it is declared so that the accessors
     * do not fail with "undeclared static property".
     *
     * @var array<int, mixed>
     */
    protected static $process = [];

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;

        parent::__construct('task');
    }

    public function configure()
    {
        parent::configure();
        $this->setDescription('自定义进程任务');
        $this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');
        $this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart/status 启动/关闭/重启/查看状态');
    }

    /**
     * Dispatches the required "action" argument to the matching method.
     */
    public function handle()
    {
        $action = $this->input->getArgument('action');

        match ($action) {
            'start' => $this->start(),
            'stop' => $this->stop(),
            'restart' => $this->restart(),
            'status' => $this->status(),
            default => print('不支持的action, 请输入 -h 参数查看' . PHP_EOL),
        };
    }

    /**
     * Restart: php bin/hyperf.php task restart
     */
    protected function restart()
    {
        $this->stop();
        $this->start();
    }

    /**
     * Stop: php bin/hyperf.php task stop
     *
     * Sends SIGTERM to the manager and waits until it is gone. Once the
     * configured max_wait_time has elapsed, the manager's child processes
     * are killed with SIGKILL on every further polling round.
     */
    protected function stop()
    {
        if (! file_exists(self::MANAGER_PROCESS_PID_PATH)) {
            echo '找不到manager pid, path: ' . self::MANAGER_PROCESS_PID_PATH . PHP_EOL;
            return;
        }

        // Later on this state could be stored in a table and used for restarts.
        $managerPid = (int) file_get_contents(self::MANAGER_PROCESS_PID_PATH);

        // A pid <= 0 must never reach kill(): pid 0 addresses the caller's
        // whole process group, negative values address other groups.
        if ($managerPid <= 0) {
            echo 'invalid manager pid in ' . self::MANAGER_PROCESS_PID_PATH . PHP_EOL;
            unlink(self::MANAGER_PROCESS_PID_PATH);
            return;
        }

        echo "stopping...\n";
        echo "kill pid {$managerPid} \n";

        $startTime = time();
        $timeout = (int) config('server.settings.max_wait_time', 10);

        // Signal 0 only probes for existence; skip SIGTERM for a stale pid file.
        if (Process::kill($managerPid, 0)) {
            Process::kill($managerPid);
        }

        // Wait for the manager to exit.
        while (Process::kill($managerPid, 0)) {
            echo "waiting...\r";
            usleep(100000);
            echo "          \r";
            echo "waiting.\r";
            usleep(100000);
            echo "          \r";

            // Timed out: force-kill every child whose parent pid is the manager.
            if (time() - $startTime > $timeout) {
                echo "wait timeout, kill -9 child process, pid: {$managerPid} \n";
                echo shell_exec("ps -ef|awk '\$3~/^{$managerPid}\$/'") . PHP_EOL;
                echo shell_exec("ps -ef|awk '\$3~/^{$managerPid}\$/ {print \$2}'|xargs kill -9") . PHP_EOL;
            }
        }

        unlink(self::MANAGER_PROCESS_PID_PATH);
        echo "stopped. \n";
    }

    /**
     * Start: php bin/hyperf.php task start
     * Start as daemon: php bin/hyperf.php task start -d
     *
     * Refuses to start when the pid file points at a live process, so that
     * running the command twice does not create a second manager.
     */
    protected function start()
    {
        $processConfig = config('processes');
        if (! $processConfig) {
            $this->printNothingToStart();
            return;
        }

        $runningPid = $this->runningManagerPid();
        if ($runningPid !== null) {
            echo "manager is already running, pid: {$runningPid}\n";
            return;
        }

        echo "start now.\n";

        if ($this->input->getOption('daemonize')) {
            // Redirect standard output/error to log files: after closing
            // STDOUT/STDERR the next two opened streams take over their
            // descriptors. The variables must stay alive for that to hold.
            fclose(STDOUT);
            fclose(STDERR);
            $STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');
            $STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');
            Process::daemon(true, true);
        }

        // Save the pid (after daemonizing, so it is the daemon's pid).
        file_put_contents(self::MANAGER_PROCESS_PID_PATH, (string) getmypid());

        // TODO: several managers / child groups could be driven by config or a table.
        BaseProcess::setProcessName('manager');

        $startFuncMap = $this->buildStartFuncMap($processConfig);
        if ($startFuncMap === []) {
            // A pool cannot be created with zero workers.
            unlink(self::MANAGER_PROCESS_PID_PATH);
            $this->printNothingToStart();
            return;
        }

        $pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);

        $pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {
            [$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];

            // $func[0] is the BaseProcess instance; name the worker "<name>[i/nums]".
            $pm = $func[0];
            $idx += 1;
            BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");

            if ($enableCoroutine) {
                run(function () use ($func, $pool, $workerId) {
                    $func($pool, $workerId);
                });
            } else {
                // BaseProcess::handle() in plain process context.
                $func($pool, $workerId);
            }
        });

        $pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {
            echo 'process Message,data=' . json_encode($data) . PHP_EOL;
        });

        // Worker shutdown.
        $pool->on('WorkerStop', function (Swoole\Process\Pool $pool, int $workerId) {
            echo "process WorkerId={$workerId} is stopped" . PHP_EOL;
        });

        $pool->start();
    }

    /**
     * Status: php bin/hyperf.php task status
     *
     * Reports whether the manager recorded in the pid file is alive.
     * TODO: report per-task execution state.
     */
    protected function status()
    {
        $pid = $this->runningManagerPid();

        echo $pid === null
            ? "manager is not running\n"
            : "manager is running, pid: {$pid}\n";
    }

    /**
     * Returns the registry entry for a pid; -1 means the current process.
     *
     * @param int $pid
     * @return mixed|null
     */
    public function getProcess($pid = -1)
    {
        if ((int) $pid === -1) {
            $pid = getmypid();
        }

        return static::$process[$pid] ?? null;
    }

    /**
     * @return array<int, mixed>
     */
    public function getAllProcess()
    {
        return static::$process;
    }

    /**
     * Returns the pid stored in the pid file when a process with that pid
     * exists, otherwise null. This is an existence probe only: it cannot
     * tell whether the pid was reused by an unrelated process.
     */
    private function runningManagerPid(): ?int
    {
        if (! is_file(self::MANAGER_PROCESS_PID_PATH)) {
            return null;
        }

        $pid = (int) file_get_contents(self::MANAGER_PROCESS_PID_PATH);

        return $pid > 0 && Process::kill($pid, 0) ? $pid : null;
    }

    /**
     * Expands the configured classes into one pool-worker entry per process
     * instance: [callable handle, bool enableCoroutine, int zero-based index].
     *
     * @param iterable<mixed> $processConfig class names from config('processes')
     * @return list<array{0: array{0: BaseProcess, 1: string}, 1: bool, 2: int}>
     */
    private function buildStartFuncMap(iterable $processConfig): array
    {
        $startFuncMap = [];

        foreach ($processConfig as $processClass) {
            // Check the type before instantiating: other classes in the list
            // may need constructor arguments this command cannot provide.
            if (! is_string($processClass) || ! is_subclass_of($processClass, BaseProcess::class)) {
                continue;
            }

            $processObj = new $processClass();
            if (! $processObj->isEnable) {
                continue;
            }

            $nums = (int) ($processObj->nums ?? 0);
            for ($i = 0; $i < $nums; ++$i) {
                $startFuncMap[] = [
                    [$processObj, 'handle'],
                    (bool) ($processObj->enableCoroutine ?? false),
                    $i,
                ];
            }
        }

        return $startFuncMap;
    }

    /**
     * Tells the user that the configuration yields no startable process.
     */
    private function printNothingToStart(): void
    {
        printf("没有可启动的自定义进程, 请在配置processes中声明且继承%s\n", BaseProcess::class);
    }
}
基础process封装
此处可以用hyperf框架自带的，也可以自己封装。
<?php

declare(strict_types=1);

namespace App\Process;

use Swoole;
use Swoole\Process\Pool;

/**
 * Base class for a custom task process run as a Process\Pool worker.
 *
 * A subclass configures itself through the public properties and implements
 * beforeRun() (one-time setup) and run() (one iteration of the work loop).
 * handle() drives the loop and stops it once SIGTERM has been received.
 *
 * The properties are intentionally untyped: subclasses redeclare them
 * without types, which PHP rejects when the parent declares a type.
 */
abstract class BaseProcess
{
    /**
     * Number of worker processes to start for this class.
     * @var int
     */
    public $nums = 0;

    /**
     * Process name; derived from the class name when left empty.
     * @var string
     */
    public $name = '';

    /**
     * Whether the worker body runs inside a coroutine container.
     * @var bool
     */
    public $enableCoroutine = true;

    /**
     * Whether this process is started together with the service.
     * @var bool
     */
    public $isEnable = true;

    /** True until the main loop has been told to stop. */
    protected $isRunning = true;

    /** The pool's process object for this worker, set in processInit(). */
    protected $process;

    /** Last termination signal seen by the handler (0 = none). */
    public static $signal = 0;

    public function __construct()
    {
        // Automatic naming: class name relative to this namespace, with
        // namespace separators turned into dots.
        if (empty($this->name)) {
            $relative = str_replace(__NAMESPACE__, '', static::class);
            $this->name = trim(str_replace('\\', '.', $relative), '.');
        }
    }

    /**
     * Worker entry point: initialise, then call run() until SIGTERM arrives.
     *
     * @param Pool $pool the pool this worker belongs to
     * @param int $workerId index of this worker inside the pool
     */
    final public function handle(Pool $pool, int $workerId): void
    {
        $this->processInit($pool->getProcess());
        $this->beforeRun();

        while (true) {
            // Handlers installed with pcntl_signal() only run when pending
            // signals are dispatched; without this call the flag checked
            // below would never change (unless async signals are enabled).
            pcntl_signal_dispatch();

            // Termination requested.
            if (BaseProcess::$signal === SIGTERM) {
                $this->onProcessExit();
                break;
            }

            $this->run();
        }
    }

    /**
     * Marks the main loop as finished.
     */
    protected function onProcessExit()
    {
        $this->isRunning = false;
    }

    /**
     * Stores the process object and installs the SIGTERM handler.
     *
     * The handler implements a graceful stop: it sets the signal flag and
     * then polls every 500 ms, exiting once the main loop has ended and at
     * most one coroutine is left, or once max_wait_time seconds have passed.
     *
     * @param mixed $process value returned by Pool::getProcess()
     */
    protected function processInit($process)
    {
        $this->process = $process;
        echo "process {$this->name} start, pid: " . getmypid() . PHP_EOL;

        pcntl_signal(SIGTERM, function () {
            BaseProcess::$signal = SIGTERM;
            $maxWaitTime = config('server.settings.max_wait_time', 5);
            $sTime = time();

            // Poll the task state.
            Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {
                $coStat = \Swoole\Coroutine::stats();

                if (! $this->isRunning && $coStat['coroutine_num'] <= 1) {
                    // Main loop finished and no other coroutine is busy:
                    // clear the timers so the process can exit.
                    Swoole\Timer::clearAll();
                    $this->process->exit();
                } elseif (time() - $sTime >= $maxWaitTime) {
                    // Waited long enough: force the process to end.
                    Swoole\Timer::clearAll();
                    if ($this->isRunning) {
                        $this->onProcessExit();
                    }
                    $this->process->exit();
                }
            });
        });
    }

    /**
     * Sets the OS process title to "<APP_NAME>.taskProcess.<name>".
     */
    public static function setProcessName(string $name)
    {
        swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);
    }

    /**
     * Called once before the work loop starts.
     */
    abstract public function beforeRun();

    /**
     * One iteration of the work loop. Must return: handle() calls it
     * repeatedly and checks the stop flag between calls, so an endless loop
     * in here would prevent a graceful stop.
     */
    abstract public function run();
}
使用demo
demo1
<?php

declare(strict_types=1);

namespace App\Process;

/**
 * Demo process: prints the current time on every loop iteration.
 */
class TestProcess extends BaseProcess
{
    /**
     * Number of worker processes.
     * @var int
     */
    public $nums = 5;

    public $enableCoroutine = true;

    /**
     * Not started together with the service.
     * @var bool
     */
    public $isEnable = false;

    public function beforeRun()
    {
        // Runs once before the work loop, e.g. for initialisation.
    }

    public function run()
    {
        // Body of one loop iteration.
        echo date('Y-m-d H:i:s') . PHP_EOL;
        usleep(1000);
    }
}
demo2
<?php

namespace App\Process;

use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;

/**
 * Demo process: every 300 seconds, publishes an UpdateZeroStockProducer
 * message for each selected hqchip goods row whose spu is not present in
 * today's zero-stock Redis hash.
 */
class UpdateZeroStock extends BaseProcess
{
    const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';

    /**
     * Number of worker processes.
     * @var int
     */
    public $nums = 1;

    public $enableCoroutine = true;

    /**
     * Started together with the service.
     * @var bool
     */
    public $isEnable = true;

    public function beforeRun()
    {
        // Runs once before the work loop, e.g. for initialisation.
    }

    public function run()
    {
        // Body of one loop iteration.
        $this->updateZeroStock();
        echo date('Y-m-d H:i:s') . PHP_EOL;
        sleep(300);
    }

    /**
     * Selects up to 1000 hqchip goods rows and sends one message per row
     * that passes the Redis check below.
     */
    public function updateZeroStock()
    {
        // 1. Full update.
        // NOTE(review): the comparison operator of this where() clause was
        // lost from the published snippet; '<' is an assumption — confirm
        // against the real query before use.
        $list_hq = HqchipGoodsModel::select(['id', 'spu', 'stock', 'manufacturer'])
            ->where('excute_time', '<', 8)
            ->limit(1000)
            ->get();

        $container = ApplicationContext::getContainer();
        $redis = $container->get(Redis::class);
        $producer = $container->get(Producer::class);
        $today = date('Y-m-d');

        foreach ($list_hq as $item) {
            $spu = trim($item['spu']);
            $zeroStockKey = $this->getZeroStockKey($today, 'hqchip', $item['manufacturer']);

            // Only send when today's hash for this brand exists and has no
            // (truthy) entry for the spu.
            if ($redis->exists($zeroStockKey) && ! $redis->hGet($zeroStockKey, $spu)) {
                $sendData = $item;
                $sendData['appKey'] = $this->appSecretKey();
                $sendData['platform'] = 'hqchip';

                $message = new UpdateZeroStockProducer($sendData);
                $res = $producer->produce($message);
                echo date('Y-m-d H:i:s') . ' rabbitmq hqchip sendMq: ' . $res . PHP_EOL;
            }
        }
    }

    /**
     * Builds the zero-stock cache key "<prefix>:<platform>:<day>:<brand>".
     *
     * @param string $day date in Y-m-d form
     * @param string $platfrom platform identifier, e.g. "hqchip"
     * @param string $brand manufacturer name
     * @return string
     */
    private function getZeroStockKey($day, $platfrom, $brand)
    {
        return self::ZERO_STOCK_KEY . ':' . $platfrom . ':' . $day . ':' . $brand;
    }

    /**
     * Generates the app key sent with each message: base64 of
     * md5(secret) . "||" . base64(time() . "|" . secret), where secret is a
     * fixed prefix followed by today's date.
     *
     * @return string
     */
    private function appSecretKey()
    {
        $a = 'chipmall-spiderV2' . date('Y-m-d');
        $appKey = base64_encode(md5($a) . '||' . base64_encode(time() . '|' . $a));

        return $appKey;
    }
}
在配置中进程需要执行的服务 以守护进程方式启动服务
php bin/hyperf.php task start -d
查看进程命令
ps -ef|grep taskProcess
疑惑
这次封装还存在两个点需要完善：
1. 重复执行 php bin/hyperf.php task start -d 会启动多个manager进程
2. 没有封装查看进程状态的status方法