This repository has been archived on 2024-03-09. You can view files and clone it, but cannot push or open issues or pull requests.
DanbooruBot/src/yandere_bot.py

424 lines
13 KiB
Python

#! /usr/bin/env python3
# Yandere Lewd Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import datetime
import contextlib
import math
import shutil
import importlib
import magic
import random
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
class YandereBot:
    """Image posting bot for Pleroma that is driven by a configuration module.

    Each cycle picks a profile from ``settings_post``, downloads one post through
    the backend module named by that profile, uploads the file with the Mastodon
    API, publishes a status, and then sleeps until the next selection time.
    """

    # From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
    # NOTE(review): this is a class attribute, so the same Event is shared by every YandereBot instance.
    eventSleep = Event()
    # The configuration module
    cfg = None
    # The below settings are required from the configuration module
    settings_server = None
    settings_behavior = None
    settings_time = None
    settings_post = None
    settings_encrypt = None
    settings_credentials = None
    # Class variables
    mastodon_api = None
    failed_uploads = 0
    currentSessionCount = 0
    currentIndexCount = 0
    debug_mode = False
    primed = False
    decrypted = False
    # Time variables
    dateSelection = None
    dateNextSelection = None

    def __init__(self, cfg, debug_mode=False, prime_bot=True):
        """Create a bot from a configuration module.

        @param cfg         A dynamically loaded python module that provides the ``settings_*`` attributes
        @param debug_mode  Should the bot run in debug mode (do not sign in or post to Pleroma)
        @param prime_bot   Should the bot immediately prime itself (decrypt settings and login, but don't post)
        """
        self.dateSelection = datetime.datetime.now()
        self.dateNextSelection = self.dateSelection
        self.cfg = cfg
        self.load_settings(self.cfg)
        # Debug mode can be forced either by the caller or by the configuration file
        self.debug_mode = debug_mode or self.settings_behavior["debug"]
        random.seed(os.urandom(16))
        if prime_bot:
            self.prime_bot()

    def exit(self):
        """Signal the sleep event so that every wait returns and can_post() becomes False."""
        self.eventSleep.set()

    def load_settings(self, cfg, settings=None):
        """Copy settings from the configuration module onto this instance.

        Each name in ``settings`` is read from ``cfg`` and stored as an attribute
        with the same name.

        @param cfg       The configuration module (or any object exposing the attributes)
        @param settings  Iterable of attribute names to copy; defaults to all six ``settings_*`` names
        @raises BadCfgFile  If ``cfg`` is missing one of the requested attributes
        """
        default_settings = (
            "settings_server",
            "settings_behavior",
            "settings_time",
            "settings_post",
            "settings_encrypt",
            "settings_credentials",
        )
        try:
            for ele in settings or default_settings:
                setattr(self, ele, getattr(cfg, ele))
        except AttributeError as e:
            print(e)
            raise BadCfgFile from e

    def decrypt_settings(self):
        """Decrypt ``settings_server`` once, if encryption is enabled and the bot is not in debug mode."""
        if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
            # Imported lazily so the encryption module is only needed when encryption is enabled
            import encryption
            self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt)
            self.decrypted = True

    def login(self):
        """Create the Mastodon API client.

        Does nothing in debug mode or when a client already exists.

        @raises FailedLogin  If the Mastodon client rejects the arguments or the server version
        """
        skip_login = self.debug_mode or self.mastodon_api is not None
        if skip_login:
            return
        if not self.decrypted:
            self.decrypt_settings()
        try:
            self.mastodon_api = Mastodon(
                client_id=self.settings_server["client_id"],
                client_secret=self.settings_server["client_secret"],
                access_token=self.settings_server["access_token"],
                api_base_url=self.settings_server["api_base_url"],
                feature_set=self.settings_behavior["feature_set"]  # <--- Necessary for Mastodon Version 1.5.1
            )
        except (MastodonIllegalArgumentError, MastodonVersionError) as e:
            print(e)
            raise FailedLogin from e

    # Maybe I should remove this from the backend?
    def print_header_stats(self, picked, date_selection, date_next_selection):
        """Print a summary of the last selection and of the upcoming selection time.

        @param picked               The picked post dictionary, or None if nothing was posted
        @param date_selection       Datetime of the current selection
        @param date_next_selection  Datetime of the next selection
        """
        picked_name = picked["profile"]["name"] if picked else None
        picked_url = picked["file_url"] if picked else None
        picked_nsfw = picked["nsfw"] if picked else None
        # currentIndexCount points at the *next* profile, so the previous one is (count - 1) wrapped around
        # the profile list. -1 is printed while nothing has been selected yet.
        profile_count = len(self.settings_post)
        if self.currentIndexCount > 0 and profile_count:
            picked_index = (self.currentIndexCount - 1) % profile_count
        else:
            picked_index = -1
        next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
        long_date_format = self.settings_time["long_date_format"]
        print("[Profile]", picked_name, "[Index]", picked_index)
        print(picked_url)
        print("Explicit:", picked_nsfw)
        print("Selection time: {}".format(
            date_selection.strftime(long_date_format)))
        print("Next selection time: {} ({} seconds)".format(
            date_next_selection.strftime(long_date_format), next_selection_seconds))
        print("[ {} Selected during session | {} Failed ]\n".format(
            self.currentSessionCount, self.failed_uploads))

    def download_media(self, picked_profile):
        """Fetch and download one post for the given profile through its backend module.

        @param picked_profile  A profile from settings_post; its "backend" key names the module to import
        @return  Whatever the backend's download_post() returns (the rest of this class reads the keys
                 "full_path", "file_url", "nsfw" and "profile" from it), or None if the backend
                 module cannot be imported
        @raises InvalidPost  If the backend's fetch_post() returns None
        """
        try:
            backend_s = picked_profile["backend"]
            backend = importlib.import_module(backend_s)
            username = self.settings_credentials[backend_s]["username"]
            password = self.settings_credentials[backend_s]["password"]
            downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
            img = downloader.fetch_post(picked_profile)
            if img is None:
                raise InvalidPost("Img could not be downloaded")
            return downloader.download_post(img)
        except ImportError:
            print("Invalid Backend:", picked_profile["backend"])
        return None

    def upload_media_list(self, path_list):
        """Upload every file in path_list and return the results.

        @param path_list  Iterable of file paths to upload
        @return  A tuple of (path, media dictionary) pairs; an empty tuple in debug mode
        """
        if self.debug_mode:
            return tuple()
        # Materialise the uploads: the result is iterated twice by _post() (once to build the
        # post text and once to collect the media ids), which a one-shot generator cannot support.
        return tuple(
            (ele, self.mastodon_api.media_post(ele, description=os.path.basename(ele)))
            for ele in path_list
        )

    def get_post_text(self, picked, media_list):
        """Build the status text for a picked post.

        The text is the profile message (the NSFW variant for NSFW posts) followed,
        when settings_behavior["post_image_link"] is enabled, by one link per uploaded media.

        @param picked      The picked post dictionary
        @param media_list  Sequence of (path, media dictionary) pairs from upload_media_list()
        @return  A (content_type, text) tuple
        """
        content_type = self.settings_behavior["content_type"]
        content_newline = self.settings_behavior["content_newline"]
        profile = picked["profile"]
        message = profile["message_nsfw"] if picked["nsfw"] else profile["message"]
        static_message = content_newline.join(message)
        use_markdown = content_type == "text/markdown" and not self.debug_mode
        string_imglinks = []
        if media_list and self.settings_behavior["post_image_link"]:
            for path, media in media_list:
                if path is None or media is None:
                    continue
                if use_markdown:
                    string_imglinks.append(
                        "[{}]({})".format(os.path.basename(path), media["url"])
                    )
                else:
                    string_imglinks.append(media["url"])
        # Join non empty strings with a newline character
        string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
        string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
        return content_type, string_post

    def valid_mimetype(self, picked):
        """Return True if the picked file's detected mime type is an image or a video."""
        full_path = picked["full_path"]
        mime = magic.from_file(full_path, mime=True)
        if mime is None:
            return False
        mime_category = mime.split("/", 1)[0]
        return mime_category in ("image", "video")

    def _post(self, picked):
        """Validate, upload and publish a picked post.

        In debug mode the post is validated but nothing is uploaded or published.

        @param picked  The picked post dictionary
        @return  picked
        @raises InvalidPost        If picked is None
        @raises FileNotFoundError  If picked["full_path"] is not a file
        @raises InvalidMimeType    If the file is neither an image nor a video
        """
        # Validate picked
        if picked is None:
            raise InvalidPost("Picked post is None")
        if not os.path.isfile(picked["full_path"]):
            raise FileNotFoundError("File not found: {}".format(picked))
        if not self.valid_mimetype(picked):
            raise InvalidMimeType("Invalid mime type")
        media_list = self.upload_media_list([picked["full_path"]])
        content_type, message = self.get_post_text(picked, media_list)
        if self.debug_mode:
            return picked
        self.mastodon_api.status_post(
            message,
            media_ids=[i[1] for i in media_list if len(i) == 2],
            visibility=self.settings_behavior["visibility"],
            sensitive=picked["nsfw"],
            content_type=content_type
        )
        return picked

    def pick_profile(self):
        """Select the profile to post from according to settings_behavior["tag_select"].

        "random" picks a uniformly random profile; "sequential" cycles through the
        profiles using currentIndexCount.

        @return  One element of settings_post
        @raises ValueError  If there are no profiles or tag_select is not a known mode
        """
        profiles = self.settings_post
        if not profiles:
            raise ValueError("settings_post does not contain any profiles")
        tag_select = self.settings_behavior["tag_select"].lower()
        if tag_select == "random":
            pick_index = random.randint(0, len(profiles) - 1)
        elif tag_select == "sequential":
            pick_index = self.currentIndexCount % len(profiles)
        else:
            raise ValueError("Unknown tag_select mode: {}".format(tag_select))
        return profiles[pick_index]

    # The main post function
    def post(self):
        """Pick a profile, download one post from it, publish it and delete the downloaded file.

        On success currentSessionCount and currentIndexCount are incremented. On failure
        failed_uploads is incremented and the bot sleeps for settings_behavior["retry_seconds"].
        currentIndexCount then advances to the next profile, except after a mime type error,
        a missing file or a 413 API error, where the same profile is retried.

        @return  The picked post dictionary if the post succeeded, None if it failed
        """
        picked = None
        # Attempt post
        try:
            # Post
            picked_profile = self.pick_profile()
            picked = self.download_media(picked_profile)
            self._post(picked)
            # The status is already published at this point, so a failed cleanup must not be
            # reported as a failed post (that would make the caller post again).
            try:
                os.remove(picked["full_path"])
            except OSError as e:
                print("Could not remove posted file:", e)
            # After a successful post
            self.currentSessionCount += 1
            self.currentIndexCount += 1
            # The post was successful
            return picked
        # Invalid post (move to next profile)
        except InvalidPost as e:
            print("Invalid post:", e)
        # Failed post
        except (InvalidMimeType, FileNotFoundError) as e:
            # Decrement currentIndexCount to repost from the same profile
            # (it is incremented again in the shared failure path below)
            self.currentIndexCount -= 1
            print("Posting error:", e)
        except MastodonAPIError as e:
            # Check if the file limit has been reached (413 error)
            file_limit_reached = False
            with contextlib.suppress(IndexError):
                file_limit_reached = (e.args[1] == 413)
            if file_limit_reached:
                self.currentIndexCount -= 1
            print("API Error:", e)
        # Server Errors
        # Assume all exceptions are on the server side
        # If the connection is timing out it could be for two reasons:
        # 1. The error was caused by the user attempting to upload a large file over a slow connection:
        #    a. FIX: Reduce settings_behavior["max_size"]
        # 2. The server is down. Check to verify in a web browser (this is the default assumption since the
        #    mastodon.py API will not specify why the connection timed out).
        # The default assumption is #2
        except Exception as e:
            print("Unhandled Exception:", e)
        # An exception occurred
        self.failed_uploads += 1
        self.currentIndexCount += 1
        print("[Errors: {}]".format(self.failed_uploads))
        # Sleep
        self.eventSleep.wait(self.settings_behavior["retry_seconds"])
        # The post failed
        return None

    def schedule_next_post(self):
        """Advance the selection window by settings_behavior["sleep_seconds"]."""
        self.dateSelection = self.dateNextSelection
        self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])

    # Will wait between the current time and the time of next selection
    def wait_future_time(self):
        """Sleep until dateNextSelection, or return immediately if it has already passed."""
        seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
        self.eventSleep.wait(max(0, seconds))

    # [BEGIN THE PROGRAM]
    def prime_bot(self):
        """Decrypt the settings and login (unless in debug mode or unable to post), then mark the bot as primed."""
        if not self.debug_mode:
            self.decrypt_settings()
            if self.can_post():
                self.login()
        self.primed = True

    def start(self, delay=0):
        """Run the bot until it can no longer post.

        @param delay  Extra number of seconds to wait before the first post
        @return  1 if the bot is incapable of posting, 0 once the main loop has finished
        """
        # Prime bot if not already primed.
        if not self.primed:
            self.prime_bot()
        # Early out if the bot is incapable of posting
        if not self.can_post():
            print("Bot is incapable of posting!!")
            return 1
        start_time = self.dateSelection
        delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
        delay_time = time_add_seconds(start_time, delay_seconds)
        # Print the header ahead of time if a delay or pretimer is set
        if delay_seconds:
            self.print_header_stats(None, start_time, delay_time)
        # The delay parameter is different from the dateSelection and dateSelectionNext
        # It will literally time out the bot for a given number of seconds regardless of the pre-timer setting
        # This is useful if you want to set a delay of 30 seconds, before back-posting several images
        self.eventSleep.wait(max(0, delay))
        # Check if the pre-timer is set
        # dateNextSelection should be greater than dateSelection if it is
        # dateSelection and dateNextSelection are both set to the current time when the bot is initialized
        if delay_seconds:
            self.wait_future_time()
        # Begin posting
        self.main_loop()
        return 0

    # End Conditions:
    # 1. User presses Ctrl+C (the sleep event is set)
    # 2. settings_behavior["uploads_per_post"] is less than one for some reason
    def can_post(self):
        """Return True while the bot has not been asked to exit and uploads_per_post is positive."""
        return not self.eventSleep.is_set() and self.settings_behavior["uploads_per_post"] > 0

    def main_loop(self):
        """Post settings_behavior["uploads_per_post"] items per cycle, then wait for the next selection time."""
        target_posts = self.settings_behavior["uploads_per_post"]
        while self.can_post():
            successful_posts = 0
            # Stays None if the bot is asked to exit before the first attempt of this cycle
            last_picked = None
            while (successful_posts < target_posts) and self.can_post():
                last_picked = self.post()
                successful_posts += int(last_picked is not None)
            if successful_posts >= target_posts:
                self.schedule_next_post()
                self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection)
            else:
                self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection)
            if self.can_post():
                self.wait_future_time()
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
    """Return dt moved forward by the given number of seconds (backward if negative)."""
    offset = datetime.timedelta(seconds=seconds)
    return dt + offset
def time_diff_seconds(d1, d2):
    """Return the signed number of seconds from d2 to d1 (positive when d1 is later)."""
    elapsed = d1 - d2
    return elapsed.total_seconds()
# Custom Exceptions for YandereBot
class Debug(Exception):
    """Debug exception type (not raised by the code visible in this module)."""
class InvalidPost(Exception):
    """Raised when there is no usable post to publish (nothing fetched, or the picked post is None)."""
class InvalidMimeType(Exception):
    """Raised when the picked file is neither an image nor a video."""
class FailedLogin(Exception):
    """Raised when the Mastodon API client cannot be created during login."""
class BadCfgFile(Exception):
    """Raised when the configuration module is missing a required settings attribute."""
class MissingMasterList(Exception):
    """Master-list exception type (not raised by the code visible in this module)."""