Refactored to make use of backend module
This commit is contained in:
parent
ba2e859e71
commit
89381b76d4
177
src/main.py
177
src/main.py
@ -16,10 +16,177 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
|
import random
|
||||||
import signal
|
import signal
|
||||||
import yandere_bot
|
import FediBot
|
||||||
|
import importlib
|
||||||
|
import magic
|
||||||
|
import copy
|
||||||
|
|
||||||
|
class FileTooLarge(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidMimeType(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class YandereBot(FediBot.YandereBot):
|
||||||
|
currentSessionCount = 0
|
||||||
|
currentIndexCount = 0
|
||||||
|
currentProfileIndex = []
|
||||||
|
|
||||||
|
def __init__(self, cfg, keyfile=None, debug_mode=False):
|
||||||
|
settings = {
|
||||||
|
"settings_time": {},
|
||||||
|
"settings_post": {},
|
||||||
|
"settings_backend": {}
|
||||||
|
}
|
||||||
|
self.settings.update(settings)
|
||||||
|
super(YandereBot, self).__init__(cfg, keyfile, debug_mode)
|
||||||
|
random.seed(os.urandom(16))
|
||||||
|
self.currentProfileIndex = [0] * len(self.settings["settings_post"])
|
||||||
|
|
||||||
|
def print_header_stats(self, picked):
|
||||||
|
settings_post = self.settings["settings_post"]
|
||||||
|
profile = picked["profile"]["name"] if picked else None
|
||||||
|
url = picked["file_url"] if picked else None
|
||||||
|
path = picked["full_path"] if picked else None
|
||||||
|
nsfw = picked["nsfw"] if picked else None
|
||||||
|
|
||||||
|
posted_once = int(self.currentSessionCount > 0)
|
||||||
|
index = (self.currentIndexCount - posted_once) % len(settings_post)
|
||||||
|
state_print = copy.copy(self.currentProfileIndex)
|
||||||
|
state_print[index] = state_print[index] - posted_once
|
||||||
|
|
||||||
|
state_print = [state_print[i] % len(settings_post[i]) for i in range(0, len(self.currentProfileIndex))]
|
||||||
|
|
||||||
|
print("Profile: {} | Index: {} | NSFW: {} | Path: {} | URL: {}".format(
|
||||||
|
profile, index, nsfw, path, url
|
||||||
|
))
|
||||||
|
print("State: {}".format(','.join(map(str, state_print))))
|
||||||
|
|
||||||
|
|
||||||
|
# Returns a list of media paths (without the hashes)
|
||||||
|
def download_media(self, picked_profile):
|
||||||
|
try:
|
||||||
|
backend_s = picked_profile["backend"]
|
||||||
|
backend_credentials = self.settings["settings_backend"][backend_s]
|
||||||
|
backend = importlib.import_module(backend_credentials["module"])
|
||||||
|
|
||||||
|
downloader = backend.downloader(backend_credentials)
|
||||||
|
img = downloader.fetch_post(picked_profile)
|
||||||
|
|
||||||
|
if img is None:
|
||||||
|
raise FediBot.InvalidPost("Img could not be downloaded")
|
||||||
|
|
||||||
|
return downloader.download_post(img)
|
||||||
|
except ImportError:
|
||||||
|
print("Invalid Backend:", picked_profile["backend"])
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Returns a list of tuples that contain the media list path and media mastodon dictionary
|
||||||
|
def upload_media_list_validate(self, media_list):
|
||||||
|
# Check to make sure the paths in media_list actually exist
|
||||||
|
super(YandereBot, self).upload_media_list_validate(media_list)
|
||||||
|
|
||||||
|
# Validate picked
|
||||||
|
for path in media_list:
|
||||||
|
if not self.valid_mimetype(path):
|
||||||
|
raise InvalidMimeType("Invalid mime type")
|
||||||
|
elif not self.valid_file_size(path):
|
||||||
|
raise FileTooLarge("File is too large to upload")
|
||||||
|
|
||||||
|
|
||||||
|
def valid_mimetype(self, full_path):
|
||||||
|
mime = magic.from_file(full_path, mime=True)
|
||||||
|
|
||||||
|
if mime is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
mime_category = mime.split("/", 1)[0]
|
||||||
|
|
||||||
|
return mime_category in ("image", "video")
|
||||||
|
|
||||||
|
def get_message(self, picked):
|
||||||
|
nsfw = picked["nsfw"]
|
||||||
|
message = picked["profile"]["message_nsfw"] if nsfw else picked["profile"]["message"]
|
||||||
|
return message
|
||||||
|
|
||||||
|
def valid_file_size(self, full_path):
|
||||||
|
settings_behavior = self.settings["settings_behavior"]
|
||||||
|
max_size = settings_behavior["max_size"]
|
||||||
|
file_size = os.stat(full_path).st_size
|
||||||
|
return file_size <= max_size
|
||||||
|
|
||||||
|
|
||||||
|
def pick_index(self, mode, current_index, length):
|
||||||
|
if mode == "random":
|
||||||
|
return random.randint(0, length - 1)
|
||||||
|
elif mode == "sequential":
|
||||||
|
return current_index % length
|
||||||
|
|
||||||
|
|
||||||
|
def pick_profile(self):
|
||||||
|
settings_behavior = self.settings["settings_behavior"]
|
||||||
|
# Get x and y
|
||||||
|
mode = settings_behavior["tag_select"].lower()
|
||||||
|
posts = self.settings["settings_post"]
|
||||||
|
x = self.pick_index(mode, self.currentIndexCount, len(posts))
|
||||||
|
y = self.pick_index(mode, self.currentProfileIndex[x], len(posts[x]))
|
||||||
|
|
||||||
|
# Return the Profile
|
||||||
|
return x, y
|
||||||
|
|
||||||
|
def pick(self):
|
||||||
|
settings_post = self.settings["settings_post"]
|
||||||
|
x, y = self.pick_profile()
|
||||||
|
picked_profile = settings_post[x][y]
|
||||||
|
|
||||||
|
picked = self.download_media(picked_profile)
|
||||||
|
media_list = [picked["full_path"]]
|
||||||
|
spoiler = picked["nsfw"]
|
||||||
|
message = self.get_message(picked)
|
||||||
|
return {
|
||||||
|
"picked": picked,
|
||||||
|
"media_list": media_list,
|
||||||
|
"spoiler": spoiler,
|
||||||
|
"message": message
|
||||||
|
}
|
||||||
|
|
||||||
|
def after_pick(self, picked):
|
||||||
|
self.print_header_stats(picked["picked"])
|
||||||
|
self.currentProfileIndex[self.currentIndexCount] += 1
|
||||||
|
self.currentIndexCount += 1
|
||||||
|
os.remove(picked["picked"]["full_path"])
|
||||||
|
return super(YandereBot, self).after_pick(picked)
|
||||||
|
|
||||||
|
def post(self, picked):
|
||||||
|
# Attempt post
|
||||||
|
try:
|
||||||
|
return super(YandereBot, self).post(picked)
|
||||||
|
|
||||||
|
# Invalid post (move to next profile)
|
||||||
|
except FediBot.InvalidPost as e:
|
||||||
|
self.currentIndexCount += 1
|
||||||
|
print("Invalid post:", e)
|
||||||
|
|
||||||
|
# Invalid post (remove downloaded files)
|
||||||
|
except (FileTooLarge, InvalidMimeType) as e:
|
||||||
|
os.remove(picked["full_path"])
|
||||||
|
print("Unable to post:", e)
|
||||||
|
|
||||||
|
self.handle_post_exception()
|
||||||
|
|
||||||
|
# The post failed
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def can_post(self):
|
||||||
|
return self.currentIndexCount >= 0 and super(YandereBot, self).can_post()
|
||||||
|
|
||||||
|
|
||||||
class FailedToLoadCfg(Exception):
|
class FailedToLoadCfg(Exception):
|
||||||
@ -59,7 +226,7 @@ def main():
|
|||||||
# Flag if the bot is running in debug mode
|
# Flag if the bot is running in debug mode
|
||||||
debug_mode = arguments.dry_run or arguments.debug
|
debug_mode = arguments.dry_run or arguments.debug
|
||||||
|
|
||||||
yandere = yandere_bot.YandereBot(
|
yandere = YandereBot(
|
||||||
yandere_config,
|
yandere_config,
|
||||||
arguments.keyfile,
|
arguments.keyfile,
|
||||||
debug_mode,
|
debug_mode,
|
||||||
@ -94,9 +261,9 @@ if __name__ == "__main__":
|
|||||||
except FailedToLoadCfg:
|
except FailedToLoadCfg:
|
||||||
sys.exit(10)
|
sys.exit(10)
|
||||||
# Exceptions raised from the bot
|
# Exceptions raised from the bot
|
||||||
except yandere_bot.Debug:
|
except FediBot.Debug:
|
||||||
sys.exit(6)
|
sys.exit(6)
|
||||||
except yandere_bot.BadCfgFile:
|
except FediBot.BadCfgFile:
|
||||||
sys.exit(4)
|
sys.exit(4)
|
||||||
except yandere_bot.FailedLogin:
|
except FediBot.FailedLogin:
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
Reference in New Issue
Block a user