diff --git a/changelog.d/search-healthcheck.add b/changelog.d/search-healthcheck.add
new file mode 100644
index 000000000..4974925e7
--- /dev/null
+++ b/changelog.d/search-healthcheck.add
@@ -0,0 +1 @@
+Monitoring of search backend health to control the processing of jobs in the search indexing Oban queue
diff --git a/config/config.exs b/config/config.exs
index b69044a2b..8b9a588b7 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -579,7 +579,7 @@ config :pleroma, Oban,
attachments_cleanup: 1,
new_users_digest: 1,
mute_expire: 5,
- search_indexing: 10,
+ search_indexing: [limit: 10, paused: true],
rich_media_expiration: 2
],
plugins: [Oban.Plugins.Pruner],
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 649bb11c8..d266d1836 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -109,7 +109,8 @@ defmodule Pleroma.Application do
streamer_registry() ++
background_migrators() ++
shout_child(shout_enabled?()) ++
- [Pleroma.Gopher.Server]
+ [Pleroma.Gopher.Server] ++
+ [Pleroma.Search.Healthcheck]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
diff --git a/lib/pleroma/search.ex b/lib/pleroma/search.ex
index 3b266e59b..fd0218cb8 100644
--- a/lib/pleroma/search.ex
+++ b/lib/pleroma/search.ex
@@ -10,8 +10,12 @@ defmodule Pleroma.Search do
end
def search(query, options) do
- search_module = Pleroma.Config.get([Pleroma.Search, :module], Pleroma.Activity)
-
+ search_module = Pleroma.Config.get([Pleroma.Search, :module])
search_module.search(options[:for_user], query, options)
end
+
+ def healthcheck_endpoints do
+ search_module = Pleroma.Config.get([Pleroma.Search, :module])
+ search_module.healthcheck_endpoints
+ end
end
diff --git a/lib/pleroma/search/database_search.ex b/lib/pleroma/search/database_search.ex
index 31bfc7e33..11e99e7f1 100644
--- a/lib/pleroma/search/database_search.ex
+++ b/lib/pleroma/search/database_search.ex
@@ -48,6 +48,9 @@ defmodule Pleroma.Search.DatabaseSearch do
@impl true
def remove_from_index(_object), do: :ok
+ @impl true
+ def healthcheck_endpoints, do: nil
+
def maybe_restrict_author(query, %User{} = author) do
Activity.Queries.by_author(query, author)
end
diff --git a/lib/pleroma/search/healthcheck.ex b/lib/pleroma/search/healthcheck.ex
new file mode 100644
index 000000000..e562c8478
--- /dev/null
+++ b/lib/pleroma/search/healthcheck.ex
@@ -0,0 +1,86 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2024 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Search.Healthcheck do
+ @doc """
+ Monitors health of search backend to control processing of events based on health and availability.
+ """
+ use GenServer
+ require Logger
+
+ @queue :search_indexing
+ @tick :timer.seconds(5)
+ @timeout :timer.seconds(2)
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, [], name: __MODULE__)
+ end
+
+ @impl true
+ def init(_) do
+ state = %{healthy: false}
+ {:ok, state, {:continue, :start}}
+ end
+
+ @impl true
+ def handle_continue(:start, state) do
+ tick()
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info(:check, state) do
+ urls = Pleroma.Search.healthcheck_endpoints()
+
+ new_state =
+ if check(urls) do
+ Oban.resume_queue(queue: @queue)
+ Map.put(state, :healthy, true)
+ else
+ Oban.pause_queue(queue: @queue)
+ Map.put(state, :healthy, false)
+ end
+
+ maybe_log_state_change(state, new_state)
+
+ tick()
+ {:noreply, new_state}
+ end
+
+ @impl true
+ def handle_call(:state, _from, state) do
+ {:reply, state, state, :hibernate}
+ end
+
+ def state, do: GenServer.call(__MODULE__, :state)
+
+ def check([]), do: true
+
+ def check(urls) when is_list(urls) do
+ Enum.all?(
+ urls,
+ fn url ->
+ case Pleroma.HTTP.get(url, [], recv_timeout: @timeout) do
+ {:ok, %{status: 200}} -> true
+ _ -> false
+ end
+ end
+ )
+ end
+
+ def check(_), do: true
+
+ defp tick do
+ Process.send_after(self(), :check, @tick)
+ end
+
+ defp maybe_log_state_change(%{healthy: true}, %{healthy: false}) do
+ Logger.error("Pausing Oban queue #{@queue} due to search backend healthcheck failure")
+ end
+
+ defp maybe_log_state_change(%{healthy: false}, %{healthy: true}) do
+ Logger.info("Resuming Oban queue #{@queue} due to search backend healthcheck pass")
+ end
+
+ defp maybe_log_state_change(_, _), do: :ok
+end
diff --git a/lib/pleroma/search/meilisearch.ex b/lib/pleroma/search/meilisearch.ex
index 2bff663e8..08c2f3d86 100644
--- a/lib/pleroma/search/meilisearch.ex
+++ b/lib/pleroma/search/meilisearch.ex
@@ -178,4 +178,15 @@ defmodule Pleroma.Search.Meilisearch do
def remove_from_index(object) do
meili_delete("/indexes/objects/documents/#{object.id}")
end
+
+ @impl true
+ def healthcheck_endpoints do
+ endpoint =
+ Config.get([Pleroma.Search.Meilisearch, :url])
+ |> URI.parse()
+ |> Map.put(:path, "/health")
+ |> URI.to_string()
+
+ [endpoint]
+ end
end
diff --git a/lib/pleroma/search/search_backend.ex b/lib/pleroma/search/search_backend.ex
index 68bc48cec..13c887bc2 100644
--- a/lib/pleroma/search/search_backend.ex
+++ b/lib/pleroma/search/search_backend.ex
@@ -21,4 +21,12 @@ defmodule Pleroma.Search.SearchBackend do
from index.
"""
@callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()}
+
+ @doc """
+ Healthcheck endpoints of search backend infrastructure to monitor for controlling
+ processing of jobs in the Oban queue.
+
+ It is expected a 200 response is healthy and other responses are unhealthy.
+ """
+ @callback healthcheck_endpoints :: list() | nil
end
diff --git a/test/pleroma/search/healthcheck_test.exs b/test/pleroma/search/healthcheck_test.exs
new file mode 100644
index 000000000..e7649d949
--- /dev/null
+++ b/test/pleroma/search/healthcheck_test.exs
@@ -0,0 +1,49 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2024 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Search.HealthcheckTest do
+ use Pleroma.DataCase
+
+ import Tesla.Mock
+
+ alias Pleroma.Search.Healthcheck
+
+ @good1 "http://good1.example.com/healthz"
+ @good2 "http://good2.example.com/health"
+ @bad "http://bad.example.com/healthy"
+
+ setup do
+ mock(fn
+ %{method: :get, url: @good1} ->
+ %Tesla.Env{
+ status: 200,
+ body: ""
+ }
+
+ %{method: :get, url: @good2} ->
+ %Tesla.Env{
+ status: 200,
+ body: ""
+ }
+
+ %{method: :get, url: @bad} ->
+ %Tesla.Env{
+ status: 503,
+ body: ""
+ }
+ end)
+
+ :ok
+ end
+
+ test "true for 200 responses" do
+ assert Healthcheck.check([@good1])
+ assert Healthcheck.check([@good1, @good2])
+ end
+
+ test "false if any response is not a 200" do
+ refute Healthcheck.check([@bad])
+ refute Healthcheck.check([@good1, @bad])
+ end
+end