Compare commits

...

6 Commits

Author SHA1 Message Date
Harald Welte
ba59fcf912 Convert RTP/RTCP/OSMUX I/O from osmo_fd to osmo_io
Converting from osmo_fd to osmo_io allows us to switch to the new
io_uring backend and benefit from related performance benefits.

In a benchmark running 200 concurrent bi-directional voice calls with
GSM-EFR codec, I am observing:

* the code before this patch uses 40..42% of a single core on a
  Ryzen 5950X at 200 calls (=> 200 endpoints with each two connections)

* no increase in CPU utilization before/after this patch, i.e. the
  osmo_io overhead for the osmo_fd backend is insignificant compared
  to the direct osmo_fd mode before

* an almost exactly 50% reduction of CPU utilization when running the
  same osmo-mgw build with LIBOSMO_IO_BACKEND=IO_URING - top shows
  19..21% for the same workload instead of 40..42% with the OSMO_FD
  default backend.

* An increase of about 4 Megabytes in both RSS and VIRT size when
  enabling the OSMO_IO backend.  This is likely the memory-mapped rings.

No memory leakage is observed when using either of the backends.

Change-Id: I8471960d5d8088a70cf105f2f40dfa5d5458169a
2024-03-20 12:43:42 +01:00
Harald Welte
0188aad11c Change msgb ownership in processing of received msgb
The old approach was: rtp_data_net() reads a msgb from the incomging
socket, calls through whatever function chain and in the end free's it.
So none of the intermediate functions was permitted to take msgb
ownership.

This was a good choice as all processing would happen synchronously,
up to the point where that msgb was written on the output RTP socket.

Let's change this from passing msgb ownership throug the whole call
chain, through rx_rtp() to the various *_dispatch_rtp() functions.

This is required for upcoming migration to osmo_io, as in that case the
write (sendto) calls are asynchronous and hence msgb ownership needs
to be transferred.

Change-Id: I6a331f3c6b2eb51ea312ac6ef8c357185ddb79cf
2024-03-20 12:43:40 +01:00
Harald Welte
1ce53ac6a9 remove osmo_fd from mgcp_create_bind()
preparation for osmo_io

Change-Id: I4a3b66a14fdfbc867daca0f0a05f694d5e0d7b66
2024-03-20 12:36:21 +01:00
Harald Welte
179525e07b don't log useless "transcoding disabled" message
The entire mgw has no transcoding support.  So printing that message is
useless to begin with.  And printing it for *every RTP packet* is even
more useless.  Let's remove it.

Change-Id: If0ee2607404afc3a00665a5cf22a9e0eb62eb476
2024-03-19 18:31:48 +01:00
Harald Welte
f12ad38d46 simplify unused transcoding/processing call-back
the processing call-back is working with a raw buffer + length,
while we actually work with struct msgb.  Let's simply pass the msgb
into the call-back, and the call-back can then do what they want with
the contents of that msgb.

Change-Id: I002624f9008726e3d754d48aa2282c38e3b42953
2024-03-19 18:29:02 +01:00
Harald Welte
dd25ba52bd remove strange loop for non-existant transcoding support
The existing support preparing the mgw for transcoding (which doesn't exist)
has some kind of method where the transcoding function might be called
multiple times in a row.  However, as it is not used, it is not entirely
clear how it was intended to work.  Let's remove this unused looping
feature which makes it hard to understand how upcoming osmo_io should
deal with it.

Change-Id: Ie1a629fd31c5ab806fc929d1e6b279c4be5b8246
2024-03-19 18:24:23 +01:00
8 changed files with 352 additions and 264 deletions

View File

@@ -39,3 +39,5 @@ libosmo-mgcp-client deprecate public API New code should no longer use codecs[],
is backwards compat code that moves codecs[] entries, if any, over to
ptmap[], so callers may migrate at own leisure.
osmo-mgw remove cfg Remove VTY config item 'sdp audio fmtp-extra' (see OS#6313)
libosmocore bump_dep; workaround Bump libosmocore version dependency after I68328adb952ca8833ba047cb3b49ccc6f8a1f1b5
has been merged to libosmocore.git; then remove my_msgb_copy_c wrapper function.

View File

@@ -24,6 +24,7 @@
#include <osmocom/core/msgb.h>
#include <osmocom/core/socket.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/core/write_queue.h>
#include <osmocom/core/timer.h>
#include <osmocom/core/logging.h>
@@ -70,12 +71,10 @@ typedef int (*mgcp_rqnt)(struct mgcp_endpoint *endp, char tone);
/**
* Return:
* < 0 in case no audio was processed
* >= 0 in case audio was processed. The remaining payload
* length will be returned.
* >= 0 in case audio was processed.
*/
typedef int (*mgcp_processing)(struct mgcp_endpoint *endp,
struct mgcp_rtp_end *dst_end,
char *data, int *len, int buf_size);
struct mgcp_rtp_end *dst_end, struct msgb *msg);
struct mgcp_conn_rtp;
@@ -206,6 +205,5 @@ int mgcp_send_reset_ep(struct mgcp_endpoint *endp);
int mgcp_send_reset_all(struct mgcp_config *cfg);
int mgcp_create_bind(const char *source_addr, struct osmo_fd *fd, int port, uint8_t dscp,
uint8_t prio);
int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, int len);
int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t prio);
int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, const char *buf, int len);

View File

@@ -4,6 +4,7 @@
#include <stdbool.h>
#include <osmocom/core/socket.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/mgcp/mgcp.h>
@@ -120,8 +121,8 @@ struct mgcp_rtp_end {
bool rfc5993_hr_convert;
/* Each end has a separate socket for RTP and RTCP */
struct osmo_fd rtp;
struct osmo_fd rtcp;
struct osmo_io_fd *rtp;
struct osmo_io_fd *rtcp;
/* local UDP port number of the RTP socket; RTCP is +1 */
int local_port;
@@ -159,8 +160,7 @@ void mgcp_patch_and_count(const struct mgcp_endpoint *endp,
int mgcp_get_local_addr(char *addr, struct mgcp_conn_rtp *conn);
/* payload processing default functions */
int mgcp_rtp_processing_default(struct mgcp_endpoint *endp, struct mgcp_rtp_end *dst_end,
char *data, int *len, int buf_size);
int mgcp_rtp_processing_default(struct mgcp_endpoint *endp, struct mgcp_rtp_end *dst_end, struct msgb *msg);
int mgcp_setup_rtp_processing_default(struct mgcp_endpoint *endp,
struct mgcp_conn_rtp *conn_dst,
@@ -180,7 +180,7 @@ void rtpconn_rate_ctr_add(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint *
int id, int inc);
void rtpconn_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint *endp,
int id);
void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg);
void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, struct msgb *msg);
uint32_t mgcp_get_current_ts(unsigned codec_rate);
int amr_oa_bwe_convert(struct mgcp_endpoint *endp, struct msgb *msg, bool target_is_oa);

