Compare commits

..

No commits in common. "5326c48e3c80c58cb9262a32c8a63ca17b1e6988" and "93bf15a5de2221b95ef83ac1bd4459318e202b92" have entirely different histories.

4 changed files with 421 additions and 164 deletions

4
run.sh
View File

@ -30,10 +30,6 @@ cd "$RUN_DIR"
source "$VENV" source "$VENV"
"$ENTRY" "$@" "$ENTRY" "$@"
RETURN_CODE="$?"
# Cleanup # Cleanup
deactivate deactivate
cd - > /dev/null cd - > /dev/null
exit "$RETURN_CODE"

View File

@ -28,13 +28,15 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from collections import OrderedDict from collections import OrderedDict
class EncryptionFail(Exception):
pass
class PasswordMismatch(Exception): class PasswordMismatch(Exception):
pass pass
# Salts
def generate_salt():
return os.urandom(16)
# Return string from bytes # Return string from bytes
def salt_encode(b): def salt_encode(b):
return base64.urlsafe_b64encode(b).decode() return base64.urlsafe_b64encode(b).decode()
@ -70,7 +72,8 @@ def derive_key(password, salt):
iterations=100000, iterations=100000,
backend=default_backend() backend=default_backend()
) )
return base64.urlsafe_b64encode(kdf.derive(password)) r_key = base64.urlsafe_b64encode(kdf.derive(password))
return r_key
# Encryption functions # Encryption functions
@ -96,62 +99,94 @@ def decrypt(token, key):
# encryption_function: encrypt(message, key) : decrypt(token, key): # encryption_function: encrypt(message, key) : decrypt(token, key):
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use # Returns settings_server_decrypted dictionary with Byte() values. Will need to use
# ChangeEncodingDict to make them strings (recommended cfg file friendly) # ChangeEncodingDict to make them strings (recommended cfg file friendly)
def encrypt_settings(settings_server, password, salt, encryption_function): def __settings_server(password, salt, settings_server, encryption_function):
key = derive_key(password, salt) key = derive_key(password, salt)
settings_server_decrypted = OrderedDict() settings_server_decrypted = OrderedDict()
for setting in settings_server: for setting in settings_server:
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key) settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
return settings_server_decrypted return settings_server_decrypted
def get_keyfile(keyfile=None):
if keyfile is not None: # Returns (salt, settings_server)
with open(keyfile, "rb") as f: def _settings_server_encrypt(settings_server):
return f.read() salt = generate_salt()
return None password = getpass.getpass("Enter password: ")
password2 = getpass.getpass("Retype Password: ")
if password != password2:
raise PasswordMismatch
settings_server_encrypted = __settings_server(password.encode(), salt, encode_dict(settings_server), encrypt)
return salt, settings_server_encrypted
def get_pass(q): # Returns (settings_server)
try: def _settings_server_decrypt(settings_server, settings_encrypt):
return getpass.getpass(q).encode() settings_server_encoded = encode_dict(settings_server)
except KeyboardInterrupt: if settings_encrypt["encrypt"]:
raise EncryptionFail("\nQuitting...")
def settings_server_encrypt(settings_server, keyfile=None):
try:
settings_server = encode_dict(settings_server)
salt = os.urandom(16)
password = get_keyfile(keyfile)
if password is None:
password = get_pass("Enter password: ")
password2 = get_pass("Enter password: ")
if password != password2:
raise PasswordMismatch("Passwords do not match")
encrypted = encrypt_settings(settings_server, password, salt, encrypt)
return salt_encode(salt), decode_dict(encrypted)
except PasswordMismatch as e:
raise EncryptionFail(str(e))
except Exception as e:
err = str(e)
raise EncryptionFail("Encrypt Error: {}".format(err))
def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None):
try:
if not settings_encrypt["encrypt"]:
return settings_server
settings_server = encode_dict(settings_server)
password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ")
salt = salt_decode(settings_encrypt["salt"]) salt = salt_decode(settings_encrypt["salt"])
decrypted = encrypt_settings(settings_server, password, salt, decrypt) password = getpass.getpass("Enter password: ")
return decode_dict(decrypted) return __settings_server(password.encode(), salt, settings_server_encoded, decrypt)
except base64.binascii.Error: else:
raise EncryptionFail("Salt is invalid") return settings_server_encoded
# Wrapper function that will catch exceptions and exit
def settings_server_new(function, **kwargs):
try:
return function(**kwargs)
# If the user cancels the login
except KeyboardInterrupt:
print("\nQuitting...")
# If the user passwords do not match (encrypt)
except PasswordMismatch:
print("Passwords do not match...")
# Incorrect password entered (decrypt)
except InvalidToken: except InvalidToken:
raise EncryptionFail("Password or token is incorrect") print("Password or Token Incorrect...")
# Probably the salt value got modified
except base64.binascii.Error:
print("Salt is invalid...")
# Some other kind of fuck up
except Exception as e: except Exception as e:
err = str(e) print("Unknown exception occurred...")
raise EncryptionFail("Decrypt Error: {}".format(err)) print(e)
# Exit if an exception was thrown
sys.exit(1)
# Glue functions that package **kwargs automatically
def settings_server_encrypt(settings_server):
kwargs = {"settings_server": settings_server}
return settings_server_new(_settings_server_encrypt, **kwargs)
def settings_server_decrypt(settings_server, settings_encrypt):
kwargs = {
"settings_server": settings_server,
"settings_encrypt": settings_encrypt
}
return settings_server_new(_settings_server_decrypt, **kwargs)
# The _cfg functions should return a regular string
# These are the functions that should interface with the bot a return a plain string
# settings_server ordered dictionary
def settings_server_encrypt_cfg(settings_server):
salt, settings_server = settings_server_encrypt(settings_server)
return salt_encode(salt), decode_dict(settings_server)
def settings_server_decrypt_cfg(settings_server, settings_encrypt):
settings_server = settings_server_decrypt(settings_server, settings_encrypt)
return decode_dict(settings_server)
def main(): def main():
@ -167,7 +202,6 @@ def main():
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true") parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true") parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true") parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None)
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg) parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
arguments = parser.parse_args() arguments = parser.parse_args()
@ -180,28 +214,25 @@ def main():
import importlib import importlib
cfg = importlib.import_module(arguments.cfg) cfg = importlib.import_module(arguments.cfg)
settings_server = cfg.settings_server settings_server = cfg.settings_server
settings_encrypt = cfg.settings_encrypt settings_encrypt = None
keyfile = arguments.keyfile or settings_encrypt["keyfile"]
if arguments.decrypt and arguments.encrypt: if arguments.decrypt and arguments.encrypt:
print("Re-encrypting") print("Re-encrypting")
if arguments.decrypt: # arguments.decrypt if arguments.decrypt: # arguments.decrypt
print("Decrypt...") print("Decrypt...")
settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile) settings_server = settings_server_decrypt_cfg(cfg.settings_server, cfg.settings_encrypt)
settings_encrypt = OrderedDict([ settings_encrypt = OrderedDict([
("encrypt", False), ("encrypt", False),
("salt", settings_encrypt["salt"]), ("salt", cfg.settings_encrypt["encrypt"])
("keyfile", arguments.keyfile)
]) ])
if arguments.encrypt: if arguments.encrypt:
print("Encrypt...") print("Encrypt...")
salt, settings_server = settings_server_encrypt(settings_server, keyfile) salt, settings_server = settings_server_encrypt_cfg(settings_server)
settings_encrypt = OrderedDict([ settings_encrypt = OrderedDict([
("encrypt", True), ("encrypt", True),
("salt", salt), ("salt", salt)
("keyfile", arguments.keyfile)
]) ])
print("settings_server = {}".format(pformat(settings_server))) print("settings_server = {}".format(pformat(settings_server)))
@ -210,8 +241,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
try: sys.exit(main())
sys.exit(main())
except EncryptionFail as e:
print(e)
sys.exit(1)

