mirror of
https://github.com/fairwaves/UHD-Fairwaves.git
synced 2025-11-03 05:23:14 +00:00
umtrx: work on io impl and recv streamer
This commit is contained in:
@@ -38,6 +38,7 @@ list(APPEND UMTRX_SOURCES
|
||||
cores/rx_dsp_core_200.cpp
|
||||
cores/tx_dsp_core_200.cpp
|
||||
cores/time64_core_200.cpp
|
||||
cores/validate_subdev_spec.cpp
|
||||
)
|
||||
|
||||
########################################################################
|
||||
|
||||
71
host/cores/async_packet_handler.hpp
Normal file
71
host/cores/async_packet_handler.hpp
Normal file
@@ -0,0 +1,71 @@
|
||||
//
|
||||
// Copyright 2012 Ettus Research LLC
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
#ifndef INCLUDED_LIBUHD_USRP_COMMON_ASYNC_PACKET_HANDLER_HPP
|
||||
#define INCLUDED_LIBUHD_USRP_COMMON_ASYNC_PACKET_HANDLER_HPP
|
||||
|
||||
#include <uhd/config.hpp>
|
||||
#include <uhd/transport/vrt_if_packet.hpp>
|
||||
#include <uhd/types/metadata.hpp>
|
||||
#include <uhd/utils/byteswap.hpp>
|
||||
#include <uhd/utils/msg.hpp>
|
||||
|
||||
namespace uhd{ namespace usrp{
|
||||
|
||||
template <typename to_host_type>
|
||||
void load_metadata_from_buff(
|
||||
const to_host_type &to_host,
|
||||
async_metadata_t &metadata,
|
||||
const transport::vrt::if_packet_info_t &if_packet_info,
|
||||
const boost::uint32_t *vrt_hdr,
|
||||
const double tick_rate,
|
||||
const size_t channel = 0
|
||||
){
|
||||
const boost::uint32_t *payload = vrt_hdr + if_packet_info.num_header_words32;
|
||||
|
||||
//load into metadata
|
||||
metadata.channel = channel;
|
||||
metadata.has_time_spec = if_packet_info.has_tsf;
|
||||
metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, tick_rate);
|
||||
metadata.event_code = async_metadata_t::event_code_t(to_host(payload[0]) & 0xff);
|
||||
|
||||
//load user payload
|
||||
for (size_t i = 1; i < if_packet_info.num_payload_words32; i++){
|
||||
if (i-1 == 4) break; //limit of 4 words32
|
||||
metadata.user_payload[i-1] = to_host(payload[i]);
|
||||
}
|
||||
}
|
||||
|
||||
UHD_INLINE void standard_async_msg_prints(const async_metadata_t &metadata)
|
||||
{
|
||||
if (metadata.event_code &
|
||||
( async_metadata_t::EVENT_CODE_UNDERFLOW
|
||||
| async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET)
|
||||
) UHD_MSG(fastpath) << "U";
|
||||
else if (metadata.event_code &
|
||||
( async_metadata_t::EVENT_CODE_SEQ_ERROR
|
||||
| async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST)
|
||||
) UHD_MSG(fastpath) << "S";
|
||||
else if (metadata.event_code &
|
||||
async_metadata_t::EVENT_CODE_TIME_ERROR
|
||||
) UHD_MSG(fastpath) << "L";
|
||||
}
|
||||
|
||||
|
||||
}} //namespace uhd::usrp
|
||||
|
||||
#endif /* INCLUDED_LIBUHD_USRP_COMMON_ASYNC_PACKET_HANDLER_HPP */
|
||||
802
host/cores/super_recv_packet_handler.hpp
Normal file
802
host/cores/super_recv_packet_handler.hpp
Normal file
@@ -0,0 +1,802 @@
|
||||
//
|
||||
// Copyright 2011-2013 Ettus Research LLC
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
|
||||
#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
|
||||
|
||||
#include <uhd/config.hpp>
|
||||
#include <uhd/exception.hpp>
|
||||
#include <uhd/convert.hpp>
|
||||
#include <uhd/stream.hpp>
|
||||
#include <uhd/utils/msg.hpp>
|
||||
#include <uhd/utils/tasks.hpp>
|
||||
#include <uhd/utils/atomic.hpp>
|
||||
#include <uhd/utils/byteswap.hpp>
|
||||
#include <uhd/types/metadata.hpp>
|
||||
#include <uhd/transport/vrt_if_packet.hpp>
|
||||
#include <uhd/transport/zero_copy.hpp>
|
||||
#include <boost/dynamic_bitset.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/function.hpp>
|
||||
#include <boost/format.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
#include <boost/make_shared.hpp>
|
||||
#include <boost/thread/barrier.hpp>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
// Included for debugging
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
#include <boost/format.hpp>
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include "boost/date_time/posix_time/posix_time.hpp"
|
||||
#endif
|
||||
|
||||
namespace uhd{ namespace transport{ namespace sph{
|
||||
|
||||
UHD_INLINE boost::uint32_t get_context_code(
|
||||
const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info
|
||||
){
|
||||
//extract the context word (we dont know the endianness so mirror the bytes)
|
||||
boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] |
|
||||
uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]);
|
||||
return word0 & 0xff;
|
||||
}
|
||||
|
||||
typedef boost::function<void(void)> handle_overflow_type;
|
||||
static inline void handle_overflow_nop(void){}
|
||||
|
||||
/***********************************************************************
|
||||
* Super receive packet handler
|
||||
*
|
||||
* A receive packet handler represents a group of channels.
|
||||
* The channel group shares a common sample rate.
|
||||
* All channels are received in unison in recv().
|
||||
**********************************************************************/
|
||||
class recv_packet_handler{
|
||||
public:
|
||||
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
|
||||
typedef boost::function<void(const size_t)> handle_flowctrl_type;
|
||||
typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
|
||||
typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
|
||||
//typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
|
||||
|
||||
/*!
|
||||
* Make a new packet handler for receive
|
||||
* \param size the number of transport channels
|
||||
*/
|
||||
recv_packet_handler(const size_t size = 1):
|
||||
_queue_error_for_next_call(false),
|
||||
_buffers_infos_index(0)
|
||||
{
|
||||
#ifdef ERROR_INJECT_DROPPED_PACKETS
|
||||
recvd_packets = 0;
|
||||
#endif
|
||||
|
||||
this->resize(size);
|
||||
set_alignment_failure_threshold(1000);
|
||||
}
|
||||
|
||||
~recv_packet_handler(void){
|
||||
_task_barrier.interrupt();
|
||||
_task_handlers.clear();
|
||||
}
|
||||
|
||||
//! Resize the number of transport channels
|
||||
void resize(const size_t size){
|
||||
if (this->size() == size) return;
|
||||
_task_handlers.clear();
|
||||
_props.resize(size);
|
||||
//re-initialize all buffers infos by re-creating the vector
|
||||
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
|
||||
_task_barrier.resize(size);
|
||||
_task_handlers.resize(size);
|
||||
for (size_t i = 1/*skip 0*/; i < size; i++){
|
||||
_task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
|
||||
};
|
||||
}
|
||||
|
||||
//! Get the channel width of this handler
|
||||
size_t size(void) const{
|
||||
return _props.size();
|
||||
}
|
||||
|
||||
//! Setup the vrt unpacker function and offset
|
||||
void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){
|
||||
_vrt_unpacker = vrt_unpacker;
|
||||
_header_offset_words32 = header_offset_words32;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Set the threshold for alignment failure.
|
||||
* How many packets throw out before giving up?
|
||||
* \param threshold number of packets per channel
|
||||
*/
|
||||
void set_alignment_failure_threshold(const size_t threshold){
|
||||
_alignment_faulure_threshold = threshold*this->size();
|
||||
}
|
||||
|
||||
//! Set the rate of ticks per second
|
||||
void set_tick_rate(const double rate){
|
||||
_tick_rate = rate;
|
||||
}
|
||||
|
||||
//! Set the rate of samples per second
|
||||
void set_samp_rate(const double rate){
|
||||
_samp_rate = rate;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Set the function to get a managed buffer.
|
||||
* \param xport_chan which transport channel
|
||||
* \param get_buff the getter function
|
||||
*/
|
||||
void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff, const bool flush = false){
|
||||
if (flush){
|
||||
while (get_buff(0.0));
|
||||
}
|
||||
_props.at(xport_chan).get_buff = get_buff;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Flush all transports in the streamer:
|
||||
* The packet payload is discarded.
|
||||
*/
|
||||
void flush_all(const double timeout = 0.0)
|
||||
{
|
||||
_flush_all(timeout);
|
||||
return;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Set the function to handle flow control
|
||||
* \param xport_chan which transport channel
|
||||
* \param handle_flowctrl the callback function
|
||||
*/
|
||||
void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false)
|
||||
{
|
||||
_props.at(xport_chan).handle_flowctrl = handle_flowctrl;
|
||||
//we need the window size to be within the 0xfff (max 12 bit seq)
|
||||
_props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff);
|
||||
if (do_init) handle_flowctrl(0);
|
||||
}
|
||||
|
||||
//! Set the conversion routine for all channels
|
||||
void set_converter(const uhd::convert::id_type &id){
|
||||
_num_outputs = id.num_outputs;
|
||||
_converter = uhd::convert::get_converter(id)();
|
||||
this->set_scale_factor(1/32767.); //update after setting converter
|
||||
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format);
|
||||
_bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.output_format);
|
||||
}
|
||||
|
||||
//! Set the transport channel's overflow handler
|
||||
void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){
|
||||
_props.at(xport_chan).handle_overflow = handle_overflow;
|
||||
}
|
||||
|
||||
//! Set the scale factor used in float conversion
|
||||
void set_scale_factor(const double scale_factor){
|
||||
_converter->set_scalar(scale_factor);
|
||||
}
|
||||
|
||||
//! Set the callback to issue stream commands
|
||||
void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd)
|
||||
{
|
||||
_props.at(xport_chan).issue_stream_cmd = issue_stream_cmd;
|
||||
}
|
||||
|
||||
//! Overload call to issue stream commands
|
||||
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
|
||||
{
|
||||
for (size_t i = 0; i < _props.size(); i++)
|
||||
{
|
||||
if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
|
||||
}
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Receive:
|
||||
* The entry point for the fast-path receive calls.
|
||||
* Dispatch into combinations of single packet receive calls.
|
||||
******************************************************************/
|
||||
UHD_INLINE size_t recv(
|
||||
const uhd::rx_streamer::buffs_type &buffs,
|
||||
const size_t nsamps_per_buff,
|
||||
uhd::rx_metadata_t &metadata,
|
||||
const double timeout,
|
||||
const bool one_packet
|
||||
){
|
||||
//handle metadata queued from a previous receive
|
||||
if (_queue_error_for_next_call){
|
||||
_queue_error_for_next_call = false;
|
||||
metadata = _queue_metadata;
|
||||
//We want to allow a full buffer recv to be cut short by a timeout,
|
||||
//but do not want to generate an inline timeout message packet.
|
||||
if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0;
|
||||
}
|
||||
|
||||
size_t accum_num_samps = recv_one_packet(
|
||||
buffs, nsamps_per_buff, metadata, timeout
|
||||
);
|
||||
|
||||
if (one_packet){
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
|
||||
#endif
|
||||
return accum_num_samps;
|
||||
}
|
||||
|
||||
//first recv had an error code set, return immediately
|
||||
if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps;
|
||||
|
||||
//loop until buffer is filled or error code
|
||||
while(accum_num_samps < nsamps_per_buff){
|
||||
size_t num_samps = recv_one_packet(
|
||||
buffs, nsamps_per_buff - accum_num_samps, _queue_metadata,
|
||||
timeout, accum_num_samps*_bytes_per_cpu_item
|
||||
);
|
||||
|
||||
//metadata had an error code set, store for next call and return
|
||||
if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){
|
||||
_queue_error_for_next_call = true;
|
||||
break;
|
||||
}
|
||||
accum_num_samps += num_samps;
|
||||
}
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
|
||||
#endif
|
||||
return accum_num_samps;
|
||||
}
|
||||
|
||||
private:
|
||||
vrt_unpacker_type _vrt_unpacker;
|
||||
size_t _header_offset_words32;
|
||||
double _tick_rate, _samp_rate;
|
||||
bool _queue_error_for_next_call;
|
||||
size_t _alignment_faulure_threshold;
|
||||
rx_metadata_t _queue_metadata;
|
||||
struct xport_chan_props_type{
|
||||
xport_chan_props_type(void):
|
||||
packet_count(0),
|
||||
handle_overflow(&handle_overflow_nop),
|
||||
fc_update_window(0)
|
||||
{}
|
||||
get_buff_type get_buff;
|
||||
issue_stream_cmd_type issue_stream_cmd;
|
||||
size_t packet_count;
|
||||
handle_overflow_type handle_overflow;
|
||||
handle_flowctrl_type handle_flowctrl;
|
||||
size_t fc_update_window;
|
||||
};
|
||||
std::vector<xport_chan_props_type> _props;
|
||||
size_t _num_outputs;
|
||||
size_t _bytes_per_otw_item; //used in conversion
|
||||
size_t _bytes_per_cpu_item; //used in conversion
|
||||
uhd::convert::converter::sptr _converter; //used in conversion
|
||||
|
||||
//! information stored for a received buffer
|
||||
struct per_buffer_info_type{
|
||||
void reset()
|
||||
{
|
||||
buff.reset();
|
||||
vrt_hdr = NULL;
|
||||
time = time_spec_t(0.0);
|
||||
copy_buff = NULL;
|
||||
}
|
||||
managed_recv_buffer::sptr buff;
|
||||
const boost::uint32_t *vrt_hdr;
|
||||
vrt::if_packet_info_t ifpi;
|
||||
time_spec_t time;
|
||||
const char *copy_buff;
|
||||
};
|
||||
|
||||
//!information stored for a set of aligned buffers
|
||||
struct buffers_info_type : std::vector<per_buffer_info_type> {
|
||||
buffers_info_type(const size_t size):
|
||||
std::vector<per_buffer_info_type>(size),
|
||||
indexes_todo(size, true),
|
||||
alignment_time_valid(false),
|
||||
data_bytes_to_copy(0),
|
||||
fragment_offset_in_samps(0)
|
||||
{/* NOP */}
|
||||
void reset()
|
||||
{
|
||||
indexes_todo.set();
|
||||
alignment_time = time_spec_t(0.0);
|
||||
alignment_time_valid = false;
|
||||
data_bytes_to_copy = 0;
|
||||
fragment_offset_in_samps = 0;
|
||||
metadata.reset();
|
||||
for (size_t i = 0; i < size(); i++)
|
||||
at(i).reset();
|
||||
}
|
||||
boost::dynamic_bitset<> indexes_todo; //used in alignment logic
|
||||
time_spec_t alignment_time; //used in alignment logic
|
||||
bool alignment_time_valid; //used in alignment logic
|
||||
size_t data_bytes_to_copy; //keeps track of state
|
||||
size_t fragment_offset_in_samps; //keeps track of state
|
||||
rx_metadata_t metadata; //packet description
|
||||
};
|
||||
|
||||
//! a circular queue of buffer infos
|
||||
std::vector<buffers_info_type> _buffers_infos;
|
||||
size_t _buffers_infos_index;
|
||||
buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];}
|
||||
buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];}
|
||||
buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];}
|
||||
void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;}
|
||||
|
||||
//! possible return options for the packet receiver
|
||||
enum packet_type{
|
||||
PACKET_IF_DATA,
|
||||
PACKET_TIMESTAMP_ERROR,
|
||||
PACKET_INLINE_MESSAGE,
|
||||
PACKET_TIMEOUT_ERROR,
|
||||
PACKET_SEQUENCE_ERROR
|
||||
};
|
||||
|
||||
#ifdef ERROR_INJECT_DROPPED_PACKETS
|
||||
int recvd_packets;
|
||||
#endif
|
||||
|
||||
/*******************************************************************
|
||||
* Get and process a single packet from the transport:
|
||||
* Receive a single packet at the given index.
|
||||
* Extract all the relevant info and store.
|
||||
* Check the info to determine the return code.
|
||||
******************************************************************/
|
||||
UHD_INLINE packet_type get_and_process_single_packet(
|
||||
const size_t index,
|
||||
per_buffer_info_type &prev_buffer_info,
|
||||
per_buffer_info_type &curr_buffer_info,
|
||||
double timeout
|
||||
){
|
||||
//get a single packet from the transport layer
|
||||
managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
|
||||
buff = _props[index].get_buff(timeout);
|
||||
if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
|
||||
|
||||
#ifdef ERROR_INJECT_DROPPED_PACKETS
|
||||
if (++recvd_packets > 1000)
|
||||
{
|
||||
recvd_packets = 0;
|
||||
buff.reset();
|
||||
buff = _props[index].get_buff(timeout);
|
||||
if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
//bounds check before extract
|
||||
size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
|
||||
if (num_packet_words32 <= _header_offset_words32){
|
||||
throw std::runtime_error("recv buffer smaller than vrt packet offset");
|
||||
}
|
||||
|
||||
//extract packet info
|
||||
per_buffer_info_type &info = curr_buffer_info;
|
||||
info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
|
||||
info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;
|
||||
_vrt_unpacker(info.vrt_hdr, info.ifpi);
|
||||
info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
|
||||
info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
|
||||
|
||||
//handle flow control
|
||||
if (_props[index].handle_flowctrl)
|
||||
{
|
||||
if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
|
||||
{
|
||||
_props[index].handle_flowctrl(info.ifpi.packet_count);
|
||||
}
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------
|
||||
//-- Determine return conditions:
|
||||
//-- The order of these checks is HOLY.
|
||||
//--------------------------------------------------------------
|
||||
|
||||
//1) check for inline IF message packets
|
||||
if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
|
||||
return PACKET_INLINE_MESSAGE;
|
||||
}
|
||||
|
||||
//2) check for sequence errors
|
||||
#ifndef SRPH_DONT_CHECK_SEQUENCE
|
||||
const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;
|
||||
const size_t expected_packet_count = _props[index].packet_count;
|
||||
_props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
|
||||
if (expected_packet_count != info.ifpi.packet_count){
|
||||
return PACKET_SEQUENCE_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
//3) check for out of order timestamps
|
||||
if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){
|
||||
return PACKET_TIMESTAMP_ERROR;
|
||||
}
|
||||
|
||||
//4) otherwise the packet is normal!
|
||||
return PACKET_IF_DATA;
|
||||
}
|
||||
|
||||
void _flush_all(double timeout)
|
||||
{
|
||||
for (size_t i = 0; i < _props.size(); i++)
|
||||
{
|
||||
per_buffer_info_type prev_buffer_info, curr_buffer_info;
|
||||
while (true)
|
||||
{
|
||||
//receive a single packet from the transport
|
||||
try
|
||||
{
|
||||
// call into get_and_process_single_packet()
|
||||
// to make sure flow control is handled
|
||||
if (get_and_process_single_packet(
|
||||
i,
|
||||
prev_buffer_info,
|
||||
curr_buffer_info,
|
||||
timeout) == PACKET_TIMEOUT_ERROR) break;
|
||||
} catch(...){}
|
||||
prev_buffer_info = curr_buffer_info;
|
||||
curr_buffer_info.reset();
|
||||
}
|
||||
}
|
||||
get_prev_buffer_info().reset();
|
||||
get_curr_buffer_info().reset();
|
||||
get_next_buffer_info().reset();
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Alignment check:
|
||||
* Check the received packet for alignment and mark accordingly.
|
||||
******************************************************************/
|
||||
UHD_INLINE void alignment_check(
|
||||
const size_t index, buffers_info_type &info
|
||||
){
|
||||
//if alignment time was not valid or if the sequence id is newer:
|
||||
// use this index's time as the alignment time
|
||||
// reset the indexes list and remove this index
|
||||
if (not info.alignment_time_valid or info[index].time > info.alignment_time){
|
||||
info.alignment_time_valid = true;
|
||||
info.alignment_time = info[index].time;
|
||||
info.indexes_todo.set();
|
||||
info.indexes_todo.reset(index);
|
||||
info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes;
|
||||
}
|
||||
|
||||
//if the sequence id matches:
|
||||
// remove this index from the list and continue
|
||||
else if (info[index].time == info.alignment_time){
|
||||
info.indexes_todo.reset(index);
|
||||
}
|
||||
|
||||
//if the sequence id is older:
|
||||
// continue with the same index to try again
|
||||
//else if (info[index].time < info.alignment_time)...
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Get aligned buffers:
|
||||
* Iterate through each index and try to accumulate aligned buffers.
|
||||
* Handle all of the edge cases like inline messages and errors.
|
||||
* The logic will throw out older packets until it finds a match.
|
||||
******************************************************************/
|
||||
UHD_INLINE void get_aligned_buffs(double timeout){
|
||||
|
||||
get_prev_buffer_info().reset(); // no longer need the previous info - reset it for future use
|
||||
|
||||
increment_buffer_info(); //increment to next buffer
|
||||
|
||||
buffers_info_type &prev_info = get_prev_buffer_info();
|
||||
buffers_info_type &curr_info = get_curr_buffer_info();
|
||||
buffers_info_type &next_info = get_next_buffer_info();
|
||||
|
||||
//Loop until we get a message of an aligned set of buffers:
|
||||
// - Receive a single packet and extract its info.
|
||||
// - Handle the packet type yielded by the receive.
|
||||
// - Check the timestamps for alignment conditions.
|
||||
size_t iterations = 0;
|
||||
while (curr_info.indexes_todo.any()){
|
||||
|
||||
//get the index to process for this iteration
|
||||
const size_t index = curr_info.indexes_todo.find_first();
|
||||
packet_type packet;
|
||||
|
||||
//receive a single packet from the transport
|
||||
try{
|
||||
packet = get_and_process_single_packet(
|
||||
index, prev_info[index], curr_info[index], timeout
|
||||
);
|
||||
}
|
||||
|
||||
//handle the case when the get packet throws
|
||||
catch(const std::exception &e){
|
||||
UHD_MSG(error) << boost::format(
|
||||
"The receive packet handler caught an exception.\n%s"
|
||||
) % e.what() << std::endl;
|
||||
std::swap(curr_info, next_info); //save progress from curr -> next
|
||||
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
|
||||
return;
|
||||
}
|
||||
|
||||
switch(packet){
|
||||
case PACKET_IF_DATA:
|
||||
alignment_check(index, curr_info);
|
||||
break;
|
||||
|
||||
case PACKET_TIMESTAMP_ERROR:
|
||||
//If the user changes the device time while streaming or without flushing,
|
||||
//we can receive a packet that comes before the previous packet in time.
|
||||
//This could cause the alignment logic to discard future received packets.
|
||||
//Therefore, when this occurs, we reset the info to restart from scratch.
|
||||
if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){
|
||||
curr_info.alignment_time_valid = false;
|
||||
}
|
||||
alignment_check(index, curr_info);
|
||||
break;
|
||||
|
||||
case PACKET_INLINE_MESSAGE:
|
||||
std::swap(curr_info, next_info); //save progress from curr -> next
|
||||
curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf;
|
||||
curr_info.metadata.time_spec = next_info[index].time;
|
||||
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
|
||||
if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
|
||||
rx_metadata_t metadata = curr_info.metadata;
|
||||
_props[index].handle_overflow();
|
||||
curr_info.metadata = metadata;
|
||||
UHD_MSG(fastpath) << "O";
|
||||
}
|
||||
return;
|
||||
|
||||
case PACKET_TIMEOUT_ERROR:
|
||||
std::swap(curr_info, next_info); //save progress from curr -> next
|
||||
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
|
||||
return;
|
||||
|
||||
case PACKET_SEQUENCE_ERROR:
|
||||
alignment_check(index, curr_info);
|
||||
std::swap(curr_info, next_info); //save progress from curr -> next
|
||||
curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
|
||||
curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks(
|
||||
prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate);
|
||||
curr_info.metadata.out_of_sequence = true;
|
||||
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
|
||||
UHD_MSG(fastpath) << "D";
|
||||
return;
|
||||
|
||||
}
|
||||
|
||||
//too many iterations: detect alignment failure
|
||||
if (iterations++ > _alignment_faulure_threshold){
|
||||
UHD_MSG(error) << boost::format(
|
||||
"The receive packet handler failed to time-align packets.\n"
|
||||
"%u received packets were processed by the handler.\n"
|
||||
"However, a timestamp match could not be determined.\n"
|
||||
) % iterations << std::endl;
|
||||
std::swap(curr_info, next_info); //save progress from curr -> next
|
||||
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
|
||||
_props[index].handle_overflow();
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//set the metadata from the buffer information at index zero
|
||||
curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf;
|
||||
curr_info.metadata.time_spec = curr_info[0].time;
|
||||
curr_info.metadata.more_fragments = false;
|
||||
curr_info.metadata.fragment_offset = 0;
|
||||
curr_info.metadata.start_of_burst = curr_info[0].ifpi.sob;
|
||||
curr_info.metadata.end_of_burst = curr_info[0].ifpi.eob;
|
||||
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
|
||||
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Receive a single packet:
|
||||
* Handles fragmentation, messages, errors, and copy-conversion.
|
||||
* When no fragments are available, call the get aligned buffers.
|
||||
* Then copy-convert available data into the user's IO buffers.
|
||||
******************************************************************/
|
||||
UHD_INLINE size_t recv_one_packet(
|
||||
const uhd::rx_streamer::buffs_type &buffs,
|
||||
const size_t nsamps_per_buff,
|
||||
uhd::rx_metadata_t &metadata,
|
||||
const double timeout,
|
||||
const size_t buffer_offset_bytes = 0
|
||||
){
|
||||
//get the next buffer if the current one has expired
|
||||
if (get_curr_buffer_info().data_bytes_to_copy == 0)
|
||||
{
|
||||
//perform receive with alignment logic
|
||||
get_aligned_buffs(timeout);
|
||||
}
|
||||
|
||||
buffers_info_type &info = get_curr_buffer_info();
|
||||
metadata = info.metadata;
|
||||
|
||||
//interpolate the time spec (useful when this is a fragment)
|
||||
metadata.time_spec += time_spec_t::from_ticks(info.fragment_offset_in_samps, _samp_rate);
|
||||
|
||||
//extract the number of samples available to copy
|
||||
const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item;
|
||||
const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);
|
||||
const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item;
|
||||
const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs;
|
||||
|
||||
//setup the data to share with converter threads
|
||||
_convert_nsamps = nsamps_to_copy_per_io_buff;
|
||||
_convert_buffs = &buffs;
|
||||
_convert_buffer_offset_bytes = buffer_offset_bytes;
|
||||
_convert_bytes_to_copy = bytes_to_copy;
|
||||
|
||||
//perform N channels of conversion
|
||||
converter_thread_task(0);
|
||||
|
||||
//update the copy buffer's availability
|
||||
info.data_bytes_to_copy -= bytes_to_copy;
|
||||
|
||||
//setup the fragment flags and offset
|
||||
metadata.more_fragments = info.data_bytes_to_copy != 0;
|
||||
metadata.fragment_offset = info.fragment_offset_in_samps;
|
||||
info.fragment_offset_in_samps += nsamps_to_copy; //set for next call
|
||||
|
||||
return nsamps_to_copy_per_io_buff;
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Perform one thread's work of the conversion task.
|
||||
* The entry and exit use a dual synchronization barrier,
|
||||
* to wait for data to become ready and block until completion.
|
||||
******************************************************************/
|
||||
UHD_INLINE void converter_thread_task(const size_t index)
|
||||
{
|
||||
_task_barrier.wait();
|
||||
|
||||
//shortcut references to local data structures
|
||||
buffers_info_type &buff_info = get_curr_buffer_info();
|
||||
per_buffer_info_type &info = buff_info[index];
|
||||
const rx_streamer::buffs_type &buffs = *_convert_buffs;
|
||||
|
||||
//fill IO buffs with pointers into the output buffer
|
||||
void *io_buffs[4/*max interleave*/];
|
||||
for (size_t i = 0; i < _num_outputs; i++){
|
||||
char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]);
|
||||
io_buffs[i] = b + _convert_buffer_offset_bytes;
|
||||
}
|
||||
const ref_vector<void *> out_buffs(io_buffs, _num_outputs);
|
||||
|
||||
//perform the conversion operation
|
||||
_converter->conv(info.copy_buff, out_buffs, _convert_nsamps);
|
||||
|
||||
//advance the pointer for the source buffer
|
||||
info.copy_buff += _convert_bytes_to_copy;
|
||||
|
||||
//release the buffer if fully consumed
|
||||
if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
|
||||
info.buff.reset(); //effectively a release
|
||||
}
|
||||
|
||||
if (index == 0) _task_barrier.wait_others();
|
||||
}
|
||||
|
||||
//! Shared variables for the worker threads
|
||||
reusable_barrier _task_barrier;
|
||||
std::vector<task::sptr> _task_handlers;
|
||||
size_t _convert_nsamps;
|
||||
const rx_streamer::buffs_type *_convert_buffs;
|
||||
size_t _convert_buffer_offset_bytes;
|
||||
size_t _convert_bytes_to_copy;
|
||||
|
||||
/*
|
||||
* This last section is only for debugging purposes.
|
||||
* It causes a lot of prints to stderr which can be piped to a file.
|
||||
* Gathered data can be used to post process it with external tools.
|
||||
*/
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
struct dbg_recv_stat_t {
|
||||
dbg_recv_stat_t(long wc, size_t nspb, size_t nsr, uhd::rx_metadata_t md, double to, bool op, double rate):
|
||||
wallclock(wc), nsamps_per_buff(nspb), nsamps_recv(nsr), metadata(md), timeout(to), one_packet(op), samp_rate(rate)
|
||||
{}
|
||||
long wallclock;
|
||||
size_t nsamps_per_buff;
|
||||
size_t nsamps_recv;
|
||||
uhd::rx_metadata_t metadata;
|
||||
double timeout;
|
||||
bool one_packet;
|
||||
double samp_rate;
|
||||
// Create a formatted print line for all the info gathered in this struct.
|
||||
std::string print_line() {
|
||||
boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld");
|
||||
fmt % wallclock;
|
||||
fmt % timeout % (int)nsamps_per_buff % (int) nsamps_recv;
|
||||
fmt % (one_packet ? "true":"false");
|
||||
fmt % metadata.error_code;
|
||||
fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false");
|
||||
fmt % (metadata.more_fragments ? "true":"false") % (int)metadata.fragment_offset;
|
||||
fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate);
|
||||
return fmt.str();
|
||||
}
|
||||
};
|
||||
|
||||
void dbg_gather_data(const size_t nsamps_per_buff, const size_t nsamps_recv,
|
||||
uhd::rx_metadata_t &metadata, const double timeout,
|
||||
const bool one_packet,
|
||||
bool dbg_print_directly = true
|
||||
)
|
||||
{
|
||||
// Initialize a struct with all available data. It can return a formatted string with all infos if wanted.
|
||||
dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(),
|
||||
nsamps_per_buff,
|
||||
nsamps_recv,
|
||||
metadata,
|
||||
timeout,
|
||||
one_packet,
|
||||
_samp_rate
|
||||
);
|
||||
if(dbg_print_directly) {
|
||||
dbg_print_err(data.print_line());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
void dbg_print_err(std::string msg) {
|
||||
std::string dbg_prefix("super_recv_packet_handler,");
|
||||
msg = dbg_prefix + msg;
|
||||
fprintf(stderr, "%s\n", msg.c_str());
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
|
||||
public:
|
||||
recv_packet_streamer(const size_t max_num_samps){
|
||||
_max_num_samps = max_num_samps;
|
||||
}
|
||||
|
||||
size_t get_num_channels(void) const{
|
||||
return this->size();
|
||||
}
|
||||
|
||||
size_t get_max_num_samps(void) const{
|
||||
return _max_num_samps;
|
||||
}
|
||||
|
||||
size_t recv(
|
||||
const rx_streamer::buffs_type &buffs,
|
||||
const size_t nsamps_per_buff,
|
||||
uhd::rx_metadata_t &metadata,
|
||||
const double timeout,
|
||||
const bool one_packet
|
||||
){
|
||||
return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
|
||||
}
|
||||
|
||||
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
|
||||
{
|
||||
return recv_packet_handler::issue_stream_cmd(stream_cmd);
|
||||
}
|
||||
|
||||
private:
|
||||
size_t _max_num_samps;
|
||||
};
|
||||
|
||||
}}} //namespace
|
||||
|
||||
#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */
|
||||
474
host/cores/super_send_packet_handler.hpp
Normal file
474
host/cores/super_send_packet_handler.hpp
Normal file
@@ -0,0 +1,474 @@
|
||||
//
|
||||
// Copyright 2011-2013 Ettus Research LLC
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP
|
||||
#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP
|
||||
|
||||
#include <uhd/config.hpp>
|
||||
#include <uhd/exception.hpp>
|
||||
#include <uhd/convert.hpp>
|
||||
#include <uhd/stream.hpp>
|
||||
#include <uhd/utils/msg.hpp>
|
||||
#include <uhd/utils/tasks.hpp>
|
||||
#include <uhd/utils/atomic.hpp>
|
||||
#include <uhd/utils/byteswap.hpp>
|
||||
#include <uhd/types/metadata.hpp>
|
||||
#include <uhd/transport/vrt_if_packet.hpp>
|
||||
#include <uhd/transport/zero_copy.hpp>
|
||||
#include <boost/thread/thread_time.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/function.hpp>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
// Included for debugging
|
||||
#include <boost/format.hpp>
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include "boost/date_time/posix_time/posix_time.hpp"
|
||||
#include <map>
|
||||
#include <fstream>
|
||||
#endif
|
||||
|
||||
namespace uhd {
|
||||
namespace transport {
|
||||
namespace sph {
|
||||
|
||||
/***********************************************************************
|
||||
* Super send packet handler
|
||||
*
|
||||
* A send packet handler represents a group of channels.
|
||||
* The channel group shares a common sample rate.
|
||||
* All channels are sent in unison in send().
|
||||
**********************************************************************/
|
||||
class send_packet_handler{
|
||||
public:
|
||||
typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
|
||||
typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;
|
||||
typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &);
|
||||
//typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
|
||||
|
||||
/*!
|
||||
* Make a new packet handler for send
|
||||
* \param size the number of transport channels
|
||||
*/
|
||||
send_packet_handler(const size_t size = 1):
|
||||
_next_packet_seq(0), _cached_metadata(false)
|
||||
{
|
||||
this->set_enable_trailer(true);
|
||||
this->resize(size);
|
||||
}
|
||||
|
||||
~send_packet_handler(void){
|
||||
_task_barrier.interrupt();
|
||||
_task_handlers.clear();
|
||||
}
|
||||
|
||||
//! Resize the number of transport channels
|
||||
void resize(const size_t size){
|
||||
if (this->size() == size) return;
|
||||
_task_handlers.clear();
|
||||
_props.resize(size);
|
||||
static const boost::uint64_t zero = 0;
|
||||
_zero_buffs.resize(size, &zero);
|
||||
_task_barrier.resize(size);
|
||||
_task_handlers.resize(size);
|
||||
for (size_t i = 1/*skip 0*/; i < size; i++){
|
||||
_task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
|
||||
};
|
||||
}
|
||||
|
||||
//! Get the channel width of this handler
|
||||
size_t size(void) const{
|
||||
return _props.size();
|
||||
}
|
||||
|
||||
//! Setup the vrt packer function and offset
|
||||
void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){
|
||||
_vrt_packer = vrt_packer;
|
||||
_header_offset_words32 = header_offset_words32;
|
||||
}
|
||||
|
||||
//! Set the stream ID for a specific channel (or no SID)
|
||||
void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){
|
||||
_props.at(xport_chan).has_sid = has_sid;
|
||||
_props.at(xport_chan).sid = sid;
|
||||
}
|
||||
|
||||
void set_enable_trailer(const bool enable)
|
||||
{
|
||||
_has_tlr = enable;
|
||||
}
|
||||
|
||||
//! Set the rate of ticks per second
|
||||
void set_tick_rate(const double rate){
|
||||
_tick_rate = rate;
|
||||
}
|
||||
|
||||
//! Set the rate of samples per second
|
||||
void set_samp_rate(const double rate){
|
||||
_samp_rate = rate;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Set the function to get a managed buffer.
|
||||
* \param xport_chan which transport channel
|
||||
* \param get_buff the getter function
|
||||
*/
|
||||
void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){
|
||||
_props.at(xport_chan).get_buff = get_buff;
|
||||
}
|
||||
|
||||
//! Set the conversion routine for all channels
|
||||
void set_converter(const uhd::convert::id_type &id){
|
||||
_num_inputs = id.num_inputs;
|
||||
_converter = uhd::convert::get_converter(id)();
|
||||
this->set_scale_factor(32767.); //update after setting converter
|
||||
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format);
|
||||
_bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.input_format);
|
||||
}
|
||||
|
||||
/*!
|
||||
* Set the maximum number of samples per host packet.
|
||||
* Ex: A USRP1 in dual channel mode would be half.
|
||||
* \param num_samps the maximum samples in a packet
|
||||
*/
|
||||
void set_max_samples_per_packet(const size_t num_samps){
|
||||
_max_samples_per_packet = num_samps;
|
||||
}
|
||||
|
||||
//! Set the scale factor used in float conversion
|
||||
void set_scale_factor(const double scale_factor){
|
||||
_converter->set_scalar(scale_factor);
|
||||
}
|
||||
|
||||
//! Set the callback to get async messages
|
||||
void set_async_receiver(const async_receiver_type &async_receiver)
|
||||
{
|
||||
_async_receiver = async_receiver;
|
||||
}
|
||||
|
||||
//! Overload call to get async metadata
|
||||
bool recv_async_msg(
|
||||
uhd::async_metadata_t &async_metadata, double timeout = 0.1
|
||||
){
|
||||
if (_async_receiver) return _async_receiver(async_metadata, timeout);
|
||||
boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6)));
|
||||
return false;
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Send:
|
||||
* The entry point for the fast-path send calls.
|
||||
* Dispatch into combinations of single packet send calls.
|
||||
******************************************************************/
|
||||
UHD_INLINE size_t send(
|
||||
const uhd::tx_streamer::buffs_type &buffs,
|
||||
const size_t nsamps_per_buff,
|
||||
const uhd::tx_metadata_t &metadata,
|
||||
const double timeout
|
||||
){
|
||||
//translate the metadata to vrt if packet info
|
||||
vrt::if_packet_info_t if_packet_info;
|
||||
if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;
|
||||
//if_packet_info.has_sid = false; //set per channel
|
||||
if_packet_info.has_cid = false;
|
||||
if_packet_info.has_tlr = _has_tlr;
|
||||
if_packet_info.has_tsi = false;
|
||||
if_packet_info.has_tsf = metadata.has_time_spec;
|
||||
if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate);
|
||||
if_packet_info.sob = metadata.start_of_burst;
|
||||
if_packet_info.eob = metadata.end_of_burst;
|
||||
|
||||
/*
|
||||
* Metadata is cached when we get a send requesting a start of burst with no samples.
|
||||
* It is applied here on the next call to send() that actually has samples to send.
|
||||
*/
|
||||
if (_cached_metadata && nsamps_per_buff != 0)
|
||||
{
|
||||
// If the new metada has a time_spec, do not use the cached time_spec.
|
||||
if (!metadata.has_time_spec)
|
||||
{
|
||||
if_packet_info.has_tsf = _metadata_cache.has_time_spec;
|
||||
if_packet_info.tsf = _metadata_cache.time_spec.to_ticks(_tick_rate);
|
||||
}
|
||||
if_packet_info.sob = _metadata_cache.start_of_burst;
|
||||
if_packet_info.eob = _metadata_cache.end_of_burst;
|
||||
_cached_metadata = false;
|
||||
}
|
||||
|
||||
if (nsamps_per_buff <= _max_samples_per_packet){
|
||||
|
||||
//TODO remove this code when sample counts of zero are supported by hardware
|
||||
#ifndef SSPH_DONT_PAD_TO_ONE
|
||||
static const boost::uint64_t zero = 0;
|
||||
_zero_buffs.resize(buffs.size(), &zero);
|
||||
|
||||
if (nsamps_per_buff == 0)
|
||||
{
|
||||
// if this is a start of a burst and there are no samples
|
||||
if (metadata.start_of_burst)
|
||||
{
|
||||
// cache metadata and apply on the next send()
|
||||
_metadata_cache = metadata;
|
||||
_cached_metadata = true;
|
||||
return 0;
|
||||
} else {
|
||||
// send requests with no samples are handled here (such as end of burst)
|
||||
return send_one_packet(_zero_buffs, 1, if_packet_info, timeout) & 0x0;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
size_t nsamps_sent = send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout);
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
dbg_print_send(nsamps_per_buff, nsamps_sent, metadata, timeout);
|
||||
#endif
|
||||
return nsamps_sent; }
|
||||
size_t total_num_samps_sent = 0;
|
||||
|
||||
//false until final fragment
|
||||
if_packet_info.eob = false;
|
||||
|
||||
const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet;
|
||||
const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1;
|
||||
|
||||
//loop through the following fragment indexes
|
||||
for (size_t i = 0; i < num_fragments; i++){
|
||||
|
||||
//send a fragment with the helper function
|
||||
const size_t num_samps_sent = send_one_packet(
|
||||
buffs, _max_samples_per_packet,
|
||||
if_packet_info, timeout,
|
||||
total_num_samps_sent*_bytes_per_cpu_item
|
||||
);
|
||||
total_num_samps_sent += num_samps_sent;
|
||||
if (num_samps_sent == 0) return total_num_samps_sent;
|
||||
|
||||
//setup metadata for the next fragment
|
||||
const time_spec_t time_spec = metadata.time_spec + time_spec_t::from_ticks(total_num_samps_sent, _samp_rate);
|
||||
if_packet_info.tsf = time_spec.to_ticks(_tick_rate);
|
||||
if_packet_info.sob = false;
|
||||
|
||||
}
|
||||
|
||||
//send the final fragment with the helper function
|
||||
if_packet_info.eob = metadata.end_of_burst;
|
||||
size_t nsamps_sent = total_num_samps_sent
|
||||
+ send_one_packet(buffs, final_length, if_packet_info, timeout,
|
||||
total_num_samps_sent * _bytes_per_cpu_item);
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
dbg_print_send(nsamps_per_buff, nsamps_sent, metadata, timeout);
|
||||
|
||||
#endif
|
||||
return nsamps_sent;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
vrt_packer_type _vrt_packer;
|
||||
size_t _header_offset_words32;
|
||||
double _tick_rate, _samp_rate;
|
||||
struct xport_chan_props_type{
|
||||
xport_chan_props_type(void):has_sid(false),sid(0){}
|
||||
get_buff_type get_buff;
|
||||
bool has_sid;
|
||||
boost::uint32_t sid;
|
||||
managed_send_buffer::sptr buff;
|
||||
};
|
||||
std::vector<xport_chan_props_type> _props;
|
||||
size_t _num_inputs;
|
||||
size_t _bytes_per_otw_item; //used in conversion
|
||||
size_t _bytes_per_cpu_item; //used in conversion
|
||||
uhd::convert::converter::sptr _converter; //used in conversion
|
||||
size_t _max_samples_per_packet;
|
||||
std::vector<const void *> _zero_buffs;
|
||||
size_t _next_packet_seq;
|
||||
bool _has_tlr;
|
||||
async_receiver_type _async_receiver;
|
||||
bool _cached_metadata;
|
||||
uhd::tx_metadata_t _metadata_cache;
|
||||
|
||||
#ifdef UHD_TXRX_DEBUG_PRINTS
|
||||
struct dbg_send_stat_t {
|
||||
dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate):
|
||||
wallclock(wc), nsamps_per_buff(nspb), nsamps_sent(nss), metadata(md), timeout(to), samp_rate(rate)
|
||||
{}
|
||||
long wallclock;
|
||||
size_t nsamps_per_buff;
|
||||
size_t nsamps_sent;
|
||||
uhd::tx_metadata_t metadata;
|
||||
double timeout;
|
||||
double samp_rate;
|
||||
// Create a formatted print line for all the info gathered in this struct.
|
||||
std::string print_line() {
|
||||
boost::format fmt("send,%ld,%f,%i,%i,%s,%s,%s,%ld");
|
||||
fmt % wallclock;
|
||||
fmt % timeout % (int)nsamps_per_buff % (int) nsamps_sent;
|
||||
fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false");
|
||||
fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate);
|
||||
return fmt.str();
|
||||
}
|
||||
};
|
||||
|
||||
void dbg_print_send(size_t nsamps_per_buff, size_t nsamps_sent,
|
||||
const uhd::tx_metadata_t &metadata, const double timeout,
|
||||
bool dbg_print_directly = true)
|
||||
{
|
||||
dbg_send_stat_t data(boost::get_system_time().time_of_day().total_microseconds(),
|
||||
nsamps_per_buff,
|
||||
nsamps_sent,
|
||||
metadata,
|
||||
timeout,
|
||||
_samp_rate
|
||||
);
|
||||
if(dbg_print_directly){
|
||||
dbg_print_err(data.print_line());
|
||||
}
|
||||
}
|
||||
void dbg_print_err(std::string msg) {
|
||||
msg = "super_send_packet_handler," + msg;
|
||||
fprintf(stderr, "%s\n", msg.c_str());
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
/*******************************************************************
|
||||
* Send a single packet:
|
||||
******************************************************************/
|
||||
UHD_INLINE size_t send_one_packet(
|
||||
const uhd::tx_streamer::buffs_type &buffs,
|
||||
const size_t nsamps_per_buff,
|
||||
vrt::if_packet_info_t &if_packet_info,
|
||||
const double timeout,
|
||||
const size_t buffer_offset_bytes = 0
|
||||
){
|
||||
|
||||
//load the rest of the if_packet_info in here
|
||||
if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item;
|
||||
if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t);
|
||||
if_packet_info.packet_count = _next_packet_seq;
|
||||
|
||||
//get a buffer for each channel or timeout
|
||||
BOOST_FOREACH(xport_chan_props_type &props, _props){
|
||||
if (not props.buff) props.buff = props.get_buff(timeout);
|
||||
if (not props.buff) return 0; //timeout
|
||||
}
|
||||
|
||||
//setup the data to share with converter threads
|
||||
_convert_nsamps = nsamps_per_buff;
|
||||
_convert_buffs = &buffs;
|
||||
_convert_buffer_offset_bytes = buffer_offset_bytes;
|
||||
_convert_if_packet_info = &if_packet_info;
|
||||
|
||||
//perform N channels of conversion
|
||||
converter_thread_task(0);
|
||||
|
||||
_next_packet_seq++; //increment sequence after commits
|
||||
return nsamps_per_buff;
|
||||
}
|
||||
|
||||
/*******************************************************************
|
||||
* Perform one thread's work of the conversion task.
|
||||
* The entry and exit use a dual synchronization barrier,
|
||||
* to wait for data to become ready and block until completion.
|
||||
******************************************************************/
|
||||
UHD_INLINE void converter_thread_task(const size_t index)
|
||||
{
|
||||
_task_barrier.wait();
|
||||
|
||||
//shortcut references to local data structures
|
||||
managed_send_buffer::sptr &buff = _props[index].buff;
|
||||
vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
|
||||
const tx_streamer::buffs_type &buffs = *_convert_buffs;
|
||||
|
||||
//fill IO buffs with pointers into the output buffer
|
||||
const void *io_buffs[4/*max interleave*/];
|
||||
for (size_t i = 0; i < _num_inputs; i++){
|
||||
const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]);
|
||||
io_buffs[i] = b + _convert_buffer_offset_bytes;
|
||||
}
|
||||
const ref_vector<const void *> in_buffs(io_buffs, _num_inputs);
|
||||
|
||||
//pack metadata into a vrt header
|
||||
boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
|
||||
if_packet_info.has_sid = _props[index].has_sid;
|
||||
if_packet_info.sid = _props[index].sid;
|
||||
_vrt_packer(otw_mem, if_packet_info);
|
||||
otw_mem += if_packet_info.num_header_words32;
|
||||
|
||||
//perform the conversion operation
|
||||
_converter->conv(in_buffs, otw_mem, _convert_nsamps);
|
||||
|
||||
//commit the samples to the zero-copy interface
|
||||
const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
|
||||
buff->commit(num_vita_words32*sizeof(boost::uint32_t));
|
||||
buff.reset(); //effectively a release
|
||||
|
||||
if (index == 0) _task_barrier.wait_others();
|
||||
}
|
||||
|
||||
//! Shared variables for the worker threads
|
||||
reusable_barrier _task_barrier;
|
||||
std::vector<task::sptr> _task_handlers;
|
||||
size_t _convert_nsamps;
|
||||
const tx_streamer::buffs_type *_convert_buffs;
|
||||
size_t _convert_buffer_offset_bytes;
|
||||
vrt::if_packet_info_t *_convert_if_packet_info;
|
||||
|
||||
};
|
||||
|
||||
class send_packet_streamer : public send_packet_handler, public tx_streamer{
|
||||
public:
|
||||
send_packet_streamer(const size_t max_num_samps){
|
||||
_max_num_samps = max_num_samps;
|
||||
this->set_max_samples_per_packet(_max_num_samps);
|
||||
}
|
||||
|
||||
size_t get_num_channels(void) const{
|
||||
return this->size();
|
||||
}
|
||||
|
||||
size_t get_max_num_samps(void) const{
|
||||
return _max_num_samps;
|
||||
}
|
||||
|
||||
size_t send(
|
||||
const tx_streamer::buffs_type &buffs,
|
||||
const size_t nsamps_per_buff,
|
||||
const uhd::tx_metadata_t &metadata,
|
||||
const double timeout
|
||||
){
|
||||
return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout);
|
||||
}
|
||||
|
||||
bool recv_async_msg(
|
||||
uhd::async_metadata_t &async_metadata, double timeout = 0.1
|
||||
){
|
||||
return send_packet_handler::recv_async_msg(async_metadata, timeout);
|
||||
}
|
||||
|
||||
private:
|
||||
size_t _max_num_samps;
|
||||
};
|
||||
|
||||
} // namespace sph
|
||||
} // namespace transport
|
||||
} // namespace uhd
|
||||
|
||||
#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */
|
||||
73
host/cores/validate_subdev_spec.cpp
Normal file
73
host/cores/validate_subdev_spec.cpp
Normal file
@@ -0,0 +1,73 @@
|
||||
//
|
||||
// Copyright 2011 Ettus Research LLC
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
#include "validate_subdev_spec.hpp"
|
||||
#include <uhd/exception.hpp>
|
||||
#include <uhd/utils/assert_has.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/format.hpp>
|
||||
|
||||
using namespace uhd;
|
||||
using namespace uhd::usrp;
|
||||
|
||||
namespace uhd{ namespace usrp{
|
||||
|
||||
static std::ostream& operator<< (std::ostream &out, const subdev_spec_pair_t &pair){
|
||||
out << pair.db_name << ":" << pair.sd_name;
|
||||
return out;
|
||||
}
|
||||
|
||||
}}
|
||||
|
||||
void uhd::usrp::validate_subdev_spec(
|
||||
property_tree::sptr tree,
|
||||
const subdev_spec_t &spec,
|
||||
const std::string &type,
|
||||
const std::string &mb
|
||||
){
|
||||
const size_t num_dsps = tree->list(str(boost::format("/mboards/%s/%s_dsps") % mb % type)).size();
|
||||
|
||||
//sanity checking on the length
|
||||
if (spec.size() == 0) throw uhd::value_error(str(boost::format(
|
||||
"Empty %s subdevice specification is not supported.\n"
|
||||
) % type));
|
||||
if (spec.size() > num_dsps) throw uhd::value_error(str(boost::format(
|
||||
"The subdevice specification \"%s\" is too long.\n"
|
||||
"The user specified %u channels, but there are only %u %s dsps on mboard %s.\n"
|
||||
) % spec.to_string() % spec.size() % num_dsps % type % mb));
|
||||
|
||||
//make a list of all possible specs
|
||||
subdev_spec_t all_specs;
|
||||
BOOST_FOREACH(const std::string &db, tree->list(str(boost::format("/mboards/%s/dboards") % mb))){
|
||||
BOOST_FOREACH(const std::string &sd, tree->list(str(boost::format("/mboards/%s/dboards/%s/%s_frontends") % mb % db % type))){
|
||||
all_specs.push_back(subdev_spec_pair_t(db, sd));
|
||||
}
|
||||
}
|
||||
|
||||
//validate that the spec is possible
|
||||
BOOST_FOREACH(const subdev_spec_pair_t &pair, spec){
|
||||
uhd::assert_has(all_specs, pair, str(boost::format("%s subdevice specification on mboard %s") % type % mb));
|
||||
}
|
||||
|
||||
//enable selected frontends, disable others
|
||||
BOOST_FOREACH(const subdev_spec_pair_t &pair, all_specs){
|
||||
const bool enb = uhd::has(spec, pair);
|
||||
tree->access<bool>(str(boost::format(
|
||||
"/mboards/%s/dboards/%s/%s_frontends/%s/enabled"
|
||||
) % mb % pair.db_name % type % pair.sd_name)).set(enb);
|
||||
}
|
||||
}
|
||||
38
host/cores/validate_subdev_spec.hpp
Normal file
38
host/cores/validate_subdev_spec.hpp
Normal file
@@ -0,0 +1,38 @@
|
||||
//
|
||||
// Copyright 2011 Ettus Research LLC
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
#ifndef INCLUDED_LIBUHD_USRP_COMMON_VALIDATE_SUBDEV_SPEC_HPP
|
||||
#define INCLUDED_LIBUHD_USRP_COMMON_VALIDATE_SUBDEV_SPEC_HPP
|
||||
|
||||
#include <uhd/config.hpp>
|
||||
#include <uhd/usrp/subdev_spec.hpp>
|
||||
#include <uhd/property_tree.hpp>
|
||||
#include <string>
|
||||
|
||||
namespace uhd{ namespace usrp{
|
||||
|
||||
//! Validate a subdev spec against a property tree
|
||||
void validate_subdev_spec(
|
||||
property_tree::sptr tree,
|
||||
const subdev_spec_t &spec,
|
||||
const std::string &type, //rx or tx
|
||||
const std::string &mb = "0"
|
||||
);
|
||||
|
||||
}} //namespace uhd::usrp
|
||||
|
||||
#endif /* INCLUDED_LIBUHD_USRP_COMMON_VALIDATE_SUBDEV_SPEC_HPP */
|
||||
@@ -49,7 +49,8 @@ UHD_STATIC_BLOCK(register_umtrx_device){
|
||||
**********************************************************************/
|
||||
umtrx_impl::umtrx_impl(const device_addr_t &device_addr)
|
||||
{
|
||||
UHD_MSG(status) << "Opening a UmTRX device..." << std::endl;
|
||||
_device_ip_addr = device_addr["addr"];
|
||||
UHD_MSG(status) << "Opening a UmTRX device... " << _device_ip_addr << std::endl;
|
||||
|
||||
////////////////////////////////////////////////////////////////////
|
||||
// create controller objects and initialize the properties tree
|
||||
@@ -62,7 +63,7 @@ umtrx_impl::umtrx_impl(const device_addr_t &device_addr)
|
||||
// create the iface that controls i2c, spi, uart, and wb
|
||||
////////////////////////////////////////////////////////////////
|
||||
_iface = umtrx_iface::make(udp_simple::make_connected(
|
||||
device_addr["addr"], BOOST_STRINGIZE(USRP2_UDP_CTRL_PORT)
|
||||
_device_ip_addr, BOOST_STRINGIZE(USRP2_UDP_CTRL_PORT)
|
||||
));
|
||||
_tree->create<std::string>(mb_path / "name").set(_iface->get_cname());
|
||||
_tree->create<std::string>(mb_path / "fw_version").set(_iface->get_fw_version_string());
|
||||
@@ -239,8 +240,8 @@ umtrx_impl::umtrx_impl(const device_addr_t &device_addr)
|
||||
_tree->create<std::vector<std::string> >(mb_path / "time_source" / "options")
|
||||
.publish(boost::bind(&time64_core_200::get_time_sources, _time64));
|
||||
//setup reference source props
|
||||
_tree->create<std::string>(mb_path / "clock_source" / "value");
|
||||
//?? .subscribe(boost::bind(&umtrx_impl::update_clock_source, this, mb, _1));
|
||||
_tree->create<std::string>(mb_path / "clock_source" / "value")
|
||||
.subscribe(boost::bind(&umtrx_impl::update_clock_source, this, _1));
|
||||
|
||||
static const std::vector<std::string> clock_sources = boost::assign::list_of("internal")("external");
|
||||
_tree->create<std::vector<std::string> >(mb_path / "clock_source"/ "options").set(clock_sources);
|
||||
@@ -348,6 +349,8 @@ umtrx_impl::umtrx_impl(const device_addr_t &device_addr)
|
||||
_tree->access<double>(mb_path / "tick_rate")
|
||||
.set(this->get_master_clock_rate());
|
||||
this->time64_self_test();
|
||||
_rx_streamers.resize(_rx_dsps.size());
|
||||
_tx_streamers.resize(_tx_dsps.size());
|
||||
}
|
||||
|
||||
umtrx_impl::~umtrx_impl(void){
|
||||
@@ -374,3 +377,5 @@ void umtrx_impl::time64_self_test(void)
|
||||
const bool within_range = (secs_elapsed < (1.5)*sleepTime and secs_elapsed > (0.5)*sleepTime);
|
||||
UHD_MSG(status) << (within_range? "pass" : "fail") << std::endl;
|
||||
}
|
||||
|
||||
void umtrx_impl::update_clock_source(const std::string &){}
|
||||
|
||||
@@ -80,6 +80,7 @@ public:
|
||||
private:
|
||||
|
||||
//communication interfaces
|
||||
std::string _device_ip_addr;
|
||||
umtrx_iface::sptr _iface;
|
||||
|
||||
//controls for perifs
|
||||
@@ -102,6 +103,11 @@ private:
|
||||
void update_rx_samp_rate(const size_t, const double rate);
|
||||
void update_tx_samp_rate(const size_t, const double rate);
|
||||
void time64_self_test(void);
|
||||
void update_rates(void);
|
||||
|
||||
//streaming
|
||||
std::vector<boost::weak_ptr<uhd::rx_streamer> > _rx_streamers;
|
||||
std::vector<boost::weak_ptr<uhd::tx_streamer> > _tx_streamers;
|
||||
};
|
||||
|
||||
#endif /* INCLUDED_UMTRX_IMPL_HPP */
|
||||
|
||||
@@ -18,6 +18,13 @@
|
||||
|
||||
#include "umtrx_impl.hpp"
|
||||
#include "umtrx_regs.hpp"
|
||||
#include "cores/validate_subdev_spec.hpp"
|
||||
#include "cores/async_packet_handler.hpp"
|
||||
#include "cores/super_recv_packet_handler.hpp"
|
||||
#include "cores/super_send_packet_handler.hpp"
|
||||
|
||||
//A reasonable number of frames for send/recv and async/sync
|
||||
static const size_t DEFAULT_NUM_FRAMES = 32;
|
||||
|
||||
using namespace uhd;
|
||||
using namespace uhd::usrp;
|
||||
@@ -25,16 +32,163 @@ using namespace uhd::transport;
|
||||
namespace asio = boost::asio;
|
||||
namespace pt = boost::posix_time;
|
||||
|
||||
uhd::rx_streamer::sptr umtrx_impl::get_rx_stream(const uhd::stream_args_t &args){}
|
||||
uhd::tx_streamer::sptr umtrx_impl::get_tx_stream(const uhd::stream_args_t &args){}
|
||||
bool umtrx_impl::recv_async_msg(uhd::async_metadata_t &, double){}
|
||||
|
||||
void umtrx_impl::update_tick_rate(const double rate){}
|
||||
|
||||
void umtrx_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &){}
|
||||
void umtrx_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &){}
|
||||
|
||||
|
||||
void umtrx_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &){}
|
||||
void umtrx_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &){}
|
||||
void umtrx_impl::update_clock_source(const std::string &){}
|
||||
|
||||
void umtrx_impl::update_rx_samp_rate(const size_t, const double rate){}
|
||||
void umtrx_impl::update_tx_samp_rate(const size_t, const double rate){}
|
||||
/***********************************************************************
|
||||
* Update rates
|
||||
**********************************************************************/
|
||||
void umtrx_impl::update_rates(void)
|
||||
{
|
||||
fs_path root = "/mboards/0";
|
||||
_tree->access<double>(root / "tick_rate").update();
|
||||
|
||||
//and now that the tick rate is set, init the host rates to something
|
||||
BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps"))
|
||||
{
|
||||
_tree->access<double>(root / "rx_dsps" / name / "rate" / "value").update();
|
||||
}
|
||||
BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps"))
|
||||
{
|
||||
_tree->access<double>(root / "tx_dsps" / name / "rate" / "value").update();
|
||||
}
|
||||
}
|
||||
|
||||
void umtrx_impl::update_rx_samp_rate(const size_t dsp, const double rate)
|
||||
{
|
||||
boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
|
||||
boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_tx_streamers[dsp].lock());
|
||||
if (not my_streamer) return;
|
||||
|
||||
my_streamer->set_samp_rate(rate);
|
||||
const double adj = _rx_dsps[dsp]->get_scaling_adjustment();
|
||||
my_streamer->set_scale_factor(adj);
|
||||
}
|
||||
|
||||
void umtrx_impl::update_tx_samp_rate(const size_t dsp, const double rate)
|
||||
{
|
||||
boost::shared_ptr<sph::send_packet_streamer> my_streamer =
|
||||
boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[dsp].lock());
|
||||
if (not my_streamer) return;
|
||||
|
||||
my_streamer->set_samp_rate(rate);
|
||||
const double adj = _tx_dsps[dsp]->get_scaling_adjustment();
|
||||
my_streamer->set_scale_factor(adj);
|
||||
}
|
||||
|
||||
void umtrx_impl::update_tick_rate(const double rate)
|
||||
{
|
||||
//update the tick rate on all existing streamers -> thread safe
|
||||
for (size_t i = 0; i < _rx_streamers.size(); i++)
|
||||
{
|
||||
boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
|
||||
boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[i].lock());
|
||||
if (not my_streamer) continue;
|
||||
my_streamer->set_tick_rate(rate);
|
||||
}
|
||||
for (size_t i = 0; i < _tx_streamers.size(); i++)
|
||||
{
|
||||
boost::shared_ptr<sph::send_packet_streamer> my_streamer =
|
||||
boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[i].lock());
|
||||
if (not my_streamer) continue;
|
||||
my_streamer->set_tick_rate(rate);
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* Receive streamer
|
||||
**********************************************************************/
|
||||
uhd::rx_streamer::sptr umtrx_impl::get_rx_stream(const uhd::stream_args_t &args_)
|
||||
{
|
||||
stream_args_t args = args_;
|
||||
|
||||
//setup defaults for unspecified values
|
||||
args.otw_format = args.otw_format.empty()? "sc16" : args.otw_format;
|
||||
args.channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels;
|
||||
|
||||
//setup the transport hints (default to a large recv buff)
|
||||
if (not args.args.has_key("recv_buff_size"))
|
||||
{
|
||||
#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
|
||||
//limit buffer resize on macos or it will error
|
||||
args.args["recv_buff_size"] = "1e6";
|
||||
#elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)
|
||||
//set to half-a-second of buffering at max rate
|
||||
args.args["recv_buff_size"] = "50e6";
|
||||
#endif
|
||||
}
|
||||
zero_copy_xport_params default_params;
|
||||
default_params.send_frame_size = transport::udp_simple::mtu;
|
||||
default_params.recv_frame_size = transport::udp_simple::mtu;
|
||||
default_params.num_send_frames = DEFAULT_NUM_FRAMES;
|
||||
default_params.num_recv_frames = DEFAULT_NUM_FRAMES;
|
||||
|
||||
//create the transport
|
||||
udp_zero_copy::buff_params ignored_params;
|
||||
//TODO determine port, program reply
|
||||
zero_copy_if::sptr xport = udp_zero_copy::make(_device_ip_addr, BOOST_STRINGIZE(0), default_params, ignored_params, args.args);
|
||||
|
||||
//calculate packet size
|
||||
static const size_t hdr_size = 0
|
||||
+ vrt::max_if_hdr_words32*sizeof(boost::uint32_t)
|
||||
+ sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
|
||||
- sizeof(vrt::if_packet_info_t().cid) //no class id ever used
|
||||
- sizeof(vrt::if_packet_info_t().tsi) //no int time ever used
|
||||
;
|
||||
const size_t bpp = xport->get_recv_frame_size() - hdr_size;
|
||||
const size_t bpi = convert::get_bytes_per_item(args.otw_format);
|
||||
const size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi));
|
||||
|
||||
//make the new streamer given the samples per packet
|
||||
boost::shared_ptr<sph::recv_packet_streamer> my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp);
|
||||
|
||||
//init some streamer stuff
|
||||
my_streamer->resize(args.channels.size());
|
||||
my_streamer->set_vrt_unpacker(&vrt::if_hdr_unpack_be);
|
||||
|
||||
//set the converter
|
||||
uhd::convert::id_type id;
|
||||
id.input_format = args.otw_format + "_item32_be";
|
||||
id.num_inputs = 1;
|
||||
id.output_format = args.cpu_format;
|
||||
id.num_outputs = 1;
|
||||
my_streamer->set_converter(id);
|
||||
|
||||
//bind callbacks for the handler
|
||||
for (size_t chan_i = 0; chan_i < args.channels.size(); chan_i++)
|
||||
{
|
||||
const size_t dsp = args.channels[chan_i];
|
||||
_rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this
|
||||
_rx_dsps[dsp]->setup(args);
|
||||
//this->program_stream_dest(_mbc[mb].rx_dsp_xports[dsp], args);
|
||||
my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
|
||||
&zero_copy_if::get_recv_buff, xport, _1
|
||||
), true /*flush*/);
|
||||
my_streamer->set_issue_stream_cmd(chan_i, boost::bind(
|
||||
&rx_dsp_core_200::issue_stream_command, _rx_dsps[dsp], _1));
|
||||
_rx_streamers[dsp] = my_streamer; //store weak pointer
|
||||
}
|
||||
|
||||
//set the packet threshold to be an entire socket buffer's worth
|
||||
const size_t packets_per_sock_buff = size_t(50e6/xport->get_recv_frame_size());
|
||||
my_streamer->set_alignment_failure_threshold(packets_per_sock_buff);
|
||||
|
||||
//sets all tick and samp rates on this streamer
|
||||
this->update_rates();
|
||||
|
||||
return my_streamer;
|
||||
}
|
||||
|
||||
|
||||
/***********************************************************************
|
||||
* Transmit streamer
|
||||
**********************************************************************/
|
||||
uhd::tx_streamer::sptr umtrx_impl::get_tx_stream(const uhd::stream_args_t &args)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user