本文档旨在说明如何在RT-Thread(运行于Cortex-M85核)与裸机程序(运行于Cortex-M33核)之间,使用rpmsg-lite组件并配合MCMGR管理器实现高效的异构多核通信。
- 硬件平台:Renesas RA8P1(ARM Cortex-M85 & ARM Cortex-M33)

- 软件环境
- M85核:RT-Thread操作系统
- M33核:裸机程序
- 通信组件:rpmsg-lite + MCMGR
初识rpmsg-lite
RPMsg-Lite是一款轻量级的开源异构处理器通信框架,专为多核系统中的小型微控制器(MCU)设计。它支持在非对称多处理(AMP)配置中进行高效消息传递,例如运行 Linux的主处理器 与运行实时操作系统(如RT-Thread)的协处理器之间。该框架针对资源受限环境优化,相比OpenAMP提供了更简化的API,支持零拷贝和静态内存选项,并确保与RPMsg协议兼容。其核心机制是通过共享内存进行数据交换,通常要求配置非缓存内存,非常适合用于卸载CPU密集型任务。
rpmsg-lite组件优势
1. 静态API创建
相比标准的RPMsg,RPMsg-Lite采用静态API创建方式。与动态分配相比,静态创建至少能减少5KB的代码体积,同时避免了动态内存管理的开销。

2. 基于共享内存的数据零拷贝
传统的驱动外设通信通常需要经历从应用层到驱动层,再到硬件缓冲区的多次数据拷贝,这会引入额外的延迟。而RPMsg-Lite实现了数据零拷贝,数据直接从应用层写入共享内存,仅需一次内存拷贝,能将数据交换周期最低降至微秒(μs)级。当然,这部分性能高度依赖于硬件平台本身的IPC(处理器间通信)机制。

3. 硬中断的高效触发
传统的共享中断需要轮询所有关联外设的状态寄存器。RPMsg-Lite则直接使用 platform_notify 来主动触发核间中断通知。这个触发时机可以由用户灵活控制,例如可以累计N个消息后再触发一次中断,从而实现核间通信的批量处理,提升效率。

rpmsg-lite工程架构
rpmsg-lite工程主要分为两种架构版本:
- RT-Thread + Freertos + RPMsg-Lite:此示例中,RPMsg-Lite结合RTOS自身的消息队列来实现子组件
queue。

- RT-Thread + Bare metal + RPMsg-Lite + MCMGR:为了方便裸机系统与主核实时同步状态,此版本移植了官方的MCMGR(Multiple Core Manager)组件来管理核间同步。

rpmsg-lite通信流程(RTOS版)
需要明确,裸机版与RTOS版的rpmsg-lite收发逻辑不同,此处先分析RTOS版本。

