Compare commits
4 Commits
1d9b183e08
...
a266c44b0b
Author | SHA1 | Date | |
---|---|---|---|
a266c44b0b | |||
ad231997a5 | |||
bfaaa50117 | |||
8fb4da20ad |
@ -19,12 +19,13 @@
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
def setup_profile(name, backend, tags, *message):
|
||||
def setup_profile(name, backend, tags, message, message_nsfw):
|
||||
post_setting = {
|
||||
"name": name,
|
||||
"backend": backend,
|
||||
"tags": tags,
|
||||
"message": message
|
||||
"message": (message,),
|
||||
"message_nsfw": (message_nsfw,)
|
||||
}
|
||||
return post_setting
|
||||
|
||||
@ -88,8 +89,8 @@ settings_time = {
|
||||
|
||||
# Apply post settings
|
||||
settings_post = (
|
||||
setup_profile("danbooru.random", "danbooru_backend", ("random",), "#random"),
|
||||
setup_profile("danbooru.yandere", "danbooru_backend", ("yandere",), "#yandere"),
|
||||
setup_profile("danbooru.random", "danbooru_backend", ("random",), "#random", "#random #lewd #nsfw"),
|
||||
setup_profile("danbooru.yandere", "danbooru_backend", ("yandere",), "#yandere", "#yandere #lewd #nsfw"),
|
||||
)
|
||||
|
||||
# Default post behavior:
|
||||
|
@ -1,2 +1,3 @@
|
||||
Mastodon.py
|
||||
cryptography
|
||||
cryptography
|
||||
python-magic
|
||||
|
@ -24,24 +24,33 @@ class downloader:
|
||||
username = None
|
||||
password = None
|
||||
tmp = None
|
||||
banned = None
|
||||
unallowed_extensions = (".zip",)
|
||||
|
||||
def __init__(self, banned = tuple(), username=None, password=None, tmp="/tmp"):
|
||||
def __init__(self, username=None, password=None, tmp="/tmp"):
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.tmp = tmp
|
||||
self.banned = banned
|
||||
|
||||
|
||||
def is_banned(self, tag_list):
|
||||
for tag in self.banned:
|
||||
if tag in tag_list:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def download(self, profile):
|
||||
# Search ratings: s=safe, e=explicit
|
||||
def download_post(self, post):
|
||||
file_url = post["file_url"]
|
||||
full_path = post["full_path"]
|
||||
|
||||
remote_image = requests.get(file_url)
|
||||
|
||||
if remote_image.status_code != 200:
|
||||
print("Remote image request returned:", remote_image.status_code)
|
||||
return None
|
||||
|
||||
for d in full_path:
|
||||
with open(d, "wb") as f:
|
||||
f.write(remote_image.content)
|
||||
|
||||
return post
|
||||
|
||||
|
||||
def fetch_post(self, profile):
|
||||
# Search ratings: s=safe, e=nsfw
|
||||
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
|
||||
tags = profile["tags"]
|
||||
search_url = "https://danbooru.donmai.us/posts.json?random=true&limit=1"
|
||||
@ -50,64 +59,49 @@ class downloader:
|
||||
search_url = "{}&tags={}".format(search_url, search_tags)
|
||||
|
||||
search_request = None
|
||||
while True:
|
||||
if self.username and self.password:
|
||||
search_request = requests.get(search_url,
|
||||
auth=(self.username, self.password)
|
||||
)
|
||||
else:
|
||||
search_request = requests.get(search_url)
|
||||
if self.username and self.password:
|
||||
search_request = requests.get(search_url,
|
||||
auth=(self.username, self.password)
|
||||
)
|
||||
else:
|
||||
search_request = requests.get(search_url)
|
||||
|
||||
if search_request.status_code != 200:
|
||||
print("Search request returned:", search_request.status_code)
|
||||
continue
|
||||
elif "large_file_url" not in search_request.json()[0]:
|
||||
continue
|
||||
elif "tag_string" not in search_request.json()[0]:
|
||||
continue
|
||||
elif "tag_string_general" not in search_request.json()[0]:
|
||||
continue
|
||||
elif self.is_banned(search_request.json()[0]["tag_string"]):
|
||||
print("Banned Tag1:", search_request.json()[0]["tag_string"])
|
||||
continue
|
||||
elif self.is_banned(search_request.json()[0]["tag_string_general"]):
|
||||
print("Banned Tag2",search_request.json()[0]["tag_string_general"])
|
||||
continue
|
||||
break
|
||||
|
||||
import pprint
|
||||
pprint.pprint(search_request.json()[0]["tag_string"])
|
||||
|
||||
large_file_url = search_request.json()[0]["file_url"]
|
||||
|
||||
explicit = search_request.json()[0]["rating"]
|
||||
explicit = get_most_sever_rating(explicit)
|
||||
|
||||
remote_image = requests.get(large_file_url)
|
||||
|
||||
if remote_image.status_code != 200:
|
||||
print("Remote image request returned:", remote_image.status_code)
|
||||
if search_request.status_code != 200:
|
||||
print("Search request returned:", search_request.status_code)
|
||||
return None
|
||||
response = search_request.json()[0]
|
||||
tag_response = []
|
||||
if "file_url" not in response:
|
||||
print("file_url is not in response")
|
||||
return None
|
||||
|
||||
# Aggregate Tags
|
||||
for tag_type in "tag_string", "tag_string_general":
|
||||
if tag_type in response:
|
||||
tag_response.append(response[tag_type].strip())
|
||||
|
||||
basename = large_file_url.rsplit("/", 1)[1]
|
||||
nsfw = search_request.json()[0]["rating"]
|
||||
nsfw = get_most_sever_rating(nsfw)
|
||||
|
||||
file_url = response["file_url"]
|
||||
|
||||
basename = file_url.rsplit("/", 1)[1]
|
||||
full_path = os.path.join(self.tmp, basename)
|
||||
|
||||
with open(full_path, "wb") as f:
|
||||
f.write(remote_image.content)
|
||||
|
||||
r = {
|
||||
# Add profile to dictioanry
|
||||
"name": profile["name"],
|
||||
"backend": profile["backend"],
|
||||
"tags": profile["tags"],
|
||||
"message": profile["message"],
|
||||
"message_nsfw": profile["message_nsfw"],
|
||||
|
||||
# Query results
|
||||
"search_url": search_url,
|
||||
"large_file_url": large_file_url,
|
||||
"file_url": file_url,
|
||||
"full_path": [full_path],
|
||||
"explicit": explicit,
|
||||
"tag_response": " ".join(tag_response),
|
||||
"nsfw": nsfw
|
||||
}
|
||||
|
||||
|
||||
return r
|
||||
|
@ -23,6 +23,7 @@ import fnmatch
|
||||
import math
|
||||
import shutil
|
||||
import importlib
|
||||
import magic
|
||||
from threading import Event
|
||||
from mastodon import Mastodon, MastodonIllegalArgumentError, MastodonAPIError, MastodonVersionError
|
||||
|
||||
@ -124,15 +125,15 @@ class YandereBot:
|
||||
# Maybe I should remove this from the backend?
|
||||
def print_header_stats(self, picked, date_selection, date_next_selection):
|
||||
picked_name = picked["name"] if picked else None
|
||||
picked_url = picked["large_file_url"] if picked else None
|
||||
picked_url = picked["file_url"] if picked else None
|
||||
picked_path = picked["full_path"] if picked else None
|
||||
picked_explicit = picked["explicit"] if picked else None
|
||||
picked_nsfw = picked["nsfw"] if picked else None
|
||||
next_selection_seconds = max(0, int(time_diff_seconds(date_next_selection, date_selection)))
|
||||
|
||||
print("[Profile]", picked_name)
|
||||
print(picked_path)
|
||||
print(picked_url)
|
||||
print("Explicit:", picked_explicit)
|
||||
print("Explicit:", picked_nsfw)
|
||||
print("Selection time: {}".format(
|
||||
date_selection.strftime(self.settings_time["long_date_format"])) )
|
||||
print("Next selection time: {} ({} seconds)".format(
|
||||
@ -140,16 +141,51 @@ class YandereBot:
|
||||
print("[ {} Selected during session | {} Failed ]\n".format(
|
||||
self.currentSessionCount, self.failed_uploads) )
|
||||
|
||||
|
||||
def is_banned(self, tag_response):
|
||||
for tag in self.settings_banned:
|
||||
if tag in tag_response:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# Returns a list of media paths (without the hashes)
|
||||
def get_media_list(self, picked):
|
||||
try:
|
||||
banned = self.settings_banned
|
||||
backend_s = picked["backend"]
|
||||
backend = importlib.import_module(backend_s)
|
||||
username = self.settings_credentials[backend_s]["username"]
|
||||
password = self.settings_credentials[backend_s]["password"]
|
||||
downloader = backend.downloader(banned, username, password, tmp=self.settings_behavior["tmp_dir"])
|
||||
img = downloader.download(picked)
|
||||
img = None
|
||||
downloader = backend.downloader(username, password, tmp=self.settings_behavior["tmp_dir"])
|
||||
|
||||
while self.can_post():
|
||||
img = downloader.fetch_post(picked)
|
||||
if img is None:
|
||||
self.eventSleep.wait(1)
|
||||
break
|
||||
if self.is_banned(img["tag_response"]):
|
||||
print("Banned tag:", img["tag_response"])
|
||||
self.eventSleep.wait(1)
|
||||
continue
|
||||
if downloader.download_post(img) is None:
|
||||
print("Failed to download image")
|
||||
self.eventSleep.wait(1)
|
||||
continue
|
||||
# Make sure the file is not malicious
|
||||
full_path = img["full_path"]
|
||||
mime = [magic.from_file(path, mime=True) for path in full_path]
|
||||
if None in mime:
|
||||
print("Unknown mime type.", mime)
|
||||
self.eventSleep.wait(1)
|
||||
continue
|
||||
mime_categories = [c.split("/", 1)[0] for c in mime]
|
||||
invalid_mime_categories = [c for c in mime_categories if c not in ("image", "video")]
|
||||
if invalid_mime_categories:
|
||||
print("mime type not allowed:", mime)
|
||||
self.eventSleep.wait(1)
|
||||
continue
|
||||
break
|
||||
return img
|
||||
except ImportError:
|
||||
print("Invalid Backend:", picked["backend"])
|
||||
@ -165,7 +201,9 @@ class YandereBot:
|
||||
def get_post_text(self, picked, media_list):
|
||||
content_type = self.settings_behavior["content_type"]
|
||||
content_newline = self.settings_behavior["content_newline"]
|
||||
static_message = content_newline.join(picked["message"])
|
||||
nsfw = picked["nsfw"]
|
||||
message = picked["message_nsfw"] if nsfw else picked["message"]
|
||||
static_message = content_newline.join(message)
|
||||
string_post = ""
|
||||
string_imglinks = []
|
||||
|
||||
@ -196,7 +234,7 @@ class YandereBot:
|
||||
message,
|
||||
media_ids=[i[1] for i in media_list if len(i) == 2],
|
||||
visibility=self.settings_behavior["visibility"],
|
||||
sensitive=picked["explicit"],
|
||||
sensitive=picked["nsfw"],
|
||||
content_type=content_type
|
||||
)
|
||||
return picked
|
||||
@ -234,6 +272,9 @@ class YandereBot:
|
||||
print("Posting...", picked_profile["name"])
|
||||
picked = self.get_media_list(picked_profile)
|
||||
self.currentIndexCount += 1
|
||||
|
||||
if not self.can_post():
|
||||
return None
|
||||
|
||||
self._post(picked)
|
||||
|
||||
@ -271,10 +312,10 @@ class YandereBot:
|
||||
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
|
||||
# mastodon.py API will not specify why the connection timed out).
|
||||
# The default assumption is #2
|
||||
except Exception as e:
|
||||
print("Unhandled Exception:", e)
|
||||
# Exception flags
|
||||
timeout = True
|
||||
# except Exception as e:
|
||||
# print("Unhandled Exception:", e)
|
||||
# # Exception flags
|
||||
# timeout = True
|
||||
|
||||
# An exception occurred
|
||||
self.failed_uploads += 1
|
||||
|
Reference in New Issue
Block a user