155 lines
4.2 KiB
Python
155 lines
4.2 KiB
Python
#! /usr/bin/env python3
|
|
|
|
# Danbooru Bot, an image posting bot for Pleroma
|
|
# Copyright (C) 2022 Anon <@Anon@yandere.cc>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# TODO:
|
|
# Update to enable multiple file downloads
|
|
|
|
import requests
|
|
import os
|
|
import string
|
|
import os
|
|
import random
|
|
|
|
|
|
def random_tag(*tags):
    """Report whether the given tags are exactly the single keyword "random".

    The keyword is matched case-insensitively. Zero tags, or more than one
    tag, always yields ``False``.
    """
    if len(tags) != 1:
        return False
    (only_tag,) = tags
    return only_tag.lower() == "random"
|
|
|
|
|
|
def collect_tags(post):
    """Gather every tag listed on ``post`` as a flat list of lower-case words.

    Looks at the ``tag_string`` and ``tag_string_general`` entries (whichever
    are present, in that order), lower-cases their text and splits it on
    whitespace. A post with neither key produces an empty list.
    """
    pieces = []
    for key in ("tag_string", "tag_string_general"):
        if key in post:
            pieces.append(post[key].strip())
    combined = " ".join(pieces)
    return combined.lower().split()
|
|
|
|
|
|
|
|
def is_banned(post, profile):
    """Return the first banned tag found on ``post``, or ``None`` if there is none.

    ``profile["banned_tags"]`` is scanned in order. The tags collected from
    the post are already stripped and lower-cased by ``collect_tags``, so each
    banned tag is normalised the same way before comparing; otherwise a
    banned tag written with uppercase letters or stray whitespace could never
    match. The value returned is the banned tag exactly as it appears in the
    profile.
    """
    # Build the set once so each banned tag costs a single hash lookup.
    present = set(collect_tags(post))
    for tag in profile["banned_tags"]:
        # Non-string entries can never equal a tag word; skip them quietly.
        if isinstance(tag, str) and tag.strip().lower() in present:
            return tag
    return None
|
|
|
|
|
|
def get_nsfw(post):
    """Return ``True`` when the post's ``rating`` is ``"q"`` or ``"e"``.

    Any other rating value gives ``False``. The post must have a ``rating``
    key.
    """
    nsfw_ratings = ("q", "e")
    rating = post["rating"]
    return rating in nsfw_ratings
|
|
|
|
|
|
def select_from_response(response, profile, max_size=None):
    """Pick the first post in ``response`` that satisfies every filter.

    A post is skipped when any of the following holds:

    * ``is_banned`` reports a banned tag for it;
    * it has no ``file_url`` entry;
    * ``profile["force_nsfw"]`` is not ``None`` and differs from
      ``get_nsfw(post)``;
    * ``max_size`` is given, the post reports a ``file_size``, and that size
      is larger than ``max_size``.

    Args:
        response: Iterable of post mappings.
        profile: Mapping providing ``banned_tags`` and ``force_nsfw``.
        max_size: Largest acceptable ``file_size``, or ``None`` for no limit.

    Returns:
        The first acceptable post, or ``None`` if nothing qualifies.
    """
    for post in response:
        if is_banned(post, profile):
            continue

        if "file_url" not in post:
            continue

        # None means "no preference"; True/False must match the post's rating.
        wanted_nsfw = profile["force_nsfw"]
        if wanted_nsfw is not None and wanted_nsfw != get_nsfw(post):
            continue

        # Reject on the size reported by the server, when it reports one.
        if (
            max_size is not None
            and "file_size" in post
            and post["file_size"] > max_size
        ):
            continue

        return post

    return None
|
|
|
|
def random_alpha_numeric_string(n=16):
    """Return ``n`` random characters drawn from ASCII letters and digits.

    Characters come from ``random.SystemRandom``, which reads OS entropy
    directly. The module-level generator is deliberately not reseeded, so
    calling this function does not disturb random state that other code may
    have seeded.

    Args:
        n: Length of the string to produce (default 16).
    """
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    return "".join(random.SystemRandom().choices(alphabet, k=n))
|
|
|
|
|
|
class downloader:
    """Searches a Danbooru-style ``posts.json`` API and downloads one image."""

    def __init__(self, backend_credentials):
        """Store the backend settings and prepare the request defaults.

        ``backend_credentials`` must provide the keys ``username``,
        ``password``, ``max_size``, ``tmp_dir`` and ``url``.
        """
        creds = backend_credentials
        self.username = creds["username"]
        self.password = creds["password"]
        self.max_size = creds["max_size"]
        self.tmp = creds["tmp_dir"]
        self.url = creds["url"]

        # Query pieces appended to the base URL when searching.
        self.api_endpoint = "posts.json?random=true&limit=100"
        self.api_tags = "&tags={}"

        # A random user-agent is sent because danbooru disabled API access
        # for browser user-agent strings.
        self.header = {"user-agent": random_alpha_numeric_string(16)}

    def download_post(self, post):
        """Download ``post["file_url"]`` and write it to ``post["full_path"]``.

        Returns ``post`` on success. If the server answers with anything
        other than status 200, the status is printed and ``None`` is
        returned without writing a file.
        """
        source = post["file_url"]
        destination = post["full_path"]

        reply = requests.get(source)
        if reply.status_code == 200:
            with open(destination, "wb") as out:
                out.write(reply.content)
            return post

        print("Remote image request returned:", reply.status_code)
        return None

    def fetch_post(self, profile):
        """Search for a post matching ``profile`` and describe the result.

        The search URL is the base URL plus the API endpoint; the profile's
        tags are appended, joined with ``+``, unless the tag list is empty or
        is the single keyword "random". Credentials are sent only when both
        username and password are set.

        Returns a dict with the keys ``profile``, ``search_url``,
        ``file_url``, ``full_path``, ``tag_response`` and ``nsfw``, or
        ``None`` when the search request does not return status 200 or no
        post passes ``select_from_response``.
        """
        # Ratings seen in searches: s=safe, e=nsfw.
        search_url = self.url + "/" + self.api_endpoint

        tags = profile["tags"]
        if tags and not random_tag(*tags):
            search_url += self.api_tags.format("+".join(tags))

        # Authentication is optional: include it only with full credentials.
        request_kwargs = {"headers": self.header}
        if self.username and self.password:
            request_kwargs["auth"] = (self.username, self.password)
        reply = requests.get(search_url, **request_kwargs)

        if reply.status_code != 200:
            print("Search request returned:", reply.status_code)
            return None

        chosen = select_from_response(reply.json(), profile, self.max_size)
        if chosen is None:
            print("Could not select image based on criteria")
            return None

        tag_response = collect_tags(chosen)
        nsfw = get_nsfw(chosen)
        file_url = chosen["file_url"]

        # The local file keeps the remote file name, placed in the tmp dir.
        remote_name = file_url.rsplit("/", 1)[1]
        full_path = os.path.join(self.tmp, remote_name)

        return {
            # The profile that produced this query.
            "profile": profile,
            # Query results.
            "search_url": search_url,
            "file_url": file_url,
            "full_path": full_path,
            "tag_response": tag_response,
            "nsfw": nsfw,
        }
|