Compare commits

...

5 Commits

5 changed files with 64 additions and 269 deletions

View File

@ -1,156 +0,0 @@
#! /usr/bin/env python3
# Yandere Lewd Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import getpass
import re
import sys
import datetime
import importlib
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonIOError
from collections import OrderedDict
import yanlib
class CreateAppError(Exception):
pass
def get_client_id_and_secret(app, permissions, domain):
try:
return Mastodon.create_app(app, scopes=permissions, api_base_url=domain)
except MastodonIOError:
raise CreateAppError("An error occurred. Make sure the domain name is correct.")
def get_api(client_id, client_secret, domain):
api = Mastodon(client_id, client_secret, api_base_url=domain)
return api
def get_token(api, email, password, permissions):
try:
token = api.log_in(email, password, scopes=permissions)
return token
except MastodonIllegalArgumentError:
raise CreateAppError("Username or Password is incorrect.")
except MastodonAPIError:
raise CreateAppError("Could not grant scopes:", ", ".join(permissions))
def get_cfg(cfg_name):
try:
cfg = importlib.import_module(cfg_name)
return cfg
except ImportError:
raise CreateAppError("Cannot import module:", cfg_name, "Make sure you omitted the .py extension and try again")
def package_settings(app, domain, client_id, client_secret, token):
settings_server = OrderedDict([
("app_name", app),
("api_base_url", domain),
("client_id", client_id),
("client_secret", client_secret),
("access_token", token)
])
return settings_server
def get_setting_reminder(fmt):
dt_now = datetime.datetime.now()
dt_now = dt_now.replace(year=dt_now.year + 1)
settings_reminder = dt_now.strftime(fmt)
return settings_reminder
def main():
# Create App
print("Generate and register Mastodon App")
print("You can just hit enter to accept the default for prompts that have them")
print("Ctrl+C to Quit\n")
try:
# Get instance information
app = input("Enter your app name (Default: app): ") or "app"
domain = input("URL of Instance (Default: https://yandere.cc): ") or "https://yandere.cc"
email = input("Enter your email: ")
password = getpass.getpass("Enter password: ")
print("Scopes: read, write, follow, push")
print("Separate each scope with a comma (example above).")
print("!!! Accept the default unless you intend to modify Yandere Lewd Bot !!!")
ans = input("Scopes (Default: write): ") or "write"
ans = re.sub(r"\s+", "", ans, flags=re.UNICODE)
permissions = ans.split(",")
print("Granting:", permissions)
# Begin logging in
client_id, client_secret = get_client_id_and_secret(app, permissions, domain)
api = get_api(client_id, client_secret, domain)
token = get_token(api, email, password, permissions)
# Get user setting for settings_time["long_date_format"]
# Needed for setting_reminder
print("What is the name of your main configuration file? Exclude the '.py' extension.")
ans = input("(Default: cfg): ") or "cfg"
cfg = get_cfg(ans)
# Credentials (unencrypted)
encrypt, salt = False, ""
settings_server = package_settings(app, domain, client_id, client_secret, token)
reminder = get_setting_reminder(cfg.settings_time["long_date_format"])
# Encrypt
ans = input("Do you want to encrypt your credentials (y/N)? ")
if ans.upper() in ("Y", "YES"):
import encryption
encrypt = True
salt, settings_server = encryption.settings_server_encrypt_cfg(settings_server)
settings_encrypt = OrderedDict([
("encrypt", encrypt),
("salt", salt),
])
# Output the user's new credentials
print("Copy to your config file!!!")
yanlib.pp_ordered_dict("settings_server", settings_server)
print('\nsettings_reminder = "{}"\n'.format(reminder))
yanlib.pp_dict("settings_encrypt", settings_encrypt)
return 0
except (KeyboardInterrupt, EOFError):
print("Quitting...")
return 1
except CreateAppError as e:
print(e)
return 2
except Exception as e:
print("Unhandled Exception!!", e)
return 3
if __name__ == "__main__":
status_code = main()
status_msg = {
0: "Success :)",
1: "User Quit :|",
2: "Error :(",
3: "Unhandled Exception ¯\\_(ツ)_/¯"
}
print(status_msg[status_code])
sys.exit(status_code)

View File

@ -191,7 +191,7 @@ def settings_server_decrypt_cfg(settings_server, settings_encrypt):
def main():
import argparse
import yanlib
from pprint import pformat
default_cfg = "cfg"
@ -235,8 +235,8 @@ def main():
("salt", salt)
])
yanlib.pp_ordered_dict("settings_server", settings_server)
yanlib.pp_dict("settings_encrypt", settings_encrypt)
print("settings_server = {}".format(pformat(settings_server)))
print("settings_encrypt = {}".format(pformat(settings_encrypt)))
return 0

View File

@ -19,7 +19,6 @@
import sys
import argparse
import signal
import yanlib
import yandereBot
import datetime
import contextlib
@ -89,7 +88,7 @@ class YandereBot(yandereBot.YandereBot):
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
)
if self.dateNextSelection < self.dateSelection and add_24:
self.dateNextSelection = yanlib.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
self.dateNextSelection = yandereBot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
except Exception:
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
self.print_date_time_example()
@ -106,7 +105,7 @@ class YandereBot(yandereBot.YandereBot):
# Check for potential misconfigurations by the user
def pass_sanity_test(self):
# Calculate pre-timer value
seconds_until_next_pos = yanlib.time_diff_seconds(self.dateNextSelection, self.dateSelection)
seconds_until_next_pos = yandereBot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
# Possible misconfigurations that will prompt the user to continue
pretimer_less_than_zero = seconds_until_next_pos < 0

View File

@ -23,6 +23,7 @@ import datetime
import contextlib
import fnmatch
import math
import shutil
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
@ -156,7 +157,7 @@ class YandereBot:
def blacklist(self, picked):
self.lenBlacklist += 1
for f in self.settings_behavior["master_blacklist_w"]:
yanlib.append_file(
append_file(
f, picked.get_full_string(),
self.settings_behavior["atomic_saving"],
self.settings_behavior["sync_save"],
@ -197,18 +198,18 @@ class YandereBot:
picked_profile = picked.get_post_setting()["name"] if picked else None
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
next_selection_seconds = max(0, int(yanlib.time_diff_seconds(date_next_selection, date_selection)))
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
if date_selection != date_next_selection:
n_posts_remain -= 1
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
date_end_selection = yanlib.time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
date_end_selection_seconds = max(0, yanlib.time_diff_seconds(date_end_selection, date_selection))
date_end_selection = time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
date_end_selection_seconds = max(0, time_diff_seconds(date_end_selection, date_selection))
if date_selection != date_next_selection and picked is None:
date_end_selection_seconds += next_selection_seconds
d, h, m, s = yanlib.humanize_time_delta(date_end_selection_seconds)
d, h, m, s = humanize_time_delta(date_end_selection_seconds)
print("[Profile]", picked_profile)
print("[Profile Next]", picked_next_profile)
@ -360,11 +361,11 @@ class YandereBot:
def schedule_next_post(self):
self.dateSelection = self.dateNextSelection
self.dateNextSelection = yanlib.time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
# Will wait between the current time and the time of next selection
def wait_future_time(self):
seconds = yanlib.time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
self.eventSleep.wait(max(0, seconds))
# [BEGIN THE PROGRAM]
@ -389,8 +390,8 @@ class YandereBot:
return 1
start_time = self.dateSelection
delay_seconds = max(yanlib.time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
delay_time = yanlib.time_add_seconds(start_time, delay_seconds)
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
delay_time = time_add_seconds(start_time, delay_seconds)
# Print the first image in the list if a delay or pretimer is set
if delay_seconds:
@ -464,6 +465,55 @@ def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callbac
return r
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
def humanize_time_delta(total_seconds):
m, s = divmod(int(total_seconds), 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
return d, h, m, s
# --------------------------------------------------------------------------------------------
def sync_to_disk(file_handle):
file_handle.flush()
os.fsync(file_handle.fileno())
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
file_handle = None
if atomic_saving:
import tempfile
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
with contextlib.suppress(IOError):
with open(f_name, "rb") as src:
shutil.copyfileobj(src, file_handle)
if sync_save:
sync_to_disk(file_handle)
else:
file_handle = open(f_name, "ab")
file_handle.write(str.encode(s + os.linesep))
if sync_save:
sync_to_disk(file_handle)
if atomic_saving:
file_handle.seek(0)
with open(f_name, "wb") as dst_file:
shutil.copyfileobj(file_handle, dst_file)
if sync_save:
sync_to_disk(dst_file)
file_handle.close()
# Custom Exceptions for YandereBot
class Debug(Exception):
pass

View File

@ -162,101 +162,3 @@ def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None):
if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size):
ret.append(lineHash)
return ret
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
def input_time_format(s):
return s.replace("-", "")
def humanize_time_delta(total_seconds):
m, s = divmod(int(total_seconds), 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
return d, h, m, s
# -------------------------------- Pretty Print Functions ------------------------------------
# If value is a subclass of string, return it in parentheses (used for generating a python configuration file)
def _pp_str(v):
if issubclass(str, type(v)):
return '"{}"'.format(v)
else:
return "{}".format(v)
# Prints out a dictionary in pretty print form as valid python code.
# setting_dict should consist of string formats with key value pairs defined in setting_dict ex. '{key} {value}'.
def _pp_dict(head, setting_dict, fmt, tail):
if head:
print(head)
last_index = len(setting_dict) - 1
# i, k -> index and key value of setting_dict
for i, k in enumerate(setting_dict):
_fmt = fmt
_k = _pp_str(k)
_v = _pp_str(setting_dict[k])
# Add comma separator on all but the last value setting
if i < last_index:
_fmt += ","
print(_fmt.format(_k, _v))
if tail:
print(tail)
def pp_ordered_dict(setting, setting_dict):
_pp_dict(
"{} = OrderedDict([".format(setting),
setting_dict,
'\t({},\t{})',
"])")
def pp_dict(setting, setting_dict):
_pp_dict(
"{} = {{".format(setting),
setting_dict,
'\t{}:\t{}',
"}")
# --------------------------------------------------------------------------------------------
def sync_to_disk(file_handle):
file_handle.flush()
os.fsync(file_handle.fileno())
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
file_handle = None
if atomic_saving:
import tempfile
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
with contextlib.suppress(IOError):
with open(f_name, "rb") as src:
shutil.copyfileobj(src, file_handle)
if sync_save:
sync_to_disk(file_handle)
else:
file_handle = open(f_name, "ab")
file_handle.write(str.encode(s + os.linesep))
if sync_save:
sync_to_disk(file_handle)
if atomic_saving:
file_handle.seek(0)
with open(f_name, "wb") as dst_file:
shutil.copyfileobj(file_handle, dst_file)
if sync_save:
sync_to_disk(dst_file)
file_handle.close()