Skip to content

Commit

Permalink
openamp: change rx/tx buffer hold flag to count
Browse files Browse the repository at this point in the history
This commit fix one issue, before this commit, if rpmsg_hold_rx_buffer()
is called in the endpoint callback to hold current received buffer and
call rpmsg_release_rx_buffer() to release this buffer immediately,
this buffer will be returned to the virtqueue twice:
1. the first time is in rpmsg_virtio_release_rx_buffer()
2. and the second time is in rpmsg_virtio_return_buffer() after ept->cb()
Follow shows this process:

rpmsg_virtio_rx_callback()
  - get rp_hdr
  - ept->cb(data = RPMSG_LOCATE_DATA(rp_hdr))
    - rpsmg_hold_rx_buffer(data)
    - rpmsg_release_rx_buffer(data) return buffer to virtqueue
  - rpmsg_virtio_return_buffer(data) return same buffer to virtqueue again

So this commit changes the HELD flag in rp_hdr to count to fix this issue
and also supports hold/release rx buffer recursively.

Signed-off-by: Guiding Li <[email protected]>
Signed-off-by: Yin Tao <[email protected]>
Signed-off-by: Bowen Wang <[email protected]>
  • Loading branch information
GUIDINGLI authored and CV-Bowen committed Dec 4, 2023
1 parent b33183f commit b7bd536
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 30 deletions.
4 changes: 3 additions & 1 deletion lib/rpmsg/rpmsg_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ extern "C" {
#define RPMSG_ASSERT(_exp, _msg) metal_assert(_exp)
#endif

#define RPMSG_BUF_HELD (1U << 31) /* Flag to suggest to hold the buffer */
/* Mask to get the rpmsg buffer held counter from rpmsg_hdr reserved field */
#define RPMSG_BUF_HELD_SHIFT 16
#define RPMSG_BUF_HELD_MASK (0xFFFFU << RPMSG_BUF_HELD_SHIFT)

#define RPMSG_LOCATE_HDR(p) \
((struct rpmsg_hdr *)((unsigned char *)(p) - sizeof(struct rpmsg_hdr)))
Expand Down
106 changes: 77 additions & 29 deletions lib/rpmsg/rpmsg_virtio.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@
/* Time to wait - In multiple of 1 msecs. */
#define RPMSG_TICKS_PER_INTERVAL 1000

/*
* Get the buffer held counter value.
* If 0 the buffer can be released
*/
#define RPMSG_BUF_HELD_COUNTER(rp_hdr) \
(((rp_hdr)->reserved & RPMSG_BUF_HELD_MASK) >> RPMSG_BUF_HELD_SHIFT)

/* Increase buffer held counter */
#define RPMSG_BUF_HELD_INC(rp_hdr) \
((rp_hdr)->reserved += 1 << RPMSG_BUF_HELD_SHIFT)

/* Decrease buffer held counter */
#define RPMSG_BUF_HELD_DEC(rp_hdr) \
((rp_hdr)->reserved -= 1 << RPMSG_BUF_HELD_SHIFT)

/**
* struct vbuff_reclaimer_t - vring buffer recycler
*
Expand Down Expand Up @@ -269,37 +284,70 @@ static int rpmsg_virtio_wait_remote_ready(struct rpmsg_virtio_device *rvdev)
}
#endif /*!VIRTIO_DRIVER_ONLY*/

/**
* @internal
*
* @brief Check whether rpmsg buffer need to be released or not
*
* @param rp_hdr Pointer to rpmsg buffer header
*
* @return true indicates this buffer need to be released
*/
static bool rpmsg_virtio_buf_held_dec_test(struct rpmsg_hdr *rp_hdr)
{
/* Check the held counter first */
if (RPMSG_BUF_HELD_COUNTER(rp_hdr) <= 0) {
metal_err("Buffer held counter must be larger than 0 before decreased\r\n");
return false;
}

/* Decrease the held counter */
RPMSG_BUF_HELD_DEC(rp_hdr);

/* Check whether to release the Rx buffer */
if (RPMSG_BUF_HELD_COUNTER(rp_hdr) > 0)
return false;

return true;
}

static void rpmsg_virtio_hold_rx_buffer(struct rpmsg_device *rdev, void *rxbuf)
{
struct rpmsg_hdr *rp_hdr;
metal_mutex_acquire(&rdev->lock);
RPMSG_BUF_HELD_INC(RPMSG_LOCATE_HDR(rxbuf));
metal_mutex_release(&rdev->lock);
}

(void)rdev;
static bool rpmsg_virtio_release_rx_buffer_nolock(struct rpmsg_virtio_device *rvdev,
struct rpmsg_hdr *rp_hdr)
{
uint16_t idx;
uint32_t len;

rp_hdr = RPMSG_LOCATE_HDR(rxbuf);
/* The reserved field contains buffer index */
idx = (uint16_t)(rp_hdr->reserved & ~RPMSG_BUF_HELD_MASK);
/* Return buffer on virtqueue. */
len = virtqueue_get_buffer_length(rvdev->rvq, idx);
rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);

/* Set held status to keep buffer */
rp_hdr->reserved |= RPMSG_BUF_HELD;
return true;
}

