From 57641f3b9a1eda06e5b23fef0433ece163f3d3f9 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 10 Nov 2013 05:52:41 +0100 Subject: [PATCH] allow single-host update secrets, make dnstools tests work for everybody the nsupdate.info zone is configured to allow updating test.nsupdate.info with a non-secret key used for the tests. we also have a test that tries to update another host with that key and checks that this fails. change the tests so they only use test.nsupdate.info (if possible). single-host update secrets need a Domain record for the fqdn of this single host, the fqdn is tried first, before it tries the origin zone. --- conftest.py | 40 ++++++++++----------- nsupdate/main/_tests/test_dnstools.py | 45 ++++++++++++++++-------- nsupdate/main/dnstools.py | 50 ++++++++++++++++++--------- 3 files changed, 81 insertions(+), 54 deletions(-) diff --git a/conftest.py b/conftest.py index ff0e7c8..6a61dbf 100644 --- a/conftest.py +++ b/conftest.py @@ -4,28 +4,15 @@ configuration for the tests import pytest -# put test_settings.py into the toplevel dir and invoke py.test from there -# needs to look like (just with YOUR domain, IP, algo, key, hostnames, IPs): -""" -# this is to create a Domain entry in the database, so it can be used for unit tests: +# this is to create a Domain entries in the database, so they can be used for unit tests: BASEDOMAIN = "nsupdate.info" +TEST_HOST = 'test.' + BASEDOMAIN # unit tests can update this host ONLY NAMESERVER_IP = "85.10.192.104" NAMESERVER_UPDATE_ALGORITHM = "HMAC_SHA512" -NAMESERVER_UPDATE_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==" +# no problem, you can ONLY update the TEST_HOST with this key, nothing else: +NAMESERVER_UPDATE_KEY = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ==" NAMESERVER_PUBLIC = True -# for some unittests -WWW_HOST = BASEDOMAIN -NONEXISTING_HOST = 'nonexisting.' + BASEDOMAIN -TEST_HOST = 'test.' + BASEDOMAIN -WWW_IPV4_HOST = 'ipv4.' + BASEDOMAIN -WWW_IPV6_HOST = 'ipv6.' + BASEDOMAIN -WWW_IPV4_IP = '85.10.192.104' -WWW_IPV6_IP = '2a01:4f8:a0:2ffe:0:ff:fe00:8000' -""" - -import test_settings - from django.utils.translation import activate @@ -36,12 +23,21 @@ def db_init(db): # note: db is a predefined fixture and required here to have t Init the database contents for testing, so we have a service domain, ... """ from nsupdate.main.models import Domain + # this is for updating: Domain.objects.create( - domain=test_settings.BASEDOMAIN, - nameserver_ip=test_settings.NAMESERVER_IP, - nameserver_update_algorithm=test_settings.NAMESERVER_UPDATE_ALGORITHM, - nameserver_update_key=test_settings.NAMESERVER_UPDATE_KEY, - public=test_settings.NAMESERVER_PUBLIC, + domain=TEST_HOST, # special: single-host update secret! + nameserver_ip=NAMESERVER_IP, + nameserver_update_algorithm=NAMESERVER_UPDATE_ALGORITHM, + nameserver_update_key=NAMESERVER_UPDATE_KEY, + public=NAMESERVER_PUBLIC, + ) + # this is for querying: + Domain.objects.create( + domain=BASEDOMAIN, + nameserver_ip=NAMESERVER_IP, + nameserver_update_algorithm=NAMESERVER_UPDATE_ALGORITHM, + nameserver_update_key='invalid=', # we don't send updates there (and the real key is really secret) + public=NAMESERVER_PUBLIC, ) diff --git a/nsupdate/main/_tests/test_dnstools.py b/nsupdate/main/_tests/test_dnstools.py index 995432e..61eb6ee 100644 --- a/nsupdate/main/_tests/test_dnstools.py +++ b/nsupdate/main/_tests/test_dnstools.py @@ -3,14 +3,19 @@ Tests for dnstools module. """ import pytest -from test_settings import * pytestmark = pytest.mark.django_db from dns.resolver import NXDOMAIN, NoAnswer +from dns.tsig import PeerBadSignature from ..dnstools import add, delete, update, query_ns, parse_name, update_ns, SameIpError +# see also conftest.py +BASEDOMAIN = 'nsupdate.info' +TEST_HOST = 'test.' + BASEDOMAIN # you can ONLY update THIS host in unit tests +INVALID_HOST = 'test999.' + BASEDOMAIN # therefore, this can't get updated + def remove_records(host, records=('A', 'AAAA', )): # make sure the records are not there @@ -75,28 +80,32 @@ class TestIntelligentDeleter(object): class TestQuery(object): def test_queries_ok(self): - assert query_ns(WWW_IPV4_HOST, 'A') == WWW_IPV4_IP # v4 ONLY - assert query_ns(WWW_IPV6_HOST, 'AAAA') == WWW_IPV6_IP # v6 ONLY - assert query_ns(WWW_HOST, 'A') == WWW_IPV4_IP # v4 and v6, query v4 - assert query_ns(WWW_HOST, 'AAAA') == WWW_IPV6_IP # v4 and v6, query v6 - - def test_queries_failing(self): + host, ipv4, ipv6 = TEST_HOST, '42.42.42.42', '::23' + remove_records(host) with pytest.raises(NXDOMAIN): - query_ns(NONEXISTING_HOST, 'A') + query_ns(TEST_HOST, 'A') with pytest.raises(NXDOMAIN): - query_ns(NONEXISTING_HOST, 'AAAA') + query_ns(TEST_HOST, 'AAAA') + add(host, ipv4) + assert query_ns(TEST_HOST, 'A') == ipv4 + with pytest.raises(NoAnswer): + query_ns(TEST_HOST, 'AAAA') + add(host, ipv6) + assert query_ns(TEST_HOST, 'AAAA') == ipv6 class TestUpdate(object): def test_parse1(self): - origin, relname = parse_name('test.' + BASEDOMAIN) - assert str(origin) == BASEDOMAIN + '.' - assert str(relname) == 'test' + host, domain = 'test', BASEDOMAIN + origin, relname = parse_name(host + '.' + domain) + assert str(origin) == domain + '.' + assert str(relname) == host def test_parse2(self): - origin, relname = parse_name('foo.test.' + BASEDOMAIN) - assert str(origin) == BASEDOMAIN + '.' - assert str(relname) == 'foo.test' + host, domain = 'prefix.test', BASEDOMAIN + origin, relname = parse_name(host + '.' + domain) + assert str(origin) == domain + '.' + assert str(relname) == 'prefix.test' def test_parse_with_origin(self): origin, relname = parse_name('foo.bar.baz.org', 'bar.baz.org') @@ -168,3 +177,9 @@ class TestUpdate(object): # make sure the v6 is unchanged assert query_ns(host6, 'AAAA') == ip6 + + def test_bad_update(self): + # test whether we ONLY can update the TEST_HOST + with pytest.raises(PeerBadSignature): + response = update_ns(INVALID_HOST, 'A', '6.6.6.6', action='upd', ttl=60, raise_badsig=True) + print response diff --git a/nsupdate/main/dnstools.py b/nsupdate/main/dnstools.py index 2c6116c..8a257ba 100644 --- a/nsupdate/main/dnstools.py +++ b/nsupdate/main/dnstools.py @@ -155,8 +155,7 @@ def query_ns(qname, rdtype, origin=None): origin, name = parse_name(qname, origin) fqdn = name + origin assert fqdn.is_absolute() - origin_str = str(origin) - nameserver = get_ns_info(origin_str)[0] + nameserver, origin = get_ns_info(fqdn)[0:2] resolver = dns.resolver.Resolver(configure=False) # we do not configure it from resolv.conf, but patch in the values we # want into the documented attributes: @@ -195,18 +194,33 @@ def parse_name(fqdn, origin=None): return origin, rel_name -def get_ns_info(origin): +def get_ns_info(fqdn, origin=None): """ Get the master nameserver for the zone, the key needed to update the zone and the key algorithm used. - :param origin: zone we are dealing with, must be with trailing dot - :return: master nameserver, update key, update algo + :param fqdn: the fully qualified hostname we are dealing with (str) + :param origin: zone we are dealing with, must be with trailing dot (default:autodetect) (str) + :return: master nameserver, origin, domain, update keyname, update key, update algo :raises: NameServerNotAvailable if ns was flagged unavailable in the db """ + fqdn_str = str(fqdn) + origin, name = parse_name(fqdn_str, origin) + origin_str = str(origin) from .models import Domain - domain = origin.rstrip('.') - d = Domain.objects.get(domain=domain) + try: + # first we check if we have an entry for the fqdn + # single-host update secret use case + # XXX we need 2 DB accesses for the usual case just to support this rare case + domain = fqdn_str.rstrip('.') + d = Domain.objects.get(domain=domain) + keyname = fqdn_str + except Domain.DoesNotExist: + # now check the base zone, the usual case + # zone update secret use case + domain = origin_str.rstrip('.') + d = Domain.objects.get(domain=domain) + keyname = origin_str if not d.available: if d.last_update + timedelta(seconds=UNAVAILABLE_RETRY) > now(): # if there are troubles with a nameserver, we set available=False @@ -215,12 +229,13 @@ def get_ns_info(origin): domain, d.nameserver_ip, )) else: # retry timeout is over, set it available again - set_ns_availability(origin, True) + set_ns_availability(domain, True) algorithm = getattr(dns.tsig, d.nameserver_update_algorithm) - return d.nameserver_ip, d.nameserver_update_key, algorithm + return d.nameserver_ip, origin, domain, name, keyname, d.nameserver_update_key, algorithm -def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60): +def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60, + raise_badsig=False): """ update our master server @@ -233,11 +248,9 @@ def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60): :return: dns response """ assert action in ['add', 'del', 'upd', ] - origin, name = parse_name(fqdn, origin) - origin_str = str(origin) - nameserver, key, algo = get_ns_info(origin_str) + nameserver, origin, domain, name, keyname, key, algo = get_ns_info(fqdn, origin) upd = dns.update.Update(origin, - keyring=dns.tsigkeyring.from_text({origin_str: key}), + keyring=dns.tsigkeyring.from_text({keyname: key}), keyalgorithm=algo) if action == 'add': assert ipaddr is not None @@ -255,11 +268,14 @@ def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60): except dns.exception.Timeout: logger.warning("timeout when performing %s for name %s and origin %s with rdtype %s and ipaddr %s" % ( action, name, origin, rdtype, ipaddr)) - set_ns_availability(origin, False) + set_ns_availability(domain, False) raise except dns.tsig.PeerBadSignature: - logger.error("PeerBadSignature - shared secret mismatch? zone: %s" % (origin_str, )) - set_ns_availability(origin, False) + logger.error("PeerBadSignature - shared secret mismatch? zone: %s" % (origin, )) + if raise_badsig: + raise + else: + set_ns_availability(domain, False) def set_ns_availability(domain, available):