From 1d7583d738219574ae3d45d294dc94e7e406472b Mon Sep 17 00:00:00 2001
From: Yassine Doghri <yassine@doghri.fr>
Date: Fri, 16 Aug 2024 15:28:28 +0000
Subject: [PATCH] fix(fediverse): add "processing" and "failed" statuses to
 better manage broadcast load

fixes #511
---
 modules/Fediverse/Commands/Broadcast.php      | 56 ++++++++++++-------
 ...-01-02-120000_update_activities_status.php | 31 ++++++++++
 .../Fediverse/Helpers/fediverse_helper.php    |  1 +
 modules/Fediverse/Models/ActivityModel.php    |  3 +-
 4 files changed, 71 insertions(+), 20 deletions(-)
 create mode 100644 modules/Fediverse/Database/Migrations/2018-01-02-120000_update_activities_status.php

diff --git a/modules/Fediverse/Commands/Broadcast.php b/modules/Fediverse/Commands/Broadcast.php
index ed718f0b68..4f2b3e5f6e 100644
--- a/modules/Fediverse/Commands/Broadcast.php
+++ b/modules/Fediverse/Commands/Broadcast.php
@@ -5,6 +5,7 @@ declare(strict_types=1);
 namespace Modules\Fediverse\Commands;
 
 use CodeIgniter\CLI\BaseCommand;
+use Exception;
 use Override;
 
 class Broadcast extends BaseCommand
@@ -22,32 +23,49 @@ class Broadcast extends BaseCommand
 
         // retrieve scheduled activities from database
         $scheduledActivities = model('ActivityModel', false)
-            ->getScheduledActivities();
+            ->getScheduledActivities(10);
+
+        foreach ($scheduledActivities as $scheduledActivity) {
+            // set activity post to processing
+            model('ActivityModel', false)
+                ->update($scheduledActivity->id, [
+                    'status' => 'processing',
+                ]);
+        }
 
         // Send activity to all followers
         foreach ($scheduledActivities as $scheduledActivity) {
-            if ($scheduledActivity->target_actor_id !== null) {
-                if ($scheduledActivity->actor_id !== $scheduledActivity->target_actor_id) {
-                    // send activity to targeted actor
-                    send_activity_to_actor(
+            try {
+                if ($scheduledActivity->target_actor_id !== null) {
+                    if ($scheduledActivity->actor_id !== $scheduledActivity->target_actor_id) {
+                        // send activity to targeted actor
+                        send_activity_to_actor(
+                            $scheduledActivity->actor,
+                            $scheduledActivity->targetActor,
+                            json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR)
+                        );
+                    }
+                } else {
+                    // send activity to all actor followers
+                    send_activity_to_followers(
                         $scheduledActivity->actor,
-                        $scheduledActivity->targetActor,
-                        json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR)
+                        json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR),
                     );
                 }
-            } else {
-                // send activity to all actor followers
-                send_activity_to_followers(
-                    $scheduledActivity->actor,
-                    json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR),
-                );
-            }
 
-            // set activity post to delivered
-            model('ActivityModel', false)
-                ->update($scheduledActivity->id, [
-                    'status' => 'delivered',
-                ]);
+                // set activity post to delivered
+                model('ActivityModel', false)
+                    ->update($scheduledActivity->id, [
+                        'status' => 'delivered',
+                    ]);
+
+            } catch (Exception) {
+                // set activity post to delivered
+                model('ActivityModel', false)
+                    ->update($scheduledActivity->id, [
+                        'status' => 'failed',
+                    ]);
+            }
         }
     }
 }
diff --git a/modules/Fediverse/Database/Migrations/2018-01-02-120000_update_activities_status.php b/modules/Fediverse/Database/Migrations/2018-01-02-120000_update_activities_status.php
new file mode 100644
index 0000000000..c8ccf0e92e
--- /dev/null
+++ b/modules/Fediverse/Database/Migrations/2018-01-02-120000_update_activities_status.php
@@ -0,0 +1,31 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * @copyright  2024 Ad Aures
+ * @license    https://www.gnu.org/licenses/agpl-3.0.en.html AGPL3
+ * @link       https://castopod.org/
+ */
+
+namespace Modules\Fediverse\Migrations;
+
+use App\Database\Migrations\BaseMigration;
+use Override;
+
+class UpdateActivitiesStatus extends BaseMigration
+{
+    #[Override]
+    public function up(): void
+    {
+        $fields = [
+            'status' => [
+                'type'       => 'ENUM',
+                'constraint' => ['queued', 'processing', 'delivered', 'failed'],
+                'null'       => true,
+            ],
+        ];
+
+        $this->forge->modifyColumn('fediverse_activities', $fields);
+    }
+}
diff --git a/modules/Fediverse/Helpers/fediverse_helper.php b/modules/Fediverse/Helpers/fediverse_helper.php
index 0b8d53ed39..09cdb7d8ca 100644
--- a/modules/Fediverse/Helpers/fediverse_helper.php
+++ b/modules/Fediverse/Helpers/fediverse_helper.php
@@ -131,6 +131,7 @@ if (! function_exists('send_activity_to_followers')) {
      */
     function send_activity_to_followers(Actor $actor, string $activityPayload): void
     {
+        // TODO: send activities in parallel with https://www.php.net/manual/en/function.curl-multi-init.php
         foreach ($actor->followers as $follower) {
             send_activity_to_actor($actor, $follower, $activityPayload);
         }
diff --git a/modules/Fediverse/Models/ActivityModel.php b/modules/Fediverse/Models/ActivityModel.php
index 47341b93d0..548ee4a48e 100644
--- a/modules/Fediverse/Models/ActivityModel.php
+++ b/modules/Fediverse/Models/ActivityModel.php
@@ -123,11 +123,12 @@ class ActivityModel extends UuidModel
     /**
      * @return Activity[]
      */
-    public function getScheduledActivities(): array
+    public function getScheduledActivities(int $limit = 10): array
     {
         return $this->where('`scheduled_at` <= UTC_TIMESTAMP()', null, false)
             ->where('status', 'queued')
             ->orderBy('scheduled_at', 'ASC')
+            ->limit($limit)
             ->findAll();
     }
 
-- 
GitLab