Refactored to remove built-in hash function

This commit is contained in:
Anon 2023-05-14 17:23:24 -07:00
parent 452c3b6fc3
commit 01d898288a
3 changed files with 100 additions and 315 deletions

View File

@ -68,9 +68,7 @@ settings_encrypt = {
# Basic settings to configure Yandere Bot's behavior
settings_behavior = {
"master_list": ("./md5/master_file.txt",),
"master_blacklist_r": ("./md5/blacklist.txt", "./md5/master_blacklist.txt"),
"master_blacklist_w": ("./md5/blacklist.txt",),
"master_list": "./md5/master_file.txt",
"max_size": 15*1024*1024,
"visibility": "unlisted",
"feature_set": "pleroma",

View File

@ -22,151 +22,128 @@ import argparse
import signal
import FediBot
import contextlib
import yanlib
import fnmatch
from functools import reduce
from mastodon import MastodonAPIError
class BadPostSettings(Exception):
    """Raised when at least one queued picture has no post profile (its setting is None)."""
class MissingMasterList(Exception):
    """Raised when the master list is not configured or cannot be opened."""
class YanBotHash(yanlib.HashObject):
_postSettings = None
class YanBotHash:
def __init__(self, p, profile):
self._path = p
self._profile = profile
def __init__(self, hash_obj, profile):
super(YanBotHash, self).__init__()
if hash_obj is None:
return
self._sHash = hash_obj.get_hash_string()
self._sBinaryChar = hash_obj.get_binary_char()
self._sPath = hash_obj.get_hash_path()
self._postSettings = profile
def get_path(self):
return self._path
def get_post_setting(self):
return self._postSettings
def get_path_profile(self):
return self._profile
# Pick the post profile that applies to @param hash_obj
# @param hash_obj A HashObject() (or subclass); only its get_hash_path() is consulted
# @param profiles An iterable of profile dicts, each carrying a "path" glob pattern
# @param profiles_default Returned unchanged when no profile pattern matches the path
# @return The first profile (in iteration order) whose "path" pattern matches, else profiles_default
def get_profile(hash_obj, profiles, profiles_default):
    for candidate in profiles:
        if fnmatch.fnmatch(hash_obj.get_hash_path(), candidate["path"]):
            return candidate
    return profiles_default
# Read a hash file and wrap every entry in a YanBotHash() that carries its post profile
# @param f_name Path of hash file
# @param profiles List of profiles -> self.settings_post
# @param profile_default The profile handed to get_profile() as its fallback
# @return A list of YanBotHash(), one per entry returned by yanlib.get_hash_list()
def get_list_of_hashes_with_profiles(f_name, profiles, profile_default):
    wrapped = []
    for entry in yanlib.get_hash_list(f_name):
        matched_profile = get_profile(entry, profiles, profile_default)
        wrapped.append(YanBotHash(entry, matched_profile))
    return wrapped
# Advance @param fd past its next @param n line breaks, reading a line at a time
# (the previous implementation issued one read(1) call per character).
# readline() is used instead of iterating the file, so fd.tell() stays usable afterwards.
# @param fd A text-mode file-like object; a falsy fd is left untouched
# @param n Number of line breaks to skip; values <= 0 skip nothing
# @return "\n" when all n line breaks were consumed, "" when the end of input was reached
#         first, "<unset>" when nothing was read at all
def seek_to_line_number(fd, n):
    last = "<unset>"
    if not fd:
        return last
    skipped = 0
    while skipped < n:
        line = fd.readline()
        # A line without a trailing newline (including "") means the input is exhausted
        if not line.endswith("\n"):
            return ""
        last = "\n"
        skipped += 1
    return last
class YandereBot(FediBot.YandereBot):
listPictures = []
lenBlacklist = 0
def __init__(self, cfg, keyfile=None, debug_mode=False):
settings = {
"settings_time": {},
"settings_post": {},
"settings_post_default": {}
}
self.settings.update(settings)
def __init__(self, masterfile, line, cfg, keyfile=None, debug_mode=False):
super(YandereBot, self).__init__(cfg, keyfile, debug_mode)
# Pictures to post
self.listPictures = None
self.currentLine = max(0, line)
self.lastReadLine = None
self.lastLine = self.currentLine
self.master_file = masterfile or self.settings["settings_behavior"]["master_list"]
self.load_settings(["settings_time", "settings_post", "settings_post_default"])
def __del__(self):
    # Close the master list handle if one was opened and is still open
    handle = self.listPictures
    if not handle:
        return
    if not handle.closed:
        handle.close()
# [BEGIN THE PROGRAM]
def prime_bot(self):
    # Does nothing when self.primed is already set; otherwise the picture list is
    # loaded and validated before the parent class's prime_bot() runs
    if not self.primed:
        self.load_picture_list()
        self.validate_post_settings()
        super(YandereBot, self).prime_bot()
# Check that no entry in listPictures has its post setting set to None.
# Every offending entry is printed with a running index; BadPostSettings is raised if any were found.
def validate_post_settings(self):
    offenders = 0
    for entry in self.listPictures:
        if entry.get_post_setting() is not None:
            continue
        print("Bad post setting [{}]: {}".format(offenders, entry.get_full_string()))
        offenders += 1

    if offenders > 0:
        raise BadPostSettings
# Set up lists
# Collect the entries of every readable blacklist file named in
# settings_behavior["master_blacklist_r"] into a single list
def read_blacklist_files(self):
    collected = []
    sources = self.settings["settings_behavior"]["master_blacklist_r"]
    for blacklist_path in sources:
        # A blacklist file that cannot be read is simply skipped
        try:
            entries = yanlib.get_hash_list(blacklist_path)
        except IOError:
            continue
        collected.extend(entries)
    return collected
# Count @param picked as blacklisted and append its full hash line to every file
# listed in settings_behavior["master_blacklist_w"]
def blacklist(self, picked):
    self.lenBlacklist = self.lenBlacklist + 1
    targets = self.settings["settings_behavior"]["master_blacklist_w"]
    for target in targets:
        with open(target, "a") as out:
            print(picked.get_full_string(), file=out)
# Returns a list of media paths (without the hashes)
def get_media_list(self, picked):
ext = self.settings["settings_behavior"]["multi_media_ext"]
if not picked:
return None
elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())]
elif ext and os.path.splitext(picked.get_path())[1].lower() == ext.lower():
with open(picked.get_path()) as fd:
return [i for i in fd.read().splitlines() if self.is_valid_upload(i)]
else:
return [picked.get_hash_path()]
return [picked.get_path()]
# load_pictures will return a list of YanHashObj() with a blacklist(s) applied
# @param list_blacklist A list of HashObjects() that are blacklist hashes
def load_pictures(self, list_blacklist):
if not self.settings["settings_behavior"]["master_list"]:
raise MissingMasterList
# Resolve the post profile for the path @param p
# @return The first entry of settings_post whose "path" glob pattern matches p,
#         or settings_post_default when none matches
def get_profile(self, p):
    candidates = self.settings["settings_post"]
    fallback = self.settings["settings_post_default"]
    for candidate in candidates:
        if fnmatch.fnmatch(p, candidate["path"]):
            return candidate
    return fallback
