Skip to content

Commit

Permalink
add message overload warning
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwu committed Oct 8, 2014
1 parent 535bbc7 commit 2f6bfe9
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 3 deletions.
6 changes: 6 additions & 0 deletions lualib/skynet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ end

local coroutine_pool = {}
local coroutine_yield = coroutine.yield
local coroutine_count = 0

local function co_create(f)
local co = table.remove(coroutine_pool)
Expand All @@ -109,6 +110,11 @@ local function co_create(f)
f(coroutine_yield())
end
end)
coroutine_count = coroutine_count + 1
if coroutine_count > 1024 then
skynet.error("May overload, create 1024 task")
coroutine_count = 0
end
else
coroutine.resume(co, f)
end
Expand Down
36 changes: 33 additions & 3 deletions skynet-src/skynet_mq.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// 1 means mq is in global mq , or the message is dispatching.

#define MQ_IN_GLOBAL 1
#define MQ_OVERLOAD 1024

struct message_queue {
uint32_t handle;
Expand All @@ -24,6 +25,8 @@ struct message_queue {
int lock;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};
Expand Down Expand Up @@ -116,6 +119,8 @@ skynet_mq_create(uint32_t handle) {
// If the service init success, skynet_context_new will call skynet_mq_force_push to push it to global queue.
q->in_global = MQ_IN_GLOBAL;
q->release = 0;
q->overload = 0;
q->overload_threshold = MQ_OVERLOAD;
q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
q->next = NULL;

Expand Down Expand Up @@ -150,17 +155,42 @@ skynet_mq_length(struct message_queue *q) {
return tail + cap - head;
}

int
skynet_mq_overload(struct message_queue *q) {
if (q->overload) {
int overload = q->overload;
q->overload = 0;
return overload;
}
return 0;
}

int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
int ret = 1;
LOCK(q)

if (q->head != q->tail) {
*message = q->queue[q->head];
*message = q->queue[q->head++];
ret = 0;
if ( ++ q->head >= q->cap) {
q->head = 0;
int head = q->head;
int tail = q->tail;
int cap = q->cap;

if (head >= cap) {
q->head = head = 0;
}
int length = tail - head;
if (length < 0) {
length += cap;
}
while (length > q->overload_threshold) {
q->overload = length;
q->overload_threshold *= 2;
}
} else {
// reset overload_threshold when queue is empty
q->overload_threshold = MQ_OVERLOAD;
}

if (ret) {
Expand Down
1 change: 1 addition & 0 deletions skynet-src/skynet_mq.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void skynet_mq_push(struct message_queue *q, struct skynet_message *message);

// return the length of message queue, for debug
int skynet_mq_length(struct message_queue *q);
int skynet_mq_overload(struct message_queue *q);

void skynet_mq_init();

Expand Down
4 changes: 4 additions & 0 deletions skynet-src/skynet_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}

skynet_monitor_trigger(sm, msg.source , handle);

Expand Down
44 changes: 44 additions & 0 deletions test/testoverload.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
local skynet = require "skynet"

local mode = ...

if mode == "slave" then

local CMD = {}

function CMD.sum(n)
skynet.error("for loop begin")
local s = 0
for i = 1, n do
s = s + i
end
skynet.error("for loop end")
end

function CMD.blackhole()
end

skynet.start(function()
skynet.dispatch("lua", function(_,_, cmd, ...)
local f = CMD[cmd]
f(...)
end)
end)

else

skynet.start(function()
local slave = skynet.newservice(SERVICE_NAME, "slave")
for step = 1, 20 do
skynet.error("overload test ".. step)
for i = 1, 512 * step do
skynet.send(slave, "lua", "blackhole")
end
skynet.sleep(step)
end
local n = 1000000000
skynet.error(string.format("endless test n=%d", n))
skynet.send(slave, "lua", "sum", n)
end)

end

0 comments on commit 2f6bfe9

Please sign in to comment.