Compare commits

..

No commits in common. "89381b76d4863e42a022faa784754fb547405d0f" and "9e11c593050212a3eb349d3cfb484f87b8e5c266" have entirely different histories.

4 changed files with 633 additions and 185 deletions

44
.gitignore vendored
View File

@ -1,14 +1,32 @@
# Ignore Everything # User preferance files
* /rsc*/
/md5/
/src/cfg*.py
/log.txt
# Exception # Virtual environment
!/default/* /venv*
!/docs/* /src/__pycache__/
!/src/main.py
!/src/*_backend.py # PyCharm IDE folder
!/.gitignore /.idea/
!/LICENSE.txt
!/README.md # Editor files
!/cron.sh /*~
!/requirements.txt /*#
!/run.sh
# Automation scripts + odds and ends
/notes.txt
/compile.sh
/upload.sh
/balance_2.csv
/DEFAULT.csv
/generate_random.sh
/src/make_master_template.py
/src/make_master_list.py
/src/hash_path_exists.py
/utils/
/cfg_availible/
/profile/
/gnu_header.txt
/backup*

217
src/encryption.py Executable file
View File

@ -0,0 +1,217 @@
#! /usr/bin/env python3
# Danbooru Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import sys
import base64
import os
import getpass
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from collections import OrderedDict
class EncryptionFail(Exception):
pass
class PasswordMismatch(Exception):
pass
# Return string from bytes
def salt_encode(b):
return base64.urlsafe_b64encode(b).decode()
# Return bytes from string
def salt_decode(s):
return base64.urlsafe_b64decode(s.encode())
# Ordered Dictionaries
def change_encoding_dict(settings_server, encoding_type):
return OrderedDict([(k, encoding_type(v)) for k, v in settings_server.items()])
# Return bytes from string
def encode_dict(settings_server):
return change_encoding_dict(settings_server, str.encode)
# Return string from bytes
def decode_dict(settings_server):
return change_encoding_dict(settings_server, bytes.decode)
# password: Bytes
# salt: Bytes
def derive_key(password, salt):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
return base64.urlsafe_b64encode(kdf.derive(password))
# Encryption functions
# message: Bytes
# key: Bytes
def encrypt(message, key):
f = Fernet(key)
token = f.encrypt(message)
return token
# token: Bytes
# key: Bytes
def decrypt(token, key):
f = Fernet(key)
message = f.decrypt(token)
return message
# password: bytes()
# salt: bytes()
# settings_server: dict() -> Byte values
# encryption_function: encrypt(message, key) : decrypt(token, key):
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
# ChangeEncodingDict to make them strings (recommended cfg file friendly)
def encrypt_settings(settings_server, password, salt, encryption_function):
key = derive_key(password, salt)
settings_server_decrypted = OrderedDict()
for setting in settings_server:
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
return settings_server_decrypted
def get_keyfile(keyfile=None):
if keyfile is not None:
with open(keyfile, "rb") as f:
return f.read()
return None
def get_pass(q):
try:
return getpass.getpass(q).encode()
except KeyboardInterrupt:
raise EncryptionFail("\nQuitting...")
def settings_server_encrypt(settings_server, keyfile=None):
try:
settings_server = encode_dict(settings_server)
salt = os.urandom(16)
password = get_keyfile(keyfile)
if password is None:
password = get_pass("Enter password: ")
password2 = get_pass("Enter password: ")
if password != password2:
raise PasswordMismatch("Passwords do not match")
encrypted = encrypt_settings(settings_server, password, salt, encrypt)
return salt_encode(salt), decode_dict(encrypted)
except PasswordMismatch as e:
raise EncryptionFail(str(e))
except Exception as e:
err = str(e)
raise EncryptionFail("Encrypt Error: {}".format(err))
def settings_server_decrypt(settings_server, settings_encrypt, keyfile=None):
try:
if not settings_encrypt["encrypt"]:
return settings_server
settings_server = encode_dict(settings_server)
password = get_keyfile(keyfile or settings_encrypt["keyfile"]) or get_pass("Enter password: ")
salt = salt_decode(settings_encrypt["salt"])
decrypted = encrypt_settings(settings_server, password, salt, decrypt)
return decode_dict(decrypted)
except base64.binascii.Error:
raise EncryptionFail("Salt is invalid")
except InvalidToken:
raise EncryptionFail("Password or token is incorrect")
except Exception as e:
err = str(e)
raise EncryptionFail("Decrypt Error: {}".format(err))
def main():
import argparse
from pprint import pformat
default_cfg = "cfg"
parser = argparse.ArgumentParser(
description="A class to encrypt server credentials",
epilog="There are no additional parameters.",
add_help=True )
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
parser.add_argument("-k", "--keyfile", help="Keyfile used for decryption", default=None)
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
arguments = parser.parse_args()
if arguments.recrypt:
arguments.encrypt = True
arguments.decrypt = True
if arguments.encrypt or arguments.decrypt:
import importlib
cfg = importlib.import_module(arguments.cfg)
settings_server = cfg.settings_server
settings_encrypt = cfg.settings_encrypt
keyfile = arguments.keyfile or settings_encrypt["keyfile"]
if arguments.decrypt and arguments.encrypt:
print("Re-encrypting")
if arguments.decrypt: # arguments.decrypt
print("Decrypt...")
settings_server = settings_server_decrypt(settings_server, settings_encrypt, arguments.keyfile)
settings_encrypt = OrderedDict([
("encrypt", False),
("salt", settings_encrypt["salt"]),
("keyfile", arguments.keyfile)
])
if arguments.encrypt:
print("Encrypt...")
salt, settings_server = settings_server_encrypt(settings_server, keyfile)
settings_encrypt = OrderedDict([
("encrypt", True),
("salt", salt),
("keyfile", arguments.keyfile)
])
print("settings_server = {}".format(pformat(settings_server)))
print("settings_encrypt = {}".format(pformat(settings_encrypt)))
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except EncryptionFail as e:
print(e)
sys.exit(1)

View File

@ -16,177 +16,10 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import sys import sys
import argparse import argparse
import random
import signal import signal
import FediBot import yandere_bot
import importlib
import magic
import copy
class FileTooLarge(Exception):
pass
class InvalidMimeType(Exception):
pass
class YandereBot(FediBot.YandereBot):
currentSessionCount = 0
currentIndexCount = 0
currentProfileIndex = []
def __init__(self, cfg, keyfile=None, debug_mode=False):
settings = {
"settings_time": {},
"settings_post": {},
"settings_backend": {}
}
self.settings.update(settings)
super(YandereBot, self).__init__(cfg, keyfile, debug_mode)
random.seed(os.urandom(16))
self.currentProfileIndex = [0] * len(self.settings["settings_post"])
def print_header_stats(self, picked):
settings_post = self.settings["settings_post"]
profile = picked["profile"]["name"] if picked else None
url = picked["file_url"] if picked else None
path = picked["full_path"] if picked else None
nsfw = picked["nsfw"] if picked else None
posted_once = int(self.currentSessionCount > 0)
index = (self.currentIndexCount - posted_once) % len(settings_post)
state_print = copy.copy(self.currentProfileIndex)
state_print[index] = state_print[index] - posted_once
state_print = [state_print[i] % len(settings_post[i]) for i in range(0, len(self.currentProfileIndex))]
print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format(
profile, index, nsfw, path, url
))
print("State: {}".format(','.join(map(str, state_print))))
# Returns a list of media paths (without the hashes)
def download_media(self, picked_profile):
try:
backend_s = picked_profile["backend"]
backend_credentials = self.settings["settings_backend"][backend_s]
backend = importlib.import_module(backend_credentials["module"])
downloader = backend.downloader(backend_credentials)
img = downloader.fetch_post(picked_profile)
if img is None:
raise FediBot.InvalidPost("Img could not be downloaded")
return downloader.download_post(img)
except ImportError:
print("Invalid Backend:", picked_profile["backend"])
return None
# Returns a list of tuples that contain the media list path and media mastodon dictionary
def upload_media_list_validate(self, media_list):
# Check to make sure the paths in media_list actually exist
super(YandereBot, self).upload_media_list_validate(media_list)
# Validate picked
for path in media_list:
if not self.valid_mimetype(path):
raise InvalidMimeType("Invalid mime type")
elif not self.valid_file_size(path):
raise FileTooLarge("File is too large to upload")
def valid_mimetype(self, full_path):
mime = magic.from_file(full_path, mime=True)
if mime is None:
return False
mime_category = mime.split("/", 1)[0]
return mime_category in ("image", "video")
def get_message(self, picked):
nsfw = picked["nsfw"]
message = picked["profile"]["message_nsfw"] if nsfw else picked["profile"]["message"]
return message
def valid_file_size(self, full_path):
settings_behavior = self.settings["settings_behavior"]
max_size = settings_behavior["max_size"]
file_size = os.stat(full_path).st_size
return file_size <= max_size
def pick_index(self, mode, current_index, length):
if mode == "random":
return random.randint(0, length - 1)
elif mode == "sequential":
return current_index % length
def pick_profile(self):
settings_behavior = self.settings["settings_behavior"]
# Get x and y
mode = settings_behavior["tag_select"].lower()
posts = self.settings["settings_post"]
x = self.pick_index(mode, self.currentIndexCount, len(posts))
y = self.pick_index(mode, self.currentProfileIndex[x], len(posts[x]))
# Return the Profile
return x, y
def pick(self):
settings_post = self.settings["settings_post"]
x, y = self.pick_profile()
picked_profile = settings_post[x][y]
picked = self.download_media(picked_profile)
media_list = [picked["full_path"]]
spoiler = picked["nsfw"]
message = self.get_message(picked)
return {
"picked": picked,
"media_list": media_list,
"spoiler": spoiler,
"message": message
}
def after_pick(self, picked):
self.print_header_stats(picked["picked"])
self.currentProfileIndex[self.currentIndexCount] += 1
self.currentIndexCount += 1
os.remove(picked["picked"]["full_path"])
return super(YandereBot, self).after_pick(picked)
def post(self, picked):
# Attempt post
try:
return super(YandereBot, self).post(picked)
# Invalid post (move to next profile)
except FediBot.InvalidPost as e:
self.currentIndexCount += 1
print("Invalid post:", e)
# Invalid post (remove downloaded files)
except (FileTooLarge, InvalidMimeType) as e:
os.remove(picked["full_path"])
print("Unable to post:", e)
self.handle_post_exception()
# The post failed
return None
def can_post(self):
return self.currentIndexCount >= 0 and super(YandereBot, self).can_post()
class FailedToLoadCfg(Exception): class FailedToLoadCfg(Exception):
@ -226,7 +59,7 @@ def main():
# Flag if the bot is running in debug mode # Flag if the bot is running in debug mode
debug_mode = arguments.dry_run or arguments.debug debug_mode = arguments.dry_run or arguments.debug
yandere = YandereBot( yandere = yandere_bot.YandereBot(
yandere_config, yandere_config,
arguments.keyfile, arguments.keyfile,
debug_mode, debug_mode,
@ -261,9 +94,9 @@ if __name__ == "__main__":
except FailedToLoadCfg: except FailedToLoadCfg:
sys.exit(10) sys.exit(10)
# Exceptions raised from the bot # Exceptions raised from the bot
except FediBot.Debug: except yandere_bot.Debug:
sys.exit(6) sys.exit(6)
except FediBot.BadCfgFile: except yandere_bot.BadCfgFile:
sys.exit(4) sys.exit(4)
except FediBot.FailedLogin: except yandere_bot.FailedLogin:
sys.exit(2) sys.exit(2)

380
src/yandere_bot.py Normal file
View File

@ -0,0 +1,380 @@
#! /usr/bin/env python3
# Danbooru Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import importlib
import magic
import random
import copy
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
class YandereBot:
# From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
eventSleep = Event()
# The configuration module
cfg = None
# The below settings are required from the configuration module
settings_server = None
settings_behavior = None
settings_time = None
settings_post = None
settings_encrypt = None
settings_backend = None
# Class variables
mastodon_api = None
failed_uploads = 0
consecutive_failed_uploads = 0
currentSessionCount = 0
currentIndexCount = 0
currentProfileIndex = []
debug_mode = False
primed = False
decrypted = False
# YandereBot.__init__()
# @param cfg A dynamically loaded python module. See yanlib.module_load() for an example
# @param keyfile Keyfile to decrypt settings_post
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
# @prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
def __init__(self, cfg, keyfile=None, debug_mode=False, prime_bot=True):
self.cfg = cfg
self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"]
self.settings_encrypt["keyfile"] = keyfile or self.settings_encrypt["keyfile"]
self.currentProfileIndex = [0]*len(self.settings_post)
random.seed(os.urandom(16))
if prime_bot:
self.prime_bot()
# Setup Exit Calls
def exit(self):
self.eventSleep.set()
# Decryption settings
# Used to set class attributes with the same name to the value specified in the config file.
def load_settings(self, cfg, settings=None):
try:
default_settings = (
"settings_server",
"settings_behavior",
"settings_time",
"settings_post",
"settings_encrypt",
"settings_backend",
)
_settings = settings or default_settings
for ele in _settings:
setattr(self, ele, getattr(cfg, ele))
except AttributeError as e:
print(e)
raise BadCfgFile
def decrypt_settings(self):
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
import encryption
try:
self.settings_server = encryption.settings_server_decrypt(
self.settings_server, self.settings_encrypt, self.settings_encrypt["keyfile"])
except encryption.EncryptionFail as e:
raise BadCfgFile(str(e))
self.decrypted = True
# Login to Pleroma
def login(self):
skip_login = self.debug_mode or self.mastodon_api is not None
if skip_login:
return
if not self.decrypted:
self.decrypt_settings()
try:
self.mastodon_api = Mastodon(
client_id=self.settings_server["client_id"],
client_secret=self.settings_server["client_secret"],
access_token=self.settings_server["access_token"],
api_base_url=self.settings_server["api_base_url"],
feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1
)
except (MastodonIllegalArgumentError, MastodonVersionError) as e:
print(e)
raise FailedLogin
# Maybe I should remove this from the backend?
def print_header_stats(self, picked):
profile = picked["profile"]["name"] if picked else None
url = picked["file_url"] if picked else None
path = picked["full_path"] if picked else None
nsfw = picked["nsfw"] if picked else None
posted_once = int(self.currentSessionCount > 0)
index = (self.currentIndexCount - posted_once) % len(self.settings_post)
state_print = copy.copy(self.currentProfileIndex)
state_print[index] = state_print[index] - posted_once
state_print = [state_print[i] % len(self.settings_post[i]) for i in range(0, len(self.currentProfileIndex))]
print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format(
profile, index, nsfw, path, url
))
print("State: {}".format(','.join(map(str, state_print))))
# Returns a list of media paths (without the hashes)
def download_media(self, picked_profile):
try:
backend_s = picked_profile["backend"]
backend_credentials = self.settings_backend[backend_s]
backend = importlib.import_module(backend_credentials["module"])
downloader = backend.downloader(backend_credentials)
img = downloader.fetch_post(picked_profile)
if img is None:
raise InvalidPost("Img could not be downloaded")
return downloader.download_post(img)
except ImportError:
print("Invalid Backend:", picked_profile["backend"])
return None
# Returns a list of tuples that contain the media list path and media mastodon dictionary
def upload_media_list(self, path_list):
media_list = []
# Validate picked
for path in path_list:
if not os.path.isfile(path):
raise FileNotFoundError("Could not upload: {}".format(path))
elif not self.valid_mimetype(path):
raise InvalidMimeType("Invalid mime type")
elif not self.valid_file_size(path):
raise FileTooLarge("File is too large to upload")
# Upload
for path in path_list:
if not self.debug_mode:
media = self.mastodon_api.media_post(path, description=os.path.basename(path))
media_dict = {
"path": path,
"media": media
}
media_list.append(media_dict)
return media_list
def get_post_text(self, picked, media_list):
content_type = self.settings_behavior["content_type"]
content_newline = self.settings_behavior["content_newline"]
nsfw = picked["nsfw"]
message = picked["profile"]["message_nsfw"] if nsfw else picked["profile"]["message"]
static_message = content_newline.join(message)
string_post = ""
string_imglinks = []
if media_list and self.settings_behavior["post_image_link"]:
for media_dict in media_list:
path = media_dict["path"]
media = media_dict["media"]
if path is None or media is None:
continue
elif content_type == "text/markdown" and not self.debug_mode:
string_imglinks.append(
"[{}]({})".format(os.path.basename(path), media["url"])
)
else:
string_imglinks.append(media["url"])
# Join non empty strings with a newline character
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return string_post
def valid_mimetype(self, full_path):
mime = magic.from_file(full_path, mime=True)
if mime is None:
return False
mime_category = mime.split("/", 1)[0]
return mime_category in ("image", "video")
def valid_file_size(self, full_path):
max_size = self.settings_behavior["max_size"]
file_size = os.stat(full_path).st_size
return file_size <= max_size
def _post(self, picked):
if not picked:
raise InvalidPost("Picked post is none")
upload_list = [picked["full_path"]]
media_list = self.upload_media_list(upload_list)
message = self.get_post_text(picked, media_list)
if self.debug_mode:
return picked
self.mastodon_api.status_post(
message,
media_ids=[i["media"] for i in media_list],
visibility=self.settings_behavior["visibility"],
sensitive=picked["nsfw"],
content_type=self.settings_behavior["content_type"]
)
return picked
def pick_index(self, mode, current_index, length):
if mode == "random":
return random.randint(0, length - 1)
elif mode == "sequential":
return current_index % length
def pick_profile(self):
# Get x and y
mode = self.settings_behavior["tag_select"].lower()
posts = self.settings_post
x = self.pick_index(mode, self.currentIndexCount, len(posts))
y = self.pick_index(mode, self.currentProfileIndex[x], len(posts[x]))
# Return the Profile
return x, y
# The main post function
# This funciton is responsible for picking a profile, generate a screenshot, and posting it.
#
# This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded.
def post(self):
picked = None
# Attempt post
try:
# Post
x, y = self.pick_profile()
picked_profile = self.settings_post[x][y]
picked = self.download_media(picked_profile)
self._post(picked)
os.remove(picked["full_path"])
# After a successful post
self.currentSessionCount += 1
self.consecutive_failed_uploads = 0
# Set indexes
self.currentProfileIndex[x] += 1
self.currentIndexCount += 1
# The post was successful
return picked
# Invalid post (move to next profile)
except InvalidPost as e:
self.currentIndexCount += 1
print("Invalid post:", e)
# Invalid post (remove downloaded files)
except (FileTooLarge, InvalidMimeType) as e:
os.remove(picked["full_path"])
print("Unable to post:", e)
# Server Errors and other general exceptions
# Assume all exceptions are on the server side (besides FileNotFoundError of course)
# If the connection is timing out it could be for two reasons:
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
# a. FIX: Reduce settings_behavior["max_size"]
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out).
# 3. Other general exceptions
except (FileNotFoundError, MastodonAPIError, Exception) as e:
print("Exception:", e)
# An exception occurred
self.failed_uploads += 1
self.consecutive_failed_uploads += 1
# The post failed
return None
# [BEGIN THE PROGRAM]
def prime_bot(self):
if not self.debug_mode:
self.decrypt_settings()
self.login()
self.primed = True
def start(self, delay=0):
# Prime bot if not already primed.
if not self.primed:
self.prime_bot()
# Begin posting
self.main_loop()
# Return 1 if there are still pictures in the picture list
return 0
# End Conditions:
# 1. User presses Ctrl+C
# 2. settings_behavior["uploads_per_post"] is less than uploads_per_post
# 3. Consecutive failed uploads is less than max_errors
def can_post(self):
return (
not self.eventSleep.is_set() and
self.currentSessionCount < self.settings_behavior["uploads_per_post"] and
self.consecutive_failed_uploads < self.settings_behavior["max_errors"] and
self.currentIndexCount >= 0
)
def main_loop(self):
sleep_seconds = self.settings_behavior["retry_seconds"]
while self.can_post():
picked = self.post()
self.print_header_stats(picked)
if self.can_post():
self.eventSleep.wait(sleep_seconds)
# Custom Exceptions for YandereBot
class Debug(Exception):
pass
class InvalidPost(Exception):
pass
class FileTooLarge(Exception):
pass
class InvalidMimeType(Exception):
pass
class FailedLogin(Exception):
pass
class BadCfgFile(Exception):
pass