View File

@ -24,6 +24,142 @@ import datetime
import contextlib import contextlib
# A class that inherits from YandereBot from the yandere_bot module
# This class is used to handle command line arguments gracefully, and extend functionality quickly
# Bot specific changes should be made here (if they are minor enough).
# This will be instantiated from the main() function
class YandereBot(yandere_bot.YandereBot):
# The below settings are required from the configuration module
settings_time = None
settings_reminder = None
# Wait before running
delay_start = 0
def __init__(self, cfg, debug_mode, prime_bot, delay_d=None, delay_h=None, delay_s=None):
super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False)
self.load_settings(self.cfg, ("settings_time", "settings_reminder"))
self.set_pretimer(delay_d, delay_h, delay_s)
if self.debug_mode:
print("[DEBUG MODE ON - DRY RUN BEGIN]")
# Do not perform sanity test if stdout is None.
# input() will fail if stdout is None, which could happen with the following command
# ./run --dry-run -h -d 'a date in the past'
if sys.stdout is not None and not self.pass_sanity_test():
raise FailedSanityTest
if prime_bot:
self.prime_bot()
def print_date_time_example(self):
print_fmt = " {0:6} {1:10} {2}"
time_fmt = self.settings_time["time_format"]
date_fmt = self.settings_time["date_format"]
current_time = self.dateSelection
print(print_fmt.format(
"TIME", time_fmt, current_time.strftime(time_fmt)
))
print(print_fmt.format(
"DATE", date_fmt, current_time.strftime(date_fmt)
))
def set_delay_d(self, d):
try:
t = datetime.datetime.strptime(d, self.settings_time["date_format"])
self.dateNextSelection = self.dateNextSelection.replace(
year=t.year, month=t.month, day=t.day
)
except Exception:
print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d))
self.print_date_time_example()
raise DateWrongFormat
def set_delay_h(self, h, add_24):
try:
t = datetime.datetime.strptime(h, self.settings_time["time_format"])
self.dateNextSelection = self.dateNextSelection.replace(
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
)
if self.dateNextSelection < self.dateSelection and add_24:
self.dateNextSelection = yandere_bot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
except Exception:
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
self.print_date_time_example()
raise TimeWrongFormat
def set_pretimer(self, d=None, h=None, s=0):
if d:
self.set_delay_d(d)
if h:
self.set_delay_h(h, d is None)
if s:
self.delay_start = max(0, s)
# Check for potential misconfigurations by the user
def pass_sanity_test(self):
# Calculate pre-timer value
seconds_until_next_pos = yandere_bot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
# Possible misconfigurations that will prompt the user to continue
pretimer_less_than_zero = seconds_until_next_pos < 0
pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"]
# Prompt the user
prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep
# Remind the user to generate new OAuth tokens
dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"])
if dt < datetime.datetime.now():
print("REMINDER: Generate new tokens!!")
# Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming
if pretimer_less_than_zero:
sleep = round(abs(seconds_until_next_pos), 2)
images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1
print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format(
sleep, images
))
# Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py
elif pretimer_greater_than_sleep:
print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format(
round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"]
))
# Prompt the user if something doesn't seem right
# This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work.
try:
if prompt_user:
# Default to 'y' if the user just presses enter
ans = input("Do you want to continue [Y/n]? ") or "y"
return ans.lower() in ("y", "yes")
except (EOFError, KeyboardInterrupt):
print()
return False
# Sanity test passed
return True
def start(self, delay=None):
_delay = delay or self.delay_start
return super(YandereBot, self).start(max(0, _delay))
# Custom exceptions
class TimeWrongFormat(Exception):
pass
class DateWrongFormat(Exception):
pass
class FailedSanityTest(Exception):
pass
class FailedToLoadCfg(Exception): class FailedToLoadCfg(Exception):
pass pass
@ -37,14 +173,20 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="A bot for posting on Mastodon", description="A bot for posting on Mastodon",
# epilog="All switches can be combined for greater control", # epilog="All switches can be combined for greater control",
add_help=True) add_help=False)
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true") parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
parser.add_argument("--debug", help="Same as --dry-run", action="store_true") parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0)
parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None)
parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None)
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg) parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None) parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true")
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER) parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
arguments = parser.parse_args() arguments = parser.parse_args()
# Redirect stdout when the bot first initializes if the bot is not going to run normally
redirect_stdout = None if arguments.help else sys.stdout
# Yandere Lewd Bot # Yandere Lewd Bot
yandere = None yandere = None
yandere_config = None yandere_config = None
@ -54,16 +196,31 @@ def main():
import importlib import importlib
yandere_config = importlib.import_module(arguments.config) yandere_config = importlib.import_module(arguments.config)
except ImportError: except ImportError:
raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config)) print("Failed to Load Configuration:", arguments.config)
raise FailedToLoadCfg
# Flag if the bot is running in debug mode # Flag if the bot is running in debug mode
debug_mode = arguments.dry_run or arguments.debug debug_mode = (arguments.dry_run or arguments.debug or arguments.help)
yandere = yandere_bot.YandereBot( with contextlib.redirect_stdout(redirect_stdout):
yandere_config, prime_bot = not arguments.help
arguments.keyfile,
debug_mode yandere = YandereBot(
) yandere_config,
debug_mode,
prime_bot,
arguments.date,
arguments.time,
arguments.wait
)
# Print Usage Information with Time and Date Formats with Examples
if arguments.help:
parser.print_help()
print()
yandere.print_date_time_example()
print()
return 0
# Setup exit calls # Setup exit calls
# Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings ) # Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings )
@ -85,6 +242,13 @@ if __name__ == "__main__":
# Exceptions raised from the main function # Exceptions raised from the main function
except FailedToLoadCfg: except FailedToLoadCfg:
sys.exit(10) sys.exit(10)
except FailedSanityTest:
sys.exit(9)
except DateWrongFormat:
sys.exit(8)
except TimeWrongFormat:
sys.exit(7)
# Exceptions raised from the bot # Exceptions raised from the bot
except yandere_bot.Debug: except yandere_bot.Debug:
sys.exit(6) sys.exit(6)

