Compare commits

...

6 Commits

Author SHA1 Message Date
dc59b928a0 Minor update 2022-10-09 22:29:45 -07:00
70a6dbde22 Added konachan backend 2022-10-09 22:15:52 -07:00
8df71daa74 Added gelbooru backend 2022-10-09 22:15:35 -07:00
8ce1c4bbaa Refactored the way modules are dynamically imported 2022-10-09 22:15:09 -07:00
2dd89995e1 Removed hardcoded url 2022-10-09 22:14:27 -07:00
d2378a1e5b Updated to include additional backends 2022-10-09 22:12:30 -07:00
6 changed files with 400 additions and 12 deletions

View File

@ -70,12 +70,12 @@ settings_encrypt = {
"keyfile": None
}
```
If you used encryption, everything in the `settings_server` dictionary will be encrypted and you will have to enter your password every time you start the bot. Copy and paste these values from the terminal into your `src/cfg.py` file. **Make sure you paste over or delete the placeholder values with the same names.**
If you used encryption, everything in the `settings_server` dictionary will be encrypted. If you used encryption and did not specify a keyfile, you will have to enter your password every time you start the bot. Copy and paste these values from the terminal into your `src/cfg.py` file. **Make sure you paste over or delete the placeholder values with the same names.**
## Danbooru credentials
If you have an account with Danbooru, you can enter it below the `danbooru_backend` dictionary. It is not necessary to have an account unless you want to post images that are censored by default.
## Posting
To begin posting:

View File

@ -90,11 +90,32 @@ banned_tags = (
settings_backend = {
"danbooru_backend": {
"module": "danbooru_backend",
"username": None,
"password": None,
"tmp_dir": settings_behavior["tmp_dir"],
"max_size": settings_behavior["max_size"]
}
"max_size": settings_behavior["max_size"],
"url": "https://danbooru.donmai.us"
},
# The below backends are still being tested
"konachan_backend": {
"module": "konachan_backend",
"username": None,
"password": None,
"tmp_dir": settings_behavior["tmp_dir"],
"max_size": settings_behavior["max_size"],
"max_depth": 91,
"url": "https://konachan.com"
},
"gelbooru_backend": {
"module": "gelbooru_backend",
"username": None,
"password": None,
"tmp_dir": settings_behavior["tmp_dir"],
"max_size": settings_behavior["max_size"],
"max_depth": 200,
"url": "https://gelbooru.com"
},
}

View File

@ -76,14 +76,16 @@ class downloader:
password = None
max_size = None
tmp = None
# Limit for posts.json is 200
limit=100
url = ""
api_endpoint = "posts.json?random=true&limit=100"
api_tags = "&tags={}"
def __init__(self, backend_credentials):
self.username = backend_credentials["username"]
self.password = backend_credentials["password"]
self.max_size = backend_credentials["max_size"]
self.tmp = backend_credentials["tmp_dir"]
self.url = backend_credentials["url"]
def download_post(self, post):
@ -105,11 +107,11 @@ class downloader:
def fetch_post(self, profile):
# Search ratings: s=safe, e=nsfw
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
search_url = "/".join((self.url, self.api_endpoint))
tags = profile["tags"]
search_url = "https://danbooru.donmai.us/posts.json?random=true&limit={}".format(self.limit)
if tags and not random_tag(*tags):
search_tags = "+".join(tags)
search_url = "{}&tags={}".format(search_url, search_tags)
search_url += self.api_tags.format(search_tags)
search_request = None
if self.username and self.password:
@ -118,12 +120,12 @@ class downloader:
)
else:
search_request = requests.get(search_url)
if search_request.status_code != 200:
print(search_url)
print("Search request returned:", search_request.status_code)
return None
selected = select_from_response(search_request.json(), profile, self.max_size)
if selected is None:

190
src/gelbooru_backend.py Normal file
View File

@ -0,0 +1,190 @@
#! /usr/bin/env python3
# Danbooru Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# TODO:
# Update to enable multiple file downloads
import requests
import os
import random
import time
def random_tag(*tags):
return len(tags) == 1 and tags[0].lower() == "random"
def collect_tags(post):
tag_response = []
for tag_type in "tag_string", "tag_string_general":
if tag_type in post:
tag_response.append(post[tag_type].strip())
return " ".join(tag_response)
def is_banned(post, profile):
tag_response = collect_tags(post)
tag_banned = profile["banned_tags"]
for tag in tag_banned:
if tag in tag_response:
return tag
return None
def get_nsfw(post):
return post["rating"] in ("questionable", "explicit")
def select_from_response(response, profile, max_size=None):
for post in response:
if is_banned(post, profile):
continue
elif "file_url" not in post:
continue
# Select only nsfw
elif ( profile["force_nsfw"] is not None and
profile["force_nsfw"] != get_nsfw(post)
):
continue
# Make sure serverside size is not larger than max_size
elif ( max_size != None and
"file_size" in post and
post["file_size"] > max_size
):
continue
return post
return None
class downloader:
username = None
password = None
max_size = None
tmp = None
url = ""
api_endpoint = "index.php?page=dapi&s=post&q=index&json=1"
api_tags = "&tags={}"
api_limit = "&limit={}"
api_offset = "&pid={}"
limit = 100
max_depth = 200
def __init__(self, backend_credentials):
self.username = backend_credentials["username"]
self.password = backend_credentials["password"]
self.max_size = backend_credentials["max_size"]
self.tmp = backend_credentials["tmp_dir"]
self.url = backend_credentials["url"]
self.max_depth = backend_credentials["max_depth"]
random.seed(os.urandom(16))
def download_post(self, post):
file_url = post["file_url"]
full_path = post["full_path"]
remote_image = requests.get(file_url)
if remote_image.status_code != 200:
print("Remote image request returned:", remote_image.status_code)
return None
with open(full_path, "wb") as f:
f.write(remote_image.content)
return post
def get_full_url(self, limit=100, offset=0, *tags):
search_url = "/".join((self.url, self.api_endpoint))
search_url += self.api_limit.format(str(limit))
search_url += self.api_offset.format(str(offset))
if tags and not random_tag(*tags):
search_tags = "+".join(tags)
search_url += self.api_tags.format(search_tags)
return search_url
def search(self, search_url):
search_request = None
if self.username and self.password:
search_request = requests.get(search_url,
auth=(self.username, self.password)
)
else:
search_request = requests.get(search_url)
return search_request
def fetch_post(self, profile):
# Search ratings: s=safe, e=nsfw
# base_url = "https://danbooru.donmai.us/posts.json?random=true&tags={}&rating=e&limit=1"
tags = profile["tags"]
# Query to get number of pages for the tags
search_request = self.search(
self.get_full_url(1,0,*tags)
)
if search_request.status_code != 200:
print("Unable to determine number of tag indexes:", search_request.status_code)
return None
# Wait a second before querying again for the final picture
time.sleep(1)
total_posts = int(search_request.json()["@attributes"]["count"]) - self.limit - 1
index_count = total_posts // self.limit
page_offset = random.randint(0, max(0, min(index_count, self.max_depth)))
search_url = self.get_full_url(self.limit, page_offset, *tags)
search_request = self.search(search_url)
if search_request.status_code != 200:
print("Search request returned:", search_request.status_code)
return None
posts = search_request.json()["post"]
random.shuffle(posts)
selected = select_from_response(posts, profile, self.max_size)
if selected is None:
print("Could not select image based on criteria")
return None
tag_response = collect_tags(selected)
nsfw = get_nsfw(selected)
file_url = selected["file_url"]
basename = file_url.rsplit("/", 1)[1]
full_path = os.path.join(self.tmp, basename)
r = {
# Add profile to dictioanry
"profile": profile,
# Query results
"search_url": search_url,
"file_url": file_url,
"full_path": full_path,
"tag_response": " ".join(tag_response),
"nsfw": nsfw
}
return r

175
src/konachan_backend.py Normal file
View File

@ -0,0 +1,175 @@
#! /usr/bin/env python3
# Danbooru Bot, an image posting bot for Pleroma
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# TODO:
# Update to enable multiple file downloads
import requests
import os
import random
import time
def random_tag(*tags):
return len(tags) == 1 and tags[0].lower() == "random"
def collect_tags(post):
tag_response = []
for tag_type in "tag_string", "tag_string_general":
if tag_type in post:
tag_response.append(post[tag_type].strip())
return " ".join(tag_response)
def is_banned(post, profile):
tag_response = collect_tags(post)
tag_banned = profile["banned_tags"]
for tag in tag_banned:
if tag in tag_response:
return tag
return None
def get_nsfw(post):
return post["rating"] in ("q", "e")
def select_from_response(response, profile, max_size=None):
for post in response:
if is_banned(post, profile):
continue
elif "file_url" not in post:
continue
# Select only nsfw
elif ( profile["force_nsfw"] is not None and
profile["force_nsfw"] != get_nsfw(post)
):
continue
# Make sure serverside size is not larger than max_size
elif ( max_size != None and
"file_size" in post and
post["file_size"] > max_size
):
continue
return post
return None
class downloader:
username = None
password = None
max_size = None
tmp = None
url = ""
api_endpoint = "post.json?random=true&limit=100"
api_tags = "&tags={}"
api_limit = "&limit={}"
api_offset = "&page={}"
limit = 100
max_depth = 200
def __init__(self, backend_credentials):
self.username = backend_credentials["username"]
self.password = backend_credentials["password"]
self.max_size = backend_credentials["max_size"]
self.tmp = backend_credentials["tmp_dir"]
self.url = backend_credentials["url"]
self.max_depth = backend_credentials["max_depth"]
random.seed(os.urandom(16))
def download_post(self, post):
file_url = post["file_url"]
full_path = post["full_path"]
remote_image = requests.get(file_url)
if remote_image.status_code != 200:
print("Remote image request returned:", remote_image.status_code)
return None
with open(full_path, "wb") as f:
f.write(remote_image.content)
return post
def get_full_url(self, limit=100, offset=0, *tags):
search_url = "/".join((self.url, self.api_endpoint))
search_url += self.api_limit.format(str(limit))
search_url += self.api_offset.format(str(offset))
if tags and not random_tag(*tags):
search_tags = "+".join(tags)
search_url += self.api_tags.format(search_tags)
return search_url
def search(self, search_url):
search_request = None
print(search_url)
if self.username and self.password:
search_request = requests.get(search_url,
auth=(self.username, self.password)
)
else:
search_request = requests.get(search_url)
return search_request
def fetch_post(self, profile):
# Search ratings: s=safe, e=nsfw
tags = profile["tags"]
page_offset = random.randint(0, self.max_depth)
search_url = self.get_full_url(self.limit, page_offset, *tags)
search_request = self.search(search_url)
if search_request.status_code != 200:
print("Search request returned:", search_request.status_code)
return None
posts = search_request.json()
random.shuffle(posts)
selected = select_from_response(posts, profile, self.max_size)
if selected is None:
print("Could not select image based on criteria")
return None
tag_response = collect_tags(selected)
nsfw = get_nsfw(selected)
file_url = selected["file_url"]
basename = "{}.{}".format(selected["md5"], file_url.rsplit(".", 1)[1])
full_path = os.path.join(self.tmp, basename)
r = {
# Add profile to dictioanry
"profile": profile,
# Query results
"search_url": search_url,
"file_url": file_url,
"full_path": full_path,
"tag_response": " ".join(tag_response),
"nsfw": nsfw
}
return r

View File

@ -135,8 +135,8 @@ class YandereBot:
def download_media(self, picked_profile):
try:
backend_s = picked_profile["backend"]
backend = importlib.import_module(backend_s)
backend_credentials = self.settings_backend[backend_s]
backend = importlib.import_module(backend_credentials["module"])
img = None
downloader = backend.downloader(backend_credentials)
@ -292,7 +292,7 @@ class YandereBot:
# 2. The server is down. Check to verify in a web browser (this is the default assumption since the
# mastodon.py API will not specify why the connection timed out).
# The default assumption is #2
except (FileNotFoundError, MastodonAPIError, Exception) as e:
except (FileNotFoundError, MastodonAPIError) as e:
print("Exception:", e)
# An exception occurred