Refactor to make bot modular
This commit is contained in:
parent
c9e3b165eb
commit
98a6cd40fb
200
src/main.py
200
src/main.py
@ -16,11 +16,199 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import signal
|
||||
import yandere_bot
|
||||
import common
|
||||
import contextlib
|
||||
import yanlib
|
||||
import fnmatch
|
||||
from functools import reduce
|
||||
from mastodon import MastodonAPIError
|
||||
|
||||
|
||||
class BadPostSettings(Exception):
|
||||
pass
|
||||
|
||||
class MissingMasterList(Exception):
|
||||
pass
|
||||
|
||||
class YanBotHash(yanlib.HashObject):
|
||||
_postSettings = None
|
||||
|
||||
def __init__(self, hash_obj, profile):
|
||||
super(YanBotHash, self).__init__()
|
||||
if hash_obj is None:
|
||||
return
|
||||
self._sHash = hash_obj.get_hash_string()
|
||||
self._sBinaryChar = hash_obj.get_binary_char()
|
||||
self._sPath = hash_obj.get_hash_path()
|
||||
self._postSettings = profile
|
||||
|
||||
def get_post_setting(self):
|
||||
return self._postSettings
|
||||
|
||||
# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj
|
||||
# @param hash_obj A HashObject() (or subclass)
|
||||
# @param profiles A list of available profiles to match
|
||||
# @param profiles_default The default profile to return if no profile is matched
|
||||
def get_profile(hash_obj, profiles, profiles_default):
|
||||
profile_gen = (x for x in profiles if fnmatch.fnmatch(hash_obj.get_hash_path(), x["path"]))
|
||||
return next(profile_gen, profiles_default)
|
||||
|
||||
|
||||
# Takes a file path and transforms it into a list of YanBotHash() with the appropriate profile
|
||||
# @param f_name Path of hash file
|
||||
# @param profiles List of profiles -> self.settings_post
|
||||
# @param profiles_default The default profile to apply
|
||||
# @param callback_get_profile Callback function -> should return a single profile. Default: get_profile()
|
||||
def get_list_of_hashes_with_profiles(f_name, profiles, profile_default):
|
||||
return [YanBotHash(i, get_profile(i, profiles, profile_default)) for i in yanlib.get_hash_list(f_name)]
|
||||
|
||||
|
||||
class YandereBot(common.YandereBot):
|
||||
listPictures = []
|
||||
lenBlacklist = 0
|
||||
|
||||
def __init__(self, cfg, keyfile=None, debug_mode=False):
|
||||
settings = {
|
||||
"settings_time": {},
|
||||
"settings_post": {},
|
||||
"settings_post_default": {}
|
||||
}
|
||||
self.settings.update(settings)
|
||||
super(YandereBot, self).__init__(cfg, keyfile, debug_mode)
|
||||
|
||||
# [BEGIN THE PROGRAM]
|
||||
def prime_bot(self):
|
||||
if self.primed:
|
||||
return
|
||||
self.load_picture_list()
|
||||
self.validate_post_settings()
|
||||
super(YandereBot, self).prime_bot()
|
||||
|
||||
# Make sure there are no profiles in listPictures set to none. Print the bad post and exit if there is.
|
||||
def validate_post_settings(self):
|
||||
bad_post_count = 0
|
||||
for i in self.listPictures:
|
||||
if i.get_post_setting() is None:
|
||||
print("Bad post setting [{}]: {}".format(bad_post_count, i.get_full_string()))
|
||||
bad_post_count += 1
|
||||
if bad_post_count:
|
||||
raise BadPostSettings
|
||||
|
||||
# Set up lists
|
||||
def read_blacklist_files(self):
|
||||
list_blacklist = []
|
||||
for i in self.settings["settings_behavior"]["master_blacklist_r"]:
|
||||
# It doesn't matter if the picture file doesn't exist
|
||||
with contextlib.suppress(IOError):
|
||||
list_blacklist.extend(yanlib.get_hash_list(i))
|
||||
return list_blacklist
|
||||
|
||||
def blacklist(self, picked):
|
||||
self.lenBlacklist += 1
|
||||
for path in self.settings["settings_behavior"]["master_blacklist_w"]:
|
||||
with open(path, "a") as f:
|
||||
print(picked.get_full_string(), file=f)
|
||||
|
||||
# Returns a list of media paths (without the hashes)
|
||||
def get_media_list(self, picked):
|
||||
ext = self.settings["settings_behavior"]["multi_media_ext"]
|
||||
if not picked:
|
||||
return None
|
||||
elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
|
||||
return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())]
|
||||
else:
|
||||
return [picked.get_hash_path()]
|
||||
|
||||
# load_pictures will return a list of YanHashObj() with a blacklist(s) applied
|
||||
# @param list_blacklist A list of HashObjects() that are blacklist hashes
|
||||
def load_pictures(self, list_blacklist):
|
||||
if not self.settings["settings_behavior"]["master_list"]:
|
||||
raise MissingMasterList
|
||||
|
||||
try:
|
||||
list_pictures = reduce(lambda x, y: x + y,
|
||||
[get_list_of_hashes_with_profiles(
|
||||
f,
|
||||
self.settings["settings_post"],
|
||||
self.settings["settings_post_default"])
|
||||
for f in self.settings["settings_behavior"]["master_list"]
|
||||
])
|
||||
return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings["settings_behavior"]["max_size"])
|
||||
except IOError as e:
|
||||
print(e)
|
||||
raise MissingMasterList
|
||||
|
||||
def get_post_text(self, picked, media_list):
|
||||
super(YandereBot, self).get_post_text(picked.get_post_setting()["message"], media_list)
|
||||
|
||||
def load_picture_list(self):
|
||||
list_blacklist = self.read_blacklist_files()
|
||||
self.listPictures = self.load_pictures(list_blacklist)
|
||||
self.lenBlacklist = len(list_blacklist)
|
||||
|
||||
# Maybe I should remove this from the backend?
|
||||
def print_header_stats(self, picked):
|
||||
_picked = picked.get_full_string() if picked else None
|
||||
picked_profile = picked.get_post_setting()["name"] if picked else None
|
||||
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
||||
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
||||
|
||||
print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format(
|
||||
picked_profile, _picked, picked_next_profile, picked_next
|
||||
))
|
||||
|
||||
def pick(self):
|
||||
return self.listPictures.pop(0)
|
||||
|
||||
def after_pick(self, picked):
|
||||
self.blacklist(picked)
|
||||
self.print_header_stats(picked)
|
||||
|
||||
def post(self, picked):
|
||||
reinsert_image = False
|
||||
try:
|
||||
return super(YandereBot, self).post(picked)
|
||||
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
|
||||
except (FileNotFoundError, common.InvalidPost):
|
||||
print("File not found:", picked.get_hash_path())
|
||||
reinsert_image = False
|
||||
|
||||
# Check if the file limit has been reached
|
||||
except MastodonAPIError as e:
|
||||
print("API Error:", e)
|
||||
# Check if the file limit has been reached (413 error)
|
||||
with contextlib.suppress(IndexError):
|
||||
reinsert_image = e.args[1] != 413
|
||||
|
||||
# Server Errors and other general exceptions
|
||||
# Assume all exceptions are on the server side (besides FileNotFoundError of course)
|
||||
# If the connection is timing out it could be for two reasons:
|
||||
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
|
||||
# a. FIX: Reduce settings_behavior["max_size"]
|
||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
||||
# mastodon.py API will not specify why the connection timed out).
|
||||
# 3. Failed to generate screenshot
|
||||
# 4. Other general exceptions
|
||||
except Exception as e:
|
||||
print("Unhandled Exception:", e)
|
||||
# Exception flags
|
||||
reinsert_image = True
|
||||
|
||||
if reinsert_image and self.consecutive_failed_uploads < self.settings["settings_behavior"]["max_errors"]:
|
||||
self.listPictures.insert(0, picked)
|
||||
|
||||
self.handle_post_exception()
|
||||
|
||||
# The post failed
|
||||
return None
|
||||
|
||||
|
||||
def can_post(self):
|
||||
return bool(len(self.listPictures)) and super(YandereBot, self).can_post()
|
||||
|
||||
|
||||
class FailedToLoadCfg(Exception):
|
||||
@ -59,7 +247,7 @@ def main():
|
||||
# Flag if the bot is running in debug mode
|
||||
debug_mode = arguments.dry_run or arguments.debug or arguments.output
|
||||
|
||||
yandere = yandere_bot.YandereBot(
|
||||
yandere = YandereBot(
|
||||
yandere_config,
|
||||
arguments.keyfile,
|
||||
debug_mode
|
||||
@ -94,11 +282,11 @@ if __name__ == "__main__":
|
||||
except FailedToLoadCfg:
|
||||
sys.exit(6)
|
||||
# Exceptions raised from the bot
|
||||
except yandere_bot.Debug:
|
||||
except common.Debug:
|
||||
sys.exit(5)
|
||||
except yandere_bot.BadCfgFile:
|
||||
except common.BadCfgFile:
|
||||
sys.exit(4)
|
||||
except yandere_bot.BadPostSettings:
|
||||
except BadPostSettings:
|
||||
sys.exit(3)
|
||||
except yandere_bot.FailedLogin:
|
||||
except common.FailedLogin:
|
||||
sys.exit(2)
|
||||
|
Reference in New Issue
Block a user