From 0ef79fb67dd054ea00805acd2788e32afdfd6c8c Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 15 Apr 2014 22:06:15 -0700 Subject: [PATCH] umtrx: work on io impl and recv streamer --- host/CMakeLists.txt | 1 + host/cores/async_packet_handler.hpp | 71 ++ host/cores/super_recv_packet_handler.hpp | 802 +++++++++++++++++++++++ host/cores/super_send_packet_handler.hpp | 474 ++++++++++++++ host/cores/validate_subdev_spec.cpp | 73 +++ host/cores/validate_subdev_spec.hpp | 38 ++ host/umtrx_impl.cpp | 13 +- host/umtrx_impl.hpp | 6 + host/umtrx_io_impl.cpp | 170 ++++- 9 files changed, 1636 insertions(+), 12 deletions(-) create mode 100644 host/cores/async_packet_handler.hpp create mode 100644 host/cores/super_recv_packet_handler.hpp create mode 100644 host/cores/super_send_packet_handler.hpp create mode 100644 host/cores/validate_subdev_spec.cpp create mode 100644 host/cores/validate_subdev_spec.hpp diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt index 259e2371..41c762b8 100644 --- a/host/CMakeLists.txt +++ b/host/CMakeLists.txt @@ -38,6 +38,7 @@ list(APPEND UMTRX_SOURCES cores/rx_dsp_core_200.cpp cores/tx_dsp_core_200.cpp cores/time64_core_200.cpp + cores/validate_subdev_spec.cpp ) ######################################################################## diff --git a/host/cores/async_packet_handler.hpp b/host/cores/async_packet_handler.hpp new file mode 100644 index 00000000..fef03483 --- /dev/null +++ b/host/cores/async_packet_handler.hpp @@ -0,0 +1,71 @@ +// +// Copyright 2012 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_LIBUHD_USRP_COMMON_ASYNC_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_USRP_COMMON_ASYNC_PACKET_HANDLER_HPP + +#include +#include +#include +#include +#include + +namespace uhd{ namespace usrp{ + + template + void load_metadata_from_buff( + const to_host_type &to_host, + async_metadata_t &metadata, + const transport::vrt::if_packet_info_t &if_packet_info, + const boost::uint32_t *vrt_hdr, + const double tick_rate, + const size_t channel = 0 + ){ + const boost::uint32_t *payload = vrt_hdr + if_packet_info.num_header_words32; + + //load into metadata + metadata.channel = channel; + metadata.has_time_spec = if_packet_info.has_tsf; + metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, tick_rate); + metadata.event_code = async_metadata_t::event_code_t(to_host(payload[0]) & 0xff); + + //load user payload + for (size_t i = 1; i < if_packet_info.num_payload_words32; i++){ + if (i-1 == 4) break; //limit of 4 words32 + metadata.user_payload[i-1] = to_host(payload[i]); + } + } + + UHD_INLINE void standard_async_msg_prints(const async_metadata_t &metadata) + { + if (metadata.event_code & + ( async_metadata_t::EVENT_CODE_UNDERFLOW + | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET) + ) UHD_MSG(fastpath) << "U"; + else if (metadata.event_code & + ( async_metadata_t::EVENT_CODE_SEQ_ERROR + | async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST) + ) UHD_MSG(fastpath) << "S"; + else if (metadata.event_code & + async_metadata_t::EVENT_CODE_TIME_ERROR + ) UHD_MSG(fastpath) << "L"; + } + + +}} //namespace uhd::usrp + +#endif /* INCLUDED_LIBUHD_USRP_COMMON_ASYNC_PACKET_HANDLER_HPP */ diff --git a/host/cores/super_recv_packet_handler.hpp b/host/cores/super_recv_packet_handler.hpp new file mode 100644 index 00000000..5c84327a --- /dev/null +++ b/host/cores/super_recv_packet_handler.hpp @@ -0,0 +1,802 @@ +// +// Copyright 2011-2013 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Included for debugging +#ifdef UHD_TXRX_DEBUG_PRINTS +#include +#include +#include "boost/date_time/posix_time/posix_time.hpp" +#endif + +namespace uhd{ namespace transport{ namespace sph{ + +UHD_INLINE boost::uint32_t get_context_code( + const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info +){ + //extract the context word (we dont know the endianness so mirror the bytes) + boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | + uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); + return word0 & 0xff; +} + +typedef boost::function handle_overflow_type; +static inline void handle_overflow_nop(void){} + +/*********************************************************************** + * Super receive packet handler + * + * A receive packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are received in unison in recv(). + **********************************************************************/ +class recv_packet_handler{ +public: + typedef boost::function get_buff_type; + typedef boost::function handle_flowctrl_type; + typedef boost::function issue_stream_cmd_type; + typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function vrt_unpacker_type; + + /*! + * Make a new packet handler for receive + * \param size the number of transport channels + */ + recv_packet_handler(const size_t size = 1): + _queue_error_for_next_call(false), + _buffers_infos_index(0) + { + #ifdef ERROR_INJECT_DROPPED_PACKETS + recvd_packets = 0; + #endif + + this->resize(size); + set_alignment_failure_threshold(1000); + } + + ~recv_packet_handler(void){ + _task_barrier.interrupt(); + _task_handlers.clear(); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _task_handlers.clear(); + _props.resize(size); + //re-initialize all buffers infos by re-creating the vector + _buffers_infos = std::vector(4, buffers_info_type(size)); + _task_barrier.resize(size); + _task_handlers.resize(size); + for (size_t i = 1/*skip 0*/; i < size; i++){ + _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); + }; + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt unpacker function and offset + void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){ + _vrt_unpacker = vrt_unpacker; + _header_offset_words32 = header_offset_words32; + } + + /*! + * Set the threshold for alignment failure. + * How many packets throw out before giving up? + * \param threshold number of packets per channel + */ + void set_alignment_failure_threshold(const size_t threshold){ + _alignment_faulure_threshold = threshold*this->size(); + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff, const bool flush = false){ + if (flush){ + while (get_buff(0.0)); + } + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Flush all transports in the streamer: + * The packet payload is discarded. + */ + void flush_all(const double timeout = 0.0) + { + _flush_all(timeout); + return; + } + + /*! + * Set the function to handle flow control + * \param xport_chan which transport channel + * \param handle_flowctrl the callback function + */ + void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false) + { + _props.at(xport_chan).handle_flowctrl = handle_flowctrl; + //we need the window size to be within the 0xfff (max 12 bit seq) + _props.at(xport_chan).fc_update_window = std::min(update_window, 0xfff); + if (do_init) handle_flowctrl(0); + } + + //! Set the conversion routine for all channels + void set_converter(const uhd::convert::id_type &id){ + _num_outputs = id.num_outputs; + _converter = uhd::convert::get_converter(id)(); + this->set_scale_factor(1/32767.); //update after setting converter + _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format); + _bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.output_format); + } + + //! Set the transport channel's overflow handler + void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){ + _props.at(xport_chan).handle_overflow = handle_overflow; + } + + //! Set the scale factor used in float conversion + void set_scale_factor(const double scale_factor){ + _converter->set_scalar(scale_factor); + } + + //! Set the callback to issue stream commands + void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd) + { + _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd; + } + + //! Overload call to issue stream commands + void issue_stream_cmd(const stream_cmd_t &stream_cmd) + { + for (size_t i = 0; i < _props.size(); i++) + { + if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); + } + } + + /******************************************************************* + * Receive: + * The entry point for the fast-path receive calls. + * Dispatch into combinations of single packet receive calls. + ******************************************************************/ + UHD_INLINE size_t recv( + const uhd::rx_streamer::buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const double timeout, + const bool one_packet + ){ + //handle metadata queued from a previous receive + if (_queue_error_for_next_call){ + _queue_error_for_next_call = false; + metadata = _queue_metadata; + //We want to allow a full buffer recv to be cut short by a timeout, + //but do not want to generate an inline timeout message packet. + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0; + } + + size_t accum_num_samps = recv_one_packet( + buffs, nsamps_per_buff, metadata, timeout + ); + + if (one_packet){ +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); +#endif + return accum_num_samps; + } + + //first recv had an error code set, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + + //loop until buffer is filled or error code + while(accum_num_samps < nsamps_per_buff){ + size_t num_samps = recv_one_packet( + buffs, nsamps_per_buff - accum_num_samps, _queue_metadata, + timeout, accum_num_samps*_bytes_per_cpu_item + ); + + //metadata had an error code set, store for next call and return + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){ + _queue_error_for_next_call = true; + break; + } + accum_num_samps += num_samps; + } +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); +#endif + return accum_num_samps; + } + +private: + vrt_unpacker_type _vrt_unpacker; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + bool _queue_error_for_next_call; + size_t _alignment_faulure_threshold; + rx_metadata_t _queue_metadata; + struct xport_chan_props_type{ + xport_chan_props_type(void): + packet_count(0), + handle_overflow(&handle_overflow_nop), + fc_update_window(0) + {} + get_buff_type get_buff; + issue_stream_cmd_type issue_stream_cmd; + size_t packet_count; + handle_overflow_type handle_overflow; + handle_flowctrl_type handle_flowctrl; + size_t fc_update_window; + }; + std::vector _props; + size_t _num_outputs; + size_t _bytes_per_otw_item; //used in conversion + size_t _bytes_per_cpu_item; //used in conversion + uhd::convert::converter::sptr _converter; //used in conversion + + //! information stored for a received buffer + struct per_buffer_info_type{ + void reset() + { + buff.reset(); + vrt_hdr = NULL; + time = time_spec_t(0.0); + copy_buff = NULL; + } + managed_recv_buffer::sptr buff; + const boost::uint32_t *vrt_hdr; + vrt::if_packet_info_t ifpi; + time_spec_t time; + const char *copy_buff; + }; + + //!information stored for a set of aligned buffers + struct buffers_info_type : std::vector { + buffers_info_type(const size_t size): + std::vector(size), + indexes_todo(size, true), + alignment_time_valid(false), + data_bytes_to_copy(0), + fragment_offset_in_samps(0) + {/* NOP */} + void reset() + { + indexes_todo.set(); + alignment_time = time_spec_t(0.0); + alignment_time_valid = false; + data_bytes_to_copy = 0; + fragment_offset_in_samps = 0; + metadata.reset(); + for (size_t i = 0; i < size(); i++) + at(i).reset(); + } + boost::dynamic_bitset<> indexes_todo; //used in alignment logic + time_spec_t alignment_time; //used in alignment logic + bool alignment_time_valid; //used in alignment logic + size_t data_bytes_to_copy; //keeps track of state + size_t fragment_offset_in_samps; //keeps track of state + rx_metadata_t metadata; //packet description + }; + + //! a circular queue of buffer infos + std::vector _buffers_infos; + size_t _buffers_infos_index; + buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];} + buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];} + buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];} + void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;} + + //! possible return options for the packet receiver + enum packet_type{ + PACKET_IF_DATA, + PACKET_TIMESTAMP_ERROR, + PACKET_INLINE_MESSAGE, + PACKET_TIMEOUT_ERROR, + PACKET_SEQUENCE_ERROR + }; + + #ifdef ERROR_INJECT_DROPPED_PACKETS + int recvd_packets; + #endif + + /******************************************************************* + * Get and process a single packet from the transport: + * Receive a single packet at the given index. + * Extract all the relevant info and store. + * Check the info to determine the return code. + ******************************************************************/ + UHD_INLINE packet_type get_and_process_single_packet( + const size_t index, + per_buffer_info_type &prev_buffer_info, + per_buffer_info_type &curr_buffer_info, + double timeout + ){ + //get a single packet from the transport layer + managed_recv_buffer::sptr &buff = curr_buffer_info.buff; + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + + #ifdef ERROR_INJECT_DROPPED_PACKETS + if (++recvd_packets > 1000) + { + recvd_packets = 0; + buff.reset(); + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + } + #endif + + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + per_buffer_info_type &info = curr_buffer_info; + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true + info.copy_buff = reinterpret_cast(info.vrt_hdr + info.ifpi.num_header_words32); + + //handle flow control + if (_props[index].handle_flowctrl) + { + if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + { + _props[index].handle_flowctrl(info.ifpi.packet_count); + } + } + + //-------------------------------------------------------------- + //-- Determine return conditions: + //-- The order of these checks is HOLY. + //-------------------------------------------------------------- + + //1) check for inline IF message packets + if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ + return PACKET_INLINE_MESSAGE; + } + + //2) check for sequence errors + #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff; + const size_t expected_packet_count = _props[index].packet_count; + _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask; + if (expected_packet_count != info.ifpi.packet_count){ + return PACKET_SEQUENCE_ERROR; + } + #endif + + //3) check for out of order timestamps + if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){ + return PACKET_TIMESTAMP_ERROR; + } + + //4) otherwise the packet is normal! + return PACKET_IF_DATA; + } + + void _flush_all(double timeout) + { + for (size_t i = 0; i < _props.size(); i++) + { + per_buffer_info_type prev_buffer_info, curr_buffer_info; + while (true) + { + //receive a single packet from the transport + try + { + // call into get_and_process_single_packet() + // to make sure flow control is handled + if (get_and_process_single_packet( + i, + prev_buffer_info, + curr_buffer_info, + timeout) == PACKET_TIMEOUT_ERROR) break; + } catch(...){} + prev_buffer_info = curr_buffer_info; + curr_buffer_info.reset(); + } + } + get_prev_buffer_info().reset(); + get_curr_buffer_info().reset(); + get_next_buffer_info().reset(); + } + + /******************************************************************* + * Alignment check: + * Check the received packet for alignment and mark accordingly. + ******************************************************************/ + UHD_INLINE void alignment_check( + const size_t index, buffers_info_type &info + ){ + //if alignment time was not valid or if the sequence id is newer: + // use this index's time as the alignment time + // reset the indexes list and remove this index + if (not info.alignment_time_valid or info[index].time > info.alignment_time){ + info.alignment_time_valid = true; + info.alignment_time = info[index].time; + info.indexes_todo.set(); + info.indexes_todo.reset(index); + info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes; + } + + //if the sequence id matches: + // remove this index from the list and continue + else if (info[index].time == info.alignment_time){ + info.indexes_todo.reset(index); + } + + //if the sequence id is older: + // continue with the same index to try again + //else if (info[index].time < info.alignment_time)... + } + + /******************************************************************* + * Get aligned buffers: + * Iterate through each index and try to accumulate aligned buffers. + * Handle all of the edge cases like inline messages and errors. + * The logic will throw out older packets until it finds a match. + ******************************************************************/ + UHD_INLINE void get_aligned_buffs(double timeout){ + + get_prev_buffer_info().reset(); // no longer need the previous info - reset it for future use + + increment_buffer_info(); //increment to next buffer + + buffers_info_type &prev_info = get_prev_buffer_info(); + buffers_info_type &curr_info = get_curr_buffer_info(); + buffers_info_type &next_info = get_next_buffer_info(); + + //Loop until we get a message of an aligned set of buffers: + // - Receive a single packet and extract its info. + // - Handle the packet type yielded by the receive. + // - Check the timestamps for alignment conditions. + size_t iterations = 0; + while (curr_info.indexes_todo.any()){ + + //get the index to process for this iteration + const size_t index = curr_info.indexes_todo.find_first(); + packet_type packet; + + //receive a single packet from the transport + try{ + packet = get_and_process_single_packet( + index, prev_info[index], curr_info[index], timeout + ); + } + + //handle the case when the get packet throws + catch(const std::exception &e){ + UHD_MSG(error) << boost::format( + "The receive packet handler caught an exception.\n%s" + ) % e.what() << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return; + } + + switch(packet){ + case PACKET_IF_DATA: + alignment_check(index, curr_info); + break; + + case PACKET_TIMESTAMP_ERROR: + //If the user changes the device time while streaming or without flushing, + //we can receive a packet that comes before the previous packet in time. + //This could cause the alignment logic to discard future received packets. + //Therefore, when this occurs, we reset the info to restart from scratch. + if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){ + curr_info.alignment_time_valid = false; + } + alignment_check(index, curr_info); + break; + + case PACKET_INLINE_MESSAGE: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf; + curr_info.metadata.time_spec = next_info[index].time; + curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); + if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ + rx_metadata_t metadata = curr_info.metadata; + _props[index].handle_overflow(); + curr_info.metadata = metadata; + UHD_MSG(fastpath) << "O"; + } + return; + + case PACKET_TIMEOUT_ERROR: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return; + + case PACKET_SEQUENCE_ERROR: + alignment_check(index, curr_info); + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; + curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks( + prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate); + curr_info.metadata.out_of_sequence = true; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + UHD_MSG(fastpath) << "D"; + return; + + } + + //too many iterations: detect alignment failure + if (iterations++ > _alignment_faulure_threshold){ + UHD_MSG(error) << boost::format( + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + _props[index].handle_overflow(); + return; + } + + } + + //set the metadata from the buffer information at index zero + curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf; + curr_info.metadata.time_spec = curr_info[0].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = curr_info[0].ifpi.sob; + curr_info.metadata.end_of_burst = curr_info[0].ifpi.eob; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + } + + /******************************************************************* + * Receive a single packet: + * Handles fragmentation, messages, errors, and copy-conversion. + * When no fragments are available, call the get aligned buffers. + * Then copy-convert available data into the user's IO buffers. + ******************************************************************/ + UHD_INLINE size_t recv_one_packet( + const uhd::rx_streamer::buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //get the next buffer if the current one has expired + if (get_curr_buffer_info().data_bytes_to_copy == 0) + { + //perform receive with alignment logic + get_aligned_buffs(timeout); + } + + buffers_info_type &info = get_curr_buffer_info(); + metadata = info.metadata; + + //interpolate the time spec (useful when this is a fragment) + metadata.time_spec += time_spec_t::from_ticks(info.fragment_offset_in_samps, _samp_rate); + + //extract the number of samples available to copy + const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item; + const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available); + const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item; + const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs; + + //setup the data to share with converter threads + _convert_nsamps = nsamps_to_copy_per_io_buff; + _convert_buffs = &buffs; + _convert_buffer_offset_bytes = buffer_offset_bytes; + _convert_bytes_to_copy = bytes_to_copy; + + //perform N channels of conversion + converter_thread_task(0); + + //update the copy buffer's availability + info.data_bytes_to_copy -= bytes_to_copy; + + //setup the fragment flags and offset + metadata.more_fragments = info.data_bytes_to_copy != 0; + metadata.fragment_offset = info.fragment_offset_in_samps; + info.fragment_offset_in_samps += nsamps_to_copy; //set for next call + + return nsamps_to_copy_per_io_buff; + } + + /******************************************************************* + * Perform one thread's work of the conversion task. + * The entry and exit use a dual synchronization barrier, + * to wait for data to become ready and block until completion. + ******************************************************************/ + UHD_INLINE void converter_thread_task(const size_t index) + { + _task_barrier.wait(); + + //shortcut references to local data structures + buffers_info_type &buff_info = get_curr_buffer_info(); + per_buffer_info_type &info = buff_info[index]; + const rx_streamer::buffs_type &buffs = *_convert_buffs; + + //fill IO buffs with pointers into the output buffer + void *io_buffs[4/*max interleave*/]; + for (size_t i = 0; i < _num_outputs; i++){ + char *b = reinterpret_cast(buffs[index*_num_outputs + i]); + io_buffs[i] = b + _convert_buffer_offset_bytes; + } + const ref_vector out_buffs(io_buffs, _num_outputs); + + //perform the conversion operation + _converter->conv(info.copy_buff, out_buffs, _convert_nsamps); + + //advance the pointer for the source buffer + info.copy_buff += _convert_bytes_to_copy; + + //release the buffer if fully consumed + if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){ + info.buff.reset(); //effectively a release + } + + if (index == 0) _task_barrier.wait_others(); + } + + //! Shared variables for the worker threads + reusable_barrier _task_barrier; + std::vector _task_handlers; + size_t _convert_nsamps; + const rx_streamer::buffs_type *_convert_buffs; + size_t _convert_buffer_offset_bytes; + size_t _convert_bytes_to_copy; + + /* + * This last section is only for debugging purposes. + * It causes a lot of prints to stderr which can be piped to a file. + * Gathered data can be used to post process it with external tools. + */ +#ifdef UHD_TXRX_DEBUG_PRINTS + struct dbg_recv_stat_t { + dbg_recv_stat_t(long wc, size_t nspb, size_t nsr, uhd::rx_metadata_t md, double to, bool op, double rate): + wallclock(wc), nsamps_per_buff(nspb), nsamps_recv(nsr), metadata(md), timeout(to), one_packet(op), samp_rate(rate) + {} + long wallclock; + size_t nsamps_per_buff; + size_t nsamps_recv; + uhd::rx_metadata_t metadata; + double timeout; + bool one_packet; + double samp_rate; + // Create a formatted print line for all the info gathered in this struct. + std::string print_line() { + boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld"); + fmt % wallclock; + fmt % timeout % (int)nsamps_per_buff % (int) nsamps_recv; + fmt % (one_packet ? "true":"false"); + fmt % metadata.error_code; + fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false"); + fmt % (metadata.more_fragments ? "true":"false") % (int)metadata.fragment_offset; + fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate); + return fmt.str(); + } + }; + + void dbg_gather_data(const size_t nsamps_per_buff, const size_t nsamps_recv, + uhd::rx_metadata_t &metadata, const double timeout, + const bool one_packet, + bool dbg_print_directly = true + ) + { + // Initialize a struct with all available data. It can return a formatted string with all infos if wanted. + dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(), + nsamps_per_buff, + nsamps_recv, + metadata, + timeout, + one_packet, + _samp_rate + ); + if(dbg_print_directly) { + dbg_print_err(data.print_line()); + } + } + + + + void dbg_print_err(std::string msg) { + std::string dbg_prefix("super_recv_packet_handler,"); + msg = dbg_prefix + msg; + fprintf(stderr, "%s\n", msg.c_str()); + } +#endif +}; + +class recv_packet_streamer : public recv_packet_handler, public rx_streamer{ +public: + recv_packet_streamer(const size_t max_num_samps){ + _max_num_samps = max_num_samps; + } + + size_t get_num_channels(void) const{ + return this->size(); + } + + size_t get_max_num_samps(void) const{ + return _max_num_samps; + } + + size_t recv( + const rx_streamer::buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const double timeout, + const bool one_packet + ){ + return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet); + } + + void issue_stream_cmd(const stream_cmd_t &stream_cmd) + { + return recv_packet_handler::issue_stream_cmd(stream_cmd); + } + +private: + size_t _max_num_samps; +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */ diff --git a/host/cores/super_send_packet_handler.hpp b/host/cores/super_send_packet_handler.hpp new file mode 100644 index 00000000..c2810842 --- /dev/null +++ b/host/cores/super_send_packet_handler.hpp @@ -0,0 +1,474 @@ +// +// Copyright 2011-2013 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef UHD_TXRX_DEBUG_PRINTS +// Included for debugging +#include +#include +#include "boost/date_time/posix_time/posix_time.hpp" +#include +#include +#endif + +namespace uhd { +namespace transport { +namespace sph { + +/*********************************************************************** + * Super send packet handler + * + * A send packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are sent in unison in send(). + **********************************************************************/ +class send_packet_handler{ +public: + typedef boost::function get_buff_type; + typedef boost::function async_receiver_type; + typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function vrt_packer_type; + + /*! + * Make a new packet handler for send + * \param size the number of transport channels + */ + send_packet_handler(const size_t size = 1): + _next_packet_seq(0), _cached_metadata(false) + { + this->set_enable_trailer(true); + this->resize(size); + } + + ~send_packet_handler(void){ + _task_barrier.interrupt(); + _task_handlers.clear(); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _task_handlers.clear(); + _props.resize(size); + static const boost::uint64_t zero = 0; + _zero_buffs.resize(size, &zero); + _task_barrier.resize(size); + _task_handlers.resize(size); + for (size_t i = 1/*skip 0*/; i < size; i++){ + _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); + }; + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt packer function and offset + void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){ + _vrt_packer = vrt_packer; + _header_offset_words32 = header_offset_words32; + } + + //! Set the stream ID for a specific channel (or no SID) + void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){ + _props.at(xport_chan).has_sid = has_sid; + _props.at(xport_chan).sid = sid; + } + + void set_enable_trailer(const bool enable) + { + _has_tlr = enable; + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + //! Set the conversion routine for all channels + void set_converter(const uhd::convert::id_type &id){ + _num_inputs = id.num_inputs; + _converter = uhd::convert::get_converter(id)(); + this->set_scale_factor(32767.); //update after setting converter + _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format); + _bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.input_format); + } + + /*! + * Set the maximum number of samples per host packet. + * Ex: A USRP1 in dual channel mode would be half. + * \param num_samps the maximum samples in a packet + */ + void set_max_samples_per_packet(const size_t num_samps){ + _max_samples_per_packet = num_samps; + } + + //! Set the scale factor used in float conversion + void set_scale_factor(const double scale_factor){ + _converter->set_scalar(scale_factor); + } + + //! Set the callback to get async messages + void set_async_receiver(const async_receiver_type &async_receiver) + { + _async_receiver = async_receiver; + } + + //! Overload call to get async metadata + bool recv_async_msg( + uhd::async_metadata_t &async_metadata, double timeout = 0.1 + ){ + if (_async_receiver) return _async_receiver(async_metadata, timeout); + boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); + return false; + } + + /******************************************************************* + * Send: + * The entry point for the fast-path send calls. + * Dispatch into combinations of single packet send calls. + ******************************************************************/ + UHD_INLINE size_t send( + const uhd::tx_streamer::buffs_type &buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t &metadata, + const double timeout + ){ + //translate the metadata to vrt if packet info + vrt::if_packet_info_t if_packet_info; + if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; + //if_packet_info.has_sid = false; //set per channel + if_packet_info.has_cid = false; + if_packet_info.has_tlr = _has_tlr; + if_packet_info.has_tsi = false; + if_packet_info.has_tsf = metadata.has_time_spec; + if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate); + if_packet_info.sob = metadata.start_of_burst; + if_packet_info.eob = metadata.end_of_burst; + + /* + * Metadata is cached when we get a send requesting a start of burst with no samples. + * It is applied here on the next call to send() that actually has samples to send. + */ + if (_cached_metadata && nsamps_per_buff != 0) + { + // If the new metada has a time_spec, do not use the cached time_spec. + if (!metadata.has_time_spec) + { + if_packet_info.has_tsf = _metadata_cache.has_time_spec; + if_packet_info.tsf = _metadata_cache.time_spec.to_ticks(_tick_rate); + } + if_packet_info.sob = _metadata_cache.start_of_burst; + if_packet_info.eob = _metadata_cache.end_of_burst; + _cached_metadata = false; + } + + if (nsamps_per_buff <= _max_samples_per_packet){ + + //TODO remove this code when sample counts of zero are supported by hardware + #ifndef SSPH_DONT_PAD_TO_ONE + static const boost::uint64_t zero = 0; + _zero_buffs.resize(buffs.size(), &zero); + + if (nsamps_per_buff == 0) + { + // if this is a start of a burst and there are no samples + if (metadata.start_of_burst) + { + // cache metadata and apply on the next send() + _metadata_cache = metadata; + _cached_metadata = true; + return 0; + } else { + // send requests with no samples are handled here (such as end of burst) + return send_one_packet(_zero_buffs, 1, if_packet_info, timeout) & 0x0; + } + } + #endif + + size_t nsamps_sent = send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout); +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_print_send(nsamps_per_buff, nsamps_sent, metadata, timeout); +#endif + return nsamps_sent; } + size_t total_num_samps_sent = 0; + + //false until final fragment + if_packet_info.eob = false; + + const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet; + const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1; + + //loop through the following fragment indexes + for (size_t i = 0; i < num_fragments; i++){ + + //send a fragment with the helper function + const size_t num_samps_sent = send_one_packet( + buffs, _max_samples_per_packet, + if_packet_info, timeout, + total_num_samps_sent*_bytes_per_cpu_item + ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; + + //setup metadata for the next fragment + const time_spec_t time_spec = metadata.time_spec + time_spec_t::from_ticks(total_num_samps_sent, _samp_rate); + if_packet_info.tsf = time_spec.to_ticks(_tick_rate); + if_packet_info.sob = false; + + } + + //send the final fragment with the helper function + if_packet_info.eob = metadata.end_of_burst; + size_t nsamps_sent = total_num_samps_sent + + send_one_packet(buffs, final_length, if_packet_info, timeout, + total_num_samps_sent * _bytes_per_cpu_item); +#ifdef UHD_TXRX_DEBUG_PRINTS + dbg_print_send(nsamps_per_buff, nsamps_sent, metadata, timeout); + +#endif + return nsamps_sent; + } + +private: + + vrt_packer_type _vrt_packer; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + struct xport_chan_props_type{ + xport_chan_props_type(void):has_sid(false),sid(0){} + get_buff_type get_buff; + bool has_sid; + boost::uint32_t sid; + managed_send_buffer::sptr buff; + }; + std::vector _props; + size_t _num_inputs; + size_t _bytes_per_otw_item; //used in conversion + size_t _bytes_per_cpu_item; //used in conversion + uhd::convert::converter::sptr _converter; //used in conversion + size_t _max_samples_per_packet; + std::vector _zero_buffs; + size_t _next_packet_seq; + bool _has_tlr; + async_receiver_type _async_receiver; + bool _cached_metadata; + uhd::tx_metadata_t _metadata_cache; + +#ifdef UHD_TXRX_DEBUG_PRINTS + struct dbg_send_stat_t { + dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate): + wallclock(wc), nsamps_per_buff(nspb), nsamps_sent(nss), metadata(md), timeout(to), samp_rate(rate) + {} + long wallclock; + size_t nsamps_per_buff; + size_t nsamps_sent; + uhd::tx_metadata_t metadata; + double timeout; + double samp_rate; + // Create a formatted print line for all the info gathered in this struct. + std::string print_line() { + boost::format fmt("send,%ld,%f,%i,%i,%s,%s,%s,%ld"); + fmt % wallclock; + fmt % timeout % (int)nsamps_per_buff % (int) nsamps_sent; + fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false"); + fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate); + return fmt.str(); + } + }; + + void dbg_print_send(size_t nsamps_per_buff, size_t nsamps_sent, + const uhd::tx_metadata_t &metadata, const double timeout, + bool dbg_print_directly = true) + { + dbg_send_stat_t data(boost::get_system_time().time_of_day().total_microseconds(), + nsamps_per_buff, + nsamps_sent, + metadata, + timeout, + _samp_rate + ); + if(dbg_print_directly){ + dbg_print_err(data.print_line()); + } + } + void dbg_print_err(std::string msg) { + msg = "super_send_packet_handler," + msg; + fprintf(stderr, "%s\n", msg.c_str()); + } + + +#endif + + /******************************************************************* + * Send a single packet: + ******************************************************************/ + UHD_INLINE size_t send_one_packet( + const uhd::tx_streamer::buffs_type &buffs, + const size_t nsamps_per_buff, + vrt::if_packet_info_t &if_packet_info, + const double timeout, + const size_t buffer_offset_bytes = 0 + ){ + + //load the rest of the if_packet_info in here + if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item; + if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t); + if_packet_info.packet_count = _next_packet_seq; + + //get a buffer for each channel or timeout + BOOST_FOREACH(xport_chan_props_type &props, _props){ + if (not props.buff) props.buff = props.get_buff(timeout); + if (not props.buff) return 0; //timeout + } + + //setup the data to share with converter threads + _convert_nsamps = nsamps_per_buff; + _convert_buffs = &buffs; + _convert_buffer_offset_bytes = buffer_offset_bytes; + _convert_if_packet_info = &if_packet_info; + + //perform N channels of conversion + converter_thread_task(0); + + _next_packet_seq++; //increment sequence after commits + return nsamps_per_buff; + } + + /******************************************************************* + * Perform one thread's work of the conversion task. + * The entry and exit use a dual synchronization barrier, + * to wait for data to become ready and block until completion. + ******************************************************************/ + UHD_INLINE void converter_thread_task(const size_t index) + { + _task_barrier.wait(); + + //shortcut references to local data structures + managed_send_buffer::sptr &buff = _props[index].buff; + vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; + const tx_streamer::buffs_type &buffs = *_convert_buffs; + + //fill IO buffs with pointers into the output buffer + const void *io_buffs[4/*max interleave*/]; + for (size_t i = 0; i < _num_inputs; i++){ + const char *b = reinterpret_cast(buffs[index*_num_inputs + i]); + io_buffs[i] = b + _convert_buffer_offset_bytes; + } + const ref_vector in_buffs(io_buffs, _num_inputs); + + //pack metadata into a vrt header + boost::uint32_t *otw_mem = buff->cast() + _header_offset_words32; + if_packet_info.has_sid = _props[index].has_sid; + if_packet_info.sid = _props[index].sid; + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //perform the conversion operation + _converter->conv(in_buffs, otw_mem, _convert_nsamps); + + //commit the samples to the zero-copy interface + const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; + buff->commit(num_vita_words32*sizeof(boost::uint32_t)); + buff.reset(); //effectively a release + + if (index == 0) _task_barrier.wait_others(); + } + + //! Shared variables for the worker threads + reusable_barrier _task_barrier; + std::vector _task_handlers; + size_t _convert_nsamps; + const tx_streamer::buffs_type *_convert_buffs; + size_t _convert_buffer_offset_bytes; + vrt::if_packet_info_t *_convert_if_packet_info; + +}; + +class send_packet_streamer : public send_packet_handler, public tx_streamer{ +public: + send_packet_streamer(const size_t max_num_samps){ + _max_num_samps = max_num_samps; + this->set_max_samples_per_packet(_max_num_samps); + } + + size_t get_num_channels(void) const{ + return this->size(); + } + + size_t get_max_num_samps(void) const{ + return _max_num_samps; + } + + size_t send( + const tx_streamer::buffs_type &buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t &metadata, + const double timeout + ){ + return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout); + } + + bool recv_async_msg( + uhd::async_metadata_t &async_metadata, double timeout = 0.1 + ){ + return send_packet_handler::recv_async_msg(async_metadata, timeout); + } + +private: + size_t _max_num_samps; +}; + +} // namespace sph +} // namespace transport +} // namespace uhd + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */ diff --git a/host/cores/validate_subdev_spec.cpp b/host/cores/validate_subdev_spec.cpp new file mode 100644 index 00000000..fab40b20 --- /dev/null +++ b/host/cores/validate_subdev_spec.cpp @@ -0,0 +1,73 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#include "validate_subdev_spec.hpp" +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::usrp; + +namespace uhd{ namespace usrp{ + + static std::ostream& operator<< (std::ostream &out, const subdev_spec_pair_t &pair){ + out << pair.db_name << ":" << pair.sd_name; + return out; + } + +}} + +void uhd::usrp::validate_subdev_spec( + property_tree::sptr tree, + const subdev_spec_t &spec, + const std::string &type, + const std::string &mb +){ + const size_t num_dsps = tree->list(str(boost::format("/mboards/%s/%s_dsps") % mb % type)).size(); + + //sanity checking on the length + if (spec.size() == 0) throw uhd::value_error(str(boost::format( + "Empty %s subdevice specification is not supported.\n" + ) % type)); + if (spec.size() > num_dsps) throw uhd::value_error(str(boost::format( + "The subdevice specification \"%s\" is too long.\n" + "The user specified %u channels, but there are only %u %s dsps on mboard %s.\n" + ) % spec.to_string() % spec.size() % num_dsps % type % mb)); + + //make a list of all possible specs + subdev_spec_t all_specs; + BOOST_FOREACH(const std::string &db, tree->list(str(boost::format("/mboards/%s/dboards") % mb))){ + BOOST_FOREACH(const std::string &sd, tree->list(str(boost::format("/mboards/%s/dboards/%s/%s_frontends") % mb % db % type))){ + all_specs.push_back(subdev_spec_pair_t(db, sd)); + } + } + + //validate that the spec is possible + BOOST_FOREACH(const subdev_spec_pair_t &pair, spec){ + uhd::assert_has(all_specs, pair, str(boost::format("%s subdevice specification on mboard %s") % type % mb)); + } + + //enable selected frontends, disable others + BOOST_FOREACH(const subdev_spec_pair_t &pair, all_specs){ + const bool enb = uhd::has(spec, pair); + tree->access(str(boost::format( + "/mboards/%s/dboards/%s/%s_frontends/%s/enabled" + ) % mb % pair.db_name % type % pair.sd_name)).set(enb); + } +} diff --git a/host/cores/validate_subdev_spec.hpp b/host/cores/validate_subdev_spec.hpp new file mode 100644 index 00000000..7d9e2c30 --- /dev/null +++ b/host/cores/validate_subdev_spec.hpp @@ -0,0 +1,38 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_LIBUHD_USRP_COMMON_VALIDATE_SUBDEV_SPEC_HPP +#define INCLUDED_LIBUHD_USRP_COMMON_VALIDATE_SUBDEV_SPEC_HPP + +#include +#include +#include +#include + +namespace uhd{ namespace usrp{ + + //! Validate a subdev spec against a property tree + void validate_subdev_spec( + property_tree::sptr tree, + const subdev_spec_t &spec, + const std::string &type, //rx or tx + const std::string &mb = "0" + ); + +}} //namespace uhd::usrp + +#endif /* INCLUDED_LIBUHD_USRP_COMMON_VALIDATE_SUBDEV_SPEC_HPP */ diff --git a/host/umtrx_impl.cpp b/host/umtrx_impl.cpp index 4076aac4..864d2d96 100644 --- a/host/umtrx_impl.cpp +++ b/host/umtrx_impl.cpp @@ -49,7 +49,8 @@ UHD_STATIC_BLOCK(register_umtrx_device){ **********************************************************************/ umtrx_impl::umtrx_impl(const device_addr_t &device_addr) { - UHD_MSG(status) << "Opening a UmTRX device..." << std::endl; + _device_ip_addr = device_addr["addr"]; + UHD_MSG(status) << "Opening a UmTRX device... " << _device_ip_addr << std::endl; //////////////////////////////////////////////////////////////////// // create controller objects and initialize the properties tree @@ -62,7 +63,7 @@ umtrx_impl::umtrx_impl(const device_addr_t &device_addr) // create the iface that controls i2c, spi, uart, and wb //////////////////////////////////////////////////////////////// _iface = umtrx_iface::make(udp_simple::make_connected( - device_addr["addr"], BOOST_STRINGIZE(USRP2_UDP_CTRL_PORT) + _device_ip_addr, BOOST_STRINGIZE(USRP2_UDP_CTRL_PORT) )); _tree->create(mb_path / "name").set(_iface->get_cname()); _tree->create(mb_path / "fw_version").set(_iface->get_fw_version_string()); @@ -239,8 +240,8 @@ umtrx_impl::umtrx_impl(const device_addr_t &device_addr) _tree->create >(mb_path / "time_source" / "options") .publish(boost::bind(&time64_core_200::get_time_sources, _time64)); //setup reference source props - _tree->create(mb_path / "clock_source" / "value"); - //?? .subscribe(boost::bind(&umtrx_impl::update_clock_source, this, mb, _1)); + _tree->create(mb_path / "clock_source" / "value") + .subscribe(boost::bind(&umtrx_impl::update_clock_source, this, _1)); static const std::vector clock_sources = boost::assign::list_of("internal")("external"); _tree->create >(mb_path / "clock_source"/ "options").set(clock_sources); @@ -348,6 +349,8 @@ umtrx_impl::umtrx_impl(const device_addr_t &device_addr) _tree->access(mb_path / "tick_rate") .set(this->get_master_clock_rate()); this->time64_self_test(); + _rx_streamers.resize(_rx_dsps.size()); + _tx_streamers.resize(_tx_dsps.size()); } umtrx_impl::~umtrx_impl(void){ @@ -374,3 +377,5 @@ void umtrx_impl::time64_self_test(void) const bool within_range = (secs_elapsed < (1.5)*sleepTime and secs_elapsed > (0.5)*sleepTime); UHD_MSG(status) << (within_range? "pass" : "fail") << std::endl; } + +void umtrx_impl::update_clock_source(const std::string &){} diff --git a/host/umtrx_impl.hpp b/host/umtrx_impl.hpp index 34dc2b8e..8d9f3e47 100644 --- a/host/umtrx_impl.hpp +++ b/host/umtrx_impl.hpp @@ -80,6 +80,7 @@ public: private: //communication interfaces + std::string _device_ip_addr; umtrx_iface::sptr _iface; //controls for perifs @@ -102,6 +103,11 @@ private: void update_rx_samp_rate(const size_t, const double rate); void update_tx_samp_rate(const size_t, const double rate); void time64_self_test(void); + void update_rates(void); + + //streaming + std::vector > _rx_streamers; + std::vector > _tx_streamers; }; #endif /* INCLUDED_UMTRX_IMPL_HPP */ diff --git a/host/umtrx_io_impl.cpp b/host/umtrx_io_impl.cpp index 11af5f83..f04125f0 100644 --- a/host/umtrx_io_impl.cpp +++ b/host/umtrx_io_impl.cpp @@ -18,6 +18,13 @@ #include "umtrx_impl.hpp" #include "umtrx_regs.hpp" +#include "cores/validate_subdev_spec.hpp" +#include "cores/async_packet_handler.hpp" +#include "cores/super_recv_packet_handler.hpp" +#include "cores/super_send_packet_handler.hpp" + +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; using namespace uhd; using namespace uhd::usrp; @@ -25,16 +32,163 @@ using namespace uhd::transport; namespace asio = boost::asio; namespace pt = boost::posix_time; -uhd::rx_streamer::sptr umtrx_impl::get_rx_stream(const uhd::stream_args_t &args){} -uhd::tx_streamer::sptr umtrx_impl::get_tx_stream(const uhd::stream_args_t &args){} bool umtrx_impl::recv_async_msg(uhd::async_metadata_t &, double){} -void umtrx_impl::update_tick_rate(const double rate){} + +void umtrx_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &){} +void umtrx_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &){} - void umtrx_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &){} - void umtrx_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &){} - void umtrx_impl::update_clock_source(const std::string &){} -void umtrx_impl::update_rx_samp_rate(const size_t, const double rate){} - void umtrx_impl::update_tx_samp_rate(const size_t, const double rate){} +/*********************************************************************** + * Update rates + **********************************************************************/ +void umtrx_impl::update_rates(void) +{ + fs_path root = "/mboards/0"; + _tree->access(root / "tick_rate").update(); + + //and now that the tick rate is set, init the host rates to something + BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")) + { + _tree->access(root / "rx_dsps" / name / "rate" / "value").update(); + } + BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")) + { + _tree->access(root / "tx_dsps" / name / "rate" / "value").update(); + } +} + +void umtrx_impl::update_rx_samp_rate(const size_t dsp, const double rate) +{ + boost::shared_ptr my_streamer = + boost::dynamic_pointer_cast(_tx_streamers[dsp].lock()); + if (not my_streamer) return; + + my_streamer->set_samp_rate(rate); + const double adj = _rx_dsps[dsp]->get_scaling_adjustment(); + my_streamer->set_scale_factor(adj); +} + +void umtrx_impl::update_tx_samp_rate(const size_t dsp, const double rate) +{ + boost::shared_ptr my_streamer = + boost::dynamic_pointer_cast(_tx_streamers[dsp].lock()); + if (not my_streamer) return; + + my_streamer->set_samp_rate(rate); + const double adj = _tx_dsps[dsp]->get_scaling_adjustment(); + my_streamer->set_scale_factor(adj); +} + +void umtrx_impl::update_tick_rate(const double rate) +{ + //update the tick rate on all existing streamers -> thread safe + for (size_t i = 0; i < _rx_streamers.size(); i++) + { + boost::shared_ptr my_streamer = + boost::dynamic_pointer_cast(_rx_streamers[i].lock()); + if (not my_streamer) continue; + my_streamer->set_tick_rate(rate); + } + for (size_t i = 0; i < _tx_streamers.size(); i++) + { + boost::shared_ptr my_streamer = + boost::dynamic_pointer_cast(_tx_streamers[i].lock()); + if (not my_streamer) continue; + my_streamer->set_tick_rate(rate); + } +} + +/*********************************************************************** + * Receive streamer + **********************************************************************/ +uhd::rx_streamer::sptr umtrx_impl::get_rx_stream(const uhd::stream_args_t &args_) +{ + stream_args_t args = args_; + + //setup defaults for unspecified values + args.otw_format = args.otw_format.empty()? "sc16" : args.otw_format; + args.channels = args.channels.empty()? std::vector(1, 0) : args.channels; + + //setup the transport hints (default to a large recv buff) + if (not args.args.has_key("recv_buff_size")) + { + #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + //limit buffer resize on macos or it will error + args.args["recv_buff_size"] = "1e6"; + #elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + //set to half-a-second of buffering at max rate + args.args["recv_buff_size"] = "50e6"; + #endif + } + zero_copy_xport_params default_params; + default_params.send_frame_size = transport::udp_simple::mtu; + default_params.recv_frame_size = transport::udp_simple::mtu; + default_params.num_send_frames = DEFAULT_NUM_FRAMES; + default_params.num_recv_frames = DEFAULT_NUM_FRAMES; + + //create the transport + udp_zero_copy::buff_params ignored_params; + //TODO determine port, program reply + zero_copy_if::sptr xport = udp_zero_copy::make(_device_ip_addr, BOOST_STRINGIZE(0), default_params, ignored_params, args.args); + + //calculate packet size + static const size_t hdr_size = 0 + + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) + + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer + - sizeof(vrt::if_packet_info_t().cid) //no class id ever used + - sizeof(vrt::if_packet_info_t().tsi) //no int time ever used + ; + const size_t bpp = xport->get_recv_frame_size() - hdr_size; + const size_t bpi = convert::get_bytes_per_item(args.otw_format); + const size_t spp = unsigned(args.args.cast("spp", bpp/bpi)); + + //make the new streamer given the samples per packet + boost::shared_ptr my_streamer = boost::make_shared(spp); + + //init some streamer stuff + my_streamer->resize(args.channels.size()); + my_streamer->set_vrt_unpacker(&vrt::if_hdr_unpack_be); + + //set the converter + uhd::convert::id_type id; + id.input_format = args.otw_format + "_item32_be"; + id.num_inputs = 1; + id.output_format = args.cpu_format; + id.num_outputs = 1; + my_streamer->set_converter(id); + + //bind callbacks for the handler + for (size_t chan_i = 0; chan_i < args.channels.size(); chan_i++) + { + const size_t dsp = args.channels[chan_i]; + _rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this + _rx_dsps[dsp]->setup(args); + //this->program_stream_dest(_mbc[mb].rx_dsp_xports[dsp], args); + my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( + &zero_copy_if::get_recv_buff, xport, _1 + ), true /*flush*/); + my_streamer->set_issue_stream_cmd(chan_i, boost::bind( + &rx_dsp_core_200::issue_stream_command, _rx_dsps[dsp], _1)); + _rx_streamers[dsp] = my_streamer; //store weak pointer + } + + //set the packet threshold to be an entire socket buffer's worth + const size_t packets_per_sock_buff = size_t(50e6/xport->get_recv_frame_size()); + my_streamer->set_alignment_failure_threshold(packets_per_sock_buff); + + //sets all tick and samp rates on this streamer + this->update_rates(); + + return my_streamer; +} + + +/*********************************************************************** + * Transmit streamer + **********************************************************************/ +uhd::tx_streamer::sptr umtrx_impl::get_tx_stream(const uhd::stream_args_t &args) +{ + +}