Compare commits
3 Commits
93bf15a5de
...
5326c48e3c
Author | SHA1 | Date | |
---|---|---|---|
5326c48e3c | |||
08206db1d9 | |||
9975eaab63 |
4
run.sh
4
run.sh
@ -30,6 +30,10 @@ cd "$RUN_DIR"
|
|||||||
source "$VENV"
|
source "$VENV"
|
||||||
"$ENTRY" "$@"
|
"$ENTRY" "$@"
|
||||||
|
|
||||||
|
RETURN_CODE="$?"
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
deactivate
|
deactivate
|
||||||
cd - > /dev/null
|
cd - > /dev/null
|
||||||
|
|
||||||
|
exit "$RETURN_CODE"
|
||||||
|
@ -28,13 +28,11 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
|
||||||
class PasswordMismatch(Exception):
|
class EncryptionFail(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class PasswordMismatch(Exception):
|
||||||
# Salts
|
pass
|
||||||
def generate_salt():
|
|
||||||
return os.urandom(16)
|
|
||||||
|
|
||||||
|
|
||||||
# Return string from bytes
|
# Return string from bytes
|
||||||
@ -72,8 +70,7 @@ def derive_key(password, salt):
|
|||||||
iterations=100000,
|
iterations=100000,
|
||||||
backend=default_backend()
|
backend=default_backend()
|
||||||
)
|
)
|
||||||
r_key = base64.urlsafe_b64encode(kdf.derive(password))
|
return base64.urlsafe_b64encode(kdf.derive(password))
|
||||||
return r_key
|
|
||||||
|
|
||||||
|
|
||||||
# Encryption functions
|
# Encryption functions
|
||||||
@ -99,94 +96,62 @@ def decrypt(token, key):
|
|||||||
# encryption_function: encrypt(message, key) : decrypt(token, key):
|
# encryption_function: encrypt(message, key) : decrypt(token, key):
|
||||||
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
|
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
|
||||||
# ChangeEncodingDict to make them strings (recommended cfg file friendly)
|
# ChangeEncodingDict to make them strings (recommended cfg file friendly)
|
||||||
def __settings_server(password, salt, settings_server, encryption_function):
|
def encrypt_settings(settings_server, password, salt, encryption_function):
|
||||||
key = derive_key(password, salt)
|
key = derive_key(password, salt)
|
||||||
settings_server_decrypted = OrderedDict()
|
settings_server_decrypted = OrderedDict()
|
||||||
for setting in settings_server:
|
for setting in settings_server:
|
||||||
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
|
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
|
||||||
return settings_server_decrypted
|
return settings_server_decrypted
|
||||||
|
|
||||||
|
def get_keyfile(keyfile=None):
|
||||||
# Returns (salt, settings_server)
|
if keyfile is not None:
|
||||||
def _settings_server_encrypt(settings_server):
|
with open(keyfile, "rb") as f:
|
||||||
salt = generate_salt()
|
return f.read()
|
||||||
password = getpass.getpass("Enter password: ")
|
return None
|
||||||
password2 = getpass.getpass("Retype Password: ")
|
|
||||||
|
|
||||||
if password != password2:
|
|
||||||
raise PasswordMismatch
|
|
||||||
|
|
||||||
settings_server_encrypted = __settings_server(password.encode(), salt, encode_dict(settings_server), encrypt)
|
|
||||||
|
|
||||||
return salt, settings_server_encrypted
|
|
||||||
|
|
||||||
|
|
||||||
# Returns (settings_server)
|
def get_pass(q):
|
||||||
def _settings_server_decrypt(settings_server, settings_encrypt):
|
|
||||||
settings_server_encoded = encode_dict(settings_server)
|
|
||||||
if settings_encrypt["encrypt"]:
|
|
||||||
salt = salt_decode(settings_encrypt["salt"])
|
|
||||||
password = getpass.getpass("Enter password: ")
|
|
||||||
return __settings_server(password.encode(), salt, settings_server_encoded, decrypt)
|
|
||||||
else:
|
|
||||||
return settings_server_encoded
|
|
||||||
|
|
||||||
|
|
||||||
# Wrapper function that will catch exceptions and exit
|
|
||||||
def settings_server_new(function, **kwargs):
|
|
||||||
try:
|
try:
|
||||||
return function(**kwargs)
|
return getpass.getpass(q).encode()
|
||||||
|
|
||||||
# If the user cancels the login
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("\nQuitting...")
|
raise EncryptionFail("\nQuitting...")
|
||||||
|
|
||||||
# If the user passwords do not match (encrypt)
|
|
||||||
except PasswordMismatch:
|
|
||||||
print("Passwords do not match...")
|
|
||||||
|
|
||||||
# Incorrect password entered (decrypt)
|
def settings_server_encrypt(settings_server, keyfile=None):
|
||||||
except InvalidToken:
|
try:
|
||||||
print("Password or Token Incorrect...")
|
settings_server = encode_dict(settings_server)
|
||||||
|
salt = os.urandom(16)
|
||||||
# Probably the salt value got modified
|
password = get_keyfile(keyfile)
|
||||||
except base64.binascii.Error:
|
if password is None:
|
||||||
print("Salt is invalid...")
|
password = get_pass("Enter password: ")
|
||||||
|
password2 = get_pass("Enter password: ")
|
||||||
# Some other kind of fuck up
|
if password != password2:
|
||||||
|
raise PasswordMismatch("Passwords do not match")
|
||||||
|
encrypted = encrypt_settings(settings_server, password, salt, encrypt)
|
||||||
|
return salt_encode(salt), decode_dict(encrypted)
|
||||||
|
except PasswordMismatch as e:
|
||||||
|
raise EncryptionFail(str(e))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("Unknown exception occurred...")
|
err = str(e)
|
||||||
print(e)
|
raise EncryptionFail("Encrypt Error: {}".format(err))
|
||||||
|
|
||||||
# Exit if an exception was thrown
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
# Glue functions that package **kwargs automatically
|
def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None):
|
||||||
def settings_server_encrypt(settings_server):
|
try:
|
||||||
kwargs = {"settings_server": settings_server}
|
if not settings_encrypt["encrypt"]:
|
||||||
return settings_server_new(_settings_server_encrypt, **kwargs)
|
return settings_server
|
||||||
|
settings_server = encode_dict(settings_server)
|
||||||
|
password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ")
|
||||||
def settings_server_decrypt(settings_server, settings_encrypt):
|
salt = salt_decode(settings_encrypt["salt"])
|
||||||
kwargs = {
|
decrypted = encrypt_settings(settings_server, password, salt, decrypt)
|
||||||
"settings_server": settings_server,
|
return decode_dict(decrypted)
|
||||||
"settings_encrypt": settings_encrypt
|
except base64.binascii.Error:
|
||||||
}
|
raise EncryptionFail("Salt is invalid")
|
||||||
return settings_server_new(_settings_server_decrypt, **kwargs)
|
except InvalidToken:
|
||||||
|
raise EncryptionFail("Password or token is incorrect")
|
||||||
|
except Exception as e:
|
||||||
# The _cfg functions should return a regular string
|
err = str(e)
|
||||||
# These are the functions that should interface with the bot a return a plain string
|
raise EncryptionFail("Decrypt Error: {}".format(err))
|
||||||
# settings_server ordered dictionary
|
|
||||||
def settings_server_encrypt_cfg(settings_server):
|
|
||||||
salt, settings_server = settings_server_encrypt(settings_server)
|
|
||||||
return salt_encode(salt), decode_dict(settings_server)
|
|
||||||
|
|
||||||
|
|
||||||
def settings_server_decrypt_cfg(settings_server, settings_encrypt):
|
|
||||||
settings_server = settings_server_decrypt(settings_server, settings_encrypt)
|
|
||||||
return decode_dict(settings_server)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -202,6 +167,7 @@ def main():
|
|||||||
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
|
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
|
||||||
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
|
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
|
||||||
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
|
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
|
||||||
|
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None)
|
||||||
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
|
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
|
||||||
|
|
||||||
arguments = parser.parse_args()
|
arguments = parser.parse_args()
|
||||||
@ -214,25 +180,28 @@ def main():
|
|||||||
import importlib
|
import importlib
|
||||||
cfg = importlib.import_module(arguments.cfg)
|
cfg = importlib.import_module(arguments.cfg)
|
||||||
settings_server = cfg.settings_server
|
settings_server = cfg.settings_server
|
||||||
settings_encrypt = None
|
settings_encrypt = cfg.settings_encrypt
|
||||||
|
keyfile = arguments.keyfile or settings_encrypt["keyfile"]
|
||||||
|
|
||||||
if arguments.decrypt and arguments.encrypt:
|
if arguments.decrypt and arguments.encrypt:
|
||||||
print("Re-encrypting")
|
print("Re-encrypting")
|
||||||
|
|
||||||
if arguments.decrypt: # arguments.decrypt
|
if arguments.decrypt: # arguments.decrypt
|
||||||
print("Decrypt...")
|
print("Decrypt...")
|
||||||
settings_server = settings_server_decrypt_cfg(cfg.settings_server, cfg.settings_encrypt)
|
settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile)
|
||||||
settings_encrypt = OrderedDict([
|
settings_encrypt = OrderedDict([
|
||||||
("encrypt", False),
|
("encrypt", False),
|
||||||
("salt", cfg.settings_encrypt["encrypt"])
|
("salt", settings_encrypt["salt"]),
|
||||||
|
("keyfile", arguments.keyfile)
|
||||||
])
|
])
|
||||||
|
|
||||||
if arguments.encrypt:
|
if arguments.encrypt:
|
||||||
print("Encrypt...")
|
print("Encrypt...")
|
||||||
salt, settings_server = settings_server_encrypt_cfg(settings_server)
|
salt, settings_server = settings_server_encrypt(settings_server, keyfile)
|
||||||
settings_encrypt = OrderedDict([
|
settings_encrypt = OrderedDict([
|
||||||
("encrypt", True),
|
("encrypt", True),
|
||||||
("salt", salt)
|
("salt", salt),
|
||||||
|
("keyfile", arguments.keyfile)
|
||||||
])
|
])
|
||||||
|
|
||||||
print("settings_server = {}".format(pformat(settings_server)))
|
print("settings_server = {}".format(pformat(settings_server)))
|
||||||
@ -241,4 +210,8 @@ def main():
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.exit(main())
|
try:
|
||||||
|
sys.exit(main())
|
||||||
|
except EncryptionFail as e:
|
||||||
|
print(e)
|
||||||
|
sys.exit(1)
|
||||||
|
182
src/main.py
182
src/main.py
@ -24,142 +24,6 @@ import datetime
|
|||||||
import contextlib
|
import contextlib
|
||||||
|
|
||||||
|
|
||||||
# A class that inherits from YandereBot from the yandere_bot module
|
|
||||||
# This class is used to handle command line arguments gracefully, and extend functionality quickly
|
|
||||||
# Bot specific changes should be made here (if they are minor enough).
|
|
||||||
# This will be instantiated from the main() function
|
|
||||||
class YandereBot(yandere_bot.YandereBot):
|
|
||||||
# The below settings are required from the configuration module
|
|
||||||
settings_time = None
|
|
||||||
settings_reminder = None
|
|
||||||
|
|
||||||
# Wait before running
|
|
||||||
delay_start = 0
|
|
||||||
|
|
||||||
def __init__(self, cfg, debug_mode, prime_bot, delay_d=None, delay_h=None, delay_s=None):
|
|
||||||
super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False)
|
|
||||||
self.load_settings(self.cfg, ("settings_time", "settings_reminder"))
|
|
||||||
self.set_pretimer(delay_d, delay_h, delay_s)
|
|
||||||
|
|
||||||
if self.debug_mode:
|
|
||||||
print("[DEBUG MODE ON - DRY RUN BEGIN]")
|
|
||||||
|
|
||||||
# Do not perform sanity test if stdout is None.
|
|
||||||
# input() will fail if stdout is None, which could happen with the following command
|
|
||||||
# ./run --dry-run -h -d 'a date in the past'
|
|
||||||
if sys.stdout is not None and not self.pass_sanity_test():
|
|
||||||
raise FailedSanityTest
|
|
||||||
|
|
||||||
if prime_bot:
|
|
||||||
self.prime_bot()
|
|
||||||
|
|
||||||
def print_date_time_example(self):
|
|
||||||
print_fmt = " {0:6} {1:10} {2}"
|
|
||||||
time_fmt = self.settings_time["time_format"]
|
|
||||||
date_fmt = self.settings_time["date_format"]
|
|
||||||
current_time = self.dateSelection
|
|
||||||
|
|
||||||
print(print_fmt.format(
|
|
||||||
"TIME", time_fmt, current_time.strftime(time_fmt)
|
|
||||||
))
|
|
||||||
print(print_fmt.format(
|
|
||||||
"DATE", date_fmt, current_time.strftime(date_fmt)
|
|
||||||
))
|
|
||||||
|
|
||||||
def set_delay_d(self, d):
|
|
||||||
try:
|
|
||||||
t = datetime.datetime.strptime(d, self.settings_time["date_format"])
|
|
||||||
self.dateNextSelection = self.dateNextSelection.replace(
|
|
||||||
year=t.year, month=t.month, day=t.day
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d))
|
|
||||||
self.print_date_time_example()
|
|
||||||
raise DateWrongFormat
|
|
||||||
|
|
||||||
def set_delay_h(self, h, add_24):
|
|
||||||
try:
|
|
||||||
t = datetime.datetime.strptime(h, self.settings_time["time_format"])
|
|
||||||
self.dateNextSelection = self.dateNextSelection.replace(
|
|
||||||
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
|
|
||||||
)
|
|
||||||
if self.dateNextSelection < self.dateSelection and add_24:
|
|
||||||
self.dateNextSelection = yandere_bot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
|
|
||||||
except Exception:
|
|
||||||
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
|
|
||||||
self.print_date_time_example()
|
|
||||||
raise TimeWrongFormat
|
|
||||||
|
|
||||||
def set_pretimer(self, d=None, h=None, s=0):
|
|
||||||
if d:
|
|
||||||
self.set_delay_d(d)
|
|
||||||
if h:
|
|
||||||
self.set_delay_h(h, d is None)
|
|
||||||
if s:
|
|
||||||
self.delay_start = max(0, s)
|
|
||||||
|
|
||||||
# Check for potential misconfigurations by the user
|
|
||||||
def pass_sanity_test(self):
|
|
||||||
# Calculate pre-timer value
|
|
||||||
seconds_until_next_pos = yandere_bot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
|
|
||||||
|
|
||||||
# Possible misconfigurations that will prompt the user to continue
|
|
||||||
pretimer_less_than_zero = seconds_until_next_pos < 0
|
|
||||||
pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"]
|
|
||||||
|
|
||||||
# Prompt the user
|
|
||||||
prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep
|
|
||||||
|
|
||||||
# Remind the user to generate new OAuth tokens
|
|
||||||
dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"])
|
|
||||||
if dt < datetime.datetime.now():
|
|
||||||
print("REMINDER: Generate new tokens!!")
|
|
||||||
|
|
||||||
# Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming
|
|
||||||
if pretimer_less_than_zero:
|
|
||||||
sleep = round(abs(seconds_until_next_pos), 2)
|
|
||||||
images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1
|
|
||||||
print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format(
|
|
||||||
sleep, images
|
|
||||||
))
|
|
||||||
# Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py
|
|
||||||
elif pretimer_greater_than_sleep:
|
|
||||||
print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format(
|
|
||||||
round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"]
|
|
||||||
))
|
|
||||||
|
|
||||||
# Prompt the user if something doesn't seem right
|
|
||||||
# This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work.
|
|
||||||
try:
|
|
||||||
if prompt_user:
|
|
||||||
# Default to 'y' if the user just presses enter
|
|
||||||
ans = input("Do you want to continue [Y/n]? ") or "y"
|
|
||||||
return ans.lower() in ("y", "yes")
|
|
||||||
except (EOFError, KeyboardInterrupt):
|
|
||||||
print()
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Sanity test passed
|
|
||||||
return True
|
|
||||||
|
|
||||||
def start(self, delay=None):
|
|
||||||
_delay = delay or self.delay_start
|
|
||||||
return super(YandereBot, self).start(max(0, _delay))
|
|
||||||
|
|
||||||
|
|
||||||
# Custom exceptions
|
|
||||||
class TimeWrongFormat(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class DateWrongFormat(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class FailedSanityTest(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class FailedToLoadCfg(Exception):
|
class FailedToLoadCfg(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -173,20 +37,14 @@ def main():
|
|||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="A bot for posting on Mastodon",
|
description="A bot for posting on Mastodon",
|
||||||
# epilog="All switches can be combined for greater control",
|
# epilog="All switches can be combined for greater control",
|
||||||
add_help=False)
|
add_help=True)
|
||||||
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
|
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
|
||||||
parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
|
parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
|
||||||
parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0)
|
|
||||||
parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None)
|
|
||||||
parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None)
|
|
||||||
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
|
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
|
||||||
parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true")
|
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None)
|
||||||
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
|
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
|
||||||
arguments = parser.parse_args()
|
arguments = parser.parse_args()
|
||||||
|
|
||||||
# Redirect stdout when the bot first initializes if the bot is not going to run normally
|
|
||||||
redirect_stdout = None if arguments.help else sys.stdout
|
|
||||||
|
|
||||||
# Yandere Lewd Bot
|
# Yandere Lewd Bot
|
||||||
yandere = None
|
yandere = None
|
||||||
yandere_config = None
|
yandere_config = None
|
||||||
@ -196,31 +54,16 @@ def main():
|
|||||||
import importlib
|
import importlib
|
||||||
yandere_config = importlib.import_module(arguments.config)
|
yandere_config = importlib.import_module(arguments.config)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print("Failed to Load Configuration:", arguments.config)
|
raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config))
|
||||||
raise FailedToLoadCfg
|
|
||||||
|
|
||||||
# Flag if the bot is running in debug mode
|
# Flag if the bot is running in debug mode
|
||||||
debug_mode = (arguments.dry_run or arguments.debug or arguments.help)
|
debug_mode = arguments.dry_run or arguments.debug
|
||||||
|
|
||||||
with contextlib.redirect_stdout(redirect_stdout):
|
yandere = yandere_bot.YandereBot(
|
||||||
prime_bot = not arguments.help
|
yandere_config,
|
||||||
|
arguments.keyfile,
|
||||||
yandere = YandereBot(
|
debug_mode
|
||||||
yandere_config,
|
)
|
||||||
debug_mode,
|
|
||||||
prime_bot,
|
|
||||||
arguments.date,
|
|
||||||
arguments.time,
|
|
||||||
arguments.wait
|
|
||||||
)
|
|
||||||
|
|
||||||
# Print Usage Information with Time and Date Formats with Examples
|
|
||||||
if arguments.help:
|
|
||||||
parser.print_help()
|
|
||||||
print()
|
|
||||||
yandere.print_date_time_example()
|
|
||||||
print()
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# Setup exit calls
|
# Setup exit calls
|
||||||
# Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings )
|
# Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings )
|
||||||
@ -242,13 +85,6 @@ if __name__ == "__main__":
|
|||||||
# Exceptions raised from the main function
|
# Exceptions raised from the main function
|
||||||
except FailedToLoadCfg:
|
except FailedToLoadCfg:
|
||||||
sys.exit(10)
|
sys.exit(10)
|
||||||
except FailedSanityTest:
|
|
||||||
sys.exit(9)
|
|
||||||
except DateWrongFormat:
|
|
||||||
sys.exit(8)
|
|
||||||
except TimeWrongFormat:
|
|
||||||
sys.exit(7)
|
|
||||||
|
|
||||||
# Exceptions raised from the bot
|
# Exceptions raised from the bot
|
||||||
except yandere_bot.Debug:
|
except yandere_bot.Debug:
|
||||||
sys.exit(6)
|
sys.exit(6)
|
||||||
|
@ -28,7 +28,7 @@ from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, M
|
|||||||
|
|
||||||
|
|
||||||
# A class that contains all of the rendering information for a single post
|
# A class that contains all of the rendering information for a single post
|
||||||
class YandereFrame:
|
class PickAndRenderFrame:
|
||||||
# From Config File
|
# From Config File
|
||||||
profile_name = ""
|
profile_name = ""
|
||||||
path = ""
|
path = ""
|
||||||
@ -36,16 +36,17 @@ class YandereFrame:
|
|||||||
skip = tuple()
|
skip = tuple()
|
||||||
nsfw = tuple()
|
nsfw = tuple()
|
||||||
output_name = ""
|
output_name = ""
|
||||||
output_name_tr = ""
|
|
||||||
translation = {}
|
|
||||||
message = ""
|
message = ""
|
||||||
message_nsfw = ""
|
message_nsfw = ""
|
||||||
render_script = ""
|
render_script = ""
|
||||||
|
|
||||||
# Picked
|
# After pick
|
||||||
picked_frame = None
|
picked_frame = None
|
||||||
|
output_name_tr = ""
|
||||||
|
|
||||||
def __init__(self, picked):
|
def __init__(self, picked, datetime_format):
|
||||||
|
dt_now = datetime.datetime.now()
|
||||||
|
dt_now_str = datetime.datetime.strftime(dt_now, datetime_format)
|
||||||
self.profile_name = picked["profile_name"]
|
self.profile_name = picked["profile_name"]
|
||||||
self.path = picked["path"]
|
self.path = picked["path"]
|
||||||
self.frames = picked["frames"]
|
self.frames = picked["frames"]
|
||||||
@ -56,24 +57,33 @@ class YandereFrame:
|
|||||||
self.message = picked["message"]
|
self.message = picked["message"]
|
||||||
self.message_nsfw = picked["message_nsfw"]
|
self.message_nsfw = picked["message_nsfw"]
|
||||||
self.render_script = picked["render_script"]
|
self.render_script = picked["render_script"]
|
||||||
self.add_translation("profile_name", self.profile_name)
|
|
||||||
|
|
||||||
def add_translation(self, k, v):
|
self.picked_frame = self._pick_frame()
|
||||||
self.translation.update({k: v})
|
|
||||||
|
|
||||||
def pick(self):
|
# Shell-like substitutions
|
||||||
random.seed(os.urandom(16))
|
translations = {
|
||||||
|
"profile_name": self.profile_name,
|
||||||
|
"frame": str(self.picked_frame),
|
||||||
|
"datetime": dt_now_str
|
||||||
|
}
|
||||||
|
|
||||||
|
self.output_name_tr = self._translate_basename(translations)
|
||||||
|
self._render_frame()
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
render_file = self.output_name_tr
|
||||||
|
if render_file and os.path.isfile(render_file):
|
||||||
|
os.remove(render_file)
|
||||||
|
|
||||||
|
def _pick_frame(self):
|
||||||
picked_frame = None
|
picked_frame = None
|
||||||
while picked_frame is None:
|
while picked_frame is None:
|
||||||
picked_frame = random.random() * self.frames
|
picked_frame = random.random() * self.frames
|
||||||
for skip in self.skip:
|
for skip in self.skip:
|
||||||
begin, end = skip
|
begin, end = skip
|
||||||
if begin <= picked_frame <= end:
|
if begin <= picked_frame <= end:
|
||||||
print("Cannot pick frame:", picked_frame, "from:", self.profile_name)
|
|
||||||
picked_frame = None
|
picked_frame = None
|
||||||
break
|
break
|
||||||
self.picked_frame = picked_frame
|
|
||||||
self.add_translation("frame", str(picked_frame))
|
|
||||||
return picked_frame
|
return picked_frame
|
||||||
|
|
||||||
def is_nsfw(self):
|
def is_nsfw(self):
|
||||||
@ -83,29 +93,25 @@ class YandereFrame:
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def translate_basename(self):
|
def _translate_basename(self, translations):
|
||||||
output_name = self.output_name
|
output_name_tr = self.output_name
|
||||||
output_name_tr = output_name
|
for k,v in translations.items():
|
||||||
for k,v in self.translation.items():
|
|
||||||
replace_token = "${" + k + "}"
|
replace_token = "${" + k + "}"
|
||||||
output_name_tr = output_name_tr.replace(replace_token, v)
|
output_name_tr = output_name_tr.replace(replace_token, v)
|
||||||
self.output_path_tr = output_name_tr
|
|
||||||
return output_name_tr
|
return output_name_tr
|
||||||
|
|
||||||
# TODO: Add translation keywords to the message
|
# TODO: Add translation keywords to the message
|
||||||
def get_message(self):
|
def get_message(self):
|
||||||
return self.message_nsfw if self.is_nsfw() else self.message
|
return self.message_nsfw if self.is_nsfw() else self.message
|
||||||
|
|
||||||
def render(self):
|
def _render_frame(self):
|
||||||
self.translate_basename()
|
|
||||||
args = [
|
args = [
|
||||||
self.render_script,
|
self.render_script,
|
||||||
self.path,
|
self.path,
|
||||||
self.output_path_tr,
|
self.output_name_tr,
|
||||||
self.translation["frame"]
|
str(self.picked_frame)
|
||||||
]
|
]
|
||||||
subprocess.run(args)
|
subprocess.run(args)
|
||||||
return self.output_path_tr
|
|
||||||
|
|
||||||
|
|
||||||
class YandereBot:
|
class YandereBot:
|
||||||
@ -123,27 +129,22 @@ class YandereBot:
|
|||||||
|
|
||||||
# Class variables
|
# Class variables
|
||||||
mastodon_api = None
|
mastodon_api = None
|
||||||
listPictures = []
|
|
||||||
failed_uploads = 0
|
failed_uploads = 0
|
||||||
|
consecutive_failed_uploads = 0
|
||||||
currentSessionCount = 0
|
currentSessionCount = 0
|
||||||
debug_mode = False
|
debug_mode = False
|
||||||
primed = False
|
primed = False
|
||||||
decrypted = False
|
decrypted = False
|
||||||
|
|
||||||
# Time variables
|
|
||||||
dateSelection = None
|
|
||||||
dateNextSelection = None
|
|
||||||
|
|
||||||
# YandereBot.__init__()
|
# YandereBot.__init__()
|
||||||
# @param cfg A dynamically loaded python module
|
# @param cfg A dynamically loaded python module
|
||||||
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
|
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
|
||||||
# prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
|
# prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
|
||||||
def __init__(self, cfg, debug_mode=False, prime_bot=True):
|
def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True):
|
||||||
self.dateSelection = datetime.datetime.now()
|
|
||||||
self.dateNextSelection = self.dateSelection
|
|
||||||
self.cfg = cfg
|
self.cfg = cfg
|
||||||
self.load_settings(self.cfg)
|
self.load_settings(self.cfg)
|
||||||
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
||||||
|
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
|
||||||
if prime_bot:
|
if prime_bot:
|
||||||
self.prime_bot()
|
self.prime_bot()
|
||||||
|
|
||||||
@ -172,7 +173,11 @@ class YandereBot:
|
|||||||
def decrypt_settings(self):
|
def decrypt_settings(self):
|
||||||
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
|
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
|
||||||
import encryption
|
import encryption
|
||||||
self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt)
|
try:
|
||||||
|
self.settings_server = encryption.settings_server_decrypt(
|
||||||
|
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
|
||||||
|
except encryption.EncryptionFail as e:
|
||||||
|
raise BadCfgFile(str(e))
|
||||||
self.decrypted = True
|
self.decrypted = True
|
||||||
|
|
||||||
# Login to Pleroma
|
# Login to Pleroma
|
||||||
@ -195,33 +200,30 @@ class YandereBot:
|
|||||||
raise FailedLogin
|
raise FailedLogin
|
||||||
|
|
||||||
# Maybe I should remove this from the backend?
|
# Maybe I should remove this from the backend?
|
||||||
def print_header_stats(self, yandere_frame, date_selection, date_next_selection):
|
def print_header_stats(self, picked):
|
||||||
profile, frame, nsfw, path = None, None, None, None
|
profile, frame, nsfw, path = None, None, None, None
|
||||||
if yandere_frame:
|
if picked:
|
||||||
profile = yandere_frame.profile_name
|
profile = picked.profile_name
|
||||||
frame = yandere_frame.picked_frame
|
frame = picked.picked_frame
|
||||||
nsfw = yandere_frame.is_nsfw()
|
nsfw = picked.is_nsfw()
|
||||||
path = yandere_frame.output_path_tr
|
path = picked.output_name_tr
|
||||||
print("\n==== PLEROMA ====")
|
print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format(
|
||||||
print("Profile: {} | Frame: {} | NSFW: {}".format(
|
profile, frame, nsfw, path
|
||||||
profile, frame, nsfw
|
|
||||||
))
|
))
|
||||||
print("Path:", path)
|
|
||||||
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
|
||||||
print("Selection time: {}".format(
|
|
||||||
date_selection.strftime(self.settings_time["long_date_format"])) )
|
|
||||||
print("Next selection time: {} ({} seconds)".format(
|
|
||||||
date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) )
|
|
||||||
print("[ {} Selected during session | {} Failed ]\n".format(
|
|
||||||
self.currentSessionCount, self.failed_uploads) )
|
|
||||||
|
|
||||||
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
||||||
def upload_media_list(self, path_list):
|
def upload_media_list(self, path_list):
|
||||||
media_list = []
|
media_list = []
|
||||||
for ele in path_list:
|
for path in path_list:
|
||||||
|
if not os.path.isfile(path):
|
||||||
|
raise FileNotFoundError("Could not upload: {}".format(path))
|
||||||
if not self.debug_mode:
|
if not self.debug_mode:
|
||||||
media = self.mastodon_api.media_post(ele, description=os.path.basename(ele))
|
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
|
||||||
media_list.append((ele, media))
|
media_dict = {
|
||||||
|
"path": path,
|
||||||
|
"media": media
|
||||||
|
}
|
||||||
|
media_list.append(media_dict)
|
||||||
return media_list
|
return media_list
|
||||||
|
|
||||||
def get_post_text(self, yandere_frame, media_list):
|
def get_post_text(self, yandere_frame, media_list):
|
||||||
@ -232,8 +234,9 @@ class YandereBot:
|
|||||||
string_imglinks = []
|
string_imglinks = []
|
||||||
|
|
||||||
if media_list and self.settings_behavior["post_image_link"]:
|
if media_list and self.settings_behavior["post_image_link"]:
|
||||||
for ele in media_list:
|
for media_dict in media_list:
|
||||||
path, media = ele
|
path = media_dict["path"]
|
||||||
|
media = media_dict["media"]
|
||||||
if path is None or media is None:
|
if path is None or media is None:
|
||||||
continue
|
continue
|
||||||
elif content_type == "text/markdown" and not self.debug_mode:
|
elif content_type == "text/markdown" and not self.debug_mode:
|
||||||
@ -246,31 +249,25 @@ class YandereBot:
|
|||||||
# Join non empty strings with a newline character
|
# Join non empty strings with a newline character
|
||||||
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
|
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
|
||||||
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
||||||
return content_type, string_post
|
return string_post
|
||||||
|
|
||||||
def _post(self, yandere_frame):
|
def _post(self, yandere_frame):
|
||||||
media_list = self.upload_media_list([yandere_frame.output_path_tr])
|
if not yandere_frame:
|
||||||
content_type, message = self.get_post_text(yandere_frame, media_list)
|
raise InvalidPost("Frame is none")
|
||||||
|
upload_list = [yandere_frame.output_name_tr]
|
||||||
|
media_list = self.upload_media_list(upload_list)
|
||||||
|
message = self.get_post_text(yandere_frame, media_list)
|
||||||
if self.debug_mode:
|
if self.debug_mode:
|
||||||
return yandere_frame
|
return yandere_frame
|
||||||
self.mastodon_api.status_post(
|
self.mastodon_api.status_post(
|
||||||
message,
|
message,
|
||||||
media_ids=[i[1] for i in media_list if len(i) == 2],
|
media_ids=[i["media"] for i in media_list],
|
||||||
visibility=self.settings_behavior["visibility"],
|
visibility=self.settings_behavior["visibility"],
|
||||||
sensitive=yandere_frame.is_nsfw(),
|
sensitive=yandere_frame.is_nsfw(),
|
||||||
content_type=content_type
|
content_type=self.settings_behavior["content_type"]
|
||||||
)
|
)
|
||||||
return yandere_frame
|
return yandere_frame
|
||||||
|
|
||||||
def render_frame(self, picked_profile):
|
|
||||||
yandere_frame = YandereFrame(picked_profile)
|
|
||||||
yandere_frame.add_translation("datetime", datetime.datetime.strftime(datetime.datetime.now(), self.cfg.settings_time["datetime"]))
|
|
||||||
yandere_frame.pick()
|
|
||||||
yandere_frame.render()
|
|
||||||
if not os.path.isfile(yandere_frame.output_path_tr):
|
|
||||||
raise FileNotFoundError("Could not generate screenshot:", yandere_frame.output_path_tr)
|
|
||||||
return yandere_frame
|
|
||||||
|
|
||||||
# The main post function
|
# The main post function
|
||||||
# This function is responsible for incrementing self.currentSessionCount, as well as posting and blacklisting the
|
# This function is responsible for incrementing self.currentSessionCount, as well as posting and blacklisting the
|
||||||
# picked item.
|
# picked item.
|
||||||
@ -282,73 +279,39 @@ class YandereBot:
|
|||||||
def post(self):
|
def post(self):
|
||||||
picked = None
|
picked = None
|
||||||
|
|
||||||
# Flags that are set if an upload fails
|
|
||||||
timeout = False
|
|
||||||
|
|
||||||
# Attempt post
|
# Attempt post
|
||||||
try:
|
try:
|
||||||
# Post
|
# Post
|
||||||
|
dt_picked = self.settings_time["datetime"]
|
||||||
picked_profile = random.choice(self.cfg.settings_post)
|
picked_profile = random.choice(self.cfg.settings_post)
|
||||||
picked = self.render_frame(picked_profile)
|
picked = PickAndRenderFrame(picked_profile, dt_picked)
|
||||||
self._post(picked)
|
self._post(picked)
|
||||||
if os.path.isfile(picked.output_path_tr):
|
|
||||||
os.remove(picked.output_path_tr)
|
|
||||||
else:
|
|
||||||
raise FileNotFoundError("Unable to render {}".format(picked.output_path_tr))
|
|
||||||
|
|
||||||
# After a successful post
|
# After a successful post
|
||||||
self.currentSessionCount += 1
|
self.currentSessionCount += 1
|
||||||
|
self.consecutive_failed_uploads = 0
|
||||||
|
|
||||||
# The post was successful
|
# The post was successful
|
||||||
return picked
|
return picked
|
||||||
|
|
||||||
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
print(e)
|
|
||||||
# Exception flags
|
|
||||||
timeout = False
|
|
||||||
|
|
||||||
# Check if the file limit has been reached
|
|
||||||
except MastodonAPIError as e:
|
|
||||||
# Check if the file limit has been reached (413 error)
|
|
||||||
file_limit_reached = False
|
|
||||||
with contextlib.suppress(IndexError):
|
|
||||||
file_limit_reached = (e.args[1] == 413)
|
|
||||||
|
|
||||||
print("API Error:", e)
|
|
||||||
# Exception flags
|
|
||||||
timeout = True
|
|
||||||
|
|
||||||
# Server Errors
|
# Server Errors
|
||||||
# Assume all exceptions are on the server side
|
# Assume all exceptions are on the server side (except for FileNotFoundError of course
|
||||||
# If the connection is timing out it could be for two reasons:
|
# If the connection is timing out it could be for two reasons:
|
||||||
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
|
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
|
||||||
# a. FIX: Reduce settings_behavior["max_size"]
|
# a. FIX: Reduce settings_behavior["max_size"]
|
||||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
||||||
# mastodon.py API will not specify why the connection timed out).
|
# mastodon.py API will not specify why the connection timed out).
|
||||||
# The default assumption is #2
|
# The default assumption is #2
|
||||||
except Exception as e:
|
except (FileNotFoundError, MastodonAPIError, InvalidPost, Exception) as e:
|
||||||
print("Unhandled Exception:", e)
|
print("Exception:", e)
|
||||||
# Exception flags
|
|
||||||
timeout = True
|
|
||||||
|
|
||||||
# An exception occurred
|
# An exception occurred
|
||||||
self.failed_uploads += 1
|
self.failed_uploads += 1
|
||||||
if timeout:
|
self.consecutive_failed_uploads += 1
|
||||||
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
|
|
||||||
|
|
||||||
# The post failed
|
# The post failed
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def schedule_next_post(self):
|
|
||||||
self.dateSelection = self.dateNextSelection
|
|
||||||
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
|
|
||||||
|
|
||||||
# Will wait between the current time and the time of next selection
|
|
||||||
def wait_future_time(self):
|
|
||||||
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
|
|
||||||
self.eventSleep.wait(max(0, seconds))
|
|
||||||
|
|
||||||
# [BEGIN THE PROGRAM]
|
# [BEGIN THE PROGRAM]
|
||||||
def prime_bot(self):
|
def prime_bot(self):
|
||||||
random.seed(os.urandom(16))
|
random.seed(os.urandom(16))
|
||||||
@ -362,30 +325,6 @@ class YandereBot:
|
|||||||
if not self.primed:
|
if not self.primed:
|
||||||
self.prime_bot()
|
self.prime_bot()
|
||||||
|
|
||||||
# Early out if the bot is incapable of posting
|
|
||||||
if not self.can_post():
|
|
||||||
print("Bot is incapable of posting!!")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
start_time = self.dateSelection
|
|
||||||
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
|
|
||||||
delay_time = time_add_seconds(start_time, delay_seconds)
|
|
||||||
|
|
||||||
# Print the first image in the list if a delay or pretimer is set
|
|
||||||
if delay_seconds:
|
|
||||||
self.print_header_stats(None, start_time, delay_time)
|
|
||||||
|
|
||||||
# The delay parameter is different from the dateSelection and dateSelectionNext
|
|
||||||
# It will literally time out the bot for a given number of seconds regardless of the pre-timer setting
|
|
||||||
# This is useful if you want to set a delay of 30 seconds, before back-posting several images
|
|
||||||
self.eventSleep.wait(max(0, delay))
|
|
||||||
|
|
||||||
# Check if the pre-timer is set
|
|
||||||
# dateNextSelection should be greater than dateSelection if it is
|
|
||||||
# dateSelection and dateNextSelection are both set to the current time when the bot is initialized
|
|
||||||
if delay_seconds:
|
|
||||||
self.wait_future_time()
|
|
||||||
|
|
||||||
# Begin posting
|
# Begin posting
|
||||||
self.main_loop()
|
self.main_loop()
|
||||||
|
|
||||||
@ -396,32 +335,19 @@ class YandereBot:
|
|||||||
# 1. User presses Ctrl+C
|
# 1. User presses Ctrl+C
|
||||||
# 2. settings_behavior["uploads_per_post"] is less than one for some reason
|
# 2. settings_behavior["uploads_per_post"] is less than one for some reason
|
||||||
def can_post(self):
|
def can_post(self):
|
||||||
return not self.eventSleep.is_set() and self.settings_behavior["uploads_per_post"] > 0
|
return (
|
||||||
|
not self.eventSleep.is_set() and
|
||||||
|
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
|
||||||
|
self.consecutive_failed_uploads < self.settings_behavior["max_errors"]
|
||||||
|
)
|
||||||
|
|
||||||
def main_loop(self):
|
def main_loop(self):
|
||||||
target_posts = self.settings_behavior["uploads_per_post"]
|
sleep_seconds = self.settings_behavior["retry_seconds"]
|
||||||
while self.can_post():
|
while self.can_post():
|
||||||
successful_posts = 0
|
picked = self.post()
|
||||||
while (successful_posts < target_posts) and self.can_post():
|
self.print_header_stats(picked)
|
||||||
last_picked = self.post()
|
if self.can_post():
|
||||||
successful_posts += int(last_picked is not None)
|
self.eventSleep.wait(sleep_seconds)
|
||||||
if successful_posts >= target_posts:
|
|
||||||
self.schedule_next_post()
|
|
||||||
self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection)
|
|
||||||
else:
|
|
||||||
self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection)
|
|
||||||
|
|
||||||
if self.can_post():
|
|
||||||
self.wait_future_time()
|
|
||||||
|
|
||||||
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
|
|
||||||
def time_add_seconds(dt, seconds):
|
|
||||||
return dt + datetime.timedelta(0, seconds)
|
|
||||||
|
|
||||||
|
|
||||||
def time_diff_seconds(d1, d2):
|
|
||||||
return (d1-d2).total_seconds()
|
|
||||||
|
|
||||||
|
|
||||||
# Custom Exceptions for YandereBot
|
# Custom Exceptions for YandereBot
|
||||||
class Debug(Exception):
|
class Debug(Exception):
|
||||||
@ -434,3 +360,7 @@ class FailedLogin(Exception):
|
|||||||
|
|
||||||
class BadCfgFile(Exception):
|
class BadCfgFile(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidPost(Exception):
|
||||||
|
pass
|
||||||
|
Reference in New Issue
Block a user