ComposerUpdate

This commit is contained in:
Anyon 2020-03-23 14:59:52 +08:00
parent 097497f87a
commit 93dacf03fd
25 changed files with 124 additions and 122 deletions

View File

@ -28,11 +28,15 @@ class Queue extends Controller
/** /**
* 任务进度查询 * 任务进度查询
* @login true * @login true
* @throws \think\admin\Exception
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/ */
public function progress() public function progress()
{ {
$input = $this->_vali(['code.require' => '任务编号不能为空!']); $input = $this->_vali(['code.require' => '任务编号不能为空!']);
$result = QueueService::instance()->progress($input['code']); $result = QueueService::instance()->initialize($input['code'])->progress();
$this->success('获取任务进度成功!', $result); $this->success('获取任务进度成功!', $result);
} }

View File

@ -112,12 +112,14 @@
<i class="layui-icon font-s12">&#xe669;</i> <i class="layui-icon font-s12">&#xe669;</i>
</a> </a>
{/eq} {/eq}
{if auth("remove") }
{if auth("remove") and in_array($vo.status,[1,3,4])}
<a data-action='{:url("remove")}' data-confirm="确定要删除该任务吗?" data-value="id#{$vo.id}" data-tips-text="删除该任务" class='layui-badge layui-bg-red margin-left-5'> <a data-action='{:url("remove")}' data-confirm="确定要删除该任务吗?" data-value="id#{$vo.id}" data-tips-text="删除该任务" class='layui-badge layui-bg-red margin-left-5'>
<i class="layui-icon">&#xe640;</i> <i class="layui-icon">&#xe640;</i>
</a> </a>
{/if} {/if}
<a onclick="$.loadQueue('{$vo.code}')" data-tips-text="任务进度信息" class='layui-badge layui-bg-orange margin-left-5'>
<i class="layui-icon">&#xe705;</i>
</a>
</div> </div>
<div class="color-desc">{$vo.exec_desc|raw|default="没有获取到状态描述"}</div> <div class="color-desc">{$vo.exec_desc|raw|default="没有获取到状态描述"}</div>
</td> </td>

View File

