Compare commits

...

4 Commits

4 changed files with 109 additions and 72 deletions

View File

@ -19,12 +19,13 @@
from collections import OrderedDict
def setup_profile(name, backend, tags, *message):
def setup_profile(name, backend, tags, message, message_nsfw):
post_setting = {
"name": name,
"backend": backend,
"tags": tags,
"message": message
"message": (message,),
"message_nsfw": (message_nsfw,)
}
return post_setting
@ -88,8 +89,8 @@ settings_time = {
# Apply post settings
settings_post = (
setup_profile("danbooru.random", "danbooru_backend", ("random",), "#random"),
setup_profile("danbooru.yandere", "danbooru_backend", ("yandere",), "#yandere"),
setup_profile("danbooru.random", "danbooru_backend", ("random",), "#random", "#random #lewd #nsfw"),
setup_profile("danbooru.yandere", "danbooru_backend", ("yandere",), "#yandere", "#yandere #lewd #nsfw"),
)
# Default post behavior:

View File

@ -1,2 +1,3 @@
Mastodon.py
cryptography
cryptography
python-magic

View File

@ -24,24 +24,33 @@ class downloader:
username = None
password = None
tmp = None
banned = None
unallowed_extensions = (".zip",)
def __init__(self, banned = tuple(), username=None, password=None, tmp="/tmp"):
def __init__(self, username=None, password=None, tmp="/tmp"):
self.username = username
self.password = password
self.tmp = tmp
self.banned = banned
def is_banned(self, tag_list):
for tag in self.banned:
if tag in tag_list:
return True
return False
def download(self, profile):
# Search ratings: s=safe, e=explicit
def download_post(self, post):
file_url = post["file_url"]
full_path = post["full_path"]
remote_image = requests.get(file_url)
if remote_image.status_code != 200:
print("Remote image request returned:", remote_image.status_code)
return None
for d in full_path:
with open(d, "wb") as f:
f.write(remote_image.content)
return post
def fetch_post(self, profile):
# Search ratings: s=safe, e=nsfw
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
tags = profile["tags"]
search_url = "https://danbooru.donmai.us/posts.json?random=true&limit=1"
@ -50,64 +59,49 @@ class downloader:
search_url = "{}&tags={}".format(search_url, search_tags)
search_request = None
while True:
if self.username and self.password:
search_request = requests.get(search_url,
auth=(self.username, self.password)
)
else:
search_request = requests.get(search_url)
if self.username and self.password:
search_request = requests.get(search_url,
auth=(self.username, self.password)
)
else:
search_request = requests.get(search_url)
if search_request.status_code != 200:
print("Search request returned:", search_request.status_code)
continue
elif "large_file_url" not in search_request.json()[0]:
continue
elif "tag_string" not in search_request.json()[0]:
continue
elif "tag_string_general" not in search_request.json()[0]:
continue
elif self.is_banned(search_request.json()[0]["tag_string"]):
print("Banned Tag1:", search_request.json()[0]["tag_string"])
continue
elif self.is_banned(search_request.json()[0]["tag_string_general"]):
print("Banned Tag2",search_request.json()[0]["tag_string_general"])
continue
break
import pprint
pprint.pprint(search_request.json()[0]["tag_string"])
large_file_url = search_request.json()[0]["file_url"]
explicit = search_request.json()[0]["rating"]
explicit = get_most_sever_rating(explicit)
remote_image = requests.get(large_file_url)
if remote_image.status_code != 200:
print("Remote image request returned:", remote_image.status_code)
if search_request.status_code != 200:
print("Search request returned:", search_request.status_code)
return None
response = search_request.json()[0]
tag_response = []
if "file_url" not in response:
print("file_url is not in response")
return None
# Aggregate Tags
for tag_type in "tag_string", "tag_string_general":
if tag_type in response:
tag_response.append(response[tag_type].strip())
basename = large_file_url.rsplit("/", 1)[1]
nsfw = search_request.json()[0]["rating"]
nsfw = get_most_sever_rating(nsfw)
file_url = response["file_url"]
basename = file_url.rsplit("/", 1)[1]
full_path = os.path.join(self.tmp, basename)
with open(full_path, "wb") as f:
f.write(remote_image.content)
r = {
# Add profile to dictioanry
"name": profile["name"],
"backend": profile["backend"],
"tags": profile["tags"],
"message": profile["message"],
"message_nsfw": profile["message_nsfw"],
# Query results
"search_url": search_url,
"large_file_url": large_file_url,
"file_url": file_url,
"full_path": [full_path],
"explicit": explicit,
"tag_response": " ".join(tag_response),
"nsfw": nsfw
}
return r

View File

@ -23,6 +23,7 @@ import fnmatch
import math
import shutil
import importlib
import magic
from threading import Event
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
@ -124,15 +125,15 @@ class YandereBot:
# Maybe I should remove this from the backend?
def print_header_stats(self, picked, date_selection, date_next_selection):
picked_name = picked["name"] if picked else None
picked_url = picked["large_file_url"] if picked else None
picked_url = picked["file_url"] if picked else None
picked_path = picked["full_path"] if picked else None
picked_explicit = picked["explicit"] if picked else None
picked_nsfw = picked["nsfw"] if picked else None
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
print("[Profile]", picked_name)
print(picked_path)
print(picked_url)
print("Explicit:", picked_explicit)
print("Explicit:", picked_nsfw)
print("Selection time: {}".format(
date_selection.strftime(self.settings_time["long_date_format"])) )
print("Next selection time: {} ({} seconds)".format(
@ -140,16 +141,51 @@ class YandereBot:
print("[ {} Selected during session | {} Failed ]\n".format(
self.currentSessionCount, self.failed_uploads) )
def is_banned(self, tag_response):
for tag in self.settings_banned:
if tag in tag_response:
return True
return False
# Returns a list of media paths (without the hashes)
def get_media_list(self, picked):
try:
banned = self.settings_banned
backend_s = picked["backend"]
backend = importlib.import_module(backend_s)
username = self.settings_credentials[backend_s]["username"]
password = self.settings_credentials[backend_s]["password"]
downloader = backend.downloader(banned, username, password, tmp=self.settings_behavior["tmp_dir"])
img = downloader.download(picked)
img = None
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
while self.can_post():
img = downloader.fetch_post(picked)
if img is None:
self.eventSleep.wait(1)
break
if self.is_banned(img["tag_response"]):
print("Banned tag:", img["tag_response"])
self.eventSleep.wait(1)
continue
if downloader.download_post(img) is None:
print("Failed to download image")
self.eventSleep.wait(1)
continue
# Make sure the file is not malicious
full_path = img["full_path"]
mime = [magic.from_file(path, mime=True) for path in full_path]
if None in mime:
print("Unknown mime type.", mime)
self.eventSleep.wait(1)
continue
mime_categories = [c.split("/", 1)[0] for c in mime]
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
if invalid_mime_categories:
print("mime type not allowed:", mime)
self.eventSleep.wait(1)
continue
break
return img
except ImportError:
print("Invalid Backend:", picked["backend"])
@ -165,7 +201,9 @@ class YandereBot:
def get_post_text(self, picked, media_list):
content_type = self.settings_behavior["content_type"]
content_newline = self.settings_behavior["content_newline"]
static_message = content_newline.join(picked["message"])
nsfw = picked["nsfw"]
message = picked["message_nsfw"] if nsfw else picked["message"]
static_message = content_newline.join(message)
string_post = ""
string_imglinks = []
@ -196,7 +234,7 @@ class YandereBot:
message,
media_ids=[i[1] for i in media_list if len(i) == 2],
visibility=self.settings_behavior["visibility"],
sensitive=picked["explicit"],
sensitive=picked["nsfw"],
content_type=content_type
)
return picked
@ -234,6 +272,9 @@ class YandereBot:
print("Posting...", picked_profile["name"])
picked = self.get_media_list(picked_profile)
self.currentIndexCount += 1
if not self.can_post():
return None
self._post(picked)
@ -271,10 +312,10 @@ class YandereBot:
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out).
# The default assumption is #2
except Exception as e:
print("Unhandled Exception:", e)
# Exception flags
timeout = True
# except Exception as e:
# print("Unhandled Exception:", e)
# # Exception flags
# timeout = True
# An exception occurred
self.failed_uploads += 1