allow single-host update secrets, make dnstools tests work for everybody
the nsupdate.info zone is configured to allow updating test.nsupdate.info with a non-secret key used for the tests. we also have a test that tries to update another host with that key and checks that this fails. change the tests so they only use test.nsupdate.info (if possible). single-host update secrets need a Domain record for the fqdn of this single host, the fqdn is tried first, before it tries the origin zone.
This commit is contained in:
parent
4fadb5de82
commit
57641f3b9a
40
conftest.py
40
conftest.py
@ -4,28 +4,15 @@ configuration for the tests
|
||||
|
||||
import pytest
|
||||
|
||||
# put test_settings.py into the toplevel dir and invoke py.test from there
|
||||
# needs to look like (just with YOUR domain, IP, algo, key, hostnames, IPs):
|
||||
"""
|
||||
# this is to create a Domain entry in the database, so it can be used for unit tests:
|
||||
# this is to create a Domain entries in the database, so they can be used for unit tests:
|
||||
BASEDOMAIN = "nsupdate.info"
|
||||
TEST_HOST = 'test.' + BASEDOMAIN # unit tests can update this host ONLY
|
||||
NAMESERVER_IP = "85.10.192.104"
|
||||
NAMESERVER_UPDATE_ALGORITHM = "HMAC_SHA512"
|
||||
NAMESERVER_UPDATE_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=="
|
||||
# no problem, you can ONLY update the TEST_HOST with this key, nothing else:
|
||||
NAMESERVER_UPDATE_KEY = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
|
||||
NAMESERVER_PUBLIC = True
|
||||
|
||||
# for some unittests
|
||||
WWW_HOST = BASEDOMAIN
|
||||
NONEXISTING_HOST = 'nonexisting.' + BASEDOMAIN
|
||||
TEST_HOST = 'test.' + BASEDOMAIN
|
||||
WWW_IPV4_HOST = 'ipv4.' + BASEDOMAIN
|
||||
WWW_IPV6_HOST = 'ipv6.' + BASEDOMAIN
|
||||
WWW_IPV4_IP = '85.10.192.104'
|
||||
WWW_IPV6_IP = '2a01:4f8:a0:2ffe:0:ff:fe00:8000'
|
||||
"""
|
||||
|
||||
import test_settings
|
||||
|
||||
from django.utils.translation import activate
|
||||
|
||||
|
||||
@ -36,12 +23,21 @@ def db_init(db): # note: db is a predefined fixture and required here to have t
|
||||
Init the database contents for testing, so we have a service domain, ...
|
||||
"""
|
||||
from nsupdate.main.models import Domain
|
||||
# this is for updating:
|
||||
Domain.objects.create(
|
||||
domain=test_settings.BASEDOMAIN,
|
||||
nameserver_ip=test_settings.NAMESERVER_IP,
|
||||
nameserver_update_algorithm=test_settings.NAMESERVER_UPDATE_ALGORITHM,
|
||||
nameserver_update_key=test_settings.NAMESERVER_UPDATE_KEY,
|
||||
public=test_settings.NAMESERVER_PUBLIC,
|
||||
domain=TEST_HOST, # special: single-host update secret!
|
||||
nameserver_ip=NAMESERVER_IP,
|
||||
nameserver_update_algorithm=NAMESERVER_UPDATE_ALGORITHM,
|
||||
nameserver_update_key=NAMESERVER_UPDATE_KEY,
|
||||
public=NAMESERVER_PUBLIC,
|
||||
)
|
||||
# this is for querying:
|
||||
Domain.objects.create(
|
||||
domain=BASEDOMAIN,
|
||||
nameserver_ip=NAMESERVER_IP,
|
||||
nameserver_update_algorithm=NAMESERVER_UPDATE_ALGORITHM,
|
||||
nameserver_update_key='invalid=', # we don't send updates there (and the real key is really secret)
|
||||
public=NAMESERVER_PUBLIC,
|
||||
)
|
||||
|
||||
|
||||
|
@ -3,14 +3,19 @@ Tests for dnstools module.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from test_settings import *
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
from dns.resolver import NXDOMAIN, NoAnswer
|
||||
from dns.tsig import PeerBadSignature
|
||||
|
||||
from ..dnstools import add, delete, update, query_ns, parse_name, update_ns, SameIpError
|
||||
|
||||
# see also conftest.py
|
||||
BASEDOMAIN = 'nsupdate.info'
|
||||
TEST_HOST = 'test.' + BASEDOMAIN # you can ONLY update THIS host in unit tests
|
||||
INVALID_HOST = 'test999.' + BASEDOMAIN # therefore, this can't get updated
|
||||
|
||||
|
||||
def remove_records(host, records=('A', 'AAAA', )):
|
||||
# make sure the records are not there
|
||||
@ -75,28 +80,32 @@ class TestIntelligentDeleter(object):
|
||||
|
||||
class TestQuery(object):
|
||||
def test_queries_ok(self):
|
||||
assert query_ns(WWW_IPV4_HOST, 'A') == WWW_IPV4_IP # v4 ONLY
|
||||
assert query_ns(WWW_IPV6_HOST, 'AAAA') == WWW_IPV6_IP # v6 ONLY
|
||||
assert query_ns(WWW_HOST, 'A') == WWW_IPV4_IP # v4 and v6, query v4
|
||||
assert query_ns(WWW_HOST, 'AAAA') == WWW_IPV6_IP # v4 and v6, query v6
|
||||
|
||||
def test_queries_failing(self):
|
||||
host, ipv4, ipv6 = TEST_HOST, '42.42.42.42', '::23'
|
||||
remove_records(host)
|
||||
with pytest.raises(NXDOMAIN):
|
||||
query_ns(NONEXISTING_HOST, 'A')
|
||||
query_ns(TEST_HOST, 'A')
|
||||
with pytest.raises(NXDOMAIN):
|
||||
query_ns(NONEXISTING_HOST, 'AAAA')
|
||||
query_ns(TEST_HOST, 'AAAA')
|
||||
add(host, ipv4)
|
||||
assert query_ns(TEST_HOST, 'A') == ipv4
|
||||
with pytest.raises(NoAnswer):
|
||||
query_ns(TEST_HOST, 'AAAA')
|
||||
add(host, ipv6)
|
||||
assert query_ns(TEST_HOST, 'AAAA') == ipv6
|
||||
|
||||
|
||||
class TestUpdate(object):
|
||||
def test_parse1(self):
|
||||
origin, relname = parse_name('test.' + BASEDOMAIN)
|
||||
assert str(origin) == BASEDOMAIN + '.'
|
||||
assert str(relname) == 'test'
|
||||
host, domain = 'test', BASEDOMAIN
|
||||
origin, relname = parse_name(host + '.' + domain)
|
||||
assert str(origin) == domain + '.'
|
||||
assert str(relname) == host
|
||||
|
||||
def test_parse2(self):
|
||||
origin, relname = parse_name('foo.test.' + BASEDOMAIN)
|
||||
assert str(origin) == BASEDOMAIN + '.'
|
||||
assert str(relname) == 'foo.test'
|
||||
host, domain = 'prefix.test', BASEDOMAIN
|
||||
origin, relname = parse_name(host + '.' + domain)
|
||||
assert str(origin) == domain + '.'
|
||||
assert str(relname) == 'prefix.test'
|
||||
|
||||
def test_parse_with_origin(self):
|
||||
origin, relname = parse_name('foo.bar.baz.org', 'bar.baz.org')
|
||||
@ -168,3 +177,9 @@ class TestUpdate(object):
|
||||
|
||||
# make sure the v6 is unchanged
|
||||
assert query_ns(host6, 'AAAA') == ip6
|
||||
|
||||
def test_bad_update(self):
|
||||
# test whether we ONLY can update the TEST_HOST
|
||||
with pytest.raises(PeerBadSignature):
|
||||
response = update_ns(INVALID_HOST, 'A', '6.6.6.6', action='upd', ttl=60, raise_badsig=True)
|
||||
print response
|
||||
|
@ -155,8 +155,7 @@ def query_ns(qname, rdtype, origin=None):
|
||||
origin, name = parse_name(qname, origin)
|
||||
fqdn = name + origin
|
||||
assert fqdn.is_absolute()
|
||||
origin_str = str(origin)
|
||||
nameserver = get_ns_info(origin_str)[0]
|
||||
nameserver, origin = get_ns_info(fqdn)[0:2]
|
||||
resolver = dns.resolver.Resolver(configure=False)
|
||||
# we do not configure it from resolv.conf, but patch in the values we
|
||||
# want into the documented attributes:
|
||||
@ -195,18 +194,33 @@ def parse_name(fqdn, origin=None):
|
||||
return origin, rel_name
|
||||
|
||||
|
||||
def get_ns_info(origin):
|
||||
def get_ns_info(fqdn, origin=None):
|
||||
"""
|
||||
Get the master nameserver for the <origin> zone, the key needed
|
||||
to update the zone and the key algorithm used.
|
||||
|
||||
:param origin: zone we are dealing with, must be with trailing dot
|
||||
:return: master nameserver, update key, update algo
|
||||
:param fqdn: the fully qualified hostname we are dealing with (str)
|
||||
:param origin: zone we are dealing with, must be with trailing dot (default:autodetect) (str)
|
||||
:return: master nameserver, origin, domain, update keyname, update key, update algo
|
||||
:raises: NameServerNotAvailable if ns was flagged unavailable in the db
|
||||
"""
|
||||
fqdn_str = str(fqdn)
|
||||
origin, name = parse_name(fqdn_str, origin)
|
||||
origin_str = str(origin)
|
||||
from .models import Domain
|
||||
domain = origin.rstrip('.')
|
||||
try:
|
||||
# first we check if we have an entry for the fqdn
|
||||
# single-host update secret use case
|
||||
# XXX we need 2 DB accesses for the usual case just to support this rare case
|
||||
domain = fqdn_str.rstrip('.')
|
||||
d = Domain.objects.get(domain=domain)
|
||||
keyname = fqdn_str
|
||||
except Domain.DoesNotExist:
|
||||
# now check the base zone, the usual case
|
||||
# zone update secret use case
|
||||
domain = origin_str.rstrip('.')
|
||||
d = Domain.objects.get(domain=domain)
|
||||
keyname = origin_str
|
||||
if not d.available:
|
||||
if d.last_update + timedelta(seconds=UNAVAILABLE_RETRY) > now():
|
||||
# if there are troubles with a nameserver, we set available=False
|
||||
@ -215,12 +229,13 @@ def get_ns_info(origin):
|
||||
domain, d.nameserver_ip, ))
|
||||
else:
|
||||
# retry timeout is over, set it available again
|
||||
set_ns_availability(origin, True)
|
||||
set_ns_availability(domain, True)
|
||||
algorithm = getattr(dns.tsig, d.nameserver_update_algorithm)
|
||||
return d.nameserver_ip, d.nameserver_update_key, algorithm
|
||||
return d.nameserver_ip, origin, domain, name, keyname, d.nameserver_update_key, algorithm
|
||||
|
||||
|
||||
def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60):
|
||||
def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60,
|
||||
raise_badsig=False):
|
||||
"""
|
||||
update our master server
|
||||
|
||||
@ -233,11 +248,9 @@ def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60):
|
||||
:return: dns response
|
||||
"""
|
||||
assert action in ['add', 'del', 'upd', ]
|
||||
origin, name = parse_name(fqdn, origin)
|
||||
origin_str = str(origin)
|
||||
nameserver, key, algo = get_ns_info(origin_str)
|
||||
nameserver, origin, domain, name, keyname, key, algo = get_ns_info(fqdn, origin)
|
||||
upd = dns.update.Update(origin,
|
||||
keyring=dns.tsigkeyring.from_text({origin_str: key}),
|
||||
keyring=dns.tsigkeyring.from_text({keyname: key}),
|
||||
keyalgorithm=algo)
|
||||
if action == 'add':
|
||||
assert ipaddr is not None
|
||||
@ -255,11 +268,14 @@ def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60):
|
||||
except dns.exception.Timeout:
|
||||
logger.warning("timeout when performing %s for name %s and origin %s with rdtype %s and ipaddr %s" % (
|
||||
action, name, origin, rdtype, ipaddr))
|
||||
set_ns_availability(origin, False)
|
||||
set_ns_availability(domain, False)
|
||||
raise
|
||||
except dns.tsig.PeerBadSignature:
|
||||
logger.error("PeerBadSignature - shared secret mismatch? zone: %s" % (origin_str, ))
|
||||
set_ns_availability(origin, False)
|
||||
logger.error("PeerBadSignature - shared secret mismatch? zone: %s" % (origin, ))
|
||||
if raise_badsig:
|
||||
raise
|
||||
else:
|
||||
set_ns_availability(domain, False)
|
||||
|
||||
|
||||
def set_ns_availability(domain, available):
|
||||
|
Loading…
x
Reference in New Issue
Block a user