ms: fix thread prio startup issue

This fixes the 20 second startup delay caused by tx/control threads
getting temporarily stuck while trying to set their own priority.

Apparently the only sane way for core affinity+priority is to set both
as attributes during thread creation using pthreads.

This switches the cmd queue to the timeout version, too, to ensure the
thread doesn't get stuck waiting for messages, and allows cleaner exits.

Change-Id: I7e2f83a9b9df024acaf9076c58189cb6b7bcc34b
This commit is contained in:
Eric
2023-08-29 23:23:24 +02:00
parent 287ae681b7
commit d372eb2f0b
5 changed files with 93 additions and 31 deletions

View File

@@ -336,6 +336,15 @@ struct ms_trx : public BASET {
set_name_aff_sched(h, tgt.name, tgt.core, tgt.schedtype, tgt.prio);
}
using pt_sig = void *(*)(void *);
pthread_t spawn_worker_thread(sched_params::thread_names name, pt_sig fun, void *arg)
{
auto tgt = schdp[hw_target][name];
// std::cerr << "scheduling for: " << tgt.name << ":" << tgt.core << " prio:" << tgt.prio << std::endl;
return do_spawn_thr(tgt.name, tgt.core, tgt.schedtype, tgt.prio, fun, arg);
}
private:
void set_name_aff_sched(std::thread::native_handle_type h, const char *name, int cpunum, int schedtype,
int prio)
@@ -359,4 +368,28 @@ struct ms_trx : public BASET {
return exit(0);
}
}
pthread_t do_spawn_thr(const char *name, int cpunum, int schedtype, int prio, pt_sig fun, void *arg)
{
pthread_t thread;
pthread_attr_t attr;
pthread_attr_init(&attr);
sched_param sch_params;
sch_params.sched_priority = prio;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpunum, &cpuset);
auto a = pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);
a |= pthread_attr_setschedpolicy(&attr, schedtype);
a |= pthread_attr_setschedparam(&attr, &sch_params);
a |= pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
if(a)
std::cerr << "thread arg rc:" << a << std::endl;
pthread_create(&thread, &attr, fun, arg);
pthread_setname_np(thread, name);
pthread_attr_destroy(&attr);
return thread;
}
};

View File

@@ -38,5 +38,5 @@ struct internal_q_tx_buf {
}
};
using tx_queue_t = spsc_cond<8 * 1, internal_q_tx_buf, true, false>;
using cmd_queue_t = spsc_cond<8 * 1, trxcon_phyif_cmd, true, false>;
using cmd_queue_t = spsc_cond_timeout<8 * 1, trxcon_phyif_cmd, true, false>;
using cmdr_queue_t = spsc_cond<8 * 1, trxcon_phyif_rsp, false, false>;

View File

@@ -27,6 +27,8 @@
#include <radioInterface.h>
#include <grgsm_vitac/grgsm_vitac.h>
// #define TXDEBUG
extern "C" {
#include "sch.h"
@@ -66,29 +68,54 @@ void upper_trx::stop_upper_threads()
{
g_exit_flag = true;
if (thr_control.joinable())
thr_control.join();
if (thr_rx.joinable())
thr_rx.join();
if (thr_tx.joinable())
thr_tx.join();
pthread_join(thr_control, NULL);
pthread_join(thr_tx, NULL);
}
void upper_trx::start_threads()
{
thr_control = std::thread([this] {
set_name_aff_sched(sched_params::thread_names::U_CTL);
while (!g_exit_flag) {
driveControl();
}
std::cerr << "exit U control!" << std::endl;
});
thr_tx = std::thread([this] {
set_name_aff_sched(sched_params::thread_names::U_TX);
while (!g_exit_flag) {
driveTx();
}
std::cerr << "exit U tx!" << std::endl;
});
DBGLG(...) << "spawning threads.." << std::endl;
thr_control = spawn_worker_thread(
sched_params::thread_names::U_CTL,
[](void *args) -> void * {
upper_trx *t = reinterpret_cast<upper_trx *>(args);
#ifdef TXDEBUG
struct sched_param param;
int policy;
pthread_getschedparam(pthread_self(), &policy, &param);
printf("ID: %lu, CPU: %d policy = %d priority = %d\n", pthread_self(), sched_getcpu(), policy,
param.sched_priority);
#endif
std::cerr << "started U control!" << std::endl;
while (!g_exit_flag) {
t->driveControl();
}
std::cerr << "exit U control!" << std::endl;
return 0;
},
this);
thr_tx = spawn_worker_thread(
sched_params::thread_names::U_TX,
[](void *args) -> void * {
upper_trx *t = reinterpret_cast<upper_trx *>(args);
#ifdef TXDEBUG
struct sched_param param;
int policy;
pthread_getschedparam(pthread_self(), &policy, &param);
printf("ID: %lu, CPU: %d policy = %d priority = %d\n", pthread_self(), sched_getcpu(), policy,
param.sched_priority);
#endif
std::cerr << "started U tx!" << std::endl;
while (!g_exit_flag) {
t->driveTx();
}
std::cerr << "exit U tx!" << std::endl;
return 0;
},
this);
#ifdef LSANDEBUG
std::thread([this] {
@@ -251,7 +278,7 @@ void upper_trx::driveTx()
internal_q_tx_buf *burst = &e;
#ifdef TXDEBUG
#ifdef TXDEBUG2
DBGLG() << "got burst!" << burst->r.fn << ":" << burst->ts << " current: " << timekeeper.gsmtime().FN()
<< " dff: " << (int64_t)((int64_t)timekeeper.gsmtime().FN() - (int64_t)burst->r.fn) << std::endl;
#endif
@@ -270,7 +297,7 @@ void upper_trx::driveTx()
// float -> int16
blade_sample_type burst_buf[txburst->size()];
convert_and_scale(burst_buf, txburst->begin(), txburst->size() * 2, 1);
#ifdef TXDEBUG
#ifdef TXDEBUG2
auto check = signalVector(txburst->size(), 40);
convert_and_scale(check.begin(), burst_buf, txburst->size() * 2, 1);
estim_burst_params ebp;
@@ -313,7 +340,7 @@ static const char *cmd2str(trxcon_phyif_cmd_type c)
static void print_cmd(trxcon_phyif_cmd_type c)
{
DBGLG() << cmd2str(c) << std::endl;
DBGLG() << "handling " << cmd2str(c) << std::endl;
}
#endif
@@ -323,6 +350,8 @@ bool upper_trx::driveControl()
trxcon_phyif_cmd cmd;
while (!cmdq_to_phy.spsc_pop(&cmd)) {
cmdq_to_phy.spsc_prep_pop();
if (g_exit_flag)
return false;
}
if (g_exit_flag)

View File

@@ -27,22 +27,22 @@
#include "ms.h"
class upper_trx : public ms_trx {
bool mOn;
volatile bool mOn;
char demodded_softbits[444];
// void driveControl();
bool driveControl();
void driveReceiveFIFO();
void driveTx();
bool pullRadioVector(GSM::Time &wTime, int &RSSI, int &timingOffset);
std::thread thr_control, thr_rx, thr_tx;
pthread_t thr_control, thr_tx;
public:
void start_threads();
void main_loop();
void stop_upper_threads();
bool driveControl();
void driveReceiveFIFO();
void driveTx();
upper_trx(){};
};