Compare commits

...

3 Commits

Author SHA1 Message Date
2475241f21 Refactored and improved exception handling 2022-08-30 00:16:50 -07:00
71f321c6f9 Refactored and improved readability 2022-08-30 00:15:51 -07:00
40c75ab624 Removed unused exception 2022-08-30 00:14:33 -07:00
3 changed files with 80 additions and 79 deletions

View File

@ -9,15 +9,11 @@ import os
def random_tag(*tags):
if len(tags) == 1 and tags[0].lower() == "random":
return True
return False
return len(tags) == 1 and tags[0].lower() == "random"
def get_most_sever_rating(*ratings):
if "q" in ratings or "e" in ratings:
return True
return False
def get_most_sever_rating(rating):
return rating in ("q", "e")
class downloader:
@ -42,9 +38,8 @@ class downloader:
print("Remote image request returned:", remote_image.status_code)
return None
for d in full_path:
with open(d, "wb") as f:
f.write(remote_image.content)
with open(full_path, "wb") as f:
f.write(remote_image.content)
return post
@ -80,11 +75,11 @@ class downloader:
if tag_type in response:
tag_response.append(response[tag_type].strip())
nsfw = search_request.json()[0]["rating"]
nsfw = response["rating"]
nsfw = get_most_sever_rating(nsfw)
file_url = response["file_url"]
basename = file_url.rsplit("/", 1)[1]
full_path = os.path.join(self.tmp, basename)
@ -99,7 +94,7 @@ class downloader:
# Query results
"search_url": search_url,
"file_url": file_url,
"full_path": [full_path],
"full_path": full_path,
"tag_response": " ".join(tag_response),
"nsfw": nsfw
}

View File

@ -268,7 +268,5 @@ if __name__ == "__main__":
sys.exit(5)
except yandere_bot.BadCfgFile:
sys.exit(4)
except yandere_bot.BadPostSettings:
sys.exit(3)
except yandere_bot.FailedLogin:
sys.exit(2)

View File

@ -24,6 +24,7 @@ import math
import shutil
import importlib
import magic
import random
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
@ -68,6 +69,7 @@ class YandereBot:
self.cfg = cfg
self.load_settings(self.cfg)
self.debug_mode = debug_mode or self.settings_behavior["debug"]
random.seed(os.urandom(16))
if prime_bot:
self.prime_bot()
@ -142,15 +144,9 @@ class YandereBot:
self.currentSessionCount, self.failed_uploads) )
def is_banned(self, tag_response):
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
# Returns a list of media paths (without the hashes)
def get_media_list(self, picked):
def download_media(self, picked):
try:
backend_s = picked["backend"]
backend = importlib.import_module(backend_s)
@ -159,34 +155,12 @@ class YandereBot:
img = None
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
while self.can_post():
img = downloader.fetch_post(picked)
if img is None:
self.eventSleep.wait(1)
break
if self.is_banned(img["tag_response"]):
print("Banned tag:", img["tag_response"])
self.eventSleep.wait(1)
continue
if downloader.download_post(img) is None:
print("Failed to download image")
self.eventSleep.wait(1)
continue
# Make sure the file is not malicious
full_path = img["full_path"]
mime = [magic.from_file(path, mime=True) for path in full_path]
if None in mime:
print("Unknown mime type.", mime)
self.eventSleep.wait(1)
continue
mime_categories = [c.split("/", 1)[0] for c in mime]
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
if invalid_mime_categories:
print("mime type not allowed:", mime)
self.eventSleep.wait(1)
continue
break
return img
img = downloader.fetch_post(picked)
if img is None:
raise InvalidPost("Img could not be downloaded")
return downloader.download_post(img)
except ImportError:
print("Invalid Backend:", picked["backend"])
return None
@ -224,9 +198,43 @@ class YandereBot:
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
return content_type, string_post
def is_banned(self, picked):
tag_response = picked["tag_response"]
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
def valid_mimetype(self, picked):
full_path = picked["full_path"]
mime = magic.from_file(full_path, mime=True)
if mime is None:
return False
mime_category = mime.split("/", 1)[0]
return mime_category in ("image", "video")
def _post(self, picked):
media_list = self.upload_media_list(picked["full_path"])
# Validate picked
if picked is None:
raise InvalidPost("Picked post is None")
elif self.is_banned(picked):
raise BannedTag("Tag is banned")
elif not self.valid_mimetype(picked):
raise InvalidMimeType("Invalid mime type")
full_path = picked["full_path"]
if not os.path.isfile(full_path):
raise FileNotFoundError("File not found: {}".format(media_list))
media_list = self.upload_media_list([full_path])
content_type, message = self.get_post_text(picked, media_list)
if self.debug_mode:
return picked
@ -239,6 +247,7 @@ class YandereBot:
)
return picked
def pick_profile(self):
profiles = self.settings_post
tag_select = self.settings_behavior["tag_select"].lower()
@ -261,37 +270,27 @@ class YandereBot:
def post(self):
picked = None
# Flags that are set if an upload fails
timeout = False
# Attempt post
try:
# Post
while not picked and self.can_post():
picked_profile = self.pick_profile()
print("Posting...", picked_profile["name"])
picked = self.get_media_list(picked_profile)
self.currentIndexCount += 1
if not self.can_post():
return None
picked_profile = self.pick_profile()
print("Posting...", picked_profile["name"])
picked = self.download_media(picked_profile)
self._post(picked)
for path in picked["full_path"]:
os.remove(path)
os.remove(picked["full_path"])
# After a successful post
self.currentSessionCount += 1
self.currentIndexCount += 1
# The post was successful
return picked
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
except FileNotFoundError:
print("File not found:", picked["full_path"])
# Exception flags
timeout = False
# Failed post
except (BannedTag, UnknownMimeType, InvalidMimeType, FileNotFoundError) as e:
print("Posting error:", e)
# Check if the file limit has been reached
except MastodonAPIError as e:
@ -301,8 +300,6 @@ class YandereBot:
file_limit_reached = (e.args[1] == 413)
print("API Error:", e)
# Exception flags
timeout = True
# Server Errors
# Assume all exceptions are on the server side
@ -312,17 +309,16 @@ class YandereBot:
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out).
# The default assumption is #2
# except Exception as e:
# print("Unhandled Exception:", e)
# # Exception flags
# timeout = True
except Exception as e:
print("Unhandled Exception:", e)
# An exception occurred
self.failed_uploads += 1
self.currentIndexCount += 1
print("[Errors: {}]".format(self.failed_uploads))
if timeout:
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
# Sleep
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
# The post failed
return None
@ -417,7 +413,19 @@ class Debug(Exception):
pass
class BadPostSettings(Exception):
class InvalidPost(Exception):
pass
class BannedTag(Exception):
pass
class UnknownMimeType(Exception):
pass
class InvalidMimeType(Exception):
pass