From a5e73904bc191c714a2da61c0e7457b0cd1e6362 Mon Sep 17 00:00:00 2001 From: Anon Date: Sun, 12 Mar 2023 17:38:25 -0700 Subject: [PATCH] Removed modular backend --- src/yandere_bot.py | 365 --------------------------------------------- 1 file changed, 365 deletions(-) delete mode 100644 src/yandere_bot.py diff --git a/src/yandere_bot.py b/src/yandere_bot.py deleted file mode 100644 index 5b5a877..0000000 --- a/src/yandere_bot.py +++ /dev/null @@ -1,365 +0,0 @@ -#! /usr/bin/env python3 - -# Mirai Nikki Bot, a video frame posting bot for Pleroma -# Copyright (C) 2022 Anon -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import datetime -import random -import subprocess -from threading import Event -from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError - - -# A class that contains all of the rendering information for a single post -class PickAndRenderFrame: - # From Config File - profile_name = "" - path = "" - frames = 0 - skip = tuple() - nsfw = tuple() - output_name = "" - message = "" - message_nsfw = "" - render_script = "" - - # After pick - picked_frame = None - output_name_tr = "" - - def __init__(self, picked, datetime_format): - dt_now = datetime.datetime.now() - dt_now_str = datetime.datetime.strftime(dt_now, datetime_format) - self.profile_name = picked["profile_name"] - self.path = picked["path"] - self.frames = picked["frames"] - self.skip = picked["skip"] - self.nsfw = picked["nsfw"] - self.output_name = picked["output_name"] - self.output_name_tr = self.output_name - self.message = picked["message"] - self.message_nsfw = picked["message_nsfw"] - self.render_script = picked["render_script"] - - self.picked_frame = self._pick_frame() - - # Shell-like substitutions - translations = { - "profile_name": self.profile_name, - "frame": str(self.picked_frame), - "datetime": dt_now_str - } - - self.output_name_tr = self._translate_basename(translations) - self._render_frame() - - def __del__(self): - render_file = self.output_name_tr - if render_file and os.path.isfile(render_file): - os.remove(render_file) - - def _pick_frame(self): - picked_frame = None - while picked_frame is None: - picked_frame = random.random() * self.frames - for skip in self.skip: - begin, end = skip - if begin <= picked_frame <= end: - picked_frame = None - break - return picked_frame - - def is_nsfw(self): - for nsfw in self.nsfw: - begin, end = nsfw - if begin <= self.picked_frame <= end: - return True - return False - - def _translate_basename(self, translations): - output_name_tr = self.output_name - for k,v in translations.items(): - replace_token = "${" + k + "}" - output_name_tr = output_name_tr.replace(replace_token, v) - return output_name_tr - - # TODO: Add translation keywords to the message - def get_message(self): - return self.message_nsfw if self.is_nsfw() else self.message - - def _render_frame(self): - args = [ - self.render_script, - self.path, - self.output_name_tr, - str(self.picked_frame) - ] - subprocess.run(args) - - -class YandereBot: - # From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C) - eventSleep = Event() - - # The configuration module - cfg = None - - # The below settings are required from the configuration module - settings_server = None - settings_behavior = None - settings_time = None - settings_encrypt = None - - # Class variables - mastodon_api = None - failed_uploads = 0 - consecutive_failed_uploads = 0 - currentSessionCount = 0 - debug_mode = False - primed = False - decrypted = False - - # YandereBot.__init__() - # @param cfg A dynamically loaded python module - # @param keyfile Keyfile to decrypt settings_post - # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) - # @prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) - def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True): - self.cfg = cfg - self.load_settings(self.cfg) - self.debug_mode = debug_mode or self.settings_behavior["debug"] - self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"] - if prime_bot: - self.prime_bot() - - # Setup Exit Calls - def exit(self): - self.eventSleep.set() - - # Decryption settings - # Used to set class attributes with the same name to the value specified in the config file. - def load_settings(self, cfg, settings=None): - try: - default_settings = ( - "settings_server", - "settings_behavior", - "settings_time", - "settings_encrypt" - ) - _settings = settings or default_settings - for ele in _settings: - setattr(self, ele, getattr(cfg, ele)) - - except AttributeError as e: - print(e) - raise BadCfgFile - - def decrypt_settings(self): - if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: - import encryption - try: - self.settings_server = encryption.settings_server_decrypt( - self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"]) - except encryption.EncryptionFail as e: - raise BadCfgFile(str(e)) - self.decrypted = True - - # Login to Pleroma - def login(self): - skip_login = self.debug_mode or self.mastodon_api is not None - if skip_login: - return - if not self.decrypted: - self.decrypt_settings() - try: - self.mastodon_api = Mastodon( - client_id=self.settings_server["client_id"], - client_secret=self.settings_server["client_secret"], - access_token=self.settings_server["access_token"], - api_base_url=self.settings_server["api_base_url"], - feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1 - ) - except (MastodonIllegalArgumentError, MastodonVersionError) as e: - print(e) - raise FailedLogin - - # Maybe I should remove this from the backend? - def print_header_stats(self, picked): - profile, frame, nsfw, path = None, None, None, None - if picked: - profile = picked.profile_name - frame = picked.picked_frame - nsfw = picked.is_nsfw() - path = picked.output_name_tr - print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format( - profile, frame, nsfw, path - )) - - # Returns a list of tuples that contain the media list path and media mastodon dictionary - def upload_media_list(self, path_list): - media_list = [] - for path in path_list: - if not os.path.isfile(path): - raise FileNotFoundError("Could not upload: {}".format(path)) - - for path in path_list: - if not self.debug_mode: - media = self.mastodon_api.media_post(path, description=os.path.basename(path)) - media_dict = { - "path": path, - "media": media - } - media_list.append(media_dict) - return media_list - - def get_post_text(self, yandere_frame, media_list): - content_type = self.settings_behavior["content_type"] - content_newline = self.settings_behavior["content_newline"] - static_message = content_newline.join(yandere_frame.get_message()) - string_post = "" - string_imglinks = [] - - if media_list and self.settings_behavior["post_image_link"]: - for media_dict in media_list: - path = media_dict["path"] - media = media_dict["media"] - - if path is None or media is None: - continue - elif content_type == "text/markdown" and not self.debug_mode: - string_imglinks.append( - "[{}]({})".format(os.path.basename(path), media["url"]) - ) - else: - string_imglinks.append(media["url"]) - - # Join non empty strings with a newline character - string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) - string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) - return string_post - - def _post(self, yandere_frame): - if not yandere_frame: - raise InvalidPost("Frame is none") - upload_list = [yandere_frame.output_name_tr] - media_list = self.upload_media_list(upload_list) - message = self.get_post_text(yandere_frame, media_list) - if self.debug_mode: - return yandere_frame - self.mastodon_api.status_post( - message, - media_ids=[i["media"] for i in media_list], - visibility=self.settings_behavior["visibility"], - sensitive=yandere_frame.is_nsfw(), - content_type=self.settings_behavior["content_type"] - ) - return yandere_frame - - # The main post function - # This funciton is responsible for picking a profile, generate a screenshot, and posting it. - # - # This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded. - def post(self): - picked = None - - # Attempt post - try: - # Post - dt_picked = self.settings_time["datetime"] - picked_profile = random.choice(self.cfg.settings_post) - picked = PickAndRenderFrame(picked_profile, dt_picked) - self._post(picked) - - # After a successful post - self.currentSessionCount += 1 - self.consecutive_failed_uploads = 0 - - # The post was successful - return picked - - # Server Errors and other general exceptions - # Assume all exceptions are on the server side (besides FileNotFoundError of course) - # If the connection is timing out it could be for two reasons: - # 1. The error was caused by the user attempting to upload a large file over a slow connection: - # a. FIX: Reduce settings_behavior["max_size"] - # 2. The server is down. Check to verify in a web browser (this is the default assumption since the - # mastodon.py API will not specify why the connection timed out). - # 3. Failed to generate screenshot - # 4. Other general exceptions - except (FileNotFoundError, MastodonAPIError, InvalidPost, Exception) as e: - print("Exception:", e) - - # An exception occurred - self.failed_uploads += 1 - self.consecutive_failed_uploads += 1 - - # The post failed - return None - - # [BEGIN THE PROGRAM] - def prime_bot(self): - random.seed(os.urandom(16)) - if not self.debug_mode: - self.decrypt_settings() - self.login() - self.primed = True - - def start(self, delay=0): - # Prime bot if not already primed. - if not self.primed: - self.prime_bot() - - # Begin posting - self.main_loop() - - # Return 1 if there are still pictures in the picture list - return 0 - - # End Conditions: - # 1. User presses Ctrl+C - # 2. settings_behavior["uploads_per_post"] is less than uploads_per_post - # 3. Consecutive failed uploads is less than max_errors - def can_post(self): - return ( - not self.eventSleep.is_set() and - self.currentSessionCount < self.settings_behavior["uploads_per_post"] and - self.consecutive_failed_uploads < self.settings_behavior["max_errors"] - ) - - def main_loop(self): - sleep_seconds = self.settings_behavior["retry_seconds"] - while self.can_post(): - picked = self.post() - self.print_header_stats(picked) - if self.can_post(): - self.eventSleep.wait(sleep_seconds) - -# Custom Exceptions for YandereBot -class Debug(Exception): - pass - - -class FailedLogin(Exception): - pass - - -class BadCfgFile(Exception): - pass - - -class InvalidPost(Exception): - pass