Fixed whitespace

This commit is contained in:
Anon 2022-07-27 20:51:29 -07:00
parent 9a2bf20eef
commit 5a4e251602

View File

@ -10,12 +10,13 @@
# #
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details. # GNU General Public License for more details.
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import sys import sys
import requests import requests
import json import json
@ -27,189 +28,189 @@ from collections import OrderedDict
def get_cf_dns_domain(cf_zone_id): def get_cf_dns_domain(cf_zone_id):
return "https://api.cloudflare.com/client/v4/zones/{}/dns_records".format( return "https://api.cloudflare.com/client/v4/zones/{}/dns_records".format(
cf_zone_id cf_zone_id
) )
def get_cf_dns_domain_record_id(cf_zone_id, cf_record_id): def get_cf_dns_domain_record_id(cf_zone_id, cf_record_id):
return "/".join([ return "/".join([
get_cf_dns_domain(cf_zone_id), get_cf_dns_domain(cf_zone_id),
cf_record_id cf_record_id
]) ])
# !! Do not use this !! The global Cloudflair API is dangerous. Keeping this here for posterity and info # !! Do not use this !! The global Cloudflair API is dangerous. Keeping this here for posterity and info
# def get_headers_global_api(cf_api_key, cf_email): # def get_headers_global_api(cf_api_key, cf_email):
# return { # return {
# "X-Auth-Key": cf_api_key, # "X-Auth-Key": cf_api_key,
# "X-Auth-Email": cf_email # "X-Auth-Email": cf_email
# } # }
def get_headers(cf_api_key): def get_headers(cf_api_key):
return { return {
"Authorization": "Bearer {}".format(cf_api_key) "Authorization": "Bearer {}".format(cf_api_key)
} }
def get_record_id(cf_zone_id, cf_api_key): def get_record_id(cf_zone_id, cf_api_key):
resp = requests.get( resp = requests.get(
url=get_cf_dns_domain(cf_zone_id), url=get_cf_dns_domain(cf_zone_id),
headers=get_headers(cf_api_key) headers=get_headers(cf_api_key)
) )
failed = resp.status_code != 200 failed = resp.status_code != 200
if not failed: if not failed:
print(json.dumps(resp.json(), indent=4, sort_keys=True)) print(json.dumps(resp.json(), indent=4, sort_keys=True))
return failed return failed
# A hack to prevent pycharm from bitching # A hack to prevent pycharm from bitching
def dictionary_split(s, delim='='): def dictionary_split(s, delim='='):
k, v = s.split(delim, 1) k, v = s.split(delim, 1)
return k, v return k, v
# Get the current ip address of the user # Get the current ip address of the user
def get_new_ip(): def get_new_ip():
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
a = requests.get("https://1.1.1.1/cdn-cgi/trace").text.split("\n") a = requests.get("https://1.1.1.1/cdn-cgi/trace").text.split("\n")
ip = dict(dictionary_split(item) for item in a if item != "")["ip"] ip = dict(dictionary_split(item) for item in a if item != "")["ip"]
return ip return ip
return None return None
def get_old_ip(cf_zone_id, cf_record_id, cf_api_key): def get_old_ip(cf_zone_id, cf_record_id, cf_api_key):
resp = requests.get( resp = requests.get(
url=get_cf_dns_domain_record_id(cf_zone_id, cf_record_id), url=get_cf_dns_domain_record_id(cf_zone_id, cf_record_id),
headers=get_headers(cf_api_key) headers=get_headers(cf_api_key)
) )
if resp.status_code == 200: if resp.status_code == 200:
return resp.json()["result"]["content"] return resp.json()["result"]["content"]
return None return None
def put_new_ip(domain_name, ip, cf_zone_id, cf_record_id, cf_api_key): def put_new_ip(domain_name, ip, cf_zone_id, cf_record_id, cf_api_key):
json_payload = { json_payload = {
"type": 'A', "type": 'A',
"name": domain_name, "name": domain_name,
"content": ip, "content": ip,
"proxied": True "proxied": True
} }
resp = requests.put( resp = requests.put(
url=get_cf_dns_domain_record_id(cf_zone_id, cf_record_id), url=get_cf_dns_domain_record_id(cf_zone_id, cf_record_id),
json=json_payload, json=json_payload,
headers=get_headers(cf_api_key) headers=get_headers(cf_api_key)
) )
success = resp.status_code == 200 success = resp.status_code == 200
return success return success
def config_parser_has_option(config, profile, option): def config_parser_has_option(config, profile, option):
# 'DEFAULT' is a special value in ConfigParser # 'DEFAULT' is a special value in ConfigParser
config_name = "" if profile == "DEFAULT" else profile config_name = "" if profile == "DEFAULT" else profile
if config.has_option(config_name, option): if config.has_option(config_name, option):
return config[profile][option] return config[profile][option]
return None return None
def get_default_path(filename): def get_default_path(filename):
paths = (".", os.environ["HOME"], "/root", "/") paths = (".", os.environ["HOME"], "/root", "/")
for path in path: for path in paths:
full_path = os.path.join(path, filename) full_path = os.path.join(path, filename)
if os.file.exists(full_path): if os.path.isfile(full_path):
return full_path return full_path
return filename return filename
def main(): def main():
# Logger - set to logging.INFO for production # Logger - set to logging.INFO for production
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
# String constants # String constants
default_secret_file = ".credentials.secret" default_secret_file = ".credentials.secret"
default_profile = "CLOUDFLARE" default_profile = "CLOUDFLARE"
default_fullpath = get_defualt_path(default_secret_file) default_fullpath = get_default_path(default_secret_file)
# Find the default path to crdentials file # Find the default path to crdentials file
# Argument Parser # Argument Parser
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="A utility for updating DNS records for dynamic IP addresses", description="A utility for updating DNS records for dynamic IP addresses",
epilog="Configuration files should be owned by root with an octal permission of 600 (read/write by root only). Run this program as sudo.", epilog="Configuration files should be owned by root with an octal permission of 600 (read/write by root only). Run this program as sudo.",
add_help=True add_help=True
) )
parser.add_argument("-i", "--get-record-ids", help="Print out (in JSON format) all records for your Cloudflair account", action="store_true") parser.add_argument("-i", "--get-record-ids", help="Print out (in JSON format) all records for your Cloudflair account", action="store_true")
parser.add_argument("-c", "--config", help="Specify configuration file (DEFAULT: '{}')".format(default_fullpath), default=default_fullpath) parser.add_argument("-c", "--config", help="Specify configuration file (DEFAULT: '{}')".format(default_fullpath), default=default_fullpath)
parser.add_argument("-p", "--profile", help="Specify profile in configuration file (DEFAULT: '{}')".format(default_profile), default=default_profile) parser.add_argument("-p", "--profile", help="Specify profile in configuration file (DEFAULT: '{}')".format(default_profile), default=default_profile)
parser.add_argument("-s", "--set", help="Set ip address to ip specified") parser.add_argument("-s", "--set", help="Set ip address to ip specified")
parser.add_argument("-g", "--get", help="Get old (from Cloudflare DNS records) and current ip address", action="store_true") parser.add_argument("-g", "--get", help="Get old (from Cloudflare DNS records) and current ip address", action="store_true")
arguments = parser.parse_args() arguments = parser.parse_args()
# Configuration file # Configuration file
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(arguments.config) config.read(arguments.config)
options = OrderedDict([ options = OrderedDict([
("domain_name", config_parser_has_option(config, arguments.profile, "domain_name")), ("domain_name", config_parser_has_option(config, arguments.profile, "domain_name")),
("cf_api_key", config_parser_has_option(config, arguments.profile, "cf_api_key")), ("cf_api_key", config_parser_has_option(config, arguments.profile, "cf_api_key")),
("cf_zone_id", config_parser_has_option(config, arguments.profile, "cf_zone_id")), ("cf_zone_id", config_parser_has_option(config, arguments.profile, "cf_zone_id")),
("cf_record_id", config_parser_has_option(config, arguments.profile, "cf_record_id")) ("cf_record_id", config_parser_has_option(config, arguments.profile, "cf_record_id"))
]) ])
# Validate configuration file # Validate configuration file
missing_parameter = next((i for i in options if options[i] is None and i != "cf_record_id"), None) missing_parameter = next((i for i in options if options[i] is None and i != "cf_record_id"), None)
if missing_parameter is not None: if missing_parameter is not None:
logging.error("Configuration: {} | Profile: {} | Option: {}".format( logging.error("Configuration: {} | Profile: {} | Option: {}".format(
arguments.config, arguments.profile, missing_parameter arguments.config, arguments.profile, missing_parameter
)) ))
return 1 return 1
# If cf_zone_id is None, print out all record ids. The user will have to update this setting in their config file # If cf_zone_id is None, print out all record ids. The user will have to update this setting in their config file
# This will have to be done at least once # This will have to be done at least once
if arguments.get_record_ids or options["cf_record_id"] is None: if arguments.get_record_ids or options["cf_record_id"] is None:
return int(get_record_id(options["cf_zone_id"], options["cf_api_key"])) return int(get_record_id(options["cf_zone_id"], options["cf_api_key"]))
# Get the current dns record from cloudflair (old) # Get the current dns record from cloudflair (old)
old_ip = get_old_ip(options["cf_zone_id"], options["cf_record_id"], options["cf_api_key"]) old_ip = get_old_ip(options["cf_zone_id"], options["cf_record_id"], options["cf_api_key"])
# Get the current ip address of the server (new) # Get the current ip address of the server (new)
new_ip = None new_ip = None
if arguments.set: if arguments.set:
new_ip = arguments.set new_ip = arguments.set
else: else:
new_ip = get_new_ip() new_ip = get_new_ip()
if new_ip is None: if new_ip is None:
logging.error("Could not determine new IP") logging.error("Could not determine new IP")
return 1 return 1
# Print beginning of status line # Print beginning of status line
if old_ip is None: if old_ip is None:
logging.warn("Could not determine old IP") logging.warn("Could not determine old IP")
# Update cloudflair's dns record if necessary and print out the end of the status line # Update cloudflair's dns record if necessary and print out the end of the status line
if arguments.get: if arguments.get:
logging.info("Current IP address: {}".format(new_ip)) logging.info("Current IP address: {}".format(new_ip))
elif old_ip == new_ip: elif old_ip == new_ip:
logging.info("New IP matches old IP: {}".format(new_ip)) logging.info("New IP matches old IP: {}".format(new_ip))
else: else:
if put_new_ip(options["domain_name"], new_ip, options["cf_zone_id"], options["cf_record_id"], options["cf_api_key"]): if put_new_ip(options["domain_name"], new_ip, options["cf_zone_id"], options["cf_record_id"], options["cf_api_key"]):
logging.info("IP updated to: {} from: {}".format(old_ip, new_ip)) logging.info("IP updated to: {} from: {}".format(old_ip, new_ip))
else: else:
logging.error("Failed to update to: {} from: {}".format(old_ip, new_ip)) logging.error("Failed to update to: {} from: {}".format(old_ip, new_ip))
return 1 return 1
return 0 return 0
# Exit codes: # Exit codes:
# 0 - Success # 0 - Success
# 1 - An error occurred # 1 - An error occurred
if __name__ == "__main__": if __name__ == "__main__":
try: try:
sys.exit(main()) sys.exit(main())
except Exception as e: except Exception as e:
print("Exception!!", e) print("Exception!!", e)
sys.exit(1) sys.exit(1)