Removed built-in timers. Use cron to schedule posts

This commit is contained in:
Anon 2022-10-08 17:37:03 -07:00
parent e8a083c11d
commit f07afc254f
2 changed files with 73 additions and 296 deletions

View File

@ -1,7 +1,7 @@
#! /usr/bin/env python3 #! /usr/bin/env python3
# Danbooru Bot, an image posting bot for Pleroma # Mirai Nikki Bot a video frame posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me> # Copyright (C) 2022 Anon
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
@ -24,146 +24,10 @@ import datetime
import contextlib import contextlib
# A class that inherits YandereBot from the yandere_bot module
# This class is used to handle command line arguments gracefully, and extend functionality quickly
# Bot specific changes should be made here (if they are minor enough).
# This will be instantiated from the main() function
class YandereBot(yandere_bot.YandereBot):
# The below settings are required from the configuration module
settings_time = None
settings_reminder = None
# Wait before running
delay_start = 0
def __init__(self, cfg, debug_mode, prime_bot, start_index, delay_d=None, delay_h=None, delay_s=None):
super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False)
self.load_settings(self.cfg, ("settings_time", "settings_reminder"))
self.set_pretimer(delay_d, delay_h, delay_s)
self.currentIndexCount = start_index
if self.debug_mode:
print("[DEBUG MODE ON - DRY RUN BEGIN]")
# Do not perform sanity test if stdout is None.
# input() will fail if stdout is None, which could happen with the following command
# ./run --dry-run -h -d 'a date in the past'
if sys.stdout is not None and not self.pass_sanity_test():
raise FailedSanityTest
if prime_bot:
self.prime_bot()
def print_date_time_example(self):
print_fmt = " {0:6} {1:10} {2}"
time_fmt = self.settings_time["time_format"]
date_fmt = self.settings_time["date_format"]
current_time = self.dateSelection
print(print_fmt.format(
"TIME", time_fmt, current_time.strftime(time_fmt)
))
print(print_fmt.format(
"DATE", date_fmt, current_time.strftime(date_fmt)
))
def set_delay_d(self, d):
try:
t = datetime.datetime.strptime(d, self.settings_time["date_format"])
self.dateNextSelection = self.dateNextSelection.replace(
year=t.year, month=t.month, day=t.day
)
except Exception:
print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d))
self.print_date_time_example()
raise DateWrongFormat
def set_delay_h(self, h, add_24):
try:
t = datetime.datetime.strptime(h, self.settings_time["time_format"])
self.dateNextSelection = self.dateNextSelection.replace(
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
)
if self.dateNextSelection < self.dateSelection and add_24:
self.dateNextSelection = yandere_bot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
except Exception:
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
self.print_date_time_example()
raise TimeWrongFormat
def set_pretimer(self, d=None, h=None, s=0):
if d:
self.set_delay_d(d)
if h:
self.set_delay_h(h, d is None)
if s:
self.delay_start = max(0, s)
# Check for potential misconfigurations by the user
def pass_sanity_test(self):
# Calculate pre-timer value
seconds_until_next_pos = yandere_bot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
# Possible misconfigurations that will prompt the user to continue
pretimer_less_than_zero = seconds_until_next_pos < 0
pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"]
# Prompt the user
prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep
# Remind the user to generate new OAuth tokens
dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"])
if dt < datetime.datetime.now():
print("REMINDER: Generate new tokens!!")
# Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming
if pretimer_less_than_zero:
sleep = round(abs(seconds_until_next_pos), 2)
images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1
print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format(
sleep, images
))
# Check if the bot will wait for longer than the default amount of sleep time configured in the cfg.py
elif pretimer_greater_than_sleep:
print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format(
round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"]
))
# Prompt the user if something doesn't seem right
# This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work.
try:
if prompt_user:
# Default to 'y' if the user just presses enter
ans = input("Do you want to continue [Y/n]? ") or "y"
return ans.lower() in ("y", "yes")
except (EOFError, KeyboardInterrupt):
print()
return False
# Sanity test passed
return True
def start(self, delay=None):
_delay = delay or self.delay_start
return super(YandereBot, self).start(max(0, _delay))
# Custom exceptions
class TimeWrongFormat(Exception):
pass
class DateWrongFormat(Exception):
pass
class FailedSanityTest(Exception):
pass
class FailedToLoadCfg(Exception): class FailedToLoadCfg(Exception):
pass pass
# Entry point if run from the command line # Entry point if run from the command line
def main(): def main():
# Default config file # Default config file
@ -173,21 +37,15 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="A bot for posting on Mastodon", description="A bot for posting on Mastodon",
# epilog="All switches can be combined for greater control", # epilog="All switches can be combined for greater control",
add_help=False) add_help=True)
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true") parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
parser.add_argument("--debug", help="Same as --dry-run", action="store_true") parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0)
parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None)
parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None)
parser.add_argument("-i", "--index", help="Start at index (only matters if profile select is set to sequential)", default=0)
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg) parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true") parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption")
parser.add_argument("-i", "--index", help="Start at index (only matters if profile is set to sequential", default=0)
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER) parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
arguments = parser.parse_args() arguments = parser.parse_args()
# Redirect stdout when the bot first initializes if the bot is not going to run normally
redirect_stdout = None if arguments.help else sys.stdout
# Yandere Lewd Bot # Yandere Lewd Bot
yandere = None yandere = None
yandere_config = None yandere_config = None
@ -197,33 +55,19 @@ def main():
import importlib import importlib
yandere_config = importlib.import_module(arguments.config) yandere_config = importlib.import_module(arguments.config)
except ImportError: except ImportError:
print("Failed to Load Configuration:", arguments.config) raise FailedToLoadCfg("Invalid config file: {}".format(arguments.config))
raise FailedToLoadCfg
# Flag if the bot is running in debug mode # Flag if the bot is running in debug mode
debug_mode = (arguments.dry_run or arguments.debug or arguments.help) debug_mode = arguments.dry_run or arguments.debug
with contextlib.redirect_stdout(redirect_stdout): yandere = yandere_bot.YandereBot(
prime_bot = not arguments.help yandere_config,
arguments.keyfile,
yandere = YandereBot( debug_mode,
yandere_config, )
debug_mode,
prime_bot, yandere.currentIndexCount = int(arguments.index)
int(arguments.index),
arguments.date,
arguments.time,
arguments.wait
)
# Print Usage Information with Time and Date Formats with Examples
if arguments.help:
parser.print_help()
print()
yandere.print_date_time_example()
print()
return 0
# Setup exit calls # Setup exit calls
# Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings ) # Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings )
def yandere_quit(signo, _frame): def yandere_quit(signo, _frame):
@ -244,18 +88,9 @@ if __name__ == "__main__":
# Exceptions raised from the main function # Exceptions raised from the main function
except FailedToLoadCfg: except FailedToLoadCfg:
sys.exit(10) sys.exit(10)
except FailedSanityTest:
sys.exit(9)
except DateWrongFormat:
sys.exit(8)
except TimeWrongFormat:
sys.exit(7)
# Exceptions raised from the bot # Exceptions raised from the bot
except yandere_bot.Debug: except yandere_bot.Debug:
sys.exit(6) sys.exit(6)
except yandere_bot.MissingMasterList:
sys.exit(5)
except yandere_bot.BadCfgFile: except yandere_bot.BadCfgFile:
sys.exit(4) sys.exit(4)
except yandere_bot.FailedLogin: except yandere_bot.FailedLogin:

