implement a dyndns2 updater + tests
This commit is contained in:
parent
886bad5fbe
commit
7566e6a5e3
0
nsupdate/utils/_tests/__init__.py
Normal file
0
nsupdate/utils/_tests/__init__.py
Normal file
57
nsupdate/utils/_tests/test_ddns_client.py
Normal file
57
nsupdate/utils/_tests/test_ddns_client.py
Normal file
@ -0,0 +1,57 @@
|
||||
"""
|
||||
Tests for ddns_client module.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from requests import Timeout, ConnectionError
|
||||
|
||||
from ..ddns_client import dyndns2_update
|
||||
|
||||
# see also conftest.py
|
||||
BASEDOMAIN = 'nsupdate.info'
|
||||
HOSTNAME = 'nsupdate-ddns-client-unittest.' + BASEDOMAIN
|
||||
INVALID_HOSTNAME = 'nsupdate-ddns-client-nohost.' + BASEDOMAIN
|
||||
USER, PASSWORD = HOSTNAME, 'yUTvxjRwNu' # no problem, is only used for this unit test
|
||||
SERVER = 'ipv4.' + BASEDOMAIN
|
||||
SECURE = False # SSL/SNI support on python 2.x sucks :(
|
||||
|
||||
|
||||
class TestDynDns2Client(object):
|
||||
def test_timeout(self):
|
||||
with pytest.raises(Timeout):
|
||||
# this assumes that the service can't respond in 1us and thus times out
|
||||
dyndns2_update('wrong', 'wrong', SERVER,
|
||||
hostname='wrong', myip='1.2.3.4', secure=SECURE,
|
||||
timeout=0.000001)
|
||||
|
||||
def test_connrefused(self):
|
||||
with pytest.raises(ConnectionError):
|
||||
# this assumes that there is no service running on 127.0.0.42
|
||||
dyndns2_update('wrong', 'wrong', '127.0.0.42',
|
||||
hostname='wrong', myip='1.2.3.4', secure=SECURE,
|
||||
timeout=2.0)
|
||||
|
||||
def test_notfqdn(self):
|
||||
status, text = dyndns2_update('wrongdomainnotfqdn', 'wrongpassword', SERVER,
|
||||
hostname=HOSTNAME, myip='1.2.3.4', secure=SECURE)
|
||||
assert status == 200
|
||||
assert text == 'notfqdn'
|
||||
|
||||
def test_badauth(self):
|
||||
status, text = dyndns2_update(USER, 'wrongpassword', SERVER,
|
||||
hostname=HOSTNAME, myip='1.2.3.4', secure=SECURE)
|
||||
assert status == 401
|
||||
|
||||
def test_nohost(self):
|
||||
status, text = dyndns2_update(USER, PASSWORD, SERVER,
|
||||
hostname=INVALID_HOSTNAME, myip='1.2.3.4', secure=SECURE)
|
||||
assert status == 200
|
||||
assert text == 'nohost'
|
||||
|
||||
def test_success(self):
|
||||
ip = '1.2.3.4'
|
||||
status, text = dyndns2_update(USER, PASSWORD, SERVER,
|
||||
hostname=HOSTNAME, myip=ip, secure=SECURE)
|
||||
assert status == 200
|
||||
assert text in ["good %s" % ip, "nochg %s" % ip]
|
40
nsupdate/utils/ddns_client.py
Normal file
40
nsupdate/utils/ddns_client.py
Normal file
@ -0,0 +1,40 @@
|
||||
"""
|
||||
dyndns2 client
|
||||
"""
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import requests
|
||||
|
||||
TIMEOUT = 30.0 # timeout for http request response [s]
|
||||
|
||||
|
||||
def dyndns2_update(userid, password,
|
||||
server, hostname=None, myip=None,
|
||||
path='/nic/update', secure=True, timeout=TIMEOUT):
|
||||
"""
|
||||
send a dyndns2-compatible update request
|
||||
|
||||
:param userid: for http basic auth
|
||||
:param password: for http basic auth
|
||||
:param server: server to send the update to
|
||||
:param hostname: hostname we want to update
|
||||
:param myip: the new ip address for hostname
|
||||
:param path: url path (default: '/nic/update')
|
||||
:param secure: whether to use ssl for the request (default: True)
|
||||
note: if you use secure=False, it will transmit
|
||||
the given data unencrypted.
|
||||
:param timeout: how long to wait until response has to begin
|
||||
:return:
|
||||
"""
|
||||
params = {}
|
||||
if hostname is not None:
|
||||
params['hostname'] = hostname
|
||||
if myip is not None:
|
||||
params['myip'] = myip
|
||||
url = "%s://%s%s" % ('https' if secure else 'http', server, path)
|
||||
r = requests.get(url, params=params, auth=(userid, password), timeout=timeout)
|
||||
r.close()
|
||||
logger.debug("update response: %d %s" % (r.status_code, r.text, ))
|
||||
return r.status_code, r.text
|
Loading…
x
Reference in New Issue
Block a user