Skip to content
Snippets Groups Projects
Commit cf9e0724 authored by Yassine Doghri's avatar Yassine Doghri
Browse files

fix(fediverse): add "processing" and "failed" statuses to better manage broadcast load

fixes #511
parent 88d7c0b5
No related branches found
No related tags found
No related merge requests found
Pipeline #18801 passed
Pipeline: Castopod

#18806

    Pipeline: Castopod

    #18805

      Pipeline: castopod.org

      #18804

        ......@@ -5,6 +5,7 @@ declare(strict_types=1);
        namespace Modules\Fediverse\Commands;
        use CodeIgniter\CLI\BaseCommand;
        use Exception;
        class Broadcast extends BaseCommand
        {
        ......@@ -20,32 +21,49 @@ class Broadcast extends BaseCommand
        // retrieve scheduled activities from database
        $scheduledActivities = model('ActivityModel', false)
        ->getScheduledActivities();
        ->getScheduledActivities(10);
        foreach ($scheduledActivities as $scheduledActivity) {
        // set activity post to processing
        model('ActivityModel', false)
        ->update($scheduledActivity->id, [
        'status' => 'processing',
        ]);
        }
        // Send activity to all followers
        foreach ($scheduledActivities as $scheduledActivity) {
        if ($scheduledActivity->target_actor_id !== null) {
        if ($scheduledActivity->actor_id !== $scheduledActivity->target_actor_id) {
        // send activity to targeted actor
        send_activity_to_actor(
        try {
        if ($scheduledActivity->target_actor_id !== null) {
        if ($scheduledActivity->actor_id !== $scheduledActivity->target_actor_id) {
        // send activity to targeted actor
        send_activity_to_actor(
        $scheduledActivity->actor,
        $scheduledActivity->targetActor,
        json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR)
        );
        }
        } else {
        // send activity to all actor followers
        send_activity_to_followers(
        $scheduledActivity->actor,
        $scheduledActivity->targetActor,
        json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR)
        json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR),
        );
        }
        } else {
        // send activity to all actor followers
        send_activity_to_followers(
        $scheduledActivity->actor,
        json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR),
        );
        }
        // set activity post to delivered
        model('ActivityModel', false)
        ->update($scheduledActivity->id, [
        'status' => 'delivered',
        ]);
        // set activity post to delivered
        model('ActivityModel', false)
        ->update($scheduledActivity->id, [
        'status' => 'delivered',
        ]);
        } catch (Exception) {
        // set activity post to delivered
        model('ActivityModel', false)
        ->update($scheduledActivity->id, [
        'status' => 'failed',
        ]);
        }
        }
        }
        }
        <?php
        declare(strict_types=1);
        /**
        * @copyright 2024 Ad Aures
        * @license https://www.gnu.org/licenses/agpl-3.0.en.html AGPL3
        * @link https://castopod.org/
        */
        namespace Modules\Fediverse\Migrations;
        use App\Database\Migrations\BaseMigration;
        class UpdateActivitiesStatus extends BaseMigration
        {
        public function up(): void
        {
        $fields = [
        'status' => [
        'type' => 'ENUM',
        'constraint' => ['queued', 'processing', 'delivered', 'failed'],
        'null' => true,
        ],
        ];
        $this->forge->modifyColumn('fediverse_activities', $fields);
        }
        }
        ......@@ -131,6 +131,7 @@ if (! function_exists('send_activity_to_followers')) {
        */
        function send_activity_to_followers(Actor $actor, string $activityPayload): void
        {
        // TODO: send activities in parallel with https://www.php.net/manual/en/function.curl-multi-init.php
        foreach ($actor->followers as $follower) {
        send_activity_to_actor($actor, $follower, $activityPayload);
        }
        ......
        ......@@ -123,11 +123,12 @@ class ActivityModel extends UuidModel
        /**
        * @return Activity[]
        */
        public function getScheduledActivities(): array
        public function getScheduledActivities(int $limit = 10): array
        {
        return $this->where('`scheduled_at` <= UTC_TIMESTAMP()', null, false)
        ->where('status', 'queued')
        ->orderBy('scheduled_at', 'ASC')
        ->limit($limit)
        ->findAll();
        }
        ......
        0% Loading or .
        You are about to add 0 people to the discussion. Proceed with caution.
        Please register or to comment