cleanup settings access
This commit is contained in:
parent
60353a6429
commit
a769b4d8ed
@ -4,16 +4,16 @@ Tests for dnstools module.
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from ..dnstools import (update, query_ns, parse_name, update_ns, SameIpError,
|
from ..dnstools import update, query_ns, parse_name, update_ns, SameIpError
|
||||||
BASEDOMAIN, NONEXISTING_HOST,
|
from django.conf import settings
|
||||||
WWW_HOST, WWW_IPV4_HOST, WWW_IPV4_IP, WWW_IPV6_HOST, WWW_IPV6_IP, )
|
BASEDOMAIN = settings.BASEDOMAIN
|
||||||
|
|
||||||
from dns.resolver import NXDOMAIN
|
from dns.resolver import NXDOMAIN
|
||||||
|
|
||||||
|
|
||||||
class TestIntelligentUpdater(object):
|
class TestIntelligentUpdater(object):
|
||||||
def test_double_update(self):
|
def test_double_update(self):
|
||||||
host, ip = 'test0.' + BASEDOMAIN, '1.2.3.4'
|
host, ip = 'test0.' + settings.BASEDOMAIN, '1.2.3.4'
|
||||||
# make sure the host is not there
|
# make sure the host is not there
|
||||||
try:
|
try:
|
||||||
update_ns(host, 'A', action='del')
|
update_ns(host, 'A', action='del')
|
||||||
@ -29,16 +29,16 @@ class TestIntelligentUpdater(object):
|
|||||||
|
|
||||||
class TestQuery(object):
|
class TestQuery(object):
|
||||||
def test_queries_ok(self):
|
def test_queries_ok(self):
|
||||||
assert query_ns(WWW_IPV4_HOST, 'A') == WWW_IPV4_IP # v4 ONLY
|
assert query_ns(settings.WWW_IPV4_HOST, 'A') == settings.WWW_IPV4_IP # v4 ONLY
|
||||||
assert query_ns(WWW_IPV6_HOST, 'AAAA') == WWW_IPV6_IP # v6 ONLY
|
assert query_ns(settings.WWW_IPV6_HOST, 'AAAA') == settings.WWW_IPV6_IP # v6 ONLY
|
||||||
assert query_ns(WWW_HOST, 'A') == WWW_IPV4_IP # v4 and v6, query v4
|
assert query_ns(settings.WWW_HOST, 'A') == settings.WWW_IPV4_IP # v4 and v6, query v4
|
||||||
assert query_ns(WWW_HOST, 'AAAA') == WWW_IPV6_IP # v4 and v6, query v6
|
assert query_ns(settings.WWW_HOST, 'AAAA') == settings.WWW_IPV6_IP # v4 and v6, query v6
|
||||||
|
|
||||||
def test_queries_failing(self):
|
def test_queries_failing(self):
|
||||||
with pytest.raises(NXDOMAIN):
|
with pytest.raises(NXDOMAIN):
|
||||||
query_ns(NONEXISTING_HOST, 'A')
|
query_ns(settings.NONEXISTING_HOST, 'A')
|
||||||
with pytest.raises(NXDOMAIN):
|
with pytest.raises(NXDOMAIN):
|
||||||
query_ns(NONEXISTING_HOST, 'AAAA')
|
query_ns(settings.NONEXISTING_HOST, 'AAAA')
|
||||||
|
|
||||||
|
|
||||||
class TestUpdate(object):
|
class TestUpdate(object):
|
||||||
|
@ -13,20 +13,6 @@ import dns.tsig
|
|||||||
import dns.tsigkeyring
|
import dns.tsigkeyring
|
||||||
|
|
||||||
|
|
||||||
SERVER = settings.SERVER
|
|
||||||
BASEDOMAIN = settings.BASEDOMAIN
|
|
||||||
|
|
||||||
NONEXISTING_HOST = settings.NONEXISTING_HOST
|
|
||||||
WWW_HOST = settings.WWW_HOST
|
|
||||||
WWW_IPV4_HOST = settings.WWW_IPV4_HOST
|
|
||||||
WWW_IPV6_HOST = settings.WWW_IPV6_HOST
|
|
||||||
WWW_IPV4_IP = settings.WWW_IPV4_IP
|
|
||||||
WWW_IPV6_IP = settings.WWW_IPV6_IP
|
|
||||||
|
|
||||||
UPDATE_ALGO = settings.UPDATE_ALGO
|
|
||||||
UPDATE_KEY = settings.UPDATE_KEY
|
|
||||||
|
|
||||||
|
|
||||||
class SameIpError(ValueError):
|
class SameIpError(ValueError):
|
||||||
"""
|
"""
|
||||||
raised if an IP address is already present in DNS and and update was
|
raised if an IP address is already present in DNS and and update was
|
||||||
@ -66,8 +52,8 @@ def query_ns(qname, rdtype):
|
|||||||
resolver = dns.resolver.Resolver(configure=False)
|
resolver = dns.resolver.Resolver(configure=False)
|
||||||
# we do not configure it from resolv.conf, but patch in the values we
|
# we do not configure it from resolv.conf, but patch in the values we
|
||||||
# want into the documented attributes:
|
# want into the documented attributes:
|
||||||
resolver.nameservers = [SERVER, ]
|
resolver.nameservers = [settings.SERVER, ]
|
||||||
resolver.search = [dns.name.from_text(BASEDOMAIN), ]
|
resolver.search = [dns.name.from_text(settings.BASEDOMAIN), ]
|
||||||
answer = resolver.query(qname, rdtype)
|
answer = resolver.query(qname, rdtype)
|
||||||
return str(list(answer)[0])
|
return str(list(answer)[0])
|
||||||
|
|
||||||
@ -107,8 +93,8 @@ def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60):
|
|||||||
assert action in ['add', 'del', 'upd', ]
|
assert action in ['add', 'del', 'upd', ]
|
||||||
origin, name = parse_name(fqdn, origin)
|
origin, name = parse_name(fqdn, origin)
|
||||||
upd = dns.update.Update(origin,
|
upd = dns.update.Update(origin,
|
||||||
keyring=dns.tsigkeyring.from_text({BASEDOMAIN+'.': UPDATE_KEY}),
|
keyring=dns.tsigkeyring.from_text({settings.BASEDOMAIN+'.': settings.UPDATE_KEY}),
|
||||||
keyalgorithm=UPDATE_ALGO)
|
keyalgorithm=settings.UPDATE_ALGO)
|
||||||
if action == 'add':
|
if action == 'add':
|
||||||
assert ipaddr is not None
|
assert ipaddr is not None
|
||||||
upd.add(name, ttl, rdtype, ipaddr)
|
upd.add(name, ttl, rdtype, ipaddr)
|
||||||
@ -117,5 +103,5 @@ def update_ns(fqdn, rdtype='A', ipaddr=None, origin=None, action='upd', ttl=60):
|
|||||||
elif action == 'upd':
|
elif action == 'upd':
|
||||||
assert ipaddr is not None
|
assert ipaddr is not None
|
||||||
upd.replace(name, ttl, rdtype, ipaddr)
|
upd.replace(name, ttl, rdtype, ipaddr)
|
||||||
response = dns.query.tcp(upd, SERVER)
|
response = dns.query.tcp(upd, settings.SERVER)
|
||||||
return response
|
return response
|
||||||
|
Loading…
x
Reference in New Issue
Block a user