Skip to content

Commit

Permalink
openamp: change rx/tx buffer hold flag to count
Browse files Browse the repository at this point in the history
This commit fix one issue, before this commit, if rpmsg_hold_rx_buffer()
is called in the endpoint callback to hold current received buffer and
call rpmsg_release_rx_buffer() to release this buffer immediately,
this buffer will be returned to the virtqueue twice:
1. the first time is in rpmsg_virtio_release_rx_buffer()
2. and the second time is in rpmsg_virtio_return_buffer() after ept->cb()
Follow shows this process:

rpmsg_virtio_rx_callback()
  - get rp_hdr
  - ept->cb(data = RPMSG_LOCATE_DATA(rp_hdr))
    - rpsmg_hold_rx_buffer(data)
    - rpmsg_release_rx_buffer(data) return buffer to virtqueue
  - rpmsg_virtio_return_buffer(data) return same buffer to virtqueue again

So this commit changes the HELD flag in rp_hdr to count to fix this issue
and also supports hold/release rx buffer recursively.

Signed-off-by: Guiding Li <[email protected]>
Signed-off-by: Yin Tao <[email protected]>
Signed-off-by: Bowen Wang <[email protected]>
  • Loading branch information
GUIDINGLI authored and CV-Bowen committed Oct 30, 2023
1 parent 4e13c09 commit 7f670be
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 33 deletions.
3 changes: 2 additions & 1 deletion lib/rpmsg/rpmsg_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ extern "C" {
#define RPMSG_ASSERT(_exp, _msg) metal_assert(_exp)
#endif

#define RPMSG_BUF_HELD (1U << 31) /* Flag to suggest to hold the buffer */
/* Mask to get the rpmsg buffer held counter from rpmsg_hdr reserved field */
#define RPMSG_BUF_HELD_MASK ((uint32_t)0xFFFF << RPMSG_BUF_HELD_SHIFT)

#define RPMSG_LOCATE_HDR(p) \
((struct rpmsg_hdr *)((unsigned char *)(p) - sizeof(struct rpmsg_hdr)))
Expand Down
103 changes: 71 additions & 32 deletions lib/rpmsg/rpmsg_virtio.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@
/* Time to wait - In multiple of 1 msecs. */
#define RPMSG_TICKS_PER_INTERVAL 1000

/*
* Get the buffer held counter value.
* If 0 the buffer can be released
*/
#define RPMSG_BUF_HELD_COUNTER(rp_hdr) \
(((rp_hdr)->reserved & RPMSG_BUF_HELD_MASK) >> RPMSG_BUF_HELD_SHIFT)

/* Increase buffer held counter */
#define RPMSG_BUF_HELD_INC(rp_hdr) \
((rp_hdr)->reserved += 1 << RPMSG_BUF_HELD_SHIFT)

/* Decrease buffer held counter */
#define RPMSG_BUF_HELD_DEC(rp_hdr) \
((rp_hdr)->reserved -= 1 << RPMSG_BUF_HELD_SHIFT)

/**
* struct vbuff_reclaimer_t - vring buffer recycler
*
Expand Down Expand Up @@ -315,35 +330,52 @@ static int _rpmsg_virtio_get_buffer_size(struct rpmsg_virtio_device *rvdev)

static void rpmsg_virtio_hold_rx_buffer(struct rpmsg_device *rdev, void *rxbuf)
{
struct rpmsg_hdr *rp_hdr;
metal_mutex_acquire(&rdev->lock);
RPMSG_BUF_HELD_INC(RPMSG_LOCATE_HDR(rxbuf));
metal_mutex_release(&rdev->lock);
}

(void)rdev;
static bool rpmsg_virtio_release_rx_buffer_nolock(struct rpmsg_device *rdev,
struct rpmsg_hdr *rp_hdr)
{
struct rpmsg_virtio_device *rvdev;
uint16_t idx;
uint32_t len;

rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);

rp_hdr = RPMSG_LOCATE_HDR(rxbuf);
/* Check the held counter first */
if (RPMSG_BUF_HELD_COUNTER(rp_hdr) <= 0)
return false;

/* Set held status to keep buffer */
rp_hdr->reserved |= RPMSG_BUF_HELD;
/* Decrease the held counter */
RPMSG_BUF_HELD_DEC(rp_hdr);

/* Check whether to release the Rx buffer */
if (RPMSG_BUF_HELD_COUNTER(rp_hdr) > 0)
return false;

/* The reserved field contains buffer index */
idx = (uint16_t)(rp_hdr->reserved & ~RPMSG_BUF_HELD_MASK);
/* Return buffer on virtqueue. */
len = virtqueue_get_buffer_length(rvdev->rvq, idx);
rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);

