From 89381b76d4863e42a022faa784754fb547405d0f Mon Sep 17 00:00:00 2001 From: Anon Date: Sun, 12 Mar 2023 17:45:40 -0700 Subject: [PATCH] Refactored to make use of backend module --- src/main.py | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 172 insertions(+), 5 deletions(-) diff --git a/src/main.py b/src/main.py index a5b691f..fd05442 100755 --- a/src/main.py +++ b/src/main.py @@ -16,10 +16,177 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import os import sys import argparse +import random import signal -import yandere_bot +import FediBot +import importlib +import magic +import copy + +class FileTooLarge(Exception): + pass + + +class InvalidMimeType(Exception): + pass + + +class YandereBot(FediBot.YandereBot): + currentSessionCount = 0 + currentIndexCount = 0 + currentProfileIndex = [] + + def __init__(self, cfg, keyfile=None, debug_mode=False): + settings = { + "settings_time": {}, + "settings_post": {}, + "settings_backend": {} + } + self.settings.update(settings) + super(YandereBot, self).__init__(cfg, keyfile, debug_mode) + random.seed(os.urandom(16)) + self.currentProfileIndex = [0] * len(self.settings["settings_post"]) + + def print_header_stats(self, picked): + settings_post = self.settings["settings_post"] + profile = picked["profile"]["name"] if picked else None + url = picked["file_url"] if picked else None + path = picked["full_path"] if picked else None + nsfw = picked["nsfw"] if picked else None + + posted_once = int(self.currentSessionCount > 0) + index = (self.currentIndexCount - posted_once) % len(settings_post) + state_print = copy.copy(self.currentProfileIndex) + state_print[index] = state_print[index] - posted_once + + state_print = [state_print[i] % len(settings_post[i]) for i in range(0, len(self.currentProfileIndex))] + + print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format( + profile, index, nsfw, path, url + )) + print("State: {}".format(','.join(map(str, state_print)))) + + + # Returns a list of media paths (without the hashes) + def download_media(self, picked_profile): + try: + backend_s = picked_profile["backend"] + backend_credentials = self.settings["settings_backend"][backend_s] + backend = importlib.import_module(backend_credentials["module"]) + + downloader = backend.downloader(backend_credentials) + img = downloader.fetch_post(picked_profile) + + if img is None: + raise FediBot.InvalidPost("Img could not be downloaded") + + return downloader.download_post(img) + except ImportError: + print("Invalid Backend:", picked_profile["backend"]) + return None + + # Returns a list of tuples that contain the media list path and media mastodon dictionary + def upload_media_list_validate(self, media_list): + # Check to make sure the paths in media_list actually exist + super(YandereBot, self).upload_media_list_validate(media_list) + + # Validate picked + for path in media_list: + if not self.valid_mimetype(path): + raise InvalidMimeType("Invalid mime type") + elif not self.valid_file_size(path): + raise FileTooLarge("File is too large to upload") + + + def valid_mimetype(self, full_path): + mime = magic.from_file(full_path, mime=True) + + if mime is None: + return False + + mime_category = mime.split("/", 1)[0] + + return mime_category in ("image", "video") + + def get_message(self, picked): + nsfw = picked["nsfw"] + message = picked["profile"]["message_nsfw"] if nsfw else picked["profile"]["message"] + return message + + def valid_file_size(self, full_path): + settings_behavior = self.settings["settings_behavior"] + max_size = settings_behavior["max_size"] + file_size = os.stat(full_path).st_size + return file_size <= max_size + + + def pick_index(self, mode, current_index, length): + if mode == "random": + return random.randint(0, length - 1) + elif mode == "sequential": + return current_index % length + + + def pick_profile(self): + settings_behavior = self.settings["settings_behavior"] + # Get x and y + mode = settings_behavior["tag_select"].lower() + posts = self.settings["settings_post"] + x = self.pick_index(mode, self.currentIndexCount, len(posts)) + y = self.pick_index(mode, self.currentProfileIndex[x], len(posts[x])) + + # Return the Profile + return x, y + + def pick(self): + settings_post = self.settings["settings_post"] + x, y = self.pick_profile() + picked_profile = settings_post[x][y] + + picked = self.download_media(picked_profile) + media_list = [picked["full_path"]] + spoiler = picked["nsfw"] + message = self.get_message(picked) + return { + "picked": picked, + "media_list": media_list, + "spoiler": spoiler, + "message": message + } + + def after_pick(self, picked): + self.print_header_stats(picked["picked"]) + self.currentProfileIndex[self.currentIndexCount] += 1 + self.currentIndexCount += 1 + os.remove(picked["picked"]["full_path"]) + return super(YandereBot, self).after_pick(picked) + + def post(self, picked): + # Attempt post + try: + return super(YandereBot, self).post(picked) + + # Invalid post (move to next profile) + except FediBot.InvalidPost as e: + self.currentIndexCount += 1 + print("Invalid post:", e) + + # Invalid post (remove downloaded files) + except (FileTooLarge, InvalidMimeType) as e: + os.remove(picked["full_path"]) + print("Unable to post:", e) + + self.handle_post_exception() + + # The post failed + return None + + + def can_post(self): + return self.currentIndexCount >= 0 and super(YandereBot, self).can_post() class FailedToLoadCfg(Exception): @@ -59,7 +226,7 @@ def main(): # Flag if the bot is running in debug mode debug_mode = arguments.dry_run or arguments.debug - yandere = yandere_bot.YandereBot( + yandere = YandereBot( yandere_config, arguments.keyfile, debug_mode, @@ -94,9 +261,9 @@ if __name__ == "__main__": except FailedToLoadCfg: sys.exit(10) # Exceptions raised from the bot - except yandere_bot.Debug: + except FediBot.Debug: sys.exit(6) - except yandere_bot.BadCfgFile: + except FediBot.BadCfgFile: sys.exit(4) - except yandere_bot.FailedLogin: + except FediBot.FailedLogin: sys.exit(2)