try:
list_pictures = reduce(lambda x, y: x + y,
[get_list_of_hashes_with_profiles(
f,
self.settings["settings_post"],
self.settings["settings_post_default"])
for f in self.settings["settings_behavior"]["master_list"]
])
return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings["settings_behavior"]["max_size"])
except IOError as e:
print(e)
raise MissingMasterList
# Decide whether @param p can be uploaded: it must be a non-empty path to an existing
# regular file no larger than settings_behavior["max_size"] bytes.
# @return p itself when p is falsy, otherwise True or False
def is_valid_upload(self, p):
    limit = self.settings["settings_behavior"]["max_size"]
    if not p:
        return p
    if not os.path.isfile(p):
        return False
    return os.path.getsize(p) <= limit
def load_picture_list(self):
list_blacklist = self.read_blacklist_files()
self.listPictures = self.load_pictures(list_blacklist)
self.lenBlacklist = len(list_blacklist)
try:
self.listPictures = sys.stdin if self.master_file == "-" else open(self.master_file, "r")
seek_to_line_number(self.listPictures, self.currentLine)
self.previousPos = self.listPictures.tell()
except FileNotFoundError:
raise MissingMasterList
# Maybe I should remove this from the backend?
def print_header_stats(self, picked):
picked_profile, _picked, picked_next_profile, picked_next = None, None, None, None
picked_profile, _picked = [None]*2
line = "0"
if picked:
_picked = picked.get_full_string()
picked_profile = picked.get_post_setting()["name"]
_picked = picked.get_path()
picked_profile = picked.get_path_profile()["name"]
line = str(self.currentLine - 1)
if self.listPictures:
picked_next = self.listPictures[0].get_full_string()
picked_next_profile = self.listPictures[0].get_post_setting()["name"]
print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format(
picked_profile, _picked, picked_next_profile, picked_next
print("Profile: {} | Picked: {} | Line: {}".format(
picked_profile, _picked, line
))
# Return the next path from the master list and advance currentLine.
# If rewindline() was called since the last read (currentLine == lastLine) and a
# non-empty line is cached, the cached line is returned again instead of reading.
# The list is closed only when readline() reports the real end of input, so a blank
# line in the middle of the list is returned as "" without ending the run.
# Trailing "\r" and "\n" are both stripped so CRLF lists yield clean paths.
# @return The line without its line terminator; "" for a blank line or at end of input
def getline(self):
    if self.currentLine == self.lastLine and self.lastReadLine:
        line = self.lastReadLine
    else:
        raw = self.listPictures.readline()
        if not raw:
            # readline() returns "" only at end of input; a blank line is "\n"
            self.listPictures.close()
        line = raw.rstrip("\r\n")
        self.lastReadLine = line
        self.lastLine = self.currentLine
    self.currentLine += 1
    return line
# Step currentLine back by one so the next getline() call sees the previous line number
def rewindline(self):
    self.currentLine = self.currentLine - 1
def pick(self):
picked = self.listPictures.pop(0)
_picked = self.getline()
if not self.is_valid_upload(_picked):
raise FediBot.InvalidPost("Image not found {}".format(_picked))
picked_profile = self.get_profile(_picked)
if not picked_profile:
raise FediBot.InvalidPost("No matching profile {}".format(_picked))
picked = YanBotHash(_picked, picked_profile)
media_list = self.get_media_list(picked)
spoiler = picked.get_post_setting()["spoiler"]
message = picked.get_post_setting()["message"]
spoiler = picked.get_path_profile()["spoiler"]
message = picked.get_path_profile()["message"]
return {
"picked": picked,
"media_list": media_list,
@ -176,26 +153,23 @@ class YandereBot(FediBot.YandereBot):
def after_post(self, picked):
if picked:
self.blacklist(picked["picked"])
self.print_header_stats(picked["picked"])
else:
self.print_header_stats(None)
def post(self, callback=None):
picked = None
reinsert_image = False
try:
picked = self.pick()
return super(YandereBot, self).post(lambda: picked)
return super(YandereBot, self).post()
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
except (FileNotFoundError, FediBot.InvalidPost):
if picked:
print("File not found:", picked["picked"].get_hash_path())
except (FileNotFoundError, FediBot.InvalidPost) as e:
print(e)
reinsert_image = False
# Check if the file limit has been reached
except MastodonAPIError as e:
print("API Error:", e)
reinsert_image = True
# Check if the file limit has been reached (413 error)
with contextlib.suppress(IndexError):
reinsert_image = e.args[1] != 413
@ -214,8 +188,8 @@ class YandereBot(FediBot.YandereBot):
# Exception flags
reinsert_image = True
if picked and reinsert_image and self.consecutive_failed_uploads < self.settings["settings_behavior"]["max_errors"]:
self.listPictures.insert(0, picked["picked"])
if reinsert_image and self.can_post():
self.rewindline()
self.handle_post_exception()
@ -224,18 +198,12 @@ class YandereBot(FediBot.YandereBot):
def can_post(self):
return bool(len(self.listPictures)) and super(YandereBot, self).can_post()
return (not self.listPictures.closed) and super(YandereBot, self).can_post()
def start(self):
super(YandereBot, self).start()
# Return 1 if there are still pictures in the picture list
return len(self.listPictures) > 0
class FailedToLoadCfg(Exception):
pass
return not self.listPictures.closed
# Entry point if run from the command line
def main():
@ -247,42 +215,26 @@ def main():
description="A bot for posting on Mastodon",
# epilog="All switches can be combined for greater control",
add_help=True)
parser.add_argument("masterfile", help="Override master file. The '-' symbol represents stdin", nargs="?")
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption")
parser.add_argument("-o", "--output", help="Output master list to stdout", action="store_true")
parser.add_argument("-l", "--line", help="Seek to line number", default=0)
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
arguments = parser.parse_args()
# Yandere Lewd Bot
yandere = None
yandere_config = None
# Configuration file for Yandere Lewd Bot
try:
import importlib
yandere_config = importlib.import_module(arguments.config)
except ImportError:
raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config))
# Flag if the bot is running in debug mode
debug_mode = arguments.dry_run or arguments.debug or arguments.output
debug_mode = arguments.dry_run or arguments.debug
yandere = YandereBot(
yandere_config,
arguments.masterfile,
int(arguments.line),
arguments.config,
arguments.keyfile,
debug_mode
)
# Output master list if -o switch is set
if arguments.output:
with contextlib.redirect_stdout(None):
yandere.prime_bot()
for item in yandere.listPictures:
print(item.get_full_string())
return 0
# Set up exit calls
# Must be done after the bot(s) are declared; otherwise the handler would run if the user quits while settings are being decrypted
def yandere_quit(signo, _frame):
@ -300,15 +252,12 @@ if __name__ == "__main__":
# A return value of 0 or 1 is a normal exit
try:
sys.exit(main())
# Exceptions raised from the main function
except FailedToLoadCfg:
sys.exit(6)
# Exceptions raised from the bot
except MissingMasterList:
sys.exit(3)
except FediBot.Debug:
sys.exit(5)
except FediBot.BadCfgFile:
sys.exit(4)
except BadPostSettings:
sys.exit(3)
except FediBot.FailedLogin:
sys.exit(2)

