Merge branch '2074-streaming-api-oauth-scopes-validation' into 'develop'

[#2074] OAuth scope checking in Streaming API

Closes #2074

See merge request pleroma/pleroma!3013
This commit is contained in:
lain 2020-09-21 17:15:54 +00:00
commit 9108e27c2f
7 changed files with 332 additions and 183 deletions

View File

@ -53,7 +53,7 @@ defmodule Pleroma.Plugs.OAuthScopesPlug do
|> assign(:token, nil) |> assign(:token, nil)
end end
@doc "Filters descendants of supported scopes" @doc "Keeps those of `scopes` which are descendants of `supported_scopes`"
def filter_descendants(scopes, supported_scopes) do def filter_descendants(scopes, supported_scopes) do
Enum.filter( Enum.filter(
scopes, scopes,

View File

@ -23,8 +23,8 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
access_token <- Map.get(params, "access_token"), access_token <- Map.get(params, "access_token"),
{:ok, user} <- authenticate_request(access_token, sec_websocket), {:ok, user, oauth_token} <- authenticate_request(access_token, sec_websocket),
{:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do {:ok, topic} <- Streamer.get_topic(params["stream"], user, oauth_token, params) do
req = req =
if sec_websocket do if sec_websocket do
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
@ -117,7 +117,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
# Public streams without authentication. # Public streams without authentication.
defp authenticate_request(nil, nil) do defp authenticate_request(nil, nil) do
{:ok, nil} {:ok, nil, nil}
end end
# Authenticated streams. # Authenticated streams.
@ -125,9 +125,9 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
token = access_token || sec_websocket token = access_token || sec_websocket
with true <- is_bitstring(token), with true <- is_bitstring(token),
%Token{user_id: user_id} <- Repo.get_by(Token, token: token), oauth_token = %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
user = %User{} <- User.get_cached_by_id(user_id) do user = %User{} <- User.get_cached_by_id(user_id) do
{:ok, user} {:ok, user, oauth_token}
else else
_ -> {:error, :unauthorized} _ -> {:error, :unauthorized}
end end

View File

@ -11,10 +11,12 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Conversation.Participation alias Pleroma.Conversation.Participation
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Plugs.OAuthScopesPlug
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.StreamerView alias Pleroma.Web.StreamerView
@mix_env Mix.env() @mix_env Mix.env()
@ -26,53 +28,87 @@ defmodule Pleroma.Web.Streamer do
@user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@doc "Expands and authorizes a stream, and registers the process for streaming." @doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: @spec get_topic_and_add_socket(
stream :: String.t(),
User.t() | nil,
Token.t() | nil,
Map.t() | nil
) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, params \\ %{}) do def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
case get_topic(stream, user, params) do case get_topic(stream, user, oauth_token, params) do
{:ok, topic} -> add_socket(topic, user) {:ok, topic} -> add_socket(topic, user)
error -> error error -> error
end end
end end
@doc "Expand and authorizes a stream" @doc "Expand and authorizes a stream"
@spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} {:ok, topic :: String.t()} | {:error, :bad_topic}
def get_topic(stream, user, params \\ %{}) def get_topic(stream, user, oauth_token, params \\ %{})
# Allow all public steams. # Allow all public steams.
def get_topic(stream, _, _) when stream in @public_streams do def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
{:ok, stream} {:ok, stream}
end end
# Allow all hashtags streams. # Allow all hashtags streams.
def get_topic("hashtag", _, %{"tag" => tag}) do def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
{:ok, "hashtag:" <> tag} {:ok, "hashtag:" <> tag}
end end
# Expand user streams. # Expand user streams.
def get_topic(stream, %User{} = user, _) when stream in @user_streams do def get_topic(
{:ok, stream <> ":" <> to_string(user.id)} stream,
%User{id: user_id} = user,
%Token{user_id: token_user_id} = oauth_token,
_params
)
when stream in @user_streams and user_id == token_user_id do
# Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
required_scopes =
if stream == "user:notification" do
["read:notifications"]
else
["read:statuses"]
end
if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
{:error, :unauthorized}
else
{:ok, stream <> ":" <> to_string(user.id)}
end
end end
def get_topic(stream, _, _) when stream in @user_streams do def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
{:error, :unauthorized} {:error, :unauthorized}
end end
# List streams. # List streams.
def get_topic("list", %User{} = user, %{"list" => id}) do def get_topic(
if Pleroma.List.get(id, user) do "list",
{:ok, "list:" <> to_string(id)} %User{id: user_id} = user,
else %Token{user_id: token_user_id} = oauth_token,
{:error, :bad_topic} %{"list" => id}
)
when user_id == token_user_id do
cond do
OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
{:error, :unauthorized}
Pleroma.List.get(id, user) ->
{:ok, "list:" <> to_string(id)}
true ->
{:error, :bad_topic}
end end
end end
def get_topic("list", _, _) do def get_topic("list", _user, _oauth_token, _params) do
{:error, :unauthorized} {:error, :unauthorized}
end end
def get_topic(_, _, _) do def get_topic(_stream, _user, _oauth_token, _params) do
{:error, :bad_topic} {:error, :bad_topic}
end end

View File

@ -78,7 +78,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
Pleroma.Repo.insert( Pleroma.Repo.insert(
OAuth.App.register_changeset(%OAuth.App{}, %{ OAuth.App.register_changeset(%OAuth.App{}, %{
client_name: "client", client_name: "client",
scopes: ["scope"], scopes: ["read"],
redirect_uris: "url" redirect_uris: "url"
}) })
) )

View File

@ -179,17 +179,19 @@ defmodule Pleroma.NotificationTest do
describe "create_notification" do describe "create_notification" do
@tag needs_streamer: true @tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
task = task =
Task.async(fn -> Task.async(fn ->
Streamer.get_topic_and_add_socket("user", user) {:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000 assert_receive {:render_with_user, _, _, _}, 4_000
end) end)
task_user_notification = task_user_notification =
Task.async(fn -> Task.async(fn ->
Streamer.get_topic_and_add_socket("user:notification", user) {:ok, _topic} =
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000 assert_receive {:render_with_user, _, _, _}, 4_000
end) end)

View File

@ -27,6 +27,21 @@ defmodule Pleroma.DataCase do
import Ecto.Query import Ecto.Query
import Pleroma.DataCase import Pleroma.DataCase
use Pleroma.Tests.Helpers use Pleroma.Tests.Helpers
# Sets up OAuth access with specified scopes
defp oauth_access(scopes, opts \\ []) do
user =
Keyword.get_lazy(opts, :user, fn ->
Pleroma.Factory.insert(:user)
end)
token =
Keyword.get_lazy(opts, :oauth_token, fn ->
Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes)
end)
%{user: user, token: token}
end
end end
end end

View File

@ -21,92 +21,148 @@ defmodule Pleroma.Web.StreamerTest do
setup do: clear_config([:instance, :skip_thread_containment]) setup do: clear_config([:instance, :skip_thread_containment])
describe "get_topic without an user" do describe "get_topic/_ (unauthenticated)" do
test "allows public" do test "allows public" do
assert {:ok, "public"} = Streamer.get_topic("public", nil) assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil) assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil) assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil) assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
end end
test "allows hashtag streams" do test "allows hashtag streams" do
assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"}) assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
end end
test "disallows user streams" do test "disallows user streams" do
assert {:error, _} = Streamer.get_topic("user", nil) assert {:error, _} = Streamer.get_topic("user", nil, nil)
assert {:error, _} = Streamer.get_topic("user:notification", nil) assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
assert {:error, _} = Streamer.get_topic("direct", nil) assert {:error, _} = Streamer.get_topic("direct", nil, nil)
end end
test "disallows list streams" do test "disallows list streams" do
assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42}) assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
end end
end end
describe "get_topic with an user" do describe "get_topic/_ (authenticated)" do
setup do setup do: oauth_access(["read"])
user = insert(:user)
{:ok, %{user: user}} test "allows public streams (regardless of OAuth token scopes)", %{
user: user,
token: read_oauth_token
} do
with oauth_token <- [nil, read_oauth_token] do
assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
assert {:ok, "public:local:media"} =
Streamer.get_topic("public:local:media", user, oauth_token)
end
end end
test "allows public streams", %{user: user} do test "allows user streams (with proper OAuth token scopes)", %{
assert {:ok, "public"} = Streamer.get_topic("public", user) user: user,
assert {:ok, "public:local"} = Streamer.get_topic("public:local", user) token: read_oauth_token
assert {:ok, "public:media"} = Streamer.get_topic("public:media", user) } do
assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user) %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
end %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
%{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
test "allows user streams", %{user: user} do
expected_user_topic = "user:#{user.id}" expected_user_topic = "user:#{user.id}"
expected_notif_topic = "user:notification:#{user.id}" expected_notification_topic = "user:notification:#{user.id}"
expected_direct_topic = "direct:#{user.id}" expected_direct_topic = "direct:#{user.id}"
assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user) expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user)
assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user) for valid_user_token <- [read_oauth_token, read_statuses_token] do
assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
assert {:ok, ^expected_direct_topic} =
Streamer.get_topic("direct", user, valid_user_token)
assert {:ok, ^expected_pleroma_chat_topic} =
Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
end
for invalid_user_token <- [read_notifications_token, badly_scoped_token],
user_topic <- ["user", "direct", "user:pleroma_chat"] do
assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
end
for valid_notification_token <- [read_oauth_token, read_notifications_token] do
assert {:ok, ^expected_notification_topic} =
Streamer.get_topic("user:notification", user, valid_notification_token)
end
for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
assert {:error, :unauthorized} =
Streamer.get_topic("user:notification", user, invalid_notification_token)
end
end end
test "allows hashtag streams", %{user: user} do test "allows hashtag streams (regardless of OAuth token scopes)", %{
assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"}) user: user,
token: read_oauth_token
} do
for oauth_token <- [nil, read_oauth_token] do
assert {:ok, "hashtag:cofe"} =
Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
end
end end
test "disallows registering to an user stream", %{user: user} do test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
another_user = insert(:user) another_user = insert(:user)
assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user) assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user)
assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user) assert {:error, _} =
Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
end end
test "allows list stream that are owned by the user", %{user: user} do test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
user: user,
token: read_oauth_token
} do
%{token: read_lists_token} = oauth_access(["read:lists"], user: user)
%{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
{:ok, list} = List.create("Test", user) {:ok, list} = List.create("Test", user)
assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id}) assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
for valid_token <- [read_oauth_token, read_lists_token] do
assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
end
assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
end end
test "disallows list stream that are not owned by the user", %{user: user} do test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
another_user = insert(:user) another_user = insert(:user)
{:ok, list} = List.create("Test", another_user) {:ok, list} = List.create("Test", another_user)
assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id}) assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
end end
end end
describe "user streams" do describe "user streams" do
setup do setup do
user = insert(:user) %{user: user, token: token} = oauth_access(["read"])
notify = insert(:notification, user: user, activity: build(:note_activity)) notify = insert(:notification, user: user, activity: build(:note_activity))
{:ok, %{user: user, notify: notify}} {:ok, %{user: user, notify: notify, token: token}}
end end
test "it streams the user's post in the 'user' stream", %{user: user} do test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(user, activity)
end end
test "it streams boosts of the user in the 'user' stream", %{user: user} do test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
@ -117,9 +173,10 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it does not stream announces of the user's own posts in the 'user' stream", %{ test "it does not stream announces of the user's own posts in the 'user' stream", %{
user: user user: user,
token: oauth_token
} do } do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
@ -129,9 +186,10 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it does stream notifications announces of the user's own posts in the 'user' stream", %{ test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
user: user user: user,
token: oauth_token
} do } do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
@ -145,8 +203,11 @@ defmodule Pleroma.Web.StreamerTest do
refute Streamer.filtered_by_user?(user, notification) refute Streamer.filtered_by_user?(user, notification)
end end
test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do test "it streams boosts of mastodon user in the 'user' stream", %{
Streamer.get_topic_and_add_socket("user", user) user: user,
token: oauth_token
} do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
@ -164,21 +225,34 @@ defmodule Pleroma.Web.StreamerTest do
refute Streamer.filtered_by_user?(user, announce) refute Streamer.filtered_by_user?(user, announce)
end end
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do test "it sends notify to in the 'user' stream", %{
Streamer.get_topic_and_add_socket("user", user) user: user,
token: oauth_token,
notify: notify
} do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", notify) Streamer.stream("user", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do test "it sends notify to in the 'user:notification' stream", %{
Streamer.get_topic_and_add_socket("user:notification", user) user: user,
token: oauth_token,
notify: notify
} do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do test "it sends chat messages to the 'user:pleroma_chat' stream", %{
user: user,
token: oauth_token
} do
other_user = insert(:user) other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
@ -187,7 +261,7 @@ defmodule Pleroma.Web.StreamerTest do
cm_ref = MessageReference.for_chat_and_object(chat, object) cm_ref = MessageReference.for_chat_and_object(chat, object)
cm_ref = %{cm_ref | chat: chat, object: object} cm_ref = %{cm_ref | chat: chat, object: object}
Streamer.get_topic_and_add_socket("user:pleroma_chat", user) Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
Streamer.stream("user:pleroma_chat", {user, cm_ref}) Streamer.stream("user:pleroma_chat", {user, cm_ref})
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
@ -196,7 +270,7 @@ defmodule Pleroma.Web.StreamerTest do
assert_receive {:text, ^text} assert_receive {:text, ^text}
end end
test "it sends chat messages to the 'user' stream", %{user: user} do test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
other_user = insert(:user) other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
@ -205,7 +279,7 @@ defmodule Pleroma.Web.StreamerTest do
cm_ref = MessageReference.for_chat_and_object(chat, object) cm_ref = MessageReference.for_chat_and_object(chat, object)
cm_ref = %{cm_ref | chat: chat, object: object} cm_ref = %{cm_ref | chat: chat, object: object}
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", {user, cm_ref}) Streamer.stream("user", {user, cm_ref})
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
@ -214,7 +288,10 @@ defmodule Pleroma.Web.StreamerTest do
assert_receive {:text, ^text} assert_receive {:text, ^text}
end end
test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do test "it sends chat message notifications to the 'user:notification' stream", %{
user: user,
token: oauth_token
} do
other_user = insert(:user) other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
@ -223,19 +300,21 @@ defmodule Pleroma.Web.StreamerTest do
Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id) Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
|> Repo.preload(:activity) |> Repo.preload(:activity)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{ test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
user: user user: user,
token: oauth_token
} do } do
blocked = insert(:user) blocked = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked) {:ok, _user_relationship} = User.block(user, blocked)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: ":("}) {:ok, activity} = CommonAPI.post(user, %{status: ":("})
{:ok, _} = CommonAPI.favorite(blocked, activity.id) {:ok, _} = CommonAPI.favorite(blocked, activity.id)
@ -244,14 +323,15 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{ test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
user: user user: user,
token: oauth_token
} do } do
user2 = insert(:user) user2 = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user, activity) {:ok, _} = CommonAPI.add_mute(user, activity)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
@ -260,12 +340,13 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it sends favorite to 'user:notification' stream'", %{ test "it sends favorite to 'user:notification' stream'", %{
user: user user: user,
token: oauth_token
} do } do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
@ -274,13 +355,14 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{ test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
user: user user: user,
token: oauth_token
} do } do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
{:ok, user} = User.block_domain(user, "hecking-lewd-place.com") {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
refute_receive _ refute_receive _
@ -288,7 +370,8 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it sends follow activities to the 'user:notification' stream", %{ test "it sends follow activities to the 'user:notification' stream", %{
user: user user: user,
token: oauth_token
} do } do
user_url = user.ap_id user_url = user.ap_id
user2 = insert(:user) user2 = insert(:user)
@ -303,7 +386,7 @@ defmodule Pleroma.Web.StreamerTest do
%Tesla.Env{status: 200, body: body} %Tesla.Env{status: 200, body: body}
end) end)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
@ -312,51 +395,53 @@ defmodule Pleroma.Web.StreamerTest do
end end
end end
test "it sends to public authenticated" do describe "public streams" do
user = insert(:user) test "it sends to public (authenticated)" do
other_user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
other_user = insert(:user)
Streamer.get_topic_and_add_socket("public", other_user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "Test"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(other_user, activity)
end
test "it sends to public (unauthenticated)" do
user = insert(:user)
Streamer.get_topic_and_add_socket("public", nil, nil)
{:ok, activity} = CommonAPI.post(user, %{status: "Test"})
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
assert %{"id" => ^activity_id} = Jason.decode!(payload)
{:ok, _} = CommonAPI.delete(activity.id, user)
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
test "handles deletions" do
%{user: user, token: oauth_token} = oauth_access(["read"])
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, _} = CommonAPI.delete(activity.id, other_user)
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
end end
test "works for deletions" do describe "thread_containment/2" do
user = insert(:user)
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
Streamer.get_topic_and_add_socket("public", user)
{:ok, _} = CommonAPI.delete(activity.id, other_user)
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
test "it sends to public unauthenticated" do
user = insert(:user)
Streamer.get_topic_and_add_socket("public", nil)
{:ok, activity} = CommonAPI.post(user, %{status: "Test"})
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
assert %{"id" => ^activity_id} = Jason.decode!(payload)
{:ok, _} = CommonAPI.delete(activity.id, user)
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
describe "thread_containment" do
test "it filters to user if recipients invalid and thread containment is enabled" do test "it filters to user if recipients invalid and thread containment is enabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], false) Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user) author = insert(:user)
user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
User.follow(user, author, :follow_accept) User.follow(user, author, :follow_accept)
activity = activity =
@ -368,7 +453,7 @@ defmodule Pleroma.Web.StreamerTest do
) )
) )
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user, activity) assert Streamer.filtered_by_user?(user, activity)
@ -377,7 +462,7 @@ defmodule Pleroma.Web.StreamerTest do
test "it sends message if recipients invalid and thread containment is disabled" do test "it sends message if recipients invalid and thread containment is disabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], true) Pleroma.Config.put([:instance, :skip_thread_containment], true)
author = insert(:user) author = insert(:user)
user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
User.follow(user, author, :follow_accept) User.follow(user, author, :follow_accept)
activity = activity =
@ -389,7 +474,7 @@ defmodule Pleroma.Web.StreamerTest do
) )
) )
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
@ -400,6 +485,7 @@ defmodule Pleroma.Web.StreamerTest do
Pleroma.Config.put([:instance, :skip_thread_containment], false) Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user) author = insert(:user)
user = insert(:user, skip_thread_containment: true) user = insert(:user, skip_thread_containment: true)
%{token: oauth_token} = oauth_access(["read"], user: user)
User.follow(user, author, :follow_accept) User.follow(user, author, :follow_accept)
activity = activity =
@ -411,7 +497,7 @@ defmodule Pleroma.Web.StreamerTest do
) )
) )
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
@ -420,23 +506,26 @@ defmodule Pleroma.Web.StreamerTest do
end end
describe "blocks" do describe "blocks" do
test "it filters messages involving blocked users" do setup do: oauth_access(["read"])
user = insert(:user)
test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
blocked_user = insert(:user) blocked_user = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked_user) {:ok, _user_relationship} = User.block(user, blocked_user)
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user, activity) assert Streamer.filtered_by_user?(user, activity)
end end
test "it filters messages transitively involving blocked users" do test "it filters messages transitively involving blocked users", %{
blocker = insert(:user) user: blocker,
token: blocker_token
} do
blockee = insert(:user) blockee = insert(:user)
friend = insert(:user) friend = insert(:user)
Streamer.get_topic_and_add_socket("public", blocker) Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
{:ok, _user_relationship} = User.block(blocker, blockee) {:ok, _user_relationship} = User.block(blocker, blockee)
@ -458,8 +547,9 @@ defmodule Pleroma.Web.StreamerTest do
end end
describe "lists" do describe "lists" do
test "it doesn't send unwanted DMs to list" do setup do: oauth_access(["read"])
user_a = insert(:user)
test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
user_b = insert(:user) user_b = insert(:user)
user_c = insert(:user) user_c = insert(:user)
@ -468,7 +558,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
@ -479,14 +569,13 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive _ refute_receive _
end end
test "it doesn't send unwanted private posts to list" do test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
user_a = insert(:user)
user_b = insert(:user) user_b = insert(:user)
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
@ -497,8 +586,7 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive _ refute_receive _
end end
test "it sends wanted private posts to list" do test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
user_a = insert(:user)
user_b = insert(:user) user_b = insert(:user)
{:ok, user_a} = User.follow(user_a, user_b) {:ok, user_a} = User.follow(user_a, user_b)
@ -506,7 +594,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
{:ok, activity} = {:ok, activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
@ -520,8 +608,9 @@ defmodule Pleroma.Web.StreamerTest do
end end
describe "muted reblogs" do describe "muted reblogs" do
test "it filters muted reblogs" do setup do: oauth_access(["read"])
user1 = insert(:user)
test "it filters muted reblogs", %{user: user1, token: user1_token} do
user2 = insert(:user) user2 = insert(:user)
user3 = insert(:user) user3 = insert(:user)
CommonAPI.follow(user1, user2) CommonAPI.follow(user1, user2)
@ -529,34 +618,38 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
Streamer.get_topic_and_add_socket("user", user1) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2) {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, _, ^announce_activity} assert_receive {:render_with_user, _, _, ^announce_activity}
assert Streamer.filtered_by_user?(user1, announce_activity) assert Streamer.filtered_by_user?(user1, announce_activity)
end end
test "it filters reblog notification for reblog-muted actors" do test "it filters reblog notification for reblog-muted actors", %{
user1 = insert(:user) user: user1,
token: user1_token
} do
user2 = insert(:user) user2 = insert(:user)
CommonAPI.follow(user1, user2) CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2) CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
Streamer.get_topic_and_add_socket("user", user1) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2) {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
assert Streamer.filtered_by_user?(user1, notif) assert Streamer.filtered_by_user?(user1, notif)
end end
test "it send non-reblog notification for reblog-muted actors" do test "it send non-reblog notification for reblog-muted actors", %{
user1 = insert(:user) user: user1,
token: user1_token
} do
user2 = insert(:user) user2 = insert(:user)
CommonAPI.follow(user1, user2) CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2) CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
Streamer.get_topic_and_add_socket("user", user1) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
@ -564,27 +657,28 @@ defmodule Pleroma.Web.StreamerTest do
end end
end end
test "it filters posts from muted threads" do describe "muted threads" do
user = insert(:user) test "it filters posts from muted threads" do
user2 = insert(:user) user = insert(:user)
Streamer.get_topic_and_add_socket("user", user2) %{user: user2, token: user2_token} = oauth_access(["read"])
{:ok, user2, user, _activity} = CommonAPI.follow(user2, user) Streamer.get_topic_and_add_socket("user", user2, user2_token)
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user2, activity) {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
assert_receive {:render_with_user, _, _, ^activity} {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
assert Streamer.filtered_by_user?(user2, activity) {:ok, _} = CommonAPI.add_mute(user2, activity)
assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user2, activity)
end
end end
describe "direct streams" do describe "direct streams" do
setup do setup do: oauth_access(["read"])
:ok
end
test "it sends conversation update to the 'direct' stream", %{} do test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
user = insert(:user)
another_user = insert(:user) another_user = insert(:user)
Streamer.get_topic_and_add_socket("direct", user) Streamer.get_topic_and_add_socket("direct", user, oauth_token)
{:ok, _create_activity} = {:ok, _create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
@ -602,11 +696,11 @@ defmodule Pleroma.Web.StreamerTest do
assert last_status["pleroma"]["direct_conversation_id"] == participation.id assert last_status["pleroma"]["direct_conversation_id"] == participation.id
end end
test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
user = insert(:user) %{user: user, token: oauth_token} do
another_user = insert(:user) another_user = insert(:user)
Streamer.get_topic_and_add_socket("direct", user) Streamer.get_topic_and_add_socket("direct", user, oauth_token)
{:ok, create_activity} = {:ok, create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
@ -629,10 +723,12 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive _ refute_receive _
end end
test "it sends conversation update to the 'direct' stream when a message is deleted" do test "it sends conversation update to the 'direct' stream when a message is deleted", %{
user = insert(:user) user: user,
token: oauth_token
} do
another_user = insert(:user) another_user = insert(:user)
Streamer.get_topic_and_add_socket("direct", user) Streamer.get_topic_and_add_socket("direct", user, oauth_token)
{:ok, create_activity} = {:ok, create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{