commit 9e15563999850a8094a73747c02b195ebd66f216 Author: Anon Date: Sun Mar 12 18:00:01 2023 -0700 Initial Commit diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..da2f77e --- /dev/null +++ b/__init__.py @@ -0,0 +1,3 @@ +from .fedibot import YandereBot, Debug, InvalidPost, FailedLogin, BadCfgFile + +__all__ = ["fedibot"] diff --git a/fedibot.py b/fedibot.py new file mode 100644 index 0000000..f2c25c6 --- /dev/null +++ b/fedibot.py @@ -0,0 +1,267 @@ +#! /usr/bin/env python3 + +# Yandere Lewd Bot, an image posting bot for Pleroma +# Copyright (C) 2022 Anon +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +from threading import Event +from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonVersionError + +class YandereBot: + # From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C) + eventSleep = Event() + + # The configuration module + cfg = None + + # The below settings are required from the configuration module + settings = { + "settings_server": dict(), + "settings_behavior": dict(), + "settings_encrypt": dict() + } + + # Class variables + mastodon_api = None + failed_uploads = 0 + consecutive_failed_uploads = 0 + currentSessionCount = 0 + debug_mode = False + primed = False + decrypted = False + + # YandereBot.__init__() + # @param cfg A dynamically loaded python module. See yanlib.module_load() for an example + # @param keyfile Keyfile to decrypt settings_post + # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) + def __init__(self, cfg, keyfile=None, debug_mode=False): + settings = self.settings + self.cfg = cfg + self.load_settings(self.cfg) + self.debug_mode = debug_mode or settings["settings_behavior"]["debug"] + settings["settings_encrypt"]["keyfile"] = keyfile or settings["settings_encrypt"]["keyfile"] + + # Setup Exit Calls + def exit(self): + self.eventSleep.set() + + # Decryption settings + # Used to set class attributes with the same name to the value specified in the config file. + def load_settings(self, cfg): + try: + for key in self.settings: + self.settings[key] = getattr(cfg, key) + + except AttributeError as e: + print(e) + raise BadCfgFile + + def decrypt_settings(self): + settings = self.settings + settings_encrypt = settings["settings_encrypt"] + settings_server = settings["settings_server"] + if settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: + import FediBotEncryption + try: + self.settings["settings_server"] = FediBotEncryption.settings_server_decrypt( + settings_server, settings_encrypt, settings_encrypt["keyfile"]) + except FediBotEncryption.EncryptionFail as e: + raise BadCfgFile(str(e)) + self.decrypted = True + + # Login to Pleroma + def login(self): + skip_login = self.debug_mode or self.mastodon_api is not None + if skip_login: + return + if not self.decrypted: + self.decrypt_settings() + try: + settings = self.settings + settings_server = settings["settings_server"] + settings_behavior = settings["settings_behavior"] + + self.mastodon_api = Mastodon( + client_id=settings_server["client_id"], + client_secret=settings_server["client_secret"], + access_token=settings_server["access_token"], + api_base_url=settings_server["api_base_url"], + feature_set=settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1 + ) + except (MastodonIllegalArgumentError, MastodonVersionError) as e: + print(e) + raise FailedLogin + + def upload_media_list_validate(self, path_list): + for path in path_list: + if path is None or not os.path.isfile(path): + raise FileNotFoundError("Could not upload: {}".format(path)) + + + # Returns a list of tuples that contain the media list path and media mastodon dictionary + def upload_media_list(self, path_list): + self.upload_media_list_validate(path_list) + + media_list = [] + for path in path_list: + if not self.debug_mode: + media = self.mastodon_api.media_post(path, description=os.path.basename(path)) + media_dict = { + "path": path, + "media": media + } + media_list.append(media_dict) + return media_list + + def get_post_text(self, message, media_list): + settings = self.settings + settings_behavior = settings["settings_behavior"] + + content_type = settings_behavior["content_type"] + content_newline = settings_behavior["content_newline"] + static_message = content_newline.join(message) + string_post = "" + string_imglinks = [] + + if media_list and settings_behavior["post_image_link"]: + for media_dict in media_list: + path = media_dict["path"] + media = media_dict["media"] + + if path is None or media is None: + continue + elif content_type == "text/markdown" and not self.debug_mode: + string_imglinks.append( + "[{}]({})".format(os.path.basename(path), media["url"]) + ) + else: + string_imglinks.append(media["url"]) + + # Join non empty strings with a newline character + string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) + string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) + + return string_post + + def _post(self, picked): + settings = self.settings + settings_behavior = settings["settings_behavior"] + + if not picked: + raise InvalidPost("Picked cannot be None") + if not picked["media_list"]: + raise InvalidPost("Media list is empty") + if self.debug_mode: + return picked + media_list = self.upload_media_list(picked["media_list"]) + message = self.get_post_text(picked["message"], media_list) + self.mastodon_api.status_post( + message, + media_ids=[i["media"] for i in media_list], + visibility=settings_behavior["visibility"], + sensitive=picked["spoiler"], + content_type=settings_behavior["content_type"] + ) + return picked + + def pick(self): + return None + + def after_pick(self, picked): + return picked + + def handle_post_exception(self): + # An exception occurred + self.failed_uploads += 1 + self.consecutive_failed_uploads += 1 + + # The main post function + # This funciton is responsible for picking, posting, and blacklisting an image in listPictures + # + # It is also responsible for removing the picked item from self.listPictures, which can be accomplished by simply + # popping it at index 0. This should handle any error that might occur while posting. + # + # This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded. + def post(self, picked): + # Post + self._post(picked) + + # After a successful post + self.currentSessionCount += 1 + self.consecutive_failed_uploads = 0 + + self.after_pick(picked) + + # The post was successful + return picked + + # [BEGIN THE PROGRAM] + def prime_bot(self): + if self.primed: + return + if not self.debug_mode: + self.decrypt_settings() + self.login() + self.primed = True + + def start(self): + self.prime_bot() + + # Begin posting + self.main_loop() + + return 0 + + # End Conditions: + # 1. User presses Ctrl+C + # 2. There is nothing left in the picture list to select from + # 3. settings_behavior["uploads_per_post"] is less than uploads_per_post + # 4. Consecutive failed uploads is less than max_errors + def can_post(self): + settings = self.settings["settings_behavior"] + return ( + not self.eventSleep.is_set() and + self.currentSessionCount < settings["uploads_per_post"] and + self.consecutive_failed_uploads < settings["max_errors"] + ) + + def main_loop(self): + sleep_seconds = self.settings["settings_behavior"]["retry_seconds"] + while self.can_post(): + self.post(self.pick()) + if self.can_post(): + self.eventSleep.wait(sleep_seconds) + + + +# Custom Exceptions for YandereBot +class Debug(Exception): + pass + + +class InvalidPost(Exception): + pass + + +class FailedLogin(Exception): + pass + + +class BadCfgFile(Exception): + pass + + +