View File

@ -1,162 +0,0 @@
#! /usr/bin/env python3
# Yandere Lewd Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <@Anon@yandere.cc>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# yanlib.py is required by yandereBot.py
# This file contains functions and utilities that may be useful to external tools and programs that interface with
# yandereBot, or manipulate hash files in some way. Typically instantiating a yandereBot object is unnecessary for this.
import os
# Requires properly formatted MD5 checksum lines.
# The length of the string shouldn't matter.
# But it should be formatted like so: '8d20714ec6d3ee6d444931d01dd68626 *./rsc/file.png'
# The initializer does NOT check for the validity of the hash string. This should be done manually with is_valid_hash()
# ex. hashes = [HashObject(i) for i in hash_list if is_valid_hash(i)]
# See: get_hash_list() and get_hash_list_str() for functions to make the above example even more simple
class HashObject:
    """One checksum line split into hash, binary character and path.

    The line is split at the first delimiter: everything before it is the hash,
    the single character after it is the binary character, and the remainder is
    the path. No validation is done here; a line without the delimiter raises
    IndexError from set_hash().
    """

    _sHash = ""
    _sBinaryChar = ""
    _sPath = ""
    _sDeliminator = ' '  # This should never change

    def __init__(self, s_hash=None):
        # With no argument the instance keeps the empty class-level defaults
        if s_hash is None:
            return
        self.set_hash(s_hash)

    def set_hash(self, s):
        # Surrounding whitespace (including the line terminator) is dropped first
        pieces = s.strip().split(self._sDeliminator, 1)
        self._sHash = pieces[0]  # The hash value
        remainder = pieces[1]
        self._sBinaryChar = remainder[:1]  # The binary character
        self._sPath = remainder[1:]  # The path to the file

    def get_hash_string(self):
        return self._sHash

    def get_hash_path(self):
        return self._sPath

    def get_binary_char(self):
        return self._sBinaryChar

    def get_deliminator(self):
        return self._sDeliminator

    def get_full_string(self):
        # Reassemble the line in its original order: hash, delimiter, binary char, path
        return "".join((self._sHash, self._sDeliminator, self._sBinaryChar, self._sPath))
