找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1563

积分

0

好友

231

主题
发表于 4 天前 | 查看: 12| 回复: 0

本文档旨在说明如何在RT-Thread(运行于Cortex-M85核)与裸机程序(运行于Cortex-M33核)之间,使用rpmsg-lite组件并配合MCMGR管理器实现高效的异构多核通信。

  • 硬件平台:Renesas RA8P1(ARM Cortex-M85 & ARM Cortex-M33)
    硬件平台示意图
  • 软件环境
    • M85核:RT-Thread操作系统
    • M33核:裸机程序
  • 通信组件:rpmsg-lite + MCMGR

初识rpmsg-lite

RPMsg-Lite是一款轻量级的开源异构处理器通信框架,专为多核系统中的小型微控制器(MCU)设计。它支持在非对称多处理(AMP)配置中进行高效消息传递,例如运行 Linux的主处理器 与运行实时操作系统(如RT-Thread)的协处理器之间。该框架针对资源受限环境优化,相比OpenAMP提供了更简化的API,支持零拷贝和静态内存选项,并确保与RPMsg协议兼容。其核心机制是通过共享内存进行数据交换,通常要求配置非缓存内存,非常适合用于卸载CPU密集型任务。

rpmsg-lite组件优势

1. 静态API创建

相比标准的RPMsg,RPMsg-Lite采用静态API创建方式。与动态分配相比,静态创建至少能减少5KB的代码体积,同时避免了动态内存管理的开销。
静态API创建对比图

2. 基于共享内存的数据零拷贝

传统的驱动外设通信通常需要经历从应用层到驱动层,再到硬件缓冲区的多次数据拷贝,这会引入额外的延迟。而RPMsg-Lite实现了数据零拷贝,数据直接从应用层写入共享内存,仅需一次内存拷贝,能将数据交换周期最低降至微秒(μs)级。当然,这部分性能高度依赖于硬件平台本身的IPC(处理器间通信)机制。
零拷贝原理图

3. 硬中断的高效触发

传统的共享中断需要轮询所有关联外设的状态寄存器。RPMsg-Lite则直接使用 platform_notify 来主动触发核间中断通知。这个触发时机可以由用户灵活控制,例如可以累计N个消息后再触发一次中断,从而实现核间通信的批量处理,提升效率。
中断触发机制图

rpmsg-lite工程架构

rpmsg-lite工程主要分为两种架构版本:

  • RT-Thread + Freertos + RPMsg-Lite:此示例中,RPMsg-Lite结合RTOS自身的消息队列来实现子组件 queue
    RTOS版本架构图
  • RT-Thread + Bare metal + RPMsg-Lite + MCMGR:为了方便裸机系统与主核实时同步状态,此版本移植了官方的MCMGR(Multiple Core Manager)组件来管理核间同步。
    MCMGR版本架构图

rpmsg-lite通信流程(RTOS版)

需要明确,裸机版与RTOS版的rpmsg-lite收发逻辑不同,此处先分析RTOS版本。
RTOS通信流程图
上图简要说明了流程。首先明确用户层使用的关键API:rpmsg-dev的初始化、消息发送与接收。

1. 初始化
// master核初始化
struct rpmsg_lite_instance *rpmsg_lite_master_init(void *shmem_addr,
                                                   size_t shmem_length,
                                                   uint32_t link_id,
                                                   uint32_t init_flags)
// remote核初始化
struct rpmsg_lite_instance *rpmsg_lite_remote_init(void *shmem_addr,
                                                   uint32_t link_id,
                                                   uint32_t init_flags)

这一步会在Master和Remote核上分别初始化一个 rpmsg-dev 实体,并声明共享内存的起始地址。在此过程中,会绑定两个virtual queue(virtqueue)的回调函数(这是收发的关键):

  • callback[0] = rpmsg_lite_rx_callback;
  • callback[1] = rpmsg_lite_tx_callback;

接着创建端点(Endpoint):

struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev,
                                                   uint32_t addr,
                                                   rl_ept_rx_cb_t rx_cb,
                                                   void *rx_cb_data);

这里传递的参数最终会给到virtqueue的初始化创建,并作为 callback_fc 参数进行绑定:

int32_t virtqueue_create(uint16_t id,
                         const char *name,
                         struct vring_alloc_info *ring,
                         void (*callback_fc)(struct virtqueue *vq),
                         void (*notify_fc)(struct virtqueue *vq),
                         struct virtqueue **v_queue)

同时,virtqueue_notify() 函数会作为第五个参数绑定为通知函数,这一层对接到底层移植层(Environment Layer)的 platform_notify(),通常用于触发核间中断,从而通知另一个核心处理数据:

void platform_notify(uint32_t vector_id)
{
    env_lock_mutex(platform_lock);
    R_IPC_MessageSend(&g_ipc1_ctrl, (uint32_t)(RL_GET_Q_ID(vector_id)));
    env_unlock_mutex(platform_lock);
}

注意:为了方便多核通信,virtqueue默认配置为两个,双核间的收发virtqueue一一对应:

Master            Remote
vqx[0]:tx_vq     ->        rx_vq
vqx[0]:rx_vq     ->        tx_vq
2. 发送数据

发送数据的API如下:

int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev,
                        struct rpmsg_lite_endpoint *ept,
                        uint32_t dst,
                        char *data,
                        uint32_t size,
                        uintptr_t timeout)

参数 data 是待传输的数据,后续步骤为:

  1. 从共享内存的发送virtqueue(vq_tx_master)申请缓冲区。
    buffer = rpmsg_lite_dev->vq_ops->vq_tx_alloc(rpmsg_lite_dev->tvq, &buff_len, &idx);
  2. 将共享内存地址转换为 rpmsg_std_msg 结构,并拷贝实际数据。
    rpmsg_msg = (struct rpmsg_std_msg *)buffer;
    env_memcpy(rpmsg_msg->data, data, size);
  3. 将申请到的缓冲区标记为可用,并加入 vq_ring.avail 队列,这常用于缓存场景下的多核通信。
    /* Enqueue buffer on virtqueue. */
    rpmsg_lite_dev->vq_ops->vq_tx(rpmsg_lite_dev->tvq, buffer, buff_len, idx);

    至此,数据已放入virtqueue。接着需要发送一个通知给对端核心,触发其从共享队列中读取数据。调用链如下:

    rpmsg_lite_send
    -> rpmsg_lite_format_message
        -> virtqueue_kick
            -> vq_ring_notify_host
                -> virtqueue_notify
                    -> platform_notify
3. 接收数据

在RTOS下,接收数据使用 rpmsg_queue_recv API:

int32_t rpmsg_queue_recv(struct rpmsg_lite_instance *rpmsg_lite_dev,
                         rpmsg_queue_handle q,
                         uint32_t *src,
                         char *data,
                         uint32_t maxlen,
                         uint32_t *len,
                         uintptr_t timeout)

其内部本质是从消息队列中取出数据。如上图红色箭头所示,当接收到来自另一个核心的IPC中断通知后,会触发 env_isr

void g_ipc0_callback(ipc_callback_args_t *p_args)
{
    rt_interrupt_enter();
    /* Check for message received event */
    if (IPC_EVENT_MESSAGE_RECEIVED & p_args->event)
    {
        env_isr(p_args->message);
    }
    rt_interrupt_leave();
}

此处会通知本核心的virtqueue,并触发对应的回调函数(接收回调为 rpmsg_lite_rx_callback)。在该回调函数中,会执行 ept->rx_cb,即初始化rpmsg-lite时绑定的端点回调函数 rpmsg_queue_rx_cb

my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, rpmsg_queue_rx_cb, my_queue);

int32_t rpmsg_queue_rx_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv)
{
    rpmsg_queue_rx_cb_data_t msg;
    RL_ASSERT(priv != RL_NULL);
    msg.data = payload;
    msg.len  = payload_len;
    msg.src  = src;
    /* if message is successfully added into queue then hold rpmsg buffer */
    if (0 != env_put_queue(priv, &msg, 0))
    {
        /* hold the rx buffer */
        return RL_HOLD;
    }
    return RL_RELEASE;
}

随后,rpmsg-lite的消息队列线程被激活,进而处理接收到的消息。

rpmsg-lite通信流程(MCMGR版)

MCMGR组件开源地址:https://github.com/nxp-mcuxpresso/mcux-mcmgr

