setName('xtask:_work')->setDescription('Create a process to execute task');
        $this->addArgument('code', Argument::OPTIONAL, 'TaskNumber');
        $this->addArgument('spts', Argument::OPTIONAL, 'Separator');
    }

    /**
     * Run the queued task identified by the "code" argument.
     *
     * Flow: load the task row (only when its status is '1'), mark it as
     * running (status '2'), dispatch it either to a QueueService subclass or
     * to a console command, then record the outcome through update().
     * Any exception/error raised on the way is converted into a final task
     * status: 3 when the throwable's code is 3, otherwise 4.
     *
     * @param Input $input Console input (provides the "code" argument)
     * @param Output $output Console output
     * @throws \think\db\exception\DbException
     */
    protected function execute(Input $input, Output $output)
    {
        set_time_limit(0);

        // The argument is optional, so getArgument() may yield null; cast before
        // trimming because passing null to trim() is deprecated since PHP 8.1.
        $this->code = trim((string)$input->getArgument('code'));
        if (empty($this->code)) {
            $this->output->error('Task number needs to be specified for task execution');
            return;
        }

        try {
            $this->queue = $this->app->db->name($this->table)->where(['code' => $this->code, 'status' => '1'])->find();
            $this->qService = QueueService::instance()->initialize($this->code);

            if (empty($this->queue)) {
                // Deliberately do nothing else here (the task may already be running elsewhere).
                $this->output->warning("The or status of task {$this->code} is abnormal");
                return;
            }

            // Lock the task by flagging it as running and recording this process.
            $this->app->db->name($this->table)->strict(false)->where(['code' => $this->code])->update([
                'enter_time' => microtime(true),
                'attempts'   => $this->app->db->raw('attempts+1'),
                'outer_time' => '0',
                'exec_pid'   => getmypid(),
                'exec_desc'  => '',
                'status'     => '2',
            ]);
            $this->qService->progress(2, '>>> 任务处理开始 <<<', 0);

            // Set the process title.
            if ($this->process->iswin()) {
                $this->setProcessTitle("ThinkAdmin {$this->process->version()} Queue - {$this->queue['title']}");
            }

            // Expose the running task to the code being executed.
            defined('WorkQueueCall') or define('WorkQueueCall', true);
            defined('WorkQueueCode') or define('WorkQueueCode', $this->code);

            if (class_exists($command = $this->queue['command'])) {
                // Custom service: its return value is stored as the task message
                // (it may end by throwing; exception code 3|4 selects the task status).
                if (method_exists($command, 'instance') && ($class = $command::instance()) instanceof QueueService) {
                    $payload = json_decode($this->queue['exec_data'], true) ?: [];
                    $this->update('3', $class->initialize($this->code)->execute($payload));
                } else {
                    throw new \think\admin\Exception("自定义 {$command} 未继承 QueueService");
                }
            } else {
                // Console command: the captured output is stored unsplit
                // (it may end by throwing; exception code 3|4 selects the task status).
                $attr = explode(' ', trim(preg_replace('|\s+|', ' ', $this->queue['command'])));
                $this->update('3', $this->app->console->call(array_shift($attr), $attr)->fetch(), false);
            }
        } catch (\Exception|\Error $exception) {
            // Only code 3 is honoured as "finished"; everything else means "failed".
            $status = intval($exception->getCode()) === 3 ? 3 : 4;
            $this->update($status, $exception->getMessage());
        }
    }

    /**
     * Persist the final state of the current task and report progress.
     *
     * Writes status, finish time, pid and the first line of the message to the
     * task row, echoes the message to the console, pushes progress markers to
     * the queue service and, for tasks with a positive `loops_time`, asks the
     * queue service to reset the task so it runs again.
     *
     * The queue service calls are skipped when `$this->qService` was never
     * assigned. execute() assigns it inside its try block, so a failure before
     * that point reaches this method from the catch handler without a service;
     * dereferencing it there would raise a new fatal error and hide the
     * original one.
     *
     * @param integer $status Task status (3 = finished, 4 = failed)
     * @param string $message Message content; non-string values are printed as ''
     * @param boolean $issplit Whether to keep only the first line of the trimmed message
     * @throws \think\db\exception\DbException
     */
    protected function update($status, $message, $issplit = true)
    {
        // Normalise the message once; non-string payloads are echoed as an empty line.
        $text = is_string($message) ? $message : '';
        $desc = $issplit ? explode("\n", trim($text)) : [$message];

        // Update the current task row.
        $this->app->db->name($this->table)->strict(false)->where(['code' => $this->code])->update([
            'status'     => $status,
            'outer_time' => microtime(true),
            'exec_pid'   => getmypid(),
            'exec_desc'  => $desc[0],
        ]);
        $this->output->writeln($text);

        // Without an initialised queue service there is nothing left to report to.
        if (!isset($this->qService)) {
            return;
        }

        // Task progress markers.
        if (!empty($desc[0])) {
            $this->qService->progress($status, ">>> {$desc[0]} <<<");
        }
        if ($status == 3) {
            $this->qService->progress($status, '>>> 任务处理完成 <<<', 100);
        } elseif ($status == 4) {
            $this->qService->progress($status, '>>> 任务处理失败 <<<');
        }

        // Re-register looping tasks.
        if (isset($this->queue['loops_time']) && $this->queue['loops_time'] > 0) {
            try {
                $this->qService->initialize($this->code)->reset($this->queue['loops_time']);
            } catch (\Exception $exception) {
                $this->app->log->error("Queue {$this->queue['code']} Loops Failed. {$exception->getMessage()}");
            }
        }
    }
}