268 lines
7.8 KiB
Python
268 lines
7.8 KiB
Python
#! /usr/bin/env python3
|
|
|
|
# Yandere Lewd Bot, an image posting bot for Pleroma
|
|
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
from threading import Event
|
|
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonVersionError
|
|
|
|
class YandereBot:
|
|
# From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
|
|
eventSleep = Event()
|
|
|
|
# The configuration module
|
|
cfg = None
|
|
|
|
# The below settings are required from the configuration module
|
|
settings = {
|
|
"settings_server": dict(),
|
|
"settings_behavior": dict(),
|
|
"settings_encrypt": dict()
|
|
}
|
|
|
|
# Class variables
|
|
mastodon_api = None
|
|
failed_uploads = 0
|
|
consecutive_failed_uploads = 0
|
|
currentSessionCount = 0
|
|
debug_mode = False
|
|
primed = False
|
|
decrypted = False
|
|
|
|
# YandereBot.__init__()
|
|
# @param cfg A dynamically loaded python module. See yanlib.module_load() for an example
|
|
# @param keyfile Keyfile to decrypt settings_post
|
|
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
|
|
def __init__(self, cfg, keyfile=None, debug_mode=False):
|
|
settings = self.settings
|
|
self.cfg = cfg
|
|
self.load_settings(self.cfg)
|
|
self.debug_mode = debug_mode or settings["settings_behavior"]["debug"]
|
|
settings["settings_encrypt"]["keyfile"] = keyfile or settings["settings_encrypt"]["keyfile"]
|
|
|
|
# Setup Exit Calls
|
|
def exit(self):
|
|
self.eventSleep.set()
|
|
|
|
# Decryption settings
|
|
# Used to set class attributes with the same name to the value specified in the config file.
|
|
def load_settings(self, cfg):
|
|
try:
|
|
for key in self.settings:
|
|
self.settings[key] = getattr(cfg, key)
|
|
|
|
except AttributeError as e:
|
|
print(e)
|
|
raise BadCfgFile
|
|
|
|
def decrypt_settings(self):
|
|
settings = self.settings
|
|
settings_encrypt = settings["settings_encrypt"]
|
|
settings_server = settings["settings_server"]
|
|
if settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
|
|
import FediBotEncryption
|
|
try:
|
|
self.settings["settings_server"] = FediBotEncryption.settings_server_decrypt(
|
|
settings_server, settings_encrypt, settings_encrypt["keyfile"])
|
|
except FediBotEncryption.EncryptionFail as e:
|
|
raise BadCfgFile(str(e))
|
|
self.decrypted = True
|
|
|
|
# Login to Pleroma
|
|
def login(self):
|
|
skip_login = self.debug_mode or self.mastodon_api is not None
|
|
if skip_login:
|
|
return
|
|
if not self.decrypted:
|
|
self.decrypt_settings()
|
|
try:
|
|
settings = self.settings
|
|
settings_server = settings["settings_server"]
|
|
settings_behavior = settings["settings_behavior"]
|
|
|
|
self.mastodon_api = Mastodon(
|
|
client_id=settings_server["client_id"],
|
|
client_secret=settings_server["client_secret"],
|
|
access_token=settings_server["access_token"],
|
|
api_base_url=settings_server["api_base_url"],
|
|
feature_set=settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1
|
|
)
|
|
except (MastodonIllegalArgumentError, MastodonVersionError) as e:
|
|
print(e)
|
|
raise FailedLogin
|
|
|
|
def upload_media_list_validate(self, path_list):
|
|
for path in path_list:
|
|
if path is None or not os.path.isfile(path):
|
|
raise FileNotFoundError("Could not upload: {}".format(path))
|
|
|
|
|
|
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
|
def upload_media_list(self, path_list):
|
|
self.upload_media_list_validate(path_list)
|
|
|
|
media_list = []
|
|
for path in path_list:
|
|
if not self.debug_mode:
|
|
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
|
|
media_dict = {
|
|
"path": path,
|
|
"media": media
|
|
}
|
|
media_list.append(media_dict)
|
|
return media_list
|
|
|
|
def get_post_text(self, message, media_list):
|
|
settings = self.settings
|
|
settings_behavior = settings["settings_behavior"]
|
|
|
|
content_type = settings_behavior["content_type"]
|
|
content_newline = settings_behavior["content_newline"]
|
|
static_message = content_newline.join(message)
|
|
string_post = ""
|
|
string_imglinks = []
|
|
|
|
if media_list and settings_behavior["post_image_link"]:
|
|
for media_dict in media_list:
|
|
path = media_dict["path"]
|
|
media = media_dict["media"]
|
|
|
|
if path is None or media is None:
|
|
continue
|
|
elif content_type == "text/markdown" and not self.debug_mode:
|
|
string_imglinks.append(
|
|
"[{}]({})".format(os.path.basename(path), media["url"])
|
|
)
|
|
else:
|
|
string_imglinks.append(media["url"])
|
|
|
|
# Join non empty strings with a newline character
|
|
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
|
|
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
|
|
|
return string_post
|
|
|
|
def _post(self, picked):
|
|
settings = self.settings
|
|
settings_behavior = settings["settings_behavior"]
|
|
|
|
if not picked:
|
|
raise InvalidPost("Picked cannot be None")
|
|
if not picked["media_list"]:
|
|
raise InvalidPost("Media list is empty")
|
|
if self.debug_mode:
|
|
return picked
|
|
media_list = self.upload_media_list(picked["media_list"])
|
|
message = self.get_post_text(picked["message"], media_list)
|
|
self.mastodon_api.status_post(
|
|
message,
|
|
media_ids=[i["media"] for i in media_list],
|
|
visibility=settings_behavior["visibility"],
|
|
sensitive=picked["spoiler"],
|
|
content_type=settings_behavior["content_type"]
|
|
)
|
|
return picked
|
|
|
|
def pick(self):
|
|
return None
|
|
|
|
def after_post(self, picked):
|
|
return picked
|
|
|
|
def handle_post_exception(self):
|
|
# An exception occurred
|
|
self.failed_uploads += 1
|
|
self.consecutive_failed_uploads += 1
|
|
|
|
# The main post function
|
|
# This funciton is responsible for picking, posting, and blacklisting an image in listPictures
|
|
#
|
|
# It is also responsible for removing the picked item from self.listPictures, which can be accomplished by simply
|
|
# popping it at index 0. This should handle any error that might occur while posting.
|
|
#
|
|
# This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded.
|
|
def post(self, callback=None):
|
|
# Post
|
|
_picked = callback() if callable(callback) else self.pick()
|
|
self._post(_picked)
|
|
|
|
# After a successful post
|
|
self.currentSessionCount += 1
|
|
self.consecutive_failed_uploads = 0
|
|
|
|
# The post was successful
|
|
return _picked
|
|
|
|
# [BEGIN THE PROGRAM]
|
|
def prime_bot(self):
|
|
if self.primed:
|
|
return
|
|
if not self.debug_mode:
|
|
self.decrypt_settings()
|
|
self.login()
|
|
self.primed = True
|
|
|
|
def start(self):
|
|
self.prime_bot()
|
|
|
|
# Begin posting
|
|
self.main_loop()
|
|
|
|
return 0
|
|
|
|
# End Conditions:
|
|
# 1. User presses Ctrl+C
|
|
# 2. There is nothing left in the picture list to select from
|
|
# 3. settings_behavior["uploads_per_post"] is less than uploads_per_post
|
|
# 4. Consecutive failed uploads is less than max_errors
|
|
def can_post(self):
|
|
settings = self.settings["settings_behavior"]
|
|
return (
|
|
not self.eventSleep.is_set() and
|
|
self.currentSessionCount < settings["uploads_per_post"] and
|
|
self.consecutive_failed_uploads < settings["max_errors"]
|
|
)
|
|
|
|
def main_loop(self):
|
|
sleep_seconds = self.settings["settings_behavior"]["retry_seconds"]
|
|
while self.can_post():
|
|
picked = self.post()
|
|
self.after_post(picked)
|
|
if self.can_post():
|
|
self.eventSleep.wait(sleep_seconds)
|
|
|
|
|
|
|
|
# Custom Exceptions for YandereBot
|
|
class Debug(Exception):
|
|
pass
|
|
|
|
|
|
class InvalidPost(Exception):
|
|
pass
|
|
|
|
|
|
class FailedLogin(Exception):
|
|
pass
|
|
|
|
|
|
class BadCfgFile(Exception):
|
|
pass
|
|
|
|
|
|
|