osmo-trx/Transceiver52M/device/ipc2/ipcif.h
Eric 42cc715fac ill-fated ipcv2 for mstrx
The problem here is that the IPC interface requires reducing the clock
indication interval to a few fn/ts on the trx side to work at all, but
scheduling still does not work out unless you start driving the
interface from the tx side, at which point the approach is useless,
because it does not really do more than a burst loopback, which is
easier to do with the higher layers/interfaces.

The code is still useful should there be a reason to continue working on
this.

Change-Id: I8f582c7c06fed8d1dcc5ea52472a97dc313fdde5
2022-12-01 12:00:04 +01:00


/*
* (C) 2022 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
* All Rights Reserved
*
* Author: Eric Wild <ewild@sysmocom.de>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <cassert>
#include <complex>
#include <cstdint>
#include <cstring>
#include <deque>
#include <mutex>
#include <vector>
#include "shmif.h"
const int max_ul_rdlen = 1024 * 10;
const int max_dl_rdlen = 1024 * 10;
using sample_t = std::complex<int16_t>;
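/*
Shared-memory control block between the trx/network side and the MS side.
Each direction (ul/dl) carries a read/write semaphore pair for the handshake,
the timestamp of the samples currently in its buffer, the timestamp requested
by the reader, the number of valid samples, and the sample buffer itself.
*/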
struct shm_if {
std::atomic<bool> ms_connected;
struct {
shm::sema r;
shm::sema w;
std::atomic<uint64_t> ts;
std::atomic<uint64_t> ts_req;
std::atomic<size_t> len_written_sps; // number of valid samples in buffer
sample_t buffer[max_ul_rdlen];
} ul;
struct {
shm::sema r;
shm::sema w;
std::atomic<uint64_t> ts;
std::atomic<uint64_t> ts_req;
std::atomic<size_t> len_written_sps;
sample_t buffer[max_dl_rdlen];
} dl;
};
// unique up to signed_type/2 diff
// ex: uint8/int8 (250, 0) = -6
template <typename A> auto unsigned_diff(A a, A b) -> typename std::make_signed<A>::type
{
using stype = typename std::make_signed<A>::type;
return (a > b) ? static_cast<stype>(a - b) : -static_cast<stype>(b - a);
}
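// worked example for the comment above: with A = uint8_t, unsigned_diff(250, 0)
// returns static_cast<int8_t>(250 - 0) == -6, i.e. the short way around the
// 8-bit wraparound.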
constexpr inline int samp2byte(int v)
{
return v * sizeof(sample_t);
}
constexpr inline int byte2samp(int v)
{
return v / sizeof(sample_t);
}
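/* one queued uplink burst: absolute start timestamp, length and current read
position in samples, plus the burst payload itself */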
struct ulentry {
bool done;
uint64_t ts;
unsigned int len_in_sps;
unsigned int read_pos_in_sps;
sample_t buf[1000];
};
/*
write: find read index +.. until marked free = "end" of current list
check:
within begin, end AND not free?
y:
copy (chunk)
if chunk advance burst buf ptr
n: next, advance, remove old.
*/
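/* fixed-size ring of pending uplink bursts: filled via add() from write_ul(),
drained timestamp-aligned via get() from drive_tx() */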
template <unsigned int num_bursts> class ulburstprovider {
std::mutex ul_q_m;
// std::deque<ulentry> ul_q;
// classic circular buffer
ulentry foo[num_bursts];
int current_index; // % num_bursts
void cur_buf_done()
{
foo[current_index].done = true;
current_index = (current_index + 1) % num_bursts;
}
bool is_empty()
{
return foo[current_index].done == true;
}
void reset()
{
for (auto &i : foo)
i = {};
current_index = 0;
}
ulentry &find_free_at_end()
{
for (int i = current_index, max_to_search = 0; max_to_search < num_bursts;
i = (i + 1) % num_bursts, max_to_search++) {
if (foo[i].done)
return foo[i];
}
return foo[0]; // FIXME: actually broken: queue is full, what should happen here?
}
void push_back(ulentry &e)
{
auto &free_buf = find_free_at_end();
free_buf = e;
free_buf.done = false;
}
public:
void add(ulentry &e)
{
std::lock_guard<std::mutex> g(ul_q_m);
push_back(e);
}
void get(uint64_t requested_ts, unsigned int req_len_in_sps, sample_t *buf, unsigned int max_buf_write_len)
{
std::lock_guard<std::mutex> g(ul_q_m);
/*
1) if empty return
2) if not empty prune stale bursts
3) if only future bursts also return and zero buf
*/
for (int i = current_index, max_to_search = 0; max_to_search < num_bursts;
i = (i + 1) % num_bursts, max_to_search++) {
auto &cur_entry = foo[i];
if (is_empty()) { // might be empty due to advance below!
memset(buf, 0, samp2byte(req_len_in_sps));
return;
}
if (cur_entry.ts + cur_entry.len_in_sps < requested_ts) { // remove late bursts
if (i == current_index) // only advance if we are at the front
cur_buf_done();
else
assert(true); // no-op placeholder: entries behind the front are retired once they reach the front
} else if (cur_entry.ts >= requested_ts + byte2samp(max_buf_write_len)) { // not in range
memset(buf, 0, samp2byte(req_len_in_sps));
return;
// FIXME: what about requested_ts <= entry.ts <= ts + reqlen?
} else {
// requested_ts <= cur_entry.ts <= requested_ts + byte2samp(max_write_len)
auto before_sps = unsigned_diff(cur_entry.ts, requested_ts);
// at least one whole buffer before our most recent "head" burst?
// set 0, return.
if (-before_sps >= byte2samp(max_buf_write_len)) {
memset(buf, 0, samp2byte(req_len_in_sps));
return;
}
// less than one full buffer before: pad 0
auto to_pad_sps = -before_sps;
memset(buf, 0, samp2byte(to_pad_sps));
requested_ts += to_pad_sps;
req_len_in_sps -= to_pad_sps;
if (!req_len_in_sps)
return;
// actual burst data after possible 0 pad
auto max_sps_to_write = std::min(cur_entry.len_in_sps, req_len_in_sps);
memcpy(&buf[to_pad_sps], cur_entry.buf, samp2byte(max_sps_to_write));
requested_ts += max_sps_to_write;
req_len_in_sps -= max_sps_to_write;
cur_entry.read_pos_in_sps += max_sps_to_write;
//this buf is done...
if (cur_entry.read_pos_in_sps == cur_entry.len_in_sps) {
cur_buf_done();
}
if (!req_len_in_sps)
return;
}
}
}
};
class trxmsif {
shm::shm<shm_if> m;
shm_if *ptr;
ulburstprovider<10> p;
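/*
Common read path for both directions. When the current shared buffer is
exhausted, publish the wanted timestamp in ts_req, signal the w semaphore so
the producer fills a new buffer, then block on the r semaphore until it is
ready; copy out the requested samples and keep any leftover for the next call
via readoffset_sps.
*/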
template <typename T> void read(T &direction, size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
{
static int readoffset_sps; // read position (in samples) within the current buffer; one static per direction type (dl/ul)
// auto &direction = ptr->dl;
auto buf = &direction.buffer[0];
size_t len_avail_sps = direction.len_written_sps.load();
auto left_to_read = len_avail_sps - readoffset_sps;
shm::mtx_log::print_guard() << "\tr @" << direction.ts.load() << " " << readoffset_sps << std::endl;
// no data, wait for new buffer, maybe some data left afterwards
if (!left_to_read) {
assert(readoffset_sps == len_avail_sps);
readoffset_sps = 0;
direction.r.reset_unsafe();
direction.ts_req = (*read_ts);
direction.w.set(1);
direction.r.wait_and_reset(1);
assert(*read_ts != direction.ts.load());
// shm::sema_guard g(dl.r, dl.w);
*read_ts = direction.ts.load();
len_avail_sps = direction.len_written_sps.load();
readoffset_sps += howmany_sps;
assert(len_avail_sps >= howmany_sps);
memcpy(outbuf, buf, samp2byte(howmany_sps));
shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany_sps << std::endl;
return;
}
*read_ts = direction.ts.load() + readoffset_sps;
left_to_read = len_avail_sps - readoffset_sps;
// data left from prev read
if (left_to_read >= howmany_sps) {
memcpy(outbuf, &buf[readoffset_sps], samp2byte(howmany_sps));
readoffset_sps += howmany_sps;
shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany_sps << std::endl;
return;
} else {
memcpy(outbuf, &buf[readoffset_sps], samp2byte(left_to_read));
readoffset_sps = 0;
auto still_left_to_read = howmany_sps - left_to_read;
{
direction.r.reset_unsafe();
direction.ts_req = (*read_ts);
direction.w.set(1);
direction.r.wait_and_reset(1);
assert(*read_ts != direction.ts.load());
len_avail_sps = direction.len_written_sps.load();
assert(len_avail_sps >= still_left_to_read);
memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read));
readoffset_sps += still_left_to_read;
shm::mtx_log::print_guard()
<< "\tr+++2 " << *read_ts << " " << howmany_sps << " " << still_left_to_read
<< " new @" << direction.ts.load() << std::endl;
}
}
}
public:
trxmsif() : m("trx-ms-if")
{
}
bool create()
{
m.create();
ptr = m.p();
return m.isgood();
}
bool connect()
{
m.open();
ptr = m.p();
ptr->ms_connected = true;
ptr->dl.w.set(1);
return m.isgood();
}
bool good()
{
return m.isgood();
}
bool is_connected()
{
return ptr->ms_connected == true;
}
/* is being read from ms side */
void read_dl(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
{
return read(ptr->dl, howmany_sps, read_ts, outbuf);
}
/* is being read from trx/network side */
void read_ul(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
{
// if (ptr->ms_connected != true) {
memset(outbuf, 0, samp2byte(howmany_sps));
// return;
// }
// return read(ptr->ul, howmany_sps, read_ts, outbuf);
}
void write_dl(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf)
{
auto &dl = ptr->dl;
auto buf = &dl.buffer[0];
if (ptr->ms_connected != true)
return;
assert(sizeof(dl.buffer) >= samp2byte(howmany_sps));
// print_guard() << "####w " << std::endl;
{
shm::sema_wait_guard g(dl.w, dl.r);
memcpy(buf, inbuf, samp2byte(howmany_sps));
dl.ts.store(write_ts);
dl.len_written_sps.store(howmany_sps);
}
shm::mtx_log::print_guard() << std::endl
<< "####w+ " << write_ts << " " << howmany_sps << std::endl
<< std::endl;
}
void write_ul(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf)
{
auto &ul = ptr->ul;
assert(sizeof(ul.buffer) >= samp2byte(howmany_sps));
// print_guard() << "####w " << std::endl;
ulentry e;
e.ts = write_ts;
e.len_in_sps = howmany_sps;
e.done = false;
e.read_pos_in_sps = 0;
assert(sizeof(e.buf) >= samp2byte(howmany_sps));
memcpy(e.buf, inbuf, samp2byte(howmany_sps));
p.add(e);
shm::mtx_log::print_guard() << std::endl
<< "####q+ " << write_ts << " " << howmany_sps << std::endl
<< std::endl;
}
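/* answer a pending uplink read request: if the reader is blocked waiting for
samples (w semaphore set), fill the shared ul buffer from the queued bursts at
the requested timestamp and signal r */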
void drive_tx()
{
auto &ul = ptr->ul;
auto buf = &ul.buffer[0];
const auto max_write_len = sizeof(ul.buffer);
// ul_q_m.lock();
// ul_q.push_front(e);
// ul_q_m.unlock();
// ul.w.wait_and_reset();
// no read waiting for a write
if (!ul.w.check_unsafe(1))
return;
// FIXME: store written, notify after get!
auto requested_ts = ul.ts_req.load();
p.get(requested_ts, byte2samp(max_write_len), buf, max_write_len);
// memset(buf, 0, max_write_len);
ul.ts.store(requested_ts);
ul.len_written_sps.store(byte2samp(max_write_len));
ul.w.reset_unsafe();
ul.r.set(1);
}
void signal_read_start()
{ /* nop */
}
};