View File

@ -46,26 +46,22 @@ class YandereBot:
# Class variables # Class variables
mastodon_api = None mastodon_api = None
failed_uploads = 0 failed_uploads = 0
consecutive_failed_uploads = 0
currentSessionCount = 0 currentSessionCount = 0
currentIndexCount = 0 currentIndexCount = 0
debug_mode = False debug_mode = False
primed = False primed = False
decrypted = False decrypted = False
# Time variables
dateSelection = None
dateNextSelection = None
# YandereBot.__init__() # YandereBot.__init__()
# @param cfg A dynamically loaded python module. See yanlib.module_load() for an example # @param cfg A dynamically loaded python module. See yanlib.module_load() for an example
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma) # @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
# prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post) # prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
def __init__(self, cfg, debug_mode=False, prime_bot=True): def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True):
self.dateSelection = datetime.datetime.now()
self.dateNextSelection = self.dateSelection
self.cfg = cfg self.cfg = cfg
self.load_settings(self.cfg) self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"] self.debug_mode = debug_mode or self.settings_behavior["debug"]
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
random.seed(os.urandom(16)) random.seed(os.urandom(16))
if prime_bot: if prime_bot:
self.prime_bot() self.prime_bot()
@ -93,13 +89,17 @@ class YandereBot:
except AttributeError as e: except AttributeError as e:
print(e) print(e)
raise BadCfgFile raise BadCfgFile
def decrypt_settings(self): def decrypt_settings(self):
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode: if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
import encryption import encryption
self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt) try:
self.settings_server = encryption.settings_server_decrypt(
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
except encryption.EncryptionFail as e:
raise BadCfgFile(str(e))
self.decrypted = True self.decrypted = True
# Login to Pleroma # Login to Pleroma
def login(self): def login(self):
skip_login = self.debug_mode or self.mastodon_api is not None skip_login = self.debug_mode or self.mastodon_api is not None
@ -120,24 +120,16 @@ class YandereBot:
raise FailedLogin raise FailedLogin
# Maybe I should remove this from the backend? # Maybe I should remove this from the backend?
def print_header_stats(self, picked, date_selection, date_next_selection): def print_header_stats(self, picked):
picked_name = picked["profile"]["name"] if picked else None profile = picked["profile"]["name"] if picked else None
picked_url = picked["file_url"] if picked else None url = picked["file_url"] if picked else None
picked_path = picked["full_path"] if picked else None path = picked["full_path"] if picked else None
picked_nsfw = picked["nsfw"] if picked else None nsfw = picked["nsfw"] if picked else None
picked_index = (self.currentIndexCount - int(self.currentSessionCount > 0)) % len(self.settings_post) index = (self.currentIndexCount - int(self.currentSessionCount > 0)) % len(self.settings_post)
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
print("[Profile]", picked_name, "[Index]", picked_index)
print(picked_url)
print("Explicit:", picked_nsfw)
print("Selection time: {}".format(
date_selection.strftime(self.settings_time["long_date_format"])) )
print("Next selection time: {} ({} seconds)".format(
date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) )
print("[ {} Selected during session | {} Failed ]\n".format(
self.currentSessionCount, self.failed_uploads) )
print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format(
profile, index, nsfw, path, url
))
# Returns a list of media paths (without the hashes) # Returns a list of media paths (without the hashes)
def download_media(self, picked_profile): def download_media(self, picked_profile):
@ -160,12 +152,28 @@ class YandereBot:
# Returns a list of tuples that contain the media list path and media mastodon dictionary # Returns a list of tuples that contain the media list path and media mastodon dictionary
def upload_media_list(self, path_list): def upload_media_list(self, path_list):
if self.debug_mode: media_list = []
return tuple() # Validate picked
media_list = ((ele, self.mastodon_api.media_post(ele, description=os.path.basename(ele))) for ele in path_list) for path in path_list:
if not os.path.isfile(path):
raise FileNotFoundError("Could not upload: {}".format(path))
elif not self.valid_mimetype(path):
raise InvalidMimeType("Invalid mime type")
elif not self.valid_file_size(path):
raise FileTooLarge("File is too large to upload")
# Upload
for path in path_list:
if not self.debug_mode:
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
media_dict = {
"path": path,
"media": media
}
media_list.append(media_dict)
return media_list return media_list
def get_post_text(self, picked, media_list): def get_post_text(self, picked, media_list):
content_type = self.settings_behavior["content_type"] content_type = self.settings_behavior["content_type"]
content_newline = self.settings_behavior["content_newline"] content_newline = self.settings_behavior["content_newline"]
nsfw = picked["nsfw"] nsfw = picked["nsfw"]
@ -190,10 +198,9 @@ class YandereBot:
string_imglinks_joined = content_newline.join(filter(None, string_imglinks)) string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return content_type, string_post return string_post
def valid_mimetype(self, picked): def valid_mimetype(self, full_path):
full_path = picked["full_path"]
mime = magic.from_file(full_path, mime=True) mime = magic.from_file(full_path, mime=True)
if mime is None: if mime is None:
@ -204,38 +211,26 @@ class YandereBot:
return mime_category in ("image", "video") return mime_category in ("image", "video")
def valid_file_size(self, picked): def valid_file_size(self, full_path):
full_path = picked["full_path"]
max_size = self.settings_behavior["max_size"] max_size = self.settings_behavior["max_size"]
file_size = os.stat(full_path).st_size file_size = os.stat(full_path).st_size
return file_size <= max_size return file_size <= max_size
def _post(self, picked): def _post(self, picked):
# Validate picked if not picked:
if picked is None: raise InvalidPost("Picked post is none")
raise InvalidPost("Picked post is None") upload_list = [picked["full_path"]]
media_list = self.upload_media_list(upload_list)
if not os.path.isfile(picked["full_path"]): message = self.get_post_text(picked, media_list)
raise FileNotFoundError("File not found: {}".format(picked))
elif not self.valid_mimetype(picked):
raise InvalidMimeType("Invalid mime type")
elif not self.valid_file_size(picked):
raise FileTooLarge("File is too large to upload")
media_list = self.upload_media_list([picked["full_path"]])
content_type, message = self.get_post_text(picked, media_list)
if self.debug_mode: if self.debug_mode:
return picked return picked
self.mastodon_api.status_post( self.mastodon_api.status_post(
message, message,
media_ids=[i[1] for i in media_list if len(i) == 2], media_ids=[i["media"] for i in media_list],
visibility=self.settings_behavior["visibility"], visibility=self.settings_behavior["visibility"],
sensitive=picked["nsfw"], sensitive=picked["nsfw"],
content_type=content_type content_type=self.settings_behavior["content_type"]
) )
return picked return picked
@ -273,6 +268,7 @@ class YandereBot:
# After a successful post # After a successful post
self.currentSessionCount += 1 self.currentSessionCount += 1
self.consecutive_failed_uploads = 0
self.currentIndexCount += 1 self.currentIndexCount += 1
# The post was successful # The post was successful
@ -301,29 +297,16 @@ class YandereBot:
# An exception occurred # An exception occurred
self.failed_uploads += 1 self.failed_uploads += 1
print("[Errors: {}]".format(self.failed_uploads)) self.consecutive_failed_uploads += 1
# Sleep
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
# The post failed # The post failed
return None return None
def schedule_next_post(self):
self.dateSelection = self.dateNextSelection
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
# Will wait between the current time and the time of next selection
def wait_future_time(self):
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
self.eventSleep.wait(max(0, seconds))
# [BEGIN THE PROGRAM] # [BEGIN THE PROGRAM]
def prime_bot(self): def prime_bot(self):
if not self.debug_mode: if not self.debug_mode:
self.decrypt_settings() self.decrypt_settings()
if self.can_post(): self.login()
self.login()
self.primed = True self.primed = True
def start(self, delay=0): def start(self, delay=0):
@ -331,30 +314,6 @@ class YandereBot:
if not self.primed: if not self.primed:
self.prime_bot() self.prime_bot()
# Early out if the bot is incapable of posting
if not self.can_post():
print("Bot is incapable of posting!!")
return 1
start_time = self.dateSelection
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
delay_time = time_add_seconds(start_time, delay_seconds)
# Print the first image in the list if a delay or pretimer is set
if delay_seconds:
self.print_header_stats(None, start_time, delay_time)
# The delay parameter is different from the dateSelection and dateSelectionNext
# It will literally time out the bot for a given number of seconds regardless of the pre-timer setting
# This is useful if you want to set a delay of 30 seconds, before back-posting several images
self.eventSleep.wait(max(0, delay))
# Check if the pre-timer is set
# dateNextSelection should be greater than dateSelection if it is
# dateSelection and dateNextSelection are both set to the current time when the bot is initialized
if delay_seconds:
self.wait_future_time()
# Begin posting # Begin posting
self.main_loop() self.main_loop()
@ -367,35 +326,18 @@ class YandereBot:
def can_post(self): def can_post(self):
return ( return (
not self.eventSleep.is_set() and not self.eventSleep.is_set() and
self.settings_behavior["uploads_per_post"] > 0 and self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
self.consecutive_failed_uploads < self.settings_behavior["max_errors"] and
self.currentIndexCount >= 0 self.currentIndexCount >= 0
) )
def main_loop(self): def main_loop(self):
target_posts = self.settings_behavior["uploads_per_post"] sleep_seconds = self.settings_behavior["retry_seconds"]
while self.can_post(): while self.can_post():
successful_posts = 0 picked = self.post()
while (successful_posts < target_posts) and self.can_post(): self.print_header_stats(picked)
last_picked = self.post()
successful_posts += int(last_picked is not None)
if successful_posts >= target_posts:
self.schedule_next_post()
self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection)
else:
self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection)
if self.can_post(): if self.can_post():
self.wait_future_time() self.eventSleep.wait(sleep_seconds)
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
# Custom Exceptions for YandereBot # Custom Exceptions for YandereBot
class Debug(Exception): class Debug(Exception):