Changed whitespace format
This commit is contained in:
parent
02f454d32d
commit
5d51efefa4
@ -25,156 +25,156 @@ import random
|
||||
import time
|
||||
|
||||
def random_tag(*tags):
|
||||
return len(tags) == 1 and tags[0].lower() == "random"
|
||||
return len(tags) == 1 and tags[0].lower() == "random"
|
||||
|
||||
|
||||
def collect_tags(post):
|
||||
return post["tags"].strip().lower().split()
|
||||
return post["tags"].strip().lower().split()
|
||||
|
||||
|
||||
def is_banned(post, profile):
|
||||
tag_response = collect_tags(post)
|
||||
tag_banned = profile["banned_tags"]
|
||||
for tag in tag_banned:
|
||||
if tag in tag_response:
|
||||
return tag
|
||||
return None
|
||||
tag_response = collect_tags(post)
|
||||
tag_banned = profile["banned_tags"]
|
||||
for tag in tag_banned:
|
||||
if tag in tag_response:
|
||||
return tag
|
||||
return None
|
||||
|
||||
|
||||
def get_nsfw(post):
|
||||
return post["rating"] in ("questionable", "explicit")
|
||||
return post["rating"] in ("questionable", "explicit")
|
||||
|
||||
|
||||
def select_from_response(response, profile, max_size=None):
|
||||
for post in response:
|
||||
# Do not select banned tags
|
||||
if is_banned(post, profile):
|
||||
continue
|
||||
# Make sure file_url keyword is in the query
|
||||
elif "file_url" not in post:
|
||||
continue
|
||||
# Select only nsfw
|
||||
elif ( profile["force_nsfw"] is not None and
|
||||
profile["force_nsfw"] != get_nsfw(post)
|
||||
):
|
||||
continue
|
||||
# Make sure serverside size is not larger than max_size
|
||||
elif ( max_size != None and
|
||||
"file_size" in post and
|
||||
post["file_size"] > max_size
|
||||
):
|
||||
continue
|
||||
return post
|
||||
return None
|
||||
for post in response:
|
||||
# Do not select banned tags
|
||||
if is_banned(post, profile):
|
||||
continue
|
||||
# Make sure file_url keyword is in the query
|
||||
elif "file_url" not in post:
|
||||
continue
|
||||
# Select only nsfw
|
||||
elif ( profile["force_nsfw"] is not None and
|
||||
profile["force_nsfw"] != get_nsfw(post)
|
||||
):
|
||||
continue
|
||||
# Make sure serverside size is not larger than max_size
|
||||
elif ( max_size != None and
|
||||
"file_size" in post and
|
||||
post["file_size"] > max_size
|
||||
):
|
||||
continue
|
||||
return post
|
||||
return None
|
||||
|
||||
|
||||
class downloader:
|
||||
def __init__(self, backend_credentials):
|
||||
self.api_endpoint = "index.php?page=dapi&s=post&q=index&json=1"
|
||||
self.api_tags = "&tags={}"
|
||||
self.api_limit = "&limit={}"
|
||||
self.api_offset = "&pid={}"
|
||||
self.limit = 100
|
||||
def __init__(self, backend_credentials):
|
||||
self.api_endpoint = "index.php?page=dapi&s=post&q=index&json=1"
|
||||
self.api_tags = "&tags={}"
|
||||
self.api_limit = "&limit={}"
|
||||
self.api_offset = "&pid={}"
|
||||
self.limit = 100
|
||||
|
||||
self.username = backend_credentials["username"]
|
||||
self.password = backend_credentials["password"]
|
||||
self.max_size = backend_credentials["max_size"]
|
||||
self.tmp = backend_credentials["tmp_dir"]
|
||||
self.url = backend_credentials["url"]
|
||||
self.max_depth = backend_credentials["max_depth"]
|
||||
random.seed(os.urandom(16))
|
||||
self.username = backend_credentials["username"]
|
||||
self.password = backend_credentials["password"]
|
||||
self.max_size = backend_credentials["max_size"]
|
||||
self.tmp = backend_credentials["tmp_dir"]
|
||||
self.url = backend_credentials["url"]
|
||||
self.max_depth = backend_credentials["max_depth"]
|
||||
random.seed(os.urandom(16))
|
||||
|
||||
|
||||
def download_post(self, post):
|
||||
file_url = post["file_url"]
|
||||
full_path = post["full_path"]
|
||||
def download_post(self, post):
|
||||
file_url = post["file_url"]
|
||||
full_path = post["full_path"]
|
||||
|
||||
remote_image = requests.get(file_url)
|
||||
remote_image = requests.get(file_url)
|
||||
|
||||
if remote_image.status_code != 200:
|
||||
print("Remote image request returned:", remote_image.status_code)
|
||||
return None
|
||||
if remote_image.status_code != 200:
|
||||
print("Remote image request returned:", remote_image.status_code)
|
||||
return None
|
||||
|
||||
with open(full_path, "wb") as f:
|
||||
f.write(remote_image.content)
|
||||
with open(full_path, "wb") as f:
|
||||
f.write(remote_image.content)
|
||||
|
||||
return post
|
||||
return post
|
||||
|
||||
def get_full_url(self, limit=100, offset=0, *tags):
|
||||
search_url = "/".join((self.url, self.api_endpoint))
|
||||
search_url += self.api_limit.format(str(limit))
|
||||
search_url += self.api_offset.format(str(offset))
|
||||
if tags and not random_tag(*tags):
|
||||
search_tags = "+".join(tags)
|
||||
search_url += self.api_tags.format(search_tags)
|
||||
return search_url
|
||||
def get_full_url(self, limit=100, offset=0, *tags):
|
||||
search_url = "/".join((self.url, self.api_endpoint))
|
||||
search_url += self.api_limit.format(str(limit))
|
||||
search_url += self.api_offset.format(str(offset))
|
||||
if tags and not random_tag(*tags):
|
||||
search_tags = "+".join(tags)
|
||||
search_url += self.api_tags.format(search_tags)
|
||||
return search_url
|
||||
|
||||
|
||||
def search(self, search_url):
|
||||
if self.username and self.password:
|
||||
return requests.get(search_url,
|
||||
auth=(self.username, self.password)
|
||||
)
|
||||
else:
|
||||
return requests.get(search_url)
|
||||
def search(self, search_url):
|
||||
if self.username and self.password:
|
||||
return requests.get(search_url,
|
||||
auth=(self.username, self.password)
|
||||
)
|
||||
else:
|
||||
return requests.get(search_url)
|
||||
|
||||
|
||||
def fetch_post(self, profile):
|
||||
# Search ratings: s=safe, e=nsfw
|
||||
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
|
||||
tags = profile["tags"]
|
||||
def fetch_post(self, profile):
|
||||
# Search ratings: s=safe, e=nsfw
|
||||
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
|
||||
tags = profile["tags"]
|
||||
|
||||
# First query
|
||||
page_offset = random.randint(0, self.max_depth)
|
||||
search_url = self.get_full_url(self.limit, page_offset, *tags)
|
||||
search_request = self.search(search_url)
|
||||
search_ok = search_request.status_code == 200
|
||||
# First query
|
||||
page_offset = random.randint(0, self.max_depth)
|
||||
search_url = self.get_full_url(self.limit, page_offset, *tags)
|
||||
search_request = self.search(search_url)
|
||||
search_ok = search_request.status_code == 200
|
||||
|
||||
# Second query if our page offset is too high
|
||||
if search_ok and "post" not in search_request.json():
|
||||
max_posts = int(search_request.json()["@attributes"]["count"])
|
||||
total_pages = max_posts // self.limit
|
||||
# Second query if our page offset is too high
|
||||
if search_ok and "post" not in search_request.json():
|
||||
max_posts = int(search_request.json()["@attributes"]["count"])
|
||||
total_pages = max_posts // self.limit
|
||||
|
||||
# There is no point in querying again if max_posts is 0
|
||||
if max_posts <= 0:
|
||||
return None
|
||||
# There is no point in querying again if max_posts is 0
|
||||
if max_posts <= 0:
|
||||
return None
|
||||
|
||||
page_offset = random.randint(0, total_pages)
|
||||
search_url = self.get_full_url(self.limit, page_offset, *tags)
|
||||
time.sleep(1)
|
||||
search_request = self.search(search_url)
|
||||
search_ok = search_request.status_code == 200
|
||||
page_offset = random.randint(0, total_pages)
|
||||
search_url = self.get_full_url(self.limit, page_offset, *tags)
|
||||
time.sleep(1)
|
||||
search_request = self.search(search_url)
|
||||
search_ok = search_request.status_code == 200
|
||||
|
||||
if not search_ok:
|
||||
print("Search {} request returned: {}".format(search_url, search_request.status_code))
|
||||
return None
|
||||
if not search_ok:
|
||||
print("Search {} request returned: {}".format(search_url, search_request.status_code))
|
||||
return None
|
||||
|
||||
posts = search_request.json()["post"]
|
||||
random.shuffle(posts)
|
||||
posts = search_request.json()["post"]
|
||||
random.shuffle(posts)
|
||||
|
||||
selected = select_from_response(posts, profile, self.max_size)
|
||||
selected = select_from_response(posts, profile, self.max_size)
|
||||
|
||||
if selected is None:
|
||||
print("Could not select image based on criteria", search_url)
|
||||
return None
|
||||
if selected is None:
|
||||
print("Could not select image based on criteria", search_url)
|
||||
return None
|
||||
|
||||
tag_response = collect_tags(selected)
|
||||
nsfw = get_nsfw(selected)
|
||||
file_url = selected["file_url"]
|
||||
tag_response = collect_tags(selected)
|
||||
nsfw = get_nsfw(selected)
|
||||
file_url = selected["file_url"]
|
||||
|
||||
basename = file_url.rsplit("/", 1)[1]
|
||||
full_path = os.path.join(self.tmp, basename)
|
||||
basename = file_url.rsplit("/", 1)[1]
|
||||
full_path = os.path.join(self.tmp, basename)
|
||||
|
||||
r = {
|
||||
# Add profile to dictioanry
|
||||
"profile": profile,
|
||||
r = {
|
||||
# Add profile to dictioanry
|
||||
"profile": profile,
|
||||
|
||||
# Query results
|
||||
"search_url": search_url,
|
||||
"file_url": file_url,
|
||||
"full_path": full_path,
|
||||
"tag_response": tag_response,
|
||||
"nsfw": nsfw
|
||||
}
|
||||
# Query results
|
||||
"search_url": search_url,
|
||||
"file_url": file_url,
|
||||
"full_path": full_path,
|
||||
"tag_response": tag_response,
|
||||
"nsfw": nsfw
|
||||
}
|
||||
|
||||
return r
|
||||
return r
|
||||
|
Reference in New Issue
Block a user