Removed modular backend
This commit is contained in:
parent
76e507cf67
commit
a5e73904bc
@ -1,365 +0,0 @@
|
|||||||
#! /usr/bin/env python3
|
|
||||||
|
|
||||||
# Mirai Nikki Bot, a video frame posting bot for Pleroma
|
|
||||||
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
import os
|
|
||||||
import datetime
|
|
||||||
import random
|
|
||||||
import subprocess
|
|
||||||
from threading import Event
|
|
||||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
|
||||||
|
|
||||||
|
|
||||||
# A class that contains all of the rendering information for a single post
|
|
||||||
class PickAndRenderFrame:
|
|
||||||
# From Config File
|
|
||||||
profile_name = ""
|
|
||||||
path = ""
|
|
||||||
frames = 0
|
|
||||||
skip = tuple()
|
|
||||||
nsfw = tuple()
|
|
||||||
output_name = ""
|
|
||||||
message = ""
|
|
||||||
message_nsfw = ""
|
|
||||||
render_script = ""
|
|
||||||
|
|
||||||
# After pick
|
|
||||||
picked_frame = None
|
|
||||||
output_name_tr = ""
|
|
||||||
|
|
||||||
def __init__(self, picked, datetime_format):
|
|
||||||
dt_now = datetime.datetime.now()
|
|
||||||
dt_now_str = datetime.datetime.strftime(dt_now, datetime_format)
|
|
||||||
self.profile_name = picked["profile_name"]
|
|
||||||
self.path = picked["path"]
|
|
||||||
self.frames = picked["frames"]
|
|
||||||
self.skip = picked["skip"]
|
|
||||||
self.nsfw = picked["nsfw"]
|
|
||||||
self.output_name = picked["output_name"]
|
|
||||||
self.output_name_tr = self.output_name
|
|
||||||
self.message = picked["message"]
|
|
||||||
self.message_nsfw = picked["message_nsfw"]
|
|
||||||
self.render_script = picked["render_script"]
|
|
||||||
|
|
||||||
self.picked_frame = self._pick_frame()
|
|
||||||
|
|
||||||
# Shell-like substitutions
|
|
||||||
translations = {
|
|
||||||
"profile_name": self.profile_name,
|
|
||||||
"frame": str(self.picked_frame),
|
|
||||||
"datetime": dt_now_str
|
|
||||||
}
|
|
||||||
|
|
||||||
self.output_name_tr = self._translate_basename(translations)
|
|
||||||
self._render_frame()
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
render_file = self.output_name_tr
|
|
||||||
if render_file and os.path.isfile(render_file):
|
|
||||||
os.remove(render_file)
|
|
||||||
|
|
||||||
def _pick_frame(self):
|
|
||||||
picked_frame = None
|
|
||||||
while picked_frame is None:
|
|
||||||
picked_frame = random.random() * self.frames
|
|
||||||
for skip in self.skip:
|
|
||||||
begin, end = skip
|
|
||||||
if begin <= picked_frame <= end:
|
|
||||||
picked_frame = None
|
|
||||||
break
|
|
||||||
return picked_frame
|
|
||||||
|
|
||||||
def is_nsfw(self):
|
|
||||||
for nsfw in self.nsfw:
|
|
||||||
begin, end = nsfw
|
|
||||||
if begin <= self.picked_frame <= end:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _translate_basename(self, translations):
|
|
||||||
output_name_tr = self.output_name
|
|
||||||
for k,v in translations.items():
|
|
||||||
replace_token = "${" + k + "}"
|
|
||||||
output_name_tr = output_name_tr.replace(replace_token, v)
|
|
||||||
return output_name_tr
|
|
||||||
|
|
||||||
# TODO: Add translation keywords to the message
|
|
||||||
def get_message(self):
|
|
||||||
return self.message_nsfw if self.is_nsfw() else self.message
|
|
||||||
|
|
||||||
def _render_frame(self):
|
|
||||||
args = [
|
|
||||||
self.render_script,
|
|
||||||
self.path,
|
|
||||||
self.output_name_tr,
|
|
||||||
str(self.picked_frame)
|
|
||||||
]
|
|
||||||
subprocess.run(args)
|
|
||||||
|
|
||||||
|
|
||||||
class YandereBot:
|
|
||||||
# From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
|
|
||||||
eventSleep = Event()
|
|
||||||
|
|
||||||
# The configuration module
|
|
||||||
cfg = None
|
|
||||||
|
|
||||||
# The below settings are required from the configuration module
|
|
||||||
settings_server = None
|
|
||||||
settings_behavior = None
|
|
||||||
settings_time = None
|
|
||||||
settings_encrypt = None
|
|
||||||
|
|
||||||
# Class variables
|
|
||||||
mastodon_api = None
|
|
||||||
failed_uploads = 0
|
|
||||||
consecutive_failed_uploads = 0
|
|
||||||
currentSessionCount = 0
|
|
||||||
debug_mode = False
|
|
||||||
primed = False
|
|
||||||
decrypted = False
|
|
||||||
|
|
||||||
# YandereBot.__init__()
|
|
||||||
# @param cfg A dynamically loaded python module
|
|
||||||
# @param keyfile Keyfile to decrypt settings_post
|
|
||||||
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
|
|
||||||
# @prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
|
|
||||||
def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True):
|
|
||||||
self.cfg = cfg
|
|
||||||
self.load_settings(self.cfg)
|
|
||||||
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
|
||||||
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
|
|
||||||
if prime_bot:
|
|
||||||
self.prime_bot()
|
|
||||||
|
|
||||||
# Setup Exit Calls
|
|
||||||
def exit(self):
|
|
||||||
self.eventSleep.set()
|
|
||||||
|
|
||||||
# Decryption settings
|
|
||||||
# Used to set class attributes with the same name to the value specified in the config file.
|
|
||||||
def load_settings(self, cfg, settings=None):
|
|
||||||
try:
|
|
||||||
default_settings = (
|
|
||||||
"settings_server",
|
|
||||||
"settings_behavior",
|
|
||||||
"settings_time",
|
|
||||||
"settings_encrypt"
|
|
||||||
)
|
|
||||||
_settings = settings or default_settings
|
|
||||||
for ele in _settings:
|
|
||||||
setattr(self, ele, getattr(cfg, ele))
|
|
||||||
|
|
||||||
except AttributeError as e:
|
|
||||||
print(e)
|
|
||||||
raise BadCfgFile
|
|
||||||
|
|
||||||
def decrypt_settings(self):
|
|
||||||
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
|
|
||||||
import encryption
|
|
||||||
try:
|
|
||||||
self.settings_server = encryption.settings_server_decrypt(
|
|
||||||
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
|
|
||||||
except encryption.EncryptionFail as e:
|
|
||||||
raise BadCfgFile(str(e))
|
|
||||||
self.decrypted = True
|
|
||||||
|
|
||||||
# Login to Pleroma
|
|
||||||
def login(self):
|
|
||||||
skip_login = self.debug_mode or self.mastodon_api is not None
|
|
||||||
if skip_login:
|
|
||||||
return
|
|
||||||
if not self.decrypted:
|
|
||||||
self.decrypt_settings()
|
|
||||||
try:
|
|
||||||
self.mastodon_api = Mastodon(
|
|
||||||
client_id=self.settings_server["client_id"],
|
|
||||||
client_secret=self.settings_server["client_secret"],
|
|
||||||
access_token=self.settings_server["access_token"],
|
|
||||||
api_base_url=self.settings_server["api_base_url"],
|
|
||||||
feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1
|
|
||||||
)
|
|
||||||
except (MastodonIllegalArgumentError, MastodonVersionError) as e:
|
|
||||||
print(e)
|
|
||||||
raise FailedLogin
|
|
||||||
|
|
||||||
# Maybe I should remove this from the backend?
|
|
||||||
def print_header_stats(self, picked):
|
|
||||||
profile, frame, nsfw, path = None, None, None, None
|
|
||||||
if picked:
|
|
||||||
profile = picked.profile_name
|
|
||||||
frame = picked.picked_frame
|
|
||||||
nsfw = picked.is_nsfw()
|
|
||||||
path = picked.output_name_tr
|
|
||||||
print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format(
|
|
||||||
profile, frame, nsfw, path
|
|
||||||
))
|
|
||||||
|
|
||||||
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
|
||||||
def upload_media_list(self, path_list):
|
|
||||||
media_list = []
|
|
||||||
for path in path_list:
|
|
||||||
if not os.path.isfile(path):
|
|
||||||
raise FileNotFoundError("Could not upload: {}".format(path))
|
|
||||||
|
|
||||||
for path in path_list:
|
|
||||||
if not self.debug_mode:
|
|
||||||
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
|
|
||||||
media_dict = {
|
|
||||||
"path": path,
|
|
||||||
"media": media
|
|
||||||
}
|
|
||||||
media_list.append(media_dict)
|
|
||||||
return media_list
|
|
||||||
|
|
||||||
def get_post_text(self, yandere_frame, media_list):
|
|
||||||
content_type = self.settings_behavior["content_type"]
|
|
||||||
content_newline = self.settings_behavior["content_newline"]
|
|
||||||
static_message = content_newline.join(yandere_frame.get_message())
|
|
||||||
string_post = ""
|
|
||||||
string_imglinks = []
|
|
||||||
|
|
||||||
if media_list and self.settings_behavior["post_image_link"]:
|
|
||||||
for media_dict in media_list:
|
|
||||||
path = media_dict["path"]
|
|
||||||
media = media_dict["media"]
|
|
||||||
|
|
||||||
if path is None or media is None:
|
|
||||||
continue
|
|
||||||
elif content_type == "text/markdown" and not self.debug_mode:
|
|
||||||
string_imglinks.append(
|
|
||||||
"[{}]({})".format(os.path.basename(path), media["url"])
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
string_imglinks.append(media["url"])
|
|
||||||
|
|
||||||
# Join non empty strings with a newline character
|
|
||||||
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
|
|
||||||
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
|
||||||
return string_post
|
|
||||||
|
|
||||||
def _post(self, yandere_frame):
|
|
||||||
if not yandere_frame:
|
|
||||||
raise InvalidPost("Frame is none")
|
|
||||||
upload_list = [yandere_frame.output_name_tr]
|
|
||||||
media_list = self.upload_media_list(upload_list)
|
|
||||||
message = self.get_post_text(yandere_frame, media_list)
|
|
||||||
if self.debug_mode:
|
|
||||||
return yandere_frame
|
|
||||||
self.mastodon_api.status_post(
|
|
||||||
message,
|
|
||||||
media_ids=[i["media"] for i in media_list],
|
|
||||||
visibility=self.settings_behavior["visibility"],
|
|
||||||
sensitive=yandere_frame.is_nsfw(),
|
|
||||||
content_type=self.settings_behavior["content_type"]
|
|
||||||
)
|
|
||||||
return yandere_frame
|
|
||||||
|
|
||||||
# The main post function
|
|
||||||
# This funciton is responsible for picking a profile, generate a screenshot, and posting it.
|
|
||||||
#
|
|
||||||
# This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded.
|
|
||||||
def post(self):
|
|
||||||
picked = None
|
|
||||||
|
|
||||||
# Attempt post
|
|
||||||
try:
|
|
||||||
# Post
|
|
||||||
dt_picked = self.settings_time["datetime"]
|
|
||||||
picked_profile = random.choice(self.cfg.settings_post)
|
|
||||||
picked = PickAndRenderFrame(picked_profile, dt_picked)
|
|
||||||
self._post(picked)
|
|
||||||
|
|
||||||
# After a successful post
|
|
||||||
self.currentSessionCount += 1
|
|
||||||
self.consecutive_failed_uploads = 0
|
|
||||||
|
|
||||||
# The post was successful
|
|
||||||
return picked
|
|
||||||
|
|
||||||
# Server Errors and other general exceptions
|
|
||||||
# Assume all exceptions are on the server side (besides FileNotFoundError of course)
|
|
||||||
# If the connection is timing out it could be for two reasons:
|
|
||||||
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
|
|
||||||
# a. FIX: Reduce settings_behavior["max_size"]
|
|
||||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
|
||||||
# mastodon.py API will not specify why the connection timed out).
|
|
||||||
# 3. Failed to generate screenshot
|
|
||||||
# 4. Other general exceptions
|
|
||||||
except (FileNotFoundError, MastodonAPIError, InvalidPost, Exception) as e:
|
|
||||||
print("Exception:", e)
|
|
||||||
|
|
||||||
# An exception occurred
|
|
||||||
self.failed_uploads += 1
|
|
||||||
self.consecutive_failed_uploads += 1
|
|
||||||
|
|
||||||
# The post failed
|
|
||||||
return None
|
|
||||||
|
|
||||||
# [BEGIN THE PROGRAM]
|
|
||||||
def prime_bot(self):
|
|
||||||
random.seed(os.urandom(16))
|
|
||||||
if not self.debug_mode:
|
|
||||||
self.decrypt_settings()
|
|
||||||
self.login()
|
|
||||||
self.primed = True
|
|
||||||
|
|
||||||
def start(self, delay=0):
|
|
||||||
# Prime bot if not already primed.
|
|
||||||
if not self.primed:
|
|
||||||
self.prime_bot()
|
|
||||||
|
|
||||||
# Begin posting
|
|
||||||
self.main_loop()
|
|
||||||
|
|
||||||
# Return 1 if there are still pictures in the picture list
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# End Conditions:
|
|
||||||
# 1. User presses Ctrl+C
|
|
||||||
# 2. settings_behavior["uploads_per_post"] is less than uploads_per_post
|
|
||||||
# 3. Consecutive failed uploads is less than max_errors
|
|
||||||
def can_post(self):
|
|
||||||
return (
|
|
||||||
not self.eventSleep.is_set() and
|
|
||||||
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
|
|
||||||
self.consecutive_failed_uploads < self.settings_behavior["max_errors"]
|
|
||||||
)
|
|
||||||
|
|
||||||
def main_loop(self):
|
|
||||||
sleep_seconds = self.settings_behavior["retry_seconds"]
|
|
||||||
while self.can_post():
|
|
||||||
picked = self.post()
|
|
||||||
self.print_header_stats(picked)
|
|
||||||
if self.can_post():
|
|
||||||
self.eventSleep.wait(sleep_seconds)
|
|
||||||
|
|
||||||
# Custom Exceptions for YandereBot
|
|
||||||
class Debug(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class FailedLogin(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class BadCfgFile(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class InvalidPost(Exception):
|
|
||||||
pass
|
|
Reference in New Issue
Block a user