Added support for mime checking and nsfw message
This commit is contained in:
parent
bfaaa50117
commit
ad231997a5
@ -23,6 +23,7 @@ import fnmatch
|
|||||||
import math
|
import math
|
||||||
import shutil
|
import shutil
|
||||||
import importlib
|
import importlib
|
||||||
|
import magic
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
||||||
|
|
||||||
@ -124,15 +125,15 @@ class YandereBot:
|
|||||||
# Maybe I should remove this from the backend?
|
# Maybe I should remove this from the backend?
|
||||||
def print_header_stats(self, picked, date_selection, date_next_selection):
|
def print_header_stats(self, picked, date_selection, date_next_selection):
|
||||||
picked_name = picked["name"] if picked else None
|
picked_name = picked["name"] if picked else None
|
||||||
picked_url = picked["large_file_url"] if picked else None
|
picked_url = picked["file_url"] if picked else None
|
||||||
picked_path = picked["full_path"] if picked else None
|
picked_path = picked["full_path"] if picked else None
|
||||||
picked_explicit = picked["explicit"] if picked else None
|
picked_nsfw = picked["nsfw"] if picked else None
|
||||||
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
||||||
|
|
||||||
print("[Profile]", picked_name)
|
print("[Profile]", picked_name)
|
||||||
print(picked_path)
|
print(picked_path)
|
||||||
print(picked_url)
|
print(picked_url)
|
||||||
print("Explicit:", picked_explicit)
|
print("Explicit:", picked_nsfw)
|
||||||
print("Selection time: {}".format(
|
print("Selection time: {}".format(
|
||||||
date_selection.strftime(self.settings_time["long_date_format"])) )
|
date_selection.strftime(self.settings_time["long_date_format"])) )
|
||||||
print("Next selection time: {} ({} seconds)".format(
|
print("Next selection time: {} ({} seconds)".format(
|
||||||
@ -140,16 +141,51 @@ class YandereBot:
|
|||||||
print("[ {} Selected during session | {} Failed ]\n".format(
|
print("[ {} Selected during session | {} Failed ]\n".format(
|
||||||
self.currentSessionCount, self.failed_uploads) )
|
self.currentSessionCount, self.failed_uploads) )
|
||||||
|
|
||||||
|
|
||||||
|
def is_banned(self, tag_response):
|
||||||
|
for tag in self.settings_banned:
|
||||||
|
if tag in tag_response:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
# Returns a list of media paths (without the hashes)
|
# Returns a list of media paths (without the hashes)
|
||||||
def get_media_list(self, picked):
|
def get_media_list(self, picked):
|
||||||
try:
|
try:
|
||||||
banned = self.settings_banned
|
|
||||||
backend_s = picked["backend"]
|
backend_s = picked["backend"]
|
||||||
backend = importlib.import_module(backend_s)
|
backend = importlib.import_module(backend_s)
|
||||||
username = self.settings_credentials[backend_s]["username"]
|
username = self.settings_credentials[backend_s]["username"]
|
||||||
password = self.settings_credentials[backend_s]["password"]
|
password = self.settings_credentials[backend_s]["password"]
|
||||||
downloader = backend.downloader(banned, username, password, tmp=self.settings_behavior["tmp_dir"])
|
img = None
|
||||||
img = downloader.download(picked)
|
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
|
||||||
|
|
||||||
|
while self.can_post():
|
||||||
|
img = downloader.fetch_post(picked)
|
||||||
|
if img is None:
|
||||||
|
self.eventSleep.wait(1)
|
||||||
|
break
|
||||||
|
if self.is_banned(img["tag_response"]):
|
||||||
|
print("Banned tag:", img["tag_response"])
|
||||||
|
self.eventSleep.wait(1)
|
||||||
|
continue
|
||||||
|
if downloader.download_post(img) is None:
|
||||||
|
print("Failed to download image")
|
||||||
|
self.eventSleep.wait(1)
|
||||||
|
continue
|
||||||
|
# Make sure the file is not malicious
|
||||||
|
full_path = img["full_path"]
|
||||||
|
mime = [magic.from_file(path, mime=True) for path in full_path]
|
||||||
|
if None in mime:
|
||||||
|
print("Unknown mime type.", mime)
|
||||||
|
self.eventSleep.wait(1)
|
||||||
|
continue
|
||||||
|
mime_categories = [c.split("/", 1)[0] for c in mime]
|
||||||
|
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
|
||||||
|
if invalid_mime_categories:
|
||||||
|
print("mime type not allowed:", mime)
|
||||||
|
self.eventSleep.wait(1)
|
||||||
|
continue
|
||||||
|
break
|
||||||
return img
|
return img
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print("Invalid Backend:", picked["backend"])
|
print("Invalid Backend:", picked["backend"])
|
||||||
@ -165,7 +201,9 @@ class YandereBot:
|
|||||||
def get_post_text(self, picked, media_list):
|
def get_post_text(self, picked, media_list):
|
||||||
content_type = self.settings_behavior["content_type"]
|
content_type = self.settings_behavior["content_type"]
|
||||||
content_newline = self.settings_behavior["content_newline"]
|
content_newline = self.settings_behavior["content_newline"]
|
||||||
static_message = content_newline.join(picked["message"])
|
nsfw = picked["nsfw"]
|
||||||
|
message = picked["message_nsfw"] if nsfw else picked["message"]
|
||||||
|
static_message = content_newline.join(message)
|
||||||
string_post = ""
|
string_post = ""
|
||||||
string_imglinks = []
|
string_imglinks = []
|
||||||
|
|
||||||
@ -196,7 +234,7 @@ class YandereBot:
|
|||||||
message,
|
message,
|
||||||
media_ids=[i[1] for i in media_list if len(i) == 2],
|
media_ids=[i[1] for i in media_list if len(i) == 2],
|
||||||
visibility=self.settings_behavior["visibility"],
|
visibility=self.settings_behavior["visibility"],
|
||||||
sensitive=picked["explicit"],
|
sensitive=picked["nsfw"],
|
||||||
content_type=content_type
|
content_type=content_type
|
||||||
)
|
)
|
||||||
return picked
|
return picked
|
||||||
@ -234,6 +272,9 @@ class YandereBot:
|
|||||||
print("Posting...", picked_profile["name"])
|
print("Posting...", picked_profile["name"])
|
||||||
picked = self.get_media_list(picked_profile)
|
picked = self.get_media_list(picked_profile)
|
||||||
self.currentIndexCount += 1
|
self.currentIndexCount += 1
|
||||||
|
|
||||||
|
if not self.can_post():
|
||||||
|
return None
|
||||||
|
|
||||||
self._post(picked)
|
self._post(picked)
|
||||||
|
|
||||||
@ -271,10 +312,10 @@ class YandereBot:
|
|||||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
||||||
# mastodon.py API will not specify why the connection timed out).
|
# mastodon.py API will not specify why the connection timed out).
|
||||||
# The default assumption is #2
|
# The default assumption is #2
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
print("Unhandled Exception:", e)
|
# print("Unhandled Exception:", e)
|
||||||
# Exception flags
|
# # Exception flags
|
||||||
timeout = True
|
# timeout = True
|
||||||
|
|
||||||
# An exception occurred
|
# An exception occurred
|
||||||
self.failed_uploads += 1
|
self.failed_uploads += 1
|
||||||
|
Reference in New Issue
Block a user