Compare commits
5 Commits
30e2e1807a
...
bc439c1599
Author | SHA1 | Date | |
---|---|---|---|
bc439c1599 | |||
70a35cf47b | |||
740221df94 | |||
521087cbe8 | |||
24747be4ee |
@ -1,156 +0,0 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
# Yandere Lewd Bot, an image posting bot for Pleroma
|
||||
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
import getpass
|
||||
import re
|
||||
import sys
|
||||
import datetime
|
||||
import importlib
|
||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonIOError
|
||||
from collections import OrderedDict
|
||||
import yanlib
|
||||
|
||||
|
||||
class CreateAppError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def get_client_id_and_secret(app, permissions, domain):
|
||||
try:
|
||||
return Mastodon.create_app(app, scopes=permissions, api_base_url=domain)
|
||||
except MastodonIOError:
|
||||
raise CreateAppError("An error occurred. Make sure the domain name is correct.")
|
||||
|
||||
|
||||
def get_api(client_id, client_secret, domain):
|
||||
api = Mastodon(client_id, client_secret, api_base_url=domain)
|
||||
return api
|
||||
|
||||
|
||||
def get_token(api, email, password, permissions):
|
||||
try:
|
||||
token = api.log_in(email, password, scopes=permissions)
|
||||
return token
|
||||
except MastodonIllegalArgumentError:
|
||||
raise CreateAppError("Username or Password is incorrect.")
|
||||
except MastodonAPIError:
|
||||
raise CreateAppError("Could not grant scopes:", ", ".join(permissions))
|
||||
|
||||
|
||||
def get_cfg(cfg_name):
|
||||
try:
|
||||
cfg = importlib.import_module(cfg_name)
|
||||
return cfg
|
||||
except ImportError:
|
||||
raise CreateAppError("Cannot import module:", cfg_name, "Make sure you omitted the .py extension and try again")
|
||||
|
||||
|
||||
def package_settings(app, domain, client_id, client_secret, token):
|
||||
settings_server = OrderedDict([
|
||||
("app_name", app),
|
||||
("api_base_url", domain),
|
||||
("client_id", client_id),
|
||||
("client_secret", client_secret),
|
||||
("access_token", token)
|
||||
])
|
||||
return settings_server
|
||||
|
||||
|
||||
def get_setting_reminder(fmt):
|
||||
dt_now = datetime.datetime.now()
|
||||
dt_now = dt_now.replace(year=dt_now.year + 1)
|
||||
settings_reminder = dt_now.strftime(fmt)
|
||||
return settings_reminder
|
||||
|
||||
|
||||
def main():
|
||||
# Create App
|
||||
print("Generate and register Mastodon App")
|
||||
print("You can just hit enter to accept the default for prompts that have them")
|
||||
print("Ctrl+C to Quit\n")
|
||||
|
||||
try:
|
||||
# Get instance information
|
||||
app = input("Enter your app name (Default: app): ") or "app"
|
||||
domain = input("URL of Instance (Default: https://yandere.cc): ") or "https://yandere.cc"
|
||||
email = input("Enter your email: ")
|
||||
password = getpass.getpass("Enter password: ")
|
||||
print("Scopes: read, write, follow, push")
|
||||
print("Separate each scope with a comma (example above).")
|
||||
print("!!! Accept the default unless you intend to modify Yandere Lewd Bot !!!")
|
||||
ans = input("Scopes (Default: write): ") or "write"
|
||||
ans = re.sub(r"\s+", "", ans, flags=re.UNICODE)
|
||||
permissions = ans.split(",")
|
||||
print("Granting:", permissions)
|
||||
|
||||
# Begin logging in
|
||||
client_id, client_secret = get_client_id_and_secret(app, permissions, domain)
|
||||
api = get_api(client_id, client_secret, domain)
|
||||
token = get_token(api, email, password, permissions)
|
||||
|
||||
# Get user setting for settings_time["long_date_format"]
|
||||
# Needed for setting_reminder
|
||||
print("What is the name of your main configuration file? Exclude the '.py' extension.")
|
||||
ans = input("(Default: cfg): ") or "cfg"
|
||||
cfg = get_cfg(ans)
|
||||
|
||||
# Credentials (unencrypted)
|
||||
encrypt, salt = False, ""
|
||||
settings_server = package_settings(app, domain, client_id, client_secret, token)
|
||||
reminder = get_setting_reminder(cfg.settings_time["long_date_format"])
|
||||
|
||||
# Encrypt
|
||||
ans = input("Do you want to encrypt your credentials (y/N)? ")
|
||||
if ans.upper() in ("Y", "YES"):
|
||||
import encryption
|
||||
encrypt = True
|
||||
salt, settings_server = encryption.settings_server_encrypt_cfg(settings_server)
|
||||
|
||||
settings_encrypt = OrderedDict([
|
||||
("encrypt", encrypt),
|
||||
("salt", salt),
|
||||
])
|
||||
|
||||
# Output the user's new credentials
|
||||
print("Copy to your config file!!!")
|
||||
yanlib.pp_ordered_dict("settings_server", settings_server)
|
||||
print('\nsettings_reminder = "{}"\n'.format(reminder))
|
||||
yanlib.pp_dict("settings_encrypt", settings_encrypt)
|
||||
|
||||
return 0
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print("Quitting...")
|
||||
return 1
|
||||
except CreateAppError as e:
|
||||
print(e)
|
||||
return 2
|
||||
except Exception as e:
|
||||
print("Unhandled Exception!!", e)
|
||||
return 3
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
status_code = main()
|
||||
status_msg = {
|
||||
0: "Success :)",
|
||||
1: "User Quit :|",
|
||||
2: "Error :(",
|
||||
3: "Unhandled Exception ¯\\_(ツ)_/¯"
|
||||
}
|
||||
print(status_msg[status_code])
|
||||
sys.exit(status_code)
|
@ -191,7 +191,7 @@ def settings_server_decrypt_cfg(settings_server, settings_encrypt):
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
import yanlib
|
||||
from pprint import pformat
|
||||
|
||||
default_cfg = "cfg"
|
||||
|
||||
@ -235,8 +235,8 @@ def main():
|
||||
("salt", salt)
|
||||
])
|
||||
|
||||
yanlib.pp_ordered_dict("settings_server", settings_server)
|
||||
yanlib.pp_dict("settings_encrypt", settings_encrypt)
|
||||
print("settings_server = {}".format(pformat(settings_server)))
|
||||
print("settings_encrypt = {}".format(pformat(settings_encrypt)))
|
||||
return 0
|
||||
|
||||
|
||||
|
@ -19,7 +19,6 @@
|
||||
import sys
|
||||
import argparse
|
||||
import signal
|
||||
import yanlib
|
||||
import yandereBot
|
||||
import datetime
|
||||
import contextlib
|
||||
@ -89,7 +88,7 @@ class YandereBot(yandereBot.YandereBot):
|
||||
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
|
||||
)
|
||||
if self.dateNextSelection < self.dateSelection and add_24:
|
||||
self.dateNextSelection = yanlib.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
|
||||
self.dateNextSelection = yandereBot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
|
||||
except Exception:
|
||||
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
|
||||
self.print_date_time_example()
|
||||
@ -106,7 +105,7 @@ class YandereBot(yandereBot.YandereBot):
|
||||
# Check for potential misconfigurations by the user
|
||||
def pass_sanity_test(self):
|
||||
# Calculate pre-timer value
|
||||
seconds_until_next_pos = yanlib.time_diff_seconds(self.dateNextSelection, self.dateSelection)
|
||||
seconds_until_next_pos = yandereBot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
|
||||
|
||||
# Possible misconfigurations that will prompt the user to continue
|
||||
pretimer_less_than_zero = seconds_until_next_pos < 0
|
||||
|
@ -23,6 +23,7 @@ import datetime
|
||||
import contextlib
|
||||
import fnmatch
|
||||
import math
|
||||
import shutil
|
||||
from threading import Event
|
||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
||||
|
||||
@ -156,7 +157,7 @@ class YandereBot:
|
||||
def blacklist(self, picked):
|
||||
self.lenBlacklist += 1
|
||||
for f in self.settings_behavior["master_blacklist_w"]:
|
||||
yanlib.append_file(
|
||||
append_file(
|
||||
f, picked.get_full_string(),
|
||||
self.settings_behavior["atomic_saving"],
|
||||
self.settings_behavior["sync_save"],
|
||||
@ -197,18 +198,18 @@ class YandereBot:
|
||||
picked_profile = picked.get_post_setting()["name"] if picked else None
|
||||
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
||||
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
||||
next_selection_seconds = max(0, int(yanlib.time_diff_seconds(date_next_selection, date_selection)))
|
||||
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
||||
|
||||
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
|
||||
if date_selection != date_next_selection:
|
||||
n_posts_remain -= 1
|
||||
|
||||
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
|
||||
date_end_selection = yanlib.time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
|
||||
date_end_selection_seconds = max(0, yanlib.time_diff_seconds(date_end_selection, date_selection))
|
||||
date_end_selection = time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
|
||||
date_end_selection_seconds = max(0, time_diff_seconds(date_end_selection, date_selection))
|
||||
if date_selection != date_next_selection and picked is None:
|
||||
date_end_selection_seconds += next_selection_seconds
|
||||
d, h, m, s = yanlib.humanize_time_delta(date_end_selection_seconds)
|
||||
d, h, m, s = humanize_time_delta(date_end_selection_seconds)
|
||||
|
||||
print("[Profile]", picked_profile)
|
||||
print("[Profile Next]", picked_next_profile)
|
||||
@ -360,11 +361,11 @@ class YandereBot:
|
||||
|
||||
def schedule_next_post(self):
|
||||
self.dateSelection = self.dateNextSelection
|
||||
self.dateNextSelection = yanlib.time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
|
||||
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
|
||||
|
||||
# Will wait between the current time and the time of next selection
|
||||
def wait_future_time(self):
|
||||
seconds = yanlib.time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
|
||||
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
|
||||
self.eventSleep.wait(max(0, seconds))
|
||||
|
||||
# [BEGIN THE PROGRAM]
|
||||
@ -389,8 +390,8 @@ class YandereBot:
|
||||
return 1
|
||||
|
||||
start_time = self.dateSelection
|
||||
delay_seconds = max(yanlib.time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
|
||||
delay_time = yanlib.time_add_seconds(start_time, delay_seconds)
|
||||
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
|
||||
delay_time = time_add_seconds(start_time, delay_seconds)
|
||||
|
||||
# Print the first image in the list if a delay or pretimer is set
|
||||
if delay_seconds:
|
||||
@ -464,6 +465,55 @@ def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callbac
|
||||
return r
|
||||
|
||||
|
||||
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
|
||||
def time_add_seconds(dt, seconds):
|
||||
return dt + datetime.timedelta(0, seconds)
|
||||
|
||||
|
||||
def time_diff_seconds(d1, d2):
|
||||
return (d1-d2).total_seconds()
|
||||
|
||||
|
||||
def humanize_time_delta(total_seconds):
|
||||
m, s = divmod(int(total_seconds), 60)
|
||||
h, m = divmod(m, 60)
|
||||
d, h = divmod(h, 24)
|
||||
return d, h, m, s
|
||||
|
||||
# --------------------------------------------------------------------------------------------
|
||||
def sync_to_disk(file_handle):
|
||||
file_handle.flush()
|
||||
os.fsync(file_handle.fileno())
|
||||
|
||||
|
||||
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
|
||||
file_handle = None
|
||||
if atomic_saving:
|
||||
import tempfile
|
||||
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
|
||||
with contextlib.suppress(IOError):
|
||||
with open(f_name, "rb") as src:
|
||||
shutil.copyfileobj(src, file_handle)
|
||||
if sync_save:
|
||||
sync_to_disk(file_handle)
|
||||
else:
|
||||
file_handle = open(f_name, "ab")
|
||||
|
||||
file_handle.write(str.encode(s + os.linesep))
|
||||
|
||||
if sync_save:
|
||||
sync_to_disk(file_handle)
|
||||
|
||||
if atomic_saving:
|
||||
file_handle.seek(0)
|
||||
with open(f_name, "wb") as dst_file:
|
||||
shutil.copyfileobj(file_handle, dst_file)
|
||||
if sync_save:
|
||||
sync_to_disk(dst_file)
|
||||
|
||||
file_handle.close()
|
||||
|
||||
|
||||
# Custom Exceptions for YandereBot
|
||||
class Debug(Exception):
|
||||
pass
|
||||
|
@ -162,101 +162,3 @@ def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None):
|
||||
if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size):
|
||||
ret.append(lineHash)
|
||||
return ret
|
||||
|
||||
|
||||
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
|
||||
def time_add_seconds(dt, seconds):
|
||||
return dt + datetime.timedelta(0, seconds)
|
||||
|
||||
|
||||
def time_diff_seconds(d1, d2):
|
||||
return (d1-d2).total_seconds()
|
||||
|
||||
|
||||
def input_time_format(s):
|
||||
return s.replace("-", "")
|
||||
|
||||
|
||||
def humanize_time_delta(total_seconds):
|
||||
m, s = divmod(int(total_seconds), 60)
|
||||
h, m = divmod(m, 60)
|
||||
d, h = divmod(h, 24)
|
||||
return d, h, m, s
|
||||
|
||||
|
||||
# -------------------------------- Pretty Print Functions ------------------------------------
|
||||
# If value is a subclass of string, return it in parentheses (used for generating a python configuration file)
|
||||
def _pp_str(v):
|
||||
if issubclass(str, type(v)):
|
||||
return '"{}"'.format(v)
|
||||
else:
|
||||
return "{}".format(v)
|
||||
|
||||
|
||||
# Prints out a dictionary in pretty print form as valid python code.
|
||||
# setting_dict should consist of string formats with key value pairs defined in setting_dict ex. '{key} {value}'.
|
||||
def _pp_dict(head, setting_dict, fmt, tail):
|
||||
if head:
|
||||
print(head)
|
||||
last_index = len(setting_dict) - 1
|
||||
# i, k -> index and key value of setting_dict
|
||||
for i, k in enumerate(setting_dict):
|
||||
_fmt = fmt
|
||||
_k = _pp_str(k)
|
||||
_v = _pp_str(setting_dict[k])
|
||||
# Add comma separator on all but the last value setting
|
||||
if i < last_index:
|
||||
_fmt += ","
|
||||
print(_fmt.format(_k, _v))
|
||||
if tail:
|
||||
print(tail)
|
||||
|
||||
|
||||
def pp_ordered_dict(setting, setting_dict):
|
||||
_pp_dict(
|
||||
"{} = OrderedDict([".format(setting),
|
||||
setting_dict,
|
||||
'\t({},\t{})',
|
||||
"])")
|
||||
|
||||
|
||||
def pp_dict(setting, setting_dict):
|
||||
_pp_dict(
|
||||
"{} = {{".format(setting),
|
||||
setting_dict,
|
||||
'\t{}:\t{}',
|
||||
"}")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------------------------
|
||||
def sync_to_disk(file_handle):
|
||||
file_handle.flush()
|
||||
os.fsync(file_handle.fileno())
|
||||
|
||||
|
||||
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
|
||||
file_handle = None
|
||||
if atomic_saving:
|
||||
import tempfile
|
||||
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
|
||||
with contextlib.suppress(IOError):
|
||||
with open(f_name, "rb") as src:
|
||||
shutil.copyfileobj(src, file_handle)
|
||||
if sync_save:
|
||||
sync_to_disk(file_handle)
|
||||
else:
|
||||
file_handle = open(f_name, "ab")
|
||||
|
||||
file_handle.write(str.encode(s + os.linesep))
|
||||
|
||||
if sync_save:
|
||||
sync_to_disk(file_handle)
|
||||
|
||||
if atomic_saving:
|
||||
file_handle.seek(0)
|
||||
with open(f_name, "wb") as dst_file:
|
||||
shutil.copyfileobj(file_handle, dst_file)
|
||||
if sync_save:
|
||||
sync_to_disk(dst_file)
|
||||
|
||||
file_handle.close()
|
||||
|
Reference in New Issue
Block a user