diff --git a/src/main.py b/src/main.py index 938f525..2107f56 100755 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 -# Yandere Lewd Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon +# Mirai Nikki Bot a video frame posting bot for Pleroma +# Copyright (C) 2022 Anon # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,154 +19,15 @@ import sys import argparse import signal -import yandereBot +import yandere_bot import datetime import contextlib -# A class that inherits from either YandereBot from yandereBot module -# This class is used to handle command line arguments gracefully, and extend functionality quickly -# Bot specific changes should be made here (if they are minor enough). -# This will be instantiated from the main() function -class YandereBot(yandereBot.YandereBot): - # The below settings are required from the configuration module - settings_time = None - settings_reminder = None - - # Wait before running - delay_start = 0 - - def __init__(self, cfg, debug_mode, prime_bot, delay_d=None, delay_h=None, delay_s=None): - super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False) - self.load_settings(self.cfg, ("settings_time", "settings_reminder")) - self.set_pretimer(delay_d, delay_h, delay_s) - - if self.debug_mode: - print("[DEBUG MODE ON - DRY RUN BEGIN]") - - # Do not perform sanity test if stdout is None. - # input() will fail if stdout is None, which could happen with the following command - # ./run --dry-run -h -d 'a date in the past' - if sys.stdout is not None and not self.pass_sanity_test(): - raise FailedSanityTest - - if prime_bot: - self.prime_bot() - - def print_date_time_example(self): - print_fmt = " {0:6} {1:10} {2}" - time_fmt = self.settings_time["time_format"] - date_fmt = self.settings_time["date_format"] - current_time = self.dateSelection - - print(print_fmt.format( - "TIME", time_fmt, current_time.strftime(time_fmt) - )) - print(print_fmt.format( - "DATE", date_fmt, current_time.strftime(date_fmt) - )) - - def dump_pictures(self): - for ele in self.listPictures: - print(ele.get_full_string()) - - def set_delay_d(self, d): - try: - t = datetime.datetime.strptime(d, self.settings_time["date_format"]) - self.dateNextSelection = self.dateNextSelection.replace( - year=t.year, month=t.month, day=t.day - ) - except Exception: - print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d)) - self.print_date_time_example() - raise DateWrongFormat - - def set_delay_h(self, h, add_24): - try: - t = datetime.datetime.strptime(h, self.settings_time["time_format"]) - self.dateNextSelection = self.dateNextSelection.replace( - hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond - ) - if self.dateNextSelection < self.dateSelection and add_24: - self.dateNextSelection = yandereBot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24) - except Exception: - print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h)) - self.print_date_time_example() - raise TimeWrongFormat - - def set_pretimer(self, d=None, h=None, s=0): - if d: - self.set_delay_d(d) - if h: - self.set_delay_h(h, d is None) - if s: - self.delay_start = max(0, s) - - # Check for potential misconfigurations by the user - def pass_sanity_test(self): - # Calculate pre-timer value - seconds_until_next_pos = yandereBot.time_diff_seconds(self.dateNextSelection, self.dateSelection) - - # Possible misconfigurations that will prompt the user to continue - pretimer_less_than_zero = seconds_until_next_pos < 0 - pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"] - - # Prompt the user - prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep - - # Remind the user to generate new OAuth tokens - dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"]) - if dt < datetime.datetime.now(): - print("REMINDER: Generate new tokens!!") - - # Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming - if pretimer_less_than_zero: - sleep = round(abs(seconds_until_next_pos), 2) - images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1 - print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format( - sleep, images - )) - # Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py - elif pretimer_greater_than_sleep: - print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format( - round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"] - )) - - # Prompt the user if something doesn't seem right - # This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work. - try: - if prompt_user: - # Default to 'y' if the user just presses enter - ans = input("Do you want to continue [Y/n]? ") or "y" - return ans.lower() in ("y", "yes") - except (EOFError, KeyboardInterrupt): - print() - return False - - # Sanity test passed - return True - - def start(self, delay=None): - _delay = delay or self.delay_start - return super(YandereBot, self).start(max(0, _delay)) - - -# Custom exceptions -class TimeWrongFormat(Exception): - pass - - -class DateWrongFormat(Exception): - pass - - -class FailedSanityTest(Exception): - pass - - class FailedToLoadCfg(Exception): pass + # Entry point if run from the command line def main(): # Default config file @@ -176,21 +37,14 @@ def main(): parser = argparse.ArgumentParser( description="A bot for posting on Mastodon", # epilog="All switches can be combined for greater control", - add_help=False) + add_help=True) parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true") parser.add_argument("--debug", help="Same as --dry-run", action="store_true") - parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0) - parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None) - parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None) parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg) - parser.add_argument("-o", "--output-hashes", help="Output list of hashes", action="store_true") - parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true") + parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption") parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER) arguments = parser.parse_args() - # Redirect stdout when the bot first initializes if the bot is not going to run normally - redirect_stdout = None if arguments.output_hashes or arguments.help else sys.stdout - # Yandere Lewd Bot yandere = None yandere_config = None @@ -200,37 +54,17 @@ def main(): import importlib yandere_config = importlib.import_module(arguments.config) except ImportError: - print("Failed to Load Configuration:", arguments.config) - raise FailedToLoadCfg + raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config)) # Flag if the bot is running in debug mode - debug_mode = (arguments.dry_run or arguments.debug or arguments.output_hashes or arguments.help) + debug_mode = arguments.dry_run or arguments.debug - with contextlib.redirect_stdout(redirect_stdout): - prime_bot = not arguments.help - - yandere = YandereBot( - yandere_config, - debug_mode, - prime_bot, - arguments.date, - arguments.time, - arguments.wait - ) + yandere = yandere_bot.YandereBot( + yandere_config, + arguments.keyfile, + debug_mode + ) - # Print Usage Information with Time and Date Formats with Examples - if arguments.help: - parser.print_help() - print() - yandere.print_date_time_example() - print() - return 0 - - # Output all of the images in the bot's picture list - if arguments.output_hashes: - yandere.dump_pictures() - return 0 - # Setup exit calls # Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings ) def yandere_quit(signo, _frame): @@ -251,21 +85,12 @@ if __name__ == "__main__": # Exceptions raised from the main function except FailedToLoadCfg: sys.exit(10) - except FailedSanityTest: - sys.exit(9) - except DateWrongFormat: - sys.exit(8) - except TimeWrongFormat: - sys.exit(7) - # Exceptions raised from the bot - except yandereBot.Debug: + except yandere_bot.Debug: sys.exit(6) - except yandereBot.MissingMasterList: - sys.exit(5) - except yandereBot.BadCfgFile: + except yandere_bot.BadCfgFile: sys.exit(4) - except yandereBot.BadPostSettings: + except yandere_bot.BadPostSettings: sys.exit(3) - except yandereBot.FailedLogin: + except yandere_bot.FailedLogin: sys.exit(2) diff --git a/src/yandere_bot.py b/src/yandere_bot.py index c2319bc..e955bbf 100644 --- a/src/yandere_bot.py +++ b/src/yandere_bot.py @@ -64,25 +64,21 @@ class YandereBot: listPictures = [] lenBlacklist = 0 failed_uploads = 0 + consecutive_failed_uploads = 0 currentSessionCount = 0 debug_mode = False primed = False decrypted = False - # Time variables - dateSelection = None - dateNextSelection = None - # YandereBot.__init__() # @param cfg A dynamically loaded python module. See yanlib.module_load() for an example # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) # prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) - def __init__(self, cfg, debug_mode=False, prime_bot=True): - self.dateSelection = datetime.datetime.now() - self.dateNextSelection = self.dateSelection + def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True): self.cfg = cfg self.load_settings(self.cfg) self.debug_mode = debug_mode or self.settings_behavior["debug"] + self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"] if prime_bot: self.prime_bot() @@ -119,11 +115,15 @@ class YandereBot: except AttributeError as e: print(e) raise BadCfgFile - + def decrypt_settings(self): if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: import encryption - self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt) + try: + self.settings_server = encryption.settings_server_decrypt( + self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"]) + except encryption.EncryptionFail as e: + raise BadCfgFile(str(e)) self.decrypted = True # Login to Pleroma @@ -167,10 +167,10 @@ class YandereBot: # load_pictures will return a list of YanHashObj() with a blacklist(s) applied # @param list_blacklist A list of HashObjects() that are blacklist hashes def load_pictures(self, list_blacklist): - list_pictures = [] if not self.settings_behavior["master_list"]: raise MissingMasterList - + + list_pictures = [] # Return a list of hashes with profiles try: for f in self.settings_behavior["master_list"]: @@ -193,42 +193,22 @@ class YandereBot: random.shuffle(self.listPictures) # Maybe I should remove this from the backend? - def print_header_stats(self, picked, date_selection, date_next_selection): + def print_header_stats(self, picked): _picked = picked.get_full_string() if picked else None picked_profile = picked.get_post_setting()["name"] if picked else None picked_next = self.listPictures[0].get_full_string() if self.listPictures else None picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None - next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection))) - n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"]) - if date_selection != date_next_selection: - n_posts_remain -= 1 - - remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"] - date_end_selection = time_add_seconds(date_selection, remaining_seconds + next_selection_seconds) - date_end_selection_seconds = max(0, time_diff_seconds(date_end_selection, date_selection)) - if date_selection != date_next_selection and picked is None: - date_end_selection_seconds += next_selection_seconds - d, h, m, s = humanize_time_delta(date_end_selection_seconds) - - print("[Profile]", picked_profile) - print("[Profile Next]", picked_next_profile) - print(_picked) - print("Next:", picked_next) # There should always be something in picture_next list - print("Selection time: {}".format( - date_selection.strftime(self.settings_time["long_date_format"])) ) - print("Next selection time: {} ({} seconds)".format( - date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) ) - print("End selection time: {}".format(date_end_selection.strftime( - self.settings_time["long_date_week"])) ) - print("Time Remaining: {} Days | {} Hours | {} Minutes | {} Seconds".format(d, h, m, s) ) - print("[ {} Pictures | {} Blacklisted | {} Selected during session | {} Failed ]\n".format( - len(self.listPictures), self.lenBlacklist, self.currentSessionCount, self.failed_uploads) ) + print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format( + picked_profile, _picked, picked_next_profile, picked_next + )) # Returns a list of media paths (without the hashes) def get_media_list(self, picked): ext = self.settings_behavior["multi_media_ext"] - if ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower(): + if not picked: + return None + elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower(): return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())] else: return [picked.get_hash_path()] @@ -236,10 +216,16 @@ class YandereBot: # Returns a list of tuples that contain the media list path and media mastodon dictionary def upload_media_list(self, path_list): media_list = [] - for ele in path_list: + for path in path_list: + if not os.path.isfile(path): + raise FileNotFoundError("Could not upload: {}".format(path)) if not self.debug_mode: - media = self.mastodon_api.media_post(ele, description=os.path.basename(ele)) - media_list.append((ele, media)) + media = self.mastodon_api.media_post(path, description=os.path.basename(path)) + media_dict = { + "path": path, + "media": media + } + media_list.append(media_dict) return media_list def get_post_text(self, picked, media_list): @@ -265,23 +251,23 @@ class YandereBot: string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) - return content_type, string_post + return string_post def _post(self, picked): images = self.get_media_list(picked) if not images: - raise ValueError("Media list is empty") + raise InvalidPost("Media list is empty") media_list = self.upload_media_list(images) - content_type, message = self.get_post_text(picked, media_list) + message = self.get_post_text(picked, media_list) if self.debug_mode: return picked self.mastodon_api.status_post( message, - media_ids=[i[1] for i in media_list if len(i) == 2], + media_ids=[i["media"] for i in media_list], visibility=self.settings_behavior["visibility"], sensitive=picked.get_post_setting()["spoiler"], - content_type=content_type - ) + content_type=self.settings_behavior["content_type"] + ) return picked # The main post function @@ -297,7 +283,6 @@ class YandereBot: # Flags that are set if an upload fails reinsert_image = False - timeout = False # Attempt post try: @@ -307,34 +292,25 @@ class YandereBot: # After a successful post self.currentSessionCount += 1 + self.consecutive_failed_uploads = 0 self.blacklist(picked) # The post was successful return picked # Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds) - except FileNotFoundError: + except (FileNotFoundError, InvalidPost): print("File not found:", picked.get_hash_path()) - # Exception flags - reinsert_image, timeout = False, False - - # Media list is empty - except ValueError as e: - print("Media list cannot be empty:", picked.get_hash_path()) - # Exception flags - reinsert_image, timeout = False, False + reinsert_image = False # Check if the file limit has been reached except MastodonAPIError as e: - # Check if the file limit has been reached (413 error) - file_limit_reached = False - with contextlib.suppress(IndexError): - file_limit_reached = (e.args[1] == 413) - print("API Error:", e) - # Exception flags - reinsert_image, timeout = not file_limit_reached, True + # Check if the file limit has been reached (413 error) + with contextlib.suppress(IndexError): + reinsert_image = e.args[1] != 413 + # Server Errors # Assume all exceptions are on the server side # If the connection is timing out it could be for two reasons: @@ -346,28 +322,18 @@ class YandereBot: except Exception as e: print("Unhandled Exception:", e) # Exception flags - reinsert_image, timeout = True, True + reinsert_image = True # An exception occurred self.failed_uploads += 1 - print("[Errors: {}] {}{}".format(self.failed_uploads, picked.get_full_string(), os.linesep)) - if reinsert_image: + self.consecutive_failed_uploads += 1 + + if reinsert_image and self.consecutive_failed_uploads < self.settings_behavior["max_errors"]: self.listPictures.insert(0, picked) - if timeout: - self.eventSleep.wait(self.settings_behavior["retry_seconds"]) # The post failed return None - def schedule_next_post(self): - self.dateSelection = self.dateNextSelection - self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"]) - - # Will wait between the current time and the time of next selection - def wait_future_time(self): - seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now()) - self.eventSleep.wait(max(0, seconds)) - # [BEGIN THE PROGRAM] def prime_bot(self): self.load_picture_list() @@ -375,8 +341,7 @@ class YandereBot: self.shuffle_list() if not self.debug_mode: self.decrypt_settings() - if self.can_post(): - self.login() + self.login() self.primed = True def start(self, delay=0): @@ -384,30 +349,6 @@ class YandereBot: if not self.primed: self.prime_bot() - # Early out if the bot is incapable of posting - if not self.can_post(): - print("Bot is incapable of posting!!") - return 1 - - start_time = self.dateSelection - delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay) - delay_time = time_add_seconds(start_time, delay_seconds) - - # Print the first image in the list if a delay or pretimer is set - if delay_seconds: - self.print_header_stats(None, start_time, delay_time) - - # The delay parameter is different from the dateSelection and dateSelectionNext - # It will literally time out the bot for a given number of seconds regardless of the pre-timer setting - # This is useful if you want to set a delay of 30 seconds, before back-posting several images - self.eventSleep.wait(max(0, delay)) - - # Check if the pre-timer is set - # dateNextSelection should be greater than dateSelection if it is - # dateSelection and dateNextSelection are both set to the current time when the bot is initialized - if delay_seconds: - self.wait_future_time() - # Begin posting self.main_loop() @@ -419,23 +360,21 @@ class YandereBot: # 2. There is nothing left in the picture list to select from # 3. settings_behavior["uploads_per_post"] is less than one for some reason def can_post(self): - return not self.eventSleep.is_set() and bool(len(self.listPictures)) and self.settings_behavior["uploads_per_post"] > 0 + return ( + not self.eventSleep.is_set() and + bool(len(self.listPictures)) and + self.currentSessionCount < self.settings_behavior["uploads_per_post"] and + self.consecutive_failed_uploads < self.settings_behavior["max_errors"] + ) + def main_loop(self): - target_posts = self.settings_behavior["uploads_per_post"] + sleep_seconds = self.settings_behavior["retry_seconds"] while self.can_post(): - successful_posts = 0 - while (successful_posts < target_posts) and self.can_post(): - last_picked = self.post() - successful_posts += int(last_picked is not None) - if successful_posts >= target_posts: - self.schedule_next_post() - self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection) - else: - self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection) - + picked = self.post() + self.print_header_stats(picked) if self.can_post(): - self.wait_future_time() + self.eventSleep.wait(sleep_seconds) # A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj @@ -465,21 +404,6 @@ def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callbac return r -# ------------------------------- TIME FUNCTIONS --------------------------------------------- -def time_add_seconds(dt, seconds): - return dt + datetime.timedelta(0, seconds) - - -def time_diff_seconds(d1, d2): - return (d1-d2).total_seconds() - - -def humanize_time_delta(total_seconds): - m, s = divmod(int(total_seconds), 60) - h, m = divmod(m, 60) - d, h = divmod(h, 24) - return d, h, m, s - # -------------------------------------------------------------------------------------------- def sync_to_disk(file_handle): file_handle.flush() @@ -519,6 +443,10 @@ class Debug(Exception): pass +class InvalidPost(Exception): + pass + + class BadPostSettings(Exception): pass