Broadcast conversation update when DM is deleted
This commit is contained in:
parent
58d567ddc5
commit
f2c03425b0
@ -189,6 +189,22 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
|
||||||
|
with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
|
||||||
|
conversation = Repo.preload(conversation, :participations),
|
||||||
|
last_activity_id =
|
||||||
|
fetch_latest_activity_id_for_context(conversation.ap_id, %{
|
||||||
|
"user" => user,
|
||||||
|
"blocking_user" => user
|
||||||
|
}) do
|
||||||
|
if last_activity_id do
|
||||||
|
stream_out_participations(conversation.participations)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def stream_out_participations(_, _), do: :noop
|
||||||
|
|
||||||
def stream_out(activity) do
|
def stream_out(activity) do
|
||||||
public = "https://www.w3.org/ns/activitystreams#Public"
|
public = "https://www.w3.org/ns/activitystreams#Public"
|
||||||
|
|
||||||
@ -401,7 +417,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
|||||||
"to" => to,
|
"to" => to,
|
||||||
"deleted_activity_id" => activity && activity.id
|
"deleted_activity_id" => activity && activity.id
|
||||||
},
|
},
|
||||||
{:ok, activity} <- insert(data, local),
|
{:ok, activity} <- insert(data, local, false),
|
||||||
|
stream_out_participations(object, user),
|
||||||
_ <- decrease_replies_count_if_reply(object),
|
_ <- decrease_replies_count_if_reply(object),
|
||||||
# Changing note count prior to enqueuing federation task in order to avoid
|
# Changing note count prior to enqueuing federation task in order to avoid
|
||||||
# race conditions on updating user.info
|
# race conditions on updating user.info
|
||||||
|
@ -356,4 +356,110 @@ defmodule Pleroma.Web.StreamerTest do
|
|||||||
|
|
||||||
Task.await(task)
|
Task.await(task)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "direct streams" do
|
||||||
|
setup do
|
||||||
|
GenServer.start(Streamer, %{}, name: Streamer)
|
||||||
|
|
||||||
|
on_exit(fn ->
|
||||||
|
if pid = Process.whereis(Streamer) do
|
||||||
|
Process.exit(pid, :kill)
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it sends conversation update to the 'direct' stream", %{} do
|
||||||
|
user = insert(:user)
|
||||||
|
another_user = insert(:user)
|
||||||
|
|
||||||
|
task =
|
||||||
|
Task.async(fn ->
|
||||||
|
assert_receive {:text, _received_event}, 4_000
|
||||||
|
end)
|
||||||
|
|
||||||
|
Streamer.add_socket(
|
||||||
|
"direct",
|
||||||
|
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||||
|
)
|
||||||
|
|
||||||
|
{:ok, _create_activity} =
|
||||||
|
CommonAPI.post(another_user, %{
|
||||||
|
"status" => "hey @#{user.nickname}",
|
||||||
|
"visibility" => "direct"
|
||||||
|
})
|
||||||
|
|
||||||
|
Task.await(task)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
|
||||||
|
user = insert(:user)
|
||||||
|
another_user = insert(:user)
|
||||||
|
|
||||||
|
{:ok, create_activity} =
|
||||||
|
CommonAPI.post(another_user, %{
|
||||||
|
"status" => "hi @#{user.nickname}",
|
||||||
|
"visibility" => "direct"
|
||||||
|
})
|
||||||
|
|
||||||
|
task =
|
||||||
|
Task.async(fn ->
|
||||||
|
assert_receive {:text, received_event}, 4_000
|
||||||
|
assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
|
||||||
|
|
||||||
|
refute_receive {:text, _}, 4_000
|
||||||
|
end)
|
||||||
|
|
||||||
|
Streamer.add_socket(
|
||||||
|
"direct",
|
||||||
|
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||||
|
)
|
||||||
|
|
||||||
|
{:ok, _} = CommonAPI.delete(create_activity.id, another_user)
|
||||||
|
|
||||||
|
Task.await(task)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it sends conversation update to the 'direct' stream when a message is deleted" do
|
||||||
|
user = insert(:user)
|
||||||
|
another_user = insert(:user)
|
||||||
|
|
||||||
|
{:ok, create_activity} =
|
||||||
|
CommonAPI.post(another_user, %{
|
||||||
|
"status" => "hi @#{user.nickname}",
|
||||||
|
"visibility" => "direct"
|
||||||
|
})
|
||||||
|
|
||||||
|
{:ok, create_activity2} =
|
||||||
|
CommonAPI.post(another_user, %{
|
||||||
|
"status" => "hi @#{user.nickname}",
|
||||||
|
"in_reply_to_status_id" => create_activity.id,
|
||||||
|
"visibility" => "direct"
|
||||||
|
})
|
||||||
|
|
||||||
|
task =
|
||||||
|
Task.async(fn ->
|
||||||
|
assert_receive {:text, received_event}, 4_000
|
||||||
|
assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
|
||||||
|
|
||||||
|
assert_receive {:text, received_event}, 4_000
|
||||||
|
|
||||||
|
assert %{"event" => "conversation", "payload" => received_payload} =
|
||||||
|
Jason.decode!(received_event)
|
||||||
|
|
||||||
|
assert %{"last_status" => last_status} = Jason.decode!(received_payload)
|
||||||
|
assert last_status["id"] == to_string(create_activity.id)
|
||||||
|
end)
|
||||||
|
|
||||||
|
Streamer.add_socket(
|
||||||
|
"direct",
|
||||||
|
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||||
|
)
|
||||||
|
|
||||||
|
{:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
|
||||||
|
|
||||||
|
Task.await(task)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user