Compare commits
5 Commits
30e2e1807a
...
bc439c1599
Author | SHA1 | Date | |
---|---|---|---|
bc439c1599 | |||
70a35cf47b | |||
740221df94 | |||
521087cbe8 | |||
24747be4ee |
@ -1,156 +0,0 @@
|
|||||||
#! /usr/bin/env python3
|
|
||||||
|
|
||||||
# Yandere Lewd Bot, an image posting bot for Pleroma
|
|
||||||
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
import getpass
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
import datetime
|
|
||||||
import importlib
|
|
||||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonIOError
|
|
||||||
from collections import OrderedDict
|
|
||||||
import yanlib
|
|
||||||
|
|
||||||
|
|
||||||
class CreateAppError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def get_client_id_and_secret(app, permissions, domain):
|
|
||||||
try:
|
|
||||||
return Mastodon.create_app(app, scopes=permissions, api_base_url=domain)
|
|
||||||
except MastodonIOError:
|
|
||||||
raise CreateAppError("An error occurred. Make sure the domain name is correct.")
|
|
||||||
|
|
||||||
|
|
||||||
def get_api(client_id, client_secret, domain):
|
|
||||||
api = Mastodon(client_id, client_secret, api_base_url=domain)
|
|
||||||
return api
|
|
||||||
|
|
||||||
|
|
||||||
def get_token(api, email, password, permissions):
|
|
||||||
try:
|
|
||||||
token = api.log_in(email, password, scopes=permissions)
|
|
||||||
return token
|
|
||||||
except MastodonIllegalArgumentError:
|
|
||||||
raise CreateAppError("Username or Password is incorrect.")
|
|
||||||
except MastodonAPIError:
|
|
||||||
raise CreateAppError("Could not grant scopes:", ", ".join(permissions))
|
|
||||||
|
|
||||||
|
|
||||||
def get_cfg(cfg_name):
|
|
||||||
try:
|
|
||||||
cfg = importlib.import_module(cfg_name)
|
|
||||||
return cfg
|
|
||||||
except ImportError:
|
|
||||||
raise CreateAppError("Cannot import module:", cfg_name, "Make sure you omitted the .py extension and try again")
|
|
||||||
|
|
||||||
|
|
||||||
def package_settings(app, domain, client_id, client_secret, token):
|
|
||||||
settings_server = OrderedDict([
|
|
||||||
("app_name", app),
|
|
||||||
("api_base_url", domain),
|
|
||||||
("client_id", client_id),
|
|
||||||
("client_secret", client_secret),
|
|
||||||
("access_token", token)
|
|
||||||
])
|
|
||||||
return settings_server
|
|
||||||
|
|
||||||
|
|
||||||
def get_setting_reminder(fmt):
|
|
||||||
dt_now = datetime.datetime.now()
|
|
||||||
dt_now = dt_now.replace(year=dt_now.year + 1)
|
|
||||||
settings_reminder = dt_now.strftime(fmt)
|
|
||||||
return settings_reminder
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
# Create App
|
|
||||||
print("Generate and register Mastodon App")
|
|
||||||
print("You can just hit enter to accept the default for prompts that have them")
|
|
||||||
print("Ctrl+C to Quit\n")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Get instance information
|
|
||||||
app = input("Enter your app name (Default: app): ") or "app"
|
|
||||||
domain = input("URL of Instance (Default: https://yandere.cc): ") or "https://yandere.cc"
|
|
||||||
email = input("Enter your email: ")
|
|
||||||
password = getpass.getpass("Enter password: ")
|
|
||||||
print("Scopes: read, write, follow, push")
|
|
||||||
print("Separate each scope with a comma (example above).")
|
|
||||||
print("!!! Accept the default unless you intend to modify Yandere Lewd Bot !!!")
|
|
||||||
ans = input("Scopes (Default: write): ") or "write"
|
|
||||||
ans = re.sub(r"\s+", "", ans, flags=re.UNICODE)
|
|
||||||
permissions = ans.split(",")
|
|
||||||
print("Granting:", permissions)
|
|
||||||
|
|
||||||
# Begin logging in
|
|
||||||
client_id, client_secret = get_client_id_and_secret(app, permissions, domain)
|
|
||||||
api = get_api(client_id, client_secret, domain)
|
|
||||||
token = get_token(api, email, password, permissions)
|
|
||||||
|
|
||||||
# Get user setting for settings_time["long_date_format"]
|
|
||||||
# Needed for setting_reminder
|
|
||||||
print("What is the name of your main configuration file? Exclude the '.py' extension.")
|
|
||||||
ans = input("(Default: cfg): ") or "cfg"
|
|
||||||
cfg = get_cfg(ans)
|
|
||||||
|
|
||||||
# Credentials (unencrypted)
|
|
||||||
encrypt, salt = False, ""
|
|
||||||
settings_server = package_settings(app, domain, client_id, client_secret, token)
|
|
||||||
reminder = get_setting_reminder(cfg.settings_time["long_date_format"])
|
|
||||||
|
|
||||||
# Encrypt
|
|
||||||
ans = input("Do you want to encrypt your credentials (y/N)? ")
|
|
||||||
if ans.upper() in ("Y", "YES"):
|
|
||||||
import encryption
|
|
||||||
encrypt = True
|
|
||||||
salt, settings_server = encryption.settings_server_encrypt_cfg(settings_server)
|
|
||||||
|
|
||||||
settings_encrypt = OrderedDict([
|
|
||||||
("encrypt", encrypt),
|
|
||||||
("salt", salt),
|
|
||||||
])
|
|
||||||
|
|
||||||
# Output the user's new credentials
|
|
||||||
print("Copy to your config file!!!")
|
|
||||||
yanlib.pp_ordered_dict("settings_server", settings_server)
|
|
||||||
print('\nsettings_reminder = "{}"\n'.format(reminder))
|
|
||||||
yanlib.pp_dict("settings_encrypt", settings_encrypt)
|
|
||||||
|
|
||||||
return 0
|
|
||||||
except (KeyboardInterrupt, EOFError):
|
|
||||||
print("Quitting...")
|
|
||||||
return 1
|
|
||||||
except CreateAppError as e:
|
|
||||||
print(e)
|
|
||||||
return 2
|
|
||||||
except Exception as e:
|
|
||||||
print("Unhandled Exception!!", e)
|
|
||||||
return 3
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
status_code = main()
|
|
||||||
status_msg = {
|
|
||||||
0: "Success :)",
|
|
||||||
1: "User Quit :|",
|
|
||||||
2: "Error :(",
|
|
||||||
3: "Unhandled Exception ¯\\_(ツ)_/¯"
|
|
||||||
}
|
|
||||||
print(status_msg[status_code])
|
|
||||||
sys.exit(status_code)
|
|
@ -191,7 +191,7 @@ def settings_server_decrypt_cfg(settings_server, settings_encrypt):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
import argparse
|
import argparse
|
||||||
import yanlib
|
from pprint import pformat
|
||||||
|
|
||||||
default_cfg = "cfg"
|
default_cfg = "cfg"
|
||||||
|
|
||||||
@ -235,8 +235,8 @@ def main():
|
|||||||
("salt", salt)
|
("salt", salt)
|
||||||
])
|
])
|
||||||
|
|
||||||
yanlib.pp_ordered_dict("settings_server", settings_server)
|
print("settings_server = {}".format(pformat(settings_server)))
|
||||||
yanlib.pp_dict("settings_encrypt", settings_encrypt)
|
print("settings_encrypt = {}".format(pformat(settings_encrypt)))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
import signal
|
import signal
|
||||||
import yanlib
|
|
||||||
import yandereBot
|
import yandereBot
|
||||||
import datetime
|
import datetime
|
||||||
import contextlib
|
import contextlib
|
||||||
@ -89,7 +88,7 @@ class YandereBot(yandereBot.YandereBot):
|
|||||||
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
|
hour=t.hour, minute=t.minute, second=t.second, microsecond=t.microsecond
|
||||||
)
|
)
|
||||||
if self.dateNextSelection < self.dateSelection and add_24:
|
if self.dateNextSelection < self.dateSelection and add_24:
|
||||||
self.dateNextSelection = yanlib.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
|
self.dateNextSelection = yandereBot.time_add_seconds(self.dateNextSelection, 60 * 60 * 24)
|
||||||
except Exception:
|
except Exception:
|
||||||
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
|
print("Invalid time format: {}\n\nCorrect date/time format examples:".format(h))
|
||||||
self.print_date_time_example()
|
self.print_date_time_example()
|
||||||
@ -106,7 +105,7 @@ class YandereBot(yandereBot.YandereBot):
|
|||||||
# Check for potential misconfigurations by the user
|
# Check for potential misconfigurations by the user
|
||||||
def pass_sanity_test(self):
|
def pass_sanity_test(self):
|
||||||
# Calculate pre-timer value
|
# Calculate pre-timer value
|
||||||
seconds_until_next_pos = yanlib.time_diff_seconds(self.dateNextSelection, self.dateSelection)
|
seconds_until_next_pos = yandereBot.time_diff_seconds(self.dateNextSelection, self.dateSelection)
|
||||||
|
|
||||||
# Possible misconfigurations that will prompt the user to continue
|
# Possible misconfigurations that will prompt the user to continue
|
||||||
pretimer_less_than_zero = seconds_until_next_pos < 0
|
pretimer_less_than_zero = seconds_until_next_pos < 0
|
||||||
|
@ -23,6 +23,7 @@ import datetime
|
|||||||
import contextlib
|
import contextlib
|
||||||
import fnmatch
|
import fnmatch
|
||||||
import math
|
import math
|
||||||
|
import shutil
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
||||||
|
|
||||||
@ -156,7 +157,7 @@ class YandereBot:
|
|||||||
def blacklist(self, picked):
|
def blacklist(self, picked):
|
||||||
self.lenBlacklist += 1
|
self.lenBlacklist += 1
|
||||||
for f in self.settings_behavior["master_blacklist_w"]:
|
for f in self.settings_behavior["master_blacklist_w"]:
|
||||||
yanlib.append_file(
|
append_file(
|
||||||
f, picked.get_full_string(),
|
f, picked.get_full_string(),
|
||||||
self.settings_behavior["atomic_saving"],
|
self.settings_behavior["atomic_saving"],
|
||||||
self.settings_behavior["sync_save"],
|
self.settings_behavior["sync_save"],
|
||||||
@ -197,18 +198,18 @@ class YandereBot:
|
|||||||
picked_profile = picked.get_post_setting()["name"] if picked else None
|
picked_profile = picked.get_post_setting()["name"] if picked else None
|
||||||
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
picked_next = self.listPictures[0].get_full_string() if self.listPictures else None
|
||||||
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
picked_next_profile = self.listPictures[0].get_post_setting()["name"] if self.listPictures else None
|
||||||
next_selection_seconds = max(0, int(yanlib.time_diff_seconds(date_next_selection, date_selection)))
|
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
||||||
|
|
||||||
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
|
n_posts_remain = math.ceil(len(self.listPictures) / self.settings_behavior["uploads_per_post"])
|
||||||
if date_selection != date_next_selection:
|
if date_selection != date_next_selection:
|
||||||
n_posts_remain -= 1
|
n_posts_remain -= 1
|
||||||
|
|
||||||
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
|
remaining_seconds = n_posts_remain * self.settings_behavior["sleep_seconds"]
|
||||||
date_end_selection = yanlib.time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
|
date_end_selection = time_add_seconds(date_selection, remaining_seconds + next_selection_seconds)
|
||||||
date_end_selection_seconds = max(0, yanlib.time_diff_seconds(date_end_selection, date_selection))
|
date_end_selection_seconds = max(0, time_diff_seconds(date_end_selection, date_selection))
|
||||||
if date_selection != date_next_selection and picked is None:
|
if date_selection != date_next_selection and picked is None:
|
||||||
date_end_selection_seconds += next_selection_seconds
|
date_end_selection_seconds += next_selection_seconds
|
||||||
d, h, m, s = yanlib.humanize_time_delta(date_end_selection_seconds)
|
d, h, m, s = humanize_time_delta(date_end_selection_seconds)
|
||||||
|
|
||||||
print("[Profile]", picked_profile)
|
print("[Profile]", picked_profile)
|
||||||
print("[Profile Next]", picked_next_profile)
|
print("[Profile Next]", picked_next_profile)
|
||||||
@ -360,11 +361,11 @@ class YandereBot:
|
|||||||
|
|
||||||
def schedule_next_post(self):
|
def schedule_next_post(self):
|
||||||
self.dateSelection = self.dateNextSelection
|
self.dateSelection = self.dateNextSelection
|
||||||
self.dateNextSelection = yanlib.time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
|
self.dateNextSelection = time_add_seconds(self.dateSelection, self.settings_behavior["sleep_seconds"])
|
||||||
|
|
||||||
# Will wait between the current time and the time of next selection
|
# Will wait between the current time and the time of next selection
|
||||||
def wait_future_time(self):
|
def wait_future_time(self):
|
||||||
seconds = yanlib.time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
|
seconds = time_diff_seconds(self.dateNextSelection, datetime.datetime.now())
|
||||||
self.eventSleep.wait(max(0, seconds))
|
self.eventSleep.wait(max(0, seconds))
|
||||||
|
|
||||||
# [BEGIN THE PROGRAM]
|
# [BEGIN THE PROGRAM]
|
||||||
@ -389,8 +390,8 @@ class YandereBot:
|
|||||||
return 1
|
return 1
|
||||||
|
|
||||||
start_time = self.dateSelection
|
start_time = self.dateSelection
|
||||||
delay_seconds = max(yanlib.time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
|
delay_seconds = max(time_diff_seconds(self.dateNextSelection, start_time) + delay, delay)
|
||||||
delay_time = yanlib.time_add_seconds(start_time, delay_seconds)
|
delay_time = time_add_seconds(start_time, delay_seconds)
|
||||||
|
|
||||||
# Print the first image in the list if a delay or pretimer is set
|
# Print the first image in the list if a delay or pretimer is set
|
||||||
if delay_seconds:
|
if delay_seconds:
|
||||||
@ -464,6 +465,55 @@ def get_list_of_hashes_with_profiles(f_name, profiles, profiles_default, callbac
|
|||||||
return r
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
|
||||||
|
def time_add_seconds(dt, seconds):
|
||||||
|
return dt + datetime.timedelta(0, seconds)
|
||||||
|
|
||||||
|
|
||||||
|
def time_diff_seconds(d1, d2):
|
||||||
|
return (d1-d2).total_seconds()
|
||||||
|
|
||||||
|
|
||||||
|
def humanize_time_delta(total_seconds):
|
||||||
|
m, s = divmod(int(total_seconds), 60)
|
||||||
|
h, m = divmod(m, 60)
|
||||||
|
d, h = divmod(h, 24)
|
||||||
|
return d, h, m, s
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------------------------
|
||||||
|
def sync_to_disk(file_handle):
|
||||||
|
file_handle.flush()
|
||||||
|
os.fsync(file_handle.fileno())
|
||||||
|
|
||||||
|
|
||||||
|
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
|
||||||
|
file_handle = None
|
||||||
|
if atomic_saving:
|
||||||
|
import tempfile
|
||||||
|
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
|
||||||
|
with contextlib.suppress(IOError):
|
||||||
|
with open(f_name, "rb") as src:
|
||||||
|
shutil.copyfileobj(src, file_handle)
|
||||||
|
if sync_save:
|
||||||
|
sync_to_disk(file_handle)
|
||||||
|
else:
|
||||||
|
file_handle = open(f_name, "ab")
|
||||||
|
|
||||||
|
file_handle.write(str.encode(s + os.linesep))
|
||||||
|
|
||||||
|
if sync_save:
|
||||||
|
sync_to_disk(file_handle)
|
||||||
|
|
||||||
|
if atomic_saving:
|
||||||
|
file_handle.seek(0)
|
||||||
|
with open(f_name, "wb") as dst_file:
|
||||||
|
shutil.copyfileobj(file_handle, dst_file)
|
||||||
|
if sync_save:
|
||||||
|
sync_to_disk(dst_file)
|
||||||
|
|
||||||
|
file_handle.close()
|
||||||
|
|
||||||
|
|
||||||
# Custom Exceptions for YandereBot
|
# Custom Exceptions for YandereBot
|
||||||
class Debug(Exception):
|
class Debug(Exception):
|
||||||
pass
|
pass
|
||||||
|
@ -162,101 +162,3 @@ def get_hash_list_blacklist(v_pictures, v_blacklist, max_size=None):
|
|||||||
if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size):
|
if not is_hash_blacklisted(lineHash, ret, v_blacklist, max_size):
|
||||||
ret.append(lineHash)
|
ret.append(lineHash)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------- TIME FUNCTIONS ---------------------------------------------
|
|
||||||
def time_add_seconds(dt, seconds):
|
|
||||||
return dt + datetime.timedelta(0, seconds)
|
|
||||||
|
|
||||||
|
|
||||||
def time_diff_seconds(d1, d2):
|
|
||||||
return (d1-d2).total_seconds()
|
|
||||||
|
|
||||||
|
|
||||||
def input_time_format(s):
|
|
||||||
return s.replace("-", "")
|
|
||||||
|
|
||||||
|
|
||||||
def humanize_time_delta(total_seconds):
|
|
||||||
m, s = divmod(int(total_seconds), 60)
|
|
||||||
h, m = divmod(m, 60)
|
|
||||||
d, h = divmod(h, 24)
|
|
||||||
return d, h, m, s
|
|
||||||
|
|
||||||
|
|
||||||
# -------------------------------- Pretty Print Functions ------------------------------------
|
|
||||||
# If value is a subclass of string, return it in parentheses (used for generating a python configuration file)
|
|
||||||
def _pp_str(v):
|
|
||||||
if issubclass(str, type(v)):
|
|
||||||
return '"{}"'.format(v)
|
|
||||||
else:
|
|
||||||
return "{}".format(v)
|
|
||||||
|
|
||||||
|
|
||||||
# Prints out a dictionary in pretty print form as valid python code.
|
|
||||||
# setting_dict should consist of string formats with key value pairs defined in setting_dict ex. '{key} {value}'.
|
|
||||||
def _pp_dict(head, setting_dict, fmt, tail):
|
|
||||||
if head:
|
|
||||||
print(head)
|
|
||||||
last_index = len(setting_dict) - 1
|
|
||||||
# i, k -> index and key value of setting_dict
|
|
||||||
for i, k in enumerate(setting_dict):
|
|
||||||
_fmt = fmt
|
|
||||||
_k = _pp_str(k)
|
|
||||||
_v = _pp_str(setting_dict[k])
|
|
||||||
# Add comma separator on all but the last value setting
|
|
||||||
if i < last_index:
|
|
||||||
_fmt += ","
|
|
||||||
print(_fmt.format(_k, _v))
|
|
||||||
if tail:
|
|
||||||
print(tail)
|
|
||||||
|
|
||||||
|
|
||||||
def pp_ordered_dict(setting, setting_dict):
|
|
||||||
_pp_dict(
|
|
||||||
"{} = OrderedDict([".format(setting),
|
|
||||||
setting_dict,
|
|
||||||
'\t({},\t{})',
|
|
||||||
"])")
|
|
||||||
|
|
||||||
|
|
||||||
def pp_dict(setting, setting_dict):
|
|
||||||
_pp_dict(
|
|
||||||
"{} = {{".format(setting),
|
|
||||||
setting_dict,
|
|
||||||
'\t{}:\t{}',
|
|
||||||
"}")
|
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------------------------
|
|
||||||
def sync_to_disk(file_handle):
|
|
||||||
file_handle.flush()
|
|
||||||
os.fsync(file_handle.fileno())
|
|
||||||
|
|
||||||
|
|
||||||
def append_file(f_name, s, atomic_saving=False, sync_save=False, tmp_dir="/var/tmp"):
|
|
||||||
file_handle = None
|
|
||||||
if atomic_saving:
|
|
||||||
import tempfile
|
|
||||||
file_handle = tempfile.NamedTemporaryFile(dir=tmp_dir)
|
|
||||||
with contextlib.suppress(IOError):
|
|
||||||
with open(f_name, "rb") as src:
|
|
||||||
shutil.copyfileobj(src, file_handle)
|
|
||||||
if sync_save:
|
|
||||||
sync_to_disk(file_handle)
|
|
||||||
else:
|
|
||||||
file_handle = open(f_name, "ab")
|
|
||||||
|
|
||||||
file_handle.write(str.encode(s + os.linesep))
|
|
||||||
|
|
||||||
if sync_save:
|
|
||||||
sync_to_disk(file_handle)
|
|
||||||
|
|
||||||
if atomic_saving:
|
|
||||||
file_handle.seek(0)
|
|
||||||
with open(f_name, "wb") as dst_file:
|
|
||||||
shutil.copyfileobj(file_handle, dst_file)
|
|
||||||
if sync_save:
|
|
||||||
sync_to_disk(dst_file)
|
|
||||||
|
|
||||||
file_handle.close()
|
|
||||||
|
Reference in New Issue
Block a user