Compare commits
No commits in common. "253a003e2ae5eec668ac460e7db2a5455d7bc03d" and "d9336fda6efbc7ef0c2ceaecfebaf60599e0e4f3" have entirely different histories.
253a003e2a
...
d9336fda6e
48
.gitignore
vendored
48
.gitignore
vendored
@ -1,13 +1,37 @@
|
|||||||
# Ignore Everything
|
# User preferance files
|
||||||
*
|
/rsc*/
|
||||||
|
/md5/
|
||||||
|
/src/cfg*.py
|
||||||
|
/log.txt
|
||||||
|
|
||||||
# Exception
|
# Virtual environment
|
||||||
!/default/*
|
/venv*
|
||||||
!/docs/*
|
/src/__pycache__/
|
||||||
!/src/main.py
|
|
||||||
!/src/yanlib.py
|
# PyCharm IDE folder
|
||||||
!/.gitignore
|
/.idea/
|
||||||
!/LICENSE.txt
|
|
||||||
!/README.md
|
# Editor files
|
||||||
!/requirements.txt
|
/*~
|
||||||
!/run.sh
|
/*#
|
||||||
|
|
||||||
|
# Automation scripts + odds and ends
|
||||||
|
/notes.txt
|
||||||
|
/compile.sh
|
||||||
|
/upload.sh
|
||||||
|
/balance*.csv
|
||||||
|
/balance*.ods
|
||||||
|
/default*.csv
|
||||||
|
/DEFAULT.csv
|
||||||
|
/generate_random.sh
|
||||||
|
/print_1_txt.sh
|
||||||
|
/get_weight.sh
|
||||||
|
/src/make_master_template.py
|
||||||
|
/src/make_master_list.py
|
||||||
|
/src/hash_path_exists.py
|
||||||
|
/utils/
|
||||||
|
/cfg_availible/
|
||||||
|
/profile/
|
||||||
|
/gnu_header.txt
|
||||||
|
/backup*
|
||||||
|
/resum_default.sh
|
||||||
|
217
src/encryption.py
Executable file
217
src/encryption.py
Executable file
@ -0,0 +1,217 @@
|
|||||||
|
#! /usr/bin/env python3
|
||||||
|
|
||||||
|
# Yandere Lewd Bot, an image posting bot for Pleroma
|
||||||
|
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
import getpass
|
||||||
|
from cryptography.fernet import Fernet, InvalidToken
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||||
|
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
|
||||||
|
class EncryptionFail(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class PasswordMismatch(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Return string from bytes
|
||||||
|
def salt_encode(b):
|
||||||
|
return base64.urlsafe_b64encode(b).decode()
|
||||||
|
|
||||||
|
|
||||||
|
# Return bytes from string
|
||||||
|
def salt_decode(s):
|
||||||
|
return base64.urlsafe_b64decode(s.encode())
|
||||||
|
|
||||||
|
|
||||||
|
# Ordered Dictionaries
|
||||||
|
def change_encoding_dict(settings_server, encoding_type):
|
||||||
|
return OrderedDict([(k, encoding_type(v)) for k, v in settings_server.items()])
|
||||||
|
|
||||||
|
|
||||||
|
# Return bytes from string
|
||||||
|
def encode_dict(settings_server):
|
||||||
|
return change_encoding_dict(settings_server, str.encode)
|
||||||
|
|
||||||
|
|
||||||
|
# Return string from bytes
|
||||||
|
def decode_dict(settings_server):
|
||||||
|
return change_encoding_dict(settings_server, bytes.decode)
|
||||||
|
|
||||||
|
|
||||||
|
# password: Bytes
|
||||||
|
# salt: Bytes
|
||||||
|
def derive_key(password, salt):
|
||||||
|
kdf = PBKDF2HMAC(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=32,
|
||||||
|
salt=salt,
|
||||||
|
iterations=100000,
|
||||||
|
backend=default_backend()
|
||||||
|
)
|
||||||
|
return base64.urlsafe_b64encode(kdf.derive(password))
|
||||||
|
|
||||||
|
|
||||||
|
# Encryption functions
|
||||||
|
# message: Bytes
|
||||||
|
# key: Bytes
|
||||||
|
def encrypt(message, key):
|
||||||
|
f = Fernet(key)
|
||||||
|
token = f.encrypt(message)
|
||||||
|
return token
|
||||||
|
|
||||||
|
|
||||||
|
# token: Bytes
|
||||||
|
# key: Bytes
|
||||||
|
def decrypt(token, key):
|
||||||
|
f = Fernet(key)
|
||||||
|
message = f.decrypt(token)
|
||||||
|
return message
|
||||||
|
|
||||||
|
|
||||||
|
# password: bytes()
|
||||||
|
# salt: bytes()
|
||||||
|
# settings_server: dict() -> Byte values
|
||||||
|
# encryption_function: encrypt(message, key) : decrypt(token, key):
|
||||||
|
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
|
||||||
|
# ChangeEncodingDict to make them strings (recommended cfg file friendly)
|
||||||
|
def encrypt_settings(settings_server, password, salt, encryption_function):
|
||||||
|
key = derive_key(password, salt)
|
||||||
|
settings_server_decrypted = OrderedDict()
|
||||||
|
for setting in settings_server:
|
||||||
|
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
|
||||||
|
return settings_server_decrypted
|
||||||
|
|
||||||
|
def get_keyfile(keyfile=None):
|
||||||
|
if keyfile is not None:
|
||||||
|
with open(keyfile, "rb") as f:
|
||||||
|
return f.read()
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_pass(q):
|
||||||
|
try:
|
||||||
|
return getpass.getpass(q).encode()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
raise EncryptionFail("\nQuitting...")
|
||||||
|
|
||||||
|
|
||||||
|
def settings_server_encrypt(settings_server, keyfile=None):
|
||||||
|
try:
|
||||||
|
settings_server = encode_dict(settings_server)
|
||||||
|
salt = os.urandom(16)
|
||||||
|
password = get_keyfile(keyfile)
|
||||||
|
if password is None:
|
||||||
|
password = get_pass("Enter password: ")
|
||||||
|
password2 = get_pass("Enter password: ")
|
||||||
|
if password != password2:
|
||||||
|
raise PasswordMismatch("Passwords do not match")
|
||||||
|
encrypted = encrypt_settings(settings_server, password, salt, encrypt)
|
||||||
|
return salt_encode(salt), decode_dict(encrypted)
|
||||||
|
except PasswordMismatch as e:
|
||||||
|
raise EncryptionFail(str(e))
|
||||||
|
except Exception as e:
|
||||||
|
err = str(e)
|
||||||
|
raise EncryptionFail("Encrypt Error: {}".format(err))
|
||||||
|
|
||||||
|
|
||||||
|
def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None):
|
||||||
|
try:
|
||||||
|
if not settings_encrypt["encrypt"]:
|
||||||
|
return settings_server
|
||||||
|
settings_server = encode_dict(settings_server)
|
||||||
|
password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ")
|
||||||
|
salt = salt_decode(settings_encrypt["salt"])
|
||||||
|
decrypted = encrypt_settings(settings_server, password, salt, decrypt)
|
||||||
|
return decode_dict(decrypted)
|
||||||
|
except base64.binascii.Error:
|
||||||
|
raise EncryptionFail("Salt is invalid")
|
||||||
|
except InvalidToken:
|
||||||
|
raise EncryptionFail("Password or token is incorrect")
|
||||||
|
except Exception as e:
|
||||||
|
err = str(e)
|
||||||
|
raise EncryptionFail("Decrypt Error: {}".format(err))
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
import argparse
|
||||||
|
from pprint import pformat
|
||||||
|
|
||||||
|
default_cfg = "cfg"
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="A class to encrypt server credentials",
|
||||||
|
epilog="There are no additional parameters.",
|
||||||
|
add_help=True )
|
||||||
|
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
|
||||||
|
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
|
||||||
|
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
|
||||||
|
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None)
|
||||||
|
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
|
||||||
|
|
||||||
|
arguments = parser.parse_args()
|
||||||
|
|
||||||
|
if arguments.recrypt:
|
||||||
|
arguments.encrypt = True
|
||||||
|
arguments.decrypt = True
|
||||||
|
|
||||||
|
if arguments.encrypt or arguments.decrypt:
|
||||||
|
import importlib
|
||||||
|
cfg = importlib.import_module(arguments.cfg)
|
||||||
|
settings_server = cfg.settings_server
|
||||||
|
settings_encrypt = cfg.settings_encrypt
|
||||||
|
keyfile = arguments.keyfile or settings_encrypt["keyfile"]
|
||||||
|
|
||||||
|
if arguments.decrypt and arguments.encrypt:
|
||||||
|
print("Re-encrypting")
|
||||||
|
|
||||||
|
if arguments.decrypt: # arguments.decrypt
|
||||||
|
print("Decrypt...")
|
||||||
|
settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile)
|
||||||
|
settings_encrypt = OrderedDict([
|
||||||
|
("encrypt", False),
|
||||||
|
("salt", settings_encrypt["salt"]),
|
||||||
|
("keyfile", arguments.keyfile)
|
||||||
|
])
|
||||||
|
|
||||||
|
if arguments.encrypt:
|
||||||
|
print("Encrypt...")
|
||||||
|
salt, settings_server = settings_server_encrypt(settings_server, keyfile)
|
||||||
|
settings_encrypt = OrderedDict([
|
||||||
|
("encrypt", True),
|
||||||
|
("salt", salt),
|
||||||
|
("keyfile", arguments.keyfile)
|
||||||
|
])
|
||||||
|
|
||||||
|
print("settings_server = {}".format(pformat(settings_server)))
|
||||||
|
print("settings_encrypt = {}".format(pformat(settings_encrypt)))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
sys.exit(main())
|
||||||
|
except EncryptionFail as e:
|
||||||
|
print(e)
|
||||||
|
sys.exit(1)
|
212
src/main.py
212
src/main.py
@ -16,211 +16,11 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
import signal
|
import signal
|
||||||
import FediBot
|
import yandere_bot
|
||||||
import contextlib
|
import contextlib
|
||||||
import yanlib
|
|
||||||
import fnmatch
|
|
||||||
from functools import reduce
|
|
||||||
from mastodon import MastodonAPIError
|
|
||||||
|
|
||||||
|
|
||||||
class BadPostSettings(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class MissingMasterList(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class YanBotHash(yanlib.HashObject):
|
|
||||||
_postSettings = None
|
|
||||||
|
|
||||||
def __init__(self, hash_obj, profile):
|
|
||||||
super(YanBotHash, self).__init__()
|
|
||||||
if hash_obj is None:
|
|
||||||
return
|
|
||||||
self._sHash = hash_obj.get_hash_string()
|
|
||||||
self._sBinaryChar = hash_obj.get_binary_char()
|
|
||||||
self._sPath = hash_obj.get_hash_path()
|
|
||||||
self._postSettings = profile
|
|
||||||
|
|
||||||
def get_post_setting(self):
|
|
||||||
return self._postSettings
|
|
||||||
|
|
||||||
# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj
|
|
||||||
# @param hash_obj A HashObject() (or subclass)
|
|
||||||
# @param profiles A list of available profiles to match
|
|
||||||
# @param profiles_default The default profile to return if no profile is matched
|
|
||||||
def get_profile(hash_obj, profiles, profiles_default):
|
|
||||||
profile_gen = (x for x in profiles if fnmatch.fnmatch(hash_obj.get_hash_path(), x["path"]))
|
|
||||||
return next(profile_gen, profiles_default)
|
|
||||||
|
|
||||||
|
|
||||||
# Takes a file path and transforms it into a list of YanBotHash() with the appropriate profile
|
|
||||||
# @param f_name Path of hash file
|
|
||||||
# @param profiles List of profiles -> self.settings_post
|
|
||||||
# @param profiles_default The default profile to apply
|
|
||||||
# @param callback_get_profile Callback function -> should return a single profile. Default: get_profile()
|
|
||||||
def get_list_of_hashes_with_profiles(f_name, profiles, profile_default):
|
|
||||||
return [YanBotHash(i, get_profile(i, profiles, profile_default)) for i in yanlib.get_hash_list(f_name)]
|
|
||||||
|
|
||||||
|
|
||||||
class YandereBot(FediBot.YandereBot):
|
|
||||||
listPictures = []
|
|
||||||
lenBlacklist = 0
|
|
||||||
|
|
||||||
def __init__(self, cfg, keyfile=None, debug_mode=False):
|
|
||||||
settings = {
|
|
||||||
"settings_time": {},
|
|
||||||
"settings_post": {},
|
|
||||||
"settings_post_default": {}
|
|
||||||
}
|
|
||||||
self.settings.update(settings)
|
|
||||||
super(YandereBot, self).__init__(cfg, keyfile, debug_mode)
|
|
||||||
|
|
||||||
# [BEGIN THE PROGRAM]
|
|
||||||
def prime_bot(self):
|
|
||||||
if self.primed:
|
|
||||||
return
|
|
||||||
self.load_picture_list()
|
|
||||||
self.validate_post_settings()
|
|
||||||
super(YandereBot, self).prime_bot()
|
|
||||||
|
|
||||||
# Make sure there are no profiles in listPictures set to none. Print the bad post and exit if there is.
|
|
||||||
def validate_post_settings(self):
|
|
||||||
bad_post_count = 0
|
|
||||||
for i in self.listPictures:
|
|
||||||
if i.get_post_setting() is None:
|
|
||||||
print("Bad post setting [{}]: {}".format(bad_post_count, i.get_full_string()))
|
|
||||||
bad_post_count += 1
|
|
||||||
if bad_post_count:
|
|
||||||
raise BadPostSettings
|
|
||||||
|
|
||||||
# Set up lists
|
|
||||||
def read_blacklist_files(self):
|
|
||||||
list_blacklist = []
|
|
||||||
for i in self.settings["settings_behavior"]["master_blacklist_r"]:
|
|
||||||
# It doesn't matter if the picture file doesn't exist
|
|
||||||
with contextlib.suppress(IOError):
|
|
||||||
list_blacklist.extend(yanlib.get_hash_list(i))
|
|
||||||
return list_blacklist
|
|
||||||
|
|
||||||
def blacklist(self, picked):
|
|
||||||
self.lenBlacklist += 1
|
|
||||||
for path in self.settings["settings_behavior"]["master_blacklist_w"]:
|
|
||||||
with open(path, "a") as f:
|
|
||||||
print(picked.get_full_string(), file=f)
|
|
||||||
|
|
||||||
# Returns a list of media paths (without the hashes)
|
|
||||||
def get_media_list(self, picked):
|
|
||||||
ext = self.settings["settings_behavior"]["multi_media_ext"]
|
|
||||||
if not picked:
|
|
||||||
return None
|
|
||||||
elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
|
|
||||||
return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())]
|
|
||||||
else:
|
|
||||||
return [picked.get_hash_path()]
|
|
||||||
|
|
||||||
# load_pictures will return a list of YanHashObj() with a blacklist(s) applied
|
|
||||||
# @param list_blacklist A list of HashObjects() that are blacklist hashes
|
|
||||||
def load_pictures(self, list_blacklist):
|
|
||||||
if not self.settings["settings_behavior"]["master_list"]:
|
|
||||||
raise MissingMasterList
|
|
||||||
|
|
||||||
try:
|
|
||||||
list_pictures = reduce(lambda x, y: x + y,
|
|
||||||
[get_list_of_hashes_with_profiles(
|
|
||||||
f,
|
|
||||||
self.settings["settings_post"],
|
|
||||||
self.settings["settings_post_default"])
|
|
||||||
for f in self.settings["settings_behavior"]["master_list"]
|
|
||||||
])
|
|
||||||
return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings["settings_behavior"]["max_size"])
|
|
||||||
except IOError as e:
|
|
||||||
print(e)
|
|
||||||
raise MissingMasterList
|
|
||||||
|
|
||||||
def load_picture_list(self):
|
|
||||||
list_blacklist = self.read_blacklist_files()
|
|
||||||
self.listPictures = self.load_pictures(list_blacklist)
|
|
||||||
self.lenBlacklist = len(list_blacklist)
|
|
||||||
|
|
||||||
# Maybe I should remove this from the backend?
|
|
||||||
def print_header_stats(self, picked):
|
|
||||||
_picked = picked.get_full_string() if picked else None
|
|
||||||
picked_profile = picked.get_post_setting()["name"] if picked else None
|
|
||||||
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
|
||||||
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
|
||||||
|
|
||||||
print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format(
|
|
||||||
picked_profile, _picked, picked_next_profile, picked_next
|
|
||||||
))
|
|
||||||
|
|
||||||
def pick(self):
|
|
||||||
picked = self.listPictures.pop(0)
|
|
||||||
media_list = self.get_media_list(picked)
|
|
||||||
spoiler = picked.get_post_setting()["spoiler"]
|
|
||||||
message = picked.get_post_setting()["message"]
|
|
||||||
return {
|
|
||||||
"picked": picked,
|
|
||||||
"media_list": media_list,
|
|
||||||
"spoiler": spoiler,
|
|
||||||
"message": message,
|
|
||||||
}
|
|
||||||
|
|
||||||
def after_pick(self, picked):
|
|
||||||
self.blacklist(picked["picked"])
|
|
||||||
self.print_header_stats(picked["picked"])
|
|
||||||
|
|
||||||
def post(self, picked):
|
|
||||||
reinsert_image = False
|
|
||||||
try:
|
|
||||||
return super(YandereBot, self).post(picked)
|
|
||||||
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
|
|
||||||
except (FileNotFoundError, FediBot.InvalidPost):
|
|
||||||
print("File not found:", picked["picked"].get_hash_path())
|
|
||||||
reinsert_image = False
|
|
||||||
|
|
||||||
# Check if the file limit has been reached
|
|
||||||
except MastodonAPIError as e:
|
|
||||||
print("API Error:", e)
|
|
||||||
# Check if the file limit has been reached (413 error)
|
|
||||||
with contextlib.suppress(IndexError):
|
|
||||||
reinsert_image = e.args[1] != 413
|
|
||||||
|
|
||||||
# Server Errors and other general exceptions
|
|
||||||
# Assume all exceptions are on the server side (besides FileNotFoundError of course)
|
|
||||||
# If the connection is timing out it could be for two reasons:
|
|
||||||
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
|
|
||||||
# a. FIX: Reduce settings_behavior["max_size"]
|
|
||||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
|
||||||
# mastodon.py API will not specify why the connection timed out).
|
|
||||||
# 3. Failed to generate screenshot
|
|
||||||
# 4. Other general exceptions
|
|
||||||
except Exception as e:
|
|
||||||
print("Unhandled Exception:", e)
|
|
||||||
# Exception flags
|
|
||||||
reinsert_image = True
|
|
||||||
|
|
||||||
if reinsert_image and self.consecutive_failed_uploads < self.settings["settings_behavior"]["max_errors"]:
|
|
||||||
self.listPictures.insert(0, picked["picked"])
|
|
||||||
|
|
||||||
self.handle_post_exception()
|
|
||||||
|
|
||||||
# The post failed
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def can_post(self):
|
|
||||||
return bool(len(self.listPictures)) and super(YandereBot, self).can_post()
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
super(YandereBot, self).start()
|
|
||||||
|
|
||||||
# Return 1 if there are still pictures in the picture list
|
|
||||||
return len(self.listPictures) > 0
|
|
||||||
|
|
||||||
|
|
||||||
class FailedToLoadCfg(Exception):
|
class FailedToLoadCfg(Exception):
|
||||||
@ -259,7 +59,7 @@ def main():
|
|||||||
# Flag if the bot is running in debug mode
|
# Flag if the bot is running in debug mode
|
||||||
debug_mode = arguments.dry_run or arguments.debug or arguments.output
|
debug_mode = arguments.dry_run or arguments.debug or arguments.output
|
||||||
|
|
||||||
yandere = YandereBot(
|
yandere = yandere_bot.YandereBot(
|
||||||
yandere_config,
|
yandere_config,
|
||||||
arguments.keyfile,
|
arguments.keyfile,
|
||||||
debug_mode
|
debug_mode
|
||||||
@ -294,11 +94,11 @@ if __name__ == "__main__":
|
|||||||
except FailedToLoadCfg:
|
except FailedToLoadCfg:
|
||||||
sys.exit(6)
|
sys.exit(6)
|
||||||
# Exceptions raised from the bot
|
# Exceptions raised from the bot
|
||||||
except FediBot.Debug:
|
except yandere_bot.Debug:
|
||||||
sys.exit(5)
|
sys.exit(5)
|
||||||
except FediBot.BadCfgFile:
|
except yandere_bot.BadCfgFile:
|
||||||
sys.exit(4)
|
sys.exit(4)
|
||||||
except BadPostSettings:
|
except yandere_bot.BadPostSettings:
|
||||||
sys.exit(3)
|
sys.exit(3)
|
||||||
except FediBot.FailedLogin:
|
except yandere_bot.FailedLogin:
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
412
src/yandere_bot.py
Normal file
412
src/yandere_bot.py
Normal file
@ -0,0 +1,412 @@
|
|||||||
|
#! /usr/bin/env python3
|
||||||
|
|
||||||
|
# Yandere Lewd Bot, an image posting bot for Pleroma
|
||||||
|
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import yanlib
|
||||||
|
|
||||||
|
import os
|
||||||
|
import contextlib
|
||||||
|
import fnmatch
|
||||||
|
from functools import reduce
|
||||||
|
from threading import Event
|
||||||
|
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
||||||
|
|
||||||
|
|
||||||
|
class YanBotHash(yanlib.HashObject):
|
||||||
|
_postSettings = None
|
||||||
|
|
||||||
|
def __init__(self, hash_obj, profile):
|
||||||
|
super(YanBotHash, self).__init__()
|
||||||
|
if hash_obj is None:
|
||||||
|
return
|
||||||
|
self._sHash = hash_obj.get_hash_string()
|
||||||
|
self._sBinaryChar = hash_obj.get_binary_char()
|
||||||
|
self._sPath = hash_obj.get_hash_path()
|
||||||
|
self._postSettings = profile
|
||||||
|
|
||||||
|
def get_post_setting(self):
|
||||||
|
return self._postSettings
|
||||||
|
|
||||||
|
|
||||||
|
class YandereBot:
|
||||||
|
# From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
|
||||||
|
eventSleep = Event()
|
||||||
|
|
||||||
|
# The configuration module
|
||||||
|
cfg = None
|
||||||
|
|
||||||
|
# The below settings are required from the configuration module
|
||||||
|
settings_server = None
|
||||||
|
settings_behavior = None
|
||||||
|
settings_time = None
|
||||||
|
settings_post = None
|
||||||
|
settings_post_default = None
|
||||||
|
settings_encrypt = None
|
||||||
|
|
||||||
|
# Class variables
|
||||||
|
mastodon_api = None
|
||||||
|
listPictures = []
|
||||||
|
lenBlacklist = 0
|
||||||
|
failed_uploads = 0
|
||||||
|
consecutive_failed_uploads = 0
|
||||||
|
currentSessionCount = 0
|
||||||
|
debug_mode = False
|
||||||
|
primed = False
|
||||||
|
decrypted = False
|
||||||
|
|
||||||
|
# YandereBot.__init__()
|
||||||
|
# @param cfg A dynamically loaded python module. See yanlib.module_load() for an example
|
||||||
|
# @param keyfile Keyfile to decrypt settings_post
|
||||||
|
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
|
||||||
|
def __init__(self, cfg, keyfile=None, debug_mode=False):
|
||||||
|
self.cfg = cfg
|
||||||
|
self.load_settings(self.cfg)
|
||||||
|
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
||||||
|
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
|
||||||
|
|
||||||
|
# Setup Exit Calls
|
||||||
|
def exit(self):
|
||||||
|
self.eventSleep.set()
|
||||||
|
|
||||||
|
# Make sure there are no profiles in listPictures set to none. Print the bad post and exit if there is.
|
||||||
|
def validate_post_settings(self):
|
||||||
|
bad_post_count = 0
|
||||||
|
for i in self.listPictures:
|
||||||
|
if i.get_post_setting() is None:
|
||||||
|
print("Bad post setting [{}]: {}".format(bad_post_count, i.get_full_string()))
|
||||||
|
bad_post_count += 1
|
||||||
|
if bad_post_count:
|
||||||
|
raise BadPostSettings
|
||||||
|
|
||||||
|
# Decryption settings
|
||||||
|
# Used to set class attributes with the same name to the value specified in the config file.
|
||||||
|
def load_settings(self, cfg, settings=None):
|
||||||
|
try:
|
||||||
|
default_settings = (
|
||||||
|
"settings_server",
|
||||||
|
"settings_behavior",
|
||||||
|
"settings_time",
|
||||||
|
"settings_post",
|
||||||
|
"settings_post_default",
|
||||||
|
"settings_encrypt"
|
||||||
|
)
|
||||||
|
_settings = settings or default_settings
|
||||||
|
for ele in _settings:
|
||||||
|
setattr(self, ele, getattr(cfg, ele))
|
||||||
|
|
||||||
|
except AttributeError as e:
|
||||||
|
print(e)
|
||||||
|
raise BadCfgFile
|
||||||
|
|
||||||
|
def decrypt_settings(self):
|
||||||
|
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
|
||||||
|
import encryption
|
||||||
|
try:
|
||||||
|
self.settings_server = encryption.settings_server_decrypt(
|
||||||
|
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
|
||||||
|
except encryption.EncryptionFail as e:
|
||||||
|
raise BadCfgFile(str(e))
|
||||||
|
self.decrypted = True
|
||||||
|
|
||||||
|
# Login to Pleroma
|
||||||
|
def login(self):
|
||||||
|
skip_login = self.debug_mode or self.mastodon_api is not None
|
||||||
|
if skip_login:
|
||||||
|
return
|
||||||
|
if not self.decrypted:
|
||||||
|
self.decrypt_settings()
|
||||||
|
try:
|
||||||
|
self.mastodon_api = Mastodon(
|
||||||
|
client_id=self.settings_server["client_id"],
|
||||||
|
client_secret=self.settings_server["client_secret"],
|
||||||
|
access_token=self.settings_server["access_token"],
|
||||||
|
api_base_url=self.settings_server["api_base_url"],
|
||||||
|
feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1
|
||||||
|
)
|
||||||
|
except (MastodonIllegalArgumentError, MastodonVersionError) as e:
|
||||||
|
print(e)
|
||||||
|
raise FailedLogin
|
||||||
|
|
||||||
|
# Set up lists
|
||||||
|
def read_blacklist_files(self):
|
||||||
|
list_blacklist = []
|
||||||
|
for i in self.settings_behavior["master_blacklist_r"]:
|
||||||
|
# It doesn't matter if the picture file doesn't exist
|
||||||
|
with contextlib.suppress(IOError):
|
||||||
|
list_blacklist.extend(yanlib.get_hash_list(i))
|
||||||
|
return list_blacklist
|
||||||
|
|
||||||
|
def blacklist(self, picked):
|
||||||
|
self.lenBlacklist += 1
|
||||||
|
for path in self.settings_behavior["master_blacklist_w"]:
|
||||||
|
with open(path, "a") as f:
|
||||||
|
print(picked.get_full_string(), file=f)
|
||||||
|
|
||||||
|
# load_pictures will return a list of YanHashObj() with a blacklist(s) applied
|
||||||
|
# @param list_blacklist A list of HashObjects() that are blacklist hashes
|
||||||
|
def load_pictures(self, list_blacklist):
|
||||||
|
if not self.settings_behavior["master_list"]:
|
||||||
|
raise MissingMasterList
|
||||||
|
|
||||||
|
try:
|
||||||
|
list_pictures = reduce(lambda x, y: x + y,
|
||||||
|
[get_list_of_hashes_with_profiles(
|
||||||
|
f,
|
||||||
|
self.settings_post,
|
||||||
|
self.settings_post_default)
|
||||||
|
for f in self.settings_behavior["master_list"]
|
||||||
|
])
|
||||||
|
return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings_behavior["max_size"])
|
||||||
|
except IOError as e:
|
||||||
|
print(e)
|
||||||
|
raise MissingMasterList
|
||||||
|
|
||||||
|
def load_picture_list(self):
|
||||||
|
list_blacklist = self.read_blacklist_files()
|
||||||
|
self.listPictures = self.load_pictures(list_blacklist)
|
||||||
|
self.lenBlacklist = len(list_blacklist)
|
||||||
|
|
||||||
|
|
||||||
|
# Maybe I should remove this from the backend?
|
||||||
|
def print_header_stats(self, picked):
|
||||||
|
_picked = picked.get_full_string() if picked else None
|
||||||
|
picked_profile = picked.get_post_setting()["name"] if picked else None
|
||||||
|
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
||||||
|
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
||||||
|
|
||||||
|
print("Profile: {} | Picked: {} | Next_Profile: {} | Next_Pick: {}".format(
|
||||||
|
picked_profile, _picked, picked_next_profile, picked_next
|
||||||
|
))
|
||||||
|
|
||||||
|
# Returns a list of media paths (without the hashes)
|
||||||
|
def get_media_list(self, picked):
|
||||||
|
ext = self.settings_behavior["multi_media_ext"]
|
||||||
|
if not picked:
|
||||||
|
return None
|
||||||
|
elif ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
|
||||||
|
return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())]
|
||||||
|
else:
|
||||||
|
return [picked.get_hash_path()]
|
||||||
|
|
||||||
|
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
||||||
|
def upload_media_list(self, path_list):
|
||||||
|
media_list = []
|
||||||
|
for path in path_list:
|
||||||
|
if not os.path.isfile(path):
|
||||||
|
raise FileNotFoundError("Could not upload: {}".format(path))
|
||||||
|
|
||||||
|
for path in path_list:
|
||||||
|
if not self.debug_mode:
|
||||||
|
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
|
||||||
|
media_dict = {
|
||||||
|
"path": path,
|
||||||
|
"media": media
|
||||||
|
}
|
||||||
|
media_list.append(media_dict)
|
||||||
|
return media_list
|
||||||
|
|
||||||
|
def get_post_text(self, picked, media_list):
|
||||||
|
content_type = self.settings_behavior["content_type"]
|
||||||
|
content_newline = self.settings_behavior["content_newline"]
|
||||||
|
static_message = content_newline.join(picked.get_post_setting()["message"])
|
||||||
|
string_post = ""
|
||||||
|
string_imglinks = []
|
||||||
|
|
||||||
|
if media_list and self.settings_behavior["post_image_link"]:
|
||||||
|
for media_dict in media_list:
|
||||||
|
path = media_dict["path"]
|
||||||
|
media = media_dict["media"]
|
||||||
|
|
||||||
|
if path is None or media is None:
|
||||||
|
continue
|
||||||
|
elif content_type == "text/markdown" and not self.debug_mode:
|
||||||
|
string_imglinks.append(
|
||||||
|
"[{}]({})".format(os.path.basename(path), media["url"])
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
string_imglinks.append(media["url"])
|
||||||
|
|
||||||
|
# Join non empty strings with a newline character
|
||||||
|
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
|
||||||
|
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
||||||
|
|
||||||
|
return string_post
|
||||||
|
|
||||||
|
def _post(self, picked):
|
||||||
|
images = self.get_media_list(picked)
|
||||||
|
if not images:
|
||||||
|
raise InvalidPost("Media list is empty")
|
||||||
|
media_list = self.upload_media_list(images)
|
||||||
|
message = self.get_post_text(picked, media_list)
|
||||||
|
if self.debug_mode:
|
||||||
|
return picked
|
||||||
|
self.mastodon_api.status_post(
|
||||||
|
message,
|
||||||
|
media_ids=[i["media"] for i in media_list],
|
||||||
|
visibility=self.settings_behavior["visibility"],
|
||||||
|
sensitive=picked.get_post_setting()["spoiler"],
|
||||||
|
content_type=self.settings_behavior["content_type"]
|
||||||
|
)
|
||||||
|
return picked
|
||||||
|
|
||||||
|
# The main post function
|
||||||
|
# This funciton is responsible for picking, posting, and blacklisting an image in listPictures
|
||||||
|
#
|
||||||
|
# It is also responsible for removing the picked item from self.listPictures, which can be accomplished by simply
|
||||||
|
# popping it at index 0. This should handle any error that might occur while posting.
|
||||||
|
#
|
||||||
|
# This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded.
|
||||||
|
def post(self):
|
||||||
|
picked = None
|
||||||
|
|
||||||
|
# Flags that are set if an upload fails
|
||||||
|
reinsert_image = False
|
||||||
|
|
||||||
|
# Attempt post
|
||||||
|
try:
|
||||||
|
# Post
|
||||||
|
picked = self.listPictures.pop(0)
|
||||||
|
self._post(picked)
|
||||||
|
|
||||||
|
# After a successful post
|
||||||
|
self.currentSessionCount += 1
|
||||||
|
self.consecutive_failed_uploads = 0
|
||||||
|
self.blacklist(picked)
|
||||||
|
|
||||||
|
# The post was successful
|
||||||
|
return picked
|
||||||
|
|
||||||
|
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
|
||||||
|
except (FileNotFoundError, InvalidPost):
|
||||||
|
print("File not found:", picked.get_hash_path())
|
||||||
|
reinsert_image = False
|
||||||
|
|
||||||
|
# Check if the file limit has been reached
|
||||||
|
except MastodonAPIError as e:
|
||||||
|
print("API Error:", e)
|
||||||
|
# Check if the file limit has been reached (413 error)
|
||||||
|
with contextlib.suppress(IndexError):
|
||||||
|
reinsert_image = e.args[1] != 413
|
||||||
|
|
||||||
|
# Server Errors and other general exceptions
|
||||||
|
# Assume all exceptions are on the server side (besides FileNotFoundError of course)
|
||||||
|
# If the connection is timing out it could be for two reasons:
|
||||||
|
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
|
||||||
|
# a. FIX: Reduce settings_behavior["max_size"]
|
||||||
|
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
||||||
|
# mastodon.py API will not specify why the connection timed out).
|
||||||
|
# 3. Failed to generate screenshot
|
||||||
|
# 4. Other general exceptions
|
||||||
|
except Exception as e:
|
||||||
|
print("Unhandled Exception:", e)
|
||||||
|
# Exception flags
|
||||||
|
reinsert_image = True
|
||||||
|
|
||||||
|
# An exception occurred
|
||||||
|
self.failed_uploads += 1
|
||||||
|
self.consecutive_failed_uploads += 1
|
||||||
|
|
||||||
|
if reinsert_image and self.consecutive_failed_uploads < self.settings_behavior["max_errors"]:
|
||||||
|
self.listPictures.insert(0, picked)
|
||||||
|
|
||||||
|
# The post failed
|
||||||
|
return None
|
||||||
|
|
||||||
|
# [BEGIN THE PROGRAM]
|
||||||
|
def prime_bot(self):
|
||||||
|
if self.primed:
|
||||||
|
return
|
||||||
|
self.load_picture_list()
|
||||||
|
self.validate_post_settings()
|
||||||
|
if not self.debug_mode:
|
||||||
|
self.decrypt_settings()
|
||||||
|
self.login()
|
||||||
|
self.primed = True
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.prime_bot()
|
||||||
|
|
||||||
|
# Begin posting
|
||||||
|
self.main_loop()
|
||||||
|
|
||||||
|
# Return 1 if there are still pictures in the picture list
|
||||||
|
return len(self.listPictures) > 0
|
||||||
|
|
||||||
|
# End Conditions:
|
||||||
|
# 1. User presses Ctrl+C
|
||||||
|
# 2. There is nothing left in the picture list to select from
|
||||||
|
# 3. settings_behavior["uploads_per_post"] is less than uploads_per_post
|
||||||
|
# 4. Consecutive failed uploads is less than max_errors
|
||||||
|
def can_post(self):
|
||||||
|
return (
|
||||||
|
not self.eventSleep.is_set() and
|
||||||
|
bool(len(self.listPictures)) and
|
||||||
|
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
|
||||||
|
self.consecutive_failed_uploads < self.settings_behavior["max_errors"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def main_loop(self):
|
||||||
|
sleep_seconds = self.settings_behavior["retry_seconds"]
|
||||||
|
while self.can_post():
|
||||||
|
picked = self.post()
|
||||||
|
self.print_header_stats(picked)
|
||||||
|
if self.can_post():
|
||||||
|
self.eventSleep.wait(sleep_seconds)
|
||||||
|
|
||||||
|
|
||||||
|
# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj
|
||||||
|
# @param hash_obj A HashObject() (or subclass)
|
||||||
|
# @param profiles A list of available profiles to match
|
||||||
|
# @param profiles_default The default profile to return if no profile is matched
|
||||||
|
def get_profile(hash_obj, profiles, profiles_default):
|
||||||
|
profile_gen = (x for x in profiles if fnmatch.fnmatch(hash_obj.get_hash_path(), x["path"]))
|
||||||
|
return next(profile_gen, profiles_default)
|
||||||
|
|
||||||
|
|
||||||
|
# Takes a file path and transforms it into a list of YanBotHash() with the appropriate profile
|
||||||
|
# @param f_name Path of hash file
|
||||||
|
# @param profiles List of profiles -> self.settings_post
|
||||||
|
# @param profiles_default The default profile to apply
|
||||||
|
# @param callback_get_profile Callback function -> should return a single profile. Default: get_profile()
|
||||||
|
def get_list_of_hashes_with_profiles(f_name, profiles, profile_default):
|
||||||
|
return [YanBotHash(i, get_profile(i, profiles, profile_default)) for i in yanlib.get_hash_list(f_name)]
|
||||||
|
|
||||||
|
|
||||||
|
# Custom Exceptions for YandereBot
|
||||||
|
class Debug(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidPost(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BadPostSettings(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FailedLogin(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BadCfgFile(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MissingMasterList(Exception):
|
||||||
|
pass
|
Reference in New Issue
Block a user