View File

@@ -106,8 +106,8 @@ static int mgcp_rtp_conn_init(struct mgcp_conn_rtp *conn_rtp, struct mgcp_conn *
/* backpointer to the generic part of the connection */
conn->u.rtp.conn = conn;
end->rtp.fd = -1;
end->rtcp.fd = -1;
end->rtp = NULL;
end->rtcp = NULL;
memset(&end->addr, 0, sizeof(end->addr));
end->rtcp_port = 0;

View File

@@ -301,7 +301,6 @@ static void sync_frame_out_cb(void *user_data, const ubit_t *bits, unsigned int
mgcp_send(endp, 1, NULL, msg, &conn_dst->u.rtp, &conn_dst->u.rtp);
msgb_free(msg);
return;
skip:
rate_ctr_inc(rate_ctr_group_get_ctr(rate_ctrs, E1_I460_TRAU_RX_FAIL_CTR));

View File

@@ -311,7 +311,6 @@ static int bridge_iuup_to_rtp_peer(struct mgcp_conn_rtp *conn_rtp_src, struct mg
};
rc = mgcp_send(conn_rtp_dst->conn->endp, true, NULL, msg, conn_rtp_src, conn_rtp_dst);
msgb_free(msg);
return rc;
}
@@ -513,10 +512,9 @@ static int mgcp_send_iuup(struct mgcp_endpoint *endp, struct msgb *msg,
osmo_sockaddr_port(&rtp_end->addr.u.sa), ntohs(rtp_end->rtcp_port));
/* Forward a copy of the RTP data to a debug ip/port */
forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out,
msg);
forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg);
len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr, (char *)hdr, buflen);
len = mgcp_udp_send(rtp_end->rtp, &rtp_end->addr, (char *)hdr, buflen);
if (len <= 0)
return len;
@@ -640,7 +638,7 @@ free_ret:
}
/* Build IuUP RNL Data primitive from msg containing an incoming RTP pkt from
* peer and send it down the IuUP layer towards the destination as IuUP/RTP: */
* peer and send it down the IuUP layer towards the destination as IuUP/RTP. Takes ownership of msg. */
int mgcp_conn_iuup_send_rtp(struct mgcp_conn_rtp *conn_src_rtp, struct mgcp_conn_rtp *conn_dest_rtp, struct msgb *msg)
{
struct osmo_iuup_rnl_prim *irp;

View File

@@ -70,6 +70,18 @@ void rtpconn_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint *
rtpconn_rate_ctr_add(conn_rtp, endp, id, 1);
}
/* wrapper around libosmocore msgb_copy_c, which [at least before libosmocore.git Change-Id
* I68328adb952ca8833ba047cb3b49ccc6f8a1f1b5] doesn't copy the cb */
static inline struct msgb *mgw_msgb_copy_c(void *ctx, struct msgb *msg, const char *name)
{
struct msgb *msg2 = msgb_copy_c(ctx, msg, name);
if (OSMO_UNLIKELY(!msg2))
return NULL;
memcpy(msg2->cb, msg->cb, sizeof(msg2->cb));
return msg2;
}
static int rx_rtp(struct msgb *msg);
bool mgcp_rtp_end_remote_addr_available(const struct mgcp_rtp_end *rtp_end)
@@ -404,15 +416,12 @@ static int align_rtp_timestamp_offset(const struct mgcp_endpoint *endp,
/*! dummy callback to disable transcoding (see also cfg->rtp_processing_cb).
* \param[in] associated endpoint.
* \param[in] destination RTP end.
* \param[in,out] pointer to buffer with voice data.
* \param[in] voice data length.
* \param[in] maximum size of caller provided voice data buffer.
* \param[in,out] msg message bufffer containing data. Function might change length.
* \returns ignores input parameters, return always 0. */
int mgcp_rtp_processing_default(struct mgcp_endpoint *endp,
struct mgcp_rtp_end *dst_end,
char *data, int *len, int buf_size)
struct msgb *msg)
{
LOGPENDP(endp, DRTP, LOGL_DEBUG, "transcoding disabled\n");
return 0;
}
@@ -785,16 +794,18 @@ static int amr_oa_check(char *data, int len)
/* Forward data to a debug tap. This is debug function that is intended for
* debugging the voice traffic with tools like gstreamer */
void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg)
void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, struct msgb *msg)
{
int rc;
if (!tap->enabled)
return;
rc = sendto(fd, msgb_data(msg), msgb_length(msg), 0, (struct sockaddr *)&tap->forward,
sizeof(tap->forward));
struct msgb *msg2 = msgb_copy(msg, "RTP TAP Tx");
if (!msg2)
return;
rc = osmo_iofd_sendto_msgb(iofd, msg2, 0, &tap->forward);
if (rc < 0)
LOGP(DRTP, LOGL_ERROR,
"Forwarding tapped (debug) voice data failed.\n");
@@ -966,7 +977,7 @@ static int check_rtp(struct mgcp_conn_rtp *conn_src, struct msgb *msg)
return 0;
}
/*! Dispatch msg bridged from the sister conn in the endpoint.
/*! Dispatch msg bridged from the sister conn in the endpoint. Takes ownership of msgb.
* \param[in] conn_dst The destination conn that should handle and transmit the content to
* its peer outside MGW.
* \param[in] msg msgb containing an RTP pkt received by the sister conn in the endpoint,
@@ -988,8 +999,10 @@ static int mgcp_conn_rtp_dispatch_rtp(struct mgcp_conn_rtp *conn_dst, struct msg
/* Before we try to deliver the packet, we check if the destination
* port and IP-Address make sense at all. If not, we will be unable
* to deliver the packet. */
if (check_rtp_destin(conn_dst) != 0)
if (check_rtp_destin(conn_dst) != 0) {
msgb_free(msg);
return -1;
}
/* Depending on the RTP connection type, deliver the RTP packet to the
* destination connection. */
@@ -1024,33 +1037,38 @@ static int mgcp_conn_rtp_dispatch_rtp(struct mgcp_conn_rtp *conn_dst, struct msg
* be discarded, this should not happen, normally the MGCP type
* should be properly set */
LOGPENDP(endp, DRTP, LOGL_ERROR, "bad MGCP type -- data discarded!\n");
msgb_free(msg);
return -1;
}
/*! send udp packet.
* \param[in] fd associated file descriptor.
/*! send message buffer via udp socket.
* \param[in] iofd associated file descriptor.
* \param[in] addr destination ip-address.
* \param[in] msg message buffer that holds the data to be send.
* \returns bytes sent, -1 on error. */
static int mgcp_udp_send_msg(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, struct msgb *msg)
{
LOGP(DRTP, LOGL_DEBUG, "sending %i bytes length packet to %s ...\n", msgb_length(msg),
osmo_sockaddr_to_str(addr));
return osmo_iofd_sendto_msgb(iofd, msg, 0, addr);
}
/*! send udp packet from raw buffer/length.
* \param[in] iofd associated file descriptor.
* \param[in] addr destination ip-address.
* \param[in] buf buffer that holds the data to be send.
* \param[in] len length of the data to be sent.
* \returns bytes sent, -1 on error. */
int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, int len)
int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, const char *buf, int len)
{
char ipbuf[INET6_ADDRSTRLEN];
size_t addr_len;
struct msgb *msg = msgb_alloc_c(iofd, len, "mgcp_udp_send");
if (!msg)
return -ENOMEM;
memcpy(msg->tail, buf, len);
msgb_put(msg, len);
LOGP(DRTP, LOGL_DEBUG,
"sending %i bytes length packet to %s:%u ...\n", len,
osmo_sockaddr_ntop(&addr->u.sa, ipbuf),
osmo_sockaddr_port(&addr->u.sa));
if (addr->u.sa.sa_family == AF_INET6) {
addr_len = sizeof(addr->u.sin6);
} else {
addr_len = sizeof(addr->u.sin);
}
return sendto(fd, buf, len, 0, &addr->u.sa, addr_len);
return mgcp_udp_send_msg(iofd, addr, msg);
}
/*! send RTP dummy packet (to keep NAT connection open).
@@ -1078,8 +1096,7 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp, struct mgcp_conn_rtp *conn)
if (mgcp_conn_rtp_is_iuup(conn))
rc = mgcp_conn_iuup_send_dummy(conn);
else
rc = mgcp_udp_send(conn->end.rtp.fd, &conn->end.addr,
rtp_dummy_payload, sizeof(rtp_dummy_payload));
rc = mgcp_udp_send(conn->end.rtp, &conn->end.addr, rtp_dummy_payload, sizeof(rtp_dummy_payload));
if (rc == -1)
goto failed;
@@ -1090,7 +1107,7 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp, struct mgcp_conn_rtp *conn)
was_rtcp = 1;
rtcp_addr = conn->end.addr;
osmo_sockaddr_set_port(&rtcp_addr.u.sa, ntohs(conn->end.rtcp_port));
rc = mgcp_udp_send(conn->end.rtcp.fd, &rtcp_addr,
rc = mgcp_udp_send(conn->end.rtcp, &rtcp_addr,
rtp_dummy_payload, sizeof(rtp_dummy_payload));
if (rc >= 0)
@@ -1104,7 +1121,7 @@ failed:
return -1;
}
/*! Send RTP/RTCP data to a specified destination connection.
/*! Send RTP/RTCP data to a specified destination connection. Takes ownership of msg.
* \param[in] endp associated endpoint (for configuration, logging).
* \param[in] is_rtp flag to specify if the packet is of type RTP or RTCP.
* \param[in] addr spoofed source address (set to NULL to disable).
@@ -1143,6 +1160,7 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr
if (is_rtp && !mgcp_conn_rtp_is_iuup(conn_src)) {
if (mgcp_patch_pt(conn_dst, msg) < 0) {
LOGPENDP(endp, DRTP, LOGL_NOTICE, "unable to patch payload type RTP packet, discarding...\n");
msgb_free(msg);
return -EINVAL;
}
}
@@ -1168,70 +1186,66 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr
osmo_sockaddr_port(&rtp_end->addr.u.sa), ntohs(rtp_end->rtcp_port)
);
} else if (is_rtp) {
int cont;
int nbytes = 0;
int buflen = msgb_length(msg);
/* Make sure we have a valid RTP header, in cases where no RTP
* header is present, we will generate one. */
gen_rtp_header(msg, rtp_end, rtp_state);
do {
/* Run transcoder */
cont = endp->trunk->cfg->rtp_processing_cb(endp, rtp_end, (char *)msgb_data(msg), &buflen, RTP_BUF_SIZE);
if (cont < 0)
break;
/* Run transcoder */
rc = endp->trunk->cfg->rtp_processing_cb(endp, rtp_end, msg);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR, "Error %d during transcoding\n", rc);
msgb_free(msg);
return rc;
}
if (addr)
mgcp_patch_and_count(endp, rtp_state, rtp_end,
addr, msg);
if (addr)
mgcp_patch_and_count(endp, rtp_state, rtp_end, addr, msg);
if (mgcp_conn_rtp_is_iuup(conn_dst) || mgcp_conn_rtp_is_iuup(conn_src)) {
/* the iuup code will correctly transform to the correct AMR mode */
} else if (mgcp_codec_amr_align_mode_is_indicated(conn_dst->end.codec)) {
rc = amr_oa_bwe_convert(endp, msg,
conn_dst->end.codec->param.amr_octet_aligned);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"Error in AMR octet-aligned <-> bandwidth-efficient mode conversion (target=%s)\n",
conn_dst->end.codec->param.amr_octet_aligned ? "octet-aligned" : "bandwidth-efficient");
break;
}
} else if (rtp_end->rfc5993_hr_convert &&
strcmp(conn_src->end.codec->subtype_name, "GSM-HR-08") == 0) {
rc = rfc5993_hr_convert(endp, msg);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR, "Error while converting to GSM-HR-08\n");
break;
}
if (mgcp_conn_rtp_is_iuup(conn_dst) || mgcp_conn_rtp_is_iuup(conn_src)) {
/* the iuup code will correctly transform to the correct AMR mode */
} else if (mgcp_codec_amr_align_mode_is_indicated(conn_dst->end.codec)) {
rc = amr_oa_bwe_convert(endp, msg, conn_dst->end.codec->param.amr_octet_aligned);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"Error in AMR octet-aligned <-> bandwidth-efficient mode conversion (target=%s)\n",
conn_dst->end.codec->param.amr_octet_aligned ? "octet-aligned" : "bandwidth-efficient");
msgb_free(msg);
return rc;
}
} else if (rtp_end->rfc5993_hr_convert &&
strcmp(conn_src->end.codec->subtype_name, "GSM-HR-08") == 0) {
rc = rfc5993_hr_convert(endp, msg);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR, "Error while converting to GSM-HR-08\n");
msgb_free(msg);
return rc;
}
}
LOGPENDP(endp, DRTP, LOGL_DEBUG,
"process/send to %s %s "
"rtp_port:%u rtcp_port:%u\n",
dest_name,
osmo_sockaddr_ntop(&rtp_end->addr.u.sa, ipbuf),
osmo_sockaddr_port(&rtp_end->addr.u.sa), ntohs(rtp_end->rtcp_port)
);
LOGPENDP(endp, DRTP, LOGL_DEBUG,
"process/send to %s %s "
"rtp_port:%u rtcp_port:%u\n",
dest_name,
osmo_sockaddr_ntop(&rtp_end->addr.u.sa, ipbuf),
osmo_sockaddr_port(&rtp_end->addr.u.sa), ntohs(rtp_end->rtcp_port)
);
/* Forward a copy of the RTP data to a debug ip/port */
forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out,
msg);
/* Forward a copy of the RTP data to a debug ip/port */
forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg);
len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr,
(char *)msgb_data(msg), msgb_length(msg));
len = msgb_length(msg);
if (len <= 0)
return len;
rc = mgcp_udp_send_msg(rtp_end->rtp, &rtp_end->addr, msg);
if (rc < 0) {
msgb_free(msg);
return rc;
}
rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
rtp_state->alt_rtp_tx_sequence++;
rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
rtp_state->alt_rtp_tx_sequence++;
nbytes += len;
buflen = cont;
} while (buflen > 0);
return nbytes;
return 0;
} else if (!trunk->omit_rtcp) {
struct osmo_sockaddr rtcp_addr = rtp_end->addr;
osmo_sockaddr_set_port(&rtcp_addr.u.sa, rtp_end->rtcp_port);
@@ -1242,19 +1256,54 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr
osmo_sockaddr_port(&rtcp_addr.u.sa)
);
len = mgcp_udp_send(rtp_end->rtcp.fd, &rtcp_addr,
(char *)msgb_data(msg), msgb_length(msg));
len = msgb_length(msg);
rc = mgcp_udp_send_msg(rtp_end->rtcp, &rtcp_addr, msg);
if (rc < 0) {
msgb_free(msg);
return rc;
}
rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
rtp_state->alt_rtp_tx_sequence++;
return len;
return 0;
}
msgb_free(msg);
return 0;
}
/*! determine if there's only a single recipient in endp for data received via conn_src.
* The function returns NULL in case there is no recipient, or in case there are multiple recipients.
* \param endp The MGCP endpoint whose connections to analyze
* \param conn_src The source MGCP connection [which shall not count in results]
* \returns recipient donnection if there is only one; NULL in case there are multiple */
static struct mgcp_conn *rtpbridge_get_only_recipient(struct mgcp_endpoint *endp, struct mgcp_conn *conn_src)
{
struct mgcp_conn *conn_ret = NULL;
struct mgcp_conn *conn_dst;
llist_for_each_entry(conn_dst, &endp->conns, entry) {
if (conn_dst == conn_src)
continue;
switch (conn_dst->mode) {
case MGCP_CONN_SEND_ONLY:
case MGCP_CONN_RECV_SEND:
case MGCP_CONN_CONFECHO:
if (conn_ret)
return NULL;
conn_ret = conn_dst;
break;
default:
break;
}
}
return conn_ret;
}
/*! Dispatch incoming RTP packet to opposite RTP connection.
* \param[in] msg Message buffer to bridge, coming from source connection.
* msg shall contain "struct osmo_rtp_msg_ctx *" attached in
@@ -1312,23 +1361,44 @@ int mgcp_dispatch_rtp_bridge_cb(struct msgb *msg)
return rc;
}
/* If the mode is "confecho", send RTP back to the sender. */
if (conn->mode == MGCP_CONN_CONFECHO)
rc = mgcp_conn_rtp_dispatch_rtp(conn_src, msg);
/* All the use cases above are 1:1 where we have one source msgb and we're sending that to one
* destination. msgb ownership had been passed to the respective _*dospatch_rtp() function.
* In the cases below, we actually [can] have multiple recipients, so we copy the original msgb
* for each of the recipients. */
/* Dispatch RTP packet to all other connection(s) that send audio. */
llist_for_each_entry(conn_dst, &endp->conns, entry) {
if (conn_dst == conn)
continue;
switch (conn_dst->mode) {
case MGCP_CONN_SEND_ONLY:
case MGCP_CONN_RECV_SEND:
case MGCP_CONN_CONFECHO:
rc = mgcp_conn_rtp_dispatch_rtp(&conn_dst->u.rtp, msg);
break;
default:
break;
/* If the mode is "confecho", send RTP back to the sender. */
if (conn->mode == MGCP_CONN_CONFECHO) {
struct msgb *msg2 = mgw_msgb_copy_c(conn, msg, "RTP confecho");
if (OSMO_LIKELY(msg2))
rc = mgcp_conn_rtp_dispatch_rtp(conn_src, msg2);
}
conn_dst = rtpbridge_get_only_recipient(endp, conn);
if (OSMO_LIKELY(conn_dst)) {
/* we only have a single recipient and cann hence send the original msgb without copying */
rc = mgcp_conn_rtp_dispatch_rtp(&conn_dst->u.rtp, msg);
} else {
/* Dispatch RTP packet to all other connection(s) that send audio. */
llist_for_each_entry(conn_dst, &endp->conns, entry) {
struct msgb *msg2;
if (conn_dst == conn)
continue;
switch (conn_dst->mode) {
case MGCP_CONN_SEND_ONLY:
case MGCP_CONN_RECV_SEND:
case MGCP_CONN_CONFECHO:
/* we have multiple recipients and must make copies for each recipient */
msg2 = mgw_msgb_copy_c(conn_dst, msg, "RTP Tx copy");
if (OSMO_LIKELY(msg2))
rc = mgcp_conn_rtp_dispatch_rtp(&conn_dst->u.rtp, msg2);
break;
default:
break;
}
}
/* as we only sent copies in the previous llist_for_each_entry() loop, we must free the
* original one */
msgb_free(msg);
}
return rc;
}
@@ -1400,7 +1470,7 @@ void mgcp_cleanup_e1_bridge_cb(struct mgcp_endpoint *endp, struct mgcp_conn *con
}
/* Handle incoming RTP data from NET */
static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
static void rtp_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *saddr)
{
/* NOTE: This is a generic implementation. RTP data is received. In
* case of loopback the data is just sent back to its origin. All
@@ -1411,49 +1481,34 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
struct mgcp_conn_rtp *conn_src;
struct mgcp_endpoint *endp;
struct osmo_sockaddr addr;
socklen_t slen = sizeof(addr);
char ipbuf[INET6_ADDRSTRLEN];
int ret;
enum rtp_proto proto;
struct osmo_rtp_msg_ctx *mc;
struct msgb *msg;
int rc;
conn_src = (struct mgcp_conn_rtp *)fd->data;
conn_src = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd);
OSMO_ASSERT(conn_src);
endp = conn_src->conn->endp;
OSMO_ASSERT(endp);
msg = msgb_alloc_c(endp->trunk, RTP_BUF_SIZE, "RTP-rx");
proto = (fd == &conn_src->end.rtp)? MGCP_PROTO_RTP : MGCP_PROTO_RTCP;
proto = (iofd == conn_src->end.rtp)? MGCP_PROTO_RTP : MGCP_PROTO_RTCP;
ret = recvfrom(fd->fd, msgb_data(msg), msg->data_len, 0, (struct sockaddr *)&addr.u.sa, &slen);
if (ret <= 0) {
LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", strerror(errno));
rc = -1;
goto out;
if (res <= 0) {
LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", strerror(-res));
goto out_free;
}
msgb_put(msg, ret);
LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s:%u\n",
LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s\n",
proto == MGCP_PROTO_RTP ? "RTP" : "RTCP",
msgb_length(msg), osmo_sockaddr_ntop(&addr.u.sa, ipbuf),
osmo_sockaddr_port(&addr.u.sa));
msgb_length(msg), osmo_sockaddr_to_str(saddr));
if ((proto == MGCP_PROTO_RTP && check_rtp(conn_src, msg))
|| (proto == MGCP_PROTO_RTCP && check_rtcp(conn_src, msg))) {
/* Logging happened in the two check_ functions */
rc = -1;
goto out;
goto out_free;
}
if (mgcp_is_rtp_dummy_payload(msg)) {
LOG_CONN_RTP(conn_src, LOGL_DEBUG, "rx dummy packet (dropped)\n");
rc = 0;
goto out;
goto out_free;
}
/* Since the msgb remains owned and freed by this function, the msg ctx data struct can just be on the stack and
@@ -1462,7 +1517,7 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
*mc = (struct osmo_rtp_msg_ctx){
.proto = proto,
.conn_src = conn_src,
.from_addr = &addr,
.from_addr = (struct osmo_sockaddr *) saddr,
};
LOG_CONN_RTP(conn_src, LOGL_DEBUG, "msg ctx: %d %p %s\n",
mc->proto, mc->conn_src,
@@ -1477,16 +1532,17 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
/* FIXME: count RTP and RTCP separately, also count IuUP payload-less separately */
/* Forward a copy of the RTP data to a debug ip/port */
forward_data_tap(fd->fd, &conn_src->tap_in, msg);
forward_data_tap(iofd, &conn_src->tap_in, msg);
rc = rx_rtp(msg);
rx_rtp(msg);
return;
out:
out_free:
msgb_free(msg);
return rc;
}
/* Note: This function is able to handle RTP and RTCP */
/* Note: This function is able to handle RTP and RTCP. msgb ownership is transferred, so this function or its
* downstream consumers must make sure to [eventually] free the msgb. */
static int rx_rtp(struct msgb *msg)
{
struct osmo_rtp_msg_ctx *mc = OSMO_RTP_MSG_CTX(msg);
@@ -1499,7 +1555,7 @@ static int rx_rtp(struct msgb *msg)
/* Check if the origin of the RTP packet seems plausible */
if (!trunk->rtp_accept_all && check_rtp_origin(conn_src, from_addr))
return -1;
goto out_free;
/* Handle AMR frame format conversion (octet-aligned vs. bandwith-efficient) */
if (mc->proto == MGCP_PROTO_RTP
@@ -1509,13 +1565,13 @@ static int rx_rtp(struct msgb *msg)
* communicated via SDP when the connection was created/modfied. */
int oa = amr_oa_check((char*)msgb_data(msg), msgb_length(msg));
if (oa < 0)
return -1;
goto out_free;
if (((bool)oa) != conn_src->end.codec->param.amr_octet_aligned) {
LOG_CONN_RTP(conn_src, LOGL_NOTICE,
"rx_rtp(%u bytes): Expected RTP AMR octet-aligned=%u but got octet-aligned=%u."
" check the config of your call-agent!\n",
msgb_length(msg), conn_src->end.codec->param.amr_octet_aligned, oa);
return -1;
goto out_free;
}
}
@@ -1524,17 +1580,36 @@ static int rx_rtp(struct msgb *msg)
/* Execute endpoint specific implementation that handles the
* dispatching of the RTP data */
return conn->endp->type->dispatch_rtp_cb(msg);
out_free:
msgb_free(msg);
return -1;
}
static void rtp_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *daddr)
{
/* nothing; osmo_io takes care of msgb_free */
if (res < 0) {
struct mgcp_conn_rtp *conn_rtp = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd);
int priv_nr = osmo_iofd_get_priv_nr(iofd);
char errbuf[129];
strerror_r(-res, errbuf, sizeof(errbuf));
LOG_CONN_RTP(conn_rtp, LOGL_NOTICE, "%s sendto(%s) failed: %s\n", priv_nr ? "RTCP" : "RTP",
osmo_sockaddr_to_str(daddr), errbuf);
}
}
static const struct osmo_io_ops rtp_ioops = {
.recvfrom_cb = rtp_recvfrom_cb,
.sendto_cb = rtp_sendto_cb,
};
/*! bind RTP port to osmo_fd.
* \param[in] source_addr source (local) address to bind on.
* \param[in] fd associated file descriptor.
* \param[in] port to bind on.
* \param[in] dscp IP DSCP value to use.
* \param[in] prio socket priority to use.
* \returns 0 on success, -1 on ERROR. */
int mgcp_create_bind(const char *source_addr, struct osmo_fd *fd, int port, uint8_t dscp,
uint8_t prio)
* \returns file descriptor on success, -1 on ERROR. */
int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t prio)
{
int rc;
@@ -1546,43 +1621,46 @@ int mgcp_create_bind(const char *source_addr, struct osmo_fd *fd, int port, uint
source_addr, port);
return -1;
}
fd->fd = rc;
LOGP(DRTP, LOGL_DEBUG, "created socket + bound UDP port (%s:%i).\n", source_addr, port);
return 0;
return rc;
}
/* Bind RTP and RTCP port (helper function for mgcp_bind_net_rtp_port()) */
static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
struct mgcp_rtp_end *rtp_end, struct mgcp_endpoint *endp)
{
int rc, rtp_fd, rtcp_fd;
/* NOTE: The port that is used for RTCP is the RTP port incremented by one
* (e.g. RTP-Port = 16000 ==> RTCP-Port = 16001) */
if (mgcp_create_bind(source_addr, &rtp_end->rtp, rtp_end->local_port,
cfg->endp_dscp, cfg->endp_priority) != 0) {
rc = mgcp_create_bind(source_addr, rtp_end->local_port, cfg->endp_dscp, cfg->endp_priority);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"failed to create RTP port: %s:%d\n",
source_addr, rtp_end->local_port);
goto cleanup0;
}
rtp_fd = rc;
if (mgcp_create_bind(source_addr, &rtp_end->rtcp, rtp_end->local_port + 1,
cfg->endp_dscp, cfg->endp_priority) != 0) {
rc = mgcp_create_bind(source_addr, rtp_end->local_port + 1, cfg->endp_dscp, cfg->endp_priority);
if (rc < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"failed to create RTCP port: %s:%d\n",
source_addr, rtp_end->local_port + 1);
goto cleanup1;
}
rtcp_fd = rc;
if (osmo_fd_register(&rtp_end->rtp) != 0) {
if (osmo_iofd_register(rtp_end->rtp, rtp_fd) < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"failed to register RTP port %d\n",
rtp_end->local_port);
goto cleanup2;
}
if (osmo_fd_register(&rtp_end->rtcp) != 0) {
if (osmo_iofd_register(rtp_end->rtcp, rtcp_fd) != 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"failed to register RTCP port %d\n",
rtp_end->local_port + 1);
@@ -1592,13 +1670,11 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
return 0;
cleanup3:
osmo_fd_unregister(&rtp_end->rtp);
osmo_iofd_unregister(rtp_end->rtp);
cleanup2:
close(rtp_end->rtcp.fd);
rtp_end->rtcp.fd = -1;
close(rtcp_fd);
cleanup1:
close(rtp_end->rtp.fd);
rtp_end->rtp.fd = -1;
close(rtp_fd);
cleanup0:
return -1;
}
@@ -1617,7 +1693,8 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port,
snprintf(name, sizeof(name), "%s-%s", conn->conn->name, conn->conn->id);
end = &conn->end;
if (end->rtp.fd != -1 || end->rtcp.fd != -1) {
if ((end->rtp && osmo_iofd_get_fd(end->rtp) != -1) ||
(end->rtcp && osmo_iofd_get_fd(end->rtcp) != -1)) {
LOGPENDP(endp, DRTP, LOGL_ERROR, "%u was already bound on conn:%s\n",
rtp_port, mgcp_conn_dump(conn->conn));
@@ -1630,8 +1707,18 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port,
}
end->local_port = rtp_port;
osmo_fd_setup(&end->rtp, -1, OSMO_FD_READ, rtp_data_net, conn, 0);
osmo_fd_setup(&end->rtcp, -1, OSMO_FD_READ, rtp_data_net, conn, 0);
end->rtp = osmo_iofd_setup(conn->conn, -1, name, OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn);
if (!end->rtp)
return -EIO;
osmo_iofd_set_alloc_info(end->rtp, RTP_BUF_SIZE, 0);
end->rtcp = osmo_iofd_setup(conn->conn, -1, name, OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn);
if (!end->rtcp) {
osmo_iofd_free(end->rtp);
end->rtp = NULL;
return -EIO;
}
osmo_iofd_set_alloc_info(end->rtcp, RTP_BUF_SIZE, 0);
osmo_iofd_set_priv_nr(end->rtcp, 1); /* we use priv_nr as identifier for RTCP */
return bind_rtp(endp->trunk->cfg, conn->end.local_addr, end, endp);
}
@@ -1640,15 +1727,13 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port,
* \param[in] end RTP end */
void mgcp_free_rtp_port(struct mgcp_rtp_end *end)
{
if (end->rtp.fd != -1) {
osmo_fd_unregister(&end->rtp);
close(end->rtp.fd);
end->rtp.fd = -1;
if (end->rtp) {
osmo_iofd_free(end->rtp);
end->rtp = NULL;
}
if (end->rtcp.fd != -1) {
osmo_fd_unregister(&end->rtcp);
close(end->rtcp.fd);
end->rtcp.fd = -1;
if (end->rtcp) {
osmo_iofd_free(end->rtcp);
end->rtcp = NULL;
}
}

View File

@@ -13,9 +13,11 @@
#include <string.h> /* for memcpy */
#include <stdlib.h> /* for abs */
#include <inttypes.h> /* for PRIu64 */
#include <unistd.h> /* for PRIu64 */
#include <netinet/in.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/socket.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/core/talloc.h>
#include <osmocom/netif/osmux.h>
@@ -30,8 +32,8 @@
#include <osmocom/mgcp/mgcp_endp.h>
#include <osmocom/mgcp/mgcp_trunk.h>
static struct osmo_fd osmux_fd_v4;
static struct osmo_fd osmux_fd_v6;
static struct osmo_io_fd *osmux_fd_v4;
static struct osmo_io_fd *osmux_fd_v6;
static LLIST_HEAD(osmux_handle_list);
@@ -76,34 +78,31 @@ static void rtpconn_osmux_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, int id)
static void osmux_deliver_cb(struct msgb *batch_msg, void *data)
{
struct osmux_handle *handle = data;
socklen_t dest_len;
int rc, fd;
struct mgcp_trunk *trunk = (struct mgcp_trunk *)osmux_fd_v4.data;
int rc;
struct osmo_io_fd *iofd;
struct mgcp_trunk *trunk = (struct mgcp_trunk *) osmo_iofd_get_data(osmux_fd_v4);
struct rate_ctr_group *all_osmux_stats = trunk->ratectr.all_osmux_conn_stats;
switch (handle->rem_addr.u.sa.sa_family) {
case AF_INET6:
dest_len = sizeof(handle->rem_addr.u.sin6);
fd = osmux_fd_v6.fd;
iofd = osmux_fd_v6;
break;
case AF_INET:
default:
dest_len = sizeof(handle->rem_addr.u.sin);
fd = osmux_fd_v4.fd;
iofd = osmux_fd_v4;
break;
}
rc = sendto(fd, batch_msg->data, batch_msg->len, 0,
(struct sockaddr *)&handle->rem_addr.u.sa, dest_len);
rc = osmo_iofd_sendto_msgb(iofd, batch_msg, 0, &handle->rem_addr);
if (rc < 0) {
char errbuf[129];
strerror_r(errno, errbuf, sizeof(errbuf));
strerror_r(-rc, errbuf, sizeof(errbuf));
LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n",
osmo_sockaddr_to_str(&handle->rem_addr), errbuf);
rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_DROPPED_PACKETS_CTR));
msgb_free(batch_msg);
} else {
rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_PACKETS_TX_CTR));
}
msgb_free(batch_msg);
}
/* Lookup existing OSMUX handle for specified destination address. */
@@ -204,17 +203,17 @@ osmux_handle_find_or_create(const struct mgcp_trunk *trunk, const struct osmo_so
return h->in;
}
/*! send RTP packet through OSMUX connection.
/*! send RTP packet through OSMUX connection. Takes ownership of msg.
* \param[in] conn associated RTP connection
* \param[in] msg msgb containing an RTP AMR packet
* \returns 0 on success, -1 on ERROR */
int conn_osmux_send_rtp(struct mgcp_conn_rtp *conn, struct msgb *msg)
{
int ret;
struct msgb *msg2;
if (!conn->end.output_enabled) {
rtpconn_osmux_rate_ctr_inc(conn, OSMUX_RTP_PACKETS_TX_DROPPED_CTR);
msgb_free(msg);
return -1;
}
@@ -222,22 +221,19 @@ int conn_osmux_send_rtp(struct mgcp_conn_rtp *conn, struct msgb *msg)
LOGPCONN(conn->conn, DOSMUX, LOGL_INFO, "forwarding RTP to Osmux conn not yet enabled, dropping (cid=%d)\n",
conn->osmux.remote_cid);
rtpconn_osmux_rate_ctr_inc(conn, OSMUX_RTP_PACKETS_TX_DROPPED_CTR);
msgb_free(msg);
return -1;
}
/* msg is not owned by us and will be freed by the caller stack upon return: */
msg2 = msgb_copy_c(conn->conn, msg, "osmux-rtp-send");
if (!msg2)
return -1;
/* Osmux implementation works with AMR OA only, make sure we convert to it if needed: */
if (amr_oa_bwe_convert(conn->conn->endp, msg2, true) < 0) {
if (amr_oa_bwe_convert(conn->conn->endp, msg, true) < 0) {
LOGPCONN(conn->conn, DOSMUX, LOGL_ERROR,
"Error converting to AMR octet-aligned mode\n");
msgb_free(msg);
return -1;
}
while ((ret = osmux_xfrm_input(conn->osmux.in, msg2, conn->osmux.remote_cid)) > 0) {
while ((ret = osmux_xfrm_input(conn->osmux.in, msg, conn->osmux.remote_cid)) > 0) {
/* batch full, build and deliver it */
osmux_xfrm_input_deliver(conn->osmux.in);
}
@@ -245,7 +241,7 @@ int conn_osmux_send_rtp(struct mgcp_conn_rtp *conn, struct msgb *msg)
rtpconn_osmux_rate_ctr_inc(conn, OSMUX_RTP_PACKETS_TX_DROPPED_CTR);
} else {
rtpconn_osmux_rate_ctr_inc(conn, OSMUX_RTP_PACKETS_TX_CTR);
rtpconn_osmux_rate_ctr_add(conn, OSMUX_AMR_OCTETS_TX_CTR, msgb_length(msg2) - sizeof(struct rtp_hdr));
rtpconn_osmux_rate_ctr_add(conn, OSMUX_AMR_OCTETS_TX_CTR, msgb_length(msg) - sizeof(struct rtp_hdr));
}
return 0;
}
@@ -325,29 +321,7 @@ static void scheduled_from_osmux_tx_rtp_cb(struct msgb *msg, void *data)
};
endp->type->dispatch_rtp_cb(msg);
msgb_free(msg);
}
static struct msgb *osmux_recv(struct osmo_fd *ofd, struct osmo_sockaddr *addr)
{
struct msgb *msg;
socklen_t slen = sizeof(addr->u.sas);
int ret;
msg = msgb_alloc(4096, "OSMUX");
if (!msg) {
LOGP(DOSMUX, LOGL_ERROR, "cannot allocate message\n");
return NULL;
}
ret = recvfrom(ofd->fd, msg->data, msg->data_len, 0, &addr->u.sa, &slen);
if (ret <= 0) {
msgb_free(msg);
LOGP(DOSMUX, LOGL_ERROR, "cannot receive message\n");
return NULL;
}
msgb_put(msg, ret);
return msg;
/* dispatch_rtp_cb() has taken ownership of the msgb */
}
/* To be called every time some AMR data is received on a connection
@@ -445,22 +419,16 @@ out:
}
#define osmux_chunk_length(msg, rem) ((rem) - (msg)->len)
static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
static void osmux_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *rem_addr)
{
struct msgb *msg;
struct osmux_hdr *osmuxh;
struct osmo_sockaddr rem_addr;
uint32_t rem;
struct mgcp_trunk *trunk = ofd->data;
struct mgcp_trunk *trunk = osmo_iofd_get_data(iofd);
struct rate_ctr_group *all_rtp_stats = trunk->ratectr.all_osmux_conn_stats;
uint32_t rem;
char addr_str[64];
msg = osmux_recv(ofd, &rem_addr);
if (!msg)
return -1;
rate_ctr_inc(rate_ctr_group_get_ctr(all_rtp_stats, OSMUX_PACKETS_RX_CTR));
osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), &rem_addr);
osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), rem_addr);
if (trunk->cfg->osmux.usage == OSMUX_USAGE_OFF) {
LOGP(DOSMUX, LOGL_ERROR,
@@ -470,14 +438,16 @@ static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
}
/* Catch legacy dummy message and process them separately: */
if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD)
return osmux_handle_legacy_dummy(trunk, &rem_addr, msg);
if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD) {
osmux_handle_legacy_dummy(trunk, rem_addr, msg);
return;
}
rem = msg->len;
while((osmuxh = osmux_xfrm_output_pull(msg)) != NULL) {
struct mgcp_conn_rtp *conn_src;
conn_src = osmux_conn_lookup(trunk, osmuxh->circuit_id,
&rem_addr);
rem_addr);
if (!conn_src) {
LOGP(DOSMUX, LOGL_DEBUG,
"Cannot find a src conn for %s CID=%d\n",
@@ -485,7 +455,7 @@ static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
goto next;
}
if (conn_osmux_event_data_received(conn_src, &rem_addr) < 0)
if (conn_osmux_event_data_received(conn_src, rem_addr) < 0)
goto next;
mgcp_conn_watchdog_kick(conn_src->conn);
@@ -499,58 +469,94 @@ next:
}
out:
msgb_free(msg);
return 0;
}
static void osmux_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *rem_addr)
{
/* nothing; osmo_io takes care of msgb_free */
if (res < 0) {
struct mgcp_trunk *trunk = (struct mgcp_trunk *) osmo_iofd_get_data(iofd);
struct rate_ctr_group *all_osmux_stats = trunk->ratectr.all_osmux_conn_stats;
char errbuf[129];
strerror_r(-res, errbuf, sizeof(errbuf));
LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n", osmo_sockaddr_to_str(rem_addr), errbuf);
rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_DROPPED_PACKETS_CTR));
}
}
static const struct osmo_io_ops osmux_ioops = {
.recvfrom_cb = osmux_recvfrom_cb,
.sendto_cb = osmux_sendto_cb,
};
int osmux_init(struct mgcp_trunk *trunk)
{
int ret;
int ret, fd;
struct mgcp_config *cfg = trunk->cfg;
/* So far we only support running on one trunk: */
OSMO_ASSERT(trunk == mgcp_trunk_by_num(cfg, MGCP_TRUNK_VIRTUAL, MGCP_VIRT_TRUNK_ID));
osmo_fd_setup(&osmux_fd_v4, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 0);
osmo_fd_setup(&osmux_fd_v6, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 0);
osmux_fd_v4 = osmo_iofd_setup(trunk, -1, "osmux_fd_v4", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk);
if (!osmux_fd_v4)
goto out;
osmo_iofd_set_alloc_info(osmux_fd_v4, 4096, 0);
if (cfg->osmux.local_addr_v4) {
ret = mgcp_create_bind(cfg->osmux.local_addr_v4, &osmux_fd_v4, cfg->osmux.local_port,
ret = mgcp_create_bind(cfg->osmux.local_addr_v4, cfg->osmux.local_port,
cfg->endp_dscp, cfg->endp_priority);
if (ret < 0) {
LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv4 socket to %s:%u\n",
cfg->osmux.local_addr_v4, cfg->osmux.local_port);
return ret;
goto out_free_v4;
}
fd = ret;
ret = osmo_fd_register(&osmux_fd_v4);
ret = osmo_iofd_register(osmux_fd_v4, fd);
if (ret < 0) {
LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 socket %s\n",
osmo_sock_get_name2(osmux_fd_v4.fd));
return ret;
LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 socket %s\n", osmo_sock_get_name2(fd));
close(fd);
goto out_free_v4;
}
LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n",
osmo_sock_get_name2(osmux_fd_v4.fd));
LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n", osmo_sock_get_name2(fd));
}
osmux_fd_v6 = osmo_iofd_setup(trunk, -1, "osmux_fd_v6", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk);
if (!osmux_fd_v6)
goto out_free_v4;
osmo_iofd_set_alloc_info(osmux_fd_v6, 4096, 0);
if (cfg->osmux.local_addr_v6) {
ret = mgcp_create_bind(cfg->osmux.local_addr_v6, &osmux_fd_v6, cfg->osmux.local_port,
ret = mgcp_create_bind(cfg->osmux.local_addr_v6, cfg->osmux.local_port,
cfg->endp_dscp, cfg->endp_priority);
if (ret < 0) {
LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv6 socket to [%s]:%u\n",
cfg->osmux.local_addr_v6, cfg->osmux.local_port);
return ret;
goto out_free_v6;
}
fd = ret;
ret = osmo_fd_register(&osmux_fd_v6);
ret = osmo_iofd_register(osmux_fd_v6, fd);
if (ret < 0) {
LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 socket %s\n",
osmo_sock_get_name2(osmux_fd_v6.fd));
return ret;
LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 socket %s\n", osmo_sock_get_name2(fd));
close(fd);
goto out_free_v6;
}
LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n",
osmo_sock_get_name2(osmux_fd_v6.fd));
LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n", osmo_sock_get_name2(fd));
}
cfg->osmux.initialized = true;
return 0;
out_free_v6:
/* osmo_iofd_free performs unregister + close */
osmo_iofd_free(osmux_fd_v6);
osmux_fd_v6 = NULL;
out_free_v4:
/* osmo_iofd_free performs unregister + close */
osmo_iofd_free(osmux_fd_v4);
osmux_fd_v4 = NULL;
out:
return -1;
}
/*! relase OSXMUX cid, that had been allocated to this connection.
@@ -716,7 +722,7 @@ int osmux_send_dummy(struct mgcp_conn_rtp *conn)
osmo_sockaddr_ntop(&conn->end.addr.u.sa, ipbuf),
osmo_sockaddr_port(&conn->end.addr.u.sa), conn->osmux.remote_cid);
return mgcp_udp_send(osmux_fd_v4.fd, &conn->end.addr, (char *)osmuxh, buf_len);
return mgcp_udp_send(osmux_fd_v4, &conn->end.addr, (char *)osmuxh, buf_len);
}
/* Keeps track of locally allocated Osmux circuit ID. +7 to round up to 8 bit boundary. */