commit 883a85ec7f
Anon 2022-07-14 12:32:18 -07:00
# Readme
Yandere Lewd Bot is a customizable, no frills image-posting bot for Pleroma.
A live example can be viewed at [](
Currently the bot will only run on Linux. I believe it should run on most distros but I have only tested it on Arch and Raspbian.
## Usage
usage: [--dry-run] [--debug] [-w WAIT] [-t TIME] [-d DATE] [-c CONFIG]
[-o] [-h]
A bot for posting on Mastodon
optional arguments:
--dry-run Will not login or post to Plemora
--debug Same as --dry-run
-w WAIT, --wait WAIT Wait before posting first image (seconds)
-t TIME, --time TIME Wait for time before posting first image
-d DATE, --date DATE Wait for date before posting first image
-c CONFIG, --config CONFIG
Set custom config file (Default: cfg)
-o, --output-hashes Output list of hashes
-h, --help Show this help message and exit
TIME %I:%M%p 03:58PM
DATE %m/%d/%Y 11/26/2021
## About
Yandere Lewd Bot is designed to be simple to understand (from a technical perspective) and easy to modify/extend. Yandere Lewd Bot relies on GNU formatted hash files to configure and organize posts (from the GNU coreutils package):
(ex. `26d00a6ee5ae8fa3d58ee44b32b8992f *./rsc/nsfw/image.jpg`)
The bot is still in development and can easily be crashed by writing a poorly configured `` file, or messing with the hash files it uses to configure posts.
## Installing
To setup the python3 environment (for Arch Linux) run the following commands:
sudo pacman -Syu
sudo pacman -S --needed python git
git clone ''
cd yandereLewdBot/
python -m venv venv
source ./venv/bin/activate
pip install --upgrade pip
pip install -r requierements.txt
cp ./bin/ ./bin/
<chmod.txt xargs -i chmod +x '{}'
./ -h
If everything worked correctly you should see usage information on Yandere Lewd Bot.
The ./ file will automatically activate and deactive the python virtual environment. You may want add a symlink to `/usr/local/bin` for convienence.
sudo ln -s "$(pwd)/" "/usr/local/bin/yandereLewdBot"
yandereLewdBot -h
For Debian or Ubuntu based distros I believe you need to change python with python3 for the package and command name `¯\_(ツ)_/¯`
## Generating your OAuth Tokens
Before you can begin posting from the bot, you must first create an account on the instance of your choice, and then generate your OAuth tokens.
To generate your tokens, run the following commands and follow the interactive prompts.
source ./venv/bin/activate
# Follow the interactive prompts.
**NOTE:** Yandere Lewd Bot only requieres write permissions. When prompted you should accept the default permission unless extending the bot's default functionality.
If you did everything correctly you should see `Success! :)` at the end of the terminal, as well as your credentials in the format below:
settings_server = OrderedDict([
"app_name": "app",
"api_base_url": "",
"client_id": "Long String of Text",
"client_secret": "Long String of Text",
"access_token": "Long String of Text"
settings_reminder = "09/20/2020 02:58PM"
settings_encrypt = {
"encrypt": False,
"salt": "",
If you used encryption, everything in the `settings_server` dictionary will be encrypted and you will have to enter your password every time you start the bot. Your `settings_encrypt` will look like this:
settings_encrypt = {
"encrypt": True,
"salt": "A Short String of Text",
Copy and paste these values from the terminal into your `bin/` file. **Make sure you paste over or delete the placeholder values with the same names.**
## Posting
To begin posting:
- Make sure you already have a Pleroma account set up .
- Generate your OAuth tokens following the guide above.
- Put some images in the rsc/ folder (organize by safe and nsfw posts).
- 'safe' posts will be marked with `#yandere`
- 'nsfw' posts will be spoilered and marked as `#yandere #nsfw`
- cd into the root directory (the directory with ``)
- Start the bot with `./`
- Enter your password if you encrypted your OAuth tokens.
- If you symlinked the file in the [Installing](##installing) section you can simply run `yandereLewdBot` instead of changing directories and starting the bot with `./`
- If everything worked correctly, you should see your first image posted to your account.
- Hit Ctrl+C to stop the bot.
- When you start the bot again, it will begin posting from where it last left off.
- It accomplishes this by writing to a `md5/blacklist.txt` file. If you see this file, it worked correctly.
- Keep in mind that the bot's default visibility setting is set to unlisted for testing. You may want to update it to public.
- Read [docs/]( to customize the bot for your purposes.
# Useful Tips and Commands
## Quickly Sum the Entire rsc/ Folder
To quickly sum everything in the `rsc/` folder, run the following commands:
cd yandereLewdBot/
find "./rsc" -type f -exec md5sum -b {} \; > "./md5/master_file.txt"
## Installing Screen
Most likely, you will want this bot to run over a long period of time. It is probobly best to install the `screen` package for Linux so you don't have to have a terminal window constantly open (especially if you are running it from a remote machine such as a Raspberry Pi or server).
sudo pacman -Syu
sudo pacman -S --needed screen
# Screen cheatsheet
# To create a new screen session named 'yandereLewdBot'
screen -S yandereLewdBot
# To detach from the screen session
Ctrl+a d
# To reconnect to the screen session
screen -r yandereLewdBot
# List screen sessions
screen -ls
# Kill the screen session
Ctrl+a k y
# Scroll back
Ctrl+a ESC Page Up
# End scroll back
## A More Useful rsc/ Folder Structure
A more advanced (and useful) way of organizing images in the `rsc/` folder is by date added, group (such as a character name like Yuno Gasai), and safe/nsfw profiles. To do this, the following folder structure would be used: `rsc/`
To configure this in the `settings_post` section, use the following for safe/nsfw repectivly:
setup_safe_profile("", "./rsc/*/Yuno/safe/*", "#Yuno_Gasai #yandere")
setup_nsfw_profile("Yuno.nsfw", "./rsc/*/Yuno/nsfw/*", "#Yuno_Gasai #nsfw #yandere")
Notice the wildcard character `'*'` is used in place of This will match any folder in the resource folder, and all of our folders will be in order of when we added them.
## A Master Blacklist
The default configuration file has the following settings below (these files will not exist by default. The bot will simply ignore blacklist hash files that it can't open). The _r stands for read and the _w stands for write. This will ensure that the master_blacklist.txt is always read, but is never written to. This is intended for the user to manually add GNU formatted hashes to master_blacklist.txt in case they want to make especially sure that hashes and paths that are matched in the list are never uploaded accidentally.
"master_blacklist_r": ("./md5/blacklist.txt", "./md5/master_blacklist.txt"),
"master_blacklist_w": ("./md5/blacklist.txt",),
## Quickly Create Configuration Files
Configuration files for Yandere Lewd Bot are just python files. Because of this we can easily create multiple configurations by importing the main configuration file, and overriding the values we need. This is useful for creating holiday and debug configurations without needing to create a new configuration file from scratch.
# Name:
# Post to an alt account using the exact same settings as the main configuration file
# Run with the following command: ./ -c alt
# is the default configuration. You should set this to whatever you are overriding.
from cfg import *
settings_server = OrderedDict([
("app_name", "generate from ./bin/"),
("api_base_url", "generate from ./bin/"),
("client_id", "generate from ./bin/"),
("client_secret", "generate from ./bin/"),
("access_token", "generate from ./bin/")
settings_reminder = "generate from ./bin/"
settings_encrypt = {
"encrypt": False,
"salt": "generate from ./bin/"
settings_behavior["master_blacklist_r"] = ("./md5/blacklist_alt.txt", "./md5/master_blacklist.txt")
settings_behavior["master_blacklist_w"] = ("./md5/blacklist_alt.txt",)
# Uncommenting the below might be useful if your alt is used for debugging and testing
# settings_behavior["master_list"] = ("master_file_alt.txt",)
# settings_behavior["visibility"] = "private"
# settings_behavior["debug"] = True
# settings_behavior["master_blacklist_w"] = tuple()
# Name:
# An xmas themed config file!
# Run with the following command: ./ -c xmas
# is the default configuration. You should set this to whatever you are overriding.
from cfg import *
settings_behavior["master_list"] = ("./md5/event_xmas.txt",)
settings_behavior["master_blacklist_r"] = ("./md5/blacklist_xmas.txt", "master_blacklist.txt")
settings_behavior["master_blacklist_w"] = ("./md5/blacklist_xmas.txt",)
# Prepend '#Merry #Christmas' to the beginning of the first line in each post
for setting in settings_post:
if len(setting["message"]) and issubclass(type(setting["message"][0]), str):
new_msg = "#Merry #Christmas {}".format(setting["message"][0])
setting["message"] = (new_msg,) + setting["message"][1:]

# Configuration
Yandere Lewd Bot uses regular python files to configure posts. The default configuration is ``
Included in the file are three [helper functions](#setting-up-profiles-in-a-single-line) to quickly configure posts. These are located at the top of the configuration file.
Below is a detailed description of each parameter in the configuration file.
## setting_server
Paste the generated credentials from `` in the fields below.
**`app_name:`** The name of the application.
**`api_base_url:`** The URL of the instance.
**`client_id:`** The client ID generated by ``
**`client_secret:`** The client secret generated by ``
**`access_token:`** The client token generated by ``
**NOTE:** If you enabled encryption when generating your tokens, the above should be long strings of seemingly incoherent text.
## settings_reminder
A setting to remind the user to regenerate their OAuth tokens
**`settings_reminder:`** A date and time, formatted as `settings_time["long_date_format"]`. Used to remind the user to regenerate their OAuth tokens.
## settings_encrypt
Paste the generated credentials from `` in the fields below.
**`encrypt:`** If encryption is enabled this should be set to `True`
**`salt:`** The salt value generated by ``
## settings_behavior
This is the main configuration setting for the bot.
**`master_list:`** The master list(s) for the bot to post from. This should be a GNU formatted hash file. If multiple lists are specified they will be concatenated in order.
**`master_blacklist_r:`** The master blacklist(s) to be *read* when the bot starts. This should be a GNU formatted hash file. All hashes and paths specified in this setting will be blacklisted when the bot initializes and will not be posted.
**`master_blacklist_w:`** The master file(s) to *write* to when the bot makes a successful post. This will be a GNU formatted hash file.
**`max_size:`** The max upload size (in bytes). Files that are over will be excluded from posting. Set this to `None` if there is no limit (not recommended). If you find that the bot is continuously attempting to repost a post that is failing, and the connection is timing out, it usually signifies that you are attempting to upload a large file over a slow connection and getting dropped from the server. Reducing this value should fix the issue.
**`visibility:`** The visibility of the post ('public', 'unlisted', 'private', or 'direct').
**`feature_set:`** The feature set of the instance you are connecting to ('mainline', 'fedibird', or 'pleroma'). **This bot has only been tested with 'pleroma'.**
**`sleep_seconds:`** The time between posts (in seconds)
**`uploads_per_post:`** The number of times the bot should post after `sleep_seconds`
**`retry_seconds:`** The time the bot should wait before attempting to re-upload a post that failed (in seconds). This value is ignored if the bot experiences a FileNotFoundError (the file was deleted after the bot was started) or encounters a 413 error (file upload size is too large). In either of the two cases the bot will continue posting the next image in the list.
**`multi_media_ext:`** Files ending with this extension will be treated as a multi-upload list. All GNU formatted hashes contained within will be uploaded in a single post when picked by the bot. Setting this value to `None` or `False` will disable this behavior.
**`content_type:`** The content type of the post ('text/plain' (default), 'text/markdown', 'text/html', 'text/bbcode'). **This bot has only been tested with `'text/plain'` and `'text/markdown'`**.
**`content_newline:`** The newline character or string. This will be inserted between each line in the message. If `content_type` is `'text/plain'`, this value should be `'\n'`. If `content_type` is `'text/markdown'` you may need to experiment. On some instances the defualt `'\n'` works, on others you may need to change this to `'<br/>'`.
**`post_image_link:`** If set to `True` this will insert a direct media link at the end of the message for each media file uploaded. If `content_type` is `'text\markdown'` it will make the link actionable, otherwise it will simply be a plain text html link. Some instances add this link automatically, in which case this should be set to `False`. See `content_newline` and `content_type` to correctly configure actionable links.
**`post_random:`** Shuffle all items in `master_list` so that they post in random order.
**`atomic_saving:`** If set to `True`, when the bot blacklists a file it will create a temporary copy of the file(s) that it is writing to. It will write to the temp file, then write over the old file. This is basically a paranoid saving option in case your server loses power unexpectedly while the bot is writing data. Generally speaking setting this option to `True` is probably unnecessary and will result in excessive writes to the disk.
**`sync_save:`** If set to`True` the bot will synchronize data to the disk immediately when it writes to a blacklist file. Generally speaking this should be set to `True` in case the server looses power unexpectedly. It should be set to `False` you are posting a lot of images at once. You should also be aware that if `atomic_saving` is enabled, it will syncronize all writes to and from the temp file which is probobly excessive.
**`tmp_dir:`** The temporary directory to use if `atomic_saving` is set to `True`. The default location is `/var/tmp`.
**`debug:`** Set this to true for testing. This will prevent Yandere Lewd Bot from logging into the mastodon instance and asking for your encryption password (meaning that nothing will actually get posted to your account, but it will still do everything else).
**NOTE:** `master_list`, `master_blacklist_r`, and `master_blacklist_w` all take a tuple() of str(). Please see below for formatting help.
> To specify multiple files: `master_blacklist_r: ("./md5/blacklist.txt", "./md5/master_blacklist.txt"),`
> To specify a single file: `master_blacklist_r: ("./md5/blacklist.txt",),`
> To specify no file: `master_blacklist_r: tuple(),`
## settings_time
Time localization settings.
**`time_format:`** This is used to schedule posts using the `-t` switch. Referenced in `set_pretimer_hour()`
**`time_format_seconds:`** Unused. Should be the same as `time_format` but with a resolution in seconds.
**`date_format:`** This is used to schedule posts using the `-d` switch. Referenced in `set_pretimer_day()`
**`long_date_format:`** Time and date format. `settings_encrypt["reminder"]` Should be in this format. Referenced in `print_header_stats()` and `_sanity_warnings()`
**`long_date_seconds_format:`** Unused. Should be the same as `long_date_format` but with a resolution in seconds.
**`long_date_week:`** The same as `long_date_format` but with the day of the week specified. Referenced in `print_header_stats()`
## settings_post
Configure Yandere Lewd Bot profiles. In the default configuration, images are sorted by safe and nsfw profiles. This means that all 'safe' images should be located in `./rsc/safe/*` and all 'nsfw' images should be located in `./rsc/nsfw/*`. The bot will apply profiles based on it's file path using [Unix filename pattern matching](
**`settings_post:`** A tuple() containing all of the profile dictionaries that the bot can post. See `setup_profile()`, `setup_nsfw_profile()`, and `setup_safe_profile()` for help configuring custom profiles.
**`settings_post_default:`** The default post profile. If no profile can be matched for an item in the `master_list` it will default to the profile specified by this value. If the value is `None`, the bot will error out and output the offending line(s) from the hash file (recommended). If you want to apply a default post behavior for unmatched profiles, create a default setting with `setup_profile()` helper function. The Unix `path` keyword will be completely ignored.
# Setting Up Profiles
Each profile in `settings_post` contains a dictionary with the following keywords: `name`, `path`, `message`, and `spoiler`
Below is an explanation of each keyword:
* **`name:`** String value. The name of the profile. This is mainly used to help you identify what profile is being applied to posts. This information is printed by the bot, but is otherwise unused.
* **`path:`** String value. This should be a Unix path with wildcards. The bot will configure posts accordingly by using [Unix filename pattern matching]( implemented in the `fnmatch` python library.
* **`message:`** A tuple of strings. The message. Each string in the tuple will be placed on a new line.
* **`spoiler:`** Boolean value. Mark the media as sensitive. Use for nsfw or spoilers.
## Setting Up Profiles in a Single Line
At the top of the default configuration file are the following three helper functions to make setting up posts more intuative.
>**`setup_profile(name, path, spoiler, *message)`**
**`setup_nsfw_profiles(name, path, *message)`**
**`setup_safe_profiles(name, path, *message)`**
**`setup_profile(name, path, spoiler, *message)`**
Returns a post setting dictionary that contains all of the information about a single profile. Each parameter corresponds to the keywords listed above.
* **`name:`** The name of the profile.
* **`path:`** A Unix path with wildcards.
* **`spoiler:`** Mark the media as sensitive. Use for nsfw or spoilers.
* **`*message:`** The message. This is a variable length parameter (meaning that multiple parameters may be passed to it). Each line in your message should be passed as a separate string to the `*message` variable.
>**DO NOT** use the newline character `\n` to configure multiline messages. Instead pass each line as a separate string parameter: ex. `setup_profile("", "./rsc/safe/*", False, "First line in my message", "Second line in my message")`
> Each line in the example above will be placed on a new line.
**`setup_nsfw_profiles(name, path, *message)`**
Returns a post_setting dictionary that contains all of the information about a single **nsfw** profile.
**`setup_safe_profiles(name, path, *message)`**
Returns a post_setting dictionary that contains all of the information about a single **safe** profile.

#! /usr/bin/env bash
# Get the runtime path of the bot
ABS_PATH="$(readlink -f "$0")"
RUN_DIR="$(dirname "$ABS_PATH")"
# Relative paths to the virtual environment and
# cd into the bot's root path, set up the virtual environment, and run
cd "$RUN_DIR"
[ ! -f "$VENV" ] && echo "Virtual environment not found: ${VENV}" && cd - > /dev/null && exit 1
source "$VENV"
"$ENTRY" "$@"
# Cleanup
cd - > /dev/null

#! /usr/bin/env python
# Program Name: Name
# Purpose: Purpose
import getpass
import re
import sys
import datetime
import importlib
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonIOError
from collections import OrderedDict
import yanlib
class CreateAppError(Exception):
def get_client_id_and_secret(app, permissions, domain):
return Mastodon.create_app(app, scopes=permissions, api_base_url=domain)
except MastodonIOError:
raise CreateAppError("An error occurred. Make sure the domain name is correct.")
def get_api(client_id, client_secret, domain):
api = Mastodon(client_id, client_secret, api_base_url=domain)
return api
def get_token(api, email, password, permissions):
token = api.log_in(email, password, scopes=permissions)
return token
except MastodonIllegalArgumentError:
raise CreateAppError("Username or Password is incorrect.")
except MastodonAPIError:
raise CreateAppError("Could not grant scopes:", ", ".join(permissions))
def get_cfg(cfg_name):
cfg = importlib.import_module(cfg_name)
return cfg
except ImportError:
raise CreateAppError("Cannot import module:", cfg_name, "Make sure you omitted the .py extension and try again")
def package_settings(app, domain, client_id, client_secret, token):
settings_server = OrderedDict([
("app_name", app),
("api_base_url", domain),
("client_id", client_id),
("client_secret", client_secret),
("access_token", token)
return settings_server
def get_setting_reminder(fmt):
dt_now =
dt_now = dt_now.replace(year=dt_now.year + 1)
settings_reminder = dt_now.strftime(fmt)
return settings_reminder
def main():
# Create App
print("Generate and register Mastodon App")
print("You can just hit enter to accept the default for prompts that have them")
print("Ctrl+C to Quit\n")
# Get instance information
app = input("Enter your app name (Default: app): ") or "app"
domain = input("URL of Instance (Default: ") or ""
email = input("Enter your email: ")
password = getpass.getpass("Enter password: ")
print("Scopes: read, write, follow, push")
print("Separate each scope with a comma (example above).")
print("!!! Accept the default unless you intend to modify Yandere Lewd Bot !!!")
ans = input("Scopes (Default: write): ") or "write"
ans = re.sub(r"\s+", "", ans, flags=re.UNICODE)
permissions = ans.split(",")
print("Granting:", permissions)
# Begin logging in
client_id, client_secret = get_client_id_and_secret(app, permissions, domain)
api = get_api(client_id, client_secret, domain)
token = get_token(api, email, password, permissions)
# Get user setting for settings_time["long_date_format"]
# Needed for setting_reminder
print("What is the name of your main configuration file? Exclude the '.py' extension.")
ans = input("(Default: cfg): ") or "cfg"
cfg = get_cfg(ans)
# Credentials (unencrypted)
encrypt, salt = False, ""
settings_server = package_settings(app, domain, client_id, client_secret, token)
reminder = get_setting_reminder(cfg.settings_time["long_date_format"])
# Encrypt
ans = input("Do you want to encrypt your credentials (y/N)? ")
if ans.upper() in ("Y", "YES"):
import encryption
encrypt = True
salt, settings_server = encryption.settings_server_encrypt_cfg(settings_server)
settings_encrypt = OrderedDict([
("encrypt", encrypt),
("salt", salt),
# Output the user's new credentials
print("Copy to your config file!!!")
yanlib.pp_ordered_dict("settings_server", settings_server)
print('\nsettings_reminder = "{}"\n'.format(reminder))
yanlib.pp_dict("settings_encrypt", settings_encrypt)
return 0
except (KeyboardInterrupt, EOFError):
return 1
except CreateAppError as e:
return 2
except Exception as e:
print("Unhandled Exception!!", e)
return 3
if __name__ == "__main__":
status_code = main()
status_msg = {
0: "Success :)",
1: "User Quit :|",
2: "Error :(",
3: "Unhandled Exception ¯\\_(ツ)_/¯"

#! /usr/bin/env python
# Program Name: Encryption
# Purpose: Encrypt and store key
import sys
import base64
import os
import getpass
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from collections import OrderedDict
class PasswordMismatch(Exception):
# Salts
def generate_salt():
return os.urandom(16)
# Return string from bytes
def salt_encode(b):
return base64.urlsafe_b64encode(b).decode()
# Return bytes from string
def salt_decode(s):
return base64.urlsafe_b64decode(s.encode())
# Ordered Dictionaries
def change_encoding_dict(settings_server, encoding_type):
return OrderedDict([(k, encoding_type(v)) for k, v in settings_server.items()])
# Return bytes from string
def encode_dict(settings_server):
return change_encoding_dict(settings_server, str.encode)
# Return string from bytes
def decode_dict(settings_server):
return change_encoding_dict(settings_server, bytes.decode)
# password: Bytes
# salt: Bytes
def derive_key(password, salt):
r_key = base64.urlsafe_b64encode(kdf.derive(password))
return r_key
# Encryption functions
# message: Bytes
# key: Bytes
def encrypt(message, key):
f = Fernet(key)
token = f.encrypt(message)
return token
# token: Bytes
# key: Bytes
def decrypt(token, key):
f = Fernet(key)
message = f.decrypt(token)
return message
# password: bytes()
# salt: bytes()
# settings_server: dict() -> Byte values
# encryption_function: encrypt(message, key) : decrypt(token, key):
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
# ChangeEncodingDict to make them strings (recommended cfg file friendly)
def __settings_server(password, salt, settings_server, encryption_function):
key = derive_key(password, salt)
settings_server_decrypted = OrderedDict()
for setting in settings_server:
settings_server_decrypted[setting] = encryption_function(settings_server[setting], key)
return settings_server_decrypted
# Returns (salt, settings_server)
def _settings_server_encrypt(settings_server):
salt = generate_salt()
password = getpass.getpass("Enter password: ")
password2 = getpass.getpass("Retype Password: ")
if password != password2:
raise PasswordMismatch
settings_server_encrypted = __settings_server(password.encode(), salt, encode_dict(settings_server), encrypt)
return salt, settings_server_encrypted
# Returns (settings_server)
def _settings_server_decrypt(settings_server, settings_encrypt):
settings_server_encoded = encode_dict(settings_server)
if settings_encrypt["encrypt"]:
salt = salt_decode(settings_encrypt["salt"])
password = getpass.getpass("Enter password: ")
return __settings_server(password.encode(), salt, settings_server_encoded, decrypt)
return settings_server_encoded
# Wrapper function that will catch exceptions and exit
def settings_server_new(function, **kwargs):
return function(**kwargs)
# If the user cancels the login
except KeyboardInterrupt:
# If the user passwords do not match (encrypt)
except PasswordMismatch:
print("Passwords do not match...")
# Incorrect password entered (decrypt)
except InvalidToken:
print("Password or Token Incorrect...")
# Probably the salt value got modified
except base64.binascii.Error:
print("Salt is invalid...")
# Some other kind of fuck up
except Exception as e:
print("Unknown exception occurred...")
# Exit if an exception was thrown
# Glue functions that package **kwargs automatically
def settings_server_encrypt(settings_server):
kwargs = {"settings_server": settings_server}
return settings_server_new(_settings_server_encrypt, **kwargs)
def settings_server_decrypt(settings_server, settings_encrypt):
kwargs = {
"settings_server": settings_server,
"settings_encrypt": settings_encrypt
return settings_server_new(_settings_server_decrypt, **kwargs)
# The _cfg functions should return a regular string
# These are the functions that should interface with the bot a return a plain string
# settings_server ordered dictionary
def settings_server_encrypt_cfg(settings_server):
salt, settings_server = settings_server_encrypt(settings_server)
return salt_encode(salt), decode_dict(settings_server)
def settings_server_decrypt_cfg(settings_server, settings_encrypt):
settings_server = settings_server_decrypt(settings_server, settings_encrypt)
return decode_dict(settings_server)
def main():
import argparse
import yanlib
default_cfg = "cfg"
parser = argparse.ArgumentParser(
description="A class to encrypt server credentials",
epilog="There are no additional parameters.",
add_help=True )
parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)
arguments = parser.parse_args()
if arguments.recrypt:
arguments.encrypt = True
arguments.decrypt = True
if arguments.encrypt or arguments.decrypt:
import importlib
cfg = importlib.import_module(arguments.cfg)
settings_server = cfg.settings_server
settings_encrypt = None
if arguments.decrypt and arguments.encrypt:
if arguments.decrypt: # arguments.decrypt
settings_server = settings_server_decrypt_cfg(cfg.settings_server, cfg.settings_encrypt)
settings_encrypt = OrderedDict([
("encrypt", False),
("salt", cfg.settings_encrypt["encrypt"])
if arguments.encrypt:
salt, settings_server = settings_server_encrypt_cfg(settings_server)
settings_encrypt = OrderedDict([
("encrypt", True),
("salt", salt)
yanlib.pp_ordered_dict("settings_server", settings_server)
yanlib.pp_dict("settings_encrypt", settings_encrypt)
return 0
#! /usr/bin/env python
import sys
import argparse
import signal
import yanlib
import yandereBot
import datetime
import contextlib
# A class that inherits from either YandereBot from yandereBot module
# This class is used to handle command line arguments gracefully, and extend functionality quickly
# Bot specific changes should be made here (if they are minor enough).
# This will be instantiated from the main() function
class YandereBot(yandereBot.YandereBot):
# The below settings are required from the configuration module
settings_time = None
settings_reminder = None
# Wait before running
delay_start = 0
def __init__(self, cfg, debug_mode, prime_bot, delay_d=None, delay_h=None, delay_s=None):
super(YandereBot, self).__init__(cfg, debug_mode, prime_bot=False)
self.load_settings(self.cfg, ("settings_time", "settings_reminder"))
self.set_pretimer(delay_d, delay_h, delay_s)
if self.debug_mode:
# Do not perform sanity test if stdout is None.
# input() will fail if stdout is None, which could happen with the following command
# ./run --dry-run -h -d 'a date in the past'
if sys.stdout is not None and not self.pass_sanity_test():
raise FailedSanityTest
if prime_bot:
def print_date_time_example(self):
print_fmt = " {0:6} {1:10} {2}"
time_fmt = self.settings_time["time_format"]
date_fmt = self.settings_time["date_format"]
current_time = self.dateSelection
"TIME", time_fmt, current_time.strftime(time_fmt)
"DATE", date_fmt, current_time.strftime(date_fmt)
def dump_pictures(self):
for ele in self.listPictures:
def set_delay_d(self, d):
t = datetime.datetime.strptime(d, self.settings_time["date_format"])
self.dateNextSelection = self.dateNextSelection.replace(
year=t.year, month=t.month,
except Exception:
print("Invalid date format: {}\n\nCorrect date/time format examples:".format(d))
raise DateWrongFormat
def set_delay_h(self, h, add_24):
t = datetime.datetime.strptime(h, self.settings_time["time_format"])
self.dateNextSelection = self.dateNextSelection.replace(
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
if self.dateNextSelection < self.dateSelection and add_24:
self.dateNextSelection = yanlib.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
except Exception:
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
raise TimeWrongFormat
def set_pretimer(self, d=None, h=None, s=0):
if d:
if h:
self.set_delay_h(h, d is None)
if s:
self.delay_start = max(0, s)
# Check for potential misconfigurations by the user
def pass_sanity_test(self):
# Calculate pre-timer value
seconds_until_next_pos = yanlib.time_diff_seconds(self.dateNextSelection, self.dateSelection)
# Possible misconfigurations that will prompt the user to continue
pretimer_less_than_zero = seconds_until_next_pos < 0
pretimer_greater_than_sleep = seconds_until_next_pos > self.settings_behavior["sleep_seconds"]
# Prompt the user
prompt_user = pretimer_less_than_zero or pretimer_greater_than_sleep
# Remind the user to generate new OAuth tokens
dt = datetime.datetime.strptime(self.settings_reminder, self.settings_time["long_date_format"])
if dt <
print("REMINDER: Generate new tokens!!")
# Check if the bot is back-posting in time and make sure this is what the user wanted to avoid spamming
if pretimer_less_than_zero:
sleep = round(abs(seconds_until_next_pos), 2)
images = round(sleep / (self.settings_behavior["sleep_seconds"] * self.settings_behavior["uploads_per_post"]), 2) + 1
print("WARNING: Pre-timer is less than the current time by: {} seconds. {} images will post immediately".format(
sleep, images
# Check if the bot will wait for longer than the default amount of sleep time configured in the
elif pretimer_greater_than_sleep:
print("WARNING: Pre-timer will sleep for {} seconds. This is more than the configured amount ({} seconds)".format(
round(seconds_until_next_pos, 2), self.settings_behavior["sleep_seconds"]
# Prompt the user if something doesn't seem right
# This must be done before we set up our keyboard interrupts. Otherwise the below exceptions will not work.
if prompt_user:
# Default to 'y' if the user just presses enter
ans = input("Do you want to continue [Y/n]? ") or "y"
return ans.lower() in ("y", "yes")
except (EOFError, KeyboardInterrupt):
return False
# Sanity test passed
return True
def start(self, delay=None):
_delay = delay or self.delay_start
return super(YandereBot, self).start(max(0, _delay))
# Custom exceptions
class TimeWrongFormat(Exception):
class DateWrongFormat(Exception):
class FailedSanityTest(Exception):
class FailedToLoadCfg(Exception):
# Entry point if run from the command line
def main():
# Default config file
default_cfg = "cfg"
# Parser
parser = argparse.ArgumentParser(
description="A bot for posting on Mastodon",
# epilog="All switches can be combined for greater control",
parser.add_argument("--dry-run", help="Will not login or post to Plemora", action="store_true")
parser.add_argument("--debug", help="Same as --dry-run", action="store_true")
parser.add_argument("-w", "--wait", type=int, help="Wait before posting first image (seconds)", default=0)
parser.add_argument("-t", "--time", help="Wait for time before posting first image", default=None)
parser.add_argument("-d", "--date", help="Wait for date before posting first image", default=None)
parser.add_argument("-c", "--config", help="Set custom config file (Default: {})".format(default_cfg), default=default_cfg)
parser.add_argument("-o", "--output-hashes", help="Output list of hashes", action="store_true")
parser.add_argument("-h", "--help", help="Show this help message and exit", action="store_true")
parser.add_argument("remainder", help=argparse.SUPPRESS, nargs=argparse.REMAINDER)
arguments = parser.parse_args()
# Redirect stdout when the bot first initializes if the bot is not going to run normally
redirect_stdout = None if arguments.output_hashes or else sys.stdout
# Yandere Lewd Bot
yandere = None
yandere_config = None
# Configuration file for Yandere Lewd Bot
import importlib
yandere_config = importlib.import_module(arguments.config)
except ImportError:
print("Failed to Load Configuration:", arguments.config)
raise FailedToLoadCfg
# Flag if the bot is running in debug mode
debug_mode = (arguments.dry_run or arguments.debug or arguments.output_hashes or
with contextlib.redirect_stdout(redirect_stdout):
prime_bot = not
yandere = YandereBot(
# Print Usage Information with Time and Date Formats with Examples
return 0
# Output all of the images in the bot's picture list
if arguments.output_hashes:
return 0
# Setup exit calls
# Must be done after we declare our bot(s), otherwise this will be called if quitting on decrypting settings )
def yandere_quit(signo, _frame):
print("\nInterrupted by {}, shutting down...".format(signo))
# Setup our exit calls
for sig in ("TERM", "HUP", "INT"):
signal.signal(getattr(signal, "SIG" + sig), yandere_quit)
return yandere.start()
if __name__ == "__main__":
# A return value of 0 or 1 is a normal exit
# Exceptions raised from the main function
except FailedToLoadCfg:
except FailedSanityTest:
except DateWrongFormat:
except TimeWrongFormat:
# Exceptions raised from the bot
except yandereBot.Debug:
except yandereBot.MissingMasterList:
except yandereBot.BadCfgFile:
except yandereBot.BadPostSettings:
#! /usr/bin/env python
# Program Name: Name
# Purpose: Purpose
import yanlib
import os
import datetime
import contextlib
import fnmatch
import math
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
class YanBotHash(yanlib.HashObject):
_postSettings = None
def __init__(self, hash_obj, profile):
super(YanBotHash, self).__init__()
if hash_obj is None:
self._sHash = hash_obj.get_hash_string()
self._sBinaryChar = hash_obj.get_binary_char()
self._sPath = hash_obj.get_hash_path()
self._postSettings = profile
def get_post_setting(self):
return self._postSettings
class YandereBot:
# From the threading library. Is responsible for putting the bot to sleep, and exiting when the user quits (Ctrl+C)
eventSleep = Event()
# The configuration module
cfg = None
# The below settings are required from the configuration module
settings_server = None
settings_behavior = None
settings_time = None
settings_post = None
settings_post_default = None
settings_encrypt = None
# Class variables
mastodon_api = None
listPictures = []
lenBlacklist = 0
failed_uploads = 0
currentSessionCount = 0
debug_mode = False
primed = False
decrypted = False
# Time variables
dateSelection = None
dateNextSelection = None
# YandereBot.__init__()
# @param cfg A dynamically loaded python module. See yanlib.module_load() for an example
# @param debug_mode Should the bot run in debug mode (do not sign in or post to Pleroma)
# prime_bot Should the bot immediately prime itself (configure picture list and login, but don't post)
def __init__(self, cfg, debug_mode=False, prime_bot=True):
self.dateSelection =
self.dateNextSelection = self.dateSelection
self.cfg = cfg
self.debug_mode = debug_mode or self.settings_behavior["debug"]
if prime_bot:
# Setup Exit Calls
def exit(self):
# Make sure there are no profiles in listPictures set to none. Print the bad post and exit if there is.
def validate_post_settings(self):
bad_post_count = 0
for i in self.listPictures:
if i.get_post_setting() is None:
print("Bad post setting [{}]: {}".format(bad_post_count, i.get_full_string()))
bad_post_count += 1
if bad_post_count:
raise BadPostSettings
# Decryption settings
# Used to set class attributes with the same name to the value specified in the config file.
def load_settings(self, cfg, settings=None):
default_settings = (
_settings = settings or default_settings
for ele in _settings:
setattr(self, ele, getattr(cfg, ele))
except AttributeError as e:
raise BadCfgFile
def decrypt_settings(self):
if self.settings_encrypt["encrypt"] and not self.decrypted and not self.debug_mode:
import encryption
self.settings_server = encryption.settings_server_decrypt_cfg(self.settings_server, self.settings_encrypt)
self.decrypted = True
# Login to Pleroma
def login(self):
skip_login = self.debug_mode or self.mastodon_api is not None
if skip_login:
if not self.decrypted:
self.mastodon_api = Mastodon(
feature_set=self.settings_behavior["feature_set"] # <--- Necessary for Mastodon Version 1.5.1
except (MastodonIllegalArgumentError, MastodonVersionError) as e:
raise FailedLogin
# Set up lists
def read_blacklist_files(self):
list_blacklist = []
for i in self.settings_behavior["master_blacklist_r"]:
# It doesn't matter if the picture file doesn't exist
with contextlib.suppress(IOError):
return list_blacklist
def blacklist(self, picked):
self.lenBlacklist += 1
for f in self.settings_behavior["master_blacklist_w"]:
f, picked.get_full_string(),
# load_pictures will return a list of YanHashObj() with a blacklist(s) applied
# @param list_blacklist A list of HashObjects() that are blacklist hashes
def load_pictures(self, list_blacklist):
list_pictures = []
if not self.settings_behavior["master_list"]:
raise MissingMasterList
# Return a list of hashes with profiles
for f in self.settings_behavior["master_list"]:
add_list = get_list_of_hashes_with_profiles(f, self.settings_post, self.settings_post_default, get_profile)
except IOError as e:
raise MissingMasterList
return yanlib.get_hash_list_blacklist(list_pictures, list_blacklist, self.settings_behavior["max_size"])
def load_picture_list(self):
list_blacklist = self.read_blacklist_files()
self.listPictures = self.load_pictures(list_blacklist)
self.lenBlacklist = len(list_blacklist)
def shuffle_list(self):
if self.settings_behavior["post_random"]:
import random
# Maybe I should remove this from the backend?
def print_header_stats(self, picked, date_selection, date_next_selection):
_picked = picked.get_full_string() if picked else None
picked_profile = picked.get_post_setting()["name"] if picked else None
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
next_selection_seconds = max(0, int(yanlib.time_diff_seconds(date_next_selection, date_selection)))
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
if date_selection != date_next_selection:
n_posts_remain -= 1
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
date_end_selection = yanlib.time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
date_end_selection_seconds = max(0, yanlib.time_diff_seconds(date_end_selection, date_selection))
if date_selection != date_next_selection and picked is None:
date_end_selection_seconds += next_selection_seconds
d, h, m, s = yanlib.humanize_time_delta(date_end_selection_seconds)
print("[Profile]", picked_profile)
print("[Profile Next]", picked_next_profile)
print("Next:", picked_next) # There should always be something in picture_next list
print("Selection time: {}".format(
date_selection.strftime(self.settings_time["long_date_format"])) )
print("Next selection time: {} ({} seconds)".format(
date_next_selection.strftime(self.settings_time["long_date_format"]), next_selection_seconds) )
print("End selection time: {}".format(date_end_selection.strftime(
self.settings_time["long_date_week"])) )
print("Time Remaining: {} Days | {} Hours | {} Minutes | {} Seconds".format(d, h, m, s) )
print("[ {} Pictures | {} Blacklisted | {} Selected during session | {} Failed ]\n".format(
len(self.listPictures), self.lenBlacklist, self.currentSessionCount, self.failed_uploads) )
# Returns a list of media paths (without the hashes)
def get_media_list(self, picked):
ext = self.settings_behavior["multi_media_ext"]
if ext and os.path.splitext(picked.get_hash_path())[1].lower() == ext.lower():
return [i.get_hash_path() for i in yanlib.get_hash_list(picked.get_hash_path())]
return [picked.get_hash_path()]
# Returns a list of tuples that contain the media list path and media mastodon dictionary
def upload_media_list(self, path_list):
media_list = []
for ele in path_list:
if not self.debug_mode:
media = self.mastodon_api.media_post(ele, description=os.path.basename(ele))
media_list.append((ele, media))
return media_list
def get_post_text(self, picked, media_list):
content_type = self.settings_behavior["content_type"]
content_newline = self.settings_behavior["content_newline"]
static_message = content_newline.join(picked.get_post_setting()["message"])
string_post = ""
string_imglinks = []
if media_list and self.settings_behavior["post_image_link"]:
for ele in media_list:
path, media = ele
if path is None or media is None:
elif content_type == "text/markdown" and not self.debug_mode:
"[{}]({})".format(os.path.basename(path), media["url"])
# Join non empty strings with a newline character
string_imglinks_joined = content_newline.join(filter(None, string_imglinks))
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return content_type, string_post
def _post(self, picked):
images = self.get_media_list(picked)
if not images:
raise ValueError("Media list is empty")
media_list = self.upload_media_list(images)
content_type, message = self.get_post_text(picked, media_list)
if self.debug_mode:
return picked
media_ids=[i[1] for i in media_list if len(i) == 2],
return picked
# The main post function
# This function is responsible for incrementing self.currentSessionCount, as well as posting and blacklisting the
# picked item.
# It is also responsible for removing the picked item from self.listPictures, which can be accomplished by simply
# popping it at index 0. This should handle any error that might occur while posting.
# This function should return 'None' if a post failed, and the picked item from self.listPictures if it succeeded.
def post(self):
picked = None
# Flags that are set if an upload fails
reinsert_image = False
timeout = False
# Attempt post
# Post
picked = self.listPictures.pop(0)
# After a successful post
self.currentSessionCount += 1
# The post was successful
return picked
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
except FileNotFoundError:
print("File not found:", picked.get_hash_path())
# Exception flags
reinsert_image, timeout = False, False
# Media list is empty
except ValueError as e:
print("Media list cannot be empty:", picked.get_hash_path())
# Exception flags
reinsert_image, timeout = False, False
# Check if the file limit has been reached
except MastodonAPIError as e:
# Check if the file limit has been reached (413 error)
file_limit_reached = False
with contextlib.suppress(IndexError):
file_limit_reached = (e.args[1] == 413)
print("API Error:", e)
# Exception flags
reinsert_image, timeout = not file_limit_reached, True
# Server Errors
# Assume all exceptions are on the server side
# If the connection is timing out it could be for two reasons:
# 1. The error was caused by the user attempting to upload a large file over a slow connection:
# a. FIX: Reduce settings_behavior["max_size"]
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# API will not specify why the connection timed out).
# The default assumption is #2
except Exception as e:
print("Unhandled Exception:", e)
# Exception flags
reinsert_image, timeout = True, True
# An exception occurred
self.failed_uploads += 1
print("[Errors: {}] {}{}".format(self.failed_uploads, picked.get_full_string(), os.linesep))
if reinsert_image:
self.listPictures.insert(0, picked)
if timeout:
# The post failed
return None
def schedule_next_post(self):
self.dateSelection = self.dateNextSelection
self.dateNextSelection = yanlib.time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
# Will wait between the current time and the time of next selection
def wait_future_time(self):
seconds = yanlib.time_diff_seconds(self.dateNextSelection,
self.eventSleep.wait(max(0, seconds))
def prime_bot(self):
if not self.debug_mode:
if self.can_post():
self.primed = True
def start(self, delay=0):
# Prime bot if not already primed.
if not self.primed:
# Early out if the bot is incapable of posting
if not self.can_post():
print("Bot is incapable of posting!!")
return 1
start_time = self.dateSelection
delay_seconds = max(yanlib.time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
delay_time = yanlib.time_add_seconds(start_time, delay_seconds)
# Print the first image in the list if a delay or pretimer is set
if delay_seconds:
self.print_header_stats(None, start_time, delay_time)
# The delay parameter is different from the dateSelection and dateSelectionNext
# It will literally time out the bot for a given number of seconds regardless of the pre-timer setting
# This is useful if you want to set a delay of 30 seconds, before back-posting several images
self.eventSleep.wait(max(0, delay))
# Check if the pre-timer is set
# dateNextSelection should be greater than dateSelection if it is
# dateSelection and dateNextSelection are both set to the current time when the bot is initialized
if delay_seconds:
# Begin posting
# Return 1 if there are still pictures in the picture list
return len(self.listPictures) > 0
# End Conditions:
# 1. User presses Ctrl+C
# 2. There is nothing left in the picture list to select from
# 3. settings_behavior["uploads_per_post"] is less than one for some reason
def can_post(self):
return not self.eventSleep.is_set() and bool(len(self.listPictures)) and self.settings_behavior["uploads_per_post"] > 0
def main_loop(self):
target_posts = self.settings_behavior["uploads_per_post"]
while self.can_post():
successful_posts = 0
while (successful_posts < target_posts) and self.can_post():
last_picked =
successful_posts += int(last_picked is not None)
if successful_posts >= target_posts:
self.print_header_stats(last_picked, self.dateSelection, self.dateNextSelection)
self.print_header_stats(last_picked, self.dateNextSelection, self.dateNextSelection)
if self.can_post():
# A callback function for get_list_of_hashes_with_profiles() that returns a single profile from @param hash_obj
# @param hash_obj A HashObject() (or subclass)
# @param profiles A list of available profiles to match
# @param profiles_default The default profile to return if no profile is matched
def get_profile(hash_obj, profiles, profiles_default):
profile_gen = (x for x in profiles if fnmatch.fnmatch(hash_obj.get_hash_path(), x["path"]))
return next(profile_gen, profiles_default)
# Takes a file path and transforms it into a list of YanBotHash() with the appropriate profile
# @param f_name Path of hash file
# @param profiles List of profiles -> self.settings_post
# @param profiles_default The default profile to apply
# @param callback_get_profile Callback function -> should return a single profile. Default: get_profile()
def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callback_get_profile):
r = []
with open(f_name, "r") as f:
for _line in f:
line = _line.strip()
if not yanlib.is_valid_hash(line):
hash_obj = yanlib.HashObject(line)
post_setting = callback_get_profile(hash_obj, profiles, profiles_default)
r.append(YanBotHash(hash_obj, post_setting))
return r
# Custom Exceptions for YandereBot
class Debug(Exception):
class BadPostSettings(Exception):
class FailedLogin(Exception):
class BadCfgFile(Exception):
class MissingMasterList(Exception):

#! /usr/bin/env python
# Program Name: yandereBot
# Purpose: A Simple Mastadon Bot to Post Images
# is required by
# This file contains functions and utilities that may be useful to external tools and programs that interface with
# yandereBot, or manipulate hash files in some way. Typically instantiating a yandereBot object is unnecessary for this.
import os
import datetime
import shutil
import contextlib
# Requires properly formatted MD5 checksum lines.
# The length of the string shouldn't matter.
# But it should be formatted like so: '8d20714ec6d3ee6d444931d01dd68626 *./rsc/file.png'
# The initializer does NOT check for the validity of the hash string. This should be done manually with is_valid_hash()
# ex. hashes = [HashObject(i) for i in hash_list if is_valid_hash(i)]
# See: get_hash_list() and get_hash_list_str() for functions to make the above example even more simple
class HashObject:
_sHash = ""
_sBinaryChar = ""
_sPath = ""
_sDeliminator = ' ' # This should never change
def __init__(self, s_hash=None):
if s_hash is not None:
def set_hash(self, s):
line = s.strip()
split_hash = line.split(self._sDeliminator, 1)
self._sHash = split_hash[0] # The hash value
self._sBinaryChar = split_hash[1][:1] # The binary character
self._sPath = split_hash[1][1:] # The path to the file
def get_hash_string(self):
return self._sHash
def get_hash_path(self):
return self._sPath
def get_binary_char(self):
return self._sBinaryChar
def get_deliminator(self):
return self._sDeliminator
def get_full_string(self):
return (
self._sHash +
self._sDeliminator +
self._sBinaryChar +
# -------------------------------- HASH FUNCTIONS --------------------------------------------
# For MD5 hashes
# Checking for the binary character when comparing hash string is usually overkill since the CoreUtils package reads
# files in binary mode regardless of whether the -b switch is passed.
# It is included here since it may be useful for some other applications besides the bot.
def is_matching_atter_binary(v_list, v_hash, v_atter):
atter_bin = "get_binary_char"
return next(
(x for x in v_list if
getattr(v_hash, v_atter)() == getattr(x, v_atter)() and
getattr(v_hash, atter_bin)() == getattr(x, atter_bin)()
), None)
def is_matching_atter(v_list, v_hash, v_atter):
return next((x for x in v_list if getattr(v_hash, v_atter)() == getattr(x, v_atter)()), None)
def get_matching(v_list, v_hash, atter, match_bin=False):
return is_matching_atter(v_list, v_hash, atter) if not match_bin else is_matching_atter_binary(v_list, v_hash, atter)
def get_matching_hash_in_list(v_list, v_hash, match_bin=False):
return get_matching(v_list, v_hash, "get_hash_string", match_bin)
def get_matching_path_in_list(v_list, v_hash, match_bin=False):
return get_matching(v_list, v_hash, "get_hash_path", match_bin)
def get_matching_full_string_in_list(v_list, v_hash, match_bin=False):
return get_matching(v_list, v_hash, "get_full_string", match_bin)
# ---------------------------- HASH LIST FUNCTIONS --------------------------------------------
# This is a very lazy check. We only care if the bot will crash when initializing a HashObject()
# A 'hash' of '1 *b' or '1 b' will pass in this function.
def is_valid_hash(s):
line = s.strip()
split_hash = line.find(' ')
# Check
return not (
line.startswith('#') or
len(line) < 4 or
split_hash in (-1, len(line) - 2) or
line[split_hash + 1] not in (' ', '*')
def get_hash_list_str(lst_str):
return [HashObject(i) for i in lst_str if is_valid_hash(i)]
def get_hash_list(f_name):
with open(f_name, "r") as f:
return get_hash_list_str(f.readlines())
def is_hash_blacklisted(line_hash, v_pictures, v_blacklist, max_size=None):
# Print out the entire line
line = line_hash.get_full_string()
full_path = line_hash.get_hash_path()
# Used for comparing file size
cmp_size = max_size is not None
# Only add it to the pictures list if the following conditions are met:
# NOTICE: This is going to be CPU intensive.
if get_matching_hash_in_list(v_pictures, line_hash): # No hash duplicates
print("Ignoring Duplicate Hash:", line)
elif get_matching_path_in_list(v_pictures, line_hash): # No path duplicates
print("Ignoring Duplicate Path:", line)
elif get_matching_hash_in_list(v_blacklist, line_hash): # File hash is not blacklisted
print("Ignoring Blacklisted Hash:", line)
elif get_matching_path_in_list(v_blacklist, line_hash): # File path is not blacklisted
print("Ignoring Blacklisted Path:", line)
elif not os.path.isfile(full_path): # File exists
print("File not found:", line)
elif cmp_size and os.path.getsize(full_path) > max_size: # Does not exceed max file limit (if max_size is set)
print("Exceeds Max File Size:", line)
return False
return True
def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None):
ret = []
for lineHash in v_pictures:
if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size):
return ret
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
def input_time_format(s):
return s.replace("-", "")
def humanize_time_delta(total_seconds):
m, s = divmod(int(total_seconds), 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
return d, h, m, s
# -------------------------------- Pretty Print Functions ------------------------------------
# If value is a subclass of string, return it in parentheses (used for generating a python configuration file)
def _pp_str(v):
if issubclass(str, type(v)):
return '"{}"'.format(v)
return "{}".format(v)
# Prints out a dictionary in pretty print form as valid python code.
# setting_dict should consist of string formats with key value pairs defined in setting_dict ex. '{key} {value}'.
def _pp_dict(head, setting_dict, fmt, tail):
if head:
last_index = len(setting_dict) - 1
# i, k -> index and key value of setting_dict
for i, k in enumerate(setting_dict):
_fmt = fmt
_k = _pp_str(k)
_v = _pp_str(setting_dict[k])
# Add comma separator on all but the last value setting
if i < last_index:
_fmt += ","
print(_fmt.format(_k, _v))
if tail:
def pp_ordered_dict(setting, setting_dict):
"{} = OrderedDict([".format(setting),
def pp_dict(setting, setting_dict):
"{} = {{".format(setting),
# --------------------------------------------------------------------------------------------
def sync_to_disk(file_handle):
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
file_handle = None
if atomic_saving:
import tempfile
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
with contextlib.suppress(IOError):
with open(f_name, "rb") as src:
shutil.copyfileobj(src, file_handle)
if sync_save:
file_handle = open(f_name, "ab")
file_handle.write(str.encode(s + os.linesep))
if sync_save:
if atomic_saving:
with open(f_name, "wb") as dst_file:
shutil.copyfileobj(file_handle, dst_file)
if sync_save: