Compare commits

..

No commits in common. "bc439c15999afebbf581802a6242ed2b3e79bd3a" and "30e2e1807a45ab9c35393411dc7d798fef81c243" have entirely different histories.

5 changed files with 269 additions and 64 deletions

156
src/create_app.py Executable file
View File

@ -0,0 +1,156 @@
#! /usr/bin/env python3
# Yandere Lewd Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import getpass
import re
import sys
import datetime
import importlib
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonIOError
from collections import OrderedDict
import yanlib
class CreateAppError(Exception):
pass
def get_client_id_and_secret(app, permissions, domain):
try:
return Mastodon.create_app(app, scopes=permissions, api_base_url=domain)
except MastodonIOError:
raise CreateAppError("An error occurred. Make sure the domain name is correct.")
def get_api(client_id, client_secret, domain):
api = Mastodon(client_id, client_secret, api_base_url=domain)
return api
def get_token(api, email, password, permissions):
try:
token = api.log_in(email, password, scopes=permissions)
return token
except MastodonIllegalArgumentError:
raise CreateAppError("Username or Password is incorrect.")
except MastodonAPIError:
raise CreateAppError("Could not grant scopes:", ", ".join(permissions))
def get_cfg(cfg_name):
try:
cfg = importlib.import_module(cfg_name)
return cfg
except ImportError:
raise CreateAppError("Cannot import module:", cfg_name, "Make sure you omitted the .py extension and try again")
def package_settings(app, domain, client_id, client_secret, token):
settings_server = OrderedDict([
("app_name", app),
("api_base_url", domain),
("client_id", client_id),
("client_secret", client_secret),
("access_token", token)
])
return settings_server
def get_setting_reminder(fmt):
dt_now = datetime.datetime.now()
dt_now = dt_now.replace(year=dt_now.year + 1)
settings_reminder = dt_now.strftime(fmt)
return settings_reminder
def main():
# Create App
print("Generate and register Mastodon App")
print("You can just hit enter to accept the default for prompts that have them")
print("Ctrl+C to Quit\n")
try:
# Get instance information
app = input("Enter your app name (Default: app): ") or "app"
domain = input("URL of Instance (Default: https://yandere.cc): ") or "https://yandere.cc"
email = input("Enter your email: ")
password = getpass.getpass("Enter password: ")
print("Scopes: read, write, follow, push")
print("Separate each scope with a comma (example above).")
print("!!! Accept the default unless you intend to modify Yandere Lewd Bot !!!")
ans = input("Scopes (Default: write): ") or "write"
ans = re.sub(r"\s+", "", ans, flags=re.UNICODE)
permissions = ans.split(",")
print("Granting:", permissions)
# Begin logging in
client_id, client_secret = get_client_id_and_secret(app, permissions, domain)
api = get_api(client_id, client_secret, domain)
token = get_token(api, email, password, permissions)
# Get user setting for settings_time["long_date_format"]
# Needed for setting_reminder
print("What is the name of your main configuration file? Exclude the '.py' extension.")
ans = input("(Default: cfg): ") or "cfg"
cfg = get_cfg(ans)
# Credentials (unencrypted)
encrypt, salt = False, ""
settings_server = package_settings(app, domain, client_id, client_secret, token)
reminder = get_setting_reminder(cfg.settings_time["long_date_format"])
# Encrypt
ans = input("Do you want to encrypt your credentials (y/N)? ")
if ans.upper() in ("Y", "YES"):
import encryption
encrypt = True
salt, settings_server = encryption.settings_server_encrypt_cfg(settings_server)
settings_encrypt = OrderedDict([
("encrypt", encrypt),
("salt", salt),
])
# Output the user's new credentials
print("Copy to your config file!!!")
yanlib.pp_ordered_dict("settings_server", settings_server)
print('\nsettings_reminder = "{}"\n'.format(reminder))
yanlib.pp_dict("settings_encrypt", settings_encrypt)
return 0
except (KeyboardInterrupt, EOFError):
print("Quitting...")
return 1
except CreateAppError as e:
print(e)
return 2
except Exception as e:
print("Unhandled Exception!!", e)
return 3
if __name__ == "__main__":
status_code = main()
status_msg = {
0: "Success :)",
1: "User Quit :|",
2: "Error :(",
3: "Unhandled Exception ¯\\_(ツ)_/¯"
}
print(status_msg[status_code])
sys.exit(status_code)

