Refactored to include a CloudflareAPI class

This commit is contained in:
Anon 2022-10-21 14:16:02 -07:00
parent f1fb4624b4
commit 2ecff9e0d5

View File

@ -24,89 +24,103 @@ import argparse
import configparser import configparser
import contextlib import contextlib
import logging import logging
from collections import OrderedDict
class CloudflairAPI:
domain_name = None
cf_api_key = None
cf_zone_id = None
cf_record_id = None
def __init__(self, domain_name, cf_api_key, cf_zone_id, cf_record_id):
self.domain_name = domain_name
self.cf_api_key = cf_api_key
self.cf_zone_id = cf_zone_id
self.cf_record_id = cf_record_id
def get_cf_dns_domain(cf_zone_id): def get_missing_config_var(self):
return "https://api.cloudflare.com/client/v4/zones/{}/dns_records".format( keys = ("domain_name", "cf_api_key", "cf_zone_id", "cf_record_id")
cf_zone_id missing_parameter = next(
) (key for key in keys if getattr(self, key) is None),
None
)
return missing_parameter
def get_cf_dns_domain_record_id(cf_zone_id, cf_record_id): def get_cf_dns_domain(self):
return "/".join([ return "https://api.cloudflare.com/client/v4/zones/{}/dns_records".format(
get_cf_dns_domain(cf_zone_id), self.cf_zone_id
cf_record_id )
])
# !! Do not use this !! The global Cloudflair API is dangerous. Keeping this here for posterity and info def get_cf_dns_domain_record_id(self):
# def get_headers_global_api(cf_api_key, cf_email): return "{}/{}".format(
# return { self.get_cf_dns_domain(),
# "X-Auth-Key": cf_api_key, self.cf_record_id
# "X-Auth-Email": cf_email )
# }
def get_headers(cf_api_key): # !! Do not use this !! The global Cloudflair API is dangerous. Keeping this here for posterity and info
return { # def get_headers_global_api(cf_api_key, cf_email):
"Authorization": "Bearer {}".format(cf_api_key) # return {
} # "X-Auth-Key": cf_api_key,
# "X-Auth-Email": cf_email
# }
def get_record_id(cf_zone_id, cf_api_key): def get_headers(self):
resp = requests.get( return {
url=get_cf_dns_domain(cf_zone_id), "Authorization": "Bearer {}".format(self.cf_api_key)
headers=get_headers(cf_api_key) }
)
failed = resp.status_code != 200
if not failed:
print(json.dumps(resp.json(), indent=4, sort_keys=True))
return failed
# A hack to prevent pycharm from bitching def get_record_id(self):
def dictionary_split(s, delim='='): resp = requests.get(
k, v = s.split(delim, 1) url=self.get_cf_dns_domain(),
return k, v headers=self.get_headers()
)
failed = resp.status_code != 200
if not failed:
print(json.dumps(resp.json(), indent=4, sort_keys=True))
return failed
def get_old_ip(self):
resp = requests.get(
url=self.get_cf_dns_domain_record_id(),
headers=self.get_headers()
)
if resp.status_code == 200:
return resp.json()["result"]["content"]
return None
def put_new_ip(self, ip):
json_payload = {
"type": 'A',
"name": self.domain_name,
"content": ip,
"proxied": True
}
resp = requests.put(
url=self.get_cf_dns_domain_record_id(),
json=json_payload,
headers=self.get_headers()
)
success = resp.status_code == 200
return success
# Get the current ip address of the user # Get the current ip address of the user
def get_new_ip(): def get_new_ip():
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
a = requests.get("https://1.1.1.1/cdn-cgi/trace").text.split("\n") a = requests.get("https://1.1.1.1/cdn-cgi/trace").text.split("\n")
ip = dict(dictionary_split(item) for item in a if item != "")["ip"] ip = dict(item.split('=', 1) for item in filter(None, a))["ip"]
return ip return ip
return None return None
def get_old_ip(cf_zone_id, cf_record_id, cf_api_key):
resp = requests.get(
url=get_cf_dns_domain_record_id(cf_zone_id, cf_record_id),
headers=get_headers(cf_api_key)
)
if resp.status_code == 200:
return resp.json()["result"]["content"]
return None
def put_new_ip(domain_name, ip, cf_zone_id, cf_record_id, cf_api_key):
json_payload = {
"type": 'A',
"name": domain_name,
"content": ip,
"proxied": True
}
resp = requests.put(
url=get_cf_dns_domain_record_id(cf_zone_id, cf_record_id),
json=json_payload,
headers=get_headers(cf_api_key)
)
success = resp.status_code == 200
return success
def config_parser_has_option(config, profile, option): def config_parser_has_option(config, profile, option):
# 'DEFAULT' is a special value in ConfigParser # 'DEFAULT' is a special value in ConfigParser
config_name = "" if profile == "DEFAULT" else profile config_name = "" if profile == "DEFAULT" else profile
@ -162,36 +176,28 @@ def main():
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(arguments.config) config.read(arguments.config)
options = OrderedDict([ cloudflair_api = CloudflairAPI(
("domain_name", config_parser_has_option(config, arguments.profile, "domain_name")), domain_name = config_parser_has_option(config, arguments.profile, "domain_name"),
("cf_api_key", config_parser_has_option(config, arguments.profile, "cf_api_key")), cf_api_key = config_parser_has_option(config, arguments.profile, "cf_api_key"),
("cf_zone_id", config_parser_has_option(config, arguments.profile, "cf_zone_id")), cf_zone_id = config_parser_has_option(config, arguments.profile, "cf_zone_id"),
("cf_record_id", config_parser_has_option(config, arguments.profile, "cf_record_id")) cf_record_id = config_parser_has_option(config, arguments.profile, "cf_record_id")
]) )
# Validate configuration file missing_parameter = cloudflair_api.get_missing_config_var()
missing_parameter = next((i for i in options if options[i] is None and i != "cf_record_id"), None)
if missing_parameter is not None: # If cf_zone_id is None, print out all record ids. The user will have to update this setting in their config file
logging.error("Configuration: {} | Profile: {} | Option: {}".format( # This will have to be done once
if missing_parameter == "cf_record_id" or arguments.get_record_ids:
return int(cloudflair_api.get_record_id())
elif missing_parameter is not None:
logging.error("Missing Configuration - Configuration: {} | Profile: {} | Option: {}".format(
arguments.config, arguments.profile, missing_parameter arguments.config, arguments.profile, missing_parameter
)) ))
return 1 return 1
# If cf_zone_id is None, print out all record ids. The user will have to update this setting in their config file
# This will have to be done at least once
if arguments.get_record_ids or options["cf_record_id"] is None:
return int(get_record_id(options["cf_zone_id"], options["cf_api_key"]))
# Get the current dns record from cloudflair (old) # Get the current dns record from cloudflair (old)
old_ip = get_old_ip(options["cf_zone_id"], options["cf_record_id"], options["cf_api_key"]) old_ip = cloudflair_api.get_old_ip()
new_ip = arguments.set or get_new_ip()
# Get the current ip address of the server (new)
new_ip = None
if arguments.set:
new_ip = arguments.set
else:
new_ip = get_new_ip()
if new_ip is None: if new_ip is None:
logging.error("Could not determine new IP") logging.error("Could not determine new IP")
@ -207,10 +213,10 @@ def main():
elif old_ip == new_ip: elif old_ip == new_ip:
logging.info("New IP matches old IP: {}".format(new_ip)) logging.info("New IP matches old IP: {}".format(new_ip))
else: else:
if put_new_ip(options["domain_name"], new_ip, options["cf_zone_id"], options["cf_record_id"], options["cf_api_key"]): if cloudflair_api.put_new_ip(new_ip):
logging.info("IP updated to: {} from: {}".format(old_ip, new_ip)) logging.info("IP updated to: {} from: {}".format(new_ip, old_ip))
else: else:
logging.error("Failed to update to: {} from: {}".format(old_ip, new_ip)) logging.error("Failed to update to: {} from: {}".format(new_ip, old_ip))
return 1 return 1
return 0 return 0