dns: implement updating the master server
This commit is contained in:
parent
4fb3976ac3
commit
f4172a306a
@ -4,13 +4,14 @@ Tests for dnstools module.
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from nsupdate.main.dnstools import (query_ns, NONEXISTING_HOST, WWW_HOST, WWW_IPV4_HOST, WWW_IPV4_IP,
|
from nsupdate.main.dnstools import (query_ns, parse_name, update_ns,
|
||||||
|
BASEDOMAIN, NONEXISTING_HOST, WWW_HOST, WWW_IPV4_HOST, WWW_IPV4_IP,
|
||||||
WWW_IPV6_HOST, WWW_IPV6_IP, )
|
WWW_IPV6_HOST, WWW_IPV6_IP, )
|
||||||
|
|
||||||
from dns.resolver import NXDOMAIN
|
from dns.resolver import NXDOMAIN
|
||||||
|
|
||||||
|
|
||||||
class Tests(object):
|
class TestQuery(object):
|
||||||
def test_queries_ok(self):
|
def test_queries_ok(self):
|
||||||
"""
|
"""
|
||||||
check some simple dns lookups
|
check some simple dns lookups
|
||||||
@ -25,3 +26,87 @@ class Tests(object):
|
|||||||
query_ns(NONEXISTING_HOST, 'A')
|
query_ns(NONEXISTING_HOST, 'A')
|
||||||
with pytest.raises(NXDOMAIN):
|
with pytest.raises(NXDOMAIN):
|
||||||
query_ns(NONEXISTING_HOST, 'AAAA')
|
query_ns(NONEXISTING_HOST, 'AAAA')
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpdate(object):
|
||||||
|
def test_parse1(self):
|
||||||
|
"""
|
||||||
|
check fqdn parsing
|
||||||
|
"""
|
||||||
|
origin, relname = parse_name('foo.' + BASEDOMAIN)
|
||||||
|
assert str(origin) == BASEDOMAIN + '.'
|
||||||
|
assert str(relname) == 'foo'
|
||||||
|
|
||||||
|
def test_parse2(self):
|
||||||
|
origin, relname = parse_name('foo.bar.' + BASEDOMAIN)
|
||||||
|
assert str(origin) == BASEDOMAIN + '.'
|
||||||
|
assert str(relname) == 'foo.bar'
|
||||||
|
|
||||||
|
def test_parse_with_origin(self):
|
||||||
|
origin, relname = parse_name('foo.bar.baz.org', 'bar.baz.org')
|
||||||
|
assert str(origin) == 'bar.baz.org' + '.'
|
||||||
|
assert str(relname) == 'foo'
|
||||||
|
|
||||||
|
def test_add_del_v4(self):
|
||||||
|
host, ip = 'test1.' + BASEDOMAIN, '1.1.1.1'
|
||||||
|
response = update_ns(host, 'A', ip, action='add', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host, 'A') == ip
|
||||||
|
response = update_ns(host, 'A', action='del')
|
||||||
|
print response
|
||||||
|
with pytest.raises(NXDOMAIN):
|
||||||
|
query_ns(host, 'A') == ip
|
||||||
|
|
||||||
|
def test_update_v4(self):
|
||||||
|
host, ip = 'test2.' + BASEDOMAIN, '2.2.2.2'
|
||||||
|
response = update_ns(host, 'A', ip, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host, 'A') == ip
|
||||||
|
|
||||||
|
host, ip = 'test2.' + BASEDOMAIN, '3.3.3.3'
|
||||||
|
response = update_ns(host, 'A', ip, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host, 'A') == ip
|
||||||
|
|
||||||
|
def test_add_del_v6(self):
|
||||||
|
host, ip = 'test3.' + BASEDOMAIN, '::1'
|
||||||
|
response = update_ns(host, 'AAAA', ip, action='add', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host, 'AAAA') == ip
|
||||||
|
response = update_ns(host, 'AAAA', action='del')
|
||||||
|
print response
|
||||||
|
with pytest.raises(NXDOMAIN):
|
||||||
|
query_ns(host, 'AAAA') == ip
|
||||||
|
|
||||||
|
def test_update_v6(self):
|
||||||
|
host, ip = 'test4.' + BASEDOMAIN, '::2'
|
||||||
|
response = update_ns(host, 'AAAA', ip, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host, 'AAAA') == ip
|
||||||
|
|
||||||
|
host, ip = 'test4.' + BASEDOMAIN, '::3'
|
||||||
|
response = update_ns(host, 'AAAA', ip, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host, 'AAAA') == ip
|
||||||
|
|
||||||
|
def test_update_mixed(self):
|
||||||
|
host4, ip4 = 'test5.' + BASEDOMAIN, '4.4.4.4'
|
||||||
|
response = update_ns(host4, 'A', ip4, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host4, 'A') == ip4
|
||||||
|
|
||||||
|
host6, ip6 = 'test5.' + BASEDOMAIN, '::4'
|
||||||
|
response = update_ns(host6, 'AAAA', ip6, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host6, 'AAAA') == ip6
|
||||||
|
|
||||||
|
# make sure the v4 is unchanged
|
||||||
|
assert query_ns(host4, 'A') == ip4
|
||||||
|
|
||||||
|
host4, ip4 = 'test5.' + BASEDOMAIN, '5.5.5.5'
|
||||||
|
response = update_ns(host4, 'A', ip4, action='upd', ttl=60)
|
||||||
|
print response
|
||||||
|
assert query_ns(host4, 'A') == ip4
|
||||||
|
|
||||||
|
# make sure the v6 is unchanged
|
||||||
|
assert query_ns(host6, 'AAAA') == ip6
|
||||||
|
@ -2,6 +2,14 @@
|
|||||||
Misc. DNS related code: query, dynamic update, etc.
|
Misc. DNS related code: query, dynamic update, etc.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import dns.name
|
||||||
|
import dns.resolver
|
||||||
|
import dns.query
|
||||||
|
import dns.update
|
||||||
|
import dns.tsig
|
||||||
|
import dns.tsigkeyring
|
||||||
|
|
||||||
|
|
||||||
SERVER = '85.10.192.104' # ns1.thinkmo.de (master / dynamic upd server for nsupdate.info)
|
SERVER = '85.10.192.104' # ns1.thinkmo.de (master / dynamic upd server for nsupdate.info)
|
||||||
BASEDOMAIN = 'nsupdate.info'
|
BASEDOMAIN = 'nsupdate.info'
|
||||||
|
|
||||||
@ -12,8 +20,9 @@ WWW_IPV6_HOST = 'www.ipv6.' + BASEDOMAIN
|
|||||||
WWW_IPV4_IP = '178.32.221.14'
|
WWW_IPV4_IP = '178.32.221.14'
|
||||||
WWW_IPV6_IP = '2001:41d0:8:e00e::1'
|
WWW_IPV6_IP = '2001:41d0:8:e00e::1'
|
||||||
|
|
||||||
import dns.name
|
UPDATE_ALGO = dns.tsig.HMAC_SHA512
|
||||||
import dns.resolver
|
UPDATE_KEY = 'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=='
|
||||||
|
|
||||||
|
|
||||||
def query_ns(qname, rdtype):
|
def query_ns(qname, rdtype):
|
||||||
"""
|
"""
|
||||||
@ -32,3 +41,48 @@ def query_ns(qname, rdtype):
|
|||||||
resolver.search = [dns.name.from_text(BASEDOMAIN), ]
|
resolver.search = [dns.name.from_text(BASEDOMAIN), ]
|
||||||
answer = resolver.query(qname, rdtype)
|
answer = resolver.query(qname, rdtype)
|
||||||
return str(list(answer)[0])
|
return str(list(answer)[0])
|
||||||
|
|
||||||
|
|
||||||
|
def parse_name(fqdn, origin=None):
|
||||||
|
"""
|
||||||
|
Parse a fully qualified domain name into a relative name
|
||||||
|
and a origin zone. Please note that the origin return value will
|
||||||
|
have a trailing dot.
|
||||||
|
|
||||||
|
:param fqdn: fully qualified domain name (str)
|
||||||
|
:param origin: origin zone (optional, str)
|
||||||
|
:return: origin, relative name (both dns.name.Name)
|
||||||
|
"""
|
||||||
|
fqdn = dns.name.from_text(fqdn)
|
||||||
|
if origin is None:
|
||||||
|
origin = dns.resolver.zone_for_name(fqdn)
|
||||||
|
rel_name = fqdn.relativize(origin)
|
||||||
|
else:
|
||||||
|
origin = dns.name.from_text(origin)
|
||||||
|
rel_name = fqdn - origin
|
||||||
|
return origin, rel_name
|
||||||
|
|
||||||
|
|
||||||
|
def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60):
|
||||||
|
"""
|
||||||
|
update our master server
|
||||||
|
|
||||||
|
:param qname: the name to update
|
||||||
|
:param rdtype: the record type
|
||||||
|
:param action: 'add', 'del' or 'upd'
|
||||||
|
"""
|
||||||
|
assert action in ['add', 'del', 'upd', ]
|
||||||
|
origin, name = parse_name(fqdn, origin)
|
||||||
|
upd = dns.update.Update(origin,
|
||||||
|
keyring=dns.tsigkeyring.from_text({BASEDOMAIN+'.': UPDATE_KEY}),
|
||||||
|
keyalgorithm=UPDATE_ALGO)
|
||||||
|
if action == 'add':
|
||||||
|
assert ipaddr is not None
|
||||||
|
upd.add(name, ttl, rdtype, ipaddr)
|
||||||
|
elif action == 'del':
|
||||||
|
upd.delete(name, rdtype)
|
||||||
|
elif action == 'upd':
|
||||||
|
assert ipaddr is not None
|
||||||
|
upd.replace(name, ttl, rdtype, ipaddr)
|
||||||
|
response = dns.query.tcp(upd, SERVER)
|
||||||
|
return response
|
||||||
|
Loading…
x
Reference in New Issue
Block a user