# -------------------------------- HASH FUNCTIONS --------------------------------------------
# For MD5 hashes
# Checking for the binary character when comparing hash string is usually overkill since the CoreUtils package reads
# files in binary mode regardless of whether the -b switch is passed.
# It is included here since it may be useful for some other applications besides the bot.
# Find the first element of @param v_list that matches @param v_hash on both the
# attribute selected by @param callback and the binary character.
# callback(v_hash) is evaluated once up front rather than once per candidate, so
# callback is expected to be a side-effect-free accessor.
# @param v_list Iterable of hash objects to search
# @param v_hash The hash object to look for
# @param callback Function mapping a hash object to the value being compared
# @return The first matching element, or None when nothing matches
def is_matching_atter_binary(v_list, v_hash, callback):
    wanted = callback(v_hash)
    return next(
        (x for x in v_list if
         wanted == callback(x) and
         v_hash.get_binary_char() == x.get_binary_char()
         ), None)
# Find the first element of @param v_list whose @param callback value equals that of @param v_hash.
# callback(v_hash) is evaluated once up front rather than once per candidate, so
# callback is expected to be a side-effect-free accessor.
# @param v_list Iterable of hash objects to search
# @param v_hash The hash object to look for
# @param callback Function mapping a hash object to the value being compared
# @return The first matching element, or None when nothing matches
def is_matching_atter(v_list, v_hash, callback):
    wanted = callback(v_hash)
    return next(
        (x for x in v_list if
         wanted == callback(x)
         ), None)
