Compare commits
No commits in common. "48c40f25e018dfdb658ee9579281dbf397646e9a" and "4a8c851a7b7dd94179721f0fd77faad8b46fc8b0" have entirely different histories.
48c40f25e0
...
4a8c851a7b
5
.gitignore
vendored
5
.gitignore
vendored
@ -18,12 +18,9 @@
|
||||
/notes.txt
|
||||
/compile.sh
|
||||
/upload.sh
|
||||
/balance_*.csv
|
||||
/balance.xlsx
|
||||
/balance_2.csv
|
||||
/DEFAULT.csv
|
||||
/generate_random.sh
|
||||
/print_1_txt.sh
|
||||
/get_weight.sh
|
||||
/src/make_master_template.py
|
||||
/src/make_master_list.py
|
||||
/src/hash_path_exists.py
|
||||
|
4
run.sh
4
run.sh
@ -30,10 +30,6 @@ cd "$RUN_DIR"
|
||||
source "$VENV"
|
||||
"$ENTRY" "$@"
|
||||
|
||||
RETURN_CODE="$?"
|
||||
|
||||
# Cleanup
|
||||
deactivate
|
||||
cd - > /dev/null
|
||||
|
||||
exit "$RETURN_CODE"
|
||||
|
@ -28,13 +28,15 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
class EncryptionFail(Exception):
|
||||
pass
|
||||
|
||||
class PasswordMismatch(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# Salts
|
||||
def generate_salt():
|
||||
return os.urandom(16)
|
||||
|
||||
|
||||
# Return string from bytes
|
||||
def salt_encode(b):
|
||||
return base64.urlsafe_b64encode(b).decode()
|
||||
@ -70,7 +72,8 @@ def derive_key(password, salt):
|
||||
iterations=100000,
|
||||
backend=default_backend()
|
||||
)
|
||||
return base64.urlsafe_b64encode(kdf.derive(password))
|
||||
r_key = base64.urlsafe_b64encode(kdf.derive(password))
|
||||
return r_key
|
||||
|
||||
|
||||
# Encryption functions
|
||||
@ -96,62 +99,94 @@ def decrypt(token, key):
|
||||
# encryption_function: encrypt(message, key) : decrypt(token, key):
|
||||
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
|
||||
# ChangeEncodingDict to make them strings (recommended cfg file friendly)
|
||||
def encrypt_settings(settings_server, password, salt, encryption_function):
|
||||
def __settings_server(password, salt, settings_server, encryption_function):
|
||||
key = derive_key(password, salt)
|
||||
settings_server_decrypted = OrderedDict()
|
||||
for setting in settings_server:
|
||||
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
|
||||
return settings_server_decrypted
|
||||
|
||||
def get_keyfile(keyfile=None):
|
||||
if keyfile is not None:
|
||||
with open(keyfile, "rb") as f:
|
||||
return f.read()
|
||||
return None
|
||||
|
||||
# Returns (salt, settings_server)
|
||||
def _settings_server_encrypt(settings_server):
|
||||
salt = generate_salt()
|
||||
password = getpass.getpass("Enter password: ")
|
||||
password2 = getpass.getpass("Retype Password: ")
|
||||
|
||||
def get_pass(q):
|
||||
try:
|
||||
return getpass.getpass(q).encode()
|
||||
except KeyboardInterrupt:
|
||||
raise EncryptionFail("\nQuitting...")
|
||||
|
||||
|
||||
def settings_server_encrypt(settings_server, keyfile=None):
|
||||
try:
|
||||
settings_server = encode_dict(settings_server)
|
||||
salt = os.urandom(16)
|
||||
password = get_keyfile(keyfile)
|
||||
if password is None:
|
||||
password = get_pass("Enter password: ")
|
||||
password2 = get_pass("Enter password: ")
|
||||
if password != password2:
|
||||
raise PasswordMismatch("Passwords do not match")
|
||||
encrypted = encrypt_settings(settings_server, password, salt, encrypt)
|
||||
return salt_encode(salt), decode_dict(encrypted)
|
||||
except PasswordMismatch as e:
|
||||
raise EncryptionFail(str(e))
|
||||
except Exception as e:
|
||||
err = str(e)
|
||||
raise EncryptionFail("Encrypt Error: {}".format(err))
|
||||
raise PasswordMismatch
|
||||
|
||||
settings_server_encrypted = __settings_server(password.encode(), salt, encode_dict(settings_server), encrypt)
|
||||
|
||||
return salt, settings_server_encrypted
|
||||
|
||||
|
||||
def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None):
|
||||
try:
|
||||
if not settings_encrypt["encrypt"]:
|
||||
return settings_server
|
||||
settings_server = encode_dict(settings_server)
|
||||
password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ")
|
||||
# Returns (settings_server)
|
||||
def _settings_server_decrypt(settings_server, settings_encrypt):
|
||||
settings_server_encoded = encode_dict(settings_server)
|
||||
if settings_encrypt["encrypt"]:
|
||||
salt = salt_decode(settings_encrypt["salt"])
|
||||
decrypted = encrypt_settings(settings_server, password, salt, decrypt)
|
||||
return decode_dict(decrypted)
|
||||
except base64.binascii.Error:
|
||||
raise EncryptionFail("Salt is invalid")
|
||||
password = getpass.getpass("Enter password: ")
|
||||
return __settings_server(password.encode(), salt, settings_server_encoded, decrypt)
|
||||
else:
|
||||
return settings_server_encoded
|
||||
|
||||
|
||||
# Wrapper function that will catch exceptions and exit
|
||||
def settings_server_new(function, **kwargs):
|
||||
try:
|
||||
return function(**kwargs)
|
||||
|
||||
# If the user cancels the login
|
||||
except KeyboardInterrupt:
|
||||
print("\nQuitting...")
|
||||
|
||||
# If the user passwords do not match (encrypt)
|
||||
except PasswordMismatch:
|
||||
print("Passwords do not match...")
|
||||
|
||||
# Incorrect password entered (decrypt)
|
||||
except InvalidToken:
|
||||
raise EncryptionFail("Password or token is incorrect")
|
||||
print("Password or Token Incorrect...")
|
||||
|
||||
# Probably the salt value got modified
|
||||
except base64.binascii.Error:
|
||||
print("Salt is invalid...")
|
||||
|
||||
# Some other kind of fuck up
|
||||
except Exception as e:
|
||||
err = str(e)
|
||||
raise EncryptionFail("Decrypt Error: {}".format(err))
|
||||
print("Unknown exception occurred...")
|
||||
print(e)
|
||||
|
||||
# Exit if an exception was thrown
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# Glue functions that package **kwargs automatically
|
||||
def settings_server_encrypt(settings_server):
|
||||
kwargs = {"settings_server": settings_server}
|
||||
return settings_server_new(_settings_server_encrypt, **kwargs)
|
||||
|
||||
|
||||
def settings_server_decrypt(settings_server, settings_encrypt):
|
||||
kwargs = {
|
||||
"settings_server": settings_server,
|
||||
"settings_encrypt": settings_encrypt
|
||||
}
|
||||
return settings_server_new(_settings_server_decrypt, **kwargs)
|
||||
|
||||
|
||||
# The _cfg functions should return a regular string
|
||||
# These are the functions that should interface with the bot a return a plain string
|
||||
# settings_server ordered dictionary
|
||||
def settings_server_encrypt_cfg(settings_server):
|
||||
salt, settings_server = settings_server_encrypt(settings_server)
|
||||
return salt_encode(salt), decode_dict(settings_server)
|
||||
|
||||
|
||||
def settings_server_decrypt_cfg(settings_server, settings_encrypt):
|
||||
settings_server = settings_server_decrypt(settings_server, settings_encrypt)
|
||||
return decode_dict(settings_server)
|
||||
|
||||
|
||||
def main():
|
||||
@ -167,7 +202,6 @@ def main():
|
||||
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
|
||||
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
|
||||
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
|
||||
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None)
|
||||
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
|
||||
|
||||
arguments = parser.parse_args()
|
||||
@ -180,28 +214,25 @@ def main():
|
||||
import importlib
|
||||
cfg = importlib.import_module(arguments.cfg)
|
||||
settings_server = cfg.settings_server
|
||||
settings_encrypt = cfg.settings_encrypt
|
||||
keyfile = arguments.keyfile or settings_encrypt["keyfile"]
|
||||
settings_encrypt = None
|
||||
|
||||
if arguments.decrypt and arguments.encrypt:
|
||||
print("Re-encrypting")
|
||||
|
||||
if arguments.decrypt: # arguments.decrypt
|
||||
print("Decrypt...")
|
||||
settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile)
|
||||
settings_server = settings_server_decrypt_cfg(cfg.settings_server, cfg.settings_encrypt)
|
||||
settings_encrypt = OrderedDict([
|
||||
("encrypt", False),
|
||||
("salt", settings_encrypt["salt"]),
|
||||
("keyfile", arguments.keyfile)
|
||||
("salt", cfg.settings_encrypt["encrypt"])
|
||||
])
|
||||
|
||||
if arguments.encrypt:
|
||||
print("Encrypt...")
|
||||
salt, settings_server = settings_server_encrypt(settings_server, keyfile)
|
||||
salt, settings_server = settings_server_encrypt_cfg(settings_server)
|
||||
settings_encrypt = OrderedDict([
|
||||
("encrypt", True),
|
||||
("salt", salt),
|
||||
("keyfile", arguments.keyfile)
|
||||
("salt", salt)
|
||||
])
|
||||
|
||||
print("settings_server = {}".format(pformat(settings_server)))
|
||||
@ -210,8 +241,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
sys.exit(main())
|
||||
except EncryptionFail as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
|
205
src/main.py
205
src/main.py
@ -1,7 +1,7 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
# Mirai Nikki Bot a video frame posting bot for Pleroma
|
||||
# Copyright (C) 2022 Anon
|
||||
# Yandere Lewd Bot, an image posting bot for Pleroma
|
||||
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
@ -19,15 +19,154 @@
|
||||
import sys
|
||||
import argparse
|
||||
import signal
|
||||
import yandere_bot
|
||||
import yandereBot
|
||||
import datetime
|
||||
import contextlib
|
||||
|
||||
|
||||
class FailedToLoadCfg(Exception):
|
||||
# A class that inherits from either YandereBot from yandereBot module
|
||||
# This class is used to handle command line arguments gracefully, and extend functionality quickly
|
||||
# Bot specific changes should be made here (if they are minor enough).
|
||||
# This will be instantiated from the main() function
|
||||
class YandereBot(yandereBot.YandereBot):
|
||||
# The below settings are required from the configuration module
|
||||
settings_time = None
|
||||
settings_reminder = None
|
||||
|
||||
# Wait before running
|
||||
delay_start = 0
|
||||
|
||||
def __init__(self, cfg, debug_mode, prime_bot, delay_d=None, delay_h=None, delay_s=None):
|
||||
super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False)
|
||||
self.load_settings(self.cfg, ("settings_time", "settings_reminder"))
|
||||
self.set_pretimer(delay_d, delay_h, delay_s)
|
||||
|
||||
if self.debug_mode:
|
||||
print("[DEBUG MODE ON - DRY RUN BEGIN]")
|
||||
|
||||
# Do not perform sanity test if stdout is None.
|
||||
# input() will fail if stdout is None, which could happen with the following command
|
||||
# ./run --dry-run -h -d 'a date in the past'
|
||||
if sys.stdout is not None and not self.pass_sanity_test():
|
||||
raise FailedSanityTest
|
||||
|
||||
if prime_bot:
|
||||
self.prime_bot()
|
||||
|
||||
def print_date_time_example(self):
|
||||
print_fmt = " {0:6} {1:10} {2}"
|
||||
time_fmt = self.settings_time["time_format"]
|
||||
date_fmt = self.settings_time["date_format"]
|
||||
current_time = self.dateSelection
|
||||
|
||||
print(print_fmt.format(
|
||||
"TIME", time_fmt, current_time.strftime(time_fmt)
|
||||
))
|
||||
print(print_fmt.format(
|
||||
"DATE", date_fmt, current_time.strftime(date_fmt)
|
||||
))
|
||||
|
||||
def dump_pictures(self):
|
||||
for ele in self.listPictures:
|
||||
print(ele.get_full_string())
|
||||
|
||||
def set_delay_d(self, d):
|
||||
try:
|
||||
t = datetime.datetime.strptime(d, self.settings_time["date_format"])
|
||||
self.dateNextSelection = self.dateNextSelection.replace(
|
||||
year=t.year, month=t.month, day=t.day
|
||||
)
|
||||
except Exception:
|
||||
print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d))
|
||||
self.print_date_time_example()
|
||||
raise DateWrongFormat
|
||||
|
||||
def set_delay_h(self, h, add_24):
|
||||
try:
|
||||
t = datetime.datetime.strptime(h, self.settings_time["time_format"])
|
||||
self.dateNextSelection = self.dateNextSelection.replace(
|
||||
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
|
||||
)
|
||||
if self.dateNextSelection < self.dateSelection and add_24:
|
||||
self.dateNextSelection = yandereBot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
|
||||
except Exception:
|
||||
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
|
||||
self.print_date_time_example()
|
||||
raise TimeWrongFormat
|
||||
|
||||
def set_pretimer(self, d=None, h=None, s=0):
|
||||
if d:
|
||||
self.set_delay_d(d)
|
||||
if h:
|
||||
self.set_delay_h(h, d is None)
|
||||
if s:
|
||||
self.delay_start = max(0, s)
|
||||
|
||||
# Check for potential misconfigurations by the user
|
||||
def pass_sanity_test(self):
|
||||
# Calculate pre-timer value
|
||||
seconds_until_next_pos = yandereBot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
|
||||
|
||||
# Possible misconfigurations that will prompt the user to continue
|
||||
pretimer_less_than_zero = seconds_until_next_pos < 0
|
||||
pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"]
|
||||
|
||||
# Prompt the user
|
||||
prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep
|
||||
|
||||
# Remind the user to generate new OAuth tokens
|
||||
dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"])
|
||||
if dt < datetime.datetime.now():
|
||||
print("REMINDER: Generate new tokens!!")
|
||||
|
||||
# Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming
|
||||
if pretimer_less_than_zero:
|
||||
sleep = round(abs(seconds_until_next_pos), 2)
|
||||
images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1
|
||||
print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format(
|
||||
sleep, images
|
||||
))
|
||||
# Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py
|
||||
elif pretimer_greater_than_sleep:
|
||||
print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format(
|
||||
round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"]
|
||||
))
|
||||
|
||||
# Prompt the user if something doesn't seem right
|
||||
# This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work.
|
||||
try:
|
||||
if prompt_user:
|
||||
# Default to 'y' if the user just presses enter
|
||||
ans = input("Do you want to continue [Y/n]? ") or "y"
|
||||
return ans.lower() in ("y", "yes")
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print()
|
||||
return False
|
||||
|
||||
# Sanity test passed
|
||||
return True
|
||||
|
||||
def start(self, delay=None):
|
||||
_delay = delay or self.delay_start
|
||||
return super(YandereBot, self).start(max(0, _delay))
|
||||
|
||||
|
||||
# Custom exceptions
|
||||
class TimeWrongFormat(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DateWrongFormat(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FailedSanityTest(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FailedToLoadCfg(Exception):
|
||||
pass
|
||||
|
||||
# Entry point if run from the command line
|
||||
def main():
|
||||
# Default config file
|
||||
@ -37,14 +176,21 @@ def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="A bot for posting on Mastodon",
|
||||
# epilog="All switches can be combined for greater control",
|
||||
add_help=True)
|
||||
add_help=False)
|
||||
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
|
||||
parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
|
||||
parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0)
|
||||
parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None)
|
||||
parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None)
|
||||
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
|
||||
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption")
|
||||
parser.add_argument("-o", "--output-hashes", help="Output list of hashes", action="store_true")
|
||||
parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true")
|
||||
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
|
||||
arguments = parser.parse_args()
|
||||
|
||||
# Redirect stdout when the bot first initializes if the bot is not going to run normally
|
||||
redirect_stdout = None if arguments.output_hashes or arguments.help else sys.stdout
|
||||
|
||||
# Yandere Lewd Bot
|
||||
yandere = None
|
||||
yandere_config = None
|
||||
@ -54,17 +200,37 @@ def main():
|
||||
import importlib
|
||||
yandere_config = importlib.import_module(arguments.config)
|
||||
except ImportError:
|
||||
raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config))
|
||||
print("Failed to Load Configuration:", arguments.config)
|
||||
raise FailedToLoadCfg
|
||||
|
||||
# Flag if the bot is running in debug mode
|
||||
debug_mode = arguments.dry_run or arguments.debug
|
||||
debug_mode = (arguments.dry_run or arguments.debug or arguments.output_hashes or arguments.help)
|
||||
|
||||
yandere = yandere_bot.YandereBot(
|
||||
with contextlib.redirect_stdout(redirect_stdout):
|
||||
prime_bot = not arguments.help
|
||||
|
||||
yandere = YandereBot(
|
||||
yandere_config,
|
||||
arguments.keyfile,
|
||||
debug_mode
|
||||
debug_mode,
|
||||
prime_bot,
|
||||
arguments.date,
|
||||
arguments.time,
|
||||
arguments.wait
|
||||
)
|
||||
|
||||
# Print Usage Information with Time and Date Formats with Examples
|
||||
if arguments.help:
|
||||
parser.print_help()
|
||||
print()
|
||||
yandere.print_date_time_example()
|
||||
print()
|
||||
return 0
|
||||
|
||||
# Output all of the images in the bot's picture list
|
||||
if arguments.output_hashes:
|
||||
yandere.dump_pictures()
|
||||
return 0
|
||||
|
||||
# Setup exit calls
|
||||
# Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings )
|
||||
def yandere_quit(signo, _frame):
|
||||
@ -85,12 +251,21 @@ if __name__ == "__main__":
|
||||
# Exceptions raised from the main function
|
||||
except FailedToLoadCfg:
|
||||
sys.exit(10)
|
||||
except FailedSanityTest:
|
||||
sys.exit(9)
|
||||
except DateWrongFormat:
|
||||
sys.exit(8)
|
||||
except TimeWrongFormat:
|
||||
sys.exit(7)
|
||||
|
||||
# Exceptions raised from the bot
|
||||
except yandere_bot.Debug:
|
||||
except yandereBot.Debug:
|
||||
sys.exit(6)
|
||||
except yandere_bot.BadCfgFile:
|
||||
except yandereBot.MissingMasterList:
|
||||
sys.exit(5)
|
||||
except yandereBot.BadCfgFile:
|
||||
sys.exit(4)
|
||||
except yandere_bot.BadPostSettings:
|
||||
except yandereBot.BadPostSettings:
|
||||
sys.exit(3)
|
||||
except yandere_bot.FailedLogin:
|
||||
except yandereBot.FailedLogin:
|
||||
sys.exit(2)
|
||||
|
@ -64,21 +64,25 @@ class YandereBot:
|
||||
listPictures = []
|
||||
lenBlacklist = 0
|
||||
failed_uploads = 0
|
||||
consecutive_failed_uploads = 0
|
||||
currentSessionCount = 0
|
||||
debug_mode = False
|
||||
primed = False
|
||||
decrypted = False
|
||||
|
||||
# Time variables
|
||||
dateSelection = None
|
||||
dateNextSelection = None
|
||||
|
||||
# YandereBot.__init__()
|
||||
# @param cfg A dynamically loaded python module. See yanlib.module_load() for an example
|
||||
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
|
||||
# prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
|
||||
def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True):
|
||||
def __init__(self, cfg, debug_mode=False, prime_bot=True):
|
||||
self.dateSelection = datetime.datetime.now()
|
||||
self.dateNextSelection = self.dateSelection
|
||||
self.cfg = cfg
|
||||
self.load_settings(self.cfg)
|
||||
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
||||
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
|
||||
if prime_bot:
|
||||
self.prime_bot()
|
||||
|
||||
@ -119,11 +123,7 @@ class YandereBot:
|
||||
def decrypt_settings(self):
|
||||
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
|
||||
import encryption
|
||||
try:
|
||||
self.settings_server = encryption.settings_server_decrypt(
|
||||
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
|
||||
except encryption.EncryptionFail as e:
|
||||
raise BadCfgFile(str(e))
|
||||
self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt)
|
||||
self.decrypted = True
|
||||
|
||||
# Login to Pleroma
|
||||
@ -167,10 +167,10 @@ class YandereBot:
|
||||
# load_pictures will return a list of YanHashObj() with a blacklist(s) applied
|
||||
# @param list_blacklist A list of HashObjects() that are blacklist hashes
|
||||
def load_pictures(self, list_blacklist):
|
||||
list_pictures = []
|
||||
if not self.settings_behavior["master_list"]:
|
||||
raise MissingMasterList
|
||||
|
||||
list_pictures = []
|
||||
# Return a list of hashes with profiles
|
||||
try:
|
||||
for f in self.settings_behavior["master_list"]:
|
||||
@ -193,22 +193,42 @@ class YandereBot:
|
||||
random.shuffle(self.listPictures)
|
||||
|
||||
# Maybe I should remove this from the backend?
|
||||
def print_header_stats(self, picked):
|
||||
def print_header_stats(self, picked, date_selection, date_next_selection):
|
||||
_picked = picked.get_full_string() if picked else None
|
||||
picked_profile = picked.get_post_setting()["name"] if picked else None
|
||||
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
||||
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
||||
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
||||
|
||||
print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format(
|
||||
picked_profile, _picked, picked_next_profile, picked_next
|
||||
))
|
||||
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
|
||||
if date_selection != date_next_selection:
|
||||
n_posts_remain -= 1
|
||||
|
||||
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
|
||||
date_end_selection = time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
|
||||
date_end_selection_seconds = max(0, time_diff_seconds(date_end_selection, date_selection))
|
||||
if date_selection != date_next_selection and picked is None:
|
||||
date_end_selection_seconds += next_selection_seconds
|
||||
d, h, m, s = humanize_time_delta(date_end_selection_seconds)
|
||||
|
||||
print("[Profile]", picked_profile)
|
||||
print("[Profile Next]", picked_next_profile)
|
||||
print(_picked)
|
||||
print("Next:", picked_next) # There should always be something in picture_next list
|
||||
print("Selection time: {}".format(
|
||||
date_selection.strftime(self.settings_time["long_date_format"])) )
|
||||
print("Next selection time: {} ({} seconds)".format(
|
||||
date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) )
|
||||
print("End selection time: {}".format(date_end_selection.strftime(
|
||||
self.settings_time["long_date_week"])) )
|
||||
print("Time Remaining: {} Days | {} Hours | {} Minutes | {} Seconds".format(d, h, m, s) )
|
||||
print("[ {} Pictures | {} Blacklisted | {} Selected during session | {} Failed ]\n".format(
|
||||
len(self.listPictures), self.lenBlacklist, self.currentSessionCount, self.failed_uploads) )
|
||||
|
||||
# Returns a list of media paths (without the hashes)
|
||||
def get_media_list(self, picked):
|
||||
ext = self.settings_behavior["multi_media_ext"]
|
||||
if not picked:
|
||||
return None
|
||||
elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
|
||||
if ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
|
||||
return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())]
|
||||
else:
|
||||
return [picked.get_hash_path()]
|
||||
@ -216,16 +236,10 @@ class YandereBot:
|
||||
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
||||
def upload_media_list(self, path_list):
|
||||
media_list = []
|
||||
for path in path_list:
|
||||
if not os.path.isfile(path):
|
||||
raise FileNotFoundError("Could not upload: {}".format(path))
|
||||
for ele in path_list:
|
||||
if not self.debug_mode:
|
||||
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
|
||||
media_dict = {
|
||||
"path": path,
|
||||
"media": media
|
||||
}
|
||||
media_list.append(media_dict)
|
||||
media = self.mastodon_api.media_post(ele, description=os.path.basename(ele))
|
||||
media_list.append((ele, media))
|
||||
return media_list
|
||||
|
||||
def get_post_text(self, picked, media_list):
|
||||
@ -251,22 +265,22 @@ class YandereBot:
|
||||
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
|
||||
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
||||
|
||||
return string_post
|
||||
return content_type, string_post
|
||||
|
||||
def _post(self, picked):
|
||||
images = self.get_media_list(picked)
|
||||
if not images:
|
||||
raise InvalidPost("Media list is empty")
|
||||
raise ValueError("Media list is empty")
|
||||
media_list = self.upload_media_list(images)
|
||||
message = self.get_post_text(picked, media_list)
|
||||
content_type, message = self.get_post_text(picked, media_list)
|
||||
if self.debug_mode:
|
||||
return picked
|
||||
self.mastodon_api.status_post(
|
||||
message,
|
||||
media_ids=[i["media"] for i in media_list],
|
||||
media_ids=[i[1] for i in media_list if len(i) == 2],
|
||||
visibility=self.settings_behavior["visibility"],
|
||||
sensitive=picked.get_post_setting()["spoiler"],
|
||||
content_type=self.settings_behavior["content_type"]
|
||||
content_type=content_type
|
||||
)
|
||||
return picked
|
||||
|
||||
@ -283,6 +297,7 @@ class YandereBot:
|
||||
|
||||
# Flags that are set if an upload fails
|
||||
reinsert_image = False
|
||||
timeout = False
|
||||
|
||||
# Attempt post
|
||||
try:
|
||||
@ -292,24 +307,33 @@ class YandereBot:
|
||||
|
||||
# After a successful post
|
||||
self.currentSessionCount += 1
|
||||
self.consecutive_failed_uploads = 0
|
||||
self.blacklist(picked)
|
||||
|
||||
# The post was successful
|
||||
return picked
|
||||
|
||||
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
|
||||
except (FileNotFoundError, InvalidPost):
|
||||
except FileNotFoundError:
|
||||
print("File not found:", picked.get_hash_path())
|
||||
reinsert_image = False
|
||||
# Exception flags
|
||||
reinsert_image, timeout = False, False
|
||||
|
||||
# Media list is empty
|
||||
except ValueError as e:
|
||||
print("Media list cannot be empty:", picked.get_hash_path())
|
||||
# Exception flags
|
||||
reinsert_image, timeout = False, False
|
||||
|
||||
# Check if the file limit has been reached
|
||||
except MastodonAPIError as e:
|
||||
print("API Error:", e)
|
||||
# Check if the file limit has been reached (413 error)
|
||||
file_limit_reached = False
|
||||
with contextlib.suppress(IndexError):
|
||||
reinsert_image = e.args[1] != 413
|
||||
file_limit_reached = (e.args[1] == 413)
|
||||
|
||||
print("API Error:", e)
|
||||
# Exception flags
|
||||
reinsert_image, timeout = not file_limit_reached, True
|
||||
|
||||
# Server Errors
|
||||
# Assume all exceptions are on the server side
|
||||
@ -322,18 +346,28 @@ class YandereBot:
|
||||
except Exception as e:
|
||||
print("Unhandled Exception:", e)
|
||||
# Exception flags
|
||||
reinsert_image = True
|
||||
reinsert_image, timeout = True, True
|
||||
|
||||
# An exception occurred
|
||||
self.failed_uploads += 1
|
||||
self.consecutive_failed_uploads += 1
|
||||
|
||||
if reinsert_image and self.consecutive_failed_uploads < self.settings_behavior["max_errors"]:
|
||||
print("[Errors: {}] {}{}".format(self.failed_uploads, picked.get_full_string(), os.linesep))
|
||||
if reinsert_image:
|
||||
self.listPictures.insert(0, picked)
|
||||
if timeout:
|
||||
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
|
||||
|
||||
# The post failed
|
||||
return None
|
||||
|
||||
def schedule_next_post(self):
|
||||
self.dateSelection = self.dateNextSelection
|
||||
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
|
||||
|
||||
# Will wait between the current time and the time of next selection
|
||||
def wait_future_time(self):
|
||||
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
|
||||
self.eventSleep.wait(max(0, seconds))
|
||||
|
||||
# [BEGIN THE PROGRAM]
|
||||
def prime_bot(self):
|
||||
self.load_picture_list()
|
||||
@ -341,6 +375,7 @@ class YandereBot:
|
||||
self.shuffle_list()
|
||||
if not self.debug_mode:
|
||||
self.decrypt_settings()
|
||||
if self.can_post():
|
||||
self.login()
|
||||
self.primed = True
|
||||
|
||||
@ -349,6 +384,30 @@ class YandereBot:
|
||||
if not self.primed:
|
||||
self.prime_bot()
|
||||
|
||||
# Early out if the bot is incapable of posting
|
||||
if not self.can_post():
|
||||
print("Bot is incapable of posting!!")
|
||||
return 1
|
||||
|
||||
start_time = self.dateSelection
|
||||
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
|
||||
delay_time = time_add_seconds(start_time, delay_seconds)
|
||||
|
||||
# Print the first image in the list if a delay or pretimer is set
|
||||
if delay_seconds:
|
||||
self.print_header_stats(None, start_time, delay_time)
|
||||
|
||||
# The delay parameter is different from the dateSelection and dateSelectionNext
|
||||
# It will literally time out the bot for a given number of seconds regardless of the pre-timer setting
|
||||
# This is useful if you want to set a delay of 30 seconds, before back-posting several images
|
||||
self.eventSleep.wait(max(0, delay))
|
||||
|
||||
# Check if the pre-timer is set
|
||||
# dateNextSelection should be greater than dateSelection if it is
|
||||
# dateSelection and dateNextSelection are both set to the current time when the bot is initialized
|
||||
if delay_seconds:
|
||||
self.wait_future_time()
|
||||
|
||||
# Begin posting
|
||||
self.main_loop()
|
||||
|
||||
@ -360,21 +419,23 @@ class YandereBot:
|
||||
# 2. There is nothing left in the picture list to select from
|
||||
# 3. settings_behavior["uploads_per_post"] is less than one for some reason
|
||||
def can_post(self):
|
||||
return (
|
||||
not self.eventSleep.is_set() and
|
||||
bool(len(self.listPictures)) and
|
||||
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
|
||||
self.consecutive_failed_uploads < self.settings_behavior["max_errors"]
|
||||
)
|
||||
|
||||
return not self.eventSleep.is_set() and bool(len(self.listPictures)) and self.settings_behavior["uploads_per_post"] > 0
|
||||
|
||||
def main_loop(self):
|
||||
sleep_seconds = self.settings_behavior["retry_seconds"]
|
||||
target_posts = self.settings_behavior["uploads_per_post"]
|
||||
while self.can_post():
|
||||
picked = self.post()
|
||||
self.print_header_stats(picked)
|
||||
successful_posts = 0
|
||||
while (successful_posts < target_posts) and self.can_post():
|
||||
last_picked = self.post()
|
||||
successful_posts += int(last_picked is not None)
|
||||
if successful_posts >= target_posts:
|
||||
self.schedule_next_post()
|
||||
self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection)
|
||||
else:
|
||||
self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection)
|
||||
|
||||
if self.can_post():
|
||||
self.eventSleep.wait(sleep_seconds)
|
||||
self.wait_future_time()
|
||||
|
||||
|
||||
# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj
|
||||
@ -404,6 +465,21 @@ def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callbac
|
||||
return r
|
||||
|
||||
|
||||
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
|
||||
def time_add_seconds(dt, seconds):
|
||||
return dt + datetime.timedelta(0, seconds)
|
||||
|
||||
|
||||
def time_diff_seconds(d1, d2):
|
||||
return (d1-d2).total_seconds()
|
||||
|
||||
|
||||
def humanize_time_delta(total_seconds):
|
||||
m, s = divmod(int(total_seconds), 60)
|
||||
h, m = divmod(m, 60)
|
||||
d, h = divmod(h, 24)
|
||||
return d, h, m, s
|
||||
|
||||
# --------------------------------------------------------------------------------------------
|
||||
def sync_to_disk(file_handle):
|
||||
file_handle.flush()
|
||||
@ -443,10 +519,6 @@ class Debug(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidPost(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BadPostSettings(Exception):
|
||||
pass
|
||||
|
Reference in New Issue
Block a user