From f4172a306aa253695249eda109c5a5b278ef5907 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 28 Sep 2013 15:29:16 +0200 Subject: [PATCH] dns: implement updating the master server --- nsupdate/main/_tests/test_dnstools.py | 89 ++++++++++++++++++++++++++- nsupdate/main/dnstools.py | 58 ++++++++++++++++- 2 files changed, 143 insertions(+), 4 deletions(-) diff --git a/nsupdate/main/_tests/test_dnstools.py b/nsupdate/main/_tests/test_dnstools.py index a69b265..f945bd1 100644 --- a/nsupdate/main/_tests/test_dnstools.py +++ b/nsupdate/main/_tests/test_dnstools.py @@ -4,13 +4,14 @@ Tests for dnstools module. import pytest -from nsupdate.main.dnstools import (query_ns, NONEXISTING_HOST, WWW_HOST, WWW_IPV4_HOST, WWW_IPV4_IP, +from nsupdate.main.dnstools import (query_ns, parse_name, update_ns, + BASEDOMAIN, NONEXISTING_HOST, WWW_HOST, WWW_IPV4_HOST, WWW_IPV4_IP, WWW_IPV6_HOST, WWW_IPV6_IP, ) from dns.resolver import NXDOMAIN -class Tests(object): +class TestQuery(object): def test_queries_ok(self): """ check some simple dns lookups @@ -25,3 +26,87 @@ class Tests(object): query_ns(NONEXISTING_HOST, 'A') with pytest.raises(NXDOMAIN): query_ns(NONEXISTING_HOST, 'AAAA') + + +class TestUpdate(object): + def test_parse1(self): + """ + check fqdn parsing + """ + origin, relname = parse_name('foo.' + BASEDOMAIN) + assert str(origin) == BASEDOMAIN + '.' + assert str(relname) == 'foo' + + def test_parse2(self): + origin, relname = parse_name('foo.bar.' + BASEDOMAIN) + assert str(origin) == BASEDOMAIN + '.' + assert str(relname) == 'foo.bar' + + def test_parse_with_origin(self): + origin, relname = parse_name('foo.bar.baz.org', 'bar.baz.org') + assert str(origin) == 'bar.baz.org' + '.' + assert str(relname) == 'foo' + + def test_add_del_v4(self): + host, ip = 'test1.' + BASEDOMAIN, '1.1.1.1' + response = update_ns(host, 'A', ip, action='add', ttl=60) + print response + assert query_ns(host, 'A') == ip + response = update_ns(host, 'A', action='del') + print response + with pytest.raises(NXDOMAIN): + query_ns(host, 'A') == ip + + def test_update_v4(self): + host, ip = 'test2.' + BASEDOMAIN, '2.2.2.2' + response = update_ns(host, 'A', ip, action='upd', ttl=60) + print response + assert query_ns(host, 'A') == ip + + host, ip = 'test2.' + BASEDOMAIN, '3.3.3.3' + response = update_ns(host, 'A', ip, action='upd', ttl=60) + print response + assert query_ns(host, 'A') == ip + + def test_add_del_v6(self): + host, ip = 'test3.' + BASEDOMAIN, '::1' + response = update_ns(host, 'AAAA', ip, action='add', ttl=60) + print response + assert query_ns(host, 'AAAA') == ip + response = update_ns(host, 'AAAA', action='del') + print response + with pytest.raises(NXDOMAIN): + query_ns(host, 'AAAA') == ip + + def test_update_v6(self): + host, ip = 'test4.' + BASEDOMAIN, '::2' + response = update_ns(host, 'AAAA', ip, action='upd', ttl=60) + print response + assert query_ns(host, 'AAAA') == ip + + host, ip = 'test4.' + BASEDOMAIN, '::3' + response = update_ns(host, 'AAAA', ip, action='upd', ttl=60) + print response + assert query_ns(host, 'AAAA') == ip + + def test_update_mixed(self): + host4, ip4 = 'test5.' + BASEDOMAIN, '4.4.4.4' + response = update_ns(host4, 'A', ip4, action='upd', ttl=60) + print response + assert query_ns(host4, 'A') == ip4 + + host6, ip6 = 'test5.' + BASEDOMAIN, '::4' + response = update_ns(host6, 'AAAA', ip6, action='upd', ttl=60) + print response + assert query_ns(host6, 'AAAA') == ip6 + + # make sure the v4 is unchanged + assert query_ns(host4, 'A') == ip4 + + host4, ip4 = 'test5.' + BASEDOMAIN, '5.5.5.5' + response = update_ns(host4, 'A', ip4, action='upd', ttl=60) + print response + assert query_ns(host4, 'A') == ip4 + + # make sure the v6 is unchanged + assert query_ns(host6, 'AAAA') == ip6 diff --git a/nsupdate/main/dnstools.py b/nsupdate/main/dnstools.py index 1cd5ce4..762babc 100644 --- a/nsupdate/main/dnstools.py +++ b/nsupdate/main/dnstools.py @@ -2,6 +2,14 @@ Misc. DNS related code: query, dynamic update, etc. """ +import dns.name +import dns.resolver +import dns.query +import dns.update +import dns.tsig +import dns.tsigkeyring + + SERVER = '85.10.192.104' # ns1.thinkmo.de (master / dynamic upd server for nsupdate.info) BASEDOMAIN = 'nsupdate.info' @@ -12,8 +20,9 @@ WWW_IPV6_HOST = 'www.ipv6.' + BASEDOMAIN WWW_IPV4_IP = '178.32.221.14' WWW_IPV6_IP = '2001:41d0:8:e00e::1' -import dns.name -import dns.resolver +UPDATE_ALGO = dns.tsig.HMAC_SHA512 +UPDATE_KEY = 'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ==' + def query_ns(qname, rdtype): """ @@ -32,3 +41,48 @@ def query_ns(qname, rdtype): resolver.search = [dns.name.from_text(BASEDOMAIN), ] answer = resolver.query(qname, rdtype) return str(list(answer)[0]) + + +def parse_name(fqdn, origin=None): + """ + Parse a fully qualified domain name into a relative name + and a origin zone. Please note that the origin return value will + have a trailing dot. + + :param fqdn: fully qualified domain name (str) + :param origin: origin zone (optional, str) + :return: origin, relative name (both dns.name.Name) + """ + fqdn = dns.name.from_text(fqdn) + if origin is None: + origin = dns.resolver.zone_for_name(fqdn) + rel_name = fqdn.relativize(origin) + else: + origin = dns.name.from_text(origin) + rel_name = fqdn - origin + return origin, rel_name + + +def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60): + """ + update our master server + + :param qname: the name to update + :param rdtype: the record type + :param action: 'add', 'del' or 'upd' + """ + assert action in ['add', 'del', 'upd', ] + origin, name = parse_name(fqdn, origin) + upd = dns.update.Update(origin, + keyring=dns.tsigkeyring.from_text({BASEDOMAIN+'.': UPDATE_KEY}), + keyalgorithm=UPDATE_ALGO) + if action == 'add': + assert ipaddr is not None + upd.add(name, ttl, rdtype, ipaddr) + elif action == 'del': + upd.delete(name, rdtype) + elif action == 'upd': + assert ipaddr is not None + upd.replace(name, ttl, rdtype, ipaddr) + response = dns.query.tcp(upd, SERVER) + return response