# Dispatch to the plain or the binary-character-aware matcher
# @param match_bin When true, the binary character must match as well
# @return Whatever the selected matcher returns (first match or None)
def get_matching(v_list, v_hash, callback, match_bin=False):
    if match_bin:
        return is_matching_atter_binary(v_list, v_hash, callback)
    return is_matching_atter(v_list, v_hash, callback)
# First element of @param v_list with the same hash string as @param v_hash, or None
def get_matching_hash_in_list(v_list, v_hash, match_bin=False):
    def by_hash(entry):
        return entry.get_hash_string()

    return get_matching(v_list, v_hash, by_hash, match_bin)
# First element of @param v_list with the same path as @param v_hash, or None
def get_matching_path_in_list(v_list, v_hash, match_bin=False):
    def by_path(entry):
        return entry.get_hash_path()

    return get_matching(v_list, v_hash, by_path, match_bin)
# First element of @param v_list with the same full line as @param v_hash, or None
def get_matching_full_string_in_list(v_list, v_hash, match_bin=False):
    def by_full_string(entry):
        return entry.get_full_string()

    return get_matching(v_list, v_hash, by_full_string, match_bin)
# ---------------------------- HASH LIST FUNCTIONS --------------------------------------------
# This is a very lazy check. We only care whether the bot would crash when initializing a HashObject()
# A 'hash' of '1 *b' or '1  b' (two spaces) will pass in this function.
# Rejected: comment lines starting with '#', lines shorter than 4 characters, lines without a
# space, lines with nothing after the binary character, and lines whose character after the
# first space is neither ' ' nor '*'.
def is_valid_hash(s):
    line = s.strip()
    if line.startswith('#') or len(line) < 4:
        return False

    separator = line.find(' ')
    # -1: no space at all; len - 2: only one character follows the space, so the path is empty
    if separator == -1 or separator == len(line) - 2:
        return False

    return line[separator + 1] in (' ', '*')
# Build a HashObject() for every string in @param lst_str that passes is_valid_hash();
# all other strings are dropped
def get_hash_list_str(lst_str):
    parsed = []
    for raw in lst_str:
        if is_valid_hash(raw):
            parsed.append(HashObject(raw))
    return parsed
# Read the hash file at @param f_name and return its valid lines as a list of HashObject()
def get_hash_list(f_name):
    with open(f_name, "r") as handle:
        raw_lines = handle.readlines()
    return get_hash_list_str(raw_lines)
# Decide whether @param line_hash must be kept out of the picture list, printing the reason if so.
# Checks run in this order and stop at the first hit: duplicate hash, duplicate path,
# blacklisted hash, blacklisted path, missing file, file larger than max_size.
# NOTICE: This is going to be CPU intensive.
# @param v_pictures Entries accepted so far (used for duplicate detection)
# @param v_blacklist Blacklisted entries
# @param max_size Maximum file size in bytes; None disables the size check
# @return True when the entry is rejected, False when it may be added
def is_hash_blacklisted(line_hash, v_pictures, v_blacklist, max_size=None):
    line = line_hash.get_full_string()
    full_path = line_hash.get_hash_path()

    if get_matching_hash_in_list(v_pictures, line_hash):
        reason = "Ignoring Duplicate Hash:"
    elif get_matching_path_in_list(v_pictures, line_hash):
        reason = "Ignoring Duplicate Path:"
    elif get_matching_hash_in_list(v_blacklist, line_hash):
        reason = "Ignoring Blacklisted Hash:"
    elif get_matching_path_in_list(v_blacklist, line_hash):
        reason = "Ignoring Blacklisted Path:"
    elif not os.path.isfile(full_path):
        reason = "File not found:"
    elif max_size is not None and os.path.getsize(full_path) > max_size:
        reason = "Exceeds Max File Size:"
    else:
        return False

    # Print out the entire line alongside the reason it was rejected
    print(reason, line)
    return True
# Filter @param v_pictures in order, keeping only entries that is_hash_blacklisted() accepts.
# The list of already-accepted entries is passed along so later duplicates are rejected.
def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None):
    accepted = []
    for candidate in v_pictures:
        if is_hash_blacklisted(candidate, accepted, v_blacklist, max_size):
            continue
        accepted.append(candidate)
    return accepted