上图简要说明了流程。首先明确用户层使用的关键API:rpmsg-dev的初始化、消息发送与接收。
1. 初始化
// master核初始化
struct rpmsg_lite_instance *rpmsg_lite_master_init(void *shmem_addr,
size_t shmem_length,
uint32_t link_id,
uint32_t init_flags)
// remote核初始化
struct rpmsg_lite_instance *rpmsg_lite_remote_init(void *shmem_addr,
uint32_t link_id,
uint32_t init_flags)
这一步会在Master和Remote核上分别初始化一个 rpmsg-dev 实体,并声明共享内存的起始地址。在此过程中,会绑定两个virtual queue(virtqueue)的回调函数(这是收发的关键):
callback[0] = rpmsg_lite_rx_callback;
callback[1] = rpmsg_lite_tx_callback;
接着创建端点(Endpoint):
struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev,
uint32_t addr,
rl_ept_rx_cb_t rx_cb,
void *rx_cb_data);
这里传递的参数最终会给到virtqueue的初始化创建,并作为 callback_fc 参数进行绑定:
int32_t virtqueue_create(uint16_t id,
const char *name,
struct vring_alloc_info *ring,
void (*callback_fc)(struct virtqueue *vq),
void (*notify_fc)(struct virtqueue *vq),
struct virtqueue **v_queue)
同时,virtqueue_notify() 函数会作为第五个参数绑定为通知函数,这一层对接到底层移植层(Environment Layer)的 platform_notify(),通常用于触发核间中断,从而通知另一个核心处理数据:
void platform_notify(uint32_t vector_id)
{
env_lock_mutex(platform_lock);
R_IPC_MessageSend(&g_ipc1_ctrl, (uint32_t)(RL_GET_Q_ID(vector_id)));
env_unlock_mutex(platform_lock);
}
注意:为了方便多核通信,virtqueue默认配置为两个,双核间的收发virtqueue一一对应:
Master Remote
vqx[0]:tx_vq -> rx_vq
vqx[0]:rx_vq -> tx_vq
2. 发送数据
发送数据的API如下:
int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev,
struct rpmsg_lite_endpoint *ept,
uint32_t dst,
char *data,
uint32_t size,
uintptr_t timeout)
参数 data 是待传输的数据,后续步骤为:
- 从共享内存的发送virtqueue(
vq_tx_master)申请缓冲区。
buffer = rpmsg_lite_dev->vq_ops->vq_tx_alloc(rpmsg_lite_dev->tvq, &buff_len, &idx);
- 将共享内存地址转换为
rpmsg_std_msg 结构,并拷贝实际数据。
rpmsg_msg = (struct rpmsg_std_msg *)buffer;
env_memcpy(rpmsg_msg->data, data, size);
- 将申请到的缓冲区标记为可用,并加入
vq_ring.avail 队列,这常用于缓存场景下的多核通信。
/* Enqueue buffer on virtqueue. */
rpmsg_lite_dev->vq_ops->vq_tx(rpmsg_lite_dev->tvq, buffer, buff_len, idx);
至此,数据已放入virtqueue。接着需要发送一个通知给对端核心,触发其从共享队列中读取数据。调用链如下:
rpmsg_lite_send
-> rpmsg_lite_format_message
-> virtqueue_kick
-> vq_ring_notify_host
-> virtqueue_notify
-> platform_notify
3. 接收数据
在RTOS下,接收数据使用 rpmsg_queue_recv API:
int32_t rpmsg_queue_recv(struct rpmsg_lite_instance *rpmsg_lite_dev,
rpmsg_queue_handle q,
uint32_t *src,
char *data,
uint32_t maxlen,
uint32_t *len,
uintptr_t timeout)
其内部本质是从消息队列中取出数据。如上图红色箭头所示,当接收到来自另一个核心的IPC中断通知后,会触发 env_isr:
void g_ipc0_callback(ipc_callback_args_t *p_args)
{
rt_interrupt_enter();
/* Check for message received event */
if (IPC_EVENT_MESSAGE_RECEIVED & p_args->event)
{
env_isr(p_args->message);
}
rt_interrupt_leave();
}
此处会通知本核心的virtqueue,并触发对应的回调函数(接收回调为 rpmsg_lite_rx_callback)。在该回调函数中,会执行 ept->rx_cb,即初始化rpmsg-lite时绑定的端点回调函数 rpmsg_queue_rx_cb:
my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, rpmsg_queue_rx_cb, my_queue);
int32_t rpmsg_queue_rx_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv)
{
rpmsg_queue_rx_cb_data_t msg;
RL_ASSERT(priv != RL_NULL);
msg.data = payload;
msg.len = payload_len;
msg.src = src;
/* if message is successfully added into queue then hold rpmsg buffer */
if (0 != env_put_queue(priv, &msg, 0))
{
/* hold the rx buffer */
return RL_HOLD;
}
return RL_RELEASE;
}
随后,rpmsg-lite的消息队列线程被激活,进而处理接收到的消息。
rpmsg-lite通信流程(MCMGR版)
MCMGR组件开源地址:https://github.com/nxp-mcuxpresso/mcux-mcmgr
1. MCMGR双核间同步
// M85核(主核)
// 首先注册一个事件回调函数,实时侦测来自次核的通知(APP_RPMSG_READY_EVENT_DATA 和 APP_RPMSG_EP_READY_EVENT_DATA 事件)
static void RPMsgRemoteReadyEventHandler(uint16_t eventData, void *context)
{
uint16_t *data = (uint16_t *)context;
*data = eventData;
}
/* Register the application event before starting the secondary core */
(void)MCMGR_RegisterEvent(kMCMGR_RemoteApplicationEvent, RPMsgRemoteReadyEventHandler,
(void *)&RPMsgRemoteReadyEventData);
while (APP_RPMSG_READY_EVENT_DATA != RPMsgRemoteReadyEventData)
{
};
/* Wait until the secondary core application signals the rpmsg remote endpoint has been created. */
while (APP_RPMSG_EP_READY_EVENT_DATA != RPMsgRemoteReadyEventData)
{
}
// M33核(次核,裸机)
// 通过MCMGR_TriggerEvent触发IPC中断,发送通知给主核
/* Signal the other core we are ready by triggering the event and passing the APP_RPMSG_READY_EVENT_DATA */
(void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_READY_EVENT_DATA);
/* Signal the other core the endpoint has been created by triggering the event and passing the
* APP_RPMSG_READY_EP_EVENT_DATA */
(void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_EP_READY_EVENT_DATA);
2. MCMGR双核间通信
<1> 接收
接收逻辑的关键在于端点创建时绑定的回调函数 rx_cb:
struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev,
uint32_t addr,
rl_ept_rx_cb_t rx_cb,
void *rx_cb_data,
struct rpmsg_lite_ept_static_context *ept_context)
MCMGR多核管理器统一注册事件,并基于底层 platform_notify 机制触发 env_isr,进而执行 mcmgr_ipc_callback 来调用接收回调 rx_cb。这个触发动作由对端核心的 rpmsg_lite_send 发起(通过 virtqueue_kick)。
// 示例:用户定义的接收回调
static int32_t my_ept_read_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv)
{
int32_t *has_received = priv;
if (payload_len <= sizeof(THE_MESSAGE))
{
(void)rt_memcpy((void *)&msg, payload, payload_len);
*has_received = 1;
}
(void)rt_kprintf("Primary core received a msg\r\n");
(void)rt_kprintf("Message: Size=%x, DATA = %i\r\n", payload_len, msg.DATA);
return RL_RELEASE;
}
// 创建端点时绑定回调
my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, my_ept_read_cb, (void *)&has_received, &my_ept_context);
因此,接收流程为:
mcmgr_ipc_callback
-> mcmgr_event_handler
-> env_isr
-> virtqueue_notification
-> rpmsg_lite_rx_callback(vq->callback_fc)
-> my_ept_read_cb(ept->rx_cb)
<2> 发送
发送逻辑的关键API仍是 rpmsg_lite_send:
int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev,
struct rpmsg_lite_endpoint *ept,
uint32_t dst,
char *data,
uint32_t size,
uintptr_t timeout)
流程如下:
(void)rpmsg_lite_send(my_rpmsg, my_ept, REMOTE_EPT_ADDR, (char *)&msg, sizeof(THE_MESSAGE), RL_DONT_BLOCK);
内部会调用 rpmsg_lite_format_message,核心步骤与RTOS版类似:
- 从共享内存的发送virtqueue申请缓冲区。
- 将数据拷贝至共享内存的
rpmsg_std_msg 结构中。
- 将缓冲区加入virtqueue的可用环(avail ring)。
最后,执行通知机制:
virtqueue_kick
-> vq_ring_notify_host(vq->notify_fc(vq))
-> platform_nofity
-> (void)MCMGR_TriggerEventForce(kMCMGR_RemoteRPMsgEvent, (uint16_t)RL_GET_Q_ID(vector_id));
-> trig ipc callback
-> mcmgr_ipc_callback
至此,便完成了一次通过MCMGR管理的核间消息发送。通过结合RPMsg-Lite的高效通信与MCMGR的核间状态管理,可以在类似 RT-Thread这样的实时操作系统 与裸机程序之间构建稳定、高效的异构多核通信链路。