mirror of
				https://github.com/open5gs/open5gs.git
				synced 2025-11-04 05:53:18 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			301 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			301 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/* Licensed to the Apache Software Foundation (ASF) under one or more
 | 
						|
 * contributor license agreements.  See the NOTICE file distributed with
 | 
						|
 * this work for additional information regarding copyright ownership.
 | 
						|
 * The ASF licenses this file to You under the Apache License, Version 2.0
 | 
						|
 * (the "License"); you may not use this file except in compliance with
 | 
						|
 * the License.  You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 */
 | 
						|
 | 
						|
/*
 | 
						|
 * Copyright (C) 2019-2020 by Sukchan Lee <acetcom@gmail.com>
 | 
						|
 *
 | 
						|
 * This file is part of Open5GS.
 | 
						|
 *
 | 
						|
 * This program is free software: you can redistribute it and/or modify
 | 
						|
 * it under the terms of the GNU Affero General Public License as published by
 | 
						|
 * the Free Software Foundation, either version 3 of the License, or
 | 
						|
 * (at your option) any later version.
 | 
						|
 *
 | 
						|
 * This program is distributed in the hope that it will be useful,
 | 
						|
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
 * GNU General Public License for more details.
 | 
						|
 *
 | 
						|
 * You should have received a copy of the GNU General Public License
 | 
						|
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
						|
 */
 | 
						|
 | 
						|
#include "ogs-core.h"
 | 
						|
 | 
						|
#undef OGS_LOG_DOMAIN
 | 
						|
#define OGS_LOG_DOMAIN __ogs_event_domain
 | 
						|
 | 
						|
typedef struct ogs_queue_s {
 | 
						|
    void              **data;
 | 
						|
    unsigned int        nelts; /**< # elements */
 | 
						|
    unsigned int        in;    /**< next empty location */
 | 
						|
    unsigned int        out;   /**< next filled location */
 | 
						|
    unsigned int        bounds;/**< max size of queue */
 | 
						|
    unsigned int        full_waiters;
 | 
						|
    unsigned int        empty_waiters;
 | 
						|
    ogs_thread_mutex_t  one_big_mutex;
 | 
						|
    ogs_thread_cond_t   not_empty;
 | 
						|
    ogs_thread_cond_t   not_full;
 | 
						|
    int                 terminated;
 | 
						|
} ogs_queue_t;
 | 
						|
 | 
						|
/**
 | 
						|
 * Detects when the ogs_queue_t is full. This utility function is expected
 | 
						|
 * to be called from within critical sections, and is not threadsafe.
 | 
						|
 */
 | 
						|
#define ogs_queue_full(queue) ((queue)->nelts == (queue)->bounds)
 | 
						|
 | 
						|
/**
 | 
						|
 * Detects when the ogs_queue_t is empty. This utility function is expected
 | 
						|
 * to be called from within critical sections, and is not threadsafe.
 | 
						|
 */
 | 
						|
#define ogs_queue_empty(queue) ((queue)->nelts == 0)
 | 
						|
 | 
						|
/**
 | 
						|
 * Callback routine that is called to destroy this
 | 
						|
 * ogs_queue_t when its pool is destroyed.
 | 
						|
 */
 | 
						|
ogs_queue_t *ogs_queue_create(unsigned int capacity)
 | 
						|
{
 | 
						|
    ogs_queue_t *queue = ogs_calloc(1, sizeof *queue);
 | 
						|
    ogs_assert(queue);
 | 
						|
 | 
						|
    ogs_thread_mutex_init(&queue->one_big_mutex);
 | 
						|
    ogs_thread_cond_init(&queue->not_empty);
 | 
						|
    ogs_thread_cond_init(&queue->not_full);
 | 
						|
 | 
						|
    queue->data = ogs_calloc(1, capacity * sizeof(void*));
 | 
						|
    queue->bounds = capacity;
 | 
						|
    queue->nelts = 0;
 | 
						|
    queue->in = 0;
 | 
						|
    queue->out = 0;
 | 
						|
    queue->terminated = 0;
 | 
						|
    queue->full_waiters = 0;
 | 
						|
    queue->empty_waiters = 0;
 | 
						|
 | 
						|
    return queue;
 | 
						|
}
 | 
						|
 | 
						|
