"""Gelbooru image-board backend: query the JSON API and download a random post."""


def random_tag(*tags):
    """Return True when the only tag supplied is the pseudo-tag "random"."""
    return len(tags) == 1 and tags[0].lower() == "random"


def collect_tags(post):
    """Return the post's tags as a normalized (lowercase) list of strings."""
    return post["tags"].strip().lower().split()


def is_banned(post, profile):
    """Return the first banned tag found on *post*, or None if it is clean."""
    post_tags = collect_tags(post)
    for tag in profile["banned_tags"]:
        if tag in post_tags:
            return tag
    return None


def get_nsfw(post):
    """Return True when the post is rated questionable or explicit."""
    return post["rating"] in ("questionable", "explicit")


def select_from_response(response, profile, max_size=None):
    """Return the first post in *response* satisfying *profile*, else None.

    A post is skipped when it carries a banned tag, lacks a "file_url",
    does not match the profile's force_nsfw setting, or reports a
    server-side "file_size" larger than *max_size*.
    """
    for post in response:
        # Do not select banned tags
        if is_banned(post, profile):
            continue
        # Make sure file_url keyword is in the query
        if "file_url" not in post:
            continue
        # Respect forced SFW/NSFW selection when configured
        if (profile["force_nsfw"] is not None
                and profile["force_nsfw"] != get_nsfw(post)):
            continue
        # Make sure serverside size is not larger than max_size
        # (fix: identity comparison "is not None", not "!= None")
        if (max_size is not None
                and "file_size" in post
                and post["file_size"] > max_size):
            continue
        return post
    return None


class downloader:
    """Fetch a random post from a Gelbooru-style API and download its image."""

    def __init__(self, backend_credentials):
        """Store connection settings taken from the *backend_credentials* dict."""
        # Static pieces of the Gelbooru API URL
        self.api_endpoint = "index.php?page=dapi&s=post&q=index&json=1"
        self.api_tags = "&tags={}"
        self.api_limit = "&limit={}"
        self.api_offset = "&pid={}"
        self.limit = 100  # posts requested per page

        self.username = backend_credentials["username"]
        self.password = backend_credentials["password"]
        self.max_size = backend_credentials["max_size"]
        self.tmp = backend_credentials["tmp_dir"]
        self.url = backend_credentials["url"]
        self.max_depth = backend_credentials["max_depth"]
        random.seed(os.urandom(16))

    def download_post(self, post):
        """Download post["file_url"] to post["full_path"]; None on HTTP failure."""
        file_url = post["file_url"]
        full_path = post["full_path"]

        remote_image = requests.get(file_url)

        if remote_image.status_code != 200:
            print("Remote image request returned:", remote_image.status_code)
            return None

        with open(full_path, "wb") as f:
            f.write(remote_image.content)

        return post

    def get_full_url(self, limit=100, offset=0, *tags):
        """Build the search URL; the pseudo-tag "random" adds no tag filter."""
        search_url = "/".join((self.url, self.api_endpoint))
        search_url += self.api_limit.format(str(limit))
        search_url += self.api_offset.format(str(offset))
        if tags and not random_tag(*tags):
            search_url += self.api_tags.format("+".join(tags))
        return search_url

    def search(self, search_url):
        """GET *search_url*, using HTTP basic auth when credentials are set."""
        if self.username and self.password:
            return requests.get(search_url,
                                auth=(self.username, self.password))
        return requests.get(search_url)

    def fetch_post(self, profile):
        """Pick a random post matching *profile*; return its metadata or None.

        Queries a random page first; if that page is past the end of the
        result set, re-queries within the reported page count.
        """
        # Search ratings: s=safe, e=nsfw
        tags = profile["tags"]

        # First query at a random page offset
        page_offset = random.randint(0, self.max_depth)
        search_url = self.get_full_url(self.limit, page_offset, *tags)
        search_request = self.search(search_url)
        search_ok = search_request.status_code == 200

        # Second query if our page offset is too high
        # (fix: parse the JSON body once instead of calling .json() repeatedly)
        response = search_request.json() if search_ok else None
        if search_ok and "post" not in response:
            max_posts = int(response["@attributes"]["count"])
            total_pages = max_posts // self.limit

            # There is no point in querying again if max_posts is 0
            if max_posts <= 0:
                return None

            page_offset = random.randint(0, total_pages)
            search_url = self.get_full_url(self.limit, page_offset, *tags)
            time.sleep(1)  # be polite between consecutive API calls
            search_request = self.search(search_url)
            search_ok = search_request.status_code == 200
            response = search_request.json() if search_ok else None

        if not search_ok:
            print("Search request returned:", search_request.status_code)
            return None

        posts = response["post"]
        random.shuffle(posts)

        selected = select_from_response(posts, profile, self.max_size)

        if selected is None:
            print("Could not select image based on criteria")
            return None

        tag_response = collect_tags(selected)
        nsfw = get_nsfw(selected)
        file_url = selected["file_url"]

        basename = file_url.rsplit("/", 1)[1]
        full_path = os.path.join(self.tmp, basename)

        return {
            # Add profile to dictionary
            "profile": profile,

            # Query results
            "search_url": search_url,
            "file_url": file_url,
            "full_path": full_path,
            "tag_response": tag_response,
            "nsfw": nsfw,
        }