245 lines
6.9 KiB
Python
Executable File
245 lines
6.9 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
# Yandere Lewd Bot, an image posting bot for Pleroma
|
|
# Copyright (C) 2022 Anon <yanderefedi@proton.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import sys
|
|
import base64
|
|
import os
|
|
import getpass
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
|
|
from collections import OrderedDict
|
|
|
|
|
|
class PasswordMismatch(Exception):
    """Signal that the password and its retyped confirmation differ."""
|
|
|
|
|
|
# Salts
|
|
def generate_salt(length=16):
    """Return a fresh random salt.

    Args:
        length: Number of random bytes to produce. Defaults to 16, the
            size this module has always used.

    Returns:
        A ``bytes`` object of ``length`` bytes read from ``os.urandom``.
    """
    return os.urandom(length)
|
|
|
|
|
|
# Return string from bytes
|
|
def salt_encode(b):
    """Turn raw salt bytes into URL-safe base64 text.

    Args:
        b: The salt as bytes.

    Returns:
        The URL-safe base64 representation of ``b`` as a ``str``.
    """
    encoded = base64.urlsafe_b64encode(b)
    text = encoded.decode()
    return text
|
|
|
|
|
|
# Return bytes from string
|
|
def salt_decode(s):
    """Turn URL-safe base64 text back into raw salt bytes.

    Args:
        s: The salt as a URL-safe base64 ``str``.

    Returns:
        The decoded salt as ``bytes``.
    """
    as_bytes = s.encode()
    raw = base64.urlsafe_b64decode(as_bytes)
    return raw
|
|
|
|
|
|
# Ordered Dictionaries
|
|
def change_encoding_dict(settings_server, encoding_type):
    """Apply a conversion callable to every value of a mapping.

    Args:
        settings_server: Mapping whose values are to be converted.
        encoding_type: Callable invoked with each value (for example
            ``str.encode`` or ``bytes.decode``).

    Returns:
        A new ``OrderedDict`` with the same keys, in the same order, and
        the converted values.
    """
    converted = OrderedDict()
    for name, value in settings_server.items():
        converted[name] = encoding_type(value)
    return converted
|
|
|
|
|
|
# Return bytes from string
|
|
def encode_dict(settings_server):
    """Return a copy of the mapping with every string value encoded to bytes.

    Args:
        settings_server: Mapping with ``str`` values.

    Returns:
        The ``OrderedDict`` produced by ``change_encoding_dict`` using
        ``str.encode`` as the converter.
    """
    to_bytes = str.encode
    return change_encoding_dict(settings_server, to_bytes)
|
|
|
|
|
|
# Return string from bytes
|
|
def decode_dict(settings_server):
    """Return a copy of the mapping with every bytes value decoded to a string.

    Args:
        settings_server: Mapping with ``bytes`` values.

    Returns:
        The ``OrderedDict`` produced by ``change_encoding_dict`` using
        ``bytes.decode`` as the converter.
    """
    to_text = bytes.decode
    return change_encoding_dict(settings_server, to_text)
