Compare commits

..

3 Commits

Author SHA1 Message Date
2475241f21 Refactored and improved exception handling 2022-08-30 00:16:50 -07:00
71f321c6f9 Refactored and improved readability 2022-08-30 00:15:51 -07:00
40c75ab624 Removed unused exception 2022-08-30 00:14:33 -07:00
3 changed files with 80 additions and 79 deletions

View File

@ -9,15 +9,11 @@ import os
def random_tag(*tags): def random_tag(*tags):
if len(tags) == 1 and tags[0].lower() == "random": return len(tags) == 1 and tags[0].lower() == "random"
return True
return False
def get_most_sever_rating(*ratings): def get_most_sever_rating(rating):
if "q" in ratings or "e" in ratings: return rating in ("q", "e")
return True
return False
class downloader: class downloader:
@ -42,9 +38,8 @@ class downloader:
print("Remote image request returned:", remote_image.status_code) print("Remote image request returned:", remote_image.status_code)
return None return None
for d in full_path: with open(full_path, "wb") as f:
with open(d, "wb") as f: f.write(remote_image.content)
f.write(remote_image.content)
return post return post
@ -80,11 +75,11 @@ class downloader:
if tag_type in response: if tag_type in response:
tag_response.append(response[tag_type].strip()) tag_response.append(response[tag_type].strip())
nsfw = search_request.json()[0]["rating"] nsfw = response["rating"]
nsfw = get_most_sever_rating(nsfw) nsfw = get_most_sever_rating(nsfw)
file_url = response["file_url"] file_url = response["file_url"]
basename = file_url.rsplit("/", 1)[1] basename = file_url.rsplit("/", 1)[1]
full_path = os.path.join(self.tmp, basename) full_path = os.path.join(self.tmp, basename)
@ -99,7 +94,7 @@ class downloader:
# Query results # Query results
"search_url": search_url, "search_url": search_url,
"file_url": file_url, "file_url": file_url,
"full_path": [full_path], "full_path": full_path,
"tag_response": " ".join(tag_response), "tag_response": " ".join(tag_response),
"nsfw": nsfw "nsfw": nsfw
} }

View File

@ -268,7 +268,5 @@ if __name__ == "__main__":
sys.exit(5) sys.exit(5)
except yandere_bot.BadCfgFile: except yandere_bot.BadCfgFile:
sys.exit(4) sys.exit(4)
except yandere_bot.BadPostSettings:
sys.exit(3)
except yandere_bot.FailedLogin: except yandere_bot.FailedLogin:
sys.exit(2) sys.exit(2)

View File

