port queue improvements from streams

This commit is contained in:
Mario
2023-02-12 10:41:23 +00:00
parent 2bfd18e6cd
commit 724b8cc6a5
2 changed files with 48 additions and 57 deletions

View File

@@ -7,79 +7,56 @@ use Zotlabs\Lib\Queue as LibQueue;
class Queue {
static public function run($argc, $argv) {
require_once('include/items.php');
require_once('include/bbcode.php');
if ($argc > 1)
$queue_id = $argv[1];
else
$queue_id = EMPTY_STR;
$queue_id = ($argc > 1) ? $argv[1] : '';
logger('queue: start');
// delete all queue items more than 3 days old
// but first mark these sites dead if we haven't heard from them in a month
$r = q("select outq_posturl from outq where outq_created < %s - INTERVAL %s",
db_utcnow(), db_quoteinterval('3 DAY')
$oldqItems = q("select outq_posturl from outq where outq_created < %s - INTERVAL %s",
db_utcnow(),
db_quoteinterval('3 DAY')
);
if ($r) {
foreach ($r as $rr) {
$h = parse_url($rr['outq_posturl']);
$desturl = $h['scheme'] . '://' . $h['host'] . (isset($h['port']) ? ':' . $h['port'] : '');
if ($oldqItems) {
foreach ($oldqItems as $qItem) {
$h = parse_url($qItem['outq_posturl']);
$site_url = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
q("update site set site_dead = 1 where site_dead = 0 and site_url = '%s' and site_update < %s - INTERVAL %s",
dbesc($desturl),
db_utcnow(), db_quoteinterval('1 MONTH')
dbesc($site_url),
db_utcnow(),
db_quoteinterval('1 MONTH')
);
}
}
logger('Removing ' . count($oldqItems) . ' old queue entries');
q("DELETE FROM outq WHERE outq_created < %s - INTERVAL %s",
db_utcnow(), db_quoteinterval('3 DAY')
db_utcnow(),
db_quoteinterval('3 DAY')
);
$deliveries = [];
if ($queue_id) {
$r = q("SELECT * FROM outq WHERE outq_hash = '%s' LIMIT 1",
$qItems = q("SELECT * FROM outq WHERE outq_hash = '%s' LIMIT 1",
dbesc($queue_id)
);
logger('queue deliver: ' . $qItems[0]['outq_hash'] . ' to ' . $qItems[0]['outq_posturl'], LOGGER_DEBUG);
LibQueue\Queue::deliver($qItems[0]);
}
else {
// For the first 12 hours we'll try to deliver every 15 minutes
// After that, we'll only attempt delivery once per hour.
// This currently only handles the default queue drivers ('zot' or '') which we will group by posturl
// so that we don't start off a thousand deliveries for a couple of dead hubs.
// The zot driver will deliver everything destined for a single hub once contact is made (*if* contact is made).
// Other drivers will have to do something different here and may need their own query.
// Note: this requires some tweaking as new posts to long dead hubs once a day will keep them in the
// "every 15 minutes" category. We probably need to prioritise them when inserted into the queue
// or just prior to this query based on recent and long-term delivery history. If we have good reason to believe
// the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once
// or twice a day.
$sqlrandfunc = db_getfunc('rand');
$r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1",
$qItems = q("SELECT * FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s ",
db_utcnow()
);
while ($r) {
foreach ($r as $rv) {
LibQueue::deliver($rv);
if ($qItems) {
foreach ($qItems as $qItem) {
$deliveries[] = $qItem['outq_hash'];
}
$r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1",
db_utcnow()
);
do_delivery($deliveries);
}
}
if (!$r)
return;
foreach ($r as $rv) {
LibQueue::deliver($rv);
}
return;
}
}
}

View File

@@ -65,16 +65,32 @@ class Queue {
);
}
static function remove($id,$channel_id = 0) {
logger('queue: remove queue item ' . $id,LOGGER_DEBUG);
public static function remove($id, $channel_id = 0) {
logger('queue: remove queue item ' . $id, LOGGER_DEBUG);
$sql_extra = (($channel_id) ? " and outq_channel = " . intval($channel_id) . " " : '');
q("DELETE FROM outq WHERE outq_hash = '%s' $sql_extra",
// figure out what endpoint it is going to.
$record = q("select outq_posturl from outq where outq_hash = '%s' $sql_extra",
dbesc($id)
);
}
if ($record) {
q("DELETE FROM outq WHERE outq_hash = '%s' $sql_extra",
dbesc($id)
);
// If there's anything remaining in the queue for this site, move one of them to the next active
// queue run by setting outq_scheduled back to the present. We may be attempting to deliver it
// as a 'piled_up' delivery, but this ensures the site has an active queue entry as long as queued
// entries still exist for it. This fixes an issue where one immediate delivery left everything
// else for that site undeliverable since all the other entries had been pushed far into the future.
q("update outq set outq_scheduled = '%s' where outq_posturl = '%s' limit 1",
dbesc(datetime_convert()),
dbesc($record[0]['outq_posturl'])
);
}
}
static function remove_by_posturl($posturl) {
logger('queue: remove queue posturl ' . $posturl,LOGGER_DEBUG);
@@ -84,8 +100,6 @@ class Queue {
);
}
static function set_delivered($id,$channel = 0) {
logger('queue: set delivered ' . $id,LOGGER_DEBUG);
$sql_extra = (($channel['channel_id']) ? " and outq_channel = " . intval($channel['channel_id']) . " " : '');