mirror of
https://github.com/open5gs/open5gs.git
synced 2025-11-03 05:23:38 +00:00
Control Plane and Data Plane Thread is merged to One thread in SGW/PGW
- for protecting context, SGW/PGW is implemented with only one thread. - In PGW, processing control plane could be delayed +10ms when diameter thread sends message queue to the PGW control plane. - In other case, all performance may be same with previous architecture.
This commit is contained in:
@@ -92,6 +92,15 @@ CORE_DECLARE(status_t) event_delete(msgq_id queue_id);
|
||||
*/
|
||||
CORE_DECLARE(status_t) event_send(msgq_id queue_id, event_t *e);
|
||||
|
||||
/**
|
||||
* Receive a event from event queue
|
||||
*
|
||||
* @return If success, return CORE_OK
|
||||
* If queue is empty, return CORE_EAGAIN
|
||||
* If else, return CORE_ERROR.
|
||||
*/
|
||||
CORE_DECLARE(status_t) event_recv(msgq_id queue_id, event_t *e);
|
||||
|
||||
/**
|
||||
* Receive a event from event queue with timeout
|
||||
*
|
||||
|
||||
@@ -50,6 +50,22 @@ status_t event_send(msgq_id queue_id, event_t *e)
|
||||
return rv;
|
||||
}
|
||||
|
||||
status_t event_recv(msgq_id queue_id, event_t *e)
|
||||
{
|
||||
status_t rv;
|
||||
|
||||
d_assert(e, return -1, "Null param");
|
||||
d_assert(queue_id, return -1, "event queue isn't initialized");
|
||||
|
||||
rv = msgq_recv(queue_id, (char*)e, EVENT_SIZE);
|
||||
if (rv == CORE_ERROR)
|
||||
{
|
||||
d_error("msgq_timedrecv failed", rv);
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
status_t event_timedrecv(msgq_id queue_id, event_t *e, c_time_t timeout)
|
||||
{
|
||||
status_t rv;
|
||||
|
||||
@@ -80,9 +80,10 @@ static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
|
||||
prev_tm = time_now();
|
||||
|
||||
#define EVENT_LOOP_TIMEOUT 50 /* 50ms */
|
||||
while ((!thread_should_stop()))
|
||||
{
|
||||
rv = event_timedrecv(mme_self()->queue_id, &event, EVENT_WAIT_TIMEOUT);
|
||||
rv = event_timedrecv(mme_self()->queue_id, &event, EVENT_LOOP_TIMEOUT);
|
||||
|
||||
d_assert(rv != CORE_ERROR, continue,
|
||||
"While receiving a event message, error occurs");
|
||||
@@ -90,7 +91,7 @@ static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
now_tm = time_now();
|
||||
|
||||
/* if the gap is over 10 ms, execute preriodic jobs */
|
||||
if (now_tm - prev_tm > EVENT_WAIT_TIMEOUT)
|
||||
if (now_tm - prev_tm > EVENT_LOOP_TIMEOUT * 1000)
|
||||
{
|
||||
tm_execute_tm_service(
|
||||
&mme_self()->tm_service, mme_self()->queue_id);
|
||||
@@ -118,7 +119,7 @@ static void *THREAD_FUNC net_main(thread_id id, void *data)
|
||||
{
|
||||
while (!thread_should_stop())
|
||||
{
|
||||
net_fds_read_run(50);
|
||||
net_fds_read_run(EVENT_LOOP_TIMEOUT);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
||||
@@ -8,11 +8,8 @@
|
||||
|
||||
#include "pgw_fd_path.h"
|
||||
|
||||
static thread_id sm_thread;
|
||||
static void *THREAD_FUNC sm_main(thread_id id, void *data);
|
||||
|
||||
static thread_id net_thread;
|
||||
static void *THREAD_FUNC net_main(thread_id id, void *data);
|
||||
static thread_id pgw_thread;
|
||||
static void *THREAD_FUNC pgw_main(thread_id id, void *data);
|
||||
|
||||
static int initialized = 0;
|
||||
|
||||
@@ -36,9 +33,7 @@ status_t pgw_initialize()
|
||||
ret = pgw_fd_init();
|
||||
if (ret != 0) return CORE_ERROR;
|
||||
|
||||
rv = thread_create(&sm_thread, NULL, sm_main, NULL);
|
||||
if (rv != CORE_OK) return rv;
|
||||
rv = thread_create(&net_thread, NULL, net_main, NULL);
|
||||
rv = thread_create(&pgw_thread, NULL, pgw_main, NULL);
|
||||
if (rv != CORE_OK) return rv;
|
||||
|
||||
initialized = 1;
|
||||
@@ -50,8 +45,7 @@ void pgw_terminate(void)
|
||||
{
|
||||
if (!initialized) return;
|
||||
|
||||
thread_delete(net_thread);
|
||||
thread_delete(sm_thread);
|
||||
thread_delete(pgw_thread);
|
||||
|
||||
pgw_fd_final();
|
||||
|
||||
@@ -60,16 +54,16 @@ void pgw_terminate(void)
|
||||
gtp_xact_final();
|
||||
}
|
||||
|
||||
static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
static void *THREAD_FUNC pgw_main(thread_id id, void *data)
|
||||
{
|
||||
event_t event;
|
||||
fsm_t pgw_sm;
|
||||
c_time_t prev_tm, now_tm;
|
||||
int r;
|
||||
status_t rv;
|
||||
|
||||
memset(&event, 0, sizeof(event_t));
|
||||
|
||||
pgw_self()->queue_id = event_create(MSGQ_O_BLOCK);
|
||||
pgw_self()->queue_id = event_create(MSGQ_O_NONBLOCK);
|
||||
d_assert(pgw_self()->queue_id, return NULL,
|
||||
"PGW event queue creation failed");
|
||||
tm_service_init(&pgw_self()->tm_service);
|
||||
@@ -81,30 +75,35 @@ static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
|
||||
prev_tm = time_now();
|
||||
|
||||
#define EVENT_LOOP_TIMEOUT 10 /* 10ms */
|
||||
while ((!thread_should_stop()))
|
||||
{
|
||||
r = event_timedrecv(pgw_self()->queue_id, &event, EVENT_WAIT_TIMEOUT);
|
||||
|
||||
d_assert(r != CORE_ERROR, continue,
|
||||
"While receiving a event message, error occurs");
|
||||
|
||||
now_tm = time_now();
|
||||
|
||||
/* if the gap is over 10 ms, execute preriodic jobs */
|
||||
if (now_tm - prev_tm > EVENT_WAIT_TIMEOUT)
|
||||
net_fds_read_run(EVENT_LOOP_TIMEOUT);
|
||||
do
|
||||
{
|
||||
tm_execute_tm_service(
|
||||
&pgw_self()->tm_service, pgw_self()->queue_id);
|
||||
rv = event_recv(pgw_self()->queue_id, &event);
|
||||
|
||||
prev_tm = now_tm;
|
||||
}
|
||||
d_assert(rv != CORE_ERROR, continue,
|
||||
"While receiving a event message, error occurs");
|
||||
|
||||
if (r == CORE_TIMEUP)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
now_tm = time_now();
|
||||
|
||||
fsm_dispatch(&pgw_sm, (fsm_event_t*)&event);
|
||||
/* if the gap is over event_loop timeout, execute preriodic jobs */
|
||||
if (now_tm - prev_tm > (EVENT_LOOP_TIMEOUT * 1000))
|
||||
{
|
||||
tm_execute_tm_service(
|
||||
&pgw_self()->tm_service, pgw_self()->queue_id);
|
||||
|
||||
prev_tm = now_tm;
|
||||
}
|
||||
|
||||
if (rv == CORE_EAGAIN)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
fsm_dispatch(&pgw_sm, (fsm_event_t*)&event);
|
||||
} while(rv == CORE_OK);
|
||||
}
|
||||
|
||||
fsm_final(&pgw_sm, 0);
|
||||
@@ -114,13 +113,3 @@ static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *THREAD_FUNC net_main(thread_id id, void *data)
|
||||
{
|
||||
while (!thread_should_stop())
|
||||
{
|
||||
net_fds_read_run(50);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -6,11 +6,8 @@
|
||||
#include "sgw_context.h"
|
||||
#include "sgw_event.h"
|
||||
|
||||
static thread_id sm_thread;
|
||||
static void *THREAD_FUNC sm_main(thread_id id, void *data);
|
||||
|
||||
static thread_id net_thread;
|
||||
static void *THREAD_FUNC net_main(thread_id id, void *data);
|
||||
static thread_id sgw_thread;
|
||||
static void *THREAD_FUNC sgw_main(thread_id id, void *data);
|
||||
|
||||
static int initialized = 0;
|
||||
|
||||
@@ -27,9 +24,7 @@ status_t sgw_initialize()
|
||||
rv = sgw_context_setup_trace_module();
|
||||
if (rv != CORE_OK) return rv;
|
||||
|
||||
rv = thread_create(&sm_thread, NULL, sm_main, NULL);
|
||||
if (rv != CORE_OK) return rv;
|
||||
rv = thread_create(&net_thread, NULL, net_main, NULL);
|
||||
rv = thread_create(&sgw_thread, NULL, sgw_main, NULL);
|
||||
if (rv != CORE_OK) return rv;
|
||||
|
||||
initialized = 1;
|
||||
@@ -41,24 +36,23 @@ void sgw_terminate(void)
|
||||
{
|
||||
if (!initialized) return;
|
||||
|
||||
thread_delete(net_thread);
|
||||
thread_delete(sm_thread);
|
||||
thread_delete(sgw_thread);
|
||||
|
||||
sgw_context_final();
|
||||
|
||||
gtp_xact_final();
|
||||
}
|
||||
|
||||
static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
static void *THREAD_FUNC sgw_main(thread_id id, void *data)
|
||||
{
|
||||
event_t event;
|
||||
fsm_t sgw_sm;
|
||||
c_time_t prev_tm, now_tm;
|
||||
int r;
|
||||
status_t rv;
|
||||
|
||||
memset(&event, 0, sizeof(event_t));
|
||||
|
||||
sgw_self()->queue_id = event_create(MSGQ_O_BLOCK);
|
||||
sgw_self()->queue_id = event_create(MSGQ_O_NONBLOCK);
|
||||
d_assert(sgw_self()->queue_id, return NULL,
|
||||
"SGW event queue creation failed");
|
||||
tm_service_init(&sgw_self()->tm_service);
|
||||
@@ -70,30 +64,35 @@ static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
|
||||
prev_tm = time_now();
|
||||
|
||||
#define EVENT_LOOP_TIMEOUT 10 /* 10ms */
|
||||
while ((!thread_should_stop()))
|
||||
{
|
||||
r = event_timedrecv(sgw_self()->queue_id, &event, EVENT_WAIT_TIMEOUT);
|
||||
|
||||
d_assert(r != CORE_ERROR, continue,
|
||||
"While receiving a event message, error occurs");
|
||||
|
||||
now_tm = time_now();
|
||||
|
||||
/* if the gap is over 10 ms, execute preriodic jobs */
|
||||
if (now_tm - prev_tm > EVENT_WAIT_TIMEOUT)
|
||||
net_fds_read_run(EVENT_LOOP_TIMEOUT);
|
||||
do
|
||||
{
|
||||
tm_execute_tm_service(
|
||||
&sgw_self()->tm_service, sgw_self()->queue_id);
|
||||
rv = event_recv(sgw_self()->queue_id, &event);
|
||||
|
||||
prev_tm = now_tm;
|
||||
}
|
||||
d_assert(rv != CORE_ERROR, continue,
|
||||
"While receiving a event message, error occurs");
|
||||
|
||||
if (r == CORE_TIMEUP)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
now_tm = time_now();
|
||||
|
||||
fsm_dispatch(&sgw_sm, (fsm_event_t*)&event);
|
||||
/* if the gap is over event_loop timeout, execute preriodic jobs */
|
||||
if (now_tm - prev_tm > (EVENT_LOOP_TIMEOUT * 1000))
|
||||
{
|
||||
tm_execute_tm_service(
|
||||
&sgw_self()->tm_service, sgw_self()->queue_id);
|
||||
|
||||
prev_tm = now_tm;
|
||||
}
|
||||
|
||||
if (rv == CORE_EAGAIN)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
fsm_dispatch(&sgw_sm, (fsm_event_t*)&event);
|
||||
} while(rv == CORE_OK);
|
||||
}
|
||||
|
||||
fsm_final(&sgw_sm, 0);
|
||||
@@ -103,13 +102,3 @@ static void *THREAD_FUNC sm_main(thread_id id, void *data)
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *THREAD_FUNC net_main(thread_id id, void *data)
|
||||
{
|
||||
while (!thread_should_stop())
|
||||
{
|
||||
net_fds_read_run(50);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user