Skip to content
Snippets Groups Projects
Commit e070050d authored by Rohit Gupta's avatar Rohit Gupta
Browse files

added files for autotest framework

parent 03595b40
No related branches found
No related tags found
No related merge requests found
#******************************************************************************
# Eurecom OpenAirInterface
# Copyright(c) 1999 - 2013 Eurecom
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
# The full GNU General Public License is included in this distribution in
# the file called "COPYING".
# Contact Information
# Openair Admin: openair_admin@eurecom.fr
# Openair Tech : openair_tech@eurecom.fr
# Forums : http://forums.eurecom.fsr/openairinterface
# Address : Eurecom, Compus SophiaTech 450, route des chappes, 06451 Biot, France
#*****************************************************************************
# \file core.py
# \brief OAI core testing class that provides various primitives to send/recv cmd to openair class searching for patterns and process the responses and tag the test case as passed/failed/skipped
# \author Navid Nikaein
# \date 2013
# \version 0.1
# @ingroup _test
import pexpect
import time
import re
import string
import sys
import os
import openair
import log
class core:
def __init__(self):
self.send_cr = 1
self.expect_echo = 0
self.expect_response = 1
self.flag_errors = 1
self.log = None
def clean(self, obj):
if type(obj) is str:
return obj
else:
return repr(obj)
def mark(self, marker):
if self.log:
print >> self.log, "\n\n{" + marker + "}\n\n"
def expected(self, expected, got):
return "================================= Failure ===================================\n"+\
"_________________________________ Expected __________________________________\n"+\
"--->" + expected + "<-\n" +\
"_________________________________ Received __________________________________\n"+\
"--->" + got + "<-\n" +\
"=============================================================================\n"
def unexpected(self, notexpected, got):
return "================================= Failure ===================================\n"+\
"__________________________ not expect to find _______________________________\n"+\
"---> " + self.clean(notexpected) + "\n" +\
"_________________________________ Received___________________________________\n"+\
"---> " + self.clean(got) + "\n" +\
"=============================================================================\n"
def failed(self, command, expect,debug):
time.sleep(2)
ret = "================================= Failure =================================\n"
ret +="_________________________________ Sent ____________________________________\n"
ret +="---> " + command + "\n"
ret +="______________________________Searching for _______________________________\n"
ret +="---> " + self.clean(expect) + "\n"
if debug >= 1 :
ret +="________________________________ Received _________________________________\n"
ret +="---> " + self.clean(self.oai.before) + "\n"
ret +="_______________________________ Remaining _________________________________\n"
ret +="---> " + self.clean(self.oai.after) + "\n"
ret +="===========================================================================\n"
return ret
def err(self, command):
return "============================ Error received ================================\n"+\
"__________________________________ Sent ____________________________________\n"+\
'---> ' + command + "\n"+\
"_________________________________ Error was ________________________________\n"+\
"---> " + self.oai.before + "\n" +\
"============================================================================\n"
def wait_quiet(self, timeout=0.5):
while 1:
try:
self.oai.expect(['..*'], timeout=0.5)
except pexpect.TIMEOUT, e:
return
# print '[Flushing ' + self.oai.after + ']'
# **************************Send*****************************
# 1) send a command and return, do not wait
# ************************************************************
def send_nowait(self, command, sudo=False):
rsp1 = self.prompt1
rsp2 = self.prompt2
if sudo == True:
command = 'echo \'' + self.password + '\' | sudo -S -E ' + command
self.wait_quiet()
if self.send_cr:
log.stats['cmd'] += 1
self.oai.sendline(command)
else:
self.oai.send(command)
# **************************Send*****************************
# 1) send a command
# 2) wait for a return prompt. Don't capture the response.
# 3) Check for error or timeout.
# ************************************************************
def send(self, command,sudo=False, timeout = 50, rsp1=None, rsp2=None,debug=0):
if not rsp1:
rsp1 = self.prompt1
if not rsp2:
rsp2 = self.prompt2
self.wait_quiet()
if sudo == True:
command = 'echo \'' + self.password + '\' | sudo -S -E ' + command
if self.send_cr:
log.stats['cmd'] += 1
self.oai.sendline(command)
else:
self.oai.send(command)
if self.expect_echo:
#cmd = self.oai.expect([re.escape(command), pexpect.TIMEOUT], timeout=timeout);
cmd = self.oai.expect_exact([command, pexpect.TIMEOUT], timeout=timeout);
if cmd != 0:
raise log.err(self.failed(command, command,debug))
if self.expect_response:
#index = self.oai.expect([re.escape(rsp1), re.escape(rsp2),'%', pexpect.TIMEOUT], timeout=timeout)
index = self.oai.expect_exact([rsp1, rsp2, pexpect.TIMEOUT], timeout=timeout)
if index == 0 or index == 1:
return 'OK'
elif index == 2:
#self.oai.expect([re.escape(rsp1), re.escape(rsp2), pexpect.TIMEOUT], timeout=timeout)
self.oai.expect_exact([rsp1, rsp2, pexpect.TIMEOUT], timeout=timeout)
if self.flag_errors:
raise log.err(self.err(command))
else:
return 'OK'
else:
raise log.err(self.failed(command, rsp1 + ' or ' + rsp2,debug))
# **************************send_recv*************************
# 1) send a command
# 2) wait for either rsp1 or rsp2 is found (normally prompts)
# 3) return everything seen before that
# ************************************************************
def send_recv(self, command, sudo=False, timeout=100, rsp1=None, rsp2=None,debug=0):
if not rsp1:
rsp1 = self.prompt1
if not rsp2:
rsp2 = self.prompt2
self.wait_quiet()
if sudo == True:
command = 'echo \'' + self.password + '\' | sudo -S -E ' + command
if self.send_cr:
log.stats['cmd'] += 1
self.oai.sendline(command)
else:
self.oai.send(command)
#index = self.oai.expect([re.escape(rsp1), re.escape(rsp2), pexpect.TIMEOUT], timeout=timeout);
index = self.oai.expect_exact([rsp1, rsp2, pexpect.TIMEOUT], timeout=timeout);
if index == 0 or index == 1 :
return self.oai.before
else:
raise log.err(self.failed(command, rsp1 + ' or ' + rsp2,debug))
# **************************send_expect*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expected pattern in the response
# 3) raise an error if not found
# **************************************************************
def send_expect(self, command, expect, sudo=False, delay = 50, rsp1=None, rsp2=None,debug=0):
if debug :
print command
print expect
print delay
rsp = self.send_recv(command, sudo, delay, rsp1, rsp2)
#print rsp
if (rsp.find(expect) != -1):
return 'Ok'
raise log.err(self.failed(command, expect,debug))
# **************************send_expect_re*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expected pattern defined by a regular expression in the response
# 3) return a error if raise_err flag is set and re not found, otherwise return 'Ok'
# *****************************************************************
def send_expect_re(self, command, expect, sudo=False, raise_err=1, delay = 50, rsp1=None, rsp2=None,debug=0):
rsp = self.send_recv(command, sudo, delay, rsp1, rsp2)
# print rsp
match = re.compile(expect).search(rsp)
if match:
return match
if raise_err:
raise log.err(self.failed(command, expect,debug))
else :
return None
# **************************send_expect*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expected pattern defined by a re in the response
# 3) return ok if not found
# **************************************************************
def send_expect_false(self, command, expect, sudo=False, delay = 5, rsp1=None, rsp2=None,debug=0):
rsp = self.send_recv(command, sudo, delay, rsp1, rsp2)
# print rsp
if (rsp.find(expect) == -1):
return 'OK'
raise log.err(self.failed(command, expect,debug))
# **************************send_wait*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expected pattern in the response
# 3) retry for a numretries if not found
# 4) return an error if not found after the numtries
# 3) return the response if found
# **************************************************************
def send_wait(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0):
timer = 0
for i in range(numretries):
rsp = self.send_recv(command, sudo, 10, rsp1, rsp2)
if (rsp.find(expect) != -1):
return rsp;
time.sleep(2)
timer = timer+2
raise log.err(self.failed(command, expect,debug))
# **************************send_wait_re*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expected pattern defined by a re in the response
# 3) retry for a numretries if not found
# 4) return an error if not found after the numtries
# 3) return the response if found
# **************************************************************
def send_wait_re(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0):
timer = 0
for i in range(numretries):
rsp = self.send_recv(command,sudo)
if re.compile(expect).search(rsp):
# print "Found in",i,"attempts"
return rsp;
time.sleep(2)
timer = timer+2
raise log.err(self.failed(command, expect,debug))
# **************************send_wait_false*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expect pattern in the response
# 3) return the response if not found
# 4) return an error if the pattern found after the numtries
# **************************************************************
def send_wait_false(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0):
timer = 1
for i in range(numretries):
rsp = self.send_recv(command,sudo)
if (rsp.find(expect) == -1):
return rsp;
time.sleep(2)
timer = timer+2
raise log.err(self.failed(command, expect,debug))
# **************************send_wait_false*************************
# 1) send a command, and optionally specify a the time to wait
# 2) search for an expect pattern defined by a re in the response
# 3) return the response if not found
# 4) return an error if the pattern found after the numtries
# **************************************************************
def send_wait_false_re(self, command, expect, sudo=False, numretries=3, rsp1=None, rsp2=None,debug=0):
timer = 0
for i in range(numretries):
rsp = self.send_recv(command,sudo)
if not re.compile(expect).search(rsp):
return rsp;
time.sleep(2)
timer = timer+2
raise log.err(self.failed(command, expect,debug))
# **************************find*************************
# 1) find an exact pattern in a given string
# 2) raise an error if not found
# **************************************************************
def find(self, string, pattern):
word = string.replace(pattern,'*','\*')
words = string.replace(word,' ','\s*')
if re.search(words,string):
pass
else:
raise log.err(string)
# **************************find_false**************************
# 1) find an exact pattern in a given string
# 2) raise an error if found
# **************************************************************
def find_false(self, string, pattern):
if string.find(pattern) != -1:
raise log.err(string)
# **************************find_re*************************
# 1) find an exact re pattern in a given string
# 2) raise an error if not found
# **************************************************************
def find_re(self, string, pattern):
if not re.compile(pattern).search(string):
raise log.err(string)
# **************************find_false_re*************************
# 1) find an exact re pattern in a given string
# 2) raise an error if found
# **************************************************************
def find_false_re(self, string, pattern):
if re.compile(pattern).search(string):
raise log.err(string)
#******************************************************************************
# Eurecom OpenAirInterface
# Copyright(c) 1999 - 2013 Eurecom
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
# The full GNU General Public License is included in this distribution in
# the file called "COPYING".
# Contact Information
# Openair Admin: openair_admin@eurecom.fr
# Openair Tech : openair_tech@eurecom.fr
# Forums : http://forums.eurecom.fsr/openairinterface
# Address : Eurecom, Compus SophiaTech 450, route des chappes, 06451 Biot, France
#*****************************************************************************
# \file log.py
# \brief provides primitives and defines how the logs and statistics are generated
# \author Navid Nikaein
# \date 2013
# \version 0.1
# @ingroup _test
import sys
import re
import time
import datetime
import array
import xml.etree.ElementTree as ET
debug = False
docfile = ''
start_time = time.time()
testcase_starttime = start_time
debug = 0
stats = {'passed':0, 'failed':0, 'skipped':0, 'internal_errors':0, 'cmd':0}
# xml result (jUnit like)
xUnitTestsuites = ET.Element( 'testsuites' )
xUnitTestsuite = ET.SubElement( xUnitTestsuites, 'testsuite' )
xUnitTestsuite.set( 'name', 'OAI' )
xUnitTestsuite.set( 'timestamp', datetime.datetime.fromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%S') )
xUnitTestsuite.set( 'hostname', 'localhost' )
#xUnitSystemOut = ET.SubElement( xUnitTestsuite, 'system-out' )
class bcolors:
header = '\033[95m'
okblue = '\033[94m'
okgreen = '\033[92m'
warning = '\033[93m'
fail = '\033[91m'
normal = '\033[0m'
def __init__(self):
if not sys.stdout.isatty():
self.disable()
def disable(self):
self.header = ''
self.okblue = ''
self.okgreen = ''
self.warning = ''
self.fail = ''
self.normal = ''
class err(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
def writefile(logfile, message):
F_testlog = open(logfile, 'a')
F_testlog.write(message + '\n')
F_testlog.close()
def sleep(seconds):
time.sleep(seconds)
def start():
"""Start the timer for the following testcase."""
global testcase_starttime
testcase_starttime = time.time()
def set_debug_level(level):
debug = level
def statistics(logfile):
global start_time
#if stats['passed'] == 0:
# print "no test executed...exiting"
# sys.exit()
total_tests = stats['passed'] + stats['failed'] + stats['skipped']
total_ex_tests = stats['passed'] + stats['failed']
elapsed_time = time.gmtime(time.time() - start_time)
print '\n'
log_record('info', '===============================================')
log_record('info', 'Total tests performed ' + repr(total_tests))
log_record('info', 'Tests passed ' + repr(stats['passed']))
log_record('info', 'Tests failed ' + repr(stats['failed']))
log_record('info', 'Tests skipped ' + repr(stats['skipped']))
log_record('info', '')
log_record('info', 'Total commands sent ' + repr(stats['cmd']))
log_record('info', 'Total elapsed time (h:m:s) ' + time.strftime('%H:%M:%S', elapsed_time))
log_record('info', '===============================================')
log_record('info', 'Testing pass rate ' + repr((stats['passed'] * 100) / total_tests) + '%')
log_record('info', '===============================================')
writefile(logfile, '\n=====================Results===================')
writefile(logfile, 'Total tests performed ' + repr(total_tests))
writefile(logfile, 'Tests passed ' + repr(stats['passed']))
writefile(logfile, 'Tests failed ' + repr(stats['failed']))
writefile(logfile, 'Tests skipped ' + repr(stats['skipped']))
writefile(logfile, '')
writefile(logfile, 'Total commands sent ' + repr(stats['cmd']))
writefile(logfile, 'Total elapsed time (h:m:s) ' + time.strftime('%H:%M:%S', elapsed_time))
writefile(logfile, '===============================================')
writefile(logfile, 'Testing pass rate ' + repr((stats['passed'] * 100) / total_tests) + '%')
writefile(logfile, '===============================================\n')
xUnitTestsuite.set( 'tests', repr(total_tests) )
xUnitTestsuite.set( 'failures', repr(stats['failed']) )
xUnitTestsuite.set( 'skipped', repr(stats['skipped']) )
xUnitTestsuite.set( 'errors', '0' )
time_delta = datetime.datetime.now() - datetime.datetime.fromtimestamp(start_time)
xUnitTestsuite.set( 'time', repr(time_delta.total_seconds()) )
writefile( logfile + '.xml', ET.tostring( xUnitTestsuites, encoding="utf-8", method="xml" ) )
def log_record(level, message):
ts = time.strftime('%d %b %Y %H:%M')
message = ts + ' [' + level + '] ' + message
if level == 'passed' :
print bcolors.okgreen + message + bcolors.normal
elif level == 'failed' :
print bcolors.fail + message + bcolors.normal
elif level == 'skipped' :
print bcolors.warning + message + bcolors.normal
else :
print message
def fail(case, testnum, testname, conf, message, diag, output,trace):
# report(case, testnum, testname, conf, 'failed', output, diag, message)
report(case, testnum, testname, conf, 'failed', output, diag)
log_record('failed', case + testnum + ' : ' + testname + ' ('+ conf+')')
if message :
log_record('failed', "Output follows:\n" + message )
if trace :
log_record('failed', "trace file can be found in " + trace + "\n" )
stats['failed'] += 1
def failquiet(case, testnum, testname, conf):
log_record('failed', case + testnum + ' :' + testname + ' ('+ conf+')')
stats['failed'] += 1
def ok(case, testnum, testname, conf, message, output):
report(case, testnum, testname, conf, 'passed', output)
log_record('passed', case + testnum + ' : ' + testname + ' ('+ conf+')')
if message :
print bcolors.okgreen + message + bcolors.normal
stats['passed'] += 1
def skip(case, testnum, testname, conf, message=None, diag=None, output=None):
log_record('skipped', case + testnum + ' :' + testname + ' ('+ conf+')')
report(case, testnum, testname, conf, 'skipped', output, diag)
if message :
log_record('skipped', "Output follows:\n" + message )
if diag :
log_record('skipped', "Diagnostic: \n" + diag )
stats['skipped'] += 1
def report(case, test, name, conf, status, output, diag=None, desc=None):
writefile (output, '[' +status+ '] ' + case + test + ' : ' + name + ' ('+ conf+')')
if diag :
writefile (output, '-------> ' + diag)
if desc:
writefile(output, desc)
#log_record('report', + case + test + ' documented')
e = ET.SubElement( xUnitTestsuite, 'testcase' )
e.set( 'name', case + '_' + test + '_' + name )
e.set( 'classname', 'shellscript' )
e.set( 'time', repr( time.time() - testcase_starttime ) )
if status == 'failed':
e = ET.SubElement( e, 'failure' )
e.set( 'message', 'failed' )
e.text = diag
if status == 'skipped':
e = ET.SubElement( e, 'skipped' )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment