From 02f454d32dce08d56545c84d85df866adfb03c65 Mon Sep 17 00:00:00 2001 From: Anon Date: Mon, 8 Jan 2024 17:13:50 -0800 Subject: [PATCH] Changed whitespace and added additional debug and wait information in konachan backend --- src/konachan_backend.py | 242 ++++++++++++++++++++-------------------- 1 file changed, 122 insertions(+), 120 deletions(-) diff --git a/src/konachan_backend.py b/src/konachan_backend.py index 4ab3614..ad67cf1 100644 --- a/src/konachan_backend.py +++ b/src/konachan_backend.py @@ -26,161 +26,163 @@ import re import time def random_tag(*tags): - return len(tags) == 1 and tags[0].lower() == "random" + return len(tags) == 1 and tags[0].lower() == "random" def collect_tags(post): - return post["tags"].strip().lower().split() + return post["tags"].strip().lower().split() def is_banned(post, profile): - tag_response = collect_tags(post) - tag_banned = profile["banned_tags"] - for tag in tag_banned: - if tag in tag_response: - return tag - return None + tag_response = collect_tags(post) + tag_banned = profile["banned_tags"] + for tag in tag_banned: + if tag in tag_response: + return tag + return None def get_nsfw(post): - return post["rating"] in ("q", "e") + return post["rating"] in ("q", "e") def select_from_response(response, profile, max_size=None): - for post in response: - if is_banned(post, profile): - continue - elif "file_url" not in post: - continue - # Select only nsfw - elif ( profile["force_nsfw"] is not None and - profile["force_nsfw"] != get_nsfw(post) - ): - continue - # Make sure serverside size is not larger than max_size - elif ( max_size != None and - "file_size" in post and - post["file_size"] > max_size - ): - continue - return post - return None + for post in response: + if is_banned(post, profile): + continue + elif "file_url" not in post: + continue + # Select only nsfw + elif ( profile["force_nsfw"] is not None and + profile["force_nsfw"] != get_nsfw(post) + ): + continue + # Make sure serverside size is not larger than 
max_size + elif ( max_size != None and + "file_size" in post and + post["file_size"] > max_size + ): + continue + return post + return None class downloader: - def __init__(self, backend_credentials): - self.api_endpoint = "{}/post/index.json?limit={}&page={}" - self.html_endpoint = "{}/post/index?limit={}&page={}" - self.tag_url = "&tags={}" - self.limit = 100 - self.retry_limit = 3 + def __init__(self, backend_credentials): + self.api_endpoint = "{}/post/index.json?limit={}&page={}" + self.html_endpoint = "{}/post/index?limit={}&page={}" + self.tag_url = "&tags={}" + self.limit = 100 + self.retry_limit = 3 - self.username = backend_credentials["username"] - self.password = backend_credentials["password"] - self.depth = backend_credentials["max_size"] - self.tmp = backend_credentials["tmp_dir"] - self.url = backend_credentials["url"] - self.max_depth = backend_credentials["max_depth"] - random.seed(os.urandom(16)) + self.username = backend_credentials["username"] + self.password = backend_credentials["password"] + self.depth = backend_credentials["max_size"] + self.tmp = backend_credentials["tmp_dir"] + self.url = backend_credentials["url"] + self.max_depth = backend_credentials["max_depth"] + random.seed(os.urandom(16)) - def download_post(self, post): - file_url = post["file_url"] - full_path = post["full_path"] + def download_post(self, post): + file_url = post["file_url"] + full_path = post["full_path"] - remote_image = requests.get(file_url) + remote_image = requests.get(file_url) - if remote_image.status_code != 200: - print("Remote image request returned:", remote_image.status_code) - return None + if remote_image.status_code != 200: + print("Remote image request returned:", remote_image.status_code) + return None - with open(full_path, "wb") as f: - f.write(remote_image.content) + with open(full_path, "wb") as f: + f.write(remote_image.content) - return post + return post - def search(self, search_url): - search_request = None - if self.username and 
self.password: - search_request = requests.get(search_url, - auth=(self.username, self.password) - ) - else: - search_request = requests.get(search_url) - return search_request + def search(self, search_url): + search_request = None + if self.username and self.password: + search_request = requests.get(search_url, + auth=(self.username, self.password) + ) + else: + search_request = requests.get(search_url) + return search_request - # I suck at regex :( - def get_max_page(self, html): - match = re.findall('page=[0-9]*', html) - if match: - last_group = match[len(match) - 1] - last_page = last_group.rsplit("=", 1)[1] - return int(last_page) - else: - return None + # I suck at regex :( + def get_max_page(self, html): + match = re.findall('page=[0-9]*', html) + if match: + last_group = match[len(match) - 1] + last_page = last_group.rsplit("=", 1)[1] + return int(last_page) + else: + return None - def fetch_post(self, profile): - tags = profile["tags"] - selected = dict() - max_depth = self.max_depth - search_url_tags = "+".join(tags) - search_url = "" + def fetch_post(self, profile): + tags = profile["tags"] + selected = dict() + max_depth = self.max_depth + search_url_tags = "+".join(tags) + search_url = "" - for _ in range(0, self.retry_limit): - page_offset = random.randint(0, max_depth) - search_url = self.api_endpoint.format(self.url, self.limit, page_offset) - search_url_html = self.html_endpoint.format(self.url, self.limit, page_offset) - if search_url_tags: - search_url += self.tag_url.format(search_url_tags) - search_url_html += self.tag_url.format(search_url_tags) + for _ in range(0, self.retry_limit): + page_offset = random.randint(0, max_depth) + search_url = self.api_endpoint.format(self.url, self.limit, page_offset) + search_url_html = self.html_endpoint.format(self.url, self.limit, page_offset) + if search_url_tags: + search_url += self.tag_url.format(search_url_tags) + search_url_html += self.tag_url.format(search_url_tags) - search_request = 
self.search(search_url) + search_request = self.search(search_url) - if search_request.status_code != 200: - print("Search {} request returned: {}".format(search_url, search_request.status_code)) - continue + if search_request.status_code != 200: + print("Search {} request returned: {}".format(search_url, search_request.status_code)) + continue - posts = search_request.json() - random.shuffle(posts) + posts = search_request.json() + random.shuffle(posts) - selected = select_from_response(posts, profile) + selected = select_from_response(posts, profile) - if selected is None: - print("Could not select image based on criteria", search_url) - time.sleep(2) - search_request = self.search(search_url_html) + if selected is None: + print("Could not select image based on criteria", search_url) + time.sleep(2) + search_request = self.search(search_url_html) - if search_request.status_code == 200: - new_max_depth = self.get_max_page(search_request.text) - if new_max_depth < max_depth: - max_depth = new_max_depth - else: - max_depth = max_depth // 2 - else: - max_depth = max_depth // 2 - continue + if search_request.status_code == 200: + new_max_depth = self.get_max_page(search_request.text) + if new_max_depth < max_depth: + max_depth = new_max_depth + else: + max_depth = max_depth // 2 + else: + max_depth = max_depth // 2 + time.sleep(2) + continue - break + break - if not selected: - return None + if not selected: + print("Error searching:", search_url_tags) + return None - tag_response = collect_tags(selected) - nsfw = get_nsfw(selected) - file_url = selected["file_url"] + tag_response = collect_tags(selected) + nsfw = get_nsfw(selected) + file_url = selected["file_url"] - basename = "{}.{}".format(selected["md5"], file_url.rsplit(".", 1)[1]) - full_path = os.path.join(self.tmp, basename) + basename = "{}.{}".format(selected["md5"], file_url.rsplit(".", 1)[1]) + full_path = os.path.join(self.tmp, basename) - r = { - # Add profile to dictioanry - "profile": profile, + r = { 
+ # Add profile to dictionary
+ "profile": profile,

- # Query results
- "search_url": search_url,
- "file_url": file_url,
- "full_path": full_path,
- "tag_response": tag_response,
- "nsfw": nsfw
- }

+ # Query results
+ "search_url": search_url,
+ "file_url": file_url,
+ "full_path": full_path,
+ "tag_response": tag_response,
+ "nsfw": nsfw
+ }

- return r

+ return r