Compare commits

..

No commits in common. "22c5a6e231bccc4b7cb834a9c5929bc4cb8b1548" and "2e22104a668f6c8681b18f5bdd27b9651aee271e" have entirely different histories.

2 changed files with 227 additions and 240 deletions

View File

@ -25,156 +25,162 @@ import random
import time import time
def random_tag(*tags): def random_tag(*tags):
return len(tags) == 1 and tags[0].lower() == "random" return len(tags) == 1 and tags[0].lower() == "random"
def collect_tags(post): def collect_tags(post):
return post["tags"].strip().lower().split() return post["tags"].strip().lower().split()
def is_banned(post, profile): def is_banned(post, profile):
tag_response = collect_tags(post) tag_response = collect_tags(post)
tag_banned = profile["banned_tags"] tag_banned = profile["banned_tags"]
for tag in tag_banned: for tag in tag_banned:
if tag in tag_response: if tag in tag_response:
return tag return tag
return None return None
def get_nsfw(post): def get_nsfw(post):
return post["rating"] in ("questionable", "explicit") return post["rating"] in ("questionable", "explicit")
def select_from_response(response, profile, max_size=None): def select_from_response(response, profile, max_size=None):
for post in response: for post in response:
# Do not select banned tags # Do not select banned tags
if is_banned(post, profile): if is_banned(post, profile):
continue continue
# Make sure file_url keyword is in the query # Make sure file_url keyword is in the query
elif "file_url" not in post: elif "file_url" not in post:
continue continue
# Select only nsfw # Select only nsfw
elif ( profile["force_nsfw"] is not None and elif ( profile["force_nsfw"] is not None and
profile["force_nsfw"] != get_nsfw(post) profile["force_nsfw"] != get_nsfw(post)
): ):
continue continue
# Make sure serverside size is not larger than max_size # Make sure serverside size is not larger than max_size
elif ( max_size != None and elif ( max_size != None and
"file_size" in post and "file_size" in post and
post["file_size"] > max_size post["file_size"] > max_size
): ):
continue continue
return post return post
return None return None
class downloader: class downloader:
def __init__(self, backend_credentials): username = None
self.api_endpoint = "index.php?page=dapi&s=post&q=index&json=1" password = None
self.api_tags = "&tags={}" max_size = None
self.api_limit = "&limit={}" tmp = ""
self.api_offset = "&pid={}" url = ""
self.limit = 100 api_endpoint = "index.php?page=dapi&s=post&q=index&json=1"
api_tags = "&tags={}"
api_limit = "&limit={}"
api_offset = "&pid={}"
limit = 100
max_depth = 200
self.username = backend_credentials["username"] def __init__(self, backend_credentials):
self.password = backend_credentials["password"] self.username = backend_credentials["username"]
self.max_size = backend_credentials["max_size"] self.password = backend_credentials["password"]
self.tmp = backend_credentials["tmp_dir"] self.max_size = backend_credentials["max_size"]
self.url = backend_credentials["url"] self.tmp = backend_credentials["tmp_dir"]
self.max_depth = backend_credentials["max_depth"] self.url = backend_credentials["url"]
random.seed(os.urandom(16)) self.max_depth = backend_credentials["max_depth"]
random.seed(os.urandom(16))
def download_post(self, post): def download_post(self, post):
file_url = post["file_url"] file_url = post["file_url"]
full_path = post["full_path"] full_path = post["full_path"]
remote_image = requests.get(file_url) remote_image = requests.get(file_url)
if remote_image.status_code != 200: if remote_image.status_code != 200:
print("Remote image request returned:", remote_image.status_code) print("Remote image request returned:", remote_image.status_code)
return None return None
with open(full_path, "wb") as f: with open(full_path, "wb") as f:
f.write(remote_image.content) f.write(remote_image.content)
return post return post
def get_full_url(self, limit=100, offset=0, *tags): def get_full_url(self, limit=100, offset=0, *tags):
search_url = "/".join((self.url, self.api_endpoint)) search_url = "/".join((self.url, self.api_endpoint))
search_url += self.api_limit.format(str(limit)) search_url += self.api_limit.format(str(limit))
search_url += self.api_offset.format(str(offset)) search_url += self.api_offset.format(str(offset))
if tags and not random_tag(*tags): if tags and not random_tag(*tags):
search_tags = "+".join(tags) search_tags = "+".join(tags)
search_url += self.api_tags.format(search_tags) search_url += self.api_tags.format(search_tags)
return search_url return search_url
def search(self, search_url): def search(self, search_url):
if self.username and self.password: if self.username and self.password:
return requests.get(search_url, return requests.get(search_url,
auth=(self.username, self.password) auth=(self.username, self.password)
) )
else: else:
return requests.get(search_url) return requests.get(search_url)
def fetch_post(self, profile): def fetch_post(self, profile):
# Search ratings: s=safe, e=nsfw # Search ratings: s=safe, e=nsfw
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1" # base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
tags = profile["tags"] tags = profile["tags"]
# First query # First query
page_offset = random.randint(0, self.max_depth) page_offset = random.randint(0, self.max_depth)
search_url = self.get_full_url(self.limit, page_offset, *tags) search_url = self.get_full_url(self.limit, page_offset, *tags)
search_request = self.search(search_url) search_request = self.search(search_url)
search_ok = search_request.status_code == 200 search_ok = search_request.status_code == 200
# Second query if our page offset is too high # Second query if our page offset is too high
if search_ok and "post" not in search_request.json(): if search_ok and "post" not in search_request.json():
max_posts = int(search_request.json()["@attributes"]["count"]) max_posts = int(search_request.json()["@attributes"]["count"])
total_pages = max_posts // self.limit total_pages = max_posts // self.limit
# There is no point in querying again if max_posts is 0 # There is no point in querying again if max_posts is 0
if max_posts <= 0: if max_posts <= 0:
return None return None
page_offset = random.randint(0, total_pages) page_offset = random.randint(0, total_pages)
search_url = self.get_full_url(self.limit, page_offset, *tags) search_url = self.get_full_url(self.limit, page_offset, *tags)
time.sleep(1) time.sleep(1)
search_request = self.search(search_url) search_request = self.search(search_url)
search_ok = search_request.status_code == 200 search_ok = search_request.status_code == 200
if not search_ok: if not search_ok:
print("Search request returned:", search_request.status_code) print("Search request returned:", search_request.status_code)
return None return None
posts = search_request.json()["post"] posts = search_request.json()["post"]
random.shuffle(posts) random.shuffle(posts)
selected = select_from_response(posts, profile, self.max_size) selected = select_from_response(posts, profile, self.max_size)
if selected is None: if selected is None:
print("Could not select image based on criteria") print("Could not select image based on criteria")
return None return None
tag_response = collect_tags(selected) tag_response = collect_tags(selected)
nsfw = get_nsfw(selected) nsfw = get_nsfw(selected)
file_url = selected["file_url"] file_url = selected["file_url"]
basename = file_url.rsplit("/", 1)[1] basename = file_url.rsplit("/", 1)[1]
full_path = os.path.join(self.tmp, basename) full_path = os.path.join(self.tmp, basename)
r = { r = {
# Add profile to dictioanry # Add profile to dictioanry
"profile": profile, "profile": profile,
# Query results # Query results
"search_url": search_url, "search_url": search_url,
"file_url": file_url, "file_url": file_url,
"full_path": full_path, "full_path": full_path,
"tag_response": tag_response, "tag_response": tag_response,
"nsfw": nsfw "nsfw": nsfw
} }
return r return r

View File

@ -22,165 +22,146 @@
import requests import requests
import os import os
import random import random
import re
import time
def random_tag(*tags): def random_tag(*tags):
return len(tags) == 1 and tags[0].lower() == "random" return len(tags) == 1 and tags[0].lower() == "random"
def collect_tags(post): def collect_tags(post):
return post["tags"].strip().lower().split() return post["tags"].strip().lower().split()
def is_banned(post, profile): def is_banned(post, profile):
tag_response = collect_tags(post) tag_response = collect_tags(post)
tag_banned = profile["banned_tags"] tag_banned = profile["banned_tags"]
for tag in tag_banned: for tag in tag_banned:
if tag in tag_response: if tag in tag_response:
return tag return tag
return None return None
def get_nsfw(post): def get_nsfw(post):
return post["rating"] in ("q", "e") return post["rating"] in ("q", "e")
def select_from_response(response, profile, max_size=None): def select_from_response(response, profile, max_size=None):
for post in response: for post in response:
if is_banned(post, profile): if is_banned(post, profile):
continue continue
elif "file_url" not in post: elif "file_url" not in post:
continue continue
# Select only nsfw # Select only nsfw
elif ( profile["force_nsfw"] is not None and elif ( profile["force_nsfw"] is not None and
profile["force_nsfw"] != get_nsfw(post) profile["force_nsfw"] != get_nsfw(post)
): ):
continue continue
# Make sure serverside size is not larger than max_size # Make sure serverside size is not larger than max_size
elif ( max_size != None and elif ( max_size != None and
"file_size" in post and "file_size" in post and
post["file_size"] > max_size post["file_size"] > max_size
): ):
continue continue
return post return post
return None return None
class downloader: class downloader:
def __init__(self, backend_credentials): username = None
self.api_endpoint = "{}/post/index.json?limit={}&page={}" password = None
self.html_endpoint = "{}/post/index?limit={}&page={}" max_size = None
self.tag_url = "&tags={}" tmp = None
self.limit = 100 url = ""
self.retry_limit = 3 api_endpoint = "post.json?random=true&limit=100"
api_tags = "&tags={}"
api_limit = "&limit={}"
api_offset = "&page={}"
limit = 100
max_depth = 200
self.username = backend_credentials["username"] def __init__(self, backend_credentials):
self.password = backend_credentials["password"] self.username = backend_credentials["username"]
self.depth = backend_credentials["max_size"] self.password = backend_credentials["password"]
self.tmp = backend_credentials["tmp_dir"] self.max_size = backend_credentials["max_size"]
self.url = backend_credentials["url"] self.tmp = backend_credentials["tmp_dir"]
self.max_depth = backend_credentials["max_depth"] self.url = backend_credentials["url"]
random.seed(os.urandom(16)) self.max_depth = backend_credentials["max_depth"]
random.seed(os.urandom(16))
def download_post(self, post): def download_post(self, post):
file_url = post["file_url"] file_url = post["file_url"]
full_path = post["full_path"] full_path = post["full_path"]
remote_image = requests.get(file_url) remote_image = requests.get(file_url)
if remote_image.status_code != 200: if remote_image.status_code != 200:
print("Remote image request returned:", remote_image.status_code) print("Remote image request returned:", remote_image.status_code)
return None return None
with open(full_path, "wb") as f: with open(full_path, "wb") as f:
f.write(remote_image.content) f.write(remote_image.content)
return post return post
def search(self, search_url): def get_full_url(self, limit=100, offset=0, *tags):
search_request = None search_url = "/".join((self.url, self.api_endpoint))
if self.username and self.password: search_url += self.api_limit.format(str(limit))
search_request = requests.get(search_url, search_url += self.api_offset.format(str(offset))
auth=(self.username, self.password) if tags and not random_tag(*tags):
) search_tags = "+".join(tags)
else: search_url += self.api_tags.format(search_tags)
search_request = requests.get(search_url) return search_url
return search_request
# I suck at regex :( def search(self, search_url):
def get_max_page(self, html): search_request = None
match = re.findall('page=[0-9]*', html) if self.username and self.password:
if match: search_request = requests.get(search_url,
last_group = match[len(match) - 1] auth=(self.username, self.password)
last_page = last_group.rsplit("=", 1)[1] )
return int(last_page) else:
else: search_request = requests.get(search_url)
return None return search_request
def fetch_post(self, profile):
tags = profile["tags"]
selected = dict()
max_depth = self.max_depth
search_url_tags = "+".join(tags)
search_url = ""
for _ in range(0, self.retry_limit): def fetch_post(self, profile):
page_offset = random.randint(0, max_depth) # Search ratings: s=safe, e=nsfw
search_url = self.api_endpoint.format(self.url, self.limit, page_offset) tags = profile["tags"]
search_url_html = self.html_endpoint.format(self.url, self.limit, page_offset)
if search_url_tags:
search_url += self.tag_url.format(search_url_tags)
search_url_html += self.tag_url.format(search_url_tags)
search_request = self.search(search_url) page_offset = random.randint(0, self.max_depth)
search_url = self.get_full_url(self.limit, page_offset, *tags)
search_request = self.search(search_url)
if search_request.status_code != 200: if search_request.status_code != 200:
print("Search request returned:", search_request.status_code) print("Search request returned:", search_request.status_code)
continue return None
posts = search_request.json() posts = search_request.json()
random.shuffle(posts) random.shuffle(posts)
selected = select_from_response(posts, profile) selected = select_from_response(posts, profile, self.max_size)
if selected is None: if selected is None:
print("Could not select image based on criteria") print("Could not select image based on criteria")
time.sleep(2) return None
search_request = self.search(search_url_html)
if search_request.status_code == 200: tag_response = collect_tags(selected)
new_max_depth = self.get_max_page(search_request.text) nsfw = get_nsfw(selected)
if new_max_depth < max_depth: file_url = selected["file_url"]
max_depth = new_max_depth
else:
max_depth = max_depth // 2
else:
max_depth = max_depth // 2
continue
break basename = "{}.{}".format(selected["md5"], file_url.rsplit(".", 1)[1])
full_path = os.path.join(self.tmp, basename)
if not selected: r = {
return None # Add profile to dictioanry
"profile": profile,
tag_response = collect_tags(selected) # Query results
nsfw = get_nsfw(selected) "search_url": search_url,
file_url = selected["file_url"] "file_url": file_url,
"full_path": full_path,
"tag_response": tag_response,
"nsfw": nsfw
}
basename = "{}.{}".format(selected["md5"], file_url.rsplit(".", 1)[1]) return r
full_path = os.path.join(self.tmp, basename)
r = {
# Add profile to dictioanry
"profile": profile,
# Query results
"search_url": search_url,
"file_url": file_url,
"full_path": full_path,
"tag_response": tag_response,
"nsfw": nsfw
}
return r