|
|
|
|
|
|
# password: Bytes
|
|
# salt: Bytes
|
|
def derive_key(password, salt, iterations=100000):
    """Derive an encryption key from a password using PBKDF2-HMAC-SHA256.

    Args:
        password: The password as bytes.
        salt: The salt as bytes.
        iterations: PBKDF2 iteration count. Defaults to 100000, the value
            this module has always used; data encrypted with one count can
            only be decrypted with the same count.

    Returns:
        The 32-byte derived key, URL-safe base64 encoded, as bytes. This is
        the value handed to ``Fernet`` by ``encrypt`` and ``decrypt``.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=iterations,
        backend=default_backend(),
    )
    return base64.urlsafe_b64encode(kdf.derive(password))
|
|
|
|
|
|
# Encryption functions
|
|
# message: Bytes
|
|
# key: Bytes
|
|
def encrypt(message, key):
    """Encrypt a message with Fernet.

    Args:
        message: Plaintext as bytes.
        key: Fernet key as bytes (see ``derive_key``).

    Returns:
        The token returned by ``Fernet.encrypt``.
    """
    return Fernet(key).encrypt(message)
|
|
|
|
|
|
# token: Bytes
|
|
# key: Bytes
|
|
def decrypt(token, key):
    """Decrypt a Fernet token.

    Args:
        token: Fernet token as bytes.
        key: Fernet key as bytes (see ``derive_key``).

    Returns:
        The plaintext returned by ``Fernet.decrypt``.
    """
    return Fernet(key).decrypt(token)
|
|
|
|
|
|
# password: bytes()
|
|
# salt: bytes()
|
|
# settings_server: dict() -> Byte values
|
|
# encryption_function: encrypt(message, key) : decrypt(token, key):
|
|
# Returns settings_server_decrypted dictionary with Byte() values. Will need to use
|
|
# change_encoding_dict (e.g. via decode_dict) to make them strings (recommended cfg file friendly)
|
|
def __settings_server(password, salt, settings_server, encryption_function):
    """Run an encrypt/decrypt function over every value of the settings.

    Args:
        password: Password as bytes.
        salt: Salt as bytes.
        settings_server: Mapping whose values are bytes.
        encryption_function: Callable taking ``(value, key)``; in this file
            it is either ``encrypt`` or ``decrypt``.

    Returns:
        An ``OrderedDict`` with the same keys and the transformed values.
    """
    # The key is derived once and reused for every entry.
    key = derive_key(password, salt)
    return OrderedDict(
        (name, encryption_function(settings_server[name], key))
        for name in settings_server
    )
|
|
|
|
|
|
# Returns (salt, settings_server)
|
|
def _settings_server_encrypt(settings_server):
    """Prompt for a password twice and encrypt the settings with it.

    Args:
        settings_server: Mapping with ``str`` values to encrypt.

    Returns:
        A ``(salt, settings_server_encrypted)`` tuple: the freshly generated
        salt as bytes and the mapping of encrypted values.

    Raises:
        PasswordMismatch: If the two entered passwords differ.
    """
    salt = generate_salt()

    first_entry = getpass.getpass("Enter password: ")
    confirmation = getpass.getpass("Retype Password: ")
    if first_entry != confirmation:
        raise PasswordMismatch

    encrypted = __settings_server(
        first_entry.encode(),
        salt,
        encode_dict(settings_server),
        encrypt,
    )
    return salt, encrypted
|
|
|
|
|
|
# Returns (settings_server)
|
|
def _settings_server_decrypt(settings_server, settings_encrypt):
    """Decrypt the settings if they are flagged as encrypted.

    Args:
        settings_server: Mapping with ``str`` values.
        settings_encrypt: Mapping with an ``"encrypt"`` flag and, when the
            flag is truthy, a ``"salt"`` entry in the ``salt_encode`` format.

    Returns:
        A mapping with bytes values: decrypted when the flag is truthy,
        otherwise just the encoded input.
    """
    encoded = encode_dict(settings_server)

    # Nothing to decrypt: hand back the encoded values untouched.
    if not settings_encrypt["encrypt"]:
        return encoded

    salt = salt_decode(settings_encrypt["salt"])
    passphrase = getpass.getpass("Enter password: ")
    return __settings_server(passphrase.encode(), salt, encoded, decrypt)
|
|
|
|
|
|
# Wrapper function that will catch exceptions and exit
|
|
def settings_server_new(function, **kwargs):
    """Call ``function(**kwargs)``; on failure print a message and exit.

    Args:
        function: Callable to invoke.
        **kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        Whatever ``function`` returns when it completes without raising.

    Raises:
        SystemExit: With status 1 after ``KeyboardInterrupt`` or any
            ``Exception`` subclass raised by ``function`` has been reported.
    """
    # Use the public home of the base64 decoding error rather than reaching
    # through the undocumented ``base64.binascii`` attribute.
    import binascii

    try:
        return function(**kwargs)

    # The user cancelled the password prompt.
    except KeyboardInterrupt:
        print("\nQuitting...")

    # The two passwords typed during encryption differ.
    except PasswordMismatch:
        print("Passwords do not match...")

    # Wrong password (or a corrupted token) during decryption.
    except InvalidToken:
        print("Password or Token Incorrect...")

    # The stored salt is not valid base64; it was probably modified.
    except binascii.Error:
        print("Salt is invalid...")

    # Anything else that went wrong.
    except Exception as e:
        print("Unknown exception occurred...")
        print(e)

    # Only reached when one of the handlers above ran.
    sys.exit(1)
|
|
|
|
|
|
# Glue functions that package **kwargs automatically
|
|
def settings_server_encrypt(settings_server):
    """Encrypt the settings through the exit-on-error wrapper.

    Args:
        settings_server: Mapping with ``str`` values to encrypt.

    Returns:
        The ``(salt, settings_server_encrypted)`` tuple produced by
        ``_settings_server_encrypt``.
    """
    return settings_server_new(
        _settings_server_encrypt,
        settings_server=settings_server,
    )
|
|
|
|
|
|
def settings_server_decrypt(settings_server, settings_encrypt):
    """Decrypt the settings through the exit-on-error wrapper.

    Args:
        settings_server: Mapping with ``str`` values.
        settings_encrypt: Mapping holding the ``"encrypt"`` flag and salt.

    Returns:
        The mapping with bytes values produced by
        ``_settings_server_decrypt``.
    """
    return settings_server_new(
        _settings_server_decrypt,
        settings_server=settings_server,
        settings_encrypt=settings_encrypt,
    )
|
|
|
|
|
|
# The _cfg functions should return a regular string
|
|
# These are the functions that should interface with the bot and return a plain string
|
|
# settings_server ordered dictionary
|
|
def settings_server_encrypt_cfg(settings_server):
    """Encrypt the settings and return cfg-friendly string values.

    Args:
        settings_server: Mapping with ``str`` values to encrypt.

    Returns:
        A ``(salt, settings_server)`` tuple where the salt is base64 text
        and every encrypted value has been decoded to ``str``.
    """
    raw_salt, encrypted = settings_server_encrypt(settings_server)
    salt_text = salt_encode(raw_salt)
    return salt_text, decode_dict(encrypted)
|
|
|
|
|
|
def settings_server_decrypt_cfg(settings_server, settings_encrypt):
    """Decrypt the settings and return them with plain string values.

    Args:
        settings_server: Mapping with ``str`` values.
        settings_encrypt: Mapping holding the ``"encrypt"`` flag and salt.

    Returns:
        An ``OrderedDict`` of the decrypted settings with ``str`` values.
    """
    decrypted = settings_server_decrypt(settings_server, settings_encrypt)
    return decode_dict(decrypted)
|
|
|
|
|
|
def main():
    """Command-line entry point: encrypt, decrypt or re-encrypt cfg credentials.

    The config module named by ``-c/--cfg`` (default ``"cfg"``) is imported
    and its ``settings_server`` / ``settings_encrypt`` values are processed
    according to ``--encrypt``, ``--decrypt`` or ``--recrypt`` (which does a
    decrypt followed by an encrypt). The resulting ``settings_server`` and
    ``settings_encrypt`` values are printed; nothing is written to disk.

    Returns:
        0 on completion. Failures inside the encrypt/decrypt helpers exit
        the process through ``settings_server_new``.
    """
    import argparse
    import importlib
    from pprint import pformat

    default_cfg = "cfg"

    parser = argparse.ArgumentParser(
        description="A class to encrypt server credentials",
        epilog="There are no additional parameters.",
        add_help=True)
    parser.add_argument("--encrypt", help="Generate encrypted authentication.", action="store_true")
    parser.add_argument("--decrypt", help="Decrypt encrypted authentication", action="store_true")
    parser.add_argument("--recrypt", help="Recrypt encrypted authentication", action="store_true")
    parser.add_argument("-c", "--cfg", help="Specify config file.", default=default_cfg)

    arguments = parser.parse_args()

    # Re-encrypting is a decrypt followed by an encrypt.
    if arguments.recrypt:
        arguments.encrypt = True
        arguments.decrypt = True

    # No action requested: there is nothing to load or print.
    if not (arguments.encrypt or arguments.decrypt):
        return 0

    cfg = importlib.import_module(arguments.cfg)
    settings_server = cfg.settings_server
    settings_encrypt = None

    if arguments.decrypt and arguments.encrypt:
        print("Re-encrypting")

    if arguments.decrypt:
        print("Decrypt...")
        settings_server = settings_server_decrypt_cfg(cfg.settings_server, cfg.settings_encrypt)
        settings_encrypt = OrderedDict([
            ("encrypt", False),
            # Carry over the configured salt (None if the cfg has none),
            # not the boolean "encrypt" flag.
            ("salt", cfg.settings_encrypt.get("salt")),
        ])

    if arguments.encrypt:
        print("Encrypt...")
        salt, settings_server = settings_server_encrypt_cfg(settings_server)
        settings_encrypt = OrderedDict([
            ("encrypt", True),
            ("salt", salt),
        ])

    print("settings_server = {}".format(pformat(settings_server)))
    print("settings_encrypt = {}".format(pformat(settings_encrypt)))
    return 0
|
|
|
|
|
|
# Run the command-line interface when executed as a script, using the
# return value of main() as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
|