Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 pkpool #20

Open
wants to merge 7 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions include/lc/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct dequeue;
extern "C" {
#endif

LC_INLINE int lc_worker_id()
LC_INLINE int lc_worker_id(void)
{
if (unlikely(lcg_core_id == -1)) {
lcg_core_id = sched_getcpu();
Expand Down Expand Up @@ -90,46 +90,57 @@ LC_INLINE int32_t lc_pool_get_local(struct lc_pool* pool)
return pid;
}

LC_INLINE void* lc_pool_get_slow(struct lc_pool* pool) {
LC_INLINE int32_t lc_pool_get_steal_id(struct lc_pool* pool, int32_t pid)
{
danghvu marked this conversation as resolved.
Show resolved Hide resolved
int32_t npools = pool->npools;
if (npools == 1)
return pid; /* if only one pool, no one else to steal from */
int32_t r = rand() % (npools - 1);
return (r + pid + 1) % npools;
}

LC_INLINE void* lc_pool_steal(struct lc_pool* pool, int32_t pid)
{
void* elm = NULL;
while (!elm) {
int steal = rand() % (pool->npools);
if (likely(pool->lpools[steal] != NULL))
elm = dq_pop_bot(pool->lpools[steal]);
}
int32_t target = lc_pool_get_steal_id(pool, pid);
if (target != pid && likely(pool->lpools[target] != NULL))
elm = dq_pop_bot(pool->lpools[target]);
return elm;
}

LC_INLINE void lc_pool_put(struct lc_pool* pool, void* elm) {
LC_INLINE void lc_pool_put(struct lc_pool* pool, void* elm)
{
int32_t pid = lc_pool_get_local(pool);
struct dequeue* lpool = pool->lpools[pid];
dq_push_top(lpool, elm);
}

LC_INLINE void lc_pool_put_to(struct lc_pool* pool, void* elm, int32_t pid) {
LC_INLINE void lc_pool_put_to(struct lc_pool* pool, void* elm, int32_t pid)
{
struct dequeue* lpool = pool->lpools[pid];
dq_push_top(lpool, elm);
}

LC_INLINE void* lc_pool_get(struct lc_pool* pool) {
LC_INLINE void* lc_pool_get_nb(struct lc_pool* pool)
{
int32_t pid = lc_pool_get_local(pool);
struct dequeue* lpool = pool->lpools[pid];
void *elm = NULL;
elm = dq_pop_top(lpool);
void* elm = dq_pop_top(lpool);
if (elm == NULL)
elm = lc_pool_get_slow(pool);
elm = lc_pool_steal(pool, pid);
return elm;
}

LC_INLINE void* lc_pool_get_nb(struct lc_pool* pool) {
LC_INLINE void* lc_pool_get(struct lc_pool* pool)
{
int32_t pid = lc_pool_get_local(pool);
struct dequeue* lpool = pool->lpools[pid];
void* elm = NULL;
elm = dq_pop_top(lpool);
if (elm == NULL) {
int steal = rand() % (pool->npools);
if (likely(pool->lpools[steal] != NULL))
elm = dq_pop_bot(pool->lpools[steal]);
while (elm == NULL) {
/* must try self every iteration since we never steal from self */
elm = dq_pop_top(lpool);
if (elm == NULL)
elm = lc_pool_steal(pool, pid);
}
return elm;
}
Expand Down
2 changes: 1 addition & 1 deletion src/cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ lc_status lc_cq_pop(lc_ep ep, lc_req** req_ptr)
lc_status lc_cq_reqfree(lc_ep ep, lc_req* req)
{
lc_packet* packet = (lc_packet*) req->parent;
lc_pool_put(ep->pkpool, packet);
lci_pk_free_data(ep, packet);
return LC_OK;
}
3 changes: 3 additions & 0 deletions src/include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@
#define LC_SERVER_NUM_PKTS 1024
#define LC_CACHE_LINE 64

#define LC_PKT_RET_MED_SIZE 1024
// #define LC_PKT_RET_LONG

#endif
8 changes: 4 additions & 4 deletions src/include/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static inline void lci_ce_queue(lc_ep ep, lc_packet* p)
static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTR %p\n", p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, p->context.poolid, LC_PROTO_LONG, p);
// dprintf("%d] rma %p --> %p %.4x via %d\n", lcg_rank, p->data.rts.src_addr, p->data.rtr.tgt_addr, crc32c((char*) p->data.rts.src_addr, p->data.rts.size), p->data.rtr.rkey);

lc_server_rma_rtr(ep->server, p->context.req->rhandle,
Expand All @@ -121,7 +121,7 @@ static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
static inline void lci_handle_rts(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTS: %p\n", p);
lci_pk_init(ep, -1, LC_PROTO_RTR, p);
lci_pk_init(ep, p->context.poolid, LC_PROTO_RTR, p);
lc_proto proto = MAKE_PROTO(ep->gid, LC_PROTO_RTR, 0);
lci_prepare_rtr(ep, p->context.req->buffer, p->data.rts.size, p);
lc_server_sendm(ep->server, p->context.req->rhandle,
Expand Down Expand Up @@ -264,10 +264,10 @@ static inline void lci_serve_send(lc_packet* p)
} else if (proto == LC_PROTO_LONG) {
dprintf("SENT LONG: %p\n", p);
p->data.rts.cb((void*) p->data.rts.ce);
lci_pk_free(ep, p);
lci_pk_free_data(ep, p);
} else if (proto == LC_PROTO_RTS) {
dprintf("SENT RTS: %p\n", p);
lci_pk_free(ep, p);
lci_pk_free_data(ep, p);
} else {
dprintf("SENT UNKNOWN: %p\n", p);
lci_pk_free_data(ep, p);
Expand Down
6 changes: 5 additions & 1 deletion src/include/server/server_psm2.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,10 @@ static inline int lc_server_progress(lc_server* s)
lci_serve_recv_rdma(p, status.msg_tag.tag1);
} else {
p->context.req = &p->context.req_s;
lc_pool_put(s->pkpool, p);
if (p->context.poolid != -1)
lc_pool_put_to(s->pkpool, p, p->context.poolid);
else
lc_pool_put(s->pkpool, p);
}
} else if (ctx & PSM_SEND) {
lc_packet* p = (lc_packet*) (ctx ^ PSM_SEND);
Expand All @@ -318,6 +321,7 @@ static inline void lc_server_post_recv(lc_server* s, lc_packet* p)
}

psm2_mq_tag_t rtag = PSM_TAG_TRECV_DATA();
p->context.poolid = lc_pool_get_local(s->pkpool);

PSM_SAFECALL(psm2_mq_irecv2(
s->mq, PSM2_MQ_ANY_ADDR, &rtag, /* message tag */
Expand Down
13 changes: 10 additions & 3 deletions src/long.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
#include "lc.h"
#include "config.h"

#include "lc_priv.h"
#include "lc/pool.h"

#ifdef LC_PKT_RET_LONG
#define LC_LONG_POOL_ID(ep) (lc_pool_get_local(ep->pkpool))
#else
#define LC_LONG_POOL_ID(ep) (-1)
#endif

lc_status lc_sendl(void* src, size_t size, int rank, int tag, lc_ep ep,
lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_RTS, p);
lci_pk_init(ep, LC_LONG_POOL_ID(ep), LC_PROTO_RTS, p);
struct lci_rep* rep = &(ep->rep[rank]);
lci_prepare_rts(src, size, cb, ce, p);
lc_server_sendm(ep->server, rep->handle,
Expand All @@ -20,7 +27,7 @@ lc_status lc_putl(void* src, size_t size, int rank, uintptr_t addr,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, LC_LONG_POOL_ID(ep), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;

Expand All @@ -34,7 +41,7 @@ lc_status lc_putls(void* src, size_t size, int rank, uintptr_t addr, int meta,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, LC_LONG_POOL_ID(ep), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;
struct lci_rep* rep = &(ep->rep[rank]);
Expand Down
7 changes: 4 additions & 3 deletions src/medium.c
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#include "lc.h"
#include "config.h"

#include "lc_priv.h"
#include "lc/pool.h"

lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
lci_pk_init(ep, (size > LC_PKT_RET_MED_SIZE) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(p->data.buffer, src, size);
Expand All @@ -18,7 +19,7 @@ lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
lci_pk_init(ep, (size > LC_PKT_RET_MED_SIZE) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
Expand All @@ -30,7 +31,7 @@ lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
lc_status lc_putms(void* src, size_t size, int rank, uintptr_t addr, int meta, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
lci_pk_init(ep, (size > LC_PKT_RET_MED_SIZE) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
Expand Down