diff --git a/tools/python/apixml2python/linphone_testing_module.mustache b/tools/python/apixml2python/linphone_testing_module.mustache index b68f8a558..454c81fb2 100644 --- a/tools/python/apixml2python/linphone_testing_module.mustache +++ b/tools/python/apixml2python/linphone_testing_module.mustache @@ -1,5 +1,24 @@ #include "private.h" +static PyObject * pylinphone_testing_module_method_get_random_token(PyObject *self, PyObject *args) { + PyObject *pyret; + char * cresult; + int _len; + + if (!PyArg_ParseTuple(args, "i", &_len)) { + return NULL; + } + + pylinphone_trace(1, "[PYLINPHONE] >>> %s(%d)", __FUNCTION__, _len); + cresult = sal_get_random_token(_len); + pylinphone_dispatch_messages(); + + pyret = Py_BuildValue("z", cresult); + ms_free(cresult); + pylinphone_trace(-1, "[PYLINPHONE] <<< %s -> %p", __FUNCTION__, pyret); + return pyret; +} + static PyObject * pylinphone_testing_module_method_set_dns_user_hosts_file(PyObject *self, PyObject *args) { PyObject *_core; char *_path; @@ -28,6 +47,7 @@ static PyObject * pylinphone_testing_module_method_set_dns_user_hosts_file(PyObj } static PyMethodDef pylinphone_TestingModuleMethods[] = { + { "get_random_token", pylinphone_testing_module_method_get_random_token, METH_VARARGS, "Gets a random token of the specified length." }, { "set_dns_user_hosts_file", pylinphone_testing_module_method_set_dns_user_hosts_file, METH_VARARGS, "Allows to set a user specified hosts file." }, /* Sentinel */ { NULL, NULL, 0, NULL } diff --git a/tools/python/unittests/linphonetester.py b/tools/python/unittests/linphonetester.py index fcd485c79..1edbddf94 100644 --- a/tools/python/unittests/linphonetester.py +++ b/tools/python/unittests/linphonetester.py @@ -4,6 +4,7 @@ from copy import deepcopy import linphone import logging import os +import sys import time @@ -51,6 +52,114 @@ class Logger(logging.Logger): method(msg) +class Account: + def __init__(self, id_addr, unique_id): + self.created = False + self.done = False + self.auth_requested = False + self.identity = id_addr.clone() + self.password = linphone.testing.get_random_token(8) + self.modified_identity = id_addr.clone() + modified_username = "{username}_{unique_id}".format(username=id_addr.username, unique_id=unique_id) + self.modified_identity.username = modified_username + + +class AccountManager: + def __init__(self): + self.unique_id = linphone.testing.get_random_token(6) + self.accounts = [] + + @classmethod + def wait_for_until(cls, lc1, lc2, func, timeout): + lcs = [] + if lc1 is not None: + lcs.append(lc1) + if lc2 is not None: + lcs.append(lc2) + return cls.wait_for_list(lcs, func, timeout) + + @classmethod + def wait_for_list(cls, lcs, func, timeout): + start = datetime.now() + end = start + timedelta(milliseconds = timeout) + res = func(*lcs) + while not res and datetime.now() < end: + for lc in lcs: + lc.iterate() + time.sleep(0.02) + res = func(*lcs) + return res + + @classmethod + def account_created_on_server_cb(cls, lc, cfg, state, message): + if state == linphone.RegistrationOk: + lc.user_data.created = True + elif state == linphone.RegistrationCleared: + lc.user_data.done = True + + @classmethod + def account_created_auth_requested_cb(cls, lc, realm, username, domain): + lc.user_data.auth_requested = True + + def check_account(self, cfg, logger=None): + create_account = False + lc = cfg.core + identity = cfg.identity + id_addr = linphone.Address.new(identity) + account = get_account(id_addr) + if account is None: + if logger is not None: + logger.info("No account for {identity} exists, going to create one.".format(identity=identity)) + account = Account(id_addr, self.unique_id) + self.accounts.append(account) + create_account = True + cfg.identity = account.modified_identity.as_string() + if create_account: + self._create_account_on_server(account, cfg, logger) + ai = linphone.AuthInfo.new(account.modified_identity.username, None, account.password, None, None, account.modified_identity.domain) + lc.add_auth_info(ai) + return account.modified_identity + + def _create_account_on_server(self, account, refcfg, logger=None): + vtable = {} + tmp_identity = account.modified_identity.clone() + vtable['registration_state_changed'] = AccountManager.account_created_on_server_cb + vtable['auth_info_requested'] = AccountManager.account_created_auth_requested_cb + lc = CoreManager.configure_lc_from(vtable, tester_resources_path, None, account) + cfg = lc.create_proxy_config() + tmp_identity.password = account.password + tmp_identity.set_header("X-Create-Account", "yes") + cfg.identity = tmp_identity.as_string() + server_addr = linphone.Address.new(refcfg.server_addr) + server_addr.transport = linphone.TransportType.TransportTcp; + server_addr.port = 0 + cfg.server_addr = server_addr.as_string() + cfg.expires = 3600 + lc.add_proxy_config(cfg) + if AccountManager.wait_for_until(lc, None, lambda lc: lc.user_data.auth_requested == True, 10000) != True: + if logger is not None: + logger.critical("[TESTER] Account for {identity} could not be created on server.".format(identity=refcfg.identity)) + sys.exit(-1) + cfg.stop_refreshing() + cfg.edit() + cfg.identity = account.modified_identity.as_string() + cfg.done() + ai = linphone.AuthInfo.new(account.modified_identity.username, None, account.password, None, None, account.modified_identity.domain) + lc.add_auth_info(ai) + if AccountManager.wait_for_until(lc, None, lambda lc: lc.user_data.created == True, 3000) != True: + if logger is not None: + logger.critical("[TESTER] Account for {identity} is not working on server.".format(identity=refcfg.identity)) + sys.exit(-1) + lc.remove_proxy_config(cfg) + if AccountManager.wait_for_until(lc, None, lambda lc: lc.user_data.done == True, 3000) != True: + if logger is not None: + logger.critical("[TESTER] Account creation could not clean the registration context.") + sys.exit(-1) + + +account_manager = AccountManager() + + class CoreManagerStats: def __init__(self): self.reset() @@ -162,6 +271,21 @@ class CoreManagerStats: class CoreManager: + @classmethod + def configure_lc_from(vtable, resources_path, rc_path, user_data=None): + filepath = None + if rc_path is not None: + filepath = os.path.join(resources_path, rc_path) + assert_equals(os.path.isfile(filepath), True) + lc = linphone.Core.new(vtable, None, filepath) + linphone.testing.set_dns_user_hosts_file(lc, os.path.join(resources_path, 'tester_hosts')) + lc.root_ca = os.path.join(resources_path, 'certificates', 'cn', 'cafile.pem') + lc.ring = os.path.join(resources_path, 'sounds', 'oldphone.wav') + lc.ringback = os.path.join(resources_path, 'sounds', 'ringback.wav') + lc.static_picture = os.path.join(resources_path, 'images', 'nowebcamCIF.jpg') + lc.user_data = user_data + return lc + @classmethod def wait_for_until(cls, manager1, manager2, func, timeout): managers = [] @@ -468,8 +592,8 @@ class CoreManager: rc_path = None if rc_file is not None: rc_path = os.path.join('rcfiles', rc_file) - self.lc = self.configure_lc_from(vtable, tester_resources_path, rc_path) - self.lc.user_data = self + self.lc = CoreManager.configure_lc_from(vtable, tester_resources_path, rc_path, self) + self.check_accounts() if check_for_proxies and rc_file is not None: proxy_count = len(self.lc.proxy_config_list) else: @@ -486,19 +610,6 @@ class CoreManager: def stop(self): self.lc = None - def configure_lc_from(self, vtable, resources_path, rc_path): - filepath = None - if rc_path is not None: - filepath = os.path.join(resources_path, rc_path) - assert_equals(os.path.isfile(filepath), True) - lc = linphone.Core.new(vtable, None, filepath) - linphone.testing.set_dns_user_hosts_file(lc, os.path.join(resources_path, 'tester_hosts')) - lc.root_ca = os.path.join(resources_path, 'certificates', 'cn', 'cafile.pem') - lc.ring = os.path.join(resources_path, 'sounds', 'oldphone.wav') - lc.ringback = os.path.join(resources_path, 'sounds', 'ringback.wav') - lc.static_picture = os.path.join(resources_path, 'images', 'nowebcamCIF.jpg') - return lc - def enable_audio_codec(self, mime, rate): codecs = self.lc.audio_codecs for codec in codecs: @@ -510,3 +621,8 @@ class CoreManager: def disable_all_audio_codecs_except_one(self, mime): self.enable_audio_codec(mime, -1) + + def check_accounts(self): + pcl = self.lc.proxy_config_list + for cfg in pcl: + self.identity = account_manager.check_account(cfg, self.logger)