View File

@ -28,7 +28,7 @@ from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, M
# A class that contains all of the rendering information for a single post # A class that contains all of the rendering information for a single post
class PickAndRenderFrame: class YandereFrame:
# From Config File # From Config File
profile_name = "" profile_name = ""
path = "" path = ""
@ -36,17 +36,16 @@ class PickAndRenderFrame:
skip = tuple() skip = tuple()
nsfw = tuple() nsfw = tuple()
output_name = "" output_name = ""
output_name_tr = ""
translation = {}
message = "" message = ""
message_nsfw = "" message_nsfw = ""
render_script = "" render_script = ""
# After pick # Picked
picked_frame = None picked_frame = None
output_name_tr = ""
def __init__(self, picked, datetime_format): def __init__(self, picked):
dt_now = datetime.datetime.now()
dt_now_str = datetime.datetime.strftime(dt_now, datetime_format)
self.profile_name = picked["profile_name"] self.profile_name = picked["profile_name"]
self.path = picked["path"] self.path = picked["path"]
self.frames = picked["frames"] self.frames = picked["frames"]
@ -57,33 +56,24 @@ class PickAndRenderFrame:
self.message = picked["message"] self.message = picked["message"]
self.message_nsfw = picked["message_nsfw"] self.message_nsfw = picked["message_nsfw"]
self.render_script = picked["render_script"] self.render_script = picked["render_script"]
self.add_translation("profile_name", self.profile_name)
self.picked_frame = self._pick_frame() def add_translation(self, k, v):
self.translation.update({k: v})
# Shell-like substitutions def pick(self):
translations = { random.seed(os.urandom(16))
"profile_name": self.profile_name,
"frame": str(self.picked_frame),
"datetime": dt_now_str
}
self.output_name_tr = self._translate_basename(translations)
self._render_frame()
def __del__(self):
render_file = self.output_name_tr
if render_file and os.path.isfile(render_file):
os.remove(render_file)
def _pick_frame(self):
picked_frame = None picked_frame = None
while picked_frame is None: while picked_frame is None:
picked_frame = random.random() * self.frames picked_frame = random.random() * self.frames
for skip in self.skip: for skip in self.skip:
begin, end = skip begin, end = skip
if begin <= picked_frame <= end: if begin <= picked_frame <= end:
print("Cannot pick frame:", picked_frame, "from:", self.profile_name)
picked_frame = None picked_frame = None
break break
self.picked_frame = picked_frame
self.add_translation("frame", str(picked_frame))
return picked_frame return picked_frame
def is_nsfw(self): def is_nsfw(self):
@ -93,25 +83,29 @@ class PickAndRenderFrame:
return True return True
return False return False
def _translate_basename(self, translations): def translate_basename(self):
output_name_tr = self.output_name output_name = self.output_name
for k,v in translations.items(): output_name_tr = output_name
for k,v in self.translation.items():
replace_token = "${" + k + "}" replace_token = "${" + k + "}"
output_name_tr = output_name_tr.replace(replace_token, v) output_name_tr = output_name_tr.replace(replace_token, v)
self.output_path_tr = output_name_tr
return output_name_tr return output_name_tr
# TODO: Add translation keywords to the message # TODO: Add translation keywords to the message
def get_message(self): def get_message(self):
return self.message_nsfw if self.is_nsfw() else self.message return self.message_nsfw if self.is_nsfw() else self.message
def _render_frame(self): def render(self):
self.translate_basename()
args = [ args = [
self.render_script, self.render_script,
self.path, self.path,
self.output_name_tr, self.output_path_tr,
str(self.picked_frame) self.translation["frame"]
] ]
subprocess.run(args) subprocess.run(args)
return self.output_path_tr
class YandereBot: class YandereBot:
@ -129,22 +123,27 @@ class YandereBot:
# Class variables # Class variables
mastodon_api = None mastodon_api = None
listPictures = []
failed_uploads = 0 failed_uploads = 0
consecutive_failed_uploads = 0
currentSessionCount = 0 currentSessionCount = 0
debug_mode = False debug_mode = False
primed = False primed = False
decrypted = False decrypted = False
# Time variables
dateSelection = None
dateNextSelection = None
# YandereBot.__init__() # YandereBot.__init__()
# @param cfg A dynamically loaded python module # @param cfg A dynamically loaded python module
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
# prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) # prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True): def __init__(self, cfg, debug_mode=False, prime_bot=True):
self.dateSelection = datetime.datetime.now()
self.dateNextSelection = self.dateSelection
self.cfg = cfg self.cfg = cfg
self.load_settings(self.cfg) self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"] self.debug_mode = debug_mode or self.settings_behavior["debug"]
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
if prime_bot: if prime_bot:
self.prime_bot() self.prime_bot()
@ -173,11 +172,7 @@ class YandereBot:
def decrypt_settings(self): def decrypt_settings(self):
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
import encryption import encryption
try: self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt)
self.settings_server = encryption.settings_server_decrypt(
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
except encryption.EncryptionFail as e:
raise BadCfgFile(str(e))
self.decrypted = True self.decrypted = True
# Login to Pleroma # Login to Pleroma
@ -200,30 +195,33 @@ class YandereBot:
raise FailedLogin raise FailedLogin
# Maybe I should remove this from the backend? # Maybe I should remove this from the backend?
def print_header_stats(self, picked): def print_header_stats(self, yandere_frame, date_selection, date_next_selection):
profile, frame, nsfw, path = None, None, None, None profile, frame, nsfw, path = None, None, None, None
if picked: if yandere_frame:
profile = picked.profile_name profile = yandere_frame.profile_name
frame = picked.picked_frame frame = yandere_frame.picked_frame
nsfw = picked.is_nsfw() nsfw = yandere_frame.is_nsfw()
path = picked.output_name_tr path = yandere_frame.output_path_tr
print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format( print("\n==== PLEROMA ====")
profile, frame, nsfw, path print("Profile: {} | Frame: {} | NSFW: {}".format(
profile, frame, nsfw
)) ))
print("Path:", path)
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
print("Selection time: {}".format(
date_selection.strftime(self.settings_time["long_date_format"])) )
print("Next selection time: {} ({} seconds)".format(
date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) )
print("[ {} Selected during session | {} Failed ]\n".format(
self.currentSessionCount, self.failed_uploads) )
# Returns a list of tuples that contain the media list path and media mastodon dictionary # Returns a list of tuples that contain the media list path and media mastodon dictionary
def upload_media_list(self, path_list): def upload_media_list(self, path_list):
media_list = [] media_list = []
for path in path_list: for ele in path_list:
if not os.path.isfile(path):
raise FileNotFoundError("Could not upload: {}".format(path))
if not self.debug_mode: if not self.debug_mode:
media = self.mastodon_api.media_post(path, description=os.path.basename(path)) media = self.mastodon_api.media_post(ele, description=os.path.basename(ele))
media_dict = { media_list.append((ele, media))
"path": path,
"media": media
}
media_list.append(media_dict)
return media_list return media_list
def get_post_text(self, yandere_frame, media_list): def get_post_text(self, yandere_frame, media_list):
@ -234,9 +232,8 @@ class YandereBot:
string_imglinks = [] string_imglinks = []
if media_list and self.settings_behavior["post_image_link"]: if media_list and self.settings_behavior["post_image_link"]:
for media_dict in media_list: for ele in media_list:
path = media_dict["path"] path, media = ele
media = media_dict["media"]
if path is None or media is None: if path is None or media is None:
continue continue
elif content_type == "text/markdown" and not self.debug_mode: elif content_type == "text/markdown" and not self.debug_mode:
@ -249,25 +246,31 @@ class YandereBot:
# Join non empty strings with a newline character # Join non empty strings with a newline character
string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return string_post return content_type, string_post
def _post(self, yandere_frame): def _post(self, yandere_frame):
if not yandere_frame: media_list = self.upload_media_list([yandere_frame.output_path_tr])
raise InvalidPost("Frame is none") content_type, message = self.get_post_text(yandere_frame, media_list)
upload_list = [yandere_frame.output_name_tr]
media_list = self.upload_media_list(upload_list)
message = self.get_post_text(yandere_frame, media_list)
if self.debug_mode: if self.debug_mode:
return yandere_frame return yandere_frame
self.mastodon_api.status_post( self.mastodon_api.status_post(
message, message,
media_ids=[i["media"] for i in media_list], media_ids=[i[1] for i in media_list if len(i) == 2],
visibility=self.settings_behavior["visibility"], visibility=self.settings_behavior["visibility"],
sensitive=yandere_frame.is_nsfw(), sensitive=yandere_frame.is_nsfw(),
content_type=self.settings_behavior["content_type"] content_type=content_type
) )
return yandere_frame return yandere_frame
def render_frame(self, picked_profile):
yandere_frame = YandereFrame(picked_profile)
yandere_frame.add_translation("datetime", datetime.datetime.strftime(datetime.datetime.now(), self.cfg.settings_time["datetime"]))
yandere_frame.pick()
yandere_frame.render()
if not os.path.isfile(yandere_frame.output_path_tr):
raise FileNotFoundError("Could not generate screenshot:", yandere_frame.output_path_tr)
return yandere_frame
# The main post function # The main post function
# This function is responsible for incrementing self.currentSessionCount, as well as posting and blacklisting the # This function is responsible for incrementing self.currentSessionCount, as well as posting and blacklisting the
# picked item. # picked item.
@ -279,39 +282,73 @@ class YandereBot:
def post(self): def post(self):
picked = None picked = None
# Flags that are set if an upload fails
timeout = False
# Attempt post # Attempt post
try: try:
# Post # Post
dt_picked = self.settings_time["datetime"]
picked_profile = random.choice(self.cfg.settings_post) picked_profile = random.choice(self.cfg.settings_post)
picked = PickAndRenderFrame(picked_profile, dt_picked) picked = self.render_frame(picked_profile)
self._post(picked) self._post(picked)
if os.path.isfile(picked.output_path_tr):
os.remove(picked.output_path_tr)
else:
raise FileNotFoundError("Unable to render {}".format(picked.output_path_tr))
# After a successful post # After a successful post
self.currentSessionCount += 1 self.currentSessionCount += 1
self.consecutive_failed_uploads = 0
# The post was successful # The post was successful
return picked return picked
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
except FileNotFoundError as e:
print(e)
# Exception flags
timeout = False
# Check if the file limit has been reached
except MastodonAPIError as e:
# Check if the file limit has been reached (413 error)
file_limit_reached = False
with contextlib.suppress(IndexError):
file_limit_reached = (e.args[1] == 413)
print("API Error:", e)
# Exception flags
timeout = True
# Server Errors # Server Errors
# Assume all exceptions are on the server side (except for FileNotFoundError of course # Assume all exceptions are on the server side
# If the connection is timing out it could be for two reasons: # If the connection is timing out it could be for two reasons:
# 1. The error was caused by the user attempting to upload a large file over a slow connection: # 1. The error was caused by the user attempting to upload a large file over a slow connection:
# a. FIX: Reduce settings_behavior["max_size"] # a. FIX: Reduce settings_behavior["max_size"]
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the # 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out). # mastodon.py API will not specify why the connection timed out).
# The default assumption is #2 # The default assumption is #2
except (FileNotFoundError, MastodonAPIError, InvalidPost, Exception) as e: except Exception as e:
print("Exception:", e) print("Unhandled Exception:", e)
# Exception flags
timeout = True
# An exception occurred # An exception occurred
self.failed_uploads += 1 self.failed_uploads += 1
self.consecutive_failed_uploads += 1 if timeout:
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
# The post failed # The post failed
return None return None
def schedule_next_post(self):
self.dateSelection = self.dateNextSelection
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
# Will wait between the current time and the time of next selection
def wait_future_time(self):
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
self.eventSleep.wait(max(0, seconds))
# [BEGIN THE PROGRAM] # [BEGIN THE PROGRAM]
def prime_bot(self): def prime_bot(self):
random.seed(os.urandom(16)) random.seed(os.urandom(16))
@ -325,6 +362,30 @@ class YandereBot:
if not self.primed: if not self.primed:
self.prime_bot() self.prime_bot()
# Early out if the bot is incapable of posting
if not self.can_post():
print("Bot is incapable of posting!!")
return 1
start_time = self.dateSelection
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
delay_time = time_add_seconds(start_time, delay_seconds)
# Print the first image in the list if a delay or pretimer is set
if delay_seconds:
self.print_header_stats(None, start_time, delay_time)
# The delay parameter is different from the dateSelection and dateSelectionNext
# It will literally time out the bot for a given number of seconds regardless of the pre-timer setting
# This is useful if you want to set a delay of 30 seconds, before back-posting several images
self.eventSleep.wait(max(0, delay))
# Check if the pre-timer is set
# dateNextSelection should be greater than dateSelection if it is
# dateSelection and dateNextSelection are both set to the current time when the bot is initialized
if delay_seconds:
self.wait_future_time()
# Begin posting # Begin posting
self.main_loop() self.main_loop()
@ -335,19 +396,32 @@ class YandereBot:
# 1. User presses Ctrl+C # 1. User presses Ctrl+C
# 2. settings_behavior["uploads_per_post"] is less than one for some reason # 2. settings_behavior["uploads_per_post"] is less than one for some reason
def can_post(self): def can_post(self):
return ( return not self.eventSleep.is_set() and self.settings_behavior["uploads_per_post"] > 0
not self.eventSleep.is_set() and
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
self.consecutive_failed_uploads < self.settings_behavior["max_errors"]
)
def main_loop(self): def main_loop(self):
sleep_seconds = self.settings_behavior["retry_seconds"] target_posts = self.settings_behavior["uploads_per_post"]
while self.can_post(): while self.can_post():
picked = self.post() successful_posts = 0
self.print_header_stats(picked) while (successful_posts < target_posts) and self.can_post():
if self.can_post(): last_picked = self.post()
self.eventSleep.wait(sleep_seconds) successful_posts += int(last_picked is not None)
if successful_posts >= target_posts:
self.schedule_next_post()
self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection)
else:
self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection)
if self.can_post():
self.wait_future_time()
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
# Custom Exceptions for YandereBot # Custom Exceptions for YandereBot
class Debug(Exception): class Debug(Exception):
@ -360,7 +434,3 @@ class FailedLogin(Exception):
class BadCfgFile(Exception): class BadCfgFile(Exception):
pass pass
class InvalidPost(Exception):
pass