1. MCMGR双核间同步
// M85核(主核)
// 首先注册一个事件回调函数,实时侦测来自次核的通知(APP_RPMSG_READY_EVENT_DATA 和 APP_RPMSG_EP_READY_EVENT_DATA 事件)
static void RPMsgRemoteReadyEventHandler(uint16_t eventData, void *context)
{
    uint16_t *data = (uint16_t *)context;
    *data = eventData;
}
/* Register the application event before starting the secondary core */
(void)MCMGR_RegisterEvent(kMCMGR_RemoteApplicationEvent, RPMsgRemoteReadyEventHandler,
                          (void *)&RPMsgRemoteReadyEventData);
while (APP_RPMSG_READY_EVENT_DATA != RPMsgRemoteReadyEventData)
{
};
/* Wait until the secondary core application signals the rpmsg remote endpoint has been created. */
while (APP_RPMSG_EP_READY_EVENT_DATA != RPMsgRemoteReadyEventData)
{
}

// M33核(次核,裸机)
// 通过MCMGR_TriggerEvent触发IPC中断,发送通知给主核
/* Signal the other core we are ready by triggering the event and passing the APP_RPMSG_READY_EVENT_DATA */
(void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_READY_EVENT_DATA);
/* Signal the other core the endpoint has been created by triggering the event and passing the
 * APP_RPMSG_READY_EP_EVENT_DATA */
(void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_EP_READY_EVENT_DATA);
2. MCMGR双核间通信

<1> 接收
接收逻辑的关键在于端点创建时绑定的回调函数 rx_cb

struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev,
                                                   uint32_t addr,
                                                   rl_ept_rx_cb_t rx_cb,
                                                   void *rx_cb_data,
                                                   struct rpmsg_lite_ept_static_context *ept_context)

MCMGR多核管理器统一注册事件,并基于底层 platform_notify 机制触发 env_isr,进而执行 mcmgr_ipc_callback 来调用接收回调 rx_cb。这个触发动作由对端核心的 rpmsg_lite_send 发起(通过 virtqueue_kick)。

// 示例:用户定义的接收回调
static int32_t my_ept_read_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv)
{
    int32_t *has_received = priv;
    if (payload_len <= sizeof(THE_MESSAGE))
    {
        (void)rt_memcpy((void *)&msg, payload, payload_len);
        *has_received = 1;
    }
    (void)rt_kprintf("Primary core received a msg\r\n");
    (void)rt_kprintf("Message: Size=%x, DATA = %i\r\n", payload_len, msg.DATA);
    return RL_RELEASE;
}
// 创建端点时绑定回调
my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, my_ept_read_cb, (void *)&has_received, &my_ept_context);

因此,接收流程为:

mcmgr_ipc_callback
    -> mcmgr_event_handler
        -> env_isr
            -> virtqueue_notification
                -> rpmsg_lite_rx_callback(vq->callback_fc)
                    -> my_ept_read_cb(ept->rx_cb)

<2> 发送
发送逻辑的关键API仍是 rpmsg_lite_send

int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev,
                        struct rpmsg_lite_endpoint *ept,
                        uint32_t dst,
                        char *data,
                        uint32_t size,
                        uintptr_t timeout)

流程如下:

(void)rpmsg_lite_send(my_rpmsg, my_ept, REMOTE_EPT_ADDR, (char *)&msg, sizeof(THE_MESSAGE), RL_DONT_BLOCK);

内部会调用 rpmsg_lite_format_message,核心步骤与RTOS版类似:

  1. 从共享内存的发送virtqueue申请缓冲区。
  2. 将数据拷贝至共享内存的 rpmsg_std_msg 结构中。
  3. 将缓冲区加入virtqueue的可用环(avail ring)。

最后,执行通知机制:

virtqueue_kick
    -> vq_ring_notify_host(vq->notify_fc(vq))
        -> platform_nofity
            -> (void)MCMGR_TriggerEventForce(kMCMGR_RemoteRPMsgEvent, (uint16_t)RL_GET_Q_ID(vector_id));
                -> trig ipc callback
                    -> mcmgr_ipc_callback

至此,便完成了一次通过MCMGR管理的核间消息发送。通过结合RPMsg-Lite的高效通信与MCMGR的核间状态管理,可以在类似 RT-Thread这样的实时操作系统 与裸机程序之间构建稳定、高效的异构多核通信链路。




上一篇:DuckDB嵌入式数据库实战:告别Pandas内存爆炸与Excel大文件崩溃
下一篇:CSS加载性能优化:压缩字典实战指南与核心原理解析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 19:14 , Processed in 0.242062 second(s), 38 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表