diff --git a/cmake_targets/autotests/core.py b/cmake_targets/autotests/core.py new file mode 100644 index 0000000000000000000000000000000000000000..c899069f89a726d4b8cba18e688af16665a90751 --- /dev/null +++ b/cmake_targets/autotests/core.py @@ -0,0 +1,349 @@ +#****************************************************************************** + +# Eurecom OpenAirInterface +# Copyright(c) 1999 - 2013 Eurecom + +# This program is free software; you can redistribute it and/or modify it +# under the terms and conditions of the GNU General Public License, +# version 2, as published by the Free Software Foundation. + +# This program is distributed in the hope it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. + +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + +# The full GNU General Public License is included in this distribution in +# the file called "COPYING". + +# Contact Information +# Openair Admin: openair_admin@eurecom.fr +# Openair Tech : openair_tech@eurecom.fr +# Forums : http://forums.eurecom.fsr/openairinterface +# Address : Eurecom, Compus SophiaTech 450, route des chappes, 06451 Biot, France + +#***************************************************************************** + +# \file core.py +# \brief OAI core testing class that provides various primitives to send/recv cmd to openair class searching for patterns and process the responses and tag the test case as passed/failed/skipped +# \author Navid Nikaein +# \date 2013 +# \version 0.1 +# @ingroup _test + + +import pexpect +import time +import re +import string +import sys +import os + +import openair +import log + +class core: + def __init__(self): + self.send_cr = 1 + self.expect_echo = 0 + self.expect_response = 1 + self.flag_errors = 1 + self.log = None + + def clean(self, obj): + if type(obj) is str: + return obj + else: + return repr(obj) + + def mark(self, marker): + if self.log: + print >> self.log, "\n\n{" + marker + "}\n\n" + + def expected(self, expected, got): + return "================================= Failure ===================================\n"+\ + "_________________________________ Expected __________________________________\n"+\ + "--->" + expected + "<-\n" +\ + "_________________________________ Received __________________________________\n"+\ + "--->" + got + "<-\n" +\ + "=============================================================================\n" + + def unexpected(self, notexpected, got): + return "================================= Failure ===================================\n"+\ + "__________________________ not expect to find _______________________________\n"+\ + "---> " + self.clean(notexpected) + "\n" +\ + "_________________________________ Received___________________________________\n"+\ + "---> " + self.clean(got) + "\n" +\ + "=============================================================================\n" + + def failed(self, command, expect,debug): + time.sleep(2) + ret = "================================= Failure =================================\n" + ret +="_________________________________ Sent ____________________________________\n" + ret +="---> " + command + "\n" + ret +="______________________________Searching for _______________________________\n" + ret +="---> " + self.clean(expect) + "\n" + if debug >= 1 : + ret +="________________________________ Received _________________________________\n" + ret +="---> " + self.clean(self.oai.before) + "\n" + ret +="_______________________________ Remaining _________________________________\n" + ret +="---> " + self.clean(self.oai.after) + "\n" + ret +="===========================================================================\n" + return ret + + def err(self, command): + return "============================ Error received ================================\n"+\ + "__________________________________ Sent ____________________________________\n"+\ + '---> ' + command + "\n"+\ + "_________________________________ Error was ________________________________\n"+\ + "---> " + self.oai.before + "\n" +\ + "============================================================================\n" + + + def wait_quiet(self, timeout=0.5): + while 1: + try: + self.oai.expect(['..*'], timeout=0.5) + except pexpect.TIMEOUT, e: + return +# print '[Flushing ' + self.oai.after + ']' + + # **************************Send***************************** + # 1) send a command and return, do not wait + # ************************************************************ + def send_nowait(self, command, sudo=False): + rsp1 = self.prompt1 + rsp2 = self.prompt2 + if sudo == True: + command = 'echo \'' + self.password + '\' | sudo -S -E ' + command + + self.wait_quiet() + if self.send_cr: + log.stats['cmd'] += 1 + self.oai.sendline(command) + else: + self.oai.send(command) + + + # **************************Send***************************** + # 1) send a command + # 2) wait for a return prompt. Don't capture the response. + # 3) Check for error or timeout. + # ************************************************************ + def send(self, command,sudo=False, timeout = 50, rsp1=None, rsp2=None,debug=0): + if not rsp1: + rsp1 = self.prompt1 + if not rsp2: + rsp2 = self.prompt2 + self.wait_quiet() + if sudo == True: + command = 'echo \'' + self.password + '\' | sudo -S -E ' + command + if self.send_cr: + log.stats['cmd'] += 1 + self.oai.sendline(command) + else: + self.oai.send(command) + + if self.expect_echo: + #cmd = self.oai.expect([re.escape(command), pexpect.TIMEOUT], timeout=timeout); + cmd = self.oai.expect_exact([command, pexpect.TIMEOUT], timeout=timeout); + if cmd != 0: + raise log.err(self.failed(command, command,debug)) + + if self.expect_response: + #index = self.oai.expect([re.escape(rsp1), re.escape(rsp2),'%', pexpect.TIMEOUT], timeout=timeout) + index = self.oai.expect_exact([rsp1, rsp2, pexpect.TIMEOUT], timeout=timeout) + + if index == 0 or index == 1: + return 'OK' + elif index == 2: + #self.oai.expect([re.escape(rsp1), re.escape(rsp2), pexpect.TIMEOUT], timeout=timeout) + self.oai.expect_exact([rsp1, rsp2, pexpect.TIMEOUT], timeout=timeout) + if self.flag_errors: + raise log.err(self.err(command)) + else: + return 'OK' + else: + raise log.err(self.failed(command, rsp1 + ' or ' + rsp2,debug)) + + # **************************send_recv************************* + # 1) send a command + # 2) wait for either rsp1 or rsp2 is found (normally prompts) + # 3) return everything seen before that + # ************************************************************ + def send_recv(self, command, sudo=False, timeout=100, rsp1=None, rsp2=None,debug=0): + if not rsp1: + rsp1 = self.prompt1 + if not rsp2: + rsp2 = self.prompt2 + self.wait_quiet() + if sudo == True: + command = 'echo \'' + self.password + '\' | sudo -S -E ' + command + if self.send_cr: + log.stats['cmd'] += 1 + self.oai.sendline(command) + else: + self.oai.send(command) + #index = self.oai.expect([re.escape(rsp1), re.escape(rsp2), pexpect.TIMEOUT], timeout=timeout); + index = self.oai.expect_exact([rsp1, rsp2, pexpect.TIMEOUT], timeout=timeout); + if index == 0 or index == 1 : + return self.oai.before + else: + raise log.err(self.failed(command, rsp1 + ' or ' + rsp2,debug)) + + + # **************************send_expect************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expected pattern in the response + # 3) raise an error if not found + # ************************************************************** + def send_expect(self, command, expect, sudo=False, delay = 50, rsp1=None, rsp2=None,debug=0): + if debug : + print command + print expect + print delay + rsp = self.send_recv(command, sudo, delay, rsp1, rsp2) + #print rsp + if (rsp.find(expect) != -1): + return 'Ok' + + raise log.err(self.failed(command, expect,debug)) + + + # **************************send_expect_re************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expected pattern defined by a regular expression in the response + # 3) return a error if raise_err flag is set and re not found, otherwise return 'Ok' + # ***************************************************************** + def send_expect_re(self, command, expect, sudo=False, raise_err=1, delay = 50, rsp1=None, rsp2=None,debug=0): + rsp = self.send_recv(command, sudo, delay, rsp1, rsp2) +# print rsp + match = re.compile(expect).search(rsp) + if match: + return match + if raise_err: + raise log.err(self.failed(command, expect,debug)) + else : + return None + + # **************************send_expect************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expected pattern defined by a re in the response + # 3) return ok if not found + # ************************************************************** + def send_expect_false(self, command, expect, sudo=False, delay = 5, rsp1=None, rsp2=None,debug=0): + rsp = self.send_recv(command, sudo, delay, rsp1, rsp2) + # print rsp + if (rsp.find(expect) == -1): + return 'OK' + + raise log.err(self.failed(command, expect,debug)) + + + # **************************send_wait************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expected pattern in the response + # 3) retry for a numretries if not found + # 4) return an error if not found after the numtries + # 3) return the response if found + # ************************************************************** + def send_wait(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0): + timer = 0 + for i in range(numretries): + rsp = self.send_recv(command, sudo, 10, rsp1, rsp2) + if (rsp.find(expect) != -1): + return rsp; + time.sleep(2) + timer = timer+2 + raise log.err(self.failed(command, expect,debug)) + + # **************************send_wait_re************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expected pattern defined by a re in the response + # 3) retry for a numretries if not found + # 4) return an error if not found after the numtries + # 3) return the response if found + # ************************************************************** + def send_wait_re(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0): + timer = 0 + for i in range(numretries): + rsp = self.send_recv(command,sudo) + if re.compile(expect).search(rsp): +# print "Found in",i,"attempts" + return rsp; + time.sleep(2) + timer = timer+2 + raise log.err(self.failed(command, expect,debug)) + + # **************************send_wait_false************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expect pattern in the response + # 3) return the response if not found + # 4) return an error if the pattern found after the numtries + # ************************************************************** + def send_wait_false(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0): + timer = 1 + for i in range(numretries): + rsp = self.send_recv(command,sudo) + if (rsp.find(expect) == -1): + return rsp; + time.sleep(2) + timer = timer+2 + raise log.err(self.failed(command, expect,debug)) + + # **************************send_wait_false************************* + # 1) send a command, and optionally specify a the time to wait + # 2) search for an expect pattern defined by a re in the response + # 3) return the response if not found + # 4) return an error if the pattern found after the numtries + # ************************************************************** + def send_wait_false_re(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0): + timer = 0 + for i in range(numretries): + rsp = self.send_recv(command,sudo) + if not re.compile(expect).search(rsp): + return rsp; + time.sleep(2) + timer = timer+2 + raise log.err(self.failed(command, expect,debug)) + + # **************************find************************* + # 1) find an exact pattern in a given string + # 2) raise an error if not found + # ************************************************************** + def find(self, string, pattern): + word = string.replace(pattern,'*','\*') + words = string.replace(word,' ','\s*') + if re.search(words,string): + pass + else: + raise log.err(string) + + + # **************************find_false************************** + # 1) find an exact pattern in a given string + # 2) raise an error if found + # ************************************************************** + def find_false(self, string, pattern): + if string.find(pattern) != -1: + raise log.err(string) + + # **************************find_re************************* + # 1) find an exact re pattern in a given string + # 2) raise an error if not found + # ************************************************************** + def find_re(self, string, pattern): + if not re.compile(pattern).search(string): + raise log.err(string) + + # **************************find_false_re************************* + # 1) find an exact re pattern in a given string + # 2) raise an error if found + # ************************************************************** + def find_false_re(self, string, pattern): + if re.compile(pattern).search(string): + raise log.err(string) + diff --git a/cmake_targets/autotests/log.py b/cmake_targets/autotests/log.py new file mode 100644 index 0000000000000000000000000000000000000000..0b4baff3e98ba672730e11725d5c943bad1fb1d2 --- /dev/null +++ b/cmake_targets/autotests/log.py @@ -0,0 +1,206 @@ +#****************************************************************************** + +# Eurecom OpenAirInterface +# Copyright(c) 1999 - 2013 Eurecom + +# This program is free software; you can redistribute it and/or modify it +# under the terms and conditions of the GNU General Public License, +# version 2, as published by the Free Software Foundation. + +# This program is distributed in the hope it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. + +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + +# The full GNU General Public License is included in this distribution in +# the file called "COPYING". + +# Contact Information +# Openair Admin: openair_admin@eurecom.fr +# Openair Tech : openair_tech@eurecom.fr +# Forums : http://forums.eurecom.fsr/openairinterface +# Address : Eurecom, Compus SophiaTech 450, route des chappes, 06451 Biot, France + +#***************************************************************************** + +# \file log.py +# \brief provides primitives and defines how the logs and statistics are generated +# \author Navid Nikaein +# \date 2013 +# \version 0.1 +# @ingroup _test + +import sys +import re +import time +import datetime +import array +import xml.etree.ElementTree as ET + + +debug = False +docfile = '' +start_time = time.time() +testcase_starttime = start_time +debug = 0 +stats = {'passed':0, 'failed':0, 'skipped':0, 'internal_errors':0, 'cmd':0} + +# xml result (jUnit like) +xUnitTestsuites = ET.Element( 'testsuites' ) +xUnitTestsuite = ET.SubElement( xUnitTestsuites, 'testsuite' ) +xUnitTestsuite.set( 'name', 'OAI' ) +xUnitTestsuite.set( 'timestamp', datetime.datetime.fromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%S') ) +xUnitTestsuite.set( 'hostname', 'localhost' ) +#xUnitSystemOut = ET.SubElement( xUnitTestsuite, 'system-out' ) + +class bcolors: + header = '\033[95m' + okblue = '\033[94m' + okgreen = '\033[92m' + warning = '\033[93m' + fail = '\033[91m' + normal = '\033[0m' + + def __init__(self): + if not sys.stdout.isatty(): + self.disable() + + def disable(self): + self.header = '' + self.okblue = '' + self.okgreen = '' + self.warning = '' + self.fail = '' + self.normal = '' + +class err(Exception): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +def writefile(logfile, message): + F_testlog = open(logfile, 'a') + F_testlog.write(message + '\n') + F_testlog.close() + + +def sleep(seconds): + time.sleep(seconds) + +def start(): + """Start the timer for the following testcase.""" + global testcase_starttime + testcase_starttime = time.time() + +def set_debug_level(level): + debug = level + +def statistics(logfile): + global start_time + + #if stats['passed'] == 0: + # print "no test executed...exiting" + # sys.exit() + + total_tests = stats['passed'] + stats['failed'] + stats['skipped'] + total_ex_tests = stats['passed'] + stats['failed'] + elapsed_time = time.gmtime(time.time() - start_time) + print '\n' + log_record('info', '===============================================') + log_record('info', 'Total tests performed ' + repr(total_tests)) + log_record('info', 'Tests passed ' + repr(stats['passed'])) + log_record('info', 'Tests failed ' + repr(stats['failed'])) + log_record('info', 'Tests skipped ' + repr(stats['skipped'])) + log_record('info', '') + log_record('info', 'Total commands sent ' + repr(stats['cmd'])) + log_record('info', 'Total elapsed time (h:m:s) ' + time.strftime('%H:%M:%S', elapsed_time)) + log_record('info', '===============================================') + log_record('info', 'Testing pass rate ' + repr((stats['passed'] * 100) / total_tests) + '%') + log_record('info', '===============================================') + + writefile(logfile, '\n=====================Results===================') + writefile(logfile, 'Total tests performed ' + repr(total_tests)) + writefile(logfile, 'Tests passed ' + repr(stats['passed'])) + writefile(logfile, 'Tests failed ' + repr(stats['failed'])) + writefile(logfile, 'Tests skipped ' + repr(stats['skipped'])) + writefile(logfile, '') + writefile(logfile, 'Total commands sent ' + repr(stats['cmd'])) + writefile(logfile, 'Total elapsed time (h:m:s) ' + time.strftime('%H:%M:%S', elapsed_time)) + writefile(logfile, '===============================================') + writefile(logfile, 'Testing pass rate ' + repr((stats['passed'] * 100) / total_tests) + '%') + writefile(logfile, '===============================================\n') + + xUnitTestsuite.set( 'tests', repr(total_tests) ) + xUnitTestsuite.set( 'failures', repr(stats['failed']) ) + xUnitTestsuite.set( 'skipped', repr(stats['skipped']) ) + xUnitTestsuite.set( 'errors', '0' ) + time_delta = datetime.datetime.now() - datetime.datetime.fromtimestamp(start_time) + xUnitTestsuite.set( 'time', repr(time_delta.total_seconds()) ) + writefile( logfile + '.xml', ET.tostring( xUnitTestsuites, encoding="utf-8", method="xml" ) ) + +def log_record(level, message): + ts = time.strftime('%d %b %Y %H:%M') + message = ts + ' [' + level + '] ' + message + if level == 'passed' : + print bcolors.okgreen + message + bcolors.normal + elif level == 'failed' : + print bcolors.fail + message + bcolors.normal + elif level == 'skipped' : + print bcolors.warning + message + bcolors.normal + else : + print message + +def fail(case, testnum, testname, conf, message, diag, output,trace): +# report(case, testnum, testname, conf, 'failed', output, diag, message) + report(case, testnum, testname, conf, 'failed', output, diag) + log_record('failed', case + testnum + ' : ' + testname + ' ('+ conf+')') + if message : + log_record('failed', "Output follows:\n" + message ) + if trace : + log_record('failed', "trace file can be found in " + trace + "\n" ) + stats['failed'] += 1 + +def failquiet(case, testnum, testname, conf): + log_record('failed', case + testnum + ' :' + testname + ' ('+ conf+')') + stats['failed'] += 1 + +def ok(case, testnum, testname, conf, message, output): + report(case, testnum, testname, conf, 'passed', output) + log_record('passed', case + testnum + ' : ' + testname + ' ('+ conf+')') + if message : + print bcolors.okgreen + message + bcolors.normal + stats['passed'] += 1 + + +def skip(case, testnum, testname, conf, message=None, diag=None, output=None): + log_record('skipped', case + testnum + ' :' + testname + ' ('+ conf+')') + report(case, testnum, testname, conf, 'skipped', output, diag) + if message : + log_record('skipped', "Output follows:\n" + message ) + if diag : + log_record('skipped', "Diagnostic: \n" + diag ) + stats['skipped'] += 1 + + +def report(case, test, name, conf, status, output, diag=None, desc=None): + writefile (output, '[' +status+ '] ' + case + test + ' : ' + name + ' ('+ conf+')') + if diag : + writefile (output, '-------> ' + diag) + if desc: + writefile(output, desc) + #log_record('report', + case + test + ' documented') + e = ET.SubElement( xUnitTestsuite, 'testcase' ) + e.set( 'name', case + '_' + test + '_' + name ) + e.set( 'classname', 'shellscript' ) + e.set( 'time', repr( time.time() - testcase_starttime ) ) + if status == 'failed': + e = ET.SubElement( e, 'failure' ) + e.set( 'message', 'failed' ) + e.text = diag + if status == 'skipped': + e = ET.SubElement( e, 'skipped' )