ComposerUpdate

This commit is contained in:
Anyon 2020-05-02 09:43:07 +08:00
parent 0efb9180c1
commit 8c6e29c7cd
7 changed files with 42 additions and 74 deletions

8
composer.lock generated
View File

@ -909,12 +909,12 @@
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/zoujingli/ThinkLibrary.git", "url": "https://github.com/zoujingli/ThinkLibrary.git",
"reference": "646ddaf7017822aabbc9aa7c3e8e4e0aa095c515" "reference": "bff122f985c71ac8cf37139c6ce6dbba83482df1"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/646ddaf7017822aabbc9aa7c3e8e4e0aa095c515", "url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/bff122f985c71ac8cf37139c6ce6dbba83482df1",
"reference": "646ddaf7017822aabbc9aa7c3e8e4e0aa095c515", "reference": "bff122f985c71ac8cf37139c6ce6dbba83482df1",
"shasum": "", "shasum": "",
"mirrors": [ "mirrors": [
{ {
@ -958,7 +958,7 @@
], ],
"description": "ThinkPHP v6.0 Development Library", "description": "ThinkPHP v6.0 Development Library",
"homepage": "http://framework.thinkadmin.top", "homepage": "http://framework.thinkadmin.top",
"time": "2020-05-01T12:57:12+00:00" "time": "2020-05-02T01:35:55+00:00"
}, },
{ {
"name": "zoujingli/wechat-developer", "name": "zoujingli/wechat-developer",

View File

@ -935,12 +935,12 @@
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/zoujingli/ThinkLibrary.git", "url": "https://github.com/zoujingli/ThinkLibrary.git",
"reference": "646ddaf7017822aabbc9aa7c3e8e4e0aa095c515" "reference": "bff122f985c71ac8cf37139c6ce6dbba83482df1"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/646ddaf7017822aabbc9aa7c3e8e4e0aa095c515", "url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/bff122f985c71ac8cf37139c6ce6dbba83482df1",
"reference": "646ddaf7017822aabbc9aa7c3e8e4e0aa095c515", "reference": "bff122f985c71ac8cf37139c6ce6dbba83482df1",
"shasum": "", "shasum": "",
"mirrors": [ "mirrors": [
{ {
@ -956,7 +956,7 @@
"ext-json": "*", "ext-json": "*",
"topthink/framework": "^6.0" "topthink/framework": "^6.0"
}, },
"time": "2020-05-01T12:57:12+00:00", "time": "2020-05-02T01:35:55+00:00",
"type": "library", "type": "library",
"extra": { "extra": {
"think": { "think": {

2
vendor/services.php vendored
View File

@ -1,5 +1,5 @@
<?php <?php
// This file is automatically generated at:2020-05-01 21:06:44 // This file is automatically generated at:2020-05-02 09:41:51
declare (strict_types = 1); declare (strict_types = 1);
return array ( return array (
0 => 'think\\app\\Service', 0 => 'think\\app\\Service',

View File

@ -15,11 +15,7 @@
namespace think\admin\command; namespace think\admin\command;
use Psr\Log\NullLogger;
use think\admin\Command; use think\admin\Command;
use think\admin\service\ProcessService;
use think\console\Input;
use think\console\Output;
/** /**
* 系统任务基类 * 系统任务基类
@ -33,21 +29,4 @@ abstract class Queue extends Command
* @var string * @var string
*/ */
protected $table = 'SystemQueue'; protected $table = 'SystemQueue';
/**
* 进程服务对象
* @var ProcessService
*/
protected $process;
/**
* 任务指令初始化
* @param Input $input
* @param Output $output
*/
protected function initialize(Input $input, Output $output)
{
$this->app->db->setLog(new NullLogger());
$this->process = ProcessService::instance();
}
} }

View File

@ -15,6 +15,7 @@
namespace think\admin\command\queue; namespace think\admin\command\queue;
use Psr\Log\NullLogger;
use think\admin\command\Queue; use think\admin\command\Queue;
use think\Collection; use think\Collection;
use think\console\Input; use think\console\Input;
@ -35,6 +36,17 @@ class ListenQueue extends Queue
$this->setName('xtask:listen')->setDescription('Start task listening main process'); $this->setName('xtask:listen')->setDescription('Start task listening main process');
} }
/**
* 初始化任务监听指令
* @param Input $input
* @param Output $output
*/
protected function initialize(Input $input, Output $output)
{
parent::initialize($input, $output);
$this->app->db->setLog(new NullLogger());
}
/** /**
* 启动进程守护监听 * 启动进程守护监听
* @param Input $input 输入对象 * @param Input $input 输入对象

View File

@ -28,25 +28,12 @@ use think\console\Output;
*/ */
class WorkQueue extends Queue class WorkQueue extends Queue
{ {
/** /**
* 当前任务编号 * 执行任务编号
* @var string * @var string
*/ */
protected $code; protected $code;
/**
* 当前任务数据
* @var array
*/
protected $queue;
/**
* 当前任务服务
* @var QueueService
*/
protected $qService;
/** /**
* 配置指定信息 * 配置指定信息
*/ */
@ -70,35 +57,34 @@ class WorkQueue extends Queue
if (empty($this->code)) { if (empty($this->code)) {
$this->output->error('Task number needs to be specified for task execution'); $this->output->error('Task number needs to be specified for task execution');
} else try { } else try {
$this->queue = $this->app->db->name($this->table)->where(['code' => $this->code, 'status' => '1'])->find(); $this->queue->initialize($this->code);
$this->qService = QueueService::instance()->initialize($this->code); if (empty($this->queue->record) || intval($this->queue->record['status']) !== 1) {
if (empty($this->queue)) {
// 这里不做任何处理(该任务可能在其它地方已经在执行) // 这里不做任何处理(该任务可能在其它地方已经在执行)
$this->output->warning($message = "The or status of task {$this->code} is abnormal"); $this->output->warning($message = "The or status of task {$this->code} is abnormal");
} else { } else {
// 锁定任务状态 // 锁定任务状态,防止任务再次被执行
$this->app->db->name($this->table)->strict(false)->where(['code' => $this->code])->update([ $this->app->db->name($this->table)->strict(false)->where(['code' => $this->code])->update([
'enter_time' => microtime(true), 'attempts' => $this->app->db->raw('attempts+1'), 'enter_time' => microtime(true), 'attempts' => $this->app->db->raw('attempts+1'),
'outer_time' => '0', 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => '2', 'outer_time' => '0', 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => '2',
]); ]);
$this->qService->progress(2, '>>> 任务处理开始 <<<', 0); $this->queue->progress(2, '>>> 任务处理开始 <<<', 0);
// 设置进程标题 // 设置进程标题
if ($this->process->iswin()) { if ($this->process->iswin()) {
$this->setProcessTitle("ThinkAdmin {$this->process->version()} Queue - {$this->queue['title']}"); $this->setProcessTitle("ThinkAdmin {$this->process->version()} Queue - {$this->queue->title}");
} }
// 执行任务内容 // 执行任务内容
defined('WorkQueueCall') or define('WorkQueueCall', true); defined('WorkQueueCall') or define('WorkQueueCall', true);
defined('WorkQueueCode') or define('WorkQueueCode', $this->code); defined('WorkQueueCode') or define('WorkQueueCode', $this->code);
if (class_exists($command = $this->queue['command'])) { if (class_exists($command = $this->queue->record['command'])) {
// 自定义服务,支持返回消息(支持异常结束,异常码可选择 3|4 设置任务状态) // 自定义服务,支持返回消息(支持异常结束,异常码可选择 3|4 设置任务状态)
if (method_exists($command, 'instance') && ($class = $command::instance()) instanceof QueueService) { if (method_exists($command, 'instance') && ($class = $command::instance()) instanceof QueueService) {
$this->update('3', $class->initialize($this->code)->execute(json_decode($this->queue['exec_data'], true) ?: [])); $this->update('3', $class->initialize($this->code)->execute($this->queue->data));
} else { } else {
throw new \think\admin\Exception("自定义 {$command} 未继承 QueueService"); throw new \think\admin\Exception("自定义 {$command} 未继承 QueueService");
} }
} else { } else {
// 自定义指令,不支持返回消息(支持异常结束,异常码可选择 3|4 设置任务状态) // 自定义指令,不支持返回消息(支持异常结束,异常码可选择 3|4 设置任务状态)
$attr = explode(' ', trim(preg_replace('|\s+|', ' ', $this->queue['command']))); $attr = explode(' ', trim(preg_replace('|\s+|', ' ', $this->queue->record['command'])));
$this->update('3', $this->app->console->call(array_shift($attr), $attr)->fetch(), false); $this->update('3', $this->app->console->call(array_shift($attr), $attr)->fetch(), false);
} }
} }
@ -127,19 +113,19 @@ class WorkQueue extends Queue
$this->output->writeln(is_string($message) ? $message : ''); $this->output->writeln(is_string($message) ? $message : '');
// 任务进度标记 // 任务进度标记
if (!empty($desc[0])) { if (!empty($desc[0])) {
$this->qService->progress($status, ">>> {$desc[0]} <<<"); $this->queue->progress($status, ">>> {$desc[0]} <<<");
} }
if ($status == 3) { if ($status == 3) {
$this->qService->progress($status, '>>> 任务处理完成 <<<', 100); $this->queue->progress($status, '>>> 任务处理完成 <<<', 100);
} elseif ($status == 4) { } elseif ($status == 4) {
$this->qService->progress($status, '>>> 任务处理失败 <<<'); $this->queue->progress($status, '>>> 任务处理失败 <<<');
} }
// 注册循环任务 // 注册循环任务
if (isset($this->queue['loops_time']) && $this->queue['loops_time'] > 0) { if (isset($this->queue->record['loops_time']) && $this->queue->record['loops_time'] > 0) {
try { try {
$this->qService->initialize($this->code)->reset($this->queue['loops_time']); $this->queue->initialize($this->code)->reset($this->queue->record['loops_time']);
} catch (\Exception|\Error $exception) { } catch (\Exception|\Error $exception) {
$this->app->log->error("Queue {$this->queue['code']} Loops Failed. {$exception->getMessage()}"); $this->app->log->error("Queue {$this->queue->record['code']} Loops Failed. {$exception->getMessage()}");
} }
} }
} }

View File

@ -48,7 +48,7 @@ class QueueService extends Service
* 当前任务数据 * 当前任务数据
* @var array * @var array
*/ */
public $queue = []; public $record = [];
/** /**
* 数据初始化 * 数据初始化
@ -63,26 +63,17 @@ class QueueService extends Service
{ {
if (!empty($code)) { if (!empty($code)) {
$this->code = $code; $this->code = $code;
$this->queue = $this->app->db->name('SystemQueue')->where(['code' => $this->code])->find(); $this->record = $this->app->db->name('SystemQueue')->where(['code' => $this->code])->find();
if (empty($this->queue)) { if (empty($this->record)) {
$this->app->log->error("Qeueu initialize failed, Queue {$code} not found."); $this->app->log->error("Qeueu initialize failed, Queue {$code} not found.");
throw new \think\admin\Exception("Qeueu initialize failed, Queue {$code} not found."); throw new \think\admin\Exception("Qeueu initialize failed, Queue {$code} not found.");
} }
list($this->code, $this->title) = [$this->queue['code'], $this->queue['title']]; [$this->code, $this->title] = [$this->record['code'], $this->record['title']];
$this->data = json_decode($this->queue['exec_data'], true) ?: []; $this->data = json_decode($this->record['exec_data'], true) ?: [];
} }
return $this; return $this;
} }
/**
* 判断是否WIN环境
* @return boolean
*/
protected function iswin()
{
return ProcessService::instance()->iswin();
}
/** /**
* 重发异步任务 * 重发异步任务
* @param integer $wait 等待时间 * @param integer $wait 等待时间
@ -94,7 +85,7 @@ class QueueService extends Service
*/ */
public function reset($wait = 0) public function reset($wait = 0)
{ {
if (empty($this->queue)) { if (empty($this->record)) {
$this->app->log->error("Qeueu reset failed, Queue {$this->code} data cannot be empty!"); $this->app->log->error("Qeueu reset failed, Queue {$this->code} data cannot be empty!");
throw new \think\admin\Exception("Qeueu reset failed, Queue {$this->code} data cannot be empty!"); throw new \think\admin\Exception("Qeueu reset failed, Queue {$this->code} data cannot be empty!");
} }