diff --git a/common/utils/itti/intertask_interface.c b/common/utils/itti/intertask_interface.c index 554f50a2a103bf35044030974e8347d5bf22491e..85b5fef5096a4ddabf4f0463ebb07d6675c2d3d0 100644 --- a/common/utils/itti/intertask_interface.c +++ b/common/utils/itti/intertask_interface.c @@ -391,6 +391,103 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me return 0; } +/* same as itti_send_msg_to_task but returns -1 in case of failure instead of crashing */ +/* TODO: this is a hack - the whole logic needs a proper rework. */ +/* look for HACK_RLC_UM_LIMIT for others places related to the hack. Please do not remove this comment. */ +int itti_try_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message) +{ + thread_id_t destination_thread_id; + task_id_t origin_task_id; + message_list_t *new; + uint32_t priority; + message_number_t message_number; + uint32_t message_id; + + AssertFatal (message != NULL, "Message is NULL!\n"); + AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max); + + destination_thread_id = TASK_GET_THREAD_ID(destination_task_id); + message->ittiMsgHeader.destinationTaskId = destination_task_id; + message->ittiMsgHeader.instance = instance; + message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame; + message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot; + message_id = message->ittiMsgHeader.messageId; + AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max); + + origin_task_id = ITTI_MSG_ORIGIN_ID(message); + + priority = itti_get_message_priority (message_id); + + /* Increment the global message number */ + message_number = itti_increment_message_number (); + + itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name, + sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize); + + if (destination_task_id != TASK_UNKNOWN) { + + if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) { + ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n", + itti_desc.messages_info[message_id].name, + message_number, + priority, + itti_get_task_name(origin_task_id), + destination_task_id, + itti_get_task_name(destination_task_id)); + } else { + /* We cannot send a message if the task is not running */ + AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, + "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n", + itti_get_task_name(origin_task_id), + itti_desc.messages_info[message_id].name, + message_id, + destination_thread_id, + itti_desc.threads[destination_thread_id].task_state); + + /* Allocate new list element */ + new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s)); + + /* Fill in members */ + new->msg = message; + new->message_number = message_number; + new->message_priority = priority; + + /* Enqueue message in destination task queue */ + if (lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new) == 0) { + itti_free(origin_task_id, new); + return -1; + } + + { + /* Only use event fd for tasks, subtasks will pool the queue */ + if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) { + ssize_t write_ret; + eventfd_t sem_counter = 1; + + /* Call to write for an event fd must be of 8 bytes */ + write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter)); + AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n", + destination_thread_id, (int) write_ret, (int) sizeof(sem_counter)); + } + } + + ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n", + itti_desc.messages_info[message_id].name, + message_number, + priority, + itti_get_task_name(origin_task_id), + destination_task_id, + itti_get_task_name(destination_task_id)); + } + } else { + /* This is a debug message to TASK_UNKNOWN, we can release safely release it */ + int result = itti_free(origin_task_id, message); + AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result); + } + + return 0; +} + void itti_subscribe_event_fd(task_id_t task_id, int fd) { thread_id_t thread_id; diff --git a/common/utils/itti/intertask_interface.h b/common/utils/itti/intertask_interface.h index 606c851d1de02dd6647f68321499a83da3e6fcc3..ada34ce1b722a09722373cc5a650870364891b4e 100644 --- a/common/utils/itti/intertask_interface.h +++ b/common/utils/itti/intertask_interface.h @@ -108,6 +108,18 @@ int itti_send_broadcast_message(MessageDef *message_p); **/ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message); +/* TODO: this is a hack. Almost no caller of itti_send_msg_to_task checks + * the return value so it has been changed to crash the program in case + * of failure instead of returning -1 as the documentation above says. + * The RLC UM code may receive too much data when doing UDP at a higher + * throughput than the link allows and so for this specific case we need + * a version that actually returns -1 on failure. + * + * This needs to be cleaned at some point. + */ +/* look for HACK_RLC_UM_LIMIT for others places related to the hack. Please do not remove this comment. */ +int itti_try_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message); + /** \brief Add a new fd to monitor. * NOTE: it is up to the user to read data associated with the fd * \param task_id Task ID of the receiving task diff --git a/openair2/LAYER2/RLC/rlc.c b/openair2/LAYER2/RLC/rlc.c index d27d46ff4282b4ca3488a85ffb649f6f5d7d80ee..783441bd586928fb2dfe9dfc61dc1e926ca31ada 100644 --- a/openair2/LAYER2/RLC/rlc.c +++ b/openair2/LAYER2/RLC/rlc.c @@ -437,6 +437,15 @@ rlc_op_status_t rlc_data_req (const protocol_ctxt_t* const ctxt_pP, break; case RLC_MODE_UM: + /* TODO: this is a hack, needs better solution. Let's not use too + * much memory and store at maximum 5 millions bytes. + */ + /* look for HACK_RLC_UM_LIMIT for others places related to the hack. Please do not remove this comment. */ + if (rlc_um_get_buffer_occupancy(&rlc_union_p->rlc.um) > 5000000) { + free_mem_block(sdu_pP, __func__); + return RLC_OP_STATUS_OUT_OF_RESSOURCES; + } + new_sdu_p = get_free_mem_block (sdu_sizeP + sizeof (struct rlc_um_data_req_alloc), __func__); if (new_sdu_p != NULL) { diff --git a/openair3/UDP/udp_eNB_task.c b/openair3/UDP/udp_eNB_task.c index 527715eb7a2470c3577bb1a3b5a4fa40303af74b..4d6bd6e033a45417a2044e2cfb39ffff3ac8d1c8 100644 --- a/openair3/UDP/udp_eNB_task.c +++ b/openair3/UDP/udp_eNB_task.c @@ -270,10 +270,21 @@ void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP) n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); #endif - if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) { + /* TODO: this is a hack. Let's accept failures and do nothing when + * it happens. Since itti_send_msg_to_task crashes when the message + * queue is full we wrote itti_try_send_msg_to_task that returns -1 + * if the queue is full. + */ + /* look for HACK_RLC_UM_LIMIT for others places related to the hack. Please do not remove this comment. */ + //if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) { + if (itti_try_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) { +#if 0 LOG_I(UDP_, "Failed to send message %d to task %d\n", UDP_DATA_IND, udp_sock_pP->task_id); +#endif + itti_free(TASK_UDP, message_p); + itti_free(TASK_UDP, forwarded_buffer); return; } }