return true;
}

static void rpmsg_virtio_release_rx_buffer(struct rpmsg_device *rdev,
void *rxbuf)
{
struct rpmsg_virtio_device *rvdev;
struct rpmsg_hdr *rp_hdr;
uint16_t idx;
uint32_t len;

rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
rp_hdr = RPMSG_LOCATE_HDR(rxbuf);
/* The reserved field contains buffer index */
idx = (uint16_t)(rp_hdr->reserved & ~RPMSG_BUF_HELD);

metal_mutex_acquire(&rdev->lock);
/* Return buffer on virtqueue. */
len = virtqueue_get_buffer_length(rvdev->rvq, idx);
rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
/* Tell peer we return some rx buffers */
virtqueue_kick(rvdev->rvq);
if (rpmsg_virtio_release_rx_buffer_nolock(rdev, RPMSG_LOCATE_HDR(rxbuf))) {
/* Tell peer we return some rx buffers */
virtqueue_kick(rvdev->rvq);
}
metal_mutex_release(&rdev->lock);
}

Expand Down Expand Up @@ -386,6 +418,9 @@ static void *rpmsg_virtio_get_tx_payload_buffer(struct rpmsg_device *rdev,
/* Store the index into the reserved field to be used when sending */
rp_hdr->reserved = idx;

/* Increase the held counter to hold this Tx buffer */
RPMSG_BUF_HELD_INC(rp_hdr);

/* Actual data buffer size is vring buffer size minus header length */
*len -= sizeof(struct rpmsg_hdr);
return RPMSG_LOCATE_DATA(rp_hdr);
Expand Down Expand Up @@ -449,20 +484,29 @@ static int rpmsg_virtio_release_tx_buffer(struct rpmsg_device *rdev, void *txbuf
struct rpmsg_hdr *rp_hdr = RPMSG_LOCATE_HDR(txbuf);
void *vbuff = rp_hdr; /* only used to avoid warning on the cast of a packed structure */
struct vbuff_reclaimer_t *r_desc = (struct vbuff_reclaimer_t *)vbuff;
uint16_t idx;

/*
* Reuse the RPMsg buffer to temporary store the vbuff_reclaimer_t structure.
* Stores the index locally before overwriting the RPMsg header.
*/
idx = rp_hdr->reserved;

rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);

metal_mutex_acquire(&rdev->lock);

r_desc->idx = idx;
metal_list_add_tail(&rvdev->reclaimer, &r_desc->node);
/* Check the held counter first */
if (RPMSG_BUF_HELD_COUNTER(rp_hdr) <= 0) {
metal_mutex_release(&rdev->lock);
return RPMSG_ERR_PARAM;
}

/* Decrease the held counter */
RPMSG_BUF_HELD_DEC(rp_hdr);

/* Check whether to release the Tx buffer */
if (!RPMSG_BUF_HELD_COUNTER(rp_hdr)) {
/*
* Reuse the RPMsg buffer to temporary store the vbuff_reclaimer_t structure.
* Stores the index locally before overwriting the RPMsg header.
*/
r_desc->idx = (uint16_t)(rp_hdr->reserved & ~RPMSG_BUF_HELD_MASK);
metal_list_add_tail(&rvdev->reclaimer, &r_desc->node);
}

metal_mutex_release(&rdev->lock);

Expand Down Expand Up @@ -558,6 +602,7 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
/* Get the channel node from the remote device channels list. */
metal_mutex_acquire(&rdev->lock);
ept = rpmsg_get_ept_from_addr(rdev, rp_hdr->dst);
RPMSG_BUF_HELD_INC(rp_hdr);
metal_mutex_release(&rdev->lock);

if (ept) {
Expand All @@ -576,13 +621,7 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
}

metal_mutex_acquire(&rdev->lock);

/* Check whether callback wants to hold buffer */
if (!(rp_hdr->reserved & RPMSG_BUF_HELD)) {
/* No, return used buffers. */
rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
}

rpmsg_virtio_release_rx_buffer_nolock(rdev, rp_hdr);
rp_hdr = rpmsg_virtio_get_rx_buffer(rvdev, &len, &idx);
if (!rp_hdr) {
/* tell peer we return some rx buffer */
Expand Down

0 comments on commit 7f670be

Please sign in to comment.