Compare commits

..

No commits in common. "fe00f1d635acbcf3199f6c6d465e897625add446" and "76e507cf67435f8e04c6d8f984c069ccc8a45038" have entirely different histories.

3 changed files with 400 additions and 161 deletions

42
.gitignore vendored
View File

@ -1,13 +1,31 @@
# Ignore Everything
*
# User preferance files
/rsc*
/md5/
/src/cfg*.py
# Exception
!/default/*
!/docs/*
!/render/*
!/src/main.py
!/.gitignore
!/LICENSE.txt
!/README.md
!/requirements.txt
!/run.sh
# Virtual environment
/venv*
/src/__pycache__/
# PyCharm IDE folder
/.idea/
# Editor files
/*~
/*#
# Automation scripts + odds and ends
/notes.txt
/compile.sh
/upload.sh
/balance_2.csv
/DEFAULT.csv
/generate_random.sh
/src/make_master_template.py
/src/make_master_list.py
/src/hash_path_exists.py
/utils/
/cfg_availible/
/profile/
/gnu_header.txt
/backup*

View File

@ -16,154 +16,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import argparse
import signal
import random
import datetime
import subprocess
import FediBot
from mastodon import MastodonAPIError
# A class that contains all of the rendering information for a single post
class PickAndRenderFrame:
# From Config File
profile_name = ""
path = ""
frames = 0
skip = tuple()
nsfw = tuple()
output_name = ""
message = ""
message_nsfw = ""
render_script = ""
# After pick
picked_frame = None
output_name_tr = ""
def __init__(self, picked, datetime_format):
dt_now = datetime.datetime.now()
dt_now_str = datetime.datetime.strftime(dt_now, datetime_format)
self.profile_name = picked["profile_name"]
self.path = picked["path"]
self.frames = picked["frames"]
self.skip = picked["skip"]
self.nsfw = picked["nsfw"]
self.output_name = picked["output_name"]
self.output_name_tr = self.output_name
self.message = picked["message"]
self.message_nsfw = picked["message_nsfw"]
self.render_script = picked["render_script"]
self.picked_frame = self._pick_frame()
# Shell-like substitutions
translations = {
"profile_name": self.profile_name,
"frame": str(self.picked_frame),
"datetime": dt_now_str
}
self.output_name_tr = self._translate_basename(translations)
self._render_frame()
def __del__(self):
render_file = self.output_name_tr
if render_file and os.path.isfile(render_file):
os.remove(render_file)
def _pick_frame(self):
picked_frame = None
while picked_frame is None:
picked_frame = random.random() * self.frames
for skip in self.skip:
begin, end = skip
if begin <= picked_frame <= end:
picked_frame = None
break
return picked_frame
def is_nsfw(self):
for nsfw in self.nsfw:
begin, end = nsfw
if begin <= self.picked_frame <= end:
return True
return False
def _translate_basename(self, translations):
output_name_tr = self.output_name
for k,v in translations.items():
replace_token = "${" + k + "}"
output_name_tr = output_name_tr.replace(replace_token, v)
return output_name_tr
# TODO: Add translation keywords to the message
def get_message(self):
return self.message_nsfw if self.is_nsfw() else self.message
def _render_frame(self):
args = [
self.render_script,
self.path,
self.output_name_tr,
str(self.picked_frame)
]
subprocess.run(args)
class YandereBot(FediBot.YandereBot):
def __init__(self, cfg, keyfile=None, debug_mode=False):
settings = {
"settings_time": {},
"settings_post": {},
}
self.settings.update(settings)
super(YandereBot, self).__init__(cfg, keyfile, debug_mode)
# Maybe I should remove this from the backend?
def print_header_stats(self, picked):
profile, frame, nsfw, path = None, None, None, None
if picked:
profile = picked.profile_name
frame = picked.picked_frame
nsfw = picked.is_nsfw()
path = picked.output_name_tr
print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format(
profile, frame, nsfw, path
))
def pick(self):
dt_picked = self.settings["settings_time"]["datetime"]
picked_profile = random.choice(self.settings["settings_post"])
picked = PickAndRenderFrame(picked_profile, dt_picked)
media_list = [picked.output_name_tr]
spoiler = picked.is_nsfw()
message = picked.get_message()
return {
"picked": picked,
"media_list": media_list,
"spoiler": spoiler,
"message": message
}
def after_pick(self, picked):
self.print_header_stats(picked["picked"])
def post(self, picked):
try:
super(YandereBot, self).post(picked)
except (FileNotFoundError, MastodonAPIError, FediBot.InvalidPost, Exception) as e:
print("Exception:", e)
self.handle_post_exception()
# The post failed
return None
import yandere_bot
class FailedToLoadCfg(Exception):
@ -201,7 +57,7 @@ def main():
# Flag if the bot is running in debug mode
debug_mode = arguments.dry_run or arguments.debug
yandere = YandereBot(
yandere = yandere_bot.YandereBot(
yandere_config,
arguments.keyfile,
debug_mode
@ -228,9 +84,9 @@ if __name__ == "__main__":
except FailedToLoadCfg:
sys.exit(10)
# Exceptions raised from the bot
except FediBot.Debug:
except yandere_bot.Debug:
sys.exit(6)
except FediBot.BadCfgFile:
except yandere_bot.BadCfgFile:
sys.exit(4)
except FediBot.FailedLogin:
except yandere_bot.FailedLogin:
sys.exit(2)

365
src/yandere_bot.py Normal file
View File

@ -0,0 +1,365 @@
#! /usr/bin/env python3
# Mirai Nikki Bot, a video frame posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import datetime
import random
import subprocess
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
# A class that contains all of the rendering information for a single post
class PickAndRenderFrame:
# From Config File
profile_name = ""
path = ""
frames = 0
skip = tuple()
nsfw = tuple()
output_name = ""
message = ""
message_nsfw = ""
render_script = ""
# After pick
picked_frame = None
output_name_tr = ""
def __init__(self, picked, datetime_format):
dt_now = datetime.datetime.now()
dt_now_str = datetime.datetime.strftime(dt_now, datetime_format)
self.profile_name = picked["profile_name"]
self.path = picked["path"]
self.frames = picked["frames"]
self.skip = picked["skip"]
self.nsfw = picked["nsfw"]
self.output_name = picked["output_name"]
self.output_name_tr = self.output_name
self.message = picked["message"]
self.message_nsfw = picked["message_nsfw"]
self.render_script = picked["render_script"]
self.picked_frame = self._pick_frame()
# Shell-like substitutions
translations = {
"profile_name": self.profile_name,
"frame": str(self.picked_frame),
"datetime": dt_now_str
}
self.output_name_tr = self._translate_basename(translations)
self._render_frame()
def __del__(self):
render_file = self.output_name_tr
if render_file and os.path.isfile(render_file):
os.remove(render_file)
def _pick_frame(self):
picked_frame = None
while picked_frame is None:
picked_frame = random.random() * self.frames
for skip in self.skip:
begin, end = skip
if begin <= picked_frame <= end:
picked_frame = None
break
return picked_frame
def is_nsfw(self):
for nsfw in self.nsfw:
begin, end = nsfw
if begin <= self.picked_frame <= end:
return True
return False
def _translate_basename(self, translations):
output_name_tr = self.output_name
for k,v in translations.items():
replace_token = "${" + k + "}"
output_name_tr = output_name_tr.replace(replace_token, v)
return output_name_tr
# TODO: Add translation keywords to the message
def get_message(self):
return self.message_nsfw if self.is_nsfw() else self.message
def _render_frame(self):
args = [
self.render_script,
self.path,
self.output_name_tr,
str(self.picked_frame)
]
subprocess.run(args)
class YandereBot:
# From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
eventSleep = Event()
# The configuration module
cfg = None
# The below settings are required from the configuration module
settings_server = None
settings_behavior = None
settings_time = None
settings_encrypt = None
# Class variables
mastodon_api = None
failed_uploads = 0
consecutive_failed_uploads = 0
currentSessionCount = 0
debug_mode = False
primed = False
decrypted = False
# YandereBot.__init__()
# @param cfg A dynamically loaded python module
# @param keyfile Keyfile to decrypt settings_post
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
# @prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True):
self.cfg = cfg
self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"]
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
if prime_bot:
self.prime_bot()
# Setup Exit Calls
def exit(self):
self.eventSleep.set()
# Decryption settings
# Used to set class attributes with the same name to the value specified in the config file.
def load_settings(self, cfg, settings=None):
try:
default_settings = (
"settings_server",
"settings_behavior",
"settings_time",
"settings_encrypt"
)
_settings = settings or default_settings
for ele in _settings:
setattr(self, ele, getattr(cfg, ele))
except AttributeError as e:
print(e)
raise BadCfgFile
def decrypt_settings(self):
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
import encryption
try:
self.settings_server = encryption.settings_server_decrypt(
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
except encryption.EncryptionFail as e:
raise BadCfgFile(str(e))
self.decrypted = True
# Login to Pleroma
def login(self):
skip_login = self.debug_mode or self.mastodon_api is not None
if skip_login:
return
if not self.decrypted:
self.decrypt_settings()
try:
self.mastodon_api = Mastodon(
client_id=self.settings_server["client_id"],
client_secret=self.settings_server["client_secret"],
access_token=self.settings_server["access_token"],
api_base_url=self.settings_server["api_base_url"],
feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1
)
except (MastodonIllegalArgumentError, MastodonVersionError) as e:
print(e)
raise FailedLogin
# Maybe I should remove this from the backend?
def print_header_stats(self, picked):
profile, frame, nsfw, path = None, None, None, None
if picked:
profile = picked.profile_name
frame = picked.picked_frame
nsfw = picked.is_nsfw()
path = picked.output_name_tr
print("Profile: {} | Frame: {} | NSFW: {} | Path: {}".format(
profile, frame, nsfw, path
))
# Returns a list of tuples that contain the media list path and media mastodon dictionary
def upload_media_list(self, path_list):
media_list = []
for path in path_list:
if not os.path.isfile(path):
raise FileNotFoundError("Could not upload: {}".format(path))
for path in path_list:
if not self.debug_mode:
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
media_dict = {
"path": path,
"media": media
}
media_list.append(media_dict)
return media_list
def get_post_text(self, yandere_frame, media_list):
content_type = self.settings_behavior["content_type"]
content_newline = self.settings_behavior["content_newline"]
static_message = content_newline.join(yandere_frame.get_message())
string_post = ""
string_imglinks = []
if media_list and self.settings_behavior["post_image_link"]:
for media_dict in media_list:
path = media_dict["path"]
media = media_dict["media"]
if path is None or media is None:
continue
elif content_type == "text/markdown" and not self.debug_mode:
string_imglinks.append(
"[{}]({})".format(os.path.basename(path), media["url"])
)
else:
string_imglinks.append(media["url"])
# Join non empty strings with a newline character
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return string_post
def _post(self, yandere_frame):
if not yandere_frame:
raise InvalidPost("Frame is none")
upload_list = [yandere_frame.output_name_tr]
media_list = self.upload_media_list(upload_list)
message = self.get_post_text(yandere_frame, media_list)
if self.debug_mode:
return yandere_frame
self.mastodon_api.status_post(
message,
media_ids=[i["media"] for i in media_list],
visibility=self.settings_behavior["visibility"],
sensitive=yandere_frame.is_nsfw(),
content_type=self.settings_behavior["content_type"]
)
return yandere_frame
# The main post function
# This funciton is responsible for picking a profile, generate a screenshot, and posting it.
#
# This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded.
def post(self):
picked = None
# Attempt post
try:
# Post
dt_picked = self.settings_time["datetime"]
picked_profile = random.choice(self.cfg.settings_post)
picked = PickAndRenderFrame(picked_profile, dt_picked)
self._post(picked)
# After a successful post
self.currentSessionCount += 1
self.consecutive_failed_uploads = 0
# The post was successful
return picked
# Server Errors and other general exceptions
# Assume all exceptions are on the server side (besides FileNotFoundError of course)
# If the connection is timing out it could be for two reasons:
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
# a. FIX: Reduce settings_behavior["max_size"]
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out).
# 3. Failed to generate screenshot
# 4. Other general exceptions
except (FileNotFoundError, MastodonAPIError, InvalidPost, Exception) as e:
print("Exception:", e)
# An exception occurred
self.failed_uploads += 1
self.consecutive_failed_uploads += 1
# The post failed
return None
# [BEGIN THE PROGRAM]
def prime_bot(self):
random.seed(os.urandom(16))
if not self.debug_mode:
self.decrypt_settings()
self.login()
self.primed = True
def start(self, delay=0):
# Prime bot if not already primed.
if not self.primed:
self.prime_bot()
# Begin posting
self.main_loop()
# Return 1 if there are still pictures in the picture list
return 0
# End Conditions:
# 1. User presses Ctrl+C
# 2. settings_behavior["uploads_per_post"] is less than uploads_per_post
# 3. Consecutive failed uploads is less than max_errors
def can_post(self):
return (
not self.eventSleep.is_set() and
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
self.consecutive_failed_uploads < self.settings_behavior["max_errors"]
)
def main_loop(self):
sleep_seconds = self.settings_behavior["retry_seconds"]
while self.can_post():
picked = self.post()
self.print_header_stats(picked)
if self.can_post():
self.eventSleep.wait(sleep_seconds)
# Custom Exceptions for YandereBot
class Debug(Exception):
pass
class FailedLogin(Exception):
pass
class BadCfgFile(Exception):
pass
class InvalidPost(Exception):
pass