From 01d898288a668b017aa3d4d4304ac397db30a14a Mon Sep 17 00:00:00 2001 From: Anon Date: Sun, 14 May 2023 17:23:24 -0700 Subject: [PATCH] Refactored to remove built-in hash function --- default/cfg.py | 4 +- src/main.py | 249 ++++++++++++++++++++----------------------------- src/yanlib.py | 162 -------------------------------- 3 files changed, 100 insertions(+), 315 deletions(-) delete mode 100644 src/yanlib.py diff --git a/default/cfg.py b/default/cfg.py index 463aced..1f8265a 100644 --- a/default/cfg.py +++ b/default/cfg.py @@ -68,9 +68,7 @@ settings_encrypt = { # Basic settings to configure Yandere Bot's behavior settings_behavior = { - "master_list": ("./md5/master_file.txt",), - "master_blacklist_r": ("./md5/blacklist.txt", "./md5/master_blacklist.txt"), - "master_blacklist_w": ("./md5/blacklist.txt",), + "master_list": "./md5/master_file.txt", "max_size": 15*1024*1024, "visibility": "unlisted", "feature_set": "pleroma", diff --git a/src/main.py b/src/main.py index f56d175..569c58b 100755 --- a/src/main.py +++ b/src/main.py @@ -22,151 +22,128 @@ import argparse import signal import FediBot import contextlib -import yanlib import fnmatch -from functools import reduce from mastodon import MastodonAPIError -class BadPostSettings(Exception): - pass - class MissingMasterList(Exception): pass -class YanBotHash(yanlib.HashObject): - _postSettings = None +class YanBotHash: + def __init__(self, p, profile): + self._path = p + self._profile = profile + + def get_path(self): + return self._path - def __init__(self, hash_obj, profile): - super(YanBotHash, self).__init__() - if hash_obj is None: - return - self._sHash = hash_obj.get_hash_string() - self._sBinaryChar = hash_obj.get_binary_char() - self._sPath = hash_obj.get_hash_path() - self._postSettings = profile + def get_path_profile(self): + return self._profile - def get_post_setting(self): - return self._postSettings - -# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj -# @param hash_obj A HashObject() (or subclass) -# @param profiles A list of available profiles to match -# @param profiles_default The default profile to return if no profile is matched -def get_profile(hash_obj, profiles, profiles_default): - profile_gen = (x for x in profiles if fnmatch.fnmatch(hash_obj.get_hash_path(), x["path"])) - return next(profile_gen, profiles_default) - - -# Takes a file path and transforms it into a list of YanBotHash() with the appropriate profile -# @param f_name Path of hash file -# @param profiles List of profiles -> self.settings_post -# @param profiles_default The default profile to apply -# @param callback_get_profile Callback function -> should return a single profile. Default: get_profile() -def get_list_of_hashes_with_profiles(f_name, profiles, profile_default): - return [YanBotHash(i, get_profile(i, profiles, profile_default)) for i in yanlib.get_hash_list(f_name)] +def seek_to_line_number(fd, n): + linebreaks = 0 + ch = "" + while fd and ch and linebreaks < n: + ch = fd.read(1) + linebreaks += int(ch == '\n') + return ch class YandereBot(FediBot.YandereBot): - listPictures = [] - lenBlacklist = 0 - - def __init__(self, cfg, keyfile=None, debug_mode=False): - settings = { - "settings_time": {}, - "settings_post": {}, - "settings_post_default": {} - } - self.settings.update(settings) + def __init__(self, masterfile, line, cfg, keyfile=None, debug_mode=False): super(YandereBot, self).__init__(cfg, keyfile, debug_mode) + + # Pictures to post + self.listPictures = None + self.currentLine = max(0, line) + self.lastReadLine = None + self.lastLine = self.currentLine + self.master_file = masterfile or self.settings["settings_behavior"]["master_list"] + + self.load_settings(["settings_time", "settings_post", "settings_post_default"]) + def __del__(self): + if self.listPictures and not self.listPictures.closed: + self.listPictures.close() + # [BEGIN THE PROGRAM] def prime_bot(self): if self.primed: return self.load_picture_list() - self.validate_post_settings() super(YandereBot, self).prime_bot() - # Make sure there are no profiles in listPictures set to none. Print the bad post and exit if there is. - def validate_post_settings(self): - bad_post_count = 0 - for i in self.listPictures: - if i.get_post_setting() is None: - print("Bad post setting [{}]: {}".format(bad_post_count, i.get_full_string())) - bad_post_count += 1 - if bad_post_count: - raise BadPostSettings - - # Set up lists - def read_blacklist_files(self): - list_blacklist = [] - for i in self.settings["settings_behavior"]["master_blacklist_r"]: - # It doesn't matter if the picture file doesn't exist - with contextlib.suppress(IOError): - list_blacklist.extend(yanlib.get_hash_list(i)) - return list_blacklist - - def blacklist(self, picked): - self.lenBlacklist += 1 - for path in self.settings["settings_behavior"]["master_blacklist_w"]: - with open(path, "a") as f: - print(picked.get_full_string(), file=f) - # Returns a list of media paths (without the hashes) def get_media_list(self, picked): ext = self.settings["settings_behavior"]["multi_media_ext"] if not picked: return None - elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower(): - return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())] + elif ext and os.path.splitext(picked.get_path())[1].lower() == ext.lower(): + with open(picked.get_path()) as fd: + return [i for i in fd.read().splitlines() if self.is_valid_upload(i)] else: - return [picked.get_hash_path()] + return [picked.get_path()] - # load_pictures will return a list of YanHashObj() with a blacklist(s) applied - # @param list_blacklist A list of HashObjects() that are blacklist hashes - def load_pictures(self, list_blacklist): - if not self.settings["settings_behavior"]["master_list"]: - raise MissingMasterList - - try: - list_pictures = reduce(lambda x, y: x + y, - [get_list_of_hashes_with_profiles( - f, - self.settings["settings_post"], - self.settings["settings_post_default"]) - for f in self.settings["settings_behavior"]["master_list"] - ]) - return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings["settings_behavior"]["max_size"]) - except IOError as e: - print(e) - raise MissingMasterList + # A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj + def get_profile(self, p): + profiles = self.settings["settings_post"] + profile_default = self.settings["settings_post_default"] + profile_gen = (x for x in profiles if fnmatch.fnmatch(p, x["path"])) + return next(profile_gen, profile_default) + + def is_valid_upload(self, p): + max_size = self.settings["settings_behavior"]["max_size"] + return p and os.path.isfile(p) and os.path.getsize(p) <= max_size def load_picture_list(self): - list_blacklist = self.read_blacklist_files() - self.listPictures = self.load_pictures(list_blacklist) - self.lenBlacklist = len(list_blacklist) + try: + self.listPictures = sys.stdin if self.master_file == "-" else open(self.master_file, "r") + seek_to_line_number(self.listPictures, self.currentLine) + self.previousPos = self.listPictures.tell() + except FileNotFoundError: + raise MissingMasterList # Maybe I should remove this from the backend? def print_header_stats(self, picked): - picked_profile, _picked, picked_next_profile, picked_next = None, None, None, None + picked_profile, _picked = [None]*2 + line = "0" + if picked: - _picked = picked.get_full_string() - picked_profile = picked.get_post_setting()["name"] + _picked = picked.get_path() + picked_profile = picked.get_path_profile()["name"] + line = str(self.currentLine - 1) - if self.listPictures: - picked_next = self.listPictures[0].get_full_string() - picked_next_profile = self.listPictures[0].get_post_setting()["name"] - - print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format( - picked_profile, _picked, picked_next_profile, picked_next + print("Profile: {} | Picked: {} | Line: {}".format( + picked_profile, _picked, line )) + def getline(self): + if self.currentLine == self.lastLine and self.lastReadLine: + line = self.lastReadLine + else: + line = self.listPictures.readline().rstrip(os.linesep) + self.lastReadLine = line + self.lastLine = self.currentLine + self.currentLine += 1 + if not line: + self.listPictures.close() + return line + + def rewindline(self): + self.currentLine -= 1 + + def pick(self): - picked = self.listPictures.pop(0) + _picked = self.getline() + if not self.is_valid_upload(_picked): + raise FediBot.InvalidPost("Image not found {}".format(_picked)) + picked_profile = self.get_profile(_picked) + if not picked_profile: + raise FediBot.InvalidPost("No matching profile {}".format(_picked)) + picked = YanBotHash(_picked, picked_profile) media_list = self.get_media_list(picked) - spoiler = picked.get_post_setting()["spoiler"] - message = picked.get_post_setting()["message"] + spoiler = picked.get_path_profile()["spoiler"] + message = picked.get_path_profile()["message"] return { "picked": picked, "media_list": media_list, @@ -176,26 +153,23 @@ class YandereBot(FediBot.YandereBot): def after_post(self, picked): if picked: - self.blacklist(picked["picked"]) self.print_header_stats(picked["picked"]) else: self.print_header_stats(None) def post(self, callback=None): - picked = None reinsert_image = False try: - picked = self.pick() - return super(YandereBot, self).post(lambda: picked) + return super(YandereBot, self).post() # Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds) - except (FileNotFoundError, FediBot.InvalidPost): - if picked: - print("File not found:", picked["picked"].get_hash_path()) + except (FileNotFoundError, FediBot.InvalidPost) as e: + print(e) reinsert_image = False # Check if the file limit has been reached except MastodonAPIError as e: print("API Error:", e) + reinsert_image = True # Check if the file limit has been reached (413 error) with contextlib.suppress(IndexError): reinsert_image = e.args[1] != 413 @@ -214,8 +188,8 @@ class YandereBot(FediBot.YandereBot): # Exception flags reinsert_image = True - if picked and reinsert_image and self.consecutive_failed_uploads < self.settings["settings_behavior"]["max_errors"]: - self.listPictures.insert(0, picked["picked"]) + if reinsert_image and self.can_post(): + self.rewindline() self.handle_post_exception() @@ -224,18 +198,12 @@ class YandereBot(FediBot.YandereBot): def can_post(self): - return bool(len(self.listPictures)) and super(YandereBot, self).can_post() + return (not self.listPictures.closed) and super(YandereBot, self).can_post() def start(self): super(YandereBot, self).start() - # Return 1 if there are still pictures in the picture list - return len(self.listPictures) > 0 - - -class FailedToLoadCfg(Exception): - pass - + return not self.listPictures.closed # Entry point if run from the command line def main(): @@ -247,42 +215,26 @@ def main(): description="A bot for posting on Mastodon", # epilog="All switches can be combined for greater control", add_help=True) + parser.add_argument("masterfile", help="Override master file. The '-' symbol represents stdin", nargs="?") parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true") parser.add_argument("--debug", help="Same as --dry-run", action="store_true") parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg) parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption") - parser.add_argument("-o", "--output", help="Output master list to stdout", action="store_true") + parser.add_argument("-l", "--line", help="Seek to line number", default=0) parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER) arguments = parser.parse_args() - # Yandere Lewd Bot - yandere = None - yandere_config = None - - # Configuration file for Yandere Lewd Bot - try: - import importlib - yandere_config = importlib.import_module(arguments.config) - except ImportError: - raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config)) - # Flag if the bot is running in debug mode - debug_mode = arguments.dry_run or arguments.debug or arguments.output + debug_mode = arguments.dry_run or arguments.debug yandere = YandereBot( - yandere_config, + arguments.masterfile, + int(arguments.line), + arguments.config, arguments.keyfile, debug_mode ) - # Output master list if -o switch is set - if arguments.output: - with contextlib.redirect_stdout(None): - yandere.prime_bot() - for item in yandere.listPictures: - print(item.get_full_string()) - return 0 - # Setup exit calls # Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings ) def yandere_quit(signo, _frame): @@ -300,15 +252,12 @@ if __name__ == "__main__": # A return value of 0 or 1 is a normal exit try: sys.exit(main()) - # Exceptions raised from the main function - except FailedToLoadCfg: - sys.exit(6) # Exceptions raised from the bot + except MissingMasterList: + sys.exit(3) except FediBot.Debug: sys.exit(5) except FediBot.BadCfgFile: sys.exit(4) - except BadPostSettings: - sys.exit(3) except FediBot.FailedLogin: sys.exit(2) diff --git a/src/yanlib.py b/src/yanlib.py deleted file mode 100644 index b6fe24d..0000000 --- a/src/yanlib.py +++ /dev/null @@ -1,162 +0,0 @@ -#! /usr/bin/env python3 - -# Yandere Lewd Bot, an image posting bot for Pleroma -# Copyright (C) 2022 Anon <@Anon@yandere.cc> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -# yanlib.py is required by yandereBot.py -# This file contains functions and utilities that may be useful to external tools and programs that interface with -# yandereBot, or manipulate hash files in some way. Typically instantiating a yandereBot object is unnecessary for this. - -import os - - -# Requires properly formatted MD5 checksum lines. -# The length of the string shouldn't matter. -# But it should be formatted like so: '8d20714ec6d3ee6d444931d01dd68626 *./rsc/file.png' -# The initializer does NOT check for the validity of the hash string. This should be done manually with is_valid_hash() -# ex. hashes = [HashObject(i) for i in hash_list if is_valid_hash(i)] -# See: get_hash_list() and get_hash_list_str() for functions to make the above example even more simple -class HashObject: - _sHash = "" - _sBinaryChar = "" - _sPath = "" - _sDeliminator = ' ' # This should never change - - def __init__(self, s_hash=None): - if s_hash is not None: - self.set_hash(s_hash) - - def set_hash(self, s): - line = s.strip() - split_hash = line.split(self._sDeliminator, 1) - self._sHash = split_hash[0] # The hash value - self._sBinaryChar = split_hash[1][:1] # The binary character - self._sPath = split_hash[1][1:] # The path to the file - - def get_hash_string(self): - return self._sHash - - def get_hash_path(self): - return self._sPath - - def get_binary_char(self): - return self._sBinaryChar - - def get_deliminator(self): - return self._sDeliminator - - def get_full_string(self): - return ( - self._sHash + - self._sDeliminator + - self._sBinaryChar + - self._sPath) - - -# -------------------------------- HASH FUNCTIONS -------------------------------------------- -# For MD5 hashes - -# Checking for the binary character when comparing hash string is usually overkill since the CoreUtils package reads -# files in binary mode regardless of whether the -b switch is passed. -# It is included here since it may be useful for some other applications besides the bot. -def is_matching_atter_binary(v_list, v_hash, callback): - return next( - (x for x in v_list if - callback(v_hash) == callback(x) and - v_hash.get_binary_char() == x.get_binary_char() - ), None) - - -def is_matching_atter(v_list, v_hash, callback): - return next( - (x for x in v_list if - callback(v_hash) == callback(x) - ), None) - -def get_matching(v_list, v_hash, callback, match_bin=False): - return is_matching_atter(v_list, v_hash, callback) if not match_bin else is_matching_atter_binary(v_list, v_hash, callback) - - -def get_matching_hash_in_list(v_list, v_hash, match_bin=False): - return get_matching(v_list, v_hash, lambda x: x.get_hash_string(), match_bin) - - -def get_matching_path_in_list(v_list, v_hash, match_bin=False): - return get_matching(v_list, v_hash, lambda x: x.get_hash_path(), match_bin) - - -def get_matching_full_string_in_list(v_list, v_hash, match_bin=False): - return get_matching(v_list, v_hash, lambda x: x.get_full_string(), match_bin) - - -# ---------------------------- HASH LIST FUNCTIONS -------------------------------------------- -# This is a very lazy check. We only care if the bot will crash when initializing a HashObject() -# A 'hash' of '1 *b' or '1 b' will pass in this function. -def is_valid_hash(s): - line = s.strip() - split_hash = line.find(' ') - - # Check - return not ( - line.startswith('#') or - len(line) < 4 or - split_hash in (-1, len(line) - 2) or - line[split_hash + 1] not in (' ', '*') - ) - - -def get_hash_list_str(lst_str): - return [HashObject(i) for i in lst_str if is_valid_hash(i)] - - -def get_hash_list(f_name): - with open(f_name, "r") as f: - return get_hash_list_str(f.readlines()) - - -def is_hash_blacklisted(line_hash, v_pictures, v_blacklist, max_size=None): - # Print out the entire line - line = line_hash.get_full_string() - full_path = line_hash.get_hash_path() - - # Used for comparing file size - cmp_size = max_size is not None - - # Only add it to the pictures list if the following conditions are met: - # NOTICE: This is going to be CPU intensive. - if get_matching_hash_in_list(v_pictures, line_hash): # No hash duplicates - print("Ignoring Duplicate Hash:", line) - elif get_matching_path_in_list(v_pictures, line_hash): # No path duplicates - print("Ignoring Duplicate Path:", line) - elif get_matching_hash_in_list(v_blacklist, line_hash): # File hash is not blacklisted - print("Ignoring Blacklisted Hash:", line) - elif get_matching_path_in_list(v_blacklist, line_hash): # File path is not blacklisted - print("Ignoring Blacklisted Path:", line) - elif not os.path.isfile(full_path): # File exists - print("File not found:", line) - elif cmp_size and os.path.getsize(full_path) > max_size: # Does not exceed max file limit (if max_size is set) - print("Exceeds Max File Size:", line) - else: - return False - return True - - -def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None): - ret = [] - for lineHash in v_pictures: - if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size): - ret.append(lineHash) - return ret