refactor Lib/ASCache so that we can easily determine cacheable activities (we will not cache limited activities anymore), refactor the Daemon/Convo so that we will not waste a process for each channel (similar to Daemon/Fetchparents)

This commit is contained in:
Mario
2025-10-26 17:43:10 +00:00
parent 03af9d415f
commit 7cf7aa397e
5 changed files with 111 additions and 65 deletions

View File

@@ -12,52 +12,53 @@ class Convo {
logger('convo invoked: ' . print_r($argv, true));
if ($argc != 4) {
if ($argc < 4) {
return;
}
$id = $argv[1];
$channel_id = intval($argv[2]);
$contact_hash = $argv[3];
$channel = channelx_by_n($channel_id);
if (!$channel) {
$channels = explode(',', $argv[1]);
if (!$channels) {
return;
}
$r = q("SELECT abook.*, xchan.* FROM abook left join xchan on abook_xchan = xchan_hash
WHERE abook_channel = %d and abook_xchan = '%s' LIMIT 1",
intval($channel_id),
dbesc($contact_hash)
);
if (!$r) {
$observer_hash = $argv[2];
if (!$observer_hash) {
return;
}
$contact = array_shift($r);
$obj = new ASCollection($id, $channel);
$messages = $obj->get();
if (!$messages) {
$mid = $argv[3];
if (!$mid) {
return;
}
foreach ($messages as $message) {
if (is_string($message)) {
$message = Activity::fetch($message, $channel);
$force = $argv[4] ?? false;
foreach ($channels as $channel_id) {
$channel = channelx_by_n($channel_id);
$obj = new ASCollection($mid, $channel);
$messages = $obj->get();
if (!$messages) {
continue;
}
// set client flag because comments will probably just be objects and not full blown activities
// and that lets us use implied_create
$AS = new ActivityStreams($message);
if ($AS->is_valid() && is_array($AS->obj)) {
$item = Activity::decode_note($AS);
$item['item_fetched'] = true;
Activity::store($channel, $contact['abook_xchan'], $AS, $item);
foreach ($messages as $message) {
if (is_string($message)) {
$message = Activity::fetch($message, $channel);
}
// set client flag because comments will probably just be objects and not full blown activities
// and that lets us use implied_create
$AS = new ActivityStreams($message);
if ($AS->is_valid() && is_array($AS->obj)) {
$item = Activity::decode_note($AS);
$item['item_fetched'] = true;
Activity::store($channel, $observer_hash, $AS, $item, false, $force);
}
}
}
return;

View File

@@ -7,27 +7,63 @@ namespace Zotlabs\Lib;
*/
class ASCache {
public static function isEnabled() {
public static function isEnabled()
{
return Config::Get('system', 'as_object_cache_enabled', true);
}
public static function getAge() {
public static function getAge(): string
{
return Config::Get('system', 'as_object_cache_time', '10 MINUTE');
}
public static function Get($key) {
public static function Get(string $key): array
{
if (!self::isEnabled()) {
return;
return [];
}
return Cache::get($key, self::getAge());
$ret = Cache::get($key, self::getAge());
if ($ret) {
return unserialise($ret);
}
return [];
}
public static function Set($key, $value) {
public static function Set(string $key, array $obj): void
{
if (!self::isEnabled()) {
return;
}
Cache::set($key, $value);
if (!self::isCacheable($obj)) {
return;
}
Cache::set($key, serialise($obj));
}
public static function isCacheable(array $obj): bool
{
$to = [];
$cc = [];
if (isset($obj['to'])) {
$to = is_array($obj['to']) ? $obj['to'] : [$obj['to']];
}
if (isset($obj['cc'])) {
$cc = is_array($obj['cc']) ? $obj['cc'] : [$obj['cc']];
}
$receivers = array_merge($to, $cc);
if ($receivers && !in_array(ACTIVITY_PUBLIC_INBOX, $receivers)) {
return false;
}
return true;
}
}

View File

@@ -40,7 +40,7 @@ class ASCollection {
// logger('fetching: ' . $obj);
$data = Activity::fetch($obj, $channel);
if ($data) {
ASCache::Set($obj, serialise($data));
ASCache::Set($obj, $data);
}
}
@@ -108,13 +108,13 @@ class ASCollection {
$cached = ASCache::Get($this->nextpage);
if ($cached) {
// logger('cached: ' . $this->nextpage);
$data = unserialise($cached);
$data = $cached;
}
else {
$data = Activity::fetch($this->nextpage, $this->channel);
if ($data) {
// logger('fetching: ' . $this->nextpage);
ASCache::Set($this->nextpage, serialise($data));
ASCache::Set($this->nextpage, $data);
}
}

View File

@@ -3103,29 +3103,19 @@ class Activity {
send_status_notifications($x['item_id'], $x['item']);
sync_an_item($channel['channel_id'], $x['item_id']);
}
if ($fetch_parents && $parent && !intval($parent[0]['item_private'])) {
logger('topfetch', LOGGER_DEBUG);
// if the thread owner is a connnection, we will already receive any additional comments to their posts
// but if they are not we can try to fetch others in the background
$connected = q("SELECT abook.*, xchan.* FROM abook left join xchan on abook_xchan = xchan_hash
WHERE abook_channel = %d and abook_xchan = '%s' LIMIT 1",
intval($channel['channel_id']),
dbesc($parent[0]['owner_xchan'])
dbesc($x['item']['owner_xchan'])
);
if (!$connected) {
// determine if the top-level post provides a replies collection
if ($parent[0]['obj']) {
$parent[0]['obj'] = json_decode($parent[0]['obj'], true);
}
logger('topfetch: ' . print_r($parent[0], true), LOGGER_ALL);
$id = ((array_path_exists('obj/replies/id', $parent[0])) ? $parent[0]['obj']['replies']['id'] : false);
if (!$id) {
$id = ((array_path_exists('obj/replies', $parent[0]) && is_string($parent[0]['obj']['replies'])) ? $parent[0]['obj']['replies'] : false);
}
if ($id) {
Master::Summon(['Convo', $id, $channel['channel_id'], $observer_hash]);
if (isset($act->obj['replies']['id'])) {
App::$cache['as_fetch_collection'][$act->obj['replies']['id']]['channels'][] = $channel['channel_id'];
App::$cache['as_fetch_collection'][$act->obj['replies']['id']]['force'] = intval($force);
}
}
}
@@ -3175,7 +3165,7 @@ class Activity {
$cached = ASCache::Get($current_item['parent_mid']);
if ($cached) {
// logger('cached: ' . $current_item['parent_mid']);
$n = unserialise($cached);
$n = $cached;
}
else {
// logger('fetching: ' . $current_item['parent_mid']);
@@ -3183,7 +3173,7 @@ class Activity {
if (!$n) {
break;
}
ASCache::Set($current_item['parent_mid'], serialise($n));
ASCache::Set($current_item['parent_mid'], $n);
}
$a = new ActivityStreams($n);
@@ -3654,7 +3644,7 @@ class Activity {
// logger('fetching: ' . $url);
$a = self::fetch($url);
if ($a) {
ASCache::Set($url, serialise($a));
ASCache::Set($url, $a);
}
}
@@ -3803,10 +3793,9 @@ class Activity {
public static function init_background_fetch(string $observer_hash = '') {
if (isset(App::$cache['zot_fetch_objects'])) {
$channels_str = '';
foreach (App::$cache['zot_fetch_objects'] as $mid => $info) {
$force = $info['force'];
$channels_str = '';
foreach ($info['channels'] as $c) {
if ($channels_str) {
@@ -3825,10 +3814,9 @@ class Activity {
return;
}
$channels_str = '';
foreach (App::$cache['as_fetch_objects'] as $mid => $info) {
$force = $info['force'];
$channels_str = '';
foreach ($info['channels'] as $c) {
if ($channels_str) {
@@ -3840,6 +3828,27 @@ class Activity {
Master::Summon(['Fetchparents', $channels_str, $observer_hash, $mid, $force]);
}
}
if (isset(App::$cache['as_fetch_collection'])) {
if (!$observer_hash) {
logger('Attempt to initiate Convo daemon without observer');
return;
}
foreach (App::$cache['as_fetch_collection'] as $mid => $info) {
$force = $info['force'];
$channels_str = '';
foreach ($info['channels'] as $c) {
if ($channels_str) {
$channels_str .= ',';
}
$channels_str .= $c;
}
Master::Summon(['Convo', $channels_str, $observer_hash, $mid, $force]);
}
}
}
public static function addToCollection($channel, $object, $target, $sourceItem = null, $deliver = true) {

View File

@@ -97,7 +97,7 @@ class ActivityStreams {
}
// cache for future use
ASCache::Set($this->id, 'json:' . $this->raw);
ASCache::Set($this->id, $this->data);
$this->type = $this->get_primary_type();
$this->actor = $this->get_actor('actor', '', '');
@@ -419,7 +419,7 @@ class ActivityStreams {
// logger('AS fetching: ' . $x);
$y = $this->fetch_property($x);
if ($y) {
ASCache::Set($x, serialise($y));
ASCache::Set($x, $y);
}
}
if (is_array($y)) {