@ -56,7 +56,7 @@ class Fans extends Command
$message .= $this->$method(); $message .= $this->$method();
} }
} }
$this->queueProgressState(3, $message); $this->setQueueMessage(3, $message);
} }
/** /**
@ -67,6 +67,7 @@ class Fans extends Command
* @throws \WeChat\Exceptions\InvalidResponseException * @throws \WeChat\Exceptions\InvalidResponseException
* @throws \WeChat\Exceptions\LocalCacheException * @throws \WeChat\Exceptions\LocalCacheException
* @throws \think\Exception * @throws \think\Exception
* @throws \think\admin\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException * @throws \think\db\exception\ModelNotFoundException
@ -81,7 +82,7 @@ class Fans extends Command
foreach ($list['user_info_list'] as $user) { foreach ($list['user_info_list'] as $user) {
$string = str_pad(++$done, strlen($result['total']), '0', STR_PAD_LEFT); $string = str_pad(++$done, strlen($result['total']), '0', STR_PAD_LEFT);
$message = "({$string}/{$result['total']}) -> {$user['openid']} {$user['nickname']}"; $message = "({$string}/{$result['total']}) -> {$user['openid']} {$user['nickname']}";
$this->queueProgressMessage(2, $message, $done * 100 / $result['total']); $this->setQueueProgress(2, $message, $done * 100 / $result['total']);
FansService::instance()->set($user, $appid); FansService::instance()->set($user, $appid);
} }
} }

View File

@ -1,26 +0,0 @@
<?php
namespace app\wechat\command;
use think\admin\Command;
use think\console\Input;
use think\console\Output;
class Tests extends Command
{
protected function configure()
{
$this->setName('xadmin:tests')->setDescription('指令类任务测试');
}
protected function execute(Input $input, Output $output)
{
$max = 10000;
for ($i = 0; $i < $max; $i++) {
$this->queueProgressMessage(2, "已经完成了 $i 的计算", $i / $max * 100);
usleep(5000);
}
}
}

View File

@ -71,21 +71,7 @@ class Fans extends Controller
*/ */
public function sync() public function sync()
{ {
try { $this->_queue('同步微信用户数据', "xadmin:fansall", 1, [], 0);
$code = sysqueue('同步微信用户所有数据', "xadmin:fansall", 1, [], 0);
$this->success('创建任务成功,请等待完成!', $code);
} catch (Exception $exception) {
$code = $exception->getData();
if (is_string($code) && stripos($code, 'Q') === 0) {
$this->success('任务已经存在,无需再次创建!', $code);
} else {
$this->error($exception->getMessage());
}
} catch (HttpResponseException $exception) {
throw $exception;
} catch (\Exception $exception) {
$this->error("创建任务失败,{$exception->getMessage()}");
}
} }
/** /**

View File

@ -4,5 +4,4 @@ use think\Console;
Console::starting(function (Console $console) { Console::starting(function (Console $console) {
$console->addCommand('app\wechat\command\Fans'); $console->addCommand('app\wechat\command\Fans');
$console->addCommand('app\wechat\command\Tests');
}); });

8
composer.lock generated
View File

@ -909,12 +909,12 @@
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/zoujingli/ThinkLibrary.git", "url": "https://github.com/zoujingli/ThinkLibrary.git",
"reference": "6b6a6ff35710f534980ac8f14ee2bfd7531eaa2f" "reference": "f7d1be29830dc3d8c149e48dda510127d8cbddb5"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/6b6a6ff35710f534980ac8f14ee2bfd7531eaa2f", "url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/f7d1be29830dc3d8c149e48dda510127d8cbddb5",
"reference": "6b6a6ff35710f534980ac8f14ee2bfd7531eaa2f", "reference": "f7d1be29830dc3d8c149e48dda510127d8cbddb5",
"shasum": "", "shasum": "",
"mirrors": [ "mirrors": [
{ {
@ -958,7 +958,7 @@
], ],
"description": "ThinkPHP v6.0 Development Library", "description": "ThinkPHP v6.0 Development Library",
"homepage": "http://framework.thinkadmin.top", "homepage": "http://framework.thinkadmin.top",
"time": "2020-03-23T03:44:18+00:00" "time": "2020-03-23T06:54:54+00:00"
}, },
{ {
"name": "zoujingli/wechat-developer", "name": "zoujingli/wechat-developer",

View File

@ -739,14 +739,19 @@ $(function () {
$body.on('click', '[data-queue]', function () { $body.on('click', '[data-queue]', function () {
var action = this.getAttribute('data-queue') || ''; var action = this.getAttribute('data-queue') || '';
if (action.length < 1) return $.msg.tips('任务地址不能为空!'); if (action.length < 1) return $.msg.tips('任务地址不能为空!');
$.msg.confirm(title, function (index) { this.loading = function () {
$.form.load(action, {}, 'post', function (ret) { $.form.load(action, {}, 'post', function (ret) {
if (typeof ret.data === 'string' && ret.data.indexOf('Q') === 0) { if (typeof ret.data === 'string' && ret.data.indexOf('Q') === 0) {
return $.loadQueue(ret.data), false; return $.loadQueue(ret.data), false;
} }
}); });
$.msg.close(index); $.msg.close(index);
}) };
if ($(this).attr('data-confirm')) {
return $.msg.confirm($(this).attr('data-confirm'), this.loading);
} else {
this.loading();
}
}); });
$.loadQueue = function (code) { $.loadQueue = function (code) {
layer.open({ layer.open({

2
vendor/autoload.php vendored
View File

@ -4,4 +4,4 @@
require_once __DIR__ . '/composer/autoload_real.php'; require_once __DIR__ . '/composer/autoload_real.php';
return ComposerAutoloaderInit9eebf462f38fc6a0db482795c8caf813::getLoader(); return ComposerAutoloaderInitd7300eea771f6ce10908a45ed531efa1::getLoader();

View File

@ -203,7 +203,6 @@ return array(
'app\\admin\\controller\\api\\Upload' => $baseDir . '/app/admin/controller/api/Upload.php', 'app\\admin\\controller\\api\\Upload' => $baseDir . '/app/admin/controller/api/Upload.php',
'app\\index\\controller\\Index' => $baseDir . '/app/index/controller/Index.php', 'app\\index\\controller\\Index' => $baseDir . '/app/index/controller/Index.php',
'app\\wechat\\command\\Fans' => $baseDir . '/app/wechat/command/Fans.php', 'app\\wechat\\command\\Fans' => $baseDir . '/app/wechat/command/Fans.php',
'app\\wechat\\command\\Tests' => $baseDir . '/app/wechat/command/Tests.php',
'app\\wechat\\controller\\Config' => $baseDir . '/app/wechat/controller/Config.php', 'app\\wechat\\controller\\Config' => $baseDir . '/app/wechat/controller/Config.php',
'app\\wechat\\controller\\Fans' => $baseDir . '/app/wechat/controller/Fans.php', 'app\\wechat\\controller\\Fans' => $baseDir . '/app/wechat/controller/Fans.php',
'app\\wechat\\controller\\Keys' => $baseDir . '/app/wechat/controller/Keys.php', 'app\\wechat\\controller\\Keys' => $baseDir . '/app/wechat/controller/Keys.php',

View File

@ -2,7 +2,7 @@
// autoload_real.php @generated by Composer // autoload_real.php @generated by Composer
class ComposerAutoloaderInit9eebf462f38fc6a0db482795c8caf813 class ComposerAutoloaderInitd7300eea771f6ce10908a45ed531efa1
{ {
private static $loader; private static $loader;
@ -19,15 +19,15 @@ class ComposerAutoloaderInit9eebf462f38fc6a0db482795c8caf813
return self::$loader; return self::$loader;
} }
spl_autoload_register(array('ComposerAutoloaderInit9eebf462f38fc6a0db482795c8caf813', 'loadClassLoader'), true, true); spl_autoload_register(array('ComposerAutoloaderInitd7300eea771f6ce10908a45ed531efa1', 'loadClassLoader'), true, true);
self::$loader = $loader = new \Composer\Autoload\ClassLoader(); self::$loader = $loader = new \Composer\Autoload\ClassLoader();
spl_autoload_unregister(array('ComposerAutoloaderInit9eebf462f38fc6a0db482795c8caf813', 'loadClassLoader')); spl_autoload_unregister(array('ComposerAutoloaderInitd7300eea771f6ce10908a45ed531efa1', 'loadClassLoader'));
$useStaticLoader = PHP_VERSION_ID >= 50600 && !defined('HHVM_VERSION') && (!function_exists('zend_loader_file_encoded') || !zend_loader_file_encoded()); $useStaticLoader = PHP_VERSION_ID >= 50600 && !defined('HHVM_VERSION') && (!function_exists('zend_loader_file_encoded') || !zend_loader_file_encoded());
if ($useStaticLoader) { if ($useStaticLoader) {
require_once __DIR__ . '/autoload_static.php'; require_once __DIR__ . '/autoload_static.php';
call_user_func(\Composer\Autoload\ComposerStaticInit9eebf462f38fc6a0db482795c8caf813::getInitializer($loader)); call_user_func(\Composer\Autoload\ComposerStaticInitd7300eea771f6ce10908a45ed531efa1::getInitializer($loader));
} else { } else {
$map = require __DIR__ . '/autoload_namespaces.php'; $map = require __DIR__ . '/autoload_namespaces.php';
foreach ($map as $namespace => $path) { foreach ($map as $namespace => $path) {
@ -48,19 +48,19 @@ class ComposerAutoloaderInit9eebf462f38fc6a0db482795c8caf813
$loader->register(true); $loader->register(true);
if ($useStaticLoader) { if ($useStaticLoader) {
$includeFiles = Composer\Autoload\ComposerStaticInit9eebf462f38fc6a0db482795c8caf813::$files; $includeFiles = Composer\Autoload\ComposerStaticInitd7300eea771f6ce10908a45ed531efa1::$files;
} else { } else {
$includeFiles = require __DIR__ . '/autoload_files.php'; $includeFiles = require __DIR__ . '/autoload_files.php';
} }
foreach ($includeFiles as $fileIdentifier => $file) { foreach ($includeFiles as $fileIdentifier => $file) {
composerRequire9eebf462f38fc6a0db482795c8caf813($fileIdentifier, $file); composerRequired7300eea771f6ce10908a45ed531efa1($fileIdentifier, $file);
} }
return $loader; return $loader;
} }
} }
function composerRequire9eebf462f38fc6a0db482795c8caf813($fileIdentifier, $file) function composerRequired7300eea771f6ce10908a45ed531efa1($fileIdentifier, $file)
{ {
if (empty($GLOBALS['__composer_autoload_files'][$fileIdentifier])) { if (empty($GLOBALS['__composer_autoload_files'][$fileIdentifier])) {
require $file; require $file;

View File

@ -4,7 +4,7 @@
namespace Composer\Autoload; namespace Composer\Autoload;
class ComposerStaticInit9eebf462f38fc6a0db482795c8caf813 class ComposerStaticInitd7300eea771f6ce10908a45ed531efa1
{ {
public static $files = array ( public static $files = array (
'9b552a3cc426e3287cc811caefa3cf53' => __DIR__ . '/..' . '/topthink/think-helper/src/helper.php', '9b552a3cc426e3287cc811caefa3cf53' => __DIR__ . '/..' . '/topthink/think-helper/src/helper.php',
@ -336,7 +336,6 @@ class ComposerStaticInit9eebf462f38fc6a0db482795c8caf813
'app\\admin\\controller\\api\\Upload' => __DIR__ . '/../..' . '/app/admin/controller/api/Upload.php', 'app\\admin\\controller\\api\\Upload' => __DIR__ . '/../..' . '/app/admin/controller/api/Upload.php',
'app\\index\\controller\\Index' => __DIR__ . '/../..' . '/app/index/controller/Index.php', 'app\\index\\controller\\Index' => __DIR__ . '/../..' . '/app/index/controller/Index.php',
'app\\wechat\\command\\Fans' => __DIR__ . '/../..' . '/app/wechat/command/Fans.php', 'app\\wechat\\command\\Fans' => __DIR__ . '/../..' . '/app/wechat/command/Fans.php',
'app\\wechat\\command\\Tests' => __DIR__ . '/../..' . '/app/wechat/command/Tests.php',
'app\\wechat\\controller\\Config' => __DIR__ . '/../..' . '/app/wechat/controller/Config.php', 'app\\wechat\\controller\\Config' => __DIR__ . '/../..' . '/app/wechat/controller/Config.php',
'app\\wechat\\controller\\Fans' => __DIR__ . '/../..' . '/app/wechat/controller/Fans.php', 'app\\wechat\\controller\\Fans' => __DIR__ . '/../..' . '/app/wechat/controller/Fans.php',
'app\\wechat\\controller\\Keys' => __DIR__ . '/../..' . '/app/wechat/controller/Keys.php', 'app\\wechat\\controller\\Keys' => __DIR__ . '/../..' . '/app/wechat/controller/Keys.php',
@ -635,9 +634,9 @@ class ComposerStaticInit9eebf462f38fc6a0db482795c8caf813
public static function getInitializer(ClassLoader $loader) public static function getInitializer(ClassLoader $loader)
{ {
return \Closure::bind(function () use ($loader) { return \Closure::bind(function () use ($loader) {
$loader->prefixLengthsPsr4 = ComposerStaticInit9eebf462f38fc6a0db482795c8caf813::$prefixLengthsPsr4; $loader->prefixLengthsPsr4 = ComposerStaticInitd7300eea771f6ce10908a45ed531efa1::$prefixLengthsPsr4;
$loader->prefixDirsPsr4 = ComposerStaticInit9eebf462f38fc6a0db482795c8caf813::$prefixDirsPsr4; $loader->prefixDirsPsr4 = ComposerStaticInitd7300eea771f6ce10908a45ed531efa1::$prefixDirsPsr4;
$loader->classMap = ComposerStaticInit9eebf462f38fc6a0db482795c8caf813::$classMap; $loader->classMap = ComposerStaticInitd7300eea771f6ce10908a45ed531efa1::$classMap;
}, null, ClassLoader::class); }, null, ClassLoader::class);
} }

View File

@ -935,12 +935,12 @@
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/zoujingli/ThinkLibrary.git", "url": "https://github.com/zoujingli/ThinkLibrary.git",
"reference": "6b6a6ff35710f534980ac8f14ee2bfd7531eaa2f" "reference": "f7d1be29830dc3d8c149e48dda510127d8cbddb5"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/6b6a6ff35710f534980ac8f14ee2bfd7531eaa2f", "url": "https://api.github.com/repos/zoujingli/ThinkLibrary/zipball/f7d1be29830dc3d8c149e48dda510127d8cbddb5",
"reference": "6b6a6ff35710f534980ac8f14ee2bfd7531eaa2f", "reference": "f7d1be29830dc3d8c149e48dda510127d8cbddb5",
"shasum": "", "shasum": "",
"mirrors": [ "mirrors": [
{ {
@ -956,7 +956,7 @@
"ext-json": "*", "ext-json": "*",
"topthink/framework": "^6.0" "topthink/framework": "^6.0"
}, },
"time": "2020-03-23T03:44:18+00:00", "time": "2020-03-23T06:54:54+00:00",
"type": "library", "type": "library",
"extra": { "extra": {
"think": { "think": {

2
vendor/services.php vendored
View File

@ -1,5 +1,5 @@
<?php <?php
// This file is automatically generated at:2020-03-23 11:55:14 // This file is automatically generated at:2020-03-23 14:57:04
declare (strict_types = 1); declare (strict_types = 1);
return array ( return array (
0 => 'think\\app\\Service', 0 => 'think\\app\\Service',

View File

@ -29,11 +29,13 @@ use think\console\Output;
class Command extends ThinkCommand class Command extends ThinkCommand
{ {
/** /**
* 任务控制服务
* @var QueueService * @var QueueService
*/ */
protected $queue; protected $queue;
/** /**
* 进程控制服务
* @var ProcessService * @var ProcessService
*/ */
protected $process; protected $process;
@ -55,11 +57,18 @@ class Command extends ThinkCommand
* @param null|string $message 进度消息 * @param null|string $message 进度消息
* @param null|integer $progress 进度数值 * @param null|integer $progress 进度数值
* @return Command * @return Command
* @throws Exception
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/ */
protected function queueProgressMessage($status = null, $message = null, $progress = null) protected function setQueueProgress($status = null, $message = null, $progress = null)
{ {
if (defined('WorkQueueCode')) { if (defined('WorkQueueCode')) {
$this->queue->progress(WorkQueueCode, $status, $message, $progress); if ($this->queue->code !== WorkQueueCode) {
$this->queue->initialize(WorkQueueCode);
}
$this->queue->progress($status, $message, $progress);
} elseif (is_string($message)) { } elseif (is_string($message)) {
$this->output->writeln($message); $this->output->writeln($message);
} }
@ -73,10 +82,12 @@ class Command extends ThinkCommand
* @return Command * @return Command
* @throws Exception * @throws Exception
*/ */
protected function queueProgressState($status, $message) protected function setQueueMessage($status, $message)
{ {
if (defined('WorkQueueCode')) { if (defined('WorkQueueCode')) {
throw new Exception($message, $status); throw new Exception($message, $status);
} elseif (is_string($message)) {
$this->output->writeln($message);
} }
return $this; return $this;
} }

View File

@ -22,6 +22,7 @@ use think\admin\helper\QueryHelper;
use think\admin\helper\SaveHelper; use think\admin\helper\SaveHelper;
use think\admin\helper\TokenHelper; use think\admin\helper\TokenHelper;
use think\admin\helper\ValidateHelper; use think\admin\helper\ValidateHelper;
use think\admin\service\QueueService;
use think\App; use think\App;
use think\db\exception\DataNotFoundException; use think\db\exception\DataNotFoundException;
use think\db\exception\DbException; use think\db\exception\DbException;
@ -273,4 +274,32 @@ abstract class Controller extends \stdClass
return TokenHelper::instance()->init($return); return TokenHelper::instance()->init($return);
} }
/**
* 创建异步任务并返回任务编号
* @param string $title 任务名称
* @param string $command 执行内容
* @param integer $later 延时执行时间
* @param array $data 任务附加数据
* @param integer $rscript 任务类型(0单例,1多例)
* @param integer $loops 循环等待时间
*/
protected function _queue($title, $command, $later = 0, $data = [], $rscript = 1, $loops = 0)
{
try {
$queue = QueueService::instance()->register($title, $command, $later, $data, $rscript, $loops);
$this->success('创建任务成功!', $queue->code);
} catch (Exception $exception) {
$code = $exception->getData();
if (is_string($code) && stripos($code, 'Q') === 0) {
$this->success('任务已经存在,无需再次创建!', $code);
} else {
$this->error($exception->getMessage());
}
} catch (HttpResponseException $exception) {
throw $exception;
} catch (\Exception $exception) {
$this->error("创建任务失败,{$exception->getMessage()}");
}
}
} }

View File

@ -16,6 +16,7 @@
namespace think\admin; namespace think\admin;
use think\admin\storage\LocalStorage; use think\admin\storage\LocalStorage;
use think\admin\storage\QiniuStorage;
use think\App; use think\App;
use think\Container; use think\Container;

View File

@ -46,8 +46,8 @@ class CleanQueue extends Queue
* 清理历史任务 * 清理历史任务
* @param Input $input * @param Input $input
* @param Output $output * @param Output $output
* @throws \think\admin\Exception
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException
* @throws \think\Exception
*/ */
protected function execute(Input $input, Output $output) protected function execute(Input $input, Output $output)
{ {
@ -63,7 +63,7 @@ class CleanQueue extends Queue
$count2 = $this->app->db->name($this->table)->where($map)->update(['status' => '4', 'exec_desc' => '执行等待超过60分钟无响应']); $count2 = $this->app->db->name($this->table)->where($map)->update(['status' => '4', 'exec_desc' => '执行等待超过60分钟无响应']);
$this->output->info("Successfully processed {$count2} unresponsive records waiting for more than 1 hour"); $this->output->info("Successfully processed {$count2} unresponsive records waiting for more than 1 hour");
// 返回消息到任务状态描述 // 返回消息到任务状态描述
if (defined('WorkQueueCall')) throw new \think\Exception("清理 {$count1} 条 + 无响应 {$count2}", 3); if (defined('WorkQueueCall')) throw new \think\admin\Exception("清理 {$count1} 条 + 无响应 {$count2}", 3);
} }
} }
} }

View File

@ -41,6 +41,12 @@ class WorkQueue extends Queue
*/ */
protected $queue; protected $queue;
/**
* 当前任务服务
* @var QueueService
*/
protected $qService;
/** /**
* 配置指定信息 * 配置指定信息
*/ */
@ -65,6 +71,7 @@ class WorkQueue extends Queue
$this->output->error('Task number needs to be specified for task execution'); $this->output->error('Task number needs to be specified for task execution');
} else try { } else try {
$this->queue = $this->app->db->name($this->table)->where(['code' => $this->code, 'status' => '1'])->find(); $this->queue = $this->app->db->name($this->table)->where(['code' => $this->code, 'status' => '1'])->find();
$this->qService = QueueService::instance()->initialize($this->code);
if (empty($this->queue)) { if (empty($this->queue)) {
// 这里不做任何处理(该任务可能在其它地方已经在执行) // 这里不做任何处理(该任务可能在其它地方已经在执行)
$this->output->warning($message = "The or status of task {$this->code} is abnormal"); $this->output->warning($message = "The or status of task {$this->code} is abnormal");
@ -74,7 +81,7 @@ class WorkQueue extends Queue
'enter_time' => microtime(true), 'attempts' => $this->app->db->raw('attempts+1'), 'enter_time' => microtime(true), 'attempts' => $this->app->db->raw('attempts+1'),
'outer_time' => '0', 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => '2', 'outer_time' => '0', 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => '2',
]); ]);
QueueService::instance()->progress($this->code, 2, '>>> 任务处理开始 <<<', 0); $this->qService->progress(2, '>>> 任务处理开始 <<<', 0);
// 设置进程标题 // 设置进程标题
if ($this->process->iswin()) { if ($this->process->iswin()) {
$this->setProcessTitle("ThinkAdmin {$this->process->version()} Queue - {$this->queue['title']}"); $this->setProcessTitle("ThinkAdmin {$this->process->version()} Queue - {$this->queue['title']}");
@ -84,9 +91,8 @@ class WorkQueue extends Queue
defined('WorkQueueCode') or define('WorkQueueCode', $this->code); defined('WorkQueueCode') or define('WorkQueueCode', $this->code);
if (class_exists($command = $this->queue['command'])) { if (class_exists($command = $this->queue['command'])) {
// 自定义服务,支持返回消息(支持异常结束,异常码可选择 3|4 设置任务状态) // 自定义服务,支持返回消息(支持异常结束,异常码可选择 3|4 设置任务状态)
if ($command instanceof QueueService) { if (method_exists($command, 'instance') && ($class = $command::instance()) instanceof QueueService) {
$data = json_decode($this->queue['data'], true) ?: []; $this->update('3', $class->initialize($this->code)->execute(json_decode($this->queue['exec_data'], true) ?: []));
$this->update('3', $command::instance()->initialize($this->code)->execute($data));
} else { } else {
throw new \think\Exception("自定义 {$command} 未继承 QueueService"); throw new \think\Exception("自定义 {$command} 未继承 QueueService");
} }
@ -121,17 +127,17 @@ class WorkQueue extends Queue
$this->output->writeln(is_string($message) ? $message : ''); $this->output->writeln(is_string($message) ? $message : '');
// 任务进度标记 // 任务进度标记
if (!empty($desc[0])) { if (!empty($desc[0])) {
QueueService::instance()->progress($this->code, $status, ">>> {$desc[0]} <<<"); $this->qService->progress($status, ">>> {$desc[0]} <<<");
} }
if ($status == 3) { if ($status == 3) {
QueueService::instance()->progress($this->code, $status, '>>> 任务处理完成 <<<', 100); $this->qService->progress($status, '>>> 任务处理完成 <<<', 100);
} elseif ($status == 4) { } elseif ($status == 4) {
QueueService::instance()->progress($this->code, $status, '>>> 任务处理失败 <<<'); $this->qService->progress($status, '>>> 任务处理失败 <<<');
} }
// 注册循环任务 // 注册循环任务
if (isset($this->queue['loops_time']) && $this->queue['loops_time'] > 0) { if (isset($this->queue['loops_time']) && $this->queue['loops_time'] > 0) {
try { try {
QueueService::instance()->initialize($this->code)->reset($this->queue['loops_time']); $this->qService->initialize($this->code)->reset($this->queue['loops_time']);
} catch (\Exception $exception) { } catch (\Exception $exception) {
$this->app->log->error("Queue {$this->queue['code']} Loops Failed. {$exception->getMessage()}"); $this->app->log->error("Queue {$this->queue['code']} Loops Failed. {$exception->getMessage()}");
} }

View File

@ -93,7 +93,6 @@ if (!function_exists('sysqueue')) {
* @param integer $rscript 任务类型(0单例,1多例) * @param integer $rscript 任务类型(0单例,1多例)
* @param integer $loops 循环等待时间 * @param integer $loops 循环等待时间
* @return string * @return string
* @throws \think\Exception
* @throws \think\admin\Exception * @throws \think\admin\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException

View File

@ -49,7 +49,7 @@ class JsonRpcClient
* @param string $method * @param string $method
* @param array $params * @param array $params
* @return mixed * @return mixed
* @throws \think\Exception * @throws \think\admin\Exception
*/ */
public function __call($method, $params) public function __call($method, $params)
{ {
@ -69,16 +69,16 @@ class JsonRpcClient
fclose($fp); fclose($fp);
$response = json_decode($response, true); $response = json_decode($response, true);
} else { } else {
throw new \think\Exception("无法连接到 {$this->proxy}"); throw new \think\admin\Exception("无法连接到 {$this->proxy}");
} }
// Final checks and return // Final checks and return
if ($response['id'] != $this->id) { if ($response['id'] != $this->id) {
throw new \think\Exception("错误的响应标记 (请求标记: {$this->id}, 响应标记: {$response['id']}"); throw new \think\admin\Exception("错误的响应标记 (请求标记: {$this->id}, 响应标记: {$response['id']}");
} }
if (is_null($response['error'])) { if (is_null($response['error'])) {
return $response['result']; return $response['result'];
} else { } else {
throw new \think\Exception("请求错误:{$response['error']['message']}", $response['error']['code']); throw new \think\admin\Exception("请求错误:{$response['error']['message']}", $response['error']['code']);
} }
} }
} }

View File

@ -30,25 +30,25 @@ class QueueService extends Service
* 当前任务编号 * 当前任务编号
* @var string * @var string
*/ */
protected $code = 0; public $code = '';
/** /**
* 当前任务标题 * 当前任务标题
* @var string * @var string
*/ */
protected $title = ''; public $title = '';
/** /**
* 当前任务参数 * 当前任务参数
* @var array * @var array
*/ */
protected $data = []; public $data = [];
/** /**
* 当前任务数据 * 当前任务数据
* @var array * @var array
*/ */
protected $queue = []; public $queue = [];
/** /**
* 数据初始化 * 数据初始化
@ -75,18 +75,6 @@ class QueueService extends Service
return $this; return $this;
} }
/**
* 获取当前对象值
* @param string $name
* @return mixed
*/
public function __get($name)
{
if (isset($this->$name)) {
return $this->$name;
}
}
/** /**
* 判断是否WIN环境 * 判断是否WIN环境
* @return boolean * @return boolean
@ -163,19 +151,18 @@ class QueueService extends Service
'outer_time' => '0', 'outer_time' => '0',
'loops_time' => $loops, 'loops_time' => $loops,
]); ]);
$this->progress($this->code, 1, '>>> 任务创建成功 <<<', 0.00); $this->progress(1, '>>> 任务创建成功 <<<', 0.00);
return $this->initialize($this->code); return $this->initialize($this->code);
} }
/** /**
* 设置任务进度信息 * 设置任务进度信息
* @param string $code 任务编号
* @param null|integer $status 任务状态 * @param null|integer $status 任务状态
* @param null|string $message 进度消息 * @param null|string $message 进度消息
* @param null|integer $progress 进度数值 * @param null|integer $progress 进度数值
* @return array * @return array
*/ */
public function progress($code, $status = null, $message = null, $progress = null) public function progress($status = null, $message = null, $progress = null)
{ {
if (is_numeric($status) && intval($status) === 3) { if (is_numeric($status) && intval($status) === 3) {
if (!is_numeric($progress)) $progress = '100.00'; if (!is_numeric($progress)) $progress = '100.00';
@ -185,13 +172,13 @@ class QueueService extends Service
if (!is_numeric($progress)) $progress = '0.00'; if (!is_numeric($progress)) $progress = '0.00';
if (is_null($message)) $message = '>>> 任务执行失败 <<<'; if (is_null($message)) $message = '>>> 任务执行失败 <<<';
} }
$ckey = "queue_{$code}_progress"; $ckey = "queue_{$this->code}_progress";
try { try {
$data = $this->app->cache->get($ckey, [ $data = $this->app->cache->get($ckey, [
'code' => $code, 'status' => $status, 'message' => $message, 'progress' => $progress, 'history' => [], 'code' => $this->code, 'status' => $status, 'message' => $message, 'progress' => $progress, 'history' => [],
]); ]);
} catch (\Exception|\TypeError $exception) { } catch (\Exception|\TypeError $exception) {
return $this->progress($code, $status, $message, $progress); return $this->progress($status, $message, $progress);
} }
if (is_numeric($status)) $data['status'] = intval($status); if (is_numeric($status)) $data['status'] = intval($status);
if (is_numeric($progress)) $progress = sprintf("%.2f", $progress); if (is_numeric($progress)) $progress = sprintf("%.2f", $progress);

View File

@ -57,8 +57,8 @@ class AliossStorage extends Storage
/** /**
* 初始化入口 * 初始化入口
* @return $this * @return Storage
* @throws \think\Exception * @throws \think\admin\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException * @throws \think\db\exception\ModelNotFoundException
@ -76,7 +76,7 @@ class AliossStorage extends Storage
if ($type === 'auto') $this->prefix = "//{$this->domain}"; if ($type === 'auto') $this->prefix = "//{$this->domain}";
elseif ($type === 'http') $this->prefix = "http://{$this->domain}"; elseif ($type === 'http') $this->prefix = "http://{$this->domain}";
elseif ($type === 'https') $this->prefix = "https://{$this->domain}"; elseif ($type === 'https') $this->prefix = "https://{$this->domain}";
else throw new \think\Exception('未配置阿里云URL域名哦'); else throw new \think\admin\Exception('未配置阿里云URL域名哦');
// 初始化配置并返回当前实例 // 初始化配置并返回当前实例
return parent::initialize(); return parent::initialize();
} }
@ -84,7 +84,7 @@ class AliossStorage extends Storage
/** /**
* 获取当前实例对象 * 获取当前实例对象
* @param null $name * @param null $name
* @return static * @return Storage
* @throws \think\Exception * @throws \think\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException

View File

@ -27,7 +27,7 @@ class LocalStorage extends Storage
/** /**
* 初始化入口 * 初始化入口
* @return LocalStorage * @return Storage
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException * @throws \think\db\exception\ModelNotFoundException
@ -53,7 +53,7 @@ class LocalStorage extends Storage
/** /**
* 获取当前实例对象 * 获取当前实例对象
* @param null $name * @param null $name
* @return LocalStorage * @return Storage
* @throws \think\Exception * @throws \think\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException

View File

@ -32,8 +32,8 @@ class QiniuStorage extends Storage
/** /**
* 初始化入口 * 初始化入口
* @return $this * @return Storage
* @throws \think\Exception * @throws \think\admin\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException * @throws \think\db\exception\ModelNotFoundException
@ -50,7 +50,7 @@ class QiniuStorage extends Storage
if ($type === 'auto') $this->prefix = "//{$this->domain}"; if ($type === 'auto') $this->prefix = "//{$this->domain}";
elseif ($type === 'http') $this->prefix = "http://{$this->domain}"; elseif ($type === 'http') $this->prefix = "http://{$this->domain}";
elseif ($type === 'https') $this->prefix = "https://{$this->domain}"; elseif ($type === 'https') $this->prefix = "https://{$this->domain}";
else throw new \think\Exception('未配置七牛云URL域名哦'); else throw new \think\admin\Exception('未配置七牛云URL域名哦');
// 初始化配置并返回当前实例 // 初始化配置并返回当前实例
return parent::initialize(); return parent::initialize();
} }
@ -58,7 +58,7 @@ class QiniuStorage extends Storage
/** /**
* 获取当前实例对象 * 获取当前实例对象
* @param null $name * @param null $name
* @return static * @return Storage
* @throws \think\Exception * @throws \think\Exception
* @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException * @throws \think\db\exception\DbException