void ogs_queue_destroy(ogs_queue_t *queue)
 | 
						|
{
 | 
						|
    ogs_assert(queue);
 | 
						|
 | 
						|
    ogs_free(queue->data);
 | 
						|
 | 
						|
    ogs_thread_cond_destroy(&queue->not_empty);
 | 
						|
    ogs_thread_cond_destroy(&queue->not_full);
 | 
						|
    ogs_thread_mutex_destroy(&queue->one_big_mutex);
 | 
						|
 | 
						|
    ogs_free(queue);
 | 
						|
}
 | 
						|
 | 
						|
static int queue_push(ogs_queue_t *queue, void *data, ogs_time_t timeout)
 | 
						|
{
 | 
						|
    int rv;
 | 
						|
 | 
						|
    if (queue->terminated) {
 | 
						|
        return OGS_DONE; /* no more elements ever again */
 | 
						|
    }
 | 
						|
 | 
						|
    ogs_thread_mutex_lock(&queue->one_big_mutex);
 | 
						|
 | 
						|
    if (ogs_queue_full(queue)) {
 | 
						|
        if (!timeout) {
 | 
						|
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
            return OGS_RETRY;
 | 
						|
        }
 | 
						|
        if (!queue->terminated) {
 | 
						|
            queue->full_waiters++;
 | 
						|
            if (timeout > 0) {
 | 
						|
                rv = ogs_thread_cond_timedwait(&queue->not_full,
 | 
						|
                                               &queue->one_big_mutex,
 | 
						|
                                               timeout);
 | 
						|
            }
 | 
						|
            else {
 | 
						|
                rv = ogs_thread_cond_wait(&queue->not_full,
 | 
						|
                                          &queue->one_big_mutex);
 | 
						|
            }
 | 
						|
            queue->full_waiters--;
 | 
						|
            if (rv != OGS_OK) {
 | 
						|
                ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
                return rv;
 | 
						|
            }
 | 
						|
        }
 | 
						|
        /* If we wake up and it's still empty, then we were interrupted */
 | 
						|
        if (ogs_queue_full(queue)) {
 | 
						|
            ogs_warn("queue full (intr)");
 | 
						|
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
            if (queue->terminated) {
 | 
						|
                return OGS_DONE; /* no more elements ever again */
 | 
						|
            }
 | 
						|
            else {
 | 
						|
                return OGS_ERROR;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    queue->data[queue->in] = data;
 | 
						|
    queue->in++;
 | 
						|
    if (queue->in >= queue->bounds)
 | 
						|
        queue->in -= queue->bounds;
 | 
						|
    queue->nelts++;
 | 
						|
 | 
						|
    if (queue->empty_waiters) {
 | 
						|
        ogs_trace("signal !empty");
 | 
						|
        ogs_thread_cond_signal(&queue->not_empty);
 | 
						|
    }
 | 
						|
 | 
						|
    ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
    return OGS_OK;
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_push(ogs_queue_t *queue, void *data)
 | 
						|
{
 | 
						|
    return queue_push(queue, data, OGS_INFINITE_TIME);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Push new data onto the queue. If the queue is full, return OGS_RETRY. If
 | 
						|
 * the push operation completes successfully, it signals other threads
 | 
						|
 * waiting in ogs_queue_pop() that they may continue consuming sockets.
 | 
						|
 */
 | 
						|
int ogs_queue_trypush(ogs_queue_t *queue, void *data)
 | 
						|
{
 | 
						|
    return queue_push(queue, data, 0);
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_timedpush(ogs_queue_t *queue, void *data, ogs_time_t timeout)
 | 
						|
{
 | 
						|
    return queue_push(queue, data, timeout);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * not thread safe
 | 
						|
 */
 | 
						|
unsigned int ogs_queue_size(ogs_queue_t *queue) {
 | 
						|
    return queue->nelts;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Retrieves the next item from the queue. If there are no
 | 
						|
 * items available, it will either return OGS_RETRY (timeout = 0),
 | 
						|
 * or block until one becomes available (infinitely with timeout < 0,
 | 
						|
 * otherwise until the given timeout expires). Once retrieved, the
 | 
						|
 * item is placed into the address specified by 'data'.
 | 
						|
 */
 | 
						|
static int queue_pop(ogs_queue_t *queue, void **data, ogs_time_t timeout)
 | 
						|
{
 | 
						|
    int rv;
 | 
						|
 | 
						|
    if (queue->terminated) {
 | 
						|
        return OGS_DONE; /* no more elements ever again */
 | 
						|
    }
 | 
						|
 | 
						|
    ogs_thread_mutex_lock(&queue->one_big_mutex);
 | 
						|
 | 
						|
    /* Keep waiting until we wake up and find that the queue is not empty. */
 | 
						|
    if (ogs_queue_empty(queue)) {
 | 
						|
        if (!timeout) {
 | 
						|
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
            return OGS_RETRY;
 | 
						|
        }
 | 
						|
        if (!queue->terminated) {
 | 
						|
            queue->empty_waiters++;
 | 
						|
            if (timeout > 0) {
 | 
						|
                rv = ogs_thread_cond_timedwait(&queue->not_empty,
 | 
						|
                                               &queue->one_big_mutex,
 | 
						|
                                               timeout);
 | 
						|
            }
 | 
						|
            else {
 | 
						|
                rv = ogs_thread_cond_wait(&queue->not_empty,
 | 
						|
                                          &queue->one_big_mutex);
 | 
						|
            }
 | 
						|
            queue->empty_waiters--;
 | 
						|
            if (rv != OGS_OK) {
 | 
						|
                ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
                return rv;
 | 
						|
            }
 | 
						|
        }
 | 
						|
        /* If we wake up and it's still empty, then we were interrupted */
 | 
						|
        if (ogs_queue_empty(queue)) {
 | 
						|
            ogs_warn("queue empty (intr)");
 | 
						|
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
            if (queue->terminated) {
 | 
						|
                return OGS_DONE; /* no more elements ever again */
 | 
						|
            } else {
 | 
						|
                return OGS_ERROR;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    } 
 | 
						|
 | 
						|
    *data = queue->data[queue->out];
 | 
						|
    queue->nelts--;
 | 
						|
 | 
						|
    queue->out++;
 | 
						|
    if (queue->out >= queue->bounds)
 | 
						|
        queue->out -= queue->bounds;
 | 
						|
    if (queue->full_waiters) {
 | 
						|
        ogs_trace("signal !full");
 | 
						|
        ogs_thread_cond_signal(&queue->not_full);
 | 
						|
    }
 | 
						|
 | 
						|
    ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
    return OGS_OK;
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_pop(ogs_queue_t *queue, void **data)
 | 
						|
{
 | 
						|
    return queue_pop(queue, data, OGS_INFINITE_TIME);
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_trypop(ogs_queue_t *queue, void **data)
 | 
						|
{
 | 
						|
    return queue_pop(queue, data, 0);
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_timedpop(ogs_queue_t *queue, void **data, ogs_time_t timeout)
 | 
						|
{
 | 
						|
    return queue_pop(queue, data, timeout);
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_interrupt_all(ogs_queue_t *queue)
 | 
						|
{
 | 
						|
    ogs_debug("interrupt all");
 | 
						|
    ogs_thread_mutex_lock(&queue->one_big_mutex);
 | 
						|
 | 
						|
    ogs_thread_cond_broadcast(&queue->not_empty);
 | 
						|
    ogs_thread_cond_broadcast(&queue->not_full);
 | 
						|
 | 
						|
    ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
 | 
						|
    return OGS_OK;
 | 
						|
}
 | 
						|
 | 
						|
int ogs_queue_term(ogs_queue_t *queue)
 | 
						|
{
 | 
						|
    ogs_thread_mutex_lock(&queue->one_big_mutex);
 | 
						|
 | 
						|
    /* we must hold one_big_mutex when setting this... otherwise,
 | 
						|
     * we could end up setting it and waking everybody up just after a 
 | 
						|
     * would-be popper checks it but right before they block
 | 
						|
     */
 | 
						|
    queue->terminated = 1;
 | 
						|
    ogs_thread_mutex_unlock(&queue->one_big_mutex);
 | 
						|
 | 
						|
    return ogs_queue_interrupt_all(queue);
 | 
						|
}
 | 
						|
 |