mirror of
https://gitea.osmocom.org/cellular-infrastructure/osmo-msc.git
synced 2025-11-18 20:55:28 +00:00
Since the split of OsmoNiTB, OsmoMSC does not deal with the radio access network directly. Therefore the only purpose of T3212 is to control subscriber expiration in the local VLR. The timeout value indicated in System Information Type 3 needs to be configured separately in the BSC/RNC. This means that we don't need to store it in deci-hours anymore. Let's move T3212 to the group of VLR specific timers, so it can be configured and introspected using the generic 'timer' command, and deprecate the old '[no] periodic location update' command. It should be also noted that in the old code subscriber expiration timeout was actually set to twice the T3212 value plus one minute. After this change, we apply the configured value 'as-is', but keep the old behaviour for 'periodic location update' command. Change-Id: I9b12066599a7c834a53a93acf5902d91273bc74f
304 lines
12 KiB
Python
Executable File
304 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
|
|
# (C) 2013 by Holger Hans Peter Freyther
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os, sys
|
|
import time
|
|
import unittest
|
|
import socket
|
|
import subprocess
|
|
|
|
import osmopy.obscvty as obscvty
|
|
import osmopy.osmoutil as osmoutil
|
|
from osmopy.osmo_ipa import IPA
|
|
|
|
# to be able to find $top_srcdir/doc/...
|
|
confpath = os.path.join(sys.path[0], '..')
|
|
|
|
class TestVTYBase(unittest.TestCase):
|
|
|
|
def checkForEndAndExit(self):
|
|
res = self.vty.command("list")
|
|
#print ('looking for "exit"\n')
|
|
self.assertTrue(res.find(' exit\r') > 0)
|
|
#print 'found "exit"\nlooking for "end"\n'
|
|
self.assertTrue(res.find(' end\r') > 0)
|
|
#print 'found "end"\n'
|
|
|
|
def vty_command(self):
|
|
raise Exception("Needs to be implemented by a subclass")
|
|
|
|
def vty_app(self):
|
|
raise Exception("Needs to be implemented by a subclass")
|
|
|
|
def setUp(self):
|
|
osmo_vty_cmd = self.vty_command()[:]
|
|
config_index = osmo_vty_cmd.index('-c')
|
|
if config_index:
|
|
cfi = config_index + 1
|
|
osmo_vty_cmd[cfi] = os.path.join(confpath, osmo_vty_cmd[cfi])
|
|
|
|
try:
|
|
self.proc = osmoutil.popen_devnull(osmo_vty_cmd)
|
|
except OSError:
|
|
print("Current directory: %s" % os.getcwd(), file=sys.stderr)
|
|
print("Consider setting -b", file=sys.stderr)
|
|
|
|
appstring = self.vty_app()[2]
|
|
appport = self.vty_app()[0]
|
|
self.vty = obscvty.VTYInteract(appstring, "127.0.0.1", appport)
|
|
|
|
def tearDown(self):
|
|
if self.vty:
|
|
self.vty._close_socket()
|
|
self.vty = None
|
|
osmoutil.end_proc(self.proc)
|
|
|
|
class TestVTYMSC(TestVTYBase):
|
|
|
|
def vty_command(self):
|
|
return ["./src/osmo-msc/osmo-msc", "-c",
|
|
"doc/examples/osmo-msc/osmo-msc.cfg"]
|
|
|
|
def vty_app(self):
|
|
return (4254, "./src/osmo-msc/osmo-msc", "OsmoMSC", "msc")
|
|
|
|
def testConfigNetworkTree(self, include_bsc_items=True):
|
|
self.vty.enable()
|
|
self.assertTrue(self.vty.verify("configure terminal",['']))
|
|
self.assertEqual(self.vty.node(), 'config')
|
|
self.checkForEndAndExit()
|
|
self.assertTrue(self.vty.verify("network",['']))
|
|
self.assertEqual(self.vty.node(), 'config-net')
|
|
self.checkForEndAndExit()
|
|
self.vty.command("write terminal")
|
|
self.assertTrue(self.vty.verify("exit",['']))
|
|
self.assertEqual(self.vty.node(), 'config')
|
|
self.assertTrue(self.vty.verify("exit",['']))
|
|
self.assertTrue(self.vty.node() is None)
|
|
|
|
def checkForSmpp(self):
|
|
"""SMPP is not always enabled, check if it is"""
|
|
res = self.vty.command("list")
|
|
return "smpp" in res
|
|
|
|
def testSmppFirst(self):
|
|
# enable the configuration
|
|
self.vty.enable()
|
|
self.vty.command("configure terminal")
|
|
|
|
if not self.checkForSmpp():
|
|
return
|
|
|
|
self.vty.command("smpp")
|
|
|
|
# check the default
|
|
res = self.vty.command("write terminal")
|
|
self.assertTrue(res.find(' no smpp-first') > 0)
|
|
|
|
self.vty.verify("smpp-first", [''])
|
|
res = self.vty.command("write terminal")
|
|
self.assertTrue(res.find(' smpp-first') > 0)
|
|
self.assertEqual(res.find('no smpp-first'), -1)
|
|
|
|
self.vty.verify("no smpp-first", [''])
|
|
res = self.vty.command("write terminal")
|
|
self.assertTrue(res.find('no smpp-first') > 0)
|
|
|
|
def testVtyTree(self):
|
|
self.vty.enable()
|
|
self.assertTrue(self.vty.verify("configure terminal", ['']))
|
|
self.assertEqual(self.vty.node(), 'config')
|
|
self.checkForEndAndExit()
|
|
self.assertTrue(self.vty.verify('mncc-int', ['']))
|
|
self.assertEqual(self.vty.node(), 'config-mncc-int')
|
|
self.checkForEndAndExit()
|
|
self.assertTrue(self.vty.verify('exit', ['']))
|
|
|
|
if self.checkForSmpp():
|
|
self.assertEqual(self.vty.node(), 'config')
|
|
self.assertTrue(self.vty.verify('smpp', ['']))
|
|
self.assertEqual(self.vty.node(), 'config-smpp')
|
|
self.checkForEndAndExit()
|
|
self.assertTrue(self.vty.verify("exit", ['']))
|
|
|
|
self.assertEqual(self.vty.node(), 'config')
|
|
self.assertTrue(self.vty.verify("exit", ['']))
|
|
self.assertTrue(self.vty.node() is None)
|
|
|
|
# Check searching for outer node's commands
|
|
self.vty.command("configure terminal")
|
|
self.vty.command('mncc-int')
|
|
|
|
if self.checkForSmpp():
|
|
self.vty.command('smpp')
|
|
self.assertEqual(self.vty.node(), 'config-smpp')
|
|
self.vty.command('mncc-int')
|
|
|
|
self.assertEqual(self.vty.node(), 'config-mncc-int')
|
|
|
|
def testSi2Q(self):
|
|
self.vty.enable()
|
|
self.vty.command("configure terminal")
|
|
self.vty.command("network")
|
|
self.vty.command("bts 0")
|
|
before = self.vty.command("show running-config")
|
|
self.vty.command("si2quater neighbor-list add earfcn 1911 threshold 11 2")
|
|
self.vty.command("si2quater neighbor-list add earfcn 1924 threshold 11 3")
|
|
self.vty.command("si2quater neighbor-list add earfcn 2111 threshold 11")
|
|
self.vty.command("si2quater neighbor-list del earfcn 1911")
|
|
self.vty.command("si2quater neighbor-list del earfcn 1924")
|
|
self.vty.command("si2quater neighbor-list del earfcn 2111")
|
|
self.assertEqual(before, self.vty.command("show running-config"))
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 13 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 38 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 44 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 120 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 140 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 163 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 166 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 217 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 224 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 225 1")
|
|
self.vty.command("si2quater neighbor-list add uarfcn 1976 226 1")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 13")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 38")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 44")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 120")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 140")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 163")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 166")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 217")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 224")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 225")
|
|
self.vty.command("si2quater neighbor-list del uarfcn 1976 226")
|
|
self.assertEqual(before, self.vty.command("show running-config"))
|
|
|
|
def testEnableDisablePeriodicLU(self):
|
|
self.vty.enable()
|
|
self.vty.command("configure terminal")
|
|
self.vty.command("network")
|
|
self.vty.command("bts 0")
|
|
|
|
# Test invalid input
|
|
self.vty.verify("periodic location update 0", ['% Unknown command.'])
|
|
self.vty.verify("periodic location update 5", ['% Unknown command.'])
|
|
self.vty.verify("periodic location update 1531", ['% Unknown command.'])
|
|
|
|
depr_str = "% 'periodic location update' is now deprecated: " \
|
|
"use 'timer T3212' to change subscriber expiration timeout."
|
|
set_str = "% Setting T3212 to 121 minutes (emulating the old behaviour)."
|
|
|
|
# Enable periodic LU (deprecated command)
|
|
self.vty.verify("periodic location update 60", [depr_str, set_str])
|
|
res = self.vty.command("write terminal")
|
|
self.assertTrue(res.find('timer vlr T3212 121') > 0)
|
|
self.assertEqual(res.find('periodic location update 60'), -1)
|
|
self.assertEqual(res.find('no periodic location update'), -1)
|
|
|
|
# Now disable it (deprecated command)
|
|
self.vty.verify("no periodic location update", [depr_str])
|
|
res = self.vty.command("write terminal")
|
|
self.assertEqual(res.find('no periodic location update'), -1)
|
|
self.assertEqual(res.find('timer vlr T3212 121'), -1)
|
|
|
|
def testShowNetwork(self):
|
|
res = self.vty.command("show network")
|
|
self.assertTrue(res.startswith('BSC is on Country Code') >= 0)
|
|
|
|
def ipa_handle_small(x, verbose = False):
|
|
s = data2str(x.recv(4))
|
|
if len(s) != 4*2:
|
|
raise Exception("expected to receive 4 bytes, but got %d (%r)" % (len(s)/2, s))
|
|
if "0001fe00" == s:
|
|
if (verbose):
|
|
print("\tBSC <- NAT: PING?")
|
|
x.send(IPA().pong())
|
|
elif "0001fe06" == s:
|
|
if (verbose):
|
|
print("\tBSC <- NAT: IPA ID ACK")
|
|
x.send(IPA().id_ack())
|
|
elif "0001fe00" == s:
|
|
if (verbose):
|
|
print("\tBSC <- NAT: PONG!")
|
|
else:
|
|
if (verbose):
|
|
print("\tBSC <- NAT: ", s)
|
|
|
|
def ipa_handle_resp(x, tk, verbose = False, proc=None):
|
|
s = data2str(x.recv(38))
|
|
if "0023fe040108010701020103010401050101010011" in s:
|
|
retries = 3
|
|
while True:
|
|
print("\tsending IPA identity(%s) at %s" % (tk, time.strftime("%T")))
|
|
try:
|
|
x.send(IPA().id_resp(IPA().identity(name = tk.encode('utf-8'))))
|
|
print("\tdone sending IPA identity(%s) at %s" % (tk,
|
|
time.strftime("%T")))
|
|
break
|
|
except:
|
|
print("\tfailed sending IPA identity at", time.strftime("%T"))
|
|
if proc:
|
|
print("\tproc.poll() = %r" % proc.poll())
|
|
if retries < 1:
|
|
print("\tgiving up")
|
|
raise
|
|
print("\tretrying (%d attempts left)" % retries)
|
|
retries -= 1
|
|
else:
|
|
if (verbose):
|
|
print("\tBSC <- NAT: ", s)
|
|
|
|
if __name__ == '__main__':
|
|
import argparse
|
|
import sys
|
|
|
|
workdir = '.'
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-v", "--verbose", dest="verbose",
|
|
action="store_true", help="verbose mode")
|
|
parser.add_argument("-p", "--pythonconfpath", dest="p",
|
|
help="searchpath for config")
|
|
parser.add_argument("-w", "--workdir", dest="w",
|
|
help="Working directory")
|
|
parser.add_argument("test_name", nargs="*", help="(parts of) test names to run, case-insensitive")
|
|
args = parser.parse_args()
|
|
|
|
verbose_level = 1
|
|
if args.verbose:
|
|
verbose_level = 2
|
|
|
|
if args.w:
|
|
workdir = args.w
|
|
|
|
if args.p:
|
|
confpath = args.p
|
|
|
|
print("confpath %s, workdir %s" % (confpath, workdir))
|
|
os.chdir(workdir)
|
|
print("Running tests for specific VTY commands")
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestVTYMSC))
|
|
|
|
if args.test_name:
|
|
osmoutil.pick_tests(suite, *args.test_name)
|
|
|
|
res = unittest.TextTestRunner(verbosity=verbose_level, stream=sys.stdout).run(suite)
|
|
sys.exit(len(res.errors) + len(res.failures))
|
|
|
|
# vim: shiftwidth=4 expandtab nocin ai
|