From 43787c4ebdcb18843813ec116357a43ba27385d0 Mon Sep 17 00:00:00 2001 From: Anon Date: Sat, 11 Mar 2023 16:18:01 -0800 Subject: [PATCH] Refactor to make modular --- src/encryption.py | 217 ------------------------ src/yandere_bot.py | 412 --------------------------------------------- 2 files changed, 629 deletions(-) delete mode 100755 src/encryption.py delete mode 100644 src/yandere_bot.py diff --git a/src/encryption.py b/src/encryption.py deleted file mode 100755 index a9b07c0..0000000 --- a/src/encryption.py +++ /dev/null @@ -1,217 +0,0 @@ -#! /usr/bin/env python3 - -# Yandere Lewd Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import sys -import base64 -import os -import getpass -from cryptography.fernet import Fernet, InvalidToken -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC - -from collections import OrderedDict - - -class EncryptionFail(Exception): - pass - -class PasswordMismatch(Exception): - pass - - -# Return string from bytes -def salt_encode(b): - return base64.urlsafe_b64encode(b).decode() - - -# Return bytes from string -def salt_decode(s): - return base64.urlsafe_b64decode(s.encode()) - - -# Ordered Dictionaries -def change_encoding_dict(settings_server, encoding_type): - return OrderedDict([(k, encoding_type(v)) for k, v in settings_server.items()]) - - -# Return bytes from string -def encode_dict(settings_server): - return change_encoding_dict(settings_server, str.encode) - - -# Return string from bytes -def decode_dict(settings_server): - return change_encoding_dict(settings_server, bytes.decode) - - -# password: Bytes -# salt: Bytes -def derive_key(password, salt): - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), - length=32, - salt=salt, - iterations=100000, - backend=default_backend() - ) - return base64.urlsafe_b64encode(kdf.derive(password)) - - -# Encryption functions -# message: Bytes -# key: Bytes -def encrypt(message, key): - f = Fernet(key) - token = f.encrypt(message) - return token - - -# token: Bytes -# key: Bytes -def decrypt(token, key): - f = Fernet(key) - message = f.decrypt(token) - return message - - -# password: bytes() -# salt: bytes() -# settings_server: dict() -> Byte values -# encryption_function: encrypt(message, key) : decrypt(token, key): -# Returns settings_server_decrypted dictionary with Byte() values. Will need to use -# ChangeEncodingDict to make them strings (recommended cfg file friendly) -def encrypt_settings(settings_server, password, salt, encryption_function): - key = derive_key(password, salt) - settings_server_decrypted = OrderedDict() - for setting in settings_server: - settings_server_decrypted[setting] = encryption_function(settings_server[setting], key) - return settings_server_decrypted - -def get_keyfile(keyfile=None): - if keyfile is not None: - with open(keyfile, "rb") as f: - return f.read() - return None - - -def get_pass(q): - try: - return getpass.getpass(q).encode() - except KeyboardInterrupt: - raise EncryptionFail("\nQuitting...") - - -def settings_server_encrypt(settings_server, keyfile=None): - try: - settings_server = encode_dict(settings_server) - salt = os.urandom(16) - password = get_keyfile(keyfile) - if password is None: - password = get_pass("Enter password: ") - password2 = get_pass("Enter password: ") - if password != password2: - raise PasswordMismatch("Passwords do not match") - encrypted = encrypt_settings(settings_server, password, salt, encrypt) - return salt_encode(salt), decode_dict(encrypted) - except PasswordMismatch as e: - raise EncryptionFail(str(e)) - except Exception as e: - err = str(e) - raise EncryptionFail("Encrypt Error: {}".format(err)) - - -def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None): - try: - if not settings_encrypt["encrypt"]: - return settings_server - settings_server = encode_dict(settings_server) - password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ") - salt = salt_decode(settings_encrypt["salt"]) - decrypted = encrypt_settings(settings_server, password, salt, decrypt) - return decode_dict(decrypted) - except base64.binascii.Error: - raise EncryptionFail("Salt is invalid") - except InvalidToken: - raise EncryptionFail("Password or token is incorrect") - except Exception as e: - err = str(e) - raise EncryptionFail("Decrypt Error: {}".format(err)) - - -def main(): - import argparse - from pprint import pformat - - default_cfg = "cfg" - - parser = argparse.ArgumentParser( - description="A class to encrypt server credentials", - epilog="There are no additional parameters.", - add_help=True ) - parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true") - parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true") - parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true") - parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None) - parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg) - - arguments = parser.parse_args() - - if arguments.recrypt: - arguments.encrypt = True - arguments.decrypt = True - - if arguments.encrypt or arguments.decrypt: - import importlib - cfg = importlib.import_module(arguments.cfg) - settings_server = cfg.settings_server - settings_encrypt = cfg.settings_encrypt - keyfile = arguments.keyfile or settings_encrypt["keyfile"] - - if arguments.decrypt and arguments.encrypt: - print("Re-encrypting") - - if arguments.decrypt: # arguments.decrypt - print("Decrypt...") - settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile) - settings_encrypt = OrderedDict([ - ("encrypt", False), - ("salt", settings_encrypt["salt"]), - ("keyfile", arguments.keyfile) - ]) - - if arguments.encrypt: - print("Encrypt...") - salt, settings_server = settings_server_encrypt(settings_server, keyfile) - settings_encrypt = OrderedDict([ - ("encrypt", True), - ("salt", salt), - ("keyfile", arguments.keyfile) - ]) - - print("settings_server = {}".format(pformat(settings_server))) - print("settings_encrypt = {}".format(pformat(settings_encrypt))) - return 0 - - -if __name__ == "__main__": - try: - sys.exit(main()) - except EncryptionFail as e: - print(e) - sys.exit(1) diff --git a/src/yandere_bot.py b/src/yandere_bot.py deleted file mode 100644 index ebfa4cb..0000000 --- a/src/yandere_bot.py +++ /dev/null @@ -1,412 +0,0 @@ -#! /usr/bin/env python3 - -# Yandere Lewd Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import yanlib - -import os -import contextlib -import fnmatch -from functools import reduce -from threading import Event -from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError - - -class YanBotHash(yanlib.HashObject): - _postSettings = None - - def __init__(self, hash_obj, profile): - super(YanBotHash, self).__init__() - if hash_obj is None: - return - self._sHash = hash_obj.get_hash_string() - self._sBinaryChar = hash_obj.get_binary_char() - self._sPath = hash_obj.get_hash_path() - self._postSettings = profile - - def get_post_setting(self): - return self._postSettings - - -class YandereBot: - # From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C) - eventSleep = Event() - - # The configuration module - cfg = None - - # The below settings are required from the configuration module - settings_server = None - settings_behavior = None - settings_time = None - settings_post = None - settings_post_default = None - settings_encrypt = None - - # Class variables - mastodon_api = None - listPictures = [] - lenBlacklist = 0 - failed_uploads = 0 - consecutive_failed_uploads = 0 - currentSessionCount = 0 - debug_mode = False - primed = False - decrypted = False - - # YandereBot.__init__() - # @param cfg A dynamically loaded python module. See yanlib.module_load() for an example - # @param keyfile Keyfile to decrypt settings_post - # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) - def __init__(self, cfg, keyfile=None, debug_mode=False): - self.cfg = cfg - self.load_settings(self.cfg) - self.debug_mode = debug_mode or self.settings_behavior["debug"] - self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"] - - # Setup Exit Calls - def exit(self): - self.eventSleep.set() - - # Make sure there are no profiles in listPictures set to none. Print the bad post and exit if there is. - def validate_post_settings(self): - bad_post_count = 0 - for i in self.listPictures: - if i.get_post_setting() is None: - print("Bad post setting [{}]: {}".format(bad_post_count, i.get_full_string())) - bad_post_count += 1 - if bad_post_count: - raise BadPostSettings - - # Decryption settings - # Used to set class attributes with the same name to the value specified in the config file. - def load_settings(self, cfg, settings=None): - try: - default_settings = ( - "settings_server", - "settings_behavior", - "settings_time", - "settings_post", - "settings_post_default", - "settings_encrypt" - ) - _settings = settings or default_settings - for ele in _settings: - setattr(self, ele, getattr(cfg, ele)) - - except AttributeError as e: - print(e) - raise BadCfgFile - - def decrypt_settings(self): - if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: - import encryption - try: - self.settings_server = encryption.settings_server_decrypt( - self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"]) - except encryption.EncryptionFail as e: - raise BadCfgFile(str(e)) - self.decrypted = True - - # Login to Pleroma - def login(self): - skip_login = self.debug_mode or self.mastodon_api is not None - if skip_login: - return - if not self.decrypted: - self.decrypt_settings() - try: - self.mastodon_api = Mastodon( - client_id=self.settings_server["client_id"], - client_secret=self.settings_server["client_secret"], - access_token=self.settings_server["access_token"], - api_base_url=self.settings_server["api_base_url"], - feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1 - ) - except (MastodonIllegalArgumentError, MastodonVersionError) as e: - print(e) - raise FailedLogin - - # Set up lists - def read_blacklist_files(self): - list_blacklist = [] - for i in self.settings_behavior["master_blacklist_r"]: - # It doesn't matter if the picture file doesn't exist - with contextlib.suppress(IOError): - list_blacklist.extend(yanlib.get_hash_list(i)) - return list_blacklist - - def blacklist(self, picked): - self.lenBlacklist += 1 - for path in self.settings_behavior["master_blacklist_w"]: - with open(path, "a") as f: - print(picked.get_full_string(), file=f) - - # load_pictures will return a list of YanHashObj() with a blacklist(s) applied - # @param list_blacklist A list of HashObjects() that are blacklist hashes - def load_pictures(self, list_blacklist): - if not self.settings_behavior["master_list"]: - raise MissingMasterList - - try: - list_pictures = reduce(lambda x, y: x + y, - [get_list_of_hashes_with_profiles( - f, - self.settings_post, - self.settings_post_default) - for f in self.settings_behavior["master_list"] - ]) - return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings_behavior["max_size"]) - except IOError as e: - print(e) - raise MissingMasterList - - def load_picture_list(self): - list_blacklist = self.read_blacklist_files() - self.listPictures = self.load_pictures(list_blacklist) - self.lenBlacklist = len(list_blacklist) - - - # Maybe I should remove this from the backend? - def print_header_stats(self, picked): - _picked = picked.get_full_string() if picked else None - picked_profile = picked.get_post_setting()["name"] if picked else None - picked_next = self.listPictures[0].get_full_string() if self.listPictures else None - picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None - - print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format( - picked_profile, _picked, picked_next_profile, picked_next - )) - - # Returns a list of media paths (without the hashes) - def get_media_list(self, picked): - ext = self.settings_behavior["multi_media_ext"] - if not picked: - return None - elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower(): - return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())] - else: - return [picked.get_hash_path()] - - # Returns a list of tuples that contain the media list path and media mastodon dictionary - def upload_media_list(self, path_list): - media_list = [] - for path in path_list: - if not os.path.isfile(path): - raise FileNotFoundError("Could not upload: {}".format(path)) - - for path in path_list: - if not self.debug_mode: - media = self.mastodon_api.media_post(path, description=os.path.basename(path)) - media_dict = { - "path": path, - "media": media - } - media_list.append(media_dict) - return media_list - - def get_post_text(self, picked, media_list): - content_type = self.settings_behavior["content_type"] - content_newline = self.settings_behavior["content_newline"] - static_message = content_newline.join(picked.get_post_setting()["message"]) - string_post = "" - string_imglinks = [] - - if media_list and self.settings_behavior["post_image_link"]: - for media_dict in media_list: - path = media_dict["path"] - media = media_dict["media"] - - if path is None or media is None: - continue - elif content_type == "text/markdown" and not self.debug_mode: - string_imglinks.append( - "[{}]({})".format(os.path.basename(path), media["url"]) - ) - else: - string_imglinks.append(media["url"]) - - # Join non empty strings with a newline character - string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) - string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) - - return string_post - - def _post(self, picked): - images = self.get_media_list(picked) - if not images: - raise InvalidPost("Media list is empty") - media_list = self.upload_media_list(images) - message = self.get_post_text(picked, media_list) - if self.debug_mode: - return picked - self.mastodon_api.status_post( - message, - media_ids=[i["media"] for i in media_list], - visibility=self.settings_behavior["visibility"], - sensitive=picked.get_post_setting()["spoiler"], - content_type=self.settings_behavior["content_type"] - ) - return picked - - # The main post function - # This funciton is responsible for picking, posting, and blacklisting an image in listPictures - # - # It is also responsible for removing the picked item from self.listPictures, which can be accomplished by simply - # popping it at index 0. This should handle any error that might occur while posting. - # - # This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded. - def post(self): - picked = None - - # Flags that are set if an upload fails - reinsert_image = False - - # Attempt post - try: - # Post - picked = self.listPictures.pop(0) - self._post(picked) - - # After a successful post - self.currentSessionCount += 1 - self.consecutive_failed_uploads = 0 - self.blacklist(picked) - - # The post was successful - return picked - - # Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds) - except (FileNotFoundError, InvalidPost): - print("File not found:", picked.get_hash_path()) - reinsert_image = False - - # Check if the file limit has been reached - except MastodonAPIError as e: - print("API Error:", e) - # Check if the file limit has been reached (413 error) - with contextlib.suppress(IndexError): - reinsert_image = e.args[1] != 413 - - # Server Errors and other general exceptions - # Assume all exceptions are on the server side (besides FileNotFoundError of course) - # If the connection is timing out it could be for two reasons: - # 1. The error was caused by the user attempting to upload a large file over a slow connection: - # a. FIX: Reduce settings_behavior["max_size"] - # 2. The server is down. Check to verify in a web browser (this is the default assumption since the - # mastodon.py API will not specify why the connection timed out). - # 3. Failed to generate screenshot - # 4. Other general exceptions - except Exception as e: - print("Unhandled Exception:", e) - # Exception flags - reinsert_image = True - - # An exception occurred - self.failed_uploads += 1 - self.consecutive_failed_uploads += 1 - - if reinsert_image and self.consecutive_failed_uploads < self.settings_behavior["max_errors"]: - self.listPictures.insert(0, picked) - - # The post failed - return None - - # [BEGIN THE PROGRAM] - def prime_bot(self): - if self.primed: - return - self.load_picture_list() - self.validate_post_settings() - if not self.debug_mode: - self.decrypt_settings() - self.login() - self.primed = True - - def start(self): - self.prime_bot() - - # Begin posting - self.main_loop() - - # Return 1 if there are still pictures in the picture list - return len(self.listPictures) > 0 - - # End Conditions: - # 1. User presses Ctrl+C - # 2. There is nothing left in the picture list to select from - # 3. settings_behavior["uploads_per_post"] is less than uploads_per_post - # 4. Consecutive failed uploads is less than max_errors - def can_post(self): - return ( - not self.eventSleep.is_set() and - bool(len(self.listPictures)) and - self.currentSessionCount < self.settings_behavior["uploads_per_post"] and - self.consecutive_failed_uploads < self.settings_behavior["max_errors"] - ) - - - def main_loop(self): - sleep_seconds = self.settings_behavior["retry_seconds"] - while self.can_post(): - picked = self.post() - self.print_header_stats(picked) - if self.can_post(): - self.eventSleep.wait(sleep_seconds) - - -# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj -# @param hash_obj A HashObject() (or subclass) -# @param profiles A list of available profiles to match -# @param profiles_default The default profile to return if no profile is matched -def get_profile(hash_obj, profiles, profiles_default): - profile_gen = (x for x in profiles if fnmatch.fnmatch(hash_obj.get_hash_path(), x["path"])) - return next(profile_gen, profiles_default) - - -# Takes a file path and transforms it into a list of YanBotHash() with the appropriate profile -# @param f_name Path of hash file -# @param profiles List of profiles -> self.settings_post -# @param profiles_default The default profile to apply -# @param callback_get_profile Callback function -> should return a single profile. Default: get_profile() -def get_list_of_hashes_with_profiles(f_name, profiles, profile_default): - return [YanBotHash(i, get_profile(i, profiles, profile_default)) for i in yanlib.get_hash_list(f_name)] - - -# Custom Exceptions for YandereBot -class Debug(Exception): - pass - - -class InvalidPost(Exception): - pass - - -class BadPostSettings(Exception): - pass - - -class FailedLogin(Exception): - pass - - -class BadCfgFile(Exception): - pass - - -class MissingMasterList(Exception): - pass