Skip to content

Commit

Permalink
pkpool: return packets to allocating pool
Browse files Browse the repository at this point in the history
Populate p->context.poolid when allocating a packet
and use lc_pool_put_to when returning it
so that packets don't get stuck in the same pool.
This reduces the number of retries needed,
but may have other effects - needs testing.
  • Loading branch information
omor1 committed Jun 5, 2020
1 parent 461b6ed commit 0af5316
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ lc_status lc_cq_pop(lc_ep ep, lc_req** req_ptr)
lc_status lc_cq_reqfree(lc_ep ep, lc_req* req)
{
lc_packet* packet = (lc_packet*) req->parent;
lc_pool_put(ep->pkpool, packet);
lci_pk_free_data(ep, packet);
return LC_OK;
}
8 changes: 4 additions & 4 deletions src/include/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static inline void lci_ce_queue(lc_ep ep, lc_packet* p)
static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTR %p\n", p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, p->context.poolid, LC_PROTO_LONG, p);
// dprintf("%d] rma %p --> %p %.4x via %d\n", lcg_rank, p->data.rts.src_addr, p->data.rtr.tgt_addr, crc32c((char*) p->data.rts.src_addr, p->data.rts.size), p->data.rtr.rkey);

lc_server_rma_rtr(ep->server, p->context.req->rhandle,
Expand All @@ -121,7 +121,7 @@ static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
static inline void lci_handle_rts(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTS: %p\n", p);
lci_pk_init(ep, -1, LC_PROTO_RTR, p);
lci_pk_init(ep, p->context.poolid, LC_PROTO_RTR, p);
lc_proto proto = MAKE_PROTO(ep->gid, LC_PROTO_RTR, 0);
lci_prepare_rtr(ep, p->context.req->buffer, p->data.rts.size, p);
lc_server_sendm(ep->server, p->context.req->rhandle,
Expand Down Expand Up @@ -264,10 +264,10 @@ static inline void lci_serve_send(lc_packet* p)
} else if (proto == LC_PROTO_LONG) {
dprintf("SENT LONG: %p\n", p);
p->data.rts.cb((void*) p->data.rts.ce);
lci_pk_free(ep, p);
lci_pk_free_data(ep, p);
} else if (proto == LC_PROTO_RTS) {
dprintf("SENT RTS: %p\n", p);
lci_pk_free(ep, p);
lci_pk_free_data(ep, p);
} else {
dprintf("SENT UNKNOWN: %p\n", p);
lci_pk_free_data(ep, p);
Expand Down
6 changes: 5 additions & 1 deletion src/include/server/server_psm2.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,10 @@ static inline int lc_server_progress(lc_server* s)
lci_serve_recv_rdma(p, status.msg_tag.tag1);
} else {
p->context.req = &p->context.req_s;
lc_pool_put(s->pkpool, p);
if (p->context.poolid != -1)
lc_pool_put_to(s->pkpool, p, p->context.poolid);
else
lc_pool_put(s->pkpool, p);
}
} else if (ctx & PSM_SEND) {
lc_packet* p = (lc_packet*) (ctx ^ PSM_SEND);
Expand All @@ -318,6 +321,7 @@ static inline void lc_server_post_recv(lc_server* s, lc_packet* p)
}

psm2_mq_tag_t rtag = PSM_TAG_TRECV_DATA();
p->context.poolid = lc_pool_get_local(s->pkpool);

PSM_SAFECALL(psm2_mq_irecv2(
s->mq, PSM2_MQ_ANY_ADDR, &rtag, /* message tag */
Expand Down
6 changes: 3 additions & 3 deletions src/long.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lc_status lc_sendl(void* src, size_t size, int rank, int tag, lc_ep ep,
lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_RTS, p);
lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_RTS, p);
struct lci_rep* rep = &(ep->rep[rank]);
lci_prepare_rts(src, size, cb, ce, p);
lc_server_sendm(ep->server, rep->handle,
Expand All @@ -20,7 +20,7 @@ lc_status lc_putl(void* src, size_t size, int rank, uintptr_t addr,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;

Expand All @@ -34,7 +34,7 @@ lc_status lc_putls(void* src, size_t size, int rank, uintptr_t addr, int meta,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;
struct lci_rep* rep = &(ep->rep[rank]);
Expand Down
13 changes: 5 additions & 8 deletions src/medium.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(p->data.buffer, src, size);
lc_server_sendm(ep->server, rep->handle, size, p,
Expand All @@ -18,8 +17,7 @@ lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
lc_server_putm(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),
Expand All @@ -30,8 +28,7 @@ lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
lc_status lc_putms(void* src, size_t size, int rank, uintptr_t addr, int meta, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
lc_server_putms(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),
Expand All @@ -49,8 +46,8 @@ lc_status lc_recvm(void* src, size_t size, int rank, int tag, lc_ep ep,
lc_packet* p = (lc_packet*) value;
memcpy(src, p->data.buffer, p->context.req->size);
req->size = p->context.req->size;
lc_signal((void*) &req->sync);
lci_pk_free_data(ep, p);
p->context.req = req;
lci_ce_dispatch(ep, p, ep->cap);
}
return LC_OK;
}

0 comments on commit 0af5316

Please sign in to comment.