static void rpmsg_virtio_release_rx_buffer(struct rpmsg_device *rdev,
void *rxbuf)
{
struct rpmsg_virtio_device *rvdev;
struct rpmsg_hdr *rp_hdr;
uint16_t idx;
uint32_t len;

rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
rp_hdr = RPMSG_LOCATE_HDR(rxbuf);
/* The reserved field contains buffer index */
idx = (uint16_t)(rp_hdr->reserved & ~RPMSG_BUF_HELD);

metal_mutex_acquire(&rdev->lock);
/* Return buffer on virtqueue. */
len = virtqueue_get_buffer_length(rvdev->rvq, idx);
rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
/* Tell peer we return some rx buffers */
virtqueue_kick(rvdev->rvq);
if (rpmsg_virtio_buf_held_dec_test(rp_hdr)) {
rpmsg_virtio_release_rx_buffer_nolock(rvdev, rp_hdr);
/* Tell peer we return some rx buffers */
virtqueue_kick(rvdev->rvq);
}
metal_mutex_release(&rdev->lock);
}

Expand Down Expand Up @@ -342,6 +390,9 @@ static void *rpmsg_virtio_get_tx_payload_buffer(struct rpmsg_device *rdev,
/* Store the index into the reserved field to be used when sending */
rp_hdr->reserved = idx;

/* Increase the held counter to hold this Tx buffer */
RPMSG_BUF_HELD_INC(rp_hdr);

/* Actual data buffer size is vring buffer size minus header length */
*len -= sizeof(struct rpmsg_hdr);
return RPMSG_LOCATE_DATA(rp_hdr);
Expand Down Expand Up @@ -405,20 +456,20 @@ static int rpmsg_virtio_release_tx_buffer(struct rpmsg_device *rdev, void *txbuf
struct rpmsg_hdr *rp_hdr = RPMSG_LOCATE_HDR(txbuf);
void *vbuff = rp_hdr; /* only used to avoid warning on the cast of a packed structure */
struct vbuff_reclaimer_t *r_desc = (struct vbuff_reclaimer_t *)vbuff;
uint16_t idx;

/*
* Reuse the RPMsg buffer to temporary store the vbuff_reclaimer_t structure.
* Stores the index locally before overwriting the RPMsg header.
*/
idx = rp_hdr->reserved;

rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);

metal_mutex_acquire(&rdev->lock);

r_desc->idx = idx;
metal_list_add_tail(&rvdev->reclaimer, &r_desc->node);
/* Check whether to release the Tx buffer */
if (rpmsg_virtio_buf_held_dec_test(rp_hdr)) {
/*
* Reuse the RPMsg buffer to temporary store the vbuff_reclaimer_t structure.
* Stores the index locally before overwriting the RPMsg header.
*/
r_desc->idx = (uint16_t)(rp_hdr->reserved & ~RPMSG_BUF_HELD_MASK);
metal_list_add_tail(&rvdev->reclaimer, &r_desc->node);
}

metal_mutex_release(&rdev->lock);

Expand Down Expand Up @@ -515,6 +566,7 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
metal_mutex_acquire(&rdev->lock);
ept = rpmsg_get_ept_from_addr(rdev, rp_hdr->dst);
rpmsg_ept_incref(ept);
RPMSG_BUF_HELD_INC(rp_hdr);
metal_mutex_release(&rdev->lock);

if (ept) {
Expand All @@ -534,12 +586,8 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)

metal_mutex_acquire(&rdev->lock);
rpmsg_ept_decref(ept);

/* Check whether callback wants to hold buffer */
if (!(rp_hdr->reserved & RPMSG_BUF_HELD)) {
/* No, return used buffers. */
rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
}
if (rpmsg_virtio_buf_held_dec_test(rp_hdr))
rpmsg_virtio_release_rx_buffer_nolock(rvdev, rp_hdr);

rp_hdr = rpmsg_virtio_get_rx_buffer(rvdev, &len, &idx);
if (!rp_hdr) {
Expand Down

0 comments on commit b7bd536

Please sign in to comment.