code = $code;
            $this->queue = $this->app->db->name('SystemQueue')->where(['code' => $this->code])->find();
            if (empty($this->queue)) {
                $this->app->log->error("Queue initialize failed, Queue {$code} not found.");
                throw new \think\Exception("Queue initialize failed, Queue {$code} not found.");
            }
            // Cache the stored queue record on the service instance.
            $this->code = $this->queue['code'];
            $this->title = $this->queue['title'];
            $this->data = json_decode($this->queue['exec_data'], true) ?: [];
        }
        return $this;
    }

    /**
     * Check whether the current environment is Windows
     * @return boolean
     */
    protected function iswin()
    {
        return ProcessService::instance()->iswin();
    }

    /**
     * Reset the asynchronous task so that it is dispatched again
     * @param integer $wait Wait time in seconds before the task may run
     * @return $this
     * @throws \think\Exception
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function reset($wait = 0)
    {
        if (empty($this->queue)) {
            $this->app->log->error("Queue reset failed, Queue {$this->code} data cannot be empty!");
            throw new \think\Exception("Queue reset failed, Queue {$this->code} data cannot be empty!");
        }
        $map = ['code' => $this->code];
        // Clear the process id, reschedule the task and set its status back to '1'.
        $this->app->db->name('SystemQueue')->where($map)->strict(false)->failException(true)->update([
            'exec_pid'  => '0',
            'exec_time' => time() + $wait,
            'status'    => '1',
        ]);
        return $this->initialize($this->code);
    }

    /**
     * Add the task that cleans up records older than 7 days and timed-out tasks
     * @throws \think\Exception
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function addCleanQueue()
    {
        // The title is kept verbatim: register() uses it to detect an existing singleton task.
        $this->register('清理7天前记录及执行超时的任务', "xtask:clean", 0, [], 0, 3600);
    }

    /**
     * Register an asynchronous task
     * @param string $title Task name
     * @param string $command Command to execute
     * @param integer $later Delay in seconds before execution
     * @param array $data Additional task data
     * @param integer $rscript Task type (0 = singleton, 1 = multi-instance)
     * @param integer $loops Loop wait time
     * @return $this
     * @throws \think\Exception
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function register($title, $command, $later = 0, $data = [], $rscript = 1, $loops = 0)
    {
        // A singleton task is rejected if a task with the same title already has status '1' or '2'.
        $map = [['title', '=', $title], ['status', 'in', ['1', '2']]];
        if (empty($rscript) && $this->app->db->name('SystemQueue')->where($map)->count() > 0) {
            throw new \think\Exception(lang('think_library_queue_exist'));
        }
        $this->app->db->name('SystemQueue')->strict(false)->failException(true)->insert([
            'code'       => $this->code = 'QE' . CodeExtend::uniqidDate(16),
            'title'      => $title,
            'command'    => $command,
            'attempts'   => '0',
            'rscript'    => intval(boolval($rscript)),
            'exec_data'  => json_encode($data, JSON_UNESCAPED_UNICODE),
            'exec_time'  => $later > 0 ? time() + $later : time(),
            'enter_time' => '0',
            'outer_time' => '0',
            'loops_time' => $loops,
        ]);
        return $this->initialize($this->code);
    }

    /**
     * Execute the task
     * @param array $data Task parameters
     * @return mixed
     */
    public function execute(array $data = [])
    {
    }
}