diff --git a/src/main.py b/src/main.py index 1384c37..d06edee 100755 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 -# Danbooru Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon +# Mirai Nikki Bot a video frame posting bot for Pleroma +# Copyright (C) 2022 Anon # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,146 +24,10 @@ import datetime import contextlib -# A class that inherits YandereBot from the yandere_bot module -# This class is used to handle command line arguments gracefully, and extend functionality quickly -# Bot specific changes should be made here (if they are minor enough). -# This will be instantiated from the main() function -class YandereBot(yandere_bot.YandereBot): - # The below settings are required from the configuration module - settings_time = None - settings_reminder = None - - # Wait before running - delay_start = 0 - - def __init__(self, cfg, debug_mode, prime_bot, start_index, delay_d=None, delay_h=None, delay_s=None): - super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False) - self.load_settings(self.cfg, ("settings_time", "settings_reminder")) - self.set_pretimer(delay_d, delay_h, delay_s) - self.currentIndexCount = start_index - - if self.debug_mode: - print("[DEBUG MODE ON - DRY RUN BEGIN]") - - # Do not perform sanity test if stdout is None. - # input() will fail if stdout is None, which could happen with the following command - # ./run --dry-run -h -d 'a date in the past' - if sys.stdout is not None and not self.pass_sanity_test(): - raise FailedSanityTest - - if prime_bot: - self.prime_bot() - - def print_date_time_example(self): - print_fmt = " {0:6} {1:10} {2}" - time_fmt = self.settings_time["time_format"] - date_fmt = self.settings_time["date_format"] - current_time = self.dateSelection - - print(print_fmt.format( - "TIME", time_fmt, current_time.strftime(time_fmt) - )) - print(print_fmt.format( - "DATE", date_fmt, current_time.strftime(date_fmt) - )) - - def set_delay_d(self, d): - try: - t = datetime.datetime.strptime(d, self.settings_time["date_format"]) - self.dateNextSelection = self.dateNextSelection.replace( - year=t.year, month=t.month, day=t.day - ) - except Exception: - print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d)) - self.print_date_time_example() - raise DateWrongFormat - - def set_delay_h(self, h, add_24): - try: - t = datetime.datetime.strptime(h, self.settings_time["time_format"]) - self.dateNextSelection = self.dateNextSelection.replace( - hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond - ) - if self.dateNextSelection < self.dateSelection and add_24: - self.dateNextSelection = yandere_bot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24) - except Exception: - print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h)) - self.print_date_time_example() - raise TimeWrongFormat - - def set_pretimer(self, d=None, h=None, s=0): - if d: - self.set_delay_d(d) - if h: - self.set_delay_h(h, d is None) - if s: - self.delay_start = max(0, s) - - # Check for potential misconfigurations by the user - def pass_sanity_test(self): - # Calculate pre-timer value - seconds_until_next_pos = yandere_bot.time_diff_seconds(self.dateNextSelection, self.dateSelection) - - # Possible misconfigurations that will prompt the user to continue - pretimer_less_than_zero = seconds_until_next_pos < 0 - pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"] - - # Prompt the user - prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep - - # Remind the user to generate new OAuth tokens - dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"]) - if dt < datetime.datetime.now(): - print("REMINDER: Generate new tokens!!") - - # Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming - if pretimer_less_than_zero: - sleep = round(abs(seconds_until_next_pos), 2) - images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1 - print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format( - sleep, images - )) - # Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py - elif pretimer_greater_than_sleep: - print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format( - round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"] - )) - - # Prompt the user if something doesn't seem right - # This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work. - try: - if prompt_user: - # Default to 'y' if the user just presses enter - ans = input("Do you want to continue [Y/n]? ") or "y" - return ans.lower() in ("y", "yes") - except (EOFError, KeyboardInterrupt): - print() - return False - - # Sanity test passed - return True - - def start(self, delay=None): - _delay = delay or self.delay_start - return super(YandereBot, self).start(max(0, _delay)) - - -# Custom exceptions -class TimeWrongFormat(Exception): - pass - - -class DateWrongFormat(Exception): - pass - - -class FailedSanityTest(Exception): - pass - - class FailedToLoadCfg(Exception): pass + # Entry point if run from the command line def main(): # Default config file @@ -173,21 +37,15 @@ def main(): parser = argparse.ArgumentParser( description="A bot for posting on Mastodon", # epilog="All switches can be combined for greater control", - add_help=False) + add_help=True) parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true") parser.add_argument("--debug", help="Same as --dry-run", action="store_true") - parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0) - parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None) - parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None) - parser.add_argument("-i", "--index", help="Start at index (only matters if profile select is set to sequential)", default=0) parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg) - parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true") + parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption") + parser.add_argument("-i", "--index", help="Start at index (only matters if profile is set to sequential", default=0) parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER) arguments = parser.parse_args() - # Redirect stdout when the bot first initializes if the bot is not going to run normally - redirect_stdout = None if arguments.help else sys.stdout - # Yandere Lewd Bot yandere = None yandere_config = None @@ -197,33 +55,19 @@ def main(): import importlib yandere_config = importlib.import_module(arguments.config) except ImportError: - print("Failed to Load Configuration:", arguments.config) - raise FailedToLoadCfg + raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config)) # Flag if the bot is running in debug mode - debug_mode = (arguments.dry_run or arguments.debug or arguments.help) + debug_mode = arguments.dry_run or arguments.debug - with contextlib.redirect_stdout(redirect_stdout): - prime_bot = not arguments.help - - yandere = YandereBot( - yandere_config, - debug_mode, - prime_bot, - int(arguments.index), - arguments.date, - arguments.time, - arguments.wait - ) + yandere = yandere_bot.YandereBot( + yandere_config, + arguments.keyfile, + debug_mode, + ) + + yandere.currentIndexCount = int(arguments.index) - # Print Usage Information with Time and Date Formats with Examples - if arguments.help: - parser.print_help() - print() - yandere.print_date_time_example() - print() - return 0 - # Setup exit calls # Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings ) def yandere_quit(signo, _frame): @@ -244,18 +88,9 @@ if __name__ == "__main__": # Exceptions raised from the main function except FailedToLoadCfg: sys.exit(10) - except FailedSanityTest: - sys.exit(9) - except DateWrongFormat: - sys.exit(8) - except TimeWrongFormat: - sys.exit(7) - # Exceptions raised from the bot except yandere_bot.Debug: sys.exit(6) - except yandere_bot.MissingMasterList: - sys.exit(5) except yandere_bot.BadCfgFile: sys.exit(4) except yandere_bot.FailedLogin: diff --git a/src/yandere_bot.py b/src/yandere_bot.py index 58b042d..9fca3a3 100644 --- a/src/yandere_bot.py +++ b/src/yandere_bot.py @@ -46,26 +46,22 @@ class YandereBot: # Class variables mastodon_api = None failed_uploads = 0 + consecutive_failed_uploads = 0 currentSessionCount = 0 currentIndexCount = 0 debug_mode = False primed = False decrypted = False - # Time variables - dateSelection = None - dateNextSelection = None - # YandereBot.__init__() # @param cfg A dynamically loaded python module. See yanlib.module_load() for an example # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) # prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) - def __init__(self, cfg, debug_mode=False, prime_bot=True): - self.dateSelection = datetime.datetime.now() - self.dateNextSelection = self.dateSelection + def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True): self.cfg = cfg self.load_settings(self.cfg) self.debug_mode = debug_mode or self.settings_behavior["debug"] + self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"] random.seed(os.urandom(16)) if prime_bot: self.prime_bot() @@ -93,13 +89,17 @@ class YandereBot: except AttributeError as e: print(e) raise BadCfgFile - + def decrypt_settings(self): if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: import encryption - self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt) + try: + self.settings_server = encryption.settings_server_decrypt( + self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"]) + except encryption.EncryptionFail as e: + raise BadCfgFile(str(e)) self.decrypted = True - + # Login to Pleroma def login(self): skip_login = self.debug_mode or self.mastodon_api is not None @@ -120,24 +120,16 @@ class YandereBot: raise FailedLogin # Maybe I should remove this from the backend? - def print_header_stats(self, picked, date_selection, date_next_selection): - picked_name = picked["profile"]["name"] if picked else None - picked_url = picked["file_url"] if picked else None - picked_path = picked["full_path"] if picked else None - picked_nsfw = picked["nsfw"] if picked else None - picked_index = (self.currentIndexCount - int(self.currentSessionCount > 0)) % len(self.settings_post) - next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection))) - - print("[Profile]", picked_name, "[Index]", picked_index) - print(picked_url) - print("Explicit:", picked_nsfw) - print("Selection time: {}".format( - date_selection.strftime(self.settings_time["long_date_format"])) ) - print("Next selection time: {} ({} seconds)".format( - date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) ) - print("[ {} Selected during session | {} Failed ]\n".format( - self.currentSessionCount, self.failed_uploads) ) + def print_header_stats(self, picked): + profile = picked["profile"]["name"] if picked else None + url = picked["file_url"] if picked else None + path = picked["full_path"] if picked else None + nsfw = picked["nsfw"] if picked else None + index = (self.currentIndexCount - int(self.currentSessionCount > 0)) % len(self.settings_post) + print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format( + profile, index, nsfw, path, url + )) # Returns a list of media paths (without the hashes) def download_media(self, picked_profile): @@ -160,12 +152,28 @@ class YandereBot: # Returns a list of tuples that contain the media list path and media mastodon dictionary def upload_media_list(self, path_list): - if self.debug_mode: - return tuple() - media_list = ((ele, self.mastodon_api.media_post(ele, description=os.path.basename(ele))) for ele in path_list) + media_list = [] + # Validate picked + for path in path_list: + if not os.path.isfile(path): + raise FileNotFoundError("Could not upload: {}".format(path)) + elif not self.valid_mimetype(path): + raise InvalidMimeType("Invalid mime type") + elif not self.valid_file_size(path): + raise FileTooLarge("File is too large to upload") + + # Upload + for path in path_list: + if not self.debug_mode: + media = self.mastodon_api.media_post(path, description=os.path.basename(path)) + media_dict = { + "path": path, + "media": media + } + media_list.append(media_dict) return media_list - def get_post_text(self, picked, media_list): + def get_post_text(self, picked, media_list): content_type = self.settings_behavior["content_type"] content_newline = self.settings_behavior["content_newline"] nsfw = picked["nsfw"] @@ -190,10 +198,9 @@ class YandereBot: string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) - return content_type, string_post + return string_post - def valid_mimetype(self, picked): - full_path = picked["full_path"] + def valid_mimetype(self, full_path): mime = magic.from_file(full_path, mime=True) if mime is None: @@ -204,38 +211,26 @@ class YandereBot: return mime_category in ("image", "video") - def valid_file_size(self, picked): - full_path = picked["full_path"] + def valid_file_size(self, full_path): max_size = self.settings_behavior["max_size"] file_size = os.stat(full_path).st_size - return file_size <= max_size def _post(self, picked): - # Validate picked - if picked is None: - raise InvalidPost("Picked post is None") - - if not os.path.isfile(picked["full_path"]): - raise FileNotFoundError("File not found: {}".format(picked)) - - elif not self.valid_mimetype(picked): - raise InvalidMimeType("Invalid mime type") - - elif not self.valid_file_size(picked): - raise FileTooLarge("File is too large to upload") - - media_list = self.upload_media_list([picked["full_path"]]) - content_type, message = self.get_post_text(picked, media_list) + if not picked: + raise InvalidPost("Picked post is none") + upload_list = [picked["full_path"]] + media_list = self.upload_media_list(upload_list) + message = self.get_post_text(picked, media_list) if self.debug_mode: return picked self.mastodon_api.status_post( message, - media_ids=[i[1] for i in media_list if len(i) == 2], + media_ids=[i["media"] for i in media_list], visibility=self.settings_behavior["visibility"], sensitive=picked["nsfw"], - content_type=content_type + content_type=self.settings_behavior["content_type"] ) return picked @@ -273,6 +268,7 @@ class YandereBot: # After a successful post self.currentSessionCount += 1 + self.consecutive_failed_uploads = 0 self.currentIndexCount += 1 # The post was successful @@ -301,29 +297,16 @@ class YandereBot: # An exception occurred self.failed_uploads += 1 - print("[Errors: {}]".format(self.failed_uploads)) - - # Sleep - self.eventSleep.wait(self.settings_behavior["retry_seconds"]) + self.consecutive_failed_uploads += 1 # The post failed return None - def schedule_next_post(self): - self.dateSelection = self.dateNextSelection - self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"]) - - # Will wait between the current time and the time of next selection - def wait_future_time(self): - seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now()) - self.eventSleep.wait(max(0, seconds)) - # [BEGIN THE PROGRAM] def prime_bot(self): if not self.debug_mode: self.decrypt_settings() - if self.can_post(): - self.login() + self.login() self.primed = True def start(self, delay=0): @@ -331,30 +314,6 @@ class YandereBot: if not self.primed: self.prime_bot() - # Early out if the bot is incapable of posting - if not self.can_post(): - print("Bot is incapable of posting!!") - return 1 - - start_time = self.dateSelection - delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay) - delay_time = time_add_seconds(start_time, delay_seconds) - - # Print the first image in the list if a delay or pretimer is set - if delay_seconds: - self.print_header_stats(None, start_time, delay_time) - - # The delay parameter is different from the dateSelection and dateSelectionNext - # It will literally time out the bot for a given number of seconds regardless of the pre-timer setting - # This is useful if you want to set a delay of 30 seconds, before back-posting several images - self.eventSleep.wait(max(0, delay)) - - # Check if the pre-timer is set - # dateNextSelection should be greater than dateSelection if it is - # dateSelection and dateNextSelection are both set to the current time when the bot is initialized - if delay_seconds: - self.wait_future_time() - # Begin posting self.main_loop() @@ -367,35 +326,18 @@ class YandereBot: def can_post(self): return ( not self.eventSleep.is_set() and - self.settings_behavior["uploads_per_post"] > 0 and + self.currentSessionCount < self.settings_behavior["uploads_per_post"] and + self.consecutive_failed_uploads < self.settings_behavior["max_errors"] and self.currentIndexCount >= 0 ) def main_loop(self): - target_posts = self.settings_behavior["uploads_per_post"] + sleep_seconds = self.settings_behavior["retry_seconds"] while self.can_post(): - successful_posts = 0 - while (successful_posts < target_posts) and self.can_post(): - last_picked = self.post() - successful_posts += int(last_picked is not None) - if successful_posts >= target_posts: - self.schedule_next_post() - self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection) - else: - self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection) - + picked = self.post() + self.print_header_stats(picked) if self.can_post(): - self.wait_future_time() - - -# ------------------------------- TIME FUNCTIONS --------------------------------------------- -def time_add_seconds(dt, seconds): - return dt + datetime.timedelta(0, seconds) - - -def time_diff_seconds(d1, d2): - return (d1-d2).total_seconds() - + self.eventSleep.wait(sleep_seconds) # Custom Exceptions for YandereBot class Debug(Exception):