Compare commits
3 Commits
a266c44b0b
...
2475241f21
Author | SHA1 | Date | |
---|---|---|---|
2475241f21 | |||
71f321c6f9 | |||
40c75ab624 |
@ -9,15 +9,11 @@ import os
|
|||||||
|
|
||||||
|
|
||||||
def random_tag(*tags):
|
def random_tag(*tags):
|
||||||
if len(tags) == 1 and tags[0].lower() == "random":
|
return len(tags) == 1 and tags[0].lower() == "random"
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def get_most_sever_rating(*ratings):
|
def get_most_sever_rating(rating):
|
||||||
if "q" in ratings or "e" in ratings:
|
return rating in ("q", "e")
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
class downloader:
|
class downloader:
|
||||||
@ -42,9 +38,8 @@ class downloader:
|
|||||||
print("Remote image request returned:", remote_image.status_code)
|
print("Remote image request returned:", remote_image.status_code)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
for d in full_path:
|
with open(full_path, "wb") as f:
|
||||||
with open(d, "wb") as f:
|
f.write(remote_image.content)
|
||||||
f.write(remote_image.content)
|
|
||||||
|
|
||||||
return post
|
return post
|
||||||
|
|
||||||
@ -80,11 +75,11 @@ class downloader:
|
|||||||
if tag_type in response:
|
if tag_type in response:
|
||||||
tag_response.append(response[tag_type].strip())
|
tag_response.append(response[tag_type].strip())
|
||||||
|
|
||||||
nsfw = search_request.json()[0]["rating"]
|
nsfw = response["rating"]
|
||||||
nsfw = get_most_sever_rating(nsfw)
|
nsfw = get_most_sever_rating(nsfw)
|
||||||
|
|
||||||
file_url = response["file_url"]
|
file_url = response["file_url"]
|
||||||
|
|
||||||
basename = file_url.rsplit("/", 1)[1]
|
basename = file_url.rsplit("/", 1)[1]
|
||||||
full_path = os.path.join(self.tmp, basename)
|
full_path = os.path.join(self.tmp, basename)
|
||||||
|
|
||||||
@ -99,7 +94,7 @@ class downloader:
|
|||||||
# Query results
|
# Query results
|
||||||
"search_url": search_url,
|
"search_url": search_url,
|
||||||
"file_url": file_url,
|
"file_url": file_url,
|
||||||
"full_path": [full_path],
|
"full_path": full_path,
|
||||||
"tag_response": " ".join(tag_response),
|
"tag_response": " ".join(tag_response),
|
||||||
"nsfw": nsfw
|
"nsfw": nsfw
|
||||||
}
|
}
|
||||||
|
@ -268,7 +268,5 @@ if __name__ == "__main__":
|
|||||||
sys.exit(5)
|
sys.exit(5)
|
||||||
except yandere_bot.BadCfgFile:
|
except yandere_bot.BadCfgFile:
|
||||||
sys.exit(4)
|
sys.exit(4)
|
||||||
except yandere_bot.BadPostSettings:
|
|
||||||
sys.exit(3)
|
|
||||||
except yandere_bot.FailedLogin:
|
except yandere_bot.FailedLogin:
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
@ -24,6 +24,7 @@ import math
|
|||||||
import shutil
|
import shutil
|
||||||
import importlib
|
import importlib
|
||||||
import magic
|
import magic
|
||||||
|
import random
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
||||||
|
|
||||||
@ -68,6 +69,7 @@ class YandereBot:
|
|||||||
self.cfg = cfg
|
self.cfg = cfg
|
||||||
self.load_settings(self.cfg)
|
self.load_settings(self.cfg)
|
||||||
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
self.debug_mode = debug_mode or self.settings_behavior["debug"]
|
||||||
|
random.seed(os.urandom(16))
|
||||||
if prime_bot:
|
if prime_bot:
|
||||||
self.prime_bot()
|
self.prime_bot()
|
||||||
|
|
||||||
@ -142,15 +144,9 @@ class YandereBot:
|
|||||||
self.currentSessionCount, self.failed_uploads) )
|
self.currentSessionCount, self.failed_uploads) )
|
||||||
|
|
||||||
|
|
||||||
def is_banned(self, tag_response):
|
|
||||||
for tag in self.settings_banned:
|
|
||||||
if tag in tag_response:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
# Returns a list of media paths (without the hashes)
|
# Returns a list of media paths (without the hashes)
|
||||||
def get_media_list(self, picked):
|
def download_media(self, picked):
|
||||||
try:
|
try:
|
||||||
backend_s = picked["backend"]
|
backend_s = picked["backend"]
|
||||||
backend = importlib.import_module(backend_s)
|
backend = importlib.import_module(backend_s)
|
||||||
@ -159,34 +155,12 @@ class YandereBot:
|
|||||||
img = None
|
img = None
|
||||||
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
|
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
|
||||||
|
|
||||||
while self.can_post():
|
img = downloader.fetch_post(picked)
|
||||||
img = downloader.fetch_post(picked)
|
|
||||||
if img is None:
|
if img is None:
|
||||||
self.eventSleep.wait(1)
|
raise InvalidPost("Img could not be downloaded")
|
||||||
break
|
|
||||||
if self.is_banned(img["tag_response"]):
|
return downloader.download_post(img)
|
||||||
print("Banned tag:", img["tag_response"])
|
|
||||||
self.eventSleep.wait(1)
|
|
||||||
continue
|
|
||||||
if downloader.download_post(img) is None:
|
|
||||||
print("Failed to download image")
|
|
||||||
self.eventSleep.wait(1)
|
|
||||||
continue
|
|
||||||
# Make sure the file is not malicious
|
|
||||||
full_path = img["full_path"]
|
|
||||||
mime = [magic.from_file(path, mime=True) for path in full_path]
|
|
||||||
if None in mime:
|
|
||||||
print("Unknown mime type.", mime)
|
|
||||||
self.eventSleep.wait(1)
|
|
||||||
continue
|
|
||||||
mime_categories = [c.split("/", 1)[0] for c in mime]
|
|
||||||
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
|
|
||||||
if invalid_mime_categories:
|
|
||||||
print("mime type not allowed:", mime)
|
|
||||||
self.eventSleep.wait(1)
|
|
||||||
continue
|
|
||||||
break
|
|
||||||
return img
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print("Invalid Backend:", picked["backend"])
|
print("Invalid Backend:", picked["backend"])
|
||||||
return None
|
return None
|
||||||
@ -224,9 +198,43 @@ class YandereBot:
|
|||||||
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
string_post = content_newline.join(filter(None, (static_message, string_imglinks_joined)))
|
||||||
|
|
||||||
return content_type, string_post
|
return content_type, string_post
|
||||||
|
|
||||||
|
|
||||||
|
def is_banned(self, picked):
|
||||||
|
tag_response = picked["tag_response"]
|
||||||
|
for tag in self.settings_banned:
|
||||||
|
if tag in tag_response:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def valid_mimetype(self, picked):
|
||||||
|
full_path = picked["full_path"]
|
||||||
|
mime = magic.from_file(full_path, mime=True)
|
||||||
|
|
||||||
|
if mime is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
mime_category = mime.split("/", 1)[0]
|
||||||
|
|
||||||
|
return mime_category in ("image", "video")
|
||||||
|
|
||||||
|
|
||||||
def _post(self, picked):
|
def _post(self, picked):
|
||||||
media_list = self.upload_media_list(picked["full_path"])
|
# Validate picked
|
||||||
|
if picked is None:
|
||||||
|
raise InvalidPost("Picked post is None")
|
||||||
|
elif self.is_banned(picked):
|
||||||
|
raise BannedTag("Tag is banned")
|
||||||
|
elif not self.valid_mimetype(picked):
|
||||||
|
raise InvalidMimeType("Invalid mime type")
|
||||||
|
|
||||||
|
full_path = picked["full_path"]
|
||||||
|
if not os.path.isfile(full_path):
|
||||||
|
raise FileNotFoundError("File not found: {}".format(media_list))
|
||||||
|
|
||||||
|
media_list = self.upload_media_list([full_path])
|
||||||
|
|
||||||
content_type, message = self.get_post_text(picked, media_list)
|
content_type, message = self.get_post_text(picked, media_list)
|
||||||
if self.debug_mode:
|
if self.debug_mode:
|
||||||
return picked
|
return picked
|
||||||
@ -239,6 +247,7 @@ class YandereBot:
|
|||||||
)
|
)
|
||||||
return picked
|
return picked
|
||||||
|
|
||||||
|
|
||||||
def pick_profile(self):
|
def pick_profile(self):
|
||||||
profiles = self.settings_post
|
profiles = self.settings_post
|
||||||
tag_select = self.settings_behavior["tag_select"].lower()
|
tag_select = self.settings_behavior["tag_select"].lower()
|
||||||
@ -261,37 +270,27 @@ class YandereBot:
|
|||||||
def post(self):
|
def post(self):
|
||||||
picked = None
|
picked = None
|
||||||
|
|
||||||
# Flags that are set if an upload fails
|
|
||||||
timeout = False
|
|
||||||
|
|
||||||
# Attempt post
|
# Attempt post
|
||||||
try:
|
try:
|
||||||
# Post
|
# Post
|
||||||
while not picked and self.can_post():
|
picked_profile = self.pick_profile()
|
||||||
picked_profile = self.pick_profile()
|
print("Posting...", picked_profile["name"])
|
||||||
print("Posting...", picked_profile["name"])
|
|
||||||
picked = self.get_media_list(picked_profile)
|
|
||||||
self.currentIndexCount += 1
|
|
||||||
|
|
||||||
if not self.can_post():
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
picked = self.download_media(picked_profile)
|
||||||
self._post(picked)
|
self._post(picked)
|
||||||
|
|
||||||
for path in picked["full_path"]:
|
os.remove(picked["full_path"])
|
||||||
os.remove(path)
|
|
||||||
|
|
||||||
# After a successful post
|
# After a successful post
|
||||||
self.currentSessionCount += 1
|
self.currentSessionCount += 1
|
||||||
|
self.currentIndexCount += 1
|
||||||
|
|
||||||
# The post was successful
|
# The post was successful
|
||||||
return picked
|
return picked
|
||||||
|
|
||||||
# Attempted to post a file that doesn't exist (immediately repost ignoring retry_seconds)
|
# Failed post
|
||||||
except FileNotFoundError:
|
except (BannedTag, UnknownMimeType, InvalidMimeType, FileNotFoundError) as e:
|
||||||
print("File not found:", picked["full_path"])
|
print("Posting error:", e)
|
||||||
# Exception flags
|
|
||||||
timeout = False
|
|
||||||
|
|
||||||
# Check if the file limit has been reached
|
# Check if the file limit has been reached
|
||||||
except MastodonAPIError as e:
|
except MastodonAPIError as e:
|
||||||
@ -301,8 +300,6 @@ class YandereBot:
|
|||||||
file_limit_reached = (e.args[1] == 413)
|
file_limit_reached = (e.args[1] == 413)
|
||||||
|
|
||||||
print("API Error:", e)
|
print("API Error:", e)
|
||||||
# Exception flags
|
|
||||||
timeout = True
|
|
||||||
|
|
||||||
# Server Errors
|
# Server Errors
|
||||||
# Assume all exceptions are on the server side
|
# Assume all exceptions are on the server side
|
||||||
@ -312,17 +309,16 @@ class YandereBot:
|
|||||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
||||||
# mastodon.py API will not specify why the connection timed out).
|
# mastodon.py API will not specify why the connection timed out).
|
||||||
# The default assumption is #2
|
# The default assumption is #2
|
||||||
# except Exception as e:
|
except Exception as e:
|
||||||
# print("Unhandled Exception:", e)
|
print("Unhandled Exception:", e)
|
||||||
# # Exception flags
|
|
||||||
# timeout = True
|
|
||||||
|
|
||||||
# An exception occurred
|
# An exception occurred
|
||||||
self.failed_uploads += 1
|
self.failed_uploads += 1
|
||||||
self.currentIndexCount += 1
|
self.currentIndexCount += 1
|
||||||
print("[Errors: {}]".format(self.failed_uploads))
|
print("[Errors: {}]".format(self.failed_uploads))
|
||||||
if timeout:
|
|
||||||
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
|
# Sleep
|
||||||
|
self.eventSleep.wait(self.settings_behavior["retry_seconds"])
|
||||||
|
|
||||||
# The post failed
|
# The post failed
|
||||||
return None
|
return None
|
||||||
@ -417,7 +413,19 @@ class Debug(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class BadPostSettings(Exception):
|
class InvalidPost(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BannedTag(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UnknownMimeType(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidMimeType(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user