Merge branch 'oban/rich-media-backfill' into 'develop'
Rich Media Backfill with Oban See merge request pleroma/pleroma!4152
This commit is contained in:
commit
d4563f67e0
1
changelog.d/rich_media_backfill.change
Normal file
1
changelog.d/rich_media_backfill.change
Normal file
@ -0,0 +1 @@
|
|||||||
|
Rich Media backfilling is now an Oban job
|
@ -180,8 +180,6 @@ config :pleroma, Pleroma.Application,
|
|||||||
|
|
||||||
config :pleroma, Pleroma.Web.Streaming, sync_streaming: true
|
config :pleroma, Pleroma.Web.Streaming, sync_streaming: true
|
||||||
|
|
||||||
config :pleroma, Pleroma.Web.MastodonAPI.StatusView, sync_fetching: true
|
|
||||||
|
|
||||||
config :pleroma, Pleroma.Uploaders.Uploader, timeout: 1_000
|
config :pleroma, Pleroma.Uploaders.Uploader, timeout: 1_000
|
||||||
|
|
||||||
config :pleroma, Pleroma.Emoji.Loader, test_emoji: true
|
config :pleroma, Pleroma.Emoji.Loader, test_emoji: true
|
||||||
|
@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
|||||||
require Pleroma.Constants
|
require Pleroma.Constants
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Config
|
|
||||||
alias Pleroma.HTML
|
alias Pleroma.HTML
|
||||||
alias Pleroma.Maps
|
alias Pleroma.Maps
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
@ -31,13 +30,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
|||||||
# pagination is restricted to 40 activities at a time
|
# pagination is restricted to 40 activities at a time
|
||||||
defp fetch_rich_media_for_activities(activities) do
|
defp fetch_rich_media_for_activities(activities) do
|
||||||
Enum.each(activities, fn activity ->
|
Enum.each(activities, fn activity ->
|
||||||
fun = fn -> Card.get_by_activity(activity) end
|
Card.get_by_activity(activity)
|
||||||
|
|
||||||
if Config.get([__MODULE__, :sync_fetching], false) do
|
|
||||||
fun.()
|
|
||||||
else
|
|
||||||
spawn(fun)
|
|
||||||
end
|
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -6,35 +6,25 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
|||||||
alias Pleroma.Web.RichMedia.Card
|
alias Pleroma.Web.RichMedia.Card
|
||||||
alias Pleroma.Web.RichMedia.Parser
|
alias Pleroma.Web.RichMedia.Parser
|
||||||
alias Pleroma.Web.RichMedia.Parser.TTL
|
alias Pleroma.Web.RichMedia.Parser.TTL
|
||||||
alias Pleroma.Workers.RichMediaExpirationWorker
|
alias Pleroma.Workers.RichMediaWorker
|
||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
@backfiller Pleroma.Config.get([__MODULE__, :provider], Pleroma.Web.RichMedia.Backfill.Task)
|
|
||||||
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
||||||
@max_attempts 3
|
|
||||||
@retry 5_000
|
|
||||||
|
|
||||||
def start(%{url: url} = args) when is_binary(url) do
|
@spec run(map()) ::
|
||||||
|
:ok | {:error, {:invalid_metadata, any()} | :body_too_large | {:content, any()} | any()}
|
||||||
|
def run(%{"url" => url} = args) do
|
||||||
url_hash = Card.url_to_hash(url)
|
url_hash = Card.url_to_hash(url)
|
||||||
|
|
||||||
args =
|
|
||||||
args
|
|
||||||
|> Map.put(:attempt, 1)
|
|
||||||
|> Map.put(:url_hash, url_hash)
|
|
||||||
|
|
||||||
@backfiller.run(args)
|
|
||||||
end
|
|
||||||
|
|
||||||
def run(%{url: url, url_hash: url_hash, attempt: attempt} = args)
|
|
||||||
when attempt <= @max_attempts do
|
|
||||||
case Parser.parse(url) do
|
case Parser.parse(url) do
|
||||||
{:ok, fields} ->
|
{:ok, fields} ->
|
||||||
{:ok, card} = Card.create(url, fields)
|
{:ok, card} = Card.create(url, fields)
|
||||||
|
|
||||||
maybe_schedule_expiration(url, fields)
|
maybe_schedule_expiration(url, fields)
|
||||||
|
|
||||||
if Map.has_key?(args, :activity_id) do
|
with %{"activity_id" => activity_id} <- args,
|
||||||
|
false <- is_nil(activity_id) do
|
||||||
stream_update(args)
|
stream_update(args)
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -54,25 +44,16 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
|||||||
|
|
||||||
e ->
|
e ->
|
||||||
Logger.debug("Rich media error for #{url}: #{inspect(e)}")
|
Logger.debug("Rich media error for #{url}: #{inspect(e)}")
|
||||||
|
{:error, e}
|
||||||
:timer.sleep(@retry * attempt)
|
|
||||||
|
|
||||||
run(%{args | attempt: attempt + 1})
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def run(%{url: url, url_hash: url_hash}) do
|
|
||||||
Logger.debug("Rich media failure for #{url}")
|
|
||||||
|
|
||||||
negative_cache(url_hash, :timer.minutes(15))
|
|
||||||
end
|
|
||||||
|
|
||||||
defp maybe_schedule_expiration(url, fields) do
|
defp maybe_schedule_expiration(url, fields) do
|
||||||
case TTL.process(fields, url) do
|
case TTL.process(fields, url) do
|
||||||
{:ok, ttl} when is_number(ttl) ->
|
{:ok, ttl} when is_number(ttl) ->
|
||||||
timestamp = DateTime.from_unix!(ttl)
|
timestamp = DateTime.from_unix!(ttl)
|
||||||
|
|
||||||
RichMediaExpirationWorker.new(%{"url" => url}, scheduled_at: timestamp)
|
RichMediaWorker.new(%{"op" => "expire", "url" => url}, scheduled_at: timestamp)
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
|
|
||||||
_ ->
|
_ ->
|
||||||
@ -80,22 +61,14 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp stream_update(%{activity_id: activity_id}) do
|
defp stream_update(%{"activity_id" => activity_id}) do
|
||||||
Pleroma.Activity.get_by_id(activity_id)
|
Pleroma.Activity.get_by_id(activity_id)
|
||||||
|> Pleroma.Activity.normalize()
|
|> Pleroma.Activity.normalize()
|
||||||
|> Pleroma.Web.ActivityPub.ActivityPub.stream_out()
|
|> Pleroma.Web.ActivityPub.ActivityPub.stream_out()
|
||||||
end
|
end
|
||||||
|
|
||||||
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
|
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
|
||||||
defp negative_cache(key, ttl \\ nil), do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl)
|
|
||||||
end
|
|
||||||
|
|
||||||
defmodule Pleroma.Web.RichMedia.Backfill.Task do
|
defp negative_cache(key, ttl \\ :timer.minutes(15)),
|
||||||
alias Pleroma.Web.RichMedia.Backfill
|
do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl)
|
||||||
|
|
||||||
def run(args) do
|
|
||||||
Task.Supervisor.start_child(Pleroma.TaskSupervisor, Backfill, :run, [args],
|
|
||||||
name: {:global, {:rich_media, args.url_hash}}
|
|
||||||
)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
@ -7,8 +7,8 @@ defmodule Pleroma.Web.RichMedia.Card do
|
|||||||
alias Pleroma.HTML
|
alias Pleroma.HTML
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
alias Pleroma.Web.RichMedia.Backfill
|
|
||||||
alias Pleroma.Web.RichMedia.Parser
|
alias Pleroma.Web.RichMedia.Parser
|
||||||
|
alias Pleroma.Workers.RichMediaWorker
|
||||||
|
|
||||||
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
||||||
@config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
|
@config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
|
||||||
@ -75,17 +75,18 @@ defmodule Pleroma.Web.RichMedia.Card do
|
|||||||
|
|
||||||
def get_by_url(nil), do: nil
|
def get_by_url(nil), do: nil
|
||||||
|
|
||||||
@spec get_or_backfill_by_url(String.t(), map()) :: t() | nil
|
@spec get_or_backfill_by_url(String.t(), keyword()) :: t() | nil
|
||||||
def get_or_backfill_by_url(url, backfill_opts \\ %{}) do
|
def get_or_backfill_by_url(url, opts \\ []) do
|
||||||
if @config_impl.get([:rich_media, :enabled]) do
|
if @config_impl.get([:rich_media, :enabled]) do
|
||||||
case get_by_url(url) do
|
case get_by_url(url) do
|
||||||
%__MODULE__{} = card ->
|
%__MODULE__{} = card ->
|
||||||
card
|
card
|
||||||
|
|
||||||
nil ->
|
nil ->
|
||||||
backfill_opts = Map.put(backfill_opts, :url, url)
|
activity_id = Keyword.get(opts, :activity, nil)
|
||||||
|
|
||||||
Backfill.start(backfill_opts)
|
RichMediaWorker.new(%{"op" => "backfill", "url" => url, "activity_id" => activity_id})
|
||||||
|
|> Oban.insert()
|
||||||
|
|
||||||
nil
|
nil
|
||||||
|
|
||||||
@ -137,7 +138,7 @@ defmodule Pleroma.Web.RichMedia.Card do
|
|||||||
nil
|
nil
|
||||||
else
|
else
|
||||||
{:cached, url} ->
|
{:cached, url} ->
|
||||||
get_or_backfill_by_url(url, %{activity_id: activity.id})
|
get_or_backfill_by_url(url, activity_id: activity.id)
|
||||||
|
|
||||||
_ ->
|
_ ->
|
||||||
:error
|
:error
|
||||||
|
@ -1,15 +0,0 @@
|
|||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.Workers.RichMediaExpirationWorker do
|
|
||||||
alias Pleroma.Web.RichMedia.Card
|
|
||||||
|
|
||||||
use Oban.Worker,
|
|
||||||
queue: :background
|
|
||||||
|
|
||||||
@impl Oban.Worker
|
|
||||||
def perform(%Job{args: %{"url" => url} = _args}) do
|
|
||||||
Card.delete(url)
|
|
||||||
end
|
|
||||||
end
|
|
19
lib/pleroma/workers/rich_media_worker.ex
Normal file
19
lib/pleroma/workers/rich_media_worker.ex
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.RichMediaWorker do
|
||||||
|
alias Pleroma.Web.RichMedia.Backfill
|
||||||
|
alias Pleroma.Web.RichMedia.Card
|
||||||
|
|
||||||
|
use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300]
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do
|
||||||
|
Card.delete(url)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%Job{args: %{"op" => "backfill", "url" => _url} = args}) do
|
||||||
|
Backfill.run(args)
|
||||||
|
end
|
||||||
|
end
|
@ -9,6 +9,7 @@ defmodule Pleroma.Web.PleromaAPI.ChatMessageReferenceViewTest do
|
|||||||
alias Pleroma.Chat
|
alias Pleroma.Chat
|
||||||
alias Pleroma.Chat.MessageReference
|
alias Pleroma.Chat.MessageReference
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Tests.ObanHelpers
|
||||||
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
alias Pleroma.Web.CommonAPI
|
alias Pleroma.Web.CommonAPI
|
||||||
@ -70,6 +71,8 @@ defmodule Pleroma.Web.PleromaAPI.ChatMessageReferenceViewTest do
|
|||||||
media_id: upload.id
|
media_id: upload.id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ObanHelpers.perform_all()
|
||||||
|
|
||||||
object = Object.normalize(activity, fetch: false)
|
object = Object.normalize(activity, fetch: false)
|
||||||
|
|
||||||
cm_ref = MessageReference.for_chat_and_object(chat, object)
|
cm_ref = MessageReference.for_chat_and_object(chat, object)
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
defmodule Pleroma.Web.RichMedia.CardTest do
|
defmodule Pleroma.Web.RichMedia.CardTest do
|
||||||
use Pleroma.DataCase, async: true
|
use Pleroma.DataCase, async: true
|
||||||
|
|
||||||
|
alias Pleroma.Tests.ObanHelpers
|
||||||
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
||||||
alias Pleroma.Web.CommonAPI
|
alias Pleroma.Web.CommonAPI
|
||||||
alias Pleroma.Web.RichMedia.Card
|
alias Pleroma.Web.RichMedia.Card
|
||||||
@ -36,6 +37,8 @@ defmodule Pleroma.Web.RichMedia.CardTest do
|
|||||||
content_type: "text/markdown"
|
content_type: "text/markdown"
|
||||||
})
|
})
|
||||||
|
|
||||||
|
ObanHelpers.perform_all()
|
||||||
|
|
||||||
assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity)
|
assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity)
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -50,6 +53,7 @@ defmodule Pleroma.Web.RichMedia.CardTest do
|
|||||||
|
|
||||||
# Force a backfill
|
# Force a backfill
|
||||||
Card.get_by_activity(activity)
|
Card.get_by_activity(activity)
|
||||||
|
ObanHelpers.perform_all()
|
||||||
|
|
||||||
assert match?(
|
assert match?(
|
||||||
%Card{url_hash: ^original_url_hash, fields: _},
|
%Card{url_hash: ^original_url_hash, fields: _},
|
||||||
@ -62,6 +66,7 @@ defmodule Pleroma.Web.RichMedia.CardTest do
|
|||||||
|
|
||||||
# Force a backfill
|
# Force a backfill
|
||||||
Card.get_by_activity(activity)
|
Card.get_by_activity(activity)
|
||||||
|
ObanHelpers.perform_all()
|
||||||
|
|
||||||
assert match?(
|
assert match?(
|
||||||
%Card{url_hash: ^updated_url_hash, fields: _},
|
%Card{url_hash: ^updated_url_hash, fields: _},
|
||||||
|
@ -4,10 +4,11 @@
|
|||||||
|
|
||||||
defmodule Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrlTest do
|
defmodule Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrlTest do
|
||||||
use Pleroma.DataCase, async: false
|
use Pleroma.DataCase, async: false
|
||||||
use Oban.Testing, repo: Pleroma.Repo
|
use Oban.Testing, repo: Pleroma.Repo, testing: :inline
|
||||||
|
|
||||||
import Mox
|
import Mox
|
||||||
|
|
||||||
|
alias Pleroma.Tests.ObanHelpers
|
||||||
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
||||||
alias Pleroma.Web.RichMedia.Card
|
alias Pleroma.Web.RichMedia.Card
|
||||||
alias Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrl
|
alias Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrl
|
||||||
@ -74,9 +75,19 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrlTest do
|
|||||||
|
|
||||||
Card.get_or_backfill_by_url(url)
|
Card.get_or_backfill_by_url(url)
|
||||||
|
|
||||||
assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url})
|
# Find the backfill job
|
||||||
|
expected_job =
|
||||||
|
[
|
||||||
|
worker: "Pleroma.Workers.RichMediaWorker",
|
||||||
|
args: %{"op" => "backfill", "url" => url}
|
||||||
|
]
|
||||||
|
|
||||||
[%Oban.Job{scheduled_at: scheduled_at}] = all_enqueued()
|
assert_enqueued(expected_job)
|
||||||
|
|
||||||
|
# Run it manually
|
||||||
|
ObanHelpers.perform_all()
|
||||||
|
|
||||||
|
[%Oban.Job{scheduled_at: scheduled_at} | _] = all_enqueued()
|
||||||
|
|
||||||
timestamp_dt = Timex.parse!(timestamp, "{ISO:Basic:Z}")
|
timestamp_dt = Timex.parse!(timestamp, "{ISO:Basic:Z}")
|
||||||
|
|
||||||
|
@ -8,6 +8,7 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.OpengraphTest do
|
|||||||
|
|
||||||
import Mox
|
import Mox
|
||||||
|
|
||||||
|
alias Pleroma.Tests.ObanHelpers
|
||||||
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
|
||||||
alias Pleroma.Web.RichMedia.Card
|
alias Pleroma.Web.RichMedia.Card
|
||||||
|
|
||||||
@ -36,6 +37,21 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.OpengraphTest do
|
|||||||
|
|
||||||
Card.get_or_backfill_by_url(url)
|
Card.get_or_backfill_by_url(url)
|
||||||
|
|
||||||
assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url})
|
# Find the backfill job
|
||||||
|
expected_job =
|
||||||
|
[
|
||||||
|
worker: "Pleroma.Workers.RichMediaWorker",
|
||||||
|
args: %{"op" => "backfill", "url" => url}
|
||||||
|
]
|
||||||
|
|
||||||
|
assert_enqueued(expected_job)
|
||||||
|
|
||||||
|
# Run it manually
|
||||||
|
ObanHelpers.perform_all()
|
||||||
|
|
||||||
|
assert_enqueued(
|
||||||
|
worker: Pleroma.Workers.RichMediaWorker,
|
||||||
|
args: %{"op" => "expire", "url" => url}
|
||||||
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user