Refactored and improved readability

This commit is contained in:
Anon 2022-08-30 00:15:51 -07:00
parent 40c75ab624
commit 71f321c6f9

View File

@ -24,6 +24,7 @@ import math
import shutil import shutil
import importlib import importlib
import magic import magic
import random
from threading import Event from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
@ -68,6 +69,7 @@ class YandereBot:
self.cfg = cfg self.cfg = cfg
self.load_settings(self.cfg) self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"] self.debug_mode = debug_mode or self.settings_behavior["debug"]
random.seed(os.urandom(16))
if prime_bot: if prime_bot:
self.prime_bot() self.prime_bot()
@ -142,15 +144,9 @@ class YandereBot:
self.currentSessionCount, self.failed_uploads) ) self.currentSessionCount, self.failed_uploads) )
def is_banned(self, tag_response):
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
# Returns a list of media paths (without the hashes) # Returns a list of media paths (without the hashes)
def get_media_list(self, picked): def download_media(self, picked):
try: try:
backend_s = picked["backend"] backend_s = picked["backend"]
backend = importlib.import_module(backend_s) backend = importlib.import_module(backend_s)
@ -159,34 +155,12 @@ class YandereBot:
img = None img = None
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"]) downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
while self.can_post(): img = downloader.fetch_post(picked)
img = downloader.fetch_post(picked)
if img is None: if img is None:
self.eventSleep.wait(1) raise InvalidPost("Img could not be downloaded")
break
if self.is_banned(img["tag_response"]): return downloader.download_post(img)
print("Banned tag:", img["tag_response"])
self.eventSleep.wait(1)
continue
if downloader.download_post(img) is None:
print("Failed to download image")
self.eventSleep.wait(1)
continue
# Make sure the file is not malicious
full_path = img["full_path"]
mime = [magic.from_file(path, mime=True) for path in full_path]
if None in mime:
print("Unknown mime type.", mime)
self.eventSleep.wait(1)
continue
mime_categories = [c.split("/", 1)[0] for c in mime]
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
if invalid_mime_categories:
print("mime type not allowed:", mime)
self.eventSleep.wait(1)
continue
break
return img
except ImportError: except ImportError:
print("Invalid Backend:", picked["backend"]) print("Invalid Backend:", picked["backend"])
return None return None
@ -225,8 +199,42 @@ class YandereBot:
return content_type, string_post return content_type, string_post
def is_banned(self, picked):
tag_response = picked["tag_response"]
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
def valid_mimetype(self, picked):
full_path = picked["full_path"]
mime = magic.from_file(full_path, mime=True)
if mime is None:
return False
mime_category = mime.split("/", 1)[0]
return mime_category in ("image", "video")
def _post(self, picked): def _post(self, picked):
media_list = self.upload_media_list(picked["full_path"]) # Validate picked
if picked is None:
raise InvalidPost("Picked post is None")
elif self.is_banned(picked):
raise BannedTag("Tag is banned")
elif not self.valid_mimetype(picked):
raise InvalidMimeType("Invalid mime type")
full_path = picked["full_path"]
if not os.path.isfile(full_path):
raise FileNotFoundError("File not found: {}".format(media_list))
media_list = self.upload_media_list([full_path])
content_type, message = self.get_post_text(picked, media_list) content_type, message = self.get_post_text(picked, media_list)
if self.debug_mode: if self.debug_mode:
return picked return picked
@ -239,6 +247,7 @@ class YandereBot:
) )
return picked return picked
def pick_profile(self): def pick_profile(self):
profiles = self.settings_post profiles = self.settings_post
tag_select = self.settings_behavior["tag_select"].lower() tag_select = self.settings_behavior["tag_select"].lower()
@ -261,37 +270,27 @@ class YandereBot:
def post(self): def post(self):
picked = None picked = None
# Flags that are set if an upload fails
timeout = False
# Attempt post # Attempt post
try: try:
# Post # Post
while not picked and self.can_post(): picked_profile = self.pick_profile()
picked_profile = self.pick_profile() print("Posting...", picked_profile["name"])
print("Posting...", picked_profile["name"])
picked = self.get_media_list(picked_profile)
self.currentIndexCount += 1
if not self.can_post():
return None
picked = self.download_media(picked_profile)
self._post(picked) self._post(picked)
for path in picked["full_path"]: os.remove(picked["full_path"])
os.remove(path)
# After a successful post # After a successful post
self.currentSessionCount += 1 self.currentSessionCount += 1
self.currentIndexCount += 1
# The post was successful # The post was successful
return picked return picked
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds) # Failed post
except FileNotFoundError: except (BannedTag, UnknownMimeType, InvalidMimeType, FileNotFoundError) as e:
print("File not found:", picked["full_path"]) print("Posting error:", e)
# Exception flags
timeout = False
# Check if the file limit has been reached # Check if the file limit has been reached
except MastodonAPIError as e: except MastodonAPIError as e:
@ -301,8 +300,6 @@ class YandereBot:
file_limit_reached = (e.args[1] == 413) file_limit_reached = (e.args[1] == 413)
print("API Error:", e) print("API Error:", e)
# Exception flags
timeout = True
# Server Errors # Server Errors
# Assume all exceptions are on the server side # Assume all exceptions are on the server side
@ -312,17 +309,16 @@ class YandereBot:
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the # 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out). # mastodon.py API will not specify why the connection timed out).
# The default assumption is #2 # The default assumption is #2
# except Exception as e: except Exception as e:
# print("Unhandled Exception:", e) print("Unhandled Exception:", e)
# # Exception flags
# timeout = True
# An exception occurred # An exception occurred
self.failed_uploads += 1 self.failed_uploads += 1
self.currentIndexCount += 1 self.currentIndexCount += 1
print("[Errors: {}]".format(self.failed_uploads)) print("[Errors: {}]".format(self.failed_uploads))
if timeout:
self.eventSleep.wait(self.settings_behavior["retry_seconds"]) # Sleep
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
# The post failed # The post failed
return None return None
@ -417,7 +413,19 @@ class Debug(Exception):
pass pass
class BadPostSettings(Exception): class InvalidPost(Exception):
pass
class BannedTag(Exception):
pass
class UnknownMimeType(Exception):
pass
class InvalidMimeType(Exception):
pass pass