diff --git a/src/main.py b/src/main.py index c3968c8..cd71690 100755 --- a/src/main.py +++ b/src/main.py @@ -24,142 +24,6 @@ import datetime import contextlib -# A class that inherits from YandereBot from the yandere_bot module -# This class is used to handle command line arguments gracefully, and extend functionality quickly -# Bot specific changes should be made here (if they are minor enough). -# This will be instantiated from the main() function -class YandereBot(yandere_bot.YandereBot): - # The below settings are required from the configuration module - settings_time = None - settings_reminder = None - - # Wait before running - delay_start = 0 - - def __init__(self, cfg, debug_mode, prime_bot, delay_d=None, delay_h=None, delay_s=None): - super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False) - self.load_settings(self.cfg, ("settings_time", "settings_reminder")) - self.set_pretimer(delay_d, delay_h, delay_s) - - if self.debug_mode: - print("[DEBUG MODE ON - DRY RUN BEGIN]") - - # Do not perform sanity test if stdout is None. - # input() will fail if stdout is None, which could happen with the following command - # ./run --dry-run -h -d 'a date in the past' - if sys.stdout is not None and not self.pass_sanity_test(): - raise FailedSanityTest - - if prime_bot: - self.prime_bot() - - def print_date_time_example(self): - print_fmt = " {0:6} {1:10} {2}" - time_fmt = self.settings_time["time_format"] - date_fmt = self.settings_time["date_format"] - current_time = self.dateSelection - - print(print_fmt.format( - "TIME", time_fmt, current_time.strftime(time_fmt) - )) - print(print_fmt.format( - "DATE", date_fmt, current_time.strftime(date_fmt) - )) - - def set_delay_d(self, d): - try: - t = datetime.datetime.strptime(d, self.settings_time["date_format"]) - self.dateNextSelection = self.dateNextSelection.replace( - year=t.year, month=t.month, day=t.day - ) - except Exception: - print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d)) - self.print_date_time_example() - raise DateWrongFormat - - def set_delay_h(self, h, add_24): - try: - t = datetime.datetime.strptime(h, self.settings_time["time_format"]) - self.dateNextSelection = self.dateNextSelection.replace( - hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond - ) - if self.dateNextSelection < self.dateSelection and add_24: - self.dateNextSelection = yandere_bot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24) - except Exception: - print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h)) - self.print_date_time_example() - raise TimeWrongFormat - - def set_pretimer(self, d=None, h=None, s=0): - if d: - self.set_delay_d(d) - if h: - self.set_delay_h(h, d is None) - if s: - self.delay_start = max(0, s) - - # Check for potential misconfigurations by the user - def pass_sanity_test(self): - # Calculate pre-timer value - seconds_until_next_pos = yandere_bot.time_diff_seconds(self.dateNextSelection, self.dateSelection) - - # Possible misconfigurations that will prompt the user to continue - pretimer_less_than_zero = seconds_until_next_pos < 0 - pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"] - - # Prompt the user - prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep - - # Remind the user to generate new OAuth tokens - dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"]) - if dt < datetime.datetime.now(): - print("REMINDER: Generate new tokens!!") - - # Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming - if pretimer_less_than_zero: - sleep = round(abs(seconds_until_next_pos), 2) - images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1 - print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format( - sleep, images - )) - # Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py - elif pretimer_greater_than_sleep: - print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format( - round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"] - )) - - # Prompt the user if something doesn't seem right - # This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work. - try: - if prompt_user: - # Default to 'y' if the user just presses enter - ans = input("Do you want to continue [Y/n]? ") or "y" - return ans.lower() in ("y", "yes") - except (EOFError, KeyboardInterrupt): - print() - return False - - # Sanity test passed - return True - - def start(self, delay=None): - _delay = delay or self.delay_start - return super(YandereBot, self).start(max(0, _delay)) - - -# Custom exceptions -class TimeWrongFormat(Exception): - pass - - -class DateWrongFormat(Exception): - pass - - -class FailedSanityTest(Exception): - pass - - class FailedToLoadCfg(Exception): pass @@ -173,20 +37,14 @@ def main(): parser = argparse.ArgumentParser( description="A bot for posting on Mastodon", # epilog="All switches can be combined for greater control", - add_help=False) + add_help=True) parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true") parser.add_argument("--debug", help="Same as --dry-run", action="store_true") - parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0) - parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None) - parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None) parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg) - parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true") + parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None) parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER) arguments = parser.parse_args() - # Redirect stdout when the bot first initializes if the bot is not going to run normally - redirect_stdout = None if arguments.help else sys.stdout - # Yandere Lewd Bot yandere = None yandere_config = None @@ -196,32 +54,17 @@ def main(): import importlib yandere_config = importlib.import_module(arguments.config) except ImportError: - print("Failed to Load Configuration:", arguments.config) - raise FailedToLoadCfg + raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config)) # Flag if the bot is running in debug mode - debug_mode = (arguments.dry_run or arguments.debug or arguments.help) + debug_mode = arguments.dry_run or arguments.debug - with contextlib.redirect_stdout(redirect_stdout): - prime_bot = not arguments.help - - yandere = YandereBot( - yandere_config, - debug_mode, - prime_bot, - arguments.date, - arguments.time, - arguments.wait - ) + yandere = yandere_bot.YandereBot( + yandere_config, + arguments.keyfile, + debug_mode + ) - # Print Usage Information with Time and Date Formats with Examples - if arguments.help: - parser.print_help() - print() - yandere.print_date_time_example() - print() - return 0 - # Setup exit calls # Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings ) def yandere_quit(signo, _frame): @@ -242,13 +85,6 @@ if __name__ == "__main__": # Exceptions raised from the main function except FailedToLoadCfg: sys.exit(10) - except FailedSanityTest: - sys.exit(9) - except DateWrongFormat: - sys.exit(8) - except TimeWrongFormat: - sys.exit(7) - # Exceptions raised from the bot except yandere_bot.Debug: sys.exit(6) diff --git a/src/yandere_bot.py b/src/yandere_bot.py index 3bcbf37..37a0a1c 100644 --- a/src/yandere_bot.py +++ b/src/yandere_bot.py @@ -28,7 +28,7 @@ from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, M # A class that contains all of the rendering information for a single post -class YandereFrame: +class PickAndRenderFrame: # From Config File profile_name = "" path = "" @@ -36,16 +36,17 @@ class YandereFrame: skip = tuple() nsfw = tuple() output_name = "" - output_name_tr = "" - translation = {} message = "" message_nsfw = "" render_script = "" - # Picked + # After pick picked_frame = None + output_name_tr = "" - def __init__(self, picked): + def __init__(self, picked, datetime_format): + dt_now = datetime.datetime.now() + dt_now_str = datetime.datetime.strftime(dt_now, datetime_format) self.profile_name = picked["profile_name"] self.path = picked["path"] self.frames = picked["frames"] @@ -56,24 +57,33 @@ class YandereFrame: self.message = picked["message"] self.message_nsfw = picked["message_nsfw"] self.render_script = picked["render_script"] - self.add_translation("profile_name", self.profile_name) - def add_translation(self, k, v): - self.translation.update({k: v}) + self.picked_frame = self._pick_frame() - def pick(self): - random.seed(os.urandom(16)) + # Shell-like substitutions + translations = { + "profile_name": self.profile_name, + "frame": str(self.picked_frame), + "datetime": dt_now_str + } + + self.output_name_tr = self._translate_basename(translations) + self._render_frame() + + def __del__(self): + render_file = self.output_name_tr + if render_file and os.path.isfile(render_file): + os.remove(render_file) + + def _pick_frame(self): picked_frame = None while picked_frame is None: picked_frame = random.random() * self.frames for skip in self.skip: begin, end = skip if begin <= picked_frame <= end: - print("Cannot pick frame:", picked_frame, "from:", self.profile_name) picked_frame = None break - self.picked_frame = picked_frame - self.add_translation("frame", str(picked_frame)) return picked_frame def is_nsfw(self): @@ -83,29 +93,25 @@ class YandereFrame: return True return False - def translate_basename(self): - output_name = self.output_name - output_name_tr = output_name - for k,v in self.translation.items(): + def _translate_basename(self, translations): + output_name_tr = self.output_name + for k,v in translations.items(): replace_token = "${" + k + "}" output_name_tr = output_name_tr.replace(replace_token, v) - self.output_path_tr = output_name_tr return output_name_tr # TODO: Add translation keywords to the message def get_message(self): return self.message_nsfw if self.is_nsfw() else self.message - def render(self): - self.translate_basename() + def _render_frame(self): args = [ self.render_script, self.path, - self.output_path_tr, - self.translation["frame"] + self.output_name_tr, + str(self.picked_frame) ] subprocess.run(args) - return self.output_path_tr class YandereBot: @@ -123,27 +129,22 @@ class YandereBot: # Class variables mastodon_api = None - listPictures = [] failed_uploads = 0 + consecutive_failed_uploads = 0 currentSessionCount = 0 debug_mode = False primed = False decrypted = False - # Time variables - dateSelection = None - dateNextSelection = None - # YandereBot.__init__() # @param cfg A dynamically loaded python module # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) # prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) - def __init__(self, cfg, debug_mode=False, prime_bot=True): - self.dateSelection = datetime.datetime.now() - self.dateNextSelection = self.dateSelection + def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True): self.cfg = cfg self.load_settings(self.cfg) self.debug_mode = debug_mode or self.settings_behavior["debug"] + self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"] if prime_bot: self.prime_bot() @@ -172,7 +173,11 @@ class YandereBot: def decrypt_settings(self): if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: import encryption - self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt) + try: + self.settings_server = encryption.settings_server_decrypt( + self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"]) + except encryption.EncryptionFail as e: + raise BadCfgFile(str(e)) self.decrypted = True # Login to Pleroma @@ -195,33 +200,30 @@ class YandereBot: raise FailedLogin # Maybe I should remove this from the backend? - def print_header_stats(self, yandere_frame, date_selection, date_next_selection): + def print_header_stats(self, picked): profile, frame, nsfw, path = None, None, None, None - if yandere_frame: - profile = yandere_frame.profile_name - frame = yandere_frame.picked_frame - nsfw = yandere_frame.is_nsfw() - path = yandere_frame.output_path_tr - print("\n==== PLEROMA ====") - print("Profile: {} | Frame: {} | NSFW: {}".format( - profile, frame, nsfw + if picked: + profile = picked.profile_name + frame = picked.picked_frame + nsfw = picked.is_nsfw() + path = picked.output_name_tr + print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format( + profile, frame, nsfw, path )) - print("Path:", path) - next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection))) - print("Selection time: {}".format( - date_selection.strftime(self.settings_time["long_date_format"])) ) - print("Next selection time: {} ({} seconds)".format( - date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) ) - print("[ {} Selected during session | {} Failed ]\n".format( - self.currentSessionCount, self.failed_uploads) ) # Returns a list of tuples that contain the media list path and media mastodon dictionary def upload_media_list(self, path_list): media_list = [] - for ele in path_list: + for path in path_list: + if not os.path.isfile(path): + raise FileNotFoundError("Could not upload: {}".format(path)) if not self.debug_mode: - media = self.mastodon_api.media_post(ele, description=os.path.basename(ele)) - media_list.append((ele, media)) + media = self.mastodon_api.media_post(path, description=os.path.basename(path)) + media_dict = { + "path": path, + "media": media + } + media_list.append(media_dict) return media_list def get_post_text(self, yandere_frame, media_list): @@ -232,8 +234,9 @@ class YandereBot: string_imglinks = [] if media_list and self.settings_behavior["post_image_link"]: - for ele in media_list: - path, media = ele + for media_dict in media_list: + path = media_dict["path"] + media = media_dict["media"] if path is None or media is None: continue elif content_type == "text/markdown" and not self.debug_mode: @@ -246,31 +249,25 @@ class YandereBot: # Join non empty strings with a newline character string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) - return content_type, string_post + return string_post def _post(self, yandere_frame): - media_list = self.upload_media_list([yandere_frame.output_path_tr]) - content_type, message = self.get_post_text(yandere_frame, media_list) + if not yandere_frame: + raise InvalidPost("Frame is none") + upload_list = [yandere_frame.output_name_tr] + media_list = self.upload_media_list(upload_list) + message = self.get_post_text(yandere_frame, media_list) if self.debug_mode: return yandere_frame self.mastodon_api.status_post( message, - media_ids=[i[1] for i in media_list if len(i) == 2], + media_ids=[i["media"] for i in media_list], visibility=self.settings_behavior["visibility"], sensitive=yandere_frame.is_nsfw(), - content_type=content_type + content_type=self.settings_behavior["content_type"] ) return yandere_frame - def render_frame(self, picked_profile): - yandere_frame = YandereFrame(picked_profile) - yandere_frame.add_translation("datetime", datetime.datetime.strftime(datetime.datetime.now(), self.cfg.settings_time["datetime"])) - yandere_frame.pick() - yandere_frame.render() - if not os.path.isfile(yandere_frame.output_path_tr): - raise FileNotFoundError("Could not generate screenshot:", yandere_frame.output_path_tr) - return yandere_frame - # The main post function # This function is responsible for incrementing self.currentSessionCount, as well as posting and blacklisting the # picked item. @@ -282,73 +279,39 @@ class YandereBot: def post(self): picked = None - # Flags that are set if an upload fails - timeout = False - # Attempt post try: # Post + dt_picked = self.settings_time["datetime"] picked_profile = random.choice(self.cfg.settings_post) - picked = self.render_frame(picked_profile) + picked = PickAndRenderFrame(picked_profile, dt_picked) self._post(picked) - if os.path.isfile(picked.output_path_tr): - os.remove(picked.output_path_tr) - else: - raise FileNotFoundError("Unable to render {}".format(picked.output_path_tr)) # After a successful post self.currentSessionCount += 1 + self.consecutive_failed_uploads = 0 # The post was successful return picked - # Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds) - except FileNotFoundError as e: - print(e) - # Exception flags - timeout = False - - # Check if the file limit has been reached - except MastodonAPIError as e: - # Check if the file limit has been reached (413 error) - file_limit_reached = False - with contextlib.suppress(IndexError): - file_limit_reached = (e.args[1] == 413) - - print("API Error:", e) - # Exception flags - timeout = True - # Server Errors - # Assume all exceptions are on the server side + # Assume all exceptions are on the server side (except for FileNotFoundError of course # If the connection is timing out it could be for two reasons: # 1. The error was caused by the user attempting to upload a large file over a slow connection: # a. FIX: Reduce settings_behavior["max_size"] # 2. The server is down. Check to verify in a web browser (this is the default assumption since the # mastodon.py API will not specify why the connection timed out). # The default assumption is #2 - except Exception as e: - print("Unhandled Exception:", e) - # Exception flags - timeout = True + except (FileNotFoundError, MastodonAPIError, InvalidPost, Exception) as e: + print("Exception:", e) # An exception occurred self.failed_uploads += 1 - if timeout: - self.eventSleep.wait(self.settings_behavior["retry_seconds"]) + self.consecutive_failed_uploads += 1 # The post failed return None - def schedule_next_post(self): - self.dateSelection = self.dateNextSelection - self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"]) - - # Will wait between the current time and the time of next selection - def wait_future_time(self): - seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now()) - self.eventSleep.wait(max(0, seconds)) - # [BEGIN THE PROGRAM] def prime_bot(self): random.seed(os.urandom(16)) @@ -362,30 +325,6 @@ class YandereBot: if not self.primed: self.prime_bot() - # Early out if the bot is incapable of posting - if not self.can_post(): - print("Bot is incapable of posting!!") - return 1 - - start_time = self.dateSelection - delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay) - delay_time = time_add_seconds(start_time, delay_seconds) - - # Print the first image in the list if a delay or pretimer is set - if delay_seconds: - self.print_header_stats(None, start_time, delay_time) - - # The delay parameter is different from the dateSelection and dateSelectionNext - # It will literally time out the bot for a given number of seconds regardless of the pre-timer setting - # This is useful if you want to set a delay of 30 seconds, before back-posting several images - self.eventSleep.wait(max(0, delay)) - - # Check if the pre-timer is set - # dateNextSelection should be greater than dateSelection if it is - # dateSelection and dateNextSelection are both set to the current time when the bot is initialized - if delay_seconds: - self.wait_future_time() - # Begin posting self.main_loop() @@ -396,32 +335,19 @@ class YandereBot: # 1. User presses Ctrl+C # 2. settings_behavior["uploads_per_post"] is less than one for some reason def can_post(self): - return not self.eventSleep.is_set() and self.settings_behavior["uploads_per_post"] > 0 + return ( + not self.eventSleep.is_set() and + self.currentSessionCount < self.settings_behavior["uploads_per_post"] and + self.consecutive_failed_uploads < self.settings_behavior["max_errors"] + ) def main_loop(self): - target_posts = self.settings_behavior["uploads_per_post"] + sleep_seconds = self.settings_behavior["retry_seconds"] while self.can_post(): - successful_posts = 0 - while (successful_posts < target_posts) and self.can_post(): - last_picked = self.post() - successful_posts += int(last_picked is not None) - if successful_posts >= target_posts: - self.schedule_next_post() - self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection) - else: - self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection) - - if self.can_post(): - self.wait_future_time() - -# ------------------------------- TIME FUNCTIONS --------------------------------------------- -def time_add_seconds(dt, seconds): - return dt + datetime.timedelta(0, seconds) - - -def time_diff_seconds(d1, d2): - return (d1-d2).total_seconds() - + picked = self.post() + self.print_header_stats(picked) + if self.can_post(): + self.eventSleep.wait(sleep_seconds) # Custom Exceptions for YandereBot class Debug(Exception): @@ -434,3 +360,7 @@ class FailedLogin(Exception): class BadCfgFile(Exception): pass + + +class InvalidPost(Exception): + pass