mirror of
https://gitea.osmocom.org/cellular-infrastructure/osmo-mgw.git
synced 2025-11-03 05:23:43 +00:00
30f1f37638 introduced new channel
combinations but had a copy and paste error in the description.
The jenkins system didn't run the external tests so this issue
and others were not noticed until now.
Fix the copy and paste and update the test result.
605 lines
21 KiB
Python
605 lines
21 KiB
Python
#!/usr/bin/env python
|
|
|
|
# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
|
|
# (C) 2014 by Holger Hans Peter Freyther
|
|
# based on vty_test_runner.py:
|
|
# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
|
|
# (C) 2013 by Holger Hans Peter Freyther
|
|
# based on bsc_control.py.
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
import time
|
|
import unittest
|
|
import socket
|
|
import sys
|
|
import struct
|
|
|
|
import osmopy.obscvty as obscvty
|
|
import osmopy.osmoutil as osmoutil
|
|
|
|
confpath = '.'
|
|
verbose = False
|
|
|
|
class TestCtrlBase(unittest.TestCase):
|
|
|
|
def ctrl_command(self):
|
|
raise Exception("Needs to be implemented by a subclass")
|
|
|
|
def ctrl_app(self):
|
|
raise Exception("Needs to be implemented by a subclass")
|
|
|
|
def setUp(self):
|
|
osmo_ctrl_cmd = self.ctrl_command()[:]
|
|
config_index = osmo_ctrl_cmd.index('-c')
|
|
if config_index:
|
|
cfi = config_index + 1
|
|
osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])
|
|
|
|
try:
|
|
print "Launch: %s from %s" % (' '.join(osmo_ctrl_cmd), os.getcwd())
|
|
self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
|
|
except OSError:
|
|
print >> sys.stderr, "Current directory: %s" % os.getcwd()
|
|
print >> sys.stderr, "Consider setting -b"
|
|
time.sleep(2)
|
|
|
|
appstring = self.ctrl_app()[2]
|
|
appport = self.ctrl_app()[0]
|
|
self.connect("127.0.0.1", appport)
|
|
self.next_id = 1000
|
|
|
|
def tearDown(self):
|
|
self.disconnect()
|
|
osmoutil.end_proc(self.proc)
|
|
|
|
def prefix_ipa_ctrl_header(self, data):
|
|
return struct.pack(">HBB", len(data)+1, 0xee, 0) + data
|
|
|
|
def remove_ipa_ctrl_header(self, data):
|
|
if (len(data) < 4):
|
|
raise BaseException("Answer too short!")
|
|
(plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
|
|
if (plen + 3 > len(data)):
|
|
print "Warning: Wrong payload length (expected %i, got %i)" % (plen, len(data) - 3)
|
|
if (ipa_proto != 0xee or osmo_proto != 0):
|
|
raise BaseException("Wrong protocol in answer!")
|
|
|
|
return data[4:plen+3], data[plen+3:]
|
|
|
|
def disconnect(self):
|
|
if not (self.sock is None):
|
|
self.sock.close()
|
|
|
|
def connect(self, host, port):
|
|
if verbose:
|
|
print "Connecting to host %s:%i" % (host, port)
|
|
|
|
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sck.setblocking(1)
|
|
sck.connect((host, port))
|
|
self.sock = sck
|
|
return sck
|
|
|
|
def send(self, data):
|
|
if verbose:
|
|
print "Sending \"%s\"" %(data)
|
|
data = self.prefix_ipa_ctrl_header(data)
|
|
return self.sock.send(data) == len(data)
|
|
|
|
def send_set(self, var, value, id):
|
|
setmsg = "SET %s %s %s" %(id, var, value)
|
|
return self.send(setmsg)
|
|
|
|
def send_get(self, var, id):
|
|
getmsg = "GET %s %s" %(id, var)
|
|
return self.send(getmsg)
|
|
|
|
def do_set(self, var, value):
|
|
id = self.next_id
|
|
self.next_id += 1
|
|
self.send_set(var, value, id)
|
|
return self.recv_msgs()[id]
|
|
|
|
def do_get(self, var):
|
|
id = self.next_id
|
|
self.next_id += 1
|
|
self.send_get(var, id)
|
|
return self.recv_msgs()[id]
|
|
|
|
def recv_msgs(self):
|
|
responses = {}
|
|
data = self.sock.recv(4096)
|
|
while (len(data)>0):
|
|
(answer, data) = self.remove_ipa_ctrl_header(data)
|
|
if verbose:
|
|
print "Got message:", answer
|
|
(mtype, id, msg) = answer.split(None, 2)
|
|
id = int(id)
|
|
rsp = {'mtype': mtype, 'id': id}
|
|
if mtype == "ERROR":
|
|
rsp['error'] = msg
|
|
else:
|
|
split = msg.split(None, 1)
|
|
rsp['var'] = split[0]
|
|
if len(split) > 1:
|
|
rsp['value'] = split[1]
|
|
else:
|
|
rsp['value'] = None
|
|
responses[id] = rsp
|
|
|
|
if verbose:
|
|
print "Decoded replies: ", responses
|
|
|
|
return responses
|
|
|
|
|
|
class TestCtrlBSC(TestCtrlBase):
|
|
|
|
def tearDown(self):
|
|
TestCtrlBase.tearDown(self)
|
|
os.unlink("tmp_dummy_sock")
|
|
|
|
def ctrl_command(self):
|
|
return ["./src/osmo-bsc/osmo-bsc", "-r", "tmp_dummy_sock", "-c",
|
|
"doc/examples/osmo-bsc/osmo-bsc.cfg"]
|
|
|
|
def ctrl_app(self):
|
|
return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")
|
|
|
|
def testCtrlErrs(self):
|
|
r = self.do_get('invalid')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Command not found')
|
|
|
|
r = self.do_set('rf_locked', '999')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Value failed verification.')
|
|
|
|
r = self.do_get('bts')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Error while parsing the index.')
|
|
|
|
r = self.do_get('bts.999')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Error while resolving object')
|
|
|
|
def testBtsLac(self):
|
|
r = self.do_get('bts.0.location-area-code')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.location-area-code')
|
|
self.assertEquals(r['value'], '1')
|
|
|
|
r = self.do_set('bts.0.location-area-code', '23')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.location-area-code')
|
|
self.assertEquals(r['value'], '23')
|
|
|
|
r = self.do_get('bts.0.location-area-code')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.location-area-code')
|
|
self.assertEquals(r['value'], '23')
|
|
|
|
r = self.do_set('bts.0.location-area-code', '-1')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Input not within the range')
|
|
|
|
def testBtsCi(self):
|
|
r = self.do_get('bts.0.cell-identity')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.cell-identity')
|
|
self.assertEquals(r['value'], '0')
|
|
|
|
r = self.do_set('bts.0.cell-identity', '23')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.cell-identity')
|
|
self.assertEquals(r['value'], '23')
|
|
|
|
r = self.do_get('bts.0.cell-identity')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.cell-identity')
|
|
self.assertEquals(r['value'], '23')
|
|
|
|
r = self.do_set('bts.0.cell-identity', '-1')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Input not within the range')
|
|
|
|
def testBtsGenerateSystemInformation(self):
|
|
r = self.do_get('bts.0.send-new-system-informations')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Write only attribute')
|
|
|
|
# No RSL links so it will fail
|
|
r = self.do_set('bts.0.send-new-system-informations', '1')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Failed to generate SI')
|
|
|
|
def testBtsChannelLoad(self):
|
|
r = self.do_set('bts.0.channel-load', '1')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Read only attribute')
|
|
|
|
# No RSL link so everything is 0
|
|
r = self.do_get('bts.0.channel-load')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['value'], 'CCCH+SDCCH4,0,0 TCH/F,0,0 TCH/H,0,0 SDCCH8,0,0 TCH/F_PDCH,0,0 CCCH+SDCCH4+CBCH,0,0 SDCCH8+CBCH,0,0')
|
|
|
|
def testBtsOmlConnectionState(self):
|
|
"""Check OML state. It will not be connected"""
|
|
r = self.do_set('bts.0.oml-connection-state', '1')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Read only attribute')
|
|
|
|
# No RSL link so everything is 0
|
|
r = self.do_get('bts.0.oml-connection-state')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['value'], 'disconnected')
|
|
|
|
def testTrxPowerRed(self):
|
|
r = self.do_get('bts.0.trx.0.max-power-reduction')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
|
|
self.assertEquals(r['value'], '20')
|
|
|
|
r = self.do_set('bts.0.trx.0.max-power-reduction', '22')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
|
|
self.assertEquals(r['value'], '22')
|
|
|
|
r = self.do_get('bts.0.trx.0.max-power-reduction')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
|
|
self.assertEquals(r['value'], '22')
|
|
|
|
r = self.do_set('bts.0.trx.0.max-power-reduction', '1')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Value must be even')
|
|
|
|
def testTrxArfcn(self):
|
|
r = self.do_get('bts.0.trx.0.arfcn')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.trx.0.arfcn')
|
|
self.assertEquals(r['value'], '871')
|
|
|
|
r = self.do_set('bts.0.trx.0.arfcn', '873')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.trx.0.arfcn')
|
|
self.assertEquals(r['value'], '873')
|
|
|
|
r = self.do_get('bts.0.trx.0.arfcn')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.trx.0.arfcn')
|
|
self.assertEquals(r['value'], '873')
|
|
|
|
r = self.do_set('bts.0.trx.0.arfcn', '2000')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Input not within the range')
|
|
|
|
def testRfLock(self):
|
|
r = self.do_get('bts.0.rf_state')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.rf_state')
|
|
self.assertEquals(r['value'], 'inoperational,unlocked,on')
|
|
|
|
r = self.do_set('rf_locked', '1')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'rf_locked')
|
|
self.assertEquals(r['value'], '1')
|
|
|
|
time.sleep(1.5)
|
|
|
|
r = self.do_get('bts.0.rf_state')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.rf_state')
|
|
self.assertEquals(r['value'], 'inoperational,locked,off')
|
|
|
|
r = self.do_set('rf_locked', '0')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'rf_locked')
|
|
self.assertEquals(r['value'], '0')
|
|
|
|
time.sleep(1.5)
|
|
|
|
r = self.do_get('bts.0.rf_state')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.rf_state')
|
|
self.assertEquals(r['value'], 'inoperational,unlocked,on')
|
|
|
|
def testTimezone(self):
|
|
r = self.do_get('bts.0.timezone')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.timezone')
|
|
self.assertEquals(r['value'], 'off')
|
|
|
|
r = self.do_set('bts.0.timezone', '-2,15,2')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.timezone')
|
|
self.assertEquals(r['value'], '-2,15,2')
|
|
|
|
r = self.do_get('bts.0.timezone')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.timezone')
|
|
self.assertEquals(r['value'], '-2,15,2')
|
|
|
|
# Test invalid input
|
|
r = self.do_set('bts.0.timezone', '-2,15,2,5,6,7')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.timezone')
|
|
self.assertEquals(r['value'], '-2,15,2')
|
|
|
|
r = self.do_set('bts.0.timezone', '-2,15')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
r = self.do_set('bts.0.timezone', '-2')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
r = self.do_set('bts.0.timezone', '1')
|
|
|
|
r = self.do_set('bts.0.timezone', 'off')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.timezone')
|
|
self.assertEquals(r['value'], 'off')
|
|
|
|
r = self.do_get('bts.0.timezone')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'bts.0.timezone')
|
|
self.assertEquals(r['value'], 'off')
|
|
|
|
def testMcc(self):
|
|
r = self.do_set('mcc', '23')
|
|
r = self.do_get('mcc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc')
|
|
self.assertEquals(r['value'], '23')
|
|
|
|
r = self.do_set('mcc', '023')
|
|
r = self.do_get('mcc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc')
|
|
self.assertEquals(r['value'], '23')
|
|
|
|
def testMnc(self):
|
|
r = self.do_set('mnc', '9')
|
|
r = self.do_get('mnc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mnc')
|
|
self.assertEquals(r['value'], '9')
|
|
|
|
r = self.do_set('mnc', '09')
|
|
r = self.do_get('mnc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mnc')
|
|
self.assertEquals(r['value'], '9')
|
|
|
|
|
|
def testMccMncApply(self):
|
|
# Test some invalid input
|
|
r = self.do_set('mcc-mnc-apply', 'WRONG')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
|
|
r = self.do_set('mcc-mnc-apply', '1,')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
|
|
r = self.do_set('mcc-mnc-apply', '200,3')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
|
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
|
|
|
# Set it again
|
|
r = self.do_set('mcc-mnc-apply', '200,3')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
|
self.assertEquals(r['value'], 'Nothing changed')
|
|
|
|
# Change it
|
|
r = self.do_set('mcc-mnc-apply', '200,4')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
|
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
|
|
|
# Change it
|
|
r = self.do_set('mcc-mnc-apply', '201,4')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
|
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
|
|
|
# Verify
|
|
r = self.do_get('mnc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mnc')
|
|
self.assertEquals(r['value'], '4')
|
|
|
|
r = self.do_get('mcc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc')
|
|
self.assertEquals(r['value'], '201')
|
|
|
|
# Change it
|
|
r = self.do_set('mcc-mnc-apply', '202,03')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
|
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
|
|
|
r = self.do_get('mnc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mnc')
|
|
self.assertEquals(r['value'], '3')
|
|
|
|
r = self.do_get('mcc')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'mcc')
|
|
self.assertEquals(r['value'], '202')
|
|
|
|
class TestCtrlNITB(TestCtrlBase):
|
|
|
|
def tearDown(self):
|
|
TestCtrlBase.tearDown(self)
|
|
os.unlink("test_hlr.sqlite3")
|
|
|
|
def ctrl_command(self):
|
|
return ["./src/osmo-nitb/osmo-nitb", "-c",
|
|
"doc/examples/osmo-nitb/nanobts/openbsc.cfg", "-l", "test_hlr.sqlite3"]
|
|
|
|
def ctrl_app(self):
|
|
return (4249, "./src/osmo-nitb/osmo-nitb", "OsmoBSC", "nitb")
|
|
|
|
def testSubscriberAddRemove(self):
|
|
r = self.do_set('subscriber-modify-v1', '2620345,445566')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'subscriber-modify-v1')
|
|
self.assertEquals(r['value'], 'OK')
|
|
|
|
r = self.do_set('subscriber-modify-v1', '2620345,445567')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'subscriber-modify-v1')
|
|
self.assertEquals(r['value'], 'OK')
|
|
|
|
# TODO. verify that the entry has been created and modified? Invoke
|
|
# the sqlite3 CLI or do it through the DB libraries?
|
|
|
|
r = self.do_set('subscriber-delete-v1', '2620345')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['value'], 'Removed')
|
|
|
|
r = self.do_set('subscriber-delete-v1', '2620345')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Failed to find subscriber')
|
|
|
|
def testSubscriberList(self):
|
|
# TODO. Add command to mark a subscriber as active
|
|
r = self.do_get('subscriber-list-active-v1')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'subscriber-list-active-v1')
|
|
self.assertEquals(r['value'], None)
|
|
|
|
def testApplyConfiguration(self):
|
|
r = self.do_get('bts.0.apply-configuration')
|
|
self.assertEquals(r['mtype'], 'ERROR')
|
|
self.assertEquals(r['error'], 'Write only attribute')
|
|
|
|
r = self.do_set('bts.0.apply-configuration', '1')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
|
|
|
class TestCtrlNAT(TestCtrlBase):
|
|
|
|
def ctrl_command(self):
|
|
return ["./src/osmo-bsc_nat/osmo-bsc_nat", "-c",
|
|
"doc/examples/osmo-bsc_nat/osmo-bsc_nat.cfg"]
|
|
|
|
def ctrl_app(self):
|
|
return (4250, "./src/osmo-bsc_nat/osmo-bsc_nat", "OsmoNAT", "nat")
|
|
|
|
def testAccessList(self):
|
|
r = self.do_get('net.0.bsc_cfg.0.access-list-name')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'net')
|
|
self.assertEquals(r['value'], None)
|
|
|
|
r = self.do_set('net.0.bsc_cfg.0.access-list-name', 'bla')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'net')
|
|
self.assertEquals(r['value'], 'bla')
|
|
|
|
r = self.do_get('net.0.bsc_cfg.0.access-list-name')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'net')
|
|
self.assertEquals(r['value'], 'bla')
|
|
|
|
r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
|
|
self.assertEquals(r['mtype'], 'SET_REPLY')
|
|
self.assertEquals(r['var'], 'net')
|
|
self.assertEquals(r['value'], None)
|
|
|
|
r = self.do_get('net.0.bsc_cfg.0.access-list-name')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'net')
|
|
self.assertEquals(r['value'], None)
|
|
|
|
class TestCtrlSGSN(TestCtrlBase):
|
|
def ctrl_command(self):
|
|
return ["./src/gprs/osmo-sgsn", "-c",
|
|
"doc/examples/osmo-sgsn/osmo-sgsn.cfg"]
|
|
|
|
def ctrl_app(self):
|
|
return (4251, "./src/gprs/osmo-sgsn", "OsmoSGSN", "sgsn")
|
|
|
|
def testListSubscribers(self):
|
|
# TODO. Add command to mark a subscriber as active
|
|
r = self.do_get('subscriber-list-active-v1')
|
|
self.assertEquals(r['mtype'], 'GET_REPLY')
|
|
self.assertEquals(r['var'], 'subscriber-list-active-v1')
|
|
self.assertEquals(r['value'], None)
|
|
|
|
def add_bsc_test(suite, workdir):
|
|
if not os.path.isfile(os.path.join(workdir, "src/osmo-bsc/osmo-bsc")):
|
|
print("Skipping the BSC test")
|
|
return
|
|
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlBSC)
|
|
suite.addTest(test)
|
|
|
|
def add_nitb_test(suite, workdir):
|
|
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlNITB)
|
|
suite.addTest(test)
|
|
|
|
def add_nat_test(suite, workdir):
|
|
if not os.path.isfile(os.path.join(workdir, "src/osmo-bsc_nat/osmo-bsc_nat")):
|
|
print("Skipping the NAT test")
|
|
return
|
|
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlNAT)
|
|
suite.addTest(test)
|
|
|
|
def add_sgsn_test(suite, workdir):
|
|
if not os.path.isfile(os.path.join(workdir, "src/gprs/osmo-sgsn")):
|
|
print("Skipping the SGSN test")
|
|
return
|
|
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlSGSN)
|
|
suite.addTest(test)
|
|
|
|
if __name__ == '__main__':
|
|
import argparse
|
|
import sys
|
|
|
|
workdir = '.'
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-v", "--verbose", dest="verbose",
|
|
action="store_true", help="verbose mode")
|
|
parser.add_argument("-p", "--pythonconfpath", dest="p",
|
|
help="searchpath for config")
|
|
parser.add_argument("-w", "--workdir", dest="w",
|
|
help="Working directory")
|
|
args = parser.parse_args()
|
|
|
|
verbose_level = 1
|
|
if args.verbose:
|
|
verbose_level = 2
|
|
verbose = True
|
|
|
|
if args.w:
|
|
workdir = args.w
|
|
|
|
if args.p:
|
|
confpath = args.p
|
|
|
|
print "confpath %s, workdir %s" % (confpath, workdir)
|
|
os.chdir(workdir)
|
|
print "Running tests for specific control commands"
|
|
suite = unittest.TestSuite()
|
|
add_bsc_test(suite, workdir)
|
|
add_nitb_test(suite, workdir)
|
|
add_nat_test(suite, workdir)
|
|
add_sgsn_test(suite, workdir)
|
|
res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
|
|
sys.exit(len(res.errors) + len(res.failures))
|