Handle testing accounts creation in Python.

This commit is contained in:
Ghislain MARY 2014-12-08 15:49:43 +01:00
parent 180de834ee
commit 0e924f4b4e
2 changed files with 151 additions and 15 deletions

View file

@ -1,5 +1,24 @@
#include "private.h"
static PyObject * pylinphone_testing_module_method_get_random_token(PyObject *self, PyObject *args) {
PyObject *pyret;
char * cresult;
int _len;
if (!PyArg_ParseTuple(args, "i", &_len)) {
return NULL;
}
pylinphone_trace(1, "[PYLINPHONE] >>> %s(%d)", __FUNCTION__, _len);
cresult = sal_get_random_token(_len);
pylinphone_dispatch_messages();
pyret = Py_BuildValue("z", cresult);
ms_free(cresult);
pylinphone_trace(-1, "[PYLINPHONE] <<< %s -> %p", __FUNCTION__, pyret);
return pyret;
}
static PyObject * pylinphone_testing_module_method_set_dns_user_hosts_file(PyObject *self, PyObject *args) {
PyObject *_core;
char *_path;
@ -28,6 +47,7 @@ static PyObject * pylinphone_testing_module_method_set_dns_user_hosts_file(PyObj
}
static PyMethodDef pylinphone_TestingModuleMethods[] = {
{ "get_random_token", pylinphone_testing_module_method_get_random_token, METH_VARARGS, "Gets a random token of the specified length." },
{ "set_dns_user_hosts_file", pylinphone_testing_module_method_set_dns_user_hosts_file, METH_VARARGS, "Allows to set a user specified hosts file." },
/* Sentinel */
{ NULL, NULL, 0, NULL }

View file

@ -4,6 +4,7 @@ from copy import deepcopy
import linphone
import logging
import os
import sys
import time
@ -51,6 +52,114 @@ class Logger(logging.Logger):
method(msg)
class Account:
def __init__(self, id_addr, unique_id):
self.created = False
self.done = False
self.auth_requested = False
self.identity = id_addr.clone()
self.password = linphone.testing.get_random_token(8)
self.modified_identity = id_addr.clone()
modified_username = "{username}_{unique_id}".format(username=id_addr.username, unique_id=unique_id)
self.modified_identity.username = modified_username
class AccountManager:
def __init__(self):
self.unique_id = linphone.testing.get_random_token(6)
self.accounts = []
@classmethod
def wait_for_until(cls, lc1, lc2, func, timeout):
lcs = []
if lc1 is not None:
lcs.append(lc1)
if lc2 is not None:
lcs.append(lc2)
return cls.wait_for_list(lcs, func, timeout)
@classmethod
def wait_for_list(cls, lcs, func, timeout):
start = datetime.now()
end = start + timedelta(milliseconds = timeout)
res = func(*lcs)
while not res and datetime.now() < end:
for lc in lcs:
lc.iterate()
time.sleep(0.02)
res = func(*lcs)
return res
@classmethod
def account_created_on_server_cb(cls, lc, cfg, state, message):
if state == linphone.RegistrationOk:
lc.user_data.created = True
elif state == linphone.RegistrationCleared:
lc.user_data.done = True
@classmethod
def account_created_auth_requested_cb(cls, lc, realm, username, domain):
lc.user_data.auth_requested = True
def check_account(self, cfg, logger=None):
create_account = False
lc = cfg.core
identity = cfg.identity
id_addr = linphone.Address.new(identity)
account = get_account(id_addr)
if account is None:
if logger is not None:
logger.info("No account for {identity} exists, going to create one.".format(identity=identity))
account = Account(id_addr, self.unique_id)
self.accounts.append(account)
create_account = True
cfg.identity = account.modified_identity.as_string()
if create_account:
self._create_account_on_server(account, cfg, logger)
ai = linphone.AuthInfo.new(account.modified_identity.username, None, account.password, None, None, account.modified_identity.domain)
lc.add_auth_info(ai)
return account.modified_identity
def _create_account_on_server(self, account, refcfg, logger=None):
vtable = {}
tmp_identity = account.modified_identity.clone()
vtable['registration_state_changed'] = AccountManager.account_created_on_server_cb
vtable['auth_info_requested'] = AccountManager.account_created_auth_requested_cb
lc = CoreManager.configure_lc_from(vtable, tester_resources_path, None, account)
cfg = lc.create_proxy_config()
tmp_identity.password = account.password
tmp_identity.set_header("X-Create-Account", "yes")
cfg.identity = tmp_identity.as_string()
server_addr = linphone.Address.new(refcfg.server_addr)
server_addr.transport = linphone.TransportType.TransportTcp;
server_addr.port = 0
cfg.server_addr = server_addr.as_string()
cfg.expires = 3600
lc.add_proxy_config(cfg)
if AccountManager.wait_for_until(lc, None, lambda lc: lc.user_data.auth_requested == True, 10000) != True:
if logger is not None:
logger.critical("[TESTER] Account for {identity} could not be created on server.".format(identity=refcfg.identity))
sys.exit(-1)
cfg.stop_refreshing()
cfg.edit()
cfg.identity = account.modified_identity.as_string()
cfg.done()
ai = linphone.AuthInfo.new(account.modified_identity.username, None, account.password, None, None, account.modified_identity.domain)
lc.add_auth_info(ai)
if AccountManager.wait_for_until(lc, None, lambda lc: lc.user_data.created == True, 3000) != True:
if logger is not None:
logger.critical("[TESTER] Account for {identity} is not working on server.".format(identity=refcfg.identity))
sys.exit(-1)
lc.remove_proxy_config(cfg)
if AccountManager.wait_for_until(lc, None, lambda lc: lc.user_data.done == True, 3000) != True:
if logger is not None:
logger.critical("[TESTER] Account creation could not clean the registration context.")
sys.exit(-1)
account_manager = AccountManager()
class CoreManagerStats:
def __init__(self):
self.reset()
@ -162,6 +271,21 @@ class CoreManagerStats:
class CoreManager:
@classmethod
def configure_lc_from(vtable, resources_path, rc_path, user_data=None):
filepath = None
if rc_path is not None:
filepath = os.path.join(resources_path, rc_path)
assert_equals(os.path.isfile(filepath), True)
lc = linphone.Core.new(vtable, None, filepath)
linphone.testing.set_dns_user_hosts_file(lc, os.path.join(resources_path, 'tester_hosts'))
lc.root_ca = os.path.join(resources_path, 'certificates', 'cn', 'cafile.pem')
lc.ring = os.path.join(resources_path, 'sounds', 'oldphone.wav')
lc.ringback = os.path.join(resources_path, 'sounds', 'ringback.wav')
lc.static_picture = os.path.join(resources_path, 'images', 'nowebcamCIF.jpg')
lc.user_data = user_data
return lc
@classmethod
def wait_for_until(cls, manager1, manager2, func, timeout):
managers = []
@ -468,8 +592,8 @@ class CoreManager:
rc_path = None
if rc_file is not None:
rc_path = os.path.join('rcfiles', rc_file)
self.lc = self.configure_lc_from(vtable, tester_resources_path, rc_path)
self.lc.user_data = self
self.lc = CoreManager.configure_lc_from(vtable, tester_resources_path, rc_path, self)
self.check_accounts()
if check_for_proxies and rc_file is not None:
proxy_count = len(self.lc.proxy_config_list)
else:
@ -486,19 +610,6 @@ class CoreManager:
def stop(self):
self.lc = None
def configure_lc_from(self, vtable, resources_path, rc_path):
filepath = None
if rc_path is not None:
filepath = os.path.join(resources_path, rc_path)
assert_equals(os.path.isfile(filepath), True)
lc = linphone.Core.new(vtable, None, filepath)
linphone.testing.set_dns_user_hosts_file(lc, os.path.join(resources_path, 'tester_hosts'))
lc.root_ca = os.path.join(resources_path, 'certificates', 'cn', 'cafile.pem')
lc.ring = os.path.join(resources_path, 'sounds', 'oldphone.wav')
lc.ringback = os.path.join(resources_path, 'sounds', 'ringback.wav')
lc.static_picture = os.path.join(resources_path, 'images', 'nowebcamCIF.jpg')
return lc
def enable_audio_codec(self, mime, rate):
codecs = self.lc.audio_codecs
for codec in codecs:
@ -510,3 +621,8 @@ class CoreManager:
def disable_all_audio_codecs_except_one(self, mime):
self.enable_audio_codec(mime, -1)
def check_accounts(self):
pcl = self.lc.proxy_config_list
for cfg in pcl:
self.identity = account_manager.check_account(cfg, self.logger)