From 259516fc00a6b86e52a781285dbe78c31713f9dd Mon Sep 17 00:00:00 2001 From: Anon Date: Sun, 12 Mar 2023 17:43:42 -0700 Subject: [PATCH] Removed modular backend --- src/encryption.py | 217 -------------------------- src/yandere_bot.py | 380 --------------------------------------------- 2 files changed, 597 deletions(-) delete mode 100755 src/encryption.py delete mode 100644 src/yandere_bot.py diff --git a/src/encryption.py b/src/encryption.py deleted file mode 100755 index f9a4749..0000000 --- a/src/encryption.py +++ /dev/null @@ -1,217 +0,0 @@ -#! /usr/bin/env python3 - -# Danbooru Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import sys -import base64 -import os -import getpass -from cryptography.fernet import Fernet, InvalidToken -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC - -from collections import OrderedDict - - -class EncryptionFail(Exception): - pass - -class PasswordMismatch(Exception): - pass - - -# Return string from bytes -def salt_encode(b): - return base64.urlsafe_b64encode(b).decode() - - -# Return bytes from string -def salt_decode(s): - return base64.urlsafe_b64decode(s.encode()) - - -# Ordered Dictionaries -def change_encoding_dict(settings_server, encoding_type): - return OrderedDict([(k, encoding_type(v)) for k, v in settings_server.items()]) - - -# Return bytes from string -def encode_dict(settings_server): - return change_encoding_dict(settings_server, str.encode) - - -# Return string from bytes -def decode_dict(settings_server): - return change_encoding_dict(settings_server, bytes.decode) - - -# password: Bytes -# salt: Bytes -def derive_key(password, salt): - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), - length=32, - salt=salt, - iterations=100000, - backend=default_backend() - ) - return base64.urlsafe_b64encode(kdf.derive(password)) - - -# Encryption functions -# message: Bytes -# key: Bytes -def encrypt(message, key): - f = Fernet(key) - token = f.encrypt(message) - return token - - -# token: Bytes -# key: Bytes -def decrypt(token, key): - f = Fernet(key) - message = f.decrypt(token) - return message - - -# password: bytes() -# salt: bytes() -# settings_server: dict() -> Byte values -# encryption_function: encrypt(message, key) : decrypt(token, key): -# Returns settings_server_decrypted dictionary with Byte() values. Will need to use -# ChangeEncodingDict to make them strings (recommended cfg file friendly) -def encrypt_settings(settings_server, password, salt, encryption_function): - key = derive_key(password, salt) - settings_server_decrypted = OrderedDict() - for setting in settings_server: - settings_server_decrypted[setting] = encryption_function(settings_server[setting], key) - return settings_server_decrypted - -def get_keyfile(keyfile=None): - if keyfile is not None: - with open(keyfile, "rb") as f: - return f.read() - return None - - -def get_pass(q): - try: - return getpass.getpass(q).encode() - except KeyboardInterrupt: - raise EncryptionFail("\nQuitting...") - - -def settings_server_encrypt(settings_server, keyfile=None): - try: - settings_server = encode_dict(settings_server) - salt = os.urandom(16) - password = get_keyfile(keyfile) - if password is None: - password = get_pass("Enter password: ") - password2 = get_pass("Enter password: ") - if password != password2: - raise PasswordMismatch("Passwords do not match") - encrypted = encrypt_settings(settings_server, password, salt, encrypt) - return salt_encode(salt), decode_dict(encrypted) - except PasswordMismatch as e: - raise EncryptionFail(str(e)) - except Exception as e: - err = str(e) - raise EncryptionFail("Encrypt Error: {}".format(err)) - - -def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None): - try: - if not settings_encrypt["encrypt"]: - return settings_server - settings_server = encode_dict(settings_server) - password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ") - salt = salt_decode(settings_encrypt["salt"]) - decrypted = encrypt_settings(settings_server, password, salt, decrypt) - return decode_dict(decrypted) - except base64.binascii.Error: - raise EncryptionFail("Salt is invalid") - except InvalidToken: - raise EncryptionFail("Password or token is incorrect") - except Exception as e: - err = str(e) - raise EncryptionFail("Decrypt Error: {}".format(err)) - - -def main(): - import argparse - from pprint import pformat - - default_cfg = "cfg" - - parser = argparse.ArgumentParser( - description="A class to encrypt server credentials", - epilog="There are no additional parameters.", - add_help=True ) - parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true") - parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true") - parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true") - parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None) - parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg) - - arguments = parser.parse_args() - - if arguments.recrypt: - arguments.encrypt = True - arguments.decrypt = True - - if arguments.encrypt or arguments.decrypt: - import importlib - cfg = importlib.import_module(arguments.cfg) - settings_server = cfg.settings_server - settings_encrypt = cfg.settings_encrypt - keyfile = arguments.keyfile or settings_encrypt["keyfile"] - - if arguments.decrypt and arguments.encrypt: - print("Re-encrypting") - - if arguments.decrypt: # arguments.decrypt - print("Decrypt...") - settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile) - settings_encrypt = OrderedDict([ - ("encrypt", False), - ("salt", settings_encrypt["salt"]), - ("keyfile", arguments.keyfile) - ]) - - if arguments.encrypt: - print("Encrypt...") - salt, settings_server = settings_server_encrypt(settings_server, keyfile) - settings_encrypt = OrderedDict([ - ("encrypt", True), - ("salt", salt), - ("keyfile", arguments.keyfile) - ]) - - print("settings_server = {}".format(pformat(settings_server))) - print("settings_encrypt = {}".format(pformat(settings_encrypt))) - return 0 - - -if __name__ == "__main__": - try: - sys.exit(main()) - except EncryptionFail as e: - print(e) - sys.exit(1) diff --git a/src/yandere_bot.py b/src/yandere_bot.py deleted file mode 100644 index 4ea25b3..0000000 --- a/src/yandere_bot.py +++ /dev/null @@ -1,380 +0,0 @@ -#! /usr/bin/env python3 - -# Danbooru Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import importlib -import magic -import random -import copy -from threading import Event -from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError - - -class YandereBot: - # From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C) - eventSleep = Event() - - # The configuration module - cfg = None - - # The below settings are required from the configuration module - settings_server = None - settings_behavior = None - settings_time = None - settings_post = None - settings_encrypt = None - settings_backend = None - - # Class variables - mastodon_api = None - failed_uploads = 0 - consecutive_failed_uploads = 0 - currentSessionCount = 0 - currentIndexCount = 0 - currentProfileIndex = [] - debug_mode = False - primed = False - decrypted = False - - # YandereBot.__init__() - # @param cfg A dynamically loaded python module. See yanlib.module_load() for an example - # @param keyfile Keyfile to decrypt settings_post - # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) - # @prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) - def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True): - self.cfg = cfg - self.load_settings(self.cfg) - self.debug_mode = debug_mode or self.settings_behavior["debug"] - self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"] - self.currentProfileIndex = [0]*len(self.settings_post) - random.seed(os.urandom(16)) - if prime_bot: - self.prime_bot() - - # Setup Exit Calls - def exit(self): - self.eventSleep.set() - - # Decryption settings - # Used to set class attributes with the same name to the value specified in the config file. - def load_settings(self, cfg, settings=None): - try: - default_settings = ( - "settings_server", - "settings_behavior", - "settings_time", - "settings_post", - "settings_encrypt", - "settings_backend", - ) - _settings = settings or default_settings - for ele in _settings: - setattr(self, ele, getattr(cfg, ele)) - - except AttributeError as e: - print(e) - raise BadCfgFile - - def decrypt_settings(self): - if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: - import encryption - try: - self.settings_server = encryption.settings_server_decrypt( - self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"]) - except encryption.EncryptionFail as e: - raise BadCfgFile(str(e)) - self.decrypted = True - - # Login to Pleroma - def login(self): - skip_login = self.debug_mode or self.mastodon_api is not None - if skip_login: - return - if not self.decrypted: - self.decrypt_settings() - try: - self.mastodon_api = Mastodon( - client_id=self.settings_server["client_id"], - client_secret=self.settings_server["client_secret"], - access_token=self.settings_server["access_token"], - api_base_url=self.settings_server["api_base_url"], - feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1 - ) - except (MastodonIllegalArgumentError, MastodonVersionError) as e: - print(e) - raise FailedLogin - - # Maybe I should remove this from the backend? - def print_header_stats(self, picked): - profile = picked["profile"]["name"] if picked else None - url = picked["file_url"] if picked else None - path = picked["full_path"] if picked else None - nsfw = picked["nsfw"] if picked else None - - posted_once = int(self.currentSessionCount > 0) - index = (self.currentIndexCount - posted_once) % len(self.settings_post) - state_print = copy.copy(self.currentProfileIndex) - state_print[index] = state_print[index] - posted_once - - state_print = [state_print[i] % len(self.settings_post[i]) for i in range(0, len(self.currentProfileIndex))] - - print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format( - profile, index, nsfw, path, url - )) - print("State: {}".format(','.join(map(str, state_print)))) - - # Returns a list of media paths (without the hashes) - def download_media(self, picked_profile): - try: - backend_s = picked_profile["backend"] - backend_credentials = self.settings_backend[backend_s] - backend = importlib.import_module(backend_credentials["module"]) - - downloader = backend.downloader(backend_credentials) - img = downloader.fetch_post(picked_profile) - - if img is None: - raise InvalidPost("Img could not be downloaded") - - return downloader.download_post(img) - except ImportError: - print("Invalid Backend:", picked_profile["backend"]) - return None - - # Returns a list of tuples that contain the media list path and media mastodon dictionary - def upload_media_list(self, path_list): - media_list = [] - # Validate picked - for path in path_list: - if not os.path.isfile(path): - raise FileNotFoundError("Could not upload: {}".format(path)) - elif not self.valid_mimetype(path): - raise InvalidMimeType("Invalid mime type") - elif not self.valid_file_size(path): - raise FileTooLarge("File is too large to upload") - - # Upload - for path in path_list: - if not self.debug_mode: - media = self.mastodon_api.media_post(path, description=os.path.basename(path)) - media_dict = { - "path": path, - "media": media - } - media_list.append(media_dict) - return media_list - - def get_post_text(self, picked, media_list): - content_type = self.settings_behavior["content_type"] - content_newline = self.settings_behavior["content_newline"] - nsfw = picked["nsfw"] - message = picked["profile"]["message_nsfw"] if nsfw else picked["profile"]["message"] - static_message = content_newline.join(message) - string_post = "" - string_imglinks = [] - - if media_list and self.settings_behavior["post_image_link"]: - for media_dict in media_list: - path = media_dict["path"] - media = media_dict["media"] - - if path is None or media is None: - continue - elif content_type == "text/markdown" and not self.debug_mode: - string_imglinks.append( - "[{}]({})".format(os.path.basename(path), media["url"]) - ) - else: - string_imglinks.append(media["url"]) - - # Join non empty strings with a newline character - string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) - string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) - - return string_post - - def valid_mimetype(self, full_path): - mime = magic.from_file(full_path, mime=True) - - if mime is None: - return False - - mime_category = mime.split("/", 1)[0] - - return mime_category in ("image", "video") - - - def valid_file_size(self, full_path): - max_size = self.settings_behavior["max_size"] - file_size = os.stat(full_path).st_size - return file_size <= max_size - - - def _post(self, picked): - if not picked: - raise InvalidPost("Picked post is none") - upload_list = [picked["full_path"]] - media_list = self.upload_media_list(upload_list) - message = self.get_post_text(picked, media_list) - if self.debug_mode: - return picked - self.mastodon_api.status_post( - message, - media_ids=[i["media"] for i in media_list], - visibility=self.settings_behavior["visibility"], - sensitive=picked["nsfw"], - content_type=self.settings_behavior["content_type"] - ) - return picked - - - def pick_index(self, mode, current_index, length): - if mode == "random": - return random.randint(0, length - 1) - elif mode == "sequential": - return current_index % length - - - def pick_profile(self): - # Get x and y - mode = self.settings_behavior["tag_select"].lower() - posts = self.settings_post - x = self.pick_index(mode, self.currentIndexCount, len(posts)) - y = self.pick_index(mode, self.currentProfileIndex[x], len(posts[x])) - - # Return the Profile - return x, y - - - # The main post function - # This funciton is responsible for picking a profile, generate a screenshot, and posting it. - # - # This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded. - def post(self): - picked = None - - # Attempt post - try: - # Post - x, y = self.pick_profile() - picked_profile = self.settings_post[x][y] - picked = self.download_media(picked_profile) - self._post(picked) - - os.remove(picked["full_path"]) - - # After a successful post - self.currentSessionCount += 1 - self.consecutive_failed_uploads = 0 - - # Set indexes - self.currentProfileIndex[x] += 1 - self.currentIndexCount += 1 - - # The post was successful - return picked - - # Invalid post (move to next profile) - except InvalidPost as e: - self.currentIndexCount += 1 - print("Invalid post:", e) - - # Invalid post (remove downloaded files) - except (FileTooLarge, InvalidMimeType) as e: - os.remove(picked["full_path"]) - print("Unable to post:", e) - - # Server Errors and other general exceptions - # Assume all exceptions are on the server side (besides FileNotFoundError of course) - # If the connection is timing out it could be for two reasons: - # 1. The error was caused by the user attempting to upload a large file over a slow connection: - # a. FIX: Reduce settings_behavior["max_size"] - # 2. The server is down. Check to verify in a web browser (this is the default assumption since the - # mastodon.py API will not specify why the connection timed out). - # 3. Other general exceptions - except (FileNotFoundError, MastodonAPIError, Exception) as e: - print("Exception:", e) - - # An exception occurred - self.failed_uploads += 1 - self.consecutive_failed_uploads += 1 - - # The post failed - return None - - # [BEGIN THE PROGRAM] - def prime_bot(self): - if not self.debug_mode: - self.decrypt_settings() - self.login() - self.primed = True - - def start(self, delay=0): - # Prime bot if not already primed. - if not self.primed: - self.prime_bot() - - # Begin posting - self.main_loop() - - # Return 1 if there are still pictures in the picture list - return 0 - - # End Conditions: - # 1. User presses Ctrl+C - # 2. settings_behavior["uploads_per_post"] is less than uploads_per_post - # 3. Consecutive failed uploads is less than max_errors - def can_post(self): - return ( - not self.eventSleep.is_set() and - self.currentSessionCount < self.settings_behavior["uploads_per_post"] and - self.consecutive_failed_uploads < self.settings_behavior["max_errors"] and - self.currentIndexCount >= 0 - ) - - def main_loop(self): - sleep_seconds = self.settings_behavior["retry_seconds"] - while self.can_post(): - picked = self.post() - self.print_header_stats(picked) - if self.can_post(): - self.eventSleep.wait(sleep_seconds) - -# Custom Exceptions for YandereBot -class Debug(Exception): - pass - - -class InvalidPost(Exception): - pass - - -class FileTooLarge(Exception): - pass - - -class InvalidMimeType(Exception): - pass - - -class FailedLogin(Exception): - pass - - -class BadCfgFile(Exception): - pass