dnstools: implement additional functions: add and delete, with tests.
This commit is contained in:
parent
3fb3524106
commit
074233430c
@ -4,7 +4,7 @@ Tests for dnstools module.
|
||||
|
||||
import pytest
|
||||
|
||||
from ..dnstools import update, query_ns, parse_name, update_ns, SameIpError
|
||||
from ..dnstools import add, delete, update, query_ns, parse_name, update_ns, SameIpError
|
||||
from django.conf import settings
|
||||
BASEDOMAIN = settings.BASEDOMAIN
|
||||
|
||||
@ -22,11 +22,66 @@ class TestIntelligentUpdater(object):
|
||||
pass
|
||||
# first update with this IP, should work without issue:
|
||||
update(host, ip)
|
||||
assert query_ns(host, 'A') == ip
|
||||
with pytest.raises(SameIpError):
|
||||
# trying to update again with same IP should raise
|
||||
update(host, ip)
|
||||
|
||||
|
||||
class TestIntelligentAdder(object):
|
||||
def test_double_add_same(self):
|
||||
host, ip = 'test0.' + settings.BASEDOMAIN, '1.2.3.4'
|
||||
# make sure the host is not there
|
||||
try:
|
||||
update_ns(host, 'A', action='del')
|
||||
except NXDOMAIN:
|
||||
# it is ok if it was never there
|
||||
pass
|
||||
# first add with this IP, should work without issue:
|
||||
add(host, ip)
|
||||
assert query_ns(host, 'A') == ip
|
||||
with pytest.raises(SameIpError):
|
||||
# trying to add again with same IP should raise
|
||||
add(host, ip)
|
||||
|
||||
def test_double_add_different(self):
|
||||
host, ip = 'test0.' + settings.BASEDOMAIN, '1.2.3.4'
|
||||
# make sure the host is not there
|
||||
try:
|
||||
update_ns(host, 'A', action='del')
|
||||
except NXDOMAIN:
|
||||
# it is ok if it was never there
|
||||
pass
|
||||
# first add with this IP, should work without issue:
|
||||
add(host, ip)
|
||||
assert query_ns(host, 'A') == ip
|
||||
different_ip = '4.3.2.1'
|
||||
# trying to add again with same IP should raise
|
||||
add(host, different_ip) # internally triggers an update
|
||||
assert query_ns(host, 'A') == different_ip
|
||||
|
||||
|
||||
class TestIntelligentDeleter(object):
|
||||
def test_delete(self):
|
||||
host, ip = 'test0.' + settings.BASEDOMAIN, '1.2.3.4'
|
||||
# make sure the host is there
|
||||
update_ns(host, 'A', ip, action='add')
|
||||
delete(host)
|
||||
# make sure it is gone
|
||||
with pytest.raises(NXDOMAIN):
|
||||
query_ns(host, 'A')
|
||||
|
||||
def test_double_delete(self):
|
||||
host = 'test0.' + settings.BASEDOMAIN
|
||||
# make sure the host is not there
|
||||
try:
|
||||
update_ns(host, 'A', action='del')
|
||||
except NXDOMAIN:
|
||||
# it is ok if it was never there
|
||||
pass
|
||||
delete(host) # hmm, this doesn't raise NXDOMAIN!?
|
||||
|
||||
|
||||
class TestQuery(object):
|
||||
def test_queries_ok(self):
|
||||
assert query_ns(settings.WWW_IPV4_HOST, 'A') == settings.WWW_IPV4_IP # v4 ONLY
|
||||
|
@ -1,5 +1,7 @@
|
||||
"""
|
||||
Misc. DNS related code: query, dynamic update, etc.
|
||||
|
||||
Usually, higher level code wants to call the add/update/delete functions.
|
||||
"""
|
||||
|
||||
from django.conf import settings
|
||||
@ -20,6 +22,63 @@ class SameIpError(ValueError):
|
||||
"""
|
||||
|
||||
|
||||
def get_rdtype(ipaddr):
|
||||
"""
|
||||
Get the record type 'A' or 'AAAA' for this ipaddr.
|
||||
|
||||
:param ipaddr: ip address v4 or v6 (str)
|
||||
:return: 'A' or 'AAAA'
|
||||
"""
|
||||
af = dns.inet.af_for_address(ipaddr)
|
||||
return 'A' if af == dns.inet.AF_INET else 'AAAA'
|
||||
|
||||
|
||||
def add(fqdn, ipaddr, ttl=60):
|
||||
"""
|
||||
intelligent dns adder - first does a lookup on the master server to find
|
||||
the current ip and only sends an 'add' if there is no such entry.
|
||||
otherwise send an 'upd' if the if we have a different ip.
|
||||
|
||||
:param fqdn: fully qualified domain name (str)
|
||||
:param ipaddr: new ip address
|
||||
:param ttl: time to live, default 60s (int)
|
||||
:raises: SameIpError if new and old IP is the same
|
||||
"""
|
||||
rdtype = get_rdtype(ipaddr)
|
||||
try:
|
||||
current_ipaddr = query_ns(fqdn, rdtype)
|
||||
# check if ip really changed
|
||||
ok = ipaddr != current_ipaddr
|
||||
action = 'upd'
|
||||
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
|
||||
# no dns entry yet, ok
|
||||
ok = True
|
||||
action = 'add'
|
||||
if ok:
|
||||
# only send an add/update if the ip really changed as the update
|
||||
# causes write I/O on the nameserver and also traffic to the
|
||||
# dns slaves (they get a notify if we update the zone).
|
||||
update_ns(fqdn, rdtype, ipaddr, action=action, ttl=ttl)
|
||||
else:
|
||||
raise SameIpError
|
||||
|
||||
|
||||
def delete(fqdn, rdtype=None):
|
||||
"""
|
||||
dns deleter
|
||||
|
||||
:param fqdn: fully qualified domain name (str)
|
||||
:param rdtype: 'A', 'AAAA' or None (deletes 'A' and 'AAAA')
|
||||
"""
|
||||
if rdtype is not None:
|
||||
assert rdtype in ['A', 'AAAA', ]
|
||||
rdtypes = [rdtype, ]
|
||||
else:
|
||||
rdtypes = ['A', 'AAAA']
|
||||
for rdtype in rdtypes:
|
||||
update_ns(fqdn, rdtype, action='del')
|
||||
|
||||
|
||||
def update(fqdn, ipaddr, ttl=60):
|
||||
"""
|
||||
intelligent dns updater - first does a lookup on the master server to find
|
||||
@ -30,8 +89,7 @@ def update(fqdn, ipaddr, ttl=60):
|
||||
:param ttl: time to live, default 60s (int)
|
||||
:raises: SameIpError if new and old IP is the same
|
||||
"""
|
||||
af = dns.inet.af_for_address(ipaddr)
|
||||
rdtype = 'A' if af == dns.inet.AF_INET else 'AAAA'
|
||||
rdtype = get_rdtype(ipaddr)
|
||||
try:
|
||||
current_ipaddr = query_ns(fqdn, rdtype)
|
||||
# check if ip really changed
|
||||
|
Loading…
x
Reference in New Issue
Block a user