928bda2e43
Old way was wrong for multiple reasons. If we do this as a config value it fixes :slave.start/3 being picked up as a compile warning on OTP26. Also if we want to do any real clustering we'll need something like this to support OTP25 and older.
236 lines
7.4 KiB
Elixir
236 lines
7.4 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Cluster do
|
|
@moduledoc """
|
|
Facilities for managing a cluster of slave VM's for federated testing.
|
|
|
|
## Spawning the federated cluster
|
|
|
|
`spawn_cluster/1` spawns a map of slave nodes that are started
|
|
within the running VM. During startup, the slave node is sent all configuration
|
|
from the parent node, as well as all code. After receiving configuration and
|
|
code, the slave then starts all applications currently running on the parent.
|
|
The configuration passed to `spawn_cluster/1` overrides any parent application
|
|
configuration for the provided OTP app and key. This is useful for customizing
|
|
the Ecto database, Phoenix webserver ports, etc.
|
|
|
|
For example, to start a single federated VM named ":federated1", with the
|
|
Pleroma Endpoint running on port 4123, and with a database named
|
|
"pleroma_test1", you would run:
|
|
|
|
endpoint_conf = Application.fetch_env!(:pleroma, Pleroma.Web.Endpoint)
|
|
repo_conf = Application.fetch_env!(:pleroma, Pleroma.Repo)
|
|
|
|
Pleroma.Cluster.spawn_cluster(%{
|
|
:"federated1@127.0.0.1" => [
|
|
{:pleroma, Pleroma.Repo, Keyword.merge(repo_conf, database: "pleroma_test1")},
|
|
{:pleroma, Pleroma.Web.Endpoint,
|
|
Keyword.merge(endpoint_conf, http: [port: 4011], url: [port: 4011], server: true)}
|
|
]
|
|
})
|
|
|
|
*Note*: application configuration for a given key is not merged,
|
|
so any customization requires first fetching the existing values
|
|
and merging yourself by providing the merged configuration,
|
|
such as above with the endpoint config and repo config.
|
|
|
|
## Executing code within a remote node
|
|
|
|
Use the `within/2` macro to execute code within the context of a remote
|
|
federated node. The code block captures all local variable bindings from
|
|
the parent's context and returns the result of the expression after executing
|
|
it on the remote node. For example:
|
|
|
|
import Pleroma.Cluster
|
|
|
|
parent_value = 123
|
|
|
|
result =
|
|
within :"federated1@127.0.0.1" do
|
|
{node(), parent_value}
|
|
end
|
|
|
|
assert result == {:"federated1@127.0.0.1, 123}
|
|
|
|
*Note*: while local bindings are captured and available within the block,
|
|
other parent contexts like required, aliased, or imported modules are not
|
|
in scope. Those will need to be reimported/aliases/required within the block
|
|
as `within/2` is a remote procedure call.
|
|
"""
|
|
|
|
@extra_apps Pleroma.Mixfile.application()[:extra_applications]
|
|
|
|
@doc """
|
|
Spawns the default Pleroma federated cluster.
|
|
|
|
Values before may be customized as needed for the test suite.
|
|
"""
|
|
def spawn_default_cluster do
|
|
endpoint_conf = Application.fetch_env!(:pleroma, Pleroma.Web.Endpoint)
|
|
repo_conf = Application.fetch_env!(:pleroma, Pleroma.Repo)
|
|
|
|
spawn_cluster(%{
|
|
:"federated1@127.0.0.1" => [
|
|
{:pleroma, Pleroma.Repo, Keyword.merge(repo_conf, database: "pleroma_test_federated1")},
|
|
{:pleroma, Pleroma.Web.Endpoint,
|
|
Keyword.merge(endpoint_conf, http: [port: 4011], url: [port: 4011], server: true)}
|
|
],
|
|
:"federated2@127.0.0.1" => [
|
|
{:pleroma, Pleroma.Repo, Keyword.merge(repo_conf, database: "pleroma_test_federated2")},
|
|
{:pleroma, Pleroma.Web.Endpoint,
|
|
Keyword.merge(endpoint_conf, http: [port: 4012], url: [port: 4012], server: true)}
|
|
]
|
|
})
|
|
end
|
|
|
|
@doc """
|
|
Spawns a configured map of federated nodes.
|
|
|
|
See `Pleroma.Cluster` module documentation for details.
|
|
"""
|
|
def spawn_cluster(node_configs) do
|
|
# Turn node into a distributed node with the given long name
|
|
:net_kernel.start([:"primary@127.0.0.1"])
|
|
|
|
# Allow spawned nodes to fetch all code from this node
|
|
{:ok, _} = :erl_boot_server.start([])
|
|
allow_boot("127.0.0.1")
|
|
|
|
silence_logger_warnings(fn ->
|
|
node_configs
|
|
|> Enum.map(&Task.async(fn -> start_slave(&1) end))
|
|
|> Enum.map(&Task.await(&1, 90_000))
|
|
end)
|
|
end
|
|
|
|
@doc """
|
|
Executes block of code again remote node.
|
|
|
|
See `Pleroma.Cluster` module documentation for details.
|
|
"""
|
|
defmacro within(node, do: block) do
|
|
quote do
|
|
rpc(unquote(node), unquote(__MODULE__), :eval_quoted, [
|
|
unquote(Macro.escape(block)),
|
|
binding()
|
|
])
|
|
end
|
|
end
|
|
|
|
@doc false
|
|
def eval_quoted(block, binding) do
|
|
{result, _binding} = Code.eval_quoted(block, binding, __ENV__)
|
|
result
|
|
end
|
|
|
|
defp start_slave({node_host, override_configs}) do
|
|
log(node_host, "booting federated VM")
|
|
|
|
{:ok, node} =
|
|
do_start_slave(%{host: "127.0.0.1", name: node_name(node_host), args: vm_args()})
|
|
|
|
add_code_paths(node)
|
|
load_apps_and_transfer_configuration(node, override_configs)
|
|
ensure_apps_started(node)
|
|
{:ok, node}
|
|
end
|
|
|
|
def rpc(node, module, function, args) do
|
|
:rpc.block_call(node, module, function, args)
|
|
end
|
|
|
|
defp vm_args do
|
|
~c"-loader inet -hosts 127.0.0.1 -setcookie #{:erlang.get_cookie()}"
|
|
end
|
|
|
|
defp allow_boot(host) do
|
|
{:ok, ipv4} = :inet.parse_ipv4_address(~c"#{host}")
|
|
:ok = :erl_boot_server.add_slave(ipv4)
|
|
end
|
|
|
|
defp add_code_paths(node) do
|
|
rpc(node, :code, :add_paths, [:code.get_path()])
|
|
end
|
|
|
|
defp load_apps_and_transfer_configuration(node, override_configs) do
|
|
Enum.each(Application.loaded_applications(), fn {app_name, _, _} ->
|
|
app_name
|
|
|> Application.get_all_env()
|
|
|> Enum.each(fn {key, primary_config} ->
|
|
rpc(node, Application, :put_env, [app_name, key, primary_config, [persistent: true]])
|
|
end)
|
|
end)
|
|
|
|
Enum.each(override_configs, fn {app_name, key, val} ->
|
|
rpc(node, Application, :put_env, [app_name, key, val, [persistent: true]])
|
|
end)
|
|
end
|
|
|
|
defp log(node, msg), do: IO.puts("[#{node}] #{msg}")
|
|
|
|
defp ensure_apps_started(node) do
|
|
loaded_names = Enum.map(Application.loaded_applications(), fn {name, _, _} -> name end)
|
|
app_names = @extra_apps ++ (loaded_names -- @extra_apps)
|
|
|
|
rpc(node, Application, :ensure_all_started, [:mix])
|
|
rpc(node, Mix, :env, [Mix.env()])
|
|
rpc(node, __MODULE__, :prepare_database, [])
|
|
|
|
log(node, "starting application")
|
|
|
|
Enum.reduce(app_names, MapSet.new(), fn app, loaded ->
|
|
if Enum.member?(loaded, app) do
|
|
loaded
|
|
else
|
|
{:ok, started} = rpc(node, Application, :ensure_all_started, [app])
|
|
MapSet.union(loaded, MapSet.new(started))
|
|
end
|
|
end)
|
|
end
|
|
|
|
@doc false
|
|
def prepare_database do
|
|
log(node(), "preparing database")
|
|
repo_config = Application.get_env(:pleroma, Pleroma.Repo)
|
|
repo_config[:adapter].storage_down(repo_config)
|
|
repo_config[:adapter].storage_up(repo_config)
|
|
|
|
{:ok, _, _} =
|
|
Ecto.Migrator.with_repo(Pleroma.Repo, fn repo ->
|
|
Ecto.Migrator.run(repo, :up, log: false, all: true)
|
|
end)
|
|
|
|
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, :manual)
|
|
{:ok, _} = Application.ensure_all_started(:ex_machina)
|
|
end
|
|
|
|
defp silence_logger_warnings(func) do
|
|
prev_level = Logger.level()
|
|
Logger.configure(level: :error)
|
|
res = func.()
|
|
Logger.configure(level: prev_level)
|
|
|
|
res
|
|
end
|
|
|
|
defp node_name(node_host) do
|
|
node_host
|
|
|> to_string()
|
|
|> String.split("@")
|
|
|> Enum.at(0)
|
|
|> String.to_atom()
|
|
end
|
|
|
|
defp do_start_slave(%{host: host, name: name, args: args} = opts) do
|
|
peer_module = Application.get_env(__MODULE__, :peer_module)
|
|
|
|
if peer_module == :peer do
|
|
peer_module.start(opts)
|
|
else
|
|
peer_module.start(host, name, args)
|
|
end
|
|
end
|
|
end
|