View File

@ -191,7 +191,7 @@ def settings_server_decrypt_cfg(settings_server, settings_encrypt):
def main(): def main():
import argparse import argparse
from pprint import pformat import yanlib
default_cfg = "cfg" default_cfg = "cfg"
@ -235,8 +235,8 @@ def main():
("salt", salt) ("salt", salt)
]) ])
print("settings_server = {}".format(pformat(settings_server))) yanlib.pp_ordered_dict("settings_server", settings_server)
print("settings_encrypt = {}".format(pformat(settings_encrypt))) yanlib.pp_dict("settings_encrypt", settings_encrypt)
return 0 return 0

View File

@ -19,6 +19,7 @@
import sys import sys
import argparse import argparse
import signal import signal
import yanlib
import yandereBot import yandereBot
import datetime import datetime
import contextlib import contextlib
@ -88,7 +89,7 @@ class YandereBot(yandereBot.YandereBot):
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
) )
if self.dateNextSelection < self.dateSelection and add_24: if self.dateNextSelection < self.dateSelection and add_24:
self.dateNextSelection = yandereBot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24) self.dateNextSelection = yanlib.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
except Exception: except Exception:
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h)) print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
self.print_date_time_example() self.print_date_time_example()
@ -105,7 +106,7 @@ class YandereBot(yandereBot.YandereBot):
# Check for potential misconfigurations by the user # Check for potential misconfigurations by the user
def pass_sanity_test(self): def pass_sanity_test(self):
# Calculate pre-timer value # Calculate pre-timer value
seconds_until_next_pos = yandereBot.time_diff_seconds(self.dateNextSelection, self.dateSelection) seconds_until_next_pos = yanlib.time_diff_seconds(self.dateNextSelection, self.dateSelection)
# Possible misconfigurations that will prompt the user to continue # Possible misconfigurations that will prompt the user to continue
pretimer_less_than_zero = seconds_until_next_pos < 0 pretimer_less_than_zero = seconds_until_next_pos < 0

View File

