Skip to content

Commit

Permalink
support close on server error and period ping for stream manager (#381)
Browse files Browse the repository at this point in the history
- support close connection on server error for stream manager
- support period ping for stream manager
- Fixed a bug that stream manager will hang forever
  • Loading branch information
TingDaoK authored Jul 12, 2022
1 parent dbdd7fc commit c8fc870
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 44 deletions.
9 changes: 8 additions & 1 deletion include/aws/http/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,13 @@ void aws_http_connection_release(struct aws_http_connection *connection);
AWS_HTTP_API
void aws_http_connection_close(struct aws_http_connection *connection);

/**
* Stop accepting new requests for the connection. It will NOT start the shutdown process for the connection. The
* requests that are already open can still wait to be completed, but new requests will fail to be created,
*/
AWS_HTTP_API
void aws_http_connection_stop_new_requests(struct aws_http_connection *connection);

/**
* Returns true unless the connection is closed or closing.
*/
Expand Down Expand Up @@ -597,7 +604,7 @@ void aws_http2_connection_get_remote_settings(
*/

AWS_HTTP_API
int aws_http2_connection_send_goaway(
void aws_http2_connection_send_goaway(
struct aws_http_connection *http2_connection,
uint32_t http2_error,
bool allow_more_streams,
Expand Down
20 changes: 20 additions & 0 deletions include/aws/http/http2_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,26 @@ struct aws_http2_stream_manager_options {
void *shutdown_complete_user_data;
aws_http2_stream_manager_shutdown_complete_fn *shutdown_complete_callback;

/**
* Optional.
* When set, connection will be closed if 5xx response received from server.
*/
bool close_connection_on_server_error;
/**
* Optional.
* The period for all the connections held by stream manager to send a PING in milliseconds.
* If you specify 0, manager will NOT send any PING.
* Note: if set, it must be large than the time of ping timeout setting.
*/
size_t connection_ping_period_ms;
/**
* Optional.
* Network connection will be closed if a ping response is not received
* within this amount of time (milliseconds).
* If you specify 0, a default value will be used.
*/
size_t connection_ping_timeout_ms;

/* TODO: More flexible policy about the connections, but will always has these three values below. */
/**
* Optional.
Expand Down
3 changes: 2 additions & 1 deletion include/aws/http/private/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct aws_http_connection_vtable {
const struct aws_http_request_handler_options *options);
int (*stream_send_response)(struct aws_http_stream *stream, struct aws_http_message *response);
void (*close)(struct aws_http_connection *connection);
void (*stop_new_requests)(struct aws_http_connection *connection);
bool (*is_open)(const struct aws_http_connection *connection);
bool (*new_requests_allowed)(const struct aws_http_connection *connection);

Expand All @@ -56,7 +57,7 @@ struct aws_http_connection_vtable {
const struct aws_byte_cursor *optional_opaque_data,
aws_http2_on_ping_complete_fn *on_completed,
void *user_data);
int (*send_goaway)(
void (*send_goaway)(
struct aws_http_connection *http2_connection,
uint32_t http2_error,
bool allow_more_streams,
Expand Down
17 changes: 17 additions & 0 deletions include/aws/http/private/http2_stream_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@ enum aws_h2_sm_connection_state_type {
/* Live with the streams opening, and if there no outstanding pending acquisition and no opening streams on the
* connection, this structure should die */
struct aws_h2_sm_connection {
struct aws_allocator *allocator;
struct aws_http2_stream_manager *stream_manager;
struct aws_http_connection *connection;
uint32_t num_streams_assigned; /* From a stream assigned to the connection until the stream completed
or failed to be created from the connection. */
uint32_t max_concurrent_streams; /* lower bound between user configured and the other side */

/* task to send ping periodically from connection thread. */
struct aws_ref_count ref_count;
struct aws_channel_task ping_task;
struct aws_channel_task ping_timeout_task;
struct {
bool ping_received;
bool stopped_new_requests;
uint64_t next_ping_task_time;
} thread_data;

enum aws_h2_sm_connection_state_type state;
};

Expand Down Expand Up @@ -82,7 +93,13 @@ struct aws_http2_stream_manager {
struct aws_ref_count internal_ref_count;
struct aws_client_bootstrap *bootstrap;

/* Configurations */
size_t max_connections;
/* Connection will be closed if 5xx response received from server. */
bool close_connection_on_server_error;

uint64_t connection_ping_period_ns;
uint64_t connection_ping_timeout_ns;

/**
* Default is no limit. 0 will be considered as using the default value.
Expand Down
10 changes: 7 additions & 3 deletions source/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ void aws_http_connection_close(struct aws_http_connection *connection) {
connection->vtable->close(connection);
}

void aws_http_connection_stop_new_requests(struct aws_http_connection *connection) {
AWS_ASSERT(connection);
connection->vtable->stop_new_requests(connection);
}

bool aws_http_connection_is_open(const struct aws_http_connection *connection) {
AWS_ASSERT(connection);
return connection->vtable->is_open(connection);
Expand Down Expand Up @@ -296,16 +301,15 @@ int aws_http2_connection_ping(
return http2_connection->vtable->send_ping(http2_connection, optional_opaque_data, on_ack, user_data);
}

int aws_http2_connection_send_goaway(
void aws_http2_connection_send_goaway(
struct aws_http_connection *http2_connection,
uint32_t http2_error,
bool allow_more_streams,
const struct aws_byte_cursor *optional_debug_data) {
AWS_ASSERT(http2_connection);
AWS_PRECONDITION(http2_connection->vtable);
AWS_FATAL_ASSERT(http2_connection->http_version == AWS_HTTP_VERSION_2);
return http2_connection->vtable->send_goaway(
http2_connection, http2_error, allow_more_streams, optional_debug_data);
http2_connection->vtable->send_goaway(http2_connection, http2_error, allow_more_streams, optional_debug_data);
}

int aws_http2_connection_get_sent_goaway(
Expand Down
14 changes: 14 additions & 0 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ static struct aws_http_stream *s_new_server_request_handler_stream(
const struct aws_http_request_handler_options *options);
static int s_stream_send_response(struct aws_http_stream *stream, struct aws_http_message *response);
static void s_connection_close(struct aws_http_connection *connection_base);
static void s_connection_stop_new_request(struct aws_http_connection *connection_base);
static bool s_connection_is_open(const struct aws_http_connection *connection_base);
static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base);
static int s_decoder_on_request(
Expand Down Expand Up @@ -90,6 +91,7 @@ static struct aws_http_connection_vtable s_h1_connection_vtable = {
.new_server_request_handler_stream = s_new_server_request_handler_stream,
.stream_send_response = s_stream_send_response,
.close = s_connection_close,
.stop_new_requests = s_connection_stop_new_request,
.is_open = s_connection_is_open,
.new_requests_allowed = s_connection_new_requests_allowed,
.change_settings = NULL,
Expand Down Expand Up @@ -196,6 +198,18 @@ static void s_connection_close(struct aws_http_connection *connection_base) {
s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
}

static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);

{ /* BEGIN CRITICAL SECTION */
aws_h1_connection_lock_synced_data(connection);
if (!connection->synced_data.new_stream_error_code) {
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
}
aws_h1_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
}

static bool s_connection_is_open(const struct aws_http_connection *connection_base) {
struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
bool is_open;
Expand Down
34 changes: 20 additions & 14 deletions source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static struct aws_http_stream *s_connection_make_request(
struct aws_http_connection *client_connection,
const struct aws_http_make_request_options *options);
static void s_connection_close(struct aws_http_connection *connection_base);
static void s_connection_stop_new_request(struct aws_http_connection *connection_base);
static bool s_connection_is_open(const struct aws_http_connection *connection_base);
static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base);
static void s_connection_update_window(struct aws_http_connection *connection_base, uint32_t increment_size);
Expand All @@ -68,7 +69,7 @@ static int s_connection_send_ping(
const struct aws_byte_cursor *optional_opaque_data,
aws_http2_on_ping_complete_fn *on_completed,
void *user_data);
static int s_connection_send_goaway(
static void s_connection_send_goaway(
struct aws_http_connection *connection_base,
uint32_t http2_error,
bool allow_more_streams,
Expand Down Expand Up @@ -168,6 +169,7 @@ static struct aws_http_connection_vtable s_h2_connection_vtable = {
.new_server_request_handler_stream = NULL,
.stream_send_response = NULL,
.close = s_connection_close,
.stop_new_requests = s_connection_stop_new_request,
.is_open = s_connection_is_open,
.new_requests_allowed = s_connection_new_requests_allowed,
.update_window = s_connection_update_window,
Expand Down Expand Up @@ -565,10 +567,9 @@ static struct aws_h2_pending_goaway *s_new_pending_goaway(
}
struct aws_h2_pending_goaway *pending_goaway;
void *debug_data_storage;
if (!aws_mem_acquire_many(
allocator, 2, &pending_goaway, sizeof(struct aws_h2_pending_goaway), &debug_data_storage, debug_data.len)) {
return NULL;
}
/* mem acquire cannot fail anymore */
aws_mem_acquire_many(
allocator, 2, &pending_goaway, sizeof(struct aws_h2_pending_goaway), &debug_data_storage, debug_data.len);
if (debug_data.len) {
memcpy(debug_data_storage, debug_data.ptr, debug_data.len);
debug_data.ptr = debug_data_storage;
Expand Down Expand Up @@ -2128,6 +2129,18 @@ static void s_connection_close(struct aws_http_connection *connection_base) {
s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
}

static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);

{ /* BEGIN CRITICAL SECTION */
s_lock_synced_data(connection);
if (!connection->synced_data.new_stream_error_code) {
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
}
s_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
}

static bool s_connection_is_open(const struct aws_http_connection *connection_base) {
struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
bool is_open;
Expand Down Expand Up @@ -2358,7 +2371,7 @@ static int s_connection_send_ping(
return aws_raise_error(AWS_ERROR_INVALID_STATE);
}

static int s_connection_send_goaway(
static void s_connection_send_goaway(
struct aws_http_connection *connection_base,
uint32_t http2_error,
bool allow_more_streams,
Expand All @@ -2368,11 +2381,6 @@ static int s_connection_send_goaway(
struct aws_h2_pending_goaway *pending_goaway =
s_new_pending_goaway(connection->base.alloc, http2_error, allow_more_streams, optional_debug_data);

if (!pending_goaway) {
/* error happened during acquire memory. Error code raised there and skip logging. */
return AWS_OP_ERR;
}

bool was_cross_thread_work_scheduled = false;
bool connection_open;
{ /* BEGIN CRITICAL SECTION */
Expand All @@ -2383,7 +2391,7 @@ static int s_connection_send_goaway(
s_unlock_synced_data(connection);
CONNECTION_LOG(DEBUG, connection, "Goaway not sent, connection is closed or closing.");
aws_mem_release(connection->base.alloc, pending_goaway);
goto done;
return;
}
was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
connection->synced_data.is_cross_thread_work_task_scheduled = true;
Expand All @@ -2404,8 +2412,6 @@ static int s_connection_send_goaway(
CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
}
done:
return AWS_OP_SUCCESS;
}

static void s_get_settings_general(
Expand Down
Loading

0 comments on commit c8fc870

Please sign in to comment.