diff --git a/src/konachan_backend.py b/src/konachan_backend.py new file mode 100644 index 0000000..7f208da --- /dev/null +++ b/src/konachan_backend.py @@ -0,0 +1,175 @@ +#! /usr/bin/env python3 + +# Danbooru Bot, an image posting bot for Pleroma +# Copyright (C) 2022 Anon +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# TODO: +# Update to enable multiple file downloads + +import requests +import os +import random +import time + +def random_tag(*tags): + return len(tags) == 1 and tags[0].lower() == "random" + + +def collect_tags(post): + tag_response = [] + for tag_type in "tag_string", "tag_string_general": + if tag_type in post: + tag_response.append(post[tag_type].strip()) + return " ".join(tag_response) + + + +def is_banned(post, profile): + tag_response = collect_tags(post) + tag_banned = profile["banned_tags"] + for tag in tag_banned: + if tag in tag_response: + return tag + return None + + + +def get_nsfw(post): + return post["rating"] in ("q", "e") + + +def select_from_response(response, profile, max_size=None): + for post in response: + if is_banned(post, profile): + continue + elif "file_url" not in post: + continue + # Select only nsfw + elif ( profile["force_nsfw"] is not None and + profile["force_nsfw"] != get_nsfw(post) + ): + continue + # Make sure serverside size is not larger than max_size + elif ( max_size != None and + "file_size" in post and + post["file_size"] > max_size + ): + continue + return post + return None + + +class downloader: + username = None + password = None + max_size = None + tmp = None + url = "" + api_endpoint = "post.json?random=true&limit=100" + api_tags = "&tags={}" + api_limit = "&limit={}" + api_offset = "&page={}" + limit = 100 + max_depth = 200 + + def __init__(self, backend_credentials): + self.username = backend_credentials["username"] + self.password = backend_credentials["password"] + self.max_size = backend_credentials["max_size"] + self.tmp = backend_credentials["tmp_dir"] + self.url = backend_credentials["url"] + self.max_depth = backend_credentials["max_depth"] + random.seed(os.urandom(16)) + + + def download_post(self, post): + file_url = post["file_url"] + full_path = post["full_path"] + + remote_image = requests.get(file_url) + + if remote_image.status_code != 200: + print("Remote image request returned:", remote_image.status_code) + return None + + with open(full_path, "wb") as f: + f.write(remote_image.content) + + return post + + def get_full_url(self, limit=100, offset=0, *tags): + search_url = "/".join((self.url, self.api_endpoint)) + search_url += self.api_limit.format(str(limit)) + search_url += self.api_offset.format(str(offset)) + if tags and not random_tag(*tags): + search_tags = "+".join(tags) + search_url += self.api_tags.format(search_tags) + return search_url + + + def search(self, search_url): + search_request = None + print(search_url) + if self.username and self.password: + search_request = requests.get(search_url, + auth=(self.username, self.password) + ) + else: + search_request = requests.get(search_url) + return search_request + + + def fetch_post(self, profile): + # Search ratings: s=safe, e=nsfw + tags = profile["tags"] + + page_offset = random.randint(0, self.max_depth) + search_url = self.get_full_url(self.limit, page_offset, *tags) + search_request = self.search(search_url) + + if search_request.status_code != 200: + print("Search request returned:", search_request.status_code) + return None + + posts = search_request.json() + random.shuffle(posts) + + selected = select_from_response(posts, profile, self.max_size) + + if selected is None: + print("Could not select image based on criteria") + return None + + tag_response = collect_tags(selected) + nsfw = get_nsfw(selected) + file_url = selected["file_url"] + + basename = "{}.{}".format(selected["md5"], file_url.rsplit(".", 1)[1]) + full_path = os.path.join(self.tmp, basename) + + r = { + # Add profile to dictioanry + "profile": profile, + + # Query results + "search_url": search_url, + "file_url": file_url, + "full_path": full_path, + "tag_response": " ".join(tag_response), + "nsfw": nsfw + } + + return r