@ -23,7 +23,6 @@ import datetime
import contextlib import contextlib
import fnmatch import fnmatch
import math import math
import shutil
from threading import Event from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
@ -157,7 +156,7 @@ class YandereBot:
def blacklist(self, picked): def blacklist(self, picked):
self.lenBlacklist += 1 self.lenBlacklist += 1
for f in self.settings_behavior["master_blacklist_w"]: for f in self.settings_behavior["master_blacklist_w"]:
append_file( yanlib.append_file(
f, picked.get_full_string(), f, picked.get_full_string(),
self.settings_behavior["atomic_saving"], self.settings_behavior["atomic_saving"],
self.settings_behavior["sync_save"], self.settings_behavior["sync_save"],
@ -198,18 +197,18 @@ class YandereBot:
picked_profile = picked.get_post_setting()["name"] if picked else None picked_profile = picked.get_post_setting()["name"] if picked else None
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection))) next_selection_seconds = max(0, int(yanlib.time_diff_seconds(date_next_selection, date_selection)))
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"]) n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
if date_selection != date_next_selection: if date_selection != date_next_selection:
n_posts_remain -= 1 n_posts_remain -= 1
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"] remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
date_end_selection = time_add_seconds(date_selection, remaining_seconds + next_selection_seconds) date_end_selection = yanlib.time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
date_end_selection_seconds = max(0, time_diff_seconds(date_end_selection, date_selection)) date_end_selection_seconds = max(0, yanlib.time_diff_seconds(date_end_selection, date_selection))
if date_selection != date_next_selection and picked is None: if date_selection != date_next_selection and picked is None:
date_end_selection_seconds += next_selection_seconds date_end_selection_seconds += next_selection_seconds
d, h, m, s = humanize_time_delta(date_end_selection_seconds) d, h, m, s = yanlib.humanize_time_delta(date_end_selection_seconds)
print("[Profile]", picked_profile) print("[Profile]", picked_profile)
print("[Profile Next]", picked_next_profile) print("[Profile Next]", picked_next_profile)
@ -361,11 +360,11 @@ class YandereBot:
def schedule_next_post(self): def schedule_next_post(self):
self.dateSelection = self.dateNextSelection self.dateSelection = self.dateNextSelection
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"]) self.dateNextSelection = yanlib.time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
# Will wait between the current time and the time of next selection # Will wait between the current time and the time of next selection
def wait_future_time(self): def wait_future_time(self):
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now()) seconds = yanlib.time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
self.eventSleep.wait(max(0, seconds)) self.eventSleep.wait(max(0, seconds))
# [BEGIN THE PROGRAM] # [BEGIN THE PROGRAM]
@ -390,8 +389,8 @@ class YandereBot:
return 1 return 1
start_time = self.dateSelection start_time = self.dateSelection
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay) delay_seconds = max(yanlib.time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
delay_time = time_add_seconds(start_time, delay_seconds) delay_time = yanlib.time_add_seconds(start_time, delay_seconds)
# Print the first image in the list if a delay or pretimer is set # Print the first image in the list if a delay or pretimer is set
if delay_seconds: if delay_seconds:
@ -465,55 +464,6 @@ def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callbac
return r return r
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
def humanize_time_delta(total_seconds):
m, s = divmod(int(total_seconds), 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
return d, h, m, s
# --------------------------------------------------------------------------------------------
def sync_to_disk(file_handle):
file_handle.flush()
os.fsync(file_handle.fileno())
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
file_handle = None
if atomic_saving:
import tempfile
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
with contextlib.suppress(IOError):
with open(f_name, "rb") as src:
shutil.copyfileobj(src, file_handle)
if sync_save:
sync_to_disk(file_handle)
else:
file_handle = open(f_name, "ab")
file_handle.write(str.encode(s + os.linesep))
if sync_save:
sync_to_disk(file_handle)
if atomic_saving:
file_handle.seek(0)
with open(f_name, "wb") as dst_file:
shutil.copyfileobj(file_handle, dst_file)
if sync_save:
sync_to_disk(dst_file)
file_handle.close()
# Custom Exceptions for YandereBot # Custom Exceptions for YandereBot
class Debug(Exception): class Debug(Exception):
pass pass

View File

@ -162,3 +162,101 @@ def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None):
if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size): if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size):
ret.append(lineHash) ret.append(lineHash)
return ret return ret
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
def time_add_seconds(dt, seconds):
return dt + datetime.timedelta(0, seconds)
def time_diff_seconds(d1, d2):
return (d1-d2).total_seconds()
def input_time_format(s):
return s.replace("-", "")
def humanize_time_delta(total_seconds):
m, s = divmod(int(total_seconds), 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
return d, h, m, s
# -------------------------------- Pretty Print Functions ------------------------------------
# If value is a subclass of string, return it in parentheses (used for generating a python configuration file)
def _pp_str(v):
if issubclass(str, type(v)):
return '"{}"'.format(v)
else:
return "{}".format(v)
# Prints out a dictionary in pretty print form as valid python code.
# setting_dict should consist of string formats with key value pairs defined in setting_dict ex. '{key} {value}'.
def _pp_dict(head, setting_dict, fmt, tail):
if head:
print(head)
last_index = len(setting_dict) - 1
# i, k -> index and key value of setting_dict
for i, k in enumerate(setting_dict):
_fmt = fmt
_k = _pp_str(k)
_v = _pp_str(setting_dict[k])
# Add comma separator on all but the last value setting
if i < last_index:
_fmt += ","
print(_fmt.format(_k, _v))
if tail:
print(tail)
def pp_ordered_dict(setting, setting_dict):
_pp_dict(
"{} = OrderedDict([".format(setting),
setting_dict,
'\t({},\t{})',
"])")
def pp_dict(setting, setting_dict):
_pp_dict(
"{} = {{".format(setting),
setting_dict,
'\t{}:\t{}',
"}")
# --------------------------------------------------------------------------------------------
def sync_to_disk(file_handle):
file_handle.flush()
os.fsync(file_handle.fileno())
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
file_handle = None
if atomic_saving:
import tempfile
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
with contextlib.suppress(IOError):
with open(f_name, "rb") as src:
shutil.copyfileobj(src, file_handle)
if sync_save:
sync_to_disk(file_handle)
else:
file_handle = open(f_name, "ab")
file_handle.write(str.encode(s + os.linesep))
if sync_save:
sync_to_disk(file_handle)
if atomic_saving:
file_handle.seek(0)
with open(f_name, "wb") as dst_file:
shutil.copyfileobj(file_handle, dst_file)
if sync_save:
sync_to_disk(dst_file)
file_handle.close()