queueworker: cleanup and implement auto sleep interval (hidden config for now)

This commit is contained in:
Mario
2022-12-18 10:19:06 +00:00
parent 4d54755057
commit e36677b757
2 changed files with 57 additions and 75 deletions

View File

@@ -2,11 +2,9 @@
namespace Zotlabs\Lib;
use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\Exception\UnableToBuildUuidException;
class QueueWorker {
public static $queueworker = null;
@@ -14,54 +12,25 @@ class QueueWorker {
public static $workermaxage = 0;
public static $workersleep = 100;
public static $default_priorities = [
'Notifier' => 10,
'Deliver' => 10,
'Cache_query' => 10,
'Content_importer' => 1,
'File_importer' => 1,
'Channel_purge' => 1,
'Directory' => 1
'Notifier' => 10,
'Deliver' => 10,
'Cache_query' => 10,
'Content_importer' => 1,
'File_importer' => 1,
'Channel_purge' => 1,
'Directory' => 1
];
private static function qbegin() {
switch (ACTIVE_DBTYPE) {
case DBTYPE_MYSQL:
q('BEGIN');
break;
case DBTYPE_POSTGRES:
q('BEGIN');
break;
}
return;
q('BEGIN');
}
private static function qcommit() {
switch (ACTIVE_DBTYPE) {
case DBTYPE_MYSQL:
//q("UNLOCK TABLES");
q("COMMIT");
break;
case DBTYPE_POSTGRES:
q("COMMIT");
break;
}
return;
q("COMMIT");
}
private static function qrollback() {
switch (ACTIVE_DBTYPE) {
case DBTYPE_MYSQL:
q("ROLLBACK");
//q("UNLOCK TABLES");
break;
case DBTYPE_POSTGRES:
q("ROLLBACK");
break;
}
return;
q("ROLLBACK");
}
public static function Summon($argv) {
@@ -70,7 +39,7 @@ class QueueWorker {
$priority = 0; // @TODO allow reprioritization
if(isset(self::$default_priorities[$argv[0]])) {
if (isset(self::$default_priorities[$argv[0]])) {
$priority = self::$default_priorities[$argv[0]];
}
@@ -83,8 +52,7 @@ class QueueWorker {
);
if ($r) {
logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG);
logger(print_r($workinfo,true));
$argv = [];
logger(print_r($workinfo, true));
return;
}
@@ -100,12 +68,12 @@ class QueueWorker {
return;
}
self::qcommit();
hz_syslog('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
}
$workers = self::GetWorkerCount();
if ($workers <= self::$maxworkers) {
hz_syslog("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
if ($workers < self::$maxworkers) {
logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG);
$phpbin = get_config('system', 'phpbin', 'php');
proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
}
@@ -116,7 +84,7 @@ class QueueWorker {
if ($argv[0] !== 'Queueworker') {
$priority = 0; // @TODO allow reprioritization
if(isset(self::$default_priorities[$argv[0]])) {
if (isset(self::$default_priorities[$argv[0]])) {
$priority = self::$default_priorities[$argv[0]];
}
@@ -129,9 +97,7 @@ class QueueWorker {
);
if ($r) {
logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG);
logger(print_r($workinfo,true));
$argv = [];
logger(print_r($workinfo, true));
return;
}
@@ -177,14 +143,18 @@ class QueueWorker {
if (self::$queueworker) {
return self::$queueworker;
}
$wid = uniqid('', true);
usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker.
$workers = self::GetWorkerCount();
if ($workers >= self::$maxworkers) {
usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker.
if (self::GetWorkerCount() >= self::$maxworkers) {
logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG);
return false;
}
self::$queueworker = $wid;
return $wid;
}
@@ -193,17 +163,13 @@ class QueueWorker {
self::qbegin();
if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
$work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
}
else {
$work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
}
$work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
if (!$work) {
self::qrollback();
return false;
}
$id = $work[0]['workerq_id'];
$work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d",
@@ -224,25 +190,38 @@ class QueueWorker {
}
public static function Process() {
$sleep = intval(get_config('queueworker', 'queue_worker_sleep', 100));
$auto_queue_worker_sleep = get_config('queueworker', 'auto_queue_worker_sleep', 0);
if (!self::GetWorkerID()) {
hz_syslog('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
if ($auto_queue_worker_sleep) {
set_config('queueworker', 'queue_worker_sleep', $sleep + 100);
}
logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
killme();
}
$jobs = 0;
$workid = self::getWorkId();
if ($auto_queue_worker_sleep && $sleep > 100) {
$next_sleep = $sleep - 100;
set_config('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep));
}
$jobs = 0;
$workid = self::getWorkId();
$load_average_sleep = false;
self::$workersleep = get_config('queueworker', 'queue_worker_sleep');
self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100);
self::$workersleep = $sleep;
self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100);
if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) {
// experimental!
$load_average_sleep = true;
}
while ($workid) {
if ($load_average_sleep) {
$load_average = sys_getloadavg();
$load_average = sys_getloadavg();
self::$workersleep = intval($load_average[0]) * 10000;
if (!self::$workersleep) {
@@ -250,6 +229,8 @@ class QueueWorker {
}
}
logger('queue_worker_sleep: ' . self::$workersleep, LOGGER_DEBUG);
usleep(self::$workersleep);
self::qbegin();
@@ -259,8 +240,8 @@ class QueueWorker {
if (isset($workitem[0])) {
// At least SOME work to do.... in case there's more, let's ramp up workers.
$workers = self::GetWorkerCount();
if ($workers <= self::$maxworkers) {
logger("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
if ($workers < self::$maxworkers) {
logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG);
$phpbin = get_config('system', 'phpbin', 'php');
proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
}
@@ -277,9 +258,11 @@ class QueueWorker {
$rnd = random_string();
hz_syslog('PROCESSING: ' . $rnd . ' ' . print_r($argv,true));
logger('PROCESSING: ' . $rnd . ' ' . print_r($argv, true));
$cls::run($argc, $argv);
hz_syslog('COMPLETED: ' . $rnd);
logger('COMPLETED: ' . $rnd);
// @FIXME: Right now we assume that if we get a return, everything is OK.
// At some point we may want to test whether the run returns true/false
@@ -326,7 +309,6 @@ class QueueWorker {
//Make sure nothing new came in
$work = q("select * from workerq");
}
return;
}
/**
@@ -335,7 +317,7 @@ class QueueWorker {
* @param string $data
* @return string $uuid
*/
private static function getUuid($data) {
private static function getUuid(string $data) {
$namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a';
try {
$uuid = Uuid::uuid5($namespace, $data)->toString();

View File

@@ -66,7 +66,7 @@ class Queueworker extends Controller {
$maxqueueworkers = get_config('queueworker', 'max_queueworkers', 4);
$maxqueueworkers = ($maxqueueworkers > 3) ? $maxqueueworkers : 4;
set_config('queueworker', 'max_queueworkers', $maxqueueworkers);
//set_config('queueworker', 'max_queueworkers', $maxqueueworkers);
$sc = '';
@@ -81,7 +81,7 @@ class Queueworker extends Controller {
$workermaxage = get_config('queueworker', 'queueworker_max_age');
$workermaxage = ($workermaxage >= 120) ? $workermaxage : 300;
set_config('queueworker', 'max_queueworker_age', $workermaxage);
//set_config('queueworker', 'max_queueworker_age', $workermaxage);
$sc .= replace_macros(get_markup_template('field_input.tpl'), [
'$field' => [
@@ -94,7 +94,7 @@ class Queueworker extends Controller {
$queueworkersleep = get_config('queueworker', 'queue_worker_sleep');
$queueworkersleep = ($queueworkersleep > 100) ? $queueworkersleep : 100;
set_config('queueworker', 'queue_worker_sleep', $queueworkersleep);
//set_config('queueworker', 'queue_worker_sleep', $queueworkersleep);
$sc .= replace_macros(get_markup_template('field_input.tpl'), [
'$field' => [