@ -24,6 +24,7 @@ import math
import shutil import shutil
import importlib import importlib
import magic import magic
import random
from threading import Event from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
@ -68,6 +69,7 @@ class YandereBot:
self.cfg = cfg self.cfg = cfg
self.load_settings(self.cfg) self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"] self.debug_mode = debug_mode or self.settings_behavior["debug"]
random.seed(os.urandom(16))
if prime_bot: if prime_bot:
self.prime_bot() self.prime_bot()
@ -142,15 +144,9 @@ class YandereBot:
self.currentSessionCount, self.failed_uploads) ) self.currentSessionCount, self.failed_uploads) )
def is_banned(self, tag_response):
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
# Returns a list of media paths (without the hashes) # Returns a list of media paths (without the hashes)
def get_media_list(self, picked): def download_media(self, picked):
try: try:
backend_s = picked["backend"] backend_s = picked["backend"]
backend = importlib.import_module(backend_s) backend = importlib.import_module(backend_s)
@ -159,34 +155,12 @@ class YandereBot:
img = None img = None
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"]) downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
while self.can_post(): img = downloader.fetch_post(picked)
img = downloader.fetch_post(picked)
if img is None: if img is None:
self.eventSleep.wait(1) raise InvalidPost("Img could not be downloaded")
break
if self.is_banned(img["tag_response"]): return downloader.download_post(img)
print("Banned tag:", img["tag_response"])
self.eventSleep.wait(1)
continue
if downloader.download_post(img) is None:
print("Failed to download image")
self.eventSleep.wait(1)
continue
# Make sure the file is not malicious
full_path = img["full_path"]
mime = [magic.from_file(path, mime=True) for path in full_path]
if None in mime:
print("Unknown mime type.", mime)
self.eventSleep.wait(1)
continue
mime_categories = [c.split("/", 1)[0] for c in mime]
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
if invalid_mime_categories:
print("mime type not allowed:", mime)
self.eventSleep.wait(1)
continue
break
return img
except ImportError: except ImportError:
print("Invalid Backend:", picked["backend"]) print("Invalid Backend:", picked["backend"])
return None return None
@ -224,9 +198,43 @@ class YandereBot:
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined))) string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return content_type, string_post return content_type, string_post
def is_banned(self, picked):
tag_response = picked["tag_response"]
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
def valid_mimetype(self, picked):
full_path = picked["full_path"]
mime = magic.from_file(full_path, mime=True)
if mime is None:
return False
mime_category = mime.split("/", 1)[0]
return mime_category in ("image", "video")
def _post(self, picked): def _post(self, picked):
media_list = self.upload_media_list(picked["full_path"]) # Validate picked
if picked is None:
raise InvalidPost("Picked post is None")
elif self.is_banned(picked):
raise BannedTag("Tag is banned")
elif not self.valid_mimetype(picked):
raise InvalidMimeType("Invalid mime type")
full_path = picked["full_path"]
if not os.path.isfile(full_path):
raise FileNotFoundError("File not found: {}".format(media_list))
media_list = self.upload_media_list([full_path])
content_type, message = self.get_post_text(picked, media_list) content_type, message = self.get_post_text(picked, media_list)
if self.debug_mode: if self.debug_mode:
return picked return picked
@ -239,6 +247,7 @@ class YandereBot:
) )
return picked return picked
def pick_profile(self): def pick_profile(self):
profiles = self.settings_post profiles = self.settings_post
tag_select = self.settings_behavior["tag_select"].lower() tag_select = self.settings_behavior["tag_select"].lower()
@ -261,37 +270,27 @@ class YandereBot:
def post(self): def post(self):
picked = None picked = None
# Flags that are set if an upload fails
timeout = False
# Attempt post # Attempt post
try: try:
# Post # Post
while not picked and self.can_post(): picked_profile = self.pick_profile()
picked_profile = self.pick_profile() print("Posting...", picked_profile["name"])
print("Posting...", picked_profile["name"])
picked = self.get_media_list(picked_profile)
self.currentIndexCount += 1
if not self.can_post():
return None
picked = self.download_media(picked_profile)
self._post(picked) self._post(picked)
for path in picked["full_path"]: os.remove(picked["full_path"])
os.remove(path)
# After a successful post # After a successful post
self.currentSessionCount += 1 self.currentSessionCount += 1
self.currentIndexCount += 1
# The post was successful # The post was successful
return picked return picked
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds) # Failed post
except FileNotFoundError: except (BannedTag, UnknownMimeType, InvalidMimeType, FileNotFoundError) as e:
print("File not found:", picked["full_path"]) print("Posting error:", e)
# Exception flags
timeout = False
# Check if the file limit has been reached # Check if the file limit has been reached
except MastodonAPIError as e: except MastodonAPIError as e:
@ -301,8 +300,6 @@ class YandereBot:
file_limit_reached = (e.args[1] == 413) file_limit_reached = (e.args[1] == 413)
print("API Error:", e) print("API Error:", e)
# Exception flags
timeout = True
# Server Errors # Server Errors
# Assume all exceptions are on the server side # Assume all exceptions are on the server side
@ -312,17 +309,16 @@ class YandereBot:
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the # 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out). # mastodon.py API will not specify why the connection timed out).
# The default assumption is #2 # The default assumption is #2
# except Exception as e: except Exception as e:
# print("Unhandled Exception:", e) print("Unhandled Exception:", e)
# # Exception flags
# timeout = True
# An exception occurred # An exception occurred
self.failed_uploads += 1 self.failed_uploads += 1
self.currentIndexCount += 1 self.currentIndexCount += 1
print("[Errors: {}]".format(self.failed_uploads)) print("[Errors: {}]".format(self.failed_uploads))
if timeout:
self.eventSleep.wait(self.settings_behavior["retry_seconds"]) # Sleep
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
# The post failed # The post failed
return None return None
@ -417,7 +413,19 @@ class Debug(Exception):
pass pass
class BadPostSettings(Exception): class InvalidPost(Exception):
pass
class BannedTag(Exception):
pass
class UnknownMimeType(Exception):
pass
class InvalidMimeType(Exception):
pass pass