diff --git a/benchmarks/000.microbenchmarks/010.sleep/config.json b/benchmarks/000.microbenchmarks/010.sleep/config.json index c7e8fc23..1121edc0 100644 --- a/benchmarks/000.microbenchmarks/010.sleep/config.json +++ b/benchmarks/000.microbenchmarks/010.sleep/config.json @@ -1,5 +1,5 @@ { "timeout": 120, "memory": 128, - "languages": ["python", "nodejs"] + "languages": ["python", "nodejs", "cpp"] } diff --git a/benchmarks/000.microbenchmarks/010.sleep/cpp/main.cpp b/benchmarks/000.microbenchmarks/010.sleep/cpp/main.cpp new file mode 100644 index 00000000..23d40288 --- /dev/null +++ b/benchmarks/000.microbenchmarks/010.sleep/cpp/main.cpp @@ -0,0 +1,21 @@ + +#include +#include +#include + +#include +#include +#include + +std::tuple function(Aws::Utils::Json::JsonView json) +{ + int sleep = json.GetInteger("sleep"); + + std::chrono::seconds timespan(sleep); + std::this_thread::sleep_for(timespan); + + Aws::Utils::Json::JsonValue val; + val.WithObject("result", std::to_string(sleep)); + return std::make_tuple(val, 0); +} + diff --git a/benchmarks/000.microbenchmarks/051.communication.storage/config.json b/benchmarks/000.microbenchmarks/051.communication.storage/config.json new file mode 100644 index 00000000..06275b3a --- /dev/null +++ b/benchmarks/000.microbenchmarks/051.communication.storage/config.json @@ -0,0 +1,16 @@ +{ + "timeout": 900, + "memory": 2048, + "languages": ["cpp"], + "deployment": { + "package": { + "build_dependencies": { + "cpp": { + "common": ["boost"], + "aws": ["sdk"] + } + } + }, + "image": null + } +} diff --git a/benchmarks/000.microbenchmarks/051.communication.storage/cpp/dependencies.json b/benchmarks/000.microbenchmarks/051.communication.storage/cpp/dependencies.json new file mode 100644 index 00000000..ad69992a --- /dev/null +++ b/benchmarks/000.microbenchmarks/051.communication.storage/cpp/dependencies.json @@ -0,0 +1,7 @@ +{ + "package": { + "common": ["boost"], + "aws": ["sdk"] + }, + "image": null +} diff --git a/benchmarks/000.microbenchmarks/051.communication.storage/cpp/main.cpp b/benchmarks/000.microbenchmarks/051.communication.storage/cpp/main.cpp new file mode 100644 index 00000000..c948abac --- /dev/null +++ b/benchmarks/000.microbenchmarks/051.communication.storage/cpp/main.cpp @@ -0,0 +1,153 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "storage.hpp" +#include "utils.hpp" + +std::tuple function(Aws::Utils::Json::JsonView json) +{ + Storage client = Storage::get_client(); + + auto bucket = json.GetString("bucket"); + auto key = json.GetString("key"); + auto role = json.GetString("role"); // producer or consumer + auto size = json.GetInteger("size"); + auto invoc = json.GetObject("invocations"); + int reps = invoc.GetInteger("invocations"); + int iteration = invoc.GetInteger("iteration"); + int warmup_reps = invoc.GetInteger("warmup"); + bool with_backoff = invoc.GetBool("with_backoff"); + int offset = invoc.GetInteger("offset"); + std::cout << "Invoked handler for role " << role << " with file size " << size + << " and " << reps << " messages per lambda " << std::endl; + + char* pBuf = new char[size]; + memset(pBuf, 'A', sizeof(char)*size); + + std::string data_key = client.key_join({key, "messages"}); + std::string results_key = client.key_join({key, "results"}); + + std::vector times; + std::vector retries_times; + int retries = 0; + if (role == "producer") { + + for(int i = 0; i < warmup_reps; ++i) { + + std::string prefix = 
std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + client.upload_file(bucket, new_key, size, pBuf); + int ret = client.download_file(bucket, new_key_response, retries, with_backoff); + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + auto beg = timeSinceEpochMillisec(); + client.upload_file(bucket, new_key, size, pBuf); + int ret = client.download_file(bucket, new_key_response, retries, with_backoff); + auto end = timeSinceEpochMillisec(); + times.push_back(end - beg); + retries_times.push_back(retries); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + } + + std::stringstream ss; + ss << times.size() << '\n'; + for(size_t i = 0; i < times.size(); ++i) + ss << times[i] << '\n'; + std::stringstream ss2; + ss2 << retries_times.size() << '\n'; + for(size_t i = 0; i < retries_times.size(); ++i) + ss2 << retries_times[i] << '\n'; + + auto times_str = ss.str(); + char* data = new char[times_str.length() + 1]; + strcpy(data, times_str.c_str()); + + auto retries_times_str = ss2.str(); + char* data2 = new char[retries_times_str.length() + 1]; + strcpy(data2, retries_times_str.c_str()); + + std::string new_key = client.key_join({results_key, "producer_times_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, times_str.length(), data); + new_key = client.key_join({results_key, "producer_retries_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, retries_times_str.length(), data2); + + delete[] data; + delete[] data2; + } else if (role == "consumer") { + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for(int i = 0; i < warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + int ret = client.download_file(bucket, new_key, retries, with_backoff); + client.upload_file(bucket, new_key_response, size, pBuf); + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + int ret = client.download_file(bucket, new_key, retries, with_backoff); + client.upload_file(bucket, new_key_response, size, pBuf); + retries_times.push_back(retries); + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + } + + std::stringstream ss2; + ss2 << retries_times.size() << '\n'; + for(int i = 0; i < retries_times.size(); ++i) + ss2 << retries_times[i] << '\n'; + auto retries_times_str = ss2.str(); + char* data = new char[retries_times_str.length() + 1]; + strcpy(data, retries_times_str.c_str()); + std::string new_key = client.key_join({results_key, "consumer_retries_" + 
std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, retries_times_str.length(), data); + delete[] data; + } + + delete[] pBuf; + + Aws::Utils::Json::JsonValue val; + val.WithObject("result", std::to_string(size)); + return std::make_tuple(val, 0); +} + diff --git a/benchmarks/000.microbenchmarks/051.communication.storage/input.py b/benchmarks/000.microbenchmarks/051.communication.storage/input.py new file mode 100644 index 00000000..eb2d5a62 --- /dev/null +++ b/benchmarks/000.microbenchmarks/051.communication.storage/input.py @@ -0,0 +1,12 @@ + +size_generators = { + 'test' : 1, + 'small' : 100, + 'large': 1000 +} + +def buckets_count(): + return (0, 0) + +def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): + return { 'sleep': size_generators[size] } diff --git a/benchmarks/000.microbenchmarks/052.communication.key-value/config.json b/benchmarks/000.microbenchmarks/052.communication.key-value/config.json new file mode 100644 index 00000000..c9622c8f --- /dev/null +++ b/benchmarks/000.microbenchmarks/052.communication.key-value/config.json @@ -0,0 +1,16 @@ +{ + "timeout": 540, + "memory": 2048, + "languages": ["cpp"], + "deployment": { + "package": { + "build_dependencies": { + "cpp": { + "common": ["boost"], + "aws": ["sdk"] + } + } + }, + "image": null + } +} diff --git a/benchmarks/000.microbenchmarks/052.communication.key-value/cpp/dependencies.json b/benchmarks/000.microbenchmarks/052.communication.key-value/cpp/dependencies.json new file mode 100644 index 00000000..ad69992a --- /dev/null +++ b/benchmarks/000.microbenchmarks/052.communication.key-value/cpp/dependencies.json @@ -0,0 +1,7 @@ +{ + "package": { + "common": ["boost"], + "aws": ["sdk"] + }, + "image": null +} diff --git a/benchmarks/000.microbenchmarks/052.communication.key-value/cpp/main.cpp b/benchmarks/000.microbenchmarks/052.communication.key-value/cpp/main.cpp new file mode 100644 index 00000000..149b0793 --- /dev/null +++ b/benchmarks/000.microbenchmarks/052.communication.key-value/cpp/main.cpp @@ -0,0 +1,205 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "storage.hpp" +#include "key-value.hpp" +#include "utils.hpp" + +template +std::tuple to_string(const std::vector & data) +{ + + std::stringstream ss; + ss << data.size() << '\n'; + for(int i = 0; i < data.size(); ++i) + ss << data[i] << '\n'; + auto data_str = ss.str(); + char* string_data = new char[data_str.length() + 1]; + strcpy(string_data, data_str.c_str()); + + return std::make_tuple(string_data, data_str.length()); +} + +std::tuple function(Aws::Utils::Json::JsonView json) +{ + Storage client = Storage::get_client(); + KeyValue channel_client; + + auto bucket = json.GetString("bucket"); + auto key = json.GetString("key"); + auto role = json.GetString("role"); // producer or consumer + auto size = json.GetInteger("size"); + auto invoc = json.GetObject("invocations"); + int reps = invoc.GetInteger("invocations"); + int iteration = invoc.GetInteger("iteration"); + int warmup_reps = invoc.GetInteger("warmup"); + bool with_backoff = invoc.GetBool("with_backoff"); + int offset = invoc.GetInteger("offset"); + std::cout << "Invoked handler for role " << role << " with file size " << size + << " and " << reps << " messages per lambda " << std::endl; + + unsigned char* pBuf = new unsigned char[size]; + memset(pBuf, 'A', sizeof(unsigned char)*size); + + std::string data_key = 
client.key_join({key, "messages"}); + std::string results_key = client.key_join({key, "results"}); + + std::vector times; + std::vector retries_times; + std::vector read_capacity_units; + std::vector write_capacity_units; + + int retries = 0; + double read_units = 0; + double write_units = 0; + + if (role == "producer") { + + for(int i = 0; i < warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + channel_client.upload_file(bucket, new_key, write_units, size, pBuf); + int ret = channel_client.download_file(bucket, new_key_response, retries, read_units, with_backoff); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + auto beg = timeSinceEpochMillisec(); + channel_client.upload_file(bucket, new_key, write_units, size, pBuf); + int ret = channel_client.download_file(bucket, new_key_response, retries, read_units, with_backoff); + auto end = timeSinceEpochMillisec(); + + times.push_back(end - beg); + retries_times.push_back(retries); + read_capacity_units.push_back(read_units); + write_capacity_units.push_back(write_units); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + } + + { + auto data = to_string(times); + std::string new_key = client.key_join({results_key, "producer_times_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + { + auto data = to_string(retries_times); + std::string new_key = client.key_join({results_key, "producer_retries_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + { + auto data = to_string(write_capacity_units); + std::string new_key = client.key_join({results_key, "producer_write_units_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + { + auto data = to_string(read_capacity_units); + std::string new_key = client.key_join({results_key, "producer_read_units_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + } else if (role == "consumer") { + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for(int i = 0; i < warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + int ret = channel_client.download_file(bucket, new_key, retries, read_units, with_backoff); + channel_client.upload_file(bucket, new_key_response, write_units, size, pBuf); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + 
std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + int ret = channel_client.download_file(bucket, new_key, retries, read_units, with_backoff); + channel_client.upload_file(bucket, new_key_response, write_units, size, pBuf); + + retries_times.push_back(retries); + read_capacity_units.push_back(read_units); + write_capacity_units.push_back(write_units); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + } + + { + auto data = to_string(retries_times); + std::string new_key = client.key_join({results_key, "consumer_retries_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + { + auto data = to_string(write_capacity_units); + std::string new_key = client.key_join({results_key, "consumer_write_units_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + { + auto data = to_string(read_capacity_units); + std::string new_key = client.key_join({results_key, "consumer_read_units_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + } + + delete[] pBuf; + + Aws::Utils::Json::JsonValue val; + val.WithObject("result", std::to_string(size)); + return std::make_tuple(val, 0); +} + diff --git a/benchmarks/000.microbenchmarks/052.communication.key-value/input.py b/benchmarks/000.microbenchmarks/052.communication.key-value/input.py new file mode 100644 index 00000000..eb2d5a62 --- /dev/null +++ b/benchmarks/000.microbenchmarks/052.communication.key-value/input.py @@ -0,0 +1,12 @@ + +size_generators = { + 'test' : 1, + 'small' : 100, + 'large': 1000 +} + +def buckets_count(): + return (0, 0) + +def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): + return { 'sleep': size_generators[size] } diff --git a/benchmarks/000.microbenchmarks/053.communication.redis/config.json b/benchmarks/000.microbenchmarks/053.communication.redis/config.json new file mode 100644 index 00000000..c9622c8f --- /dev/null +++ b/benchmarks/000.microbenchmarks/053.communication.redis/config.json @@ -0,0 +1,16 @@ +{ + "timeout": 540, + "memory": 2048, + "languages": ["cpp"], + "deployment": { + "package": { + "build_dependencies": { + "cpp": { + "common": ["boost"], + "aws": ["sdk"] + } + } + }, + "image": null + } +} diff --git a/benchmarks/000.microbenchmarks/053.communication.redis/cpp/dependencies.json b/benchmarks/000.microbenchmarks/053.communication.redis/cpp/dependencies.json new file mode 100644 index 00000000..105e8f7e --- /dev/null +++ b/benchmarks/000.microbenchmarks/053.communication.redis/cpp/dependencies.json @@ -0,0 +1,7 @@ +{ + "package": { + "common": ["boost", "hiredis"], + "aws": ["sdk"] + }, + "image": null +} diff --git a/benchmarks/000.microbenchmarks/053.communication.redis/cpp/main.cpp b/benchmarks/000.microbenchmarks/053.communication.redis/cpp/main.cpp new file mode 100644 index 00000000..2342af73 --- /dev/null +++ b/benchmarks/000.microbenchmarks/053.communication.redis/cpp/main.cpp @@ -0,0 +1,192 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + 
+#include +#include +#include +#include +#include + +#include "storage.hpp" +#include "redis.hpp" +#include "utils.hpp" + +template +std::tuple to_string(const std::vector & data) +{ + + std::stringstream ss; + ss << data.size() << '\n'; + for(int i = 0; i < data.size(); ++i) + ss << data[i] << '\n'; + auto data_str = ss.str(); + char* string_data = new char[data_str.length() + 1]; + strcpy(string_data, data_str.c_str()); + + return std::make_tuple(string_data, data_str.length()); +} + +std::tuple function(Aws::Utils::Json::JsonView json) +{ + Storage client = Storage::get_client(); + + auto bucket = json.GetString("bucket"); + auto key = json.GetString("key"); + auto role = json.GetString("role"); // producer or consumer + auto size = json.GetInteger("size"); + auto invoc = json.GetObject("invocations"); + int reps = invoc.GetInteger("invocations"); + int iteration = invoc.GetInteger("iteration"); + int warmup_reps = invoc.GetInteger("warmup"); + bool with_backoff = invoc.GetBool("with_backoff"); + int offset = invoc.GetInteger("offset"); + std::cout << "Invoked handler for role " << role << " with file size " << size + << " and " << reps << " messages per lambda " << std::endl; + + auto redis_cfg = json.GetObject("redis"); + std::string redis_hostname = redis_cfg.GetString("hostname"); + int redis_port = redis_cfg.GetInteger("port"); + + Redis channel_client{redis_hostname, redis_port}; + if(!channel_client.is_initialized()) { + + std::string error_msg = "Couldn't access Redis cluster"; + std::cerr << error_msg << std::endl; + + Aws::Utils::Json::JsonValue val; + val.WithObject("result", error_msg); + + return std::make_tuple(val, 1); + } + + char* pBuf = new char[size]; + memset(pBuf, 'A', sizeof(char)*size); + + std::string data_key = client.key_join({key, "messages"}); + std::string results_key = client.key_join({key, "results"}); + + std::vector times; + std::vector retries_times; + + int retries = 0; + + if (role == "producer") { + + for(int i = 0; i < warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + channel_client.upload_file(new_key, size, pBuf); + int ret = channel_client.download_file(new_key_response, retries, with_backoff); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + channel_client.delete_file(new_key); + break; + } + + channel_client.delete_file(new_key); + channel_client.delete_file(new_key_response); + + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + auto beg = timeSinceEpochMillisec(); + channel_client.upload_file(new_key, size, pBuf); + int ret = channel_client.download_file(new_key_response, retries, with_backoff); + auto end = timeSinceEpochMillisec(); + + times.push_back(end - beg); + retries_times.push_back(retries); + + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + channel_client.delete_file(new_key); + break; + } + + channel_client.delete_file(new_key); + channel_client.delete_file(new_key_response); + } + + { + auto data = to_string(times); + std::string new_key = client.key_join({results_key, "producer_times_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + 
client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + { + auto data = to_string(retries_times); + std::string new_key = client.key_join({results_key, "producer_retries_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + } else if (role == "consumer") { + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for(int i = 0; i < warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + int ret = channel_client.download_file(new_key, retries, with_backoff); + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + channel_client.upload_file(new_key_response, size, pBuf); + + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + std::string prefix = std::to_string(size) + "_" + std::to_string(i + offset); + std::string new_key = client.key_join({data_key, prefix}); + std::string new_key_response = client.key_join({data_key, prefix + "_response"}); + + int ret = channel_client.download_file(new_key, retries, with_backoff); + if(ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + retries_times.push_back(retries); + channel_client.upload_file(new_key_response, size, pBuf); + + } + + { + auto data = to_string(retries_times); + std::string new_key = client.key_join({results_key, "consumer_retries_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + } + + delete[] pBuf; + + Aws::Utils::Json::JsonValue val; + val.WithObject("result", std::to_string(size)); + return std::make_tuple(val, 0); +} + diff --git a/benchmarks/000.microbenchmarks/053.communication.redis/input.py b/benchmarks/000.microbenchmarks/053.communication.redis/input.py new file mode 100644 index 00000000..eb2d5a62 --- /dev/null +++ b/benchmarks/000.microbenchmarks/053.communication.redis/input.py @@ -0,0 +1,12 @@ + +size_generators = { + 'test' : 1, + 'small' : 100, + 'large': 1000 +} + +def buckets_count(): + return (0, 0) + +def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): + return { 'sleep': size_generators[size] } diff --git a/benchmarks/000.microbenchmarks/054.communication.tcp/config.json b/benchmarks/000.microbenchmarks/054.communication.tcp/config.json new file mode 100644 index 00000000..c9622c8f --- /dev/null +++ b/benchmarks/000.microbenchmarks/054.communication.tcp/config.json @@ -0,0 +1,16 @@ +{ + "timeout": 540, + "memory": 2048, + "languages": ["cpp"], + "deployment": { + "package": { + "build_dependencies": { + "cpp": { + "common": ["boost"], + "aws": ["sdk"] + } + } + }, + "image": null + } +} diff --git a/benchmarks/000.microbenchmarks/054.communication.tcp/cpp/dependencies.json b/benchmarks/000.microbenchmarks/054.communication.tcp/cpp/dependencies.json new file mode 100644 index 00000000..105e8f7e --- /dev/null +++ b/benchmarks/000.microbenchmarks/054.communication.tcp/cpp/dependencies.json @@ -0,0 +1,7 @@ +{ + "package": { + "common": ["boost", "hiredis"], + "aws": ["sdk"] + }, + "image": null +} diff --git a/benchmarks/000.microbenchmarks/054.communication.tcp/cpp/main.cpp 
b/benchmarks/000.microbenchmarks/054.communication.tcp/cpp/main.cpp new file mode 100644 index 00000000..674a328f --- /dev/null +++ b/benchmarks/000.microbenchmarks/054.communication.tcp/cpp/main.cpp @@ -0,0 +1,149 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "storage.hpp" +#include "tcp.hpp" +#include "utils.hpp" + +template +std::tuple to_string(const std::vector & data) +{ + + std::stringstream ss; + ss << data.size() << '\n'; + for(int i = 0; i < data.size(); ++i) + ss << data[i] << '\n'; + auto data_str = ss.str(); + char* string_data = new char[data_str.length() + 1]; + strcpy(string_data, data_str.c_str()); + + return std::make_tuple(string_data, data_str.length()); +} + +std::tuple function(Aws::Utils::Json::JsonView json) +{ + Storage client = Storage::get_client(); + + auto bucket = json.GetString("bucket"); + auto key = json.GetString("key"); + auto role = json.GetString("role"); // producer or consumer + auto size = json.GetInteger("size"); + auto invoc = json.GetObject("invocations"); + int reps = invoc.GetInteger("invocations"); + int iteration = invoc.GetInteger("iteration"); + int warmup_reps = invoc.GetInteger("warmup"); + int offset = invoc.GetInteger("offset"); + std::cout << "Invoked handler for role " << role << " with file size " << size + << " and " << reps << " messages per lambda " << std::endl; + + auto tcp_cfg = json.GetObject("tcpuncher"); + std::string address = tcp_cfg.GetString("address"); + std::string pairing_key = tcp_cfg.GetString("pairing_key"); + int id = tcp_cfg.GetInteger("id"); + + char* recv_buffer = new char[size]; + char* pBuf = new char[size]; + memset(pBuf, 'A', sizeof(char)*size); + + TCP channel_client{address, pairing_key}; + if (role == "producer") { + channel_client.connect_producer(1); + } else { + channel_client.connect_consumer(id); + } + + std::string data_key = client.key_join({key, "messages"}); + std::string results_key = client.key_join({key, "results"}); + + std::vector times; + + if (role == "producer") { + + for(int i = 0; i < warmup_reps; ++i) { + + int upload_ret = channel_client.upload_file(0, size, pBuf); + int download_ret = channel_client.download_file(0, size, recv_buffer); + + if(upload_ret == 0 || download_ret == 0) { + std::cerr << "Failed processing " << i << '\n'; + break; + } + + + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + auto beg = timeSinceEpochMillisec(); + int upload_ret = channel_client.upload_file(0, size, pBuf); + int download_ret = channel_client.download_file(0, size, recv_buffer); + auto end = timeSinceEpochMillisec(); + + times.push_back(end - beg); + + if(upload_ret == 0 || download_ret == 0) { + std::cerr << "Failed download " << i << '\n'; + break; + } + + } + + { + auto data = to_string(times); + std::string new_key = client.key_join({results_key, "producer_times_" + std::to_string(size) + "_" + std::to_string(iteration) + ".txt"}); + client.upload_file(bucket, new_key, std::get<1>(data), std::get<0>(data)); + delete[] std::get<0>(data); + } + + } else if (role == "consumer") { + + for(int i = 0; i < warmup_reps; ++i) { + + int download_ret = channel_client.download_file(0, size, recv_buffer); + int upload_ret = channel_client.upload_file(0, size, pBuf); + + if(download_ret == 0 || upload_ret == 0) { + std::cerr << "Failed processing " << i << '\n'; + break; + } + + + } + + for(int i = warmup_reps; i < reps + warmup_reps; ++i) { + + int download_ret = 
channel_client.download_file(0, size, recv_buffer);
+ int upload_ret = channel_client.upload_file(0, size, pBuf);
+
+ if(download_ret == 0 || upload_ret == 0) {
+ std::cerr << "Failed processing " << i << '\n';
+ break;
+ }
+
+
+ }
+
+ }
+
+ delete[] pBuf;
+ delete[] recv_buffer;
+
+ Aws::Utils::Json::JsonValue val;
+ val.WithObject("result", std::to_string(size));
+ return std::make_tuple(val, 0);
+}
+ diff --git a/benchmarks/000.microbenchmarks/054.communication.tcp/input.py b/benchmarks/000.microbenchmarks/054.communication.tcp/input.py new file mode 100644 index 00000000..eb2d5a62 --- /dev/null +++ b/benchmarks/000.microbenchmarks/054.communication.tcp/input.py @@ -0,0 +1,12 @@
+
+size_generators = {
+ 'test' : 1,
+ 'small' : 100,
+ 'large': 1000
+}
+
+def buckets_count():
+ return (0, 0)
+
+def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
+ return { 'sleep': size_generators[size] }
diff --git a/benchmarks/wrappers/aws/cpp/handler.cpp b/benchmarks/wrappers/aws/cpp/handler.cpp new file mode 100644 index 00000000..4321a132 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/handler.cpp @@ -0,0 +1,80 @@
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "utils.hpp"
+
+// Global variables that are retained across function invocations
+bool cold_execution = true;
+std::string container_id = "";
+std::string cold_start_var = "";
+
+std::tuple<Aws::Utils::Json::JsonValue, int> function(Aws::Utils::Json::JsonView req);
+
+aws::lambda_runtime::invocation_response handler(aws::lambda_runtime::invocation_request const &req)
+{
+ Aws::Utils::Json::JsonValue json(req.payload);
+ Aws::Utils::Json::JsonView json_view = json.View();
+ // The HTTP trigger with API Gateway sends the payload as serialized JSON,
+ // stored under the key 'body' in the main JSON.
+ // The SDK trigger converts everything for us.
+ if(json_view.ValueExists("body")){
+ Aws::Utils::Json::JsonValue parsed_body{json_view.GetString("body")};
+ json = std::move(parsed_body);
+ json_view = json.View();
+ }
+
+ const auto begin = std::chrono::system_clock::now();
+ auto [ret, exit_code] = function(json.View());
+ const auto end = std::chrono::system_clock::now();
+
+ Aws::Utils::Json::JsonValue body;
+ body.WithObject("result", ret);
+
+ // Switch cold execution after the first one.
+ if(cold_execution) + cold_execution = false; + + auto b = std::chrono::duration_cast(begin.time_since_epoch()).count() / 1000.0 / 1000.0; + auto e = std::chrono::duration_cast(end.time_since_epoch()).count() / 1000.0 / 1000.0; + body.WithDouble("begin", b); + body.WithDouble("end", e); + body.WithDouble("results_time", e - b); + body.WithString("request_id", req.request_id); + body.WithBool("is_cold", cold_execution); + body.WithString("container_id", container_id); + body.WithString("cold_start_var", cold_start_var); + body.WithInteger("exit_code", exit_code); + + Aws::Utils::Json::JsonValue final_result; + final_result.WithObject("body", body); + + if(!exit_code) + return aws::lambda_runtime::invocation_response::success(final_result.View().WriteReadable(), "application/json"); + else + return aws::lambda_runtime::invocation_response::failure(final_result.View().WriteReadable(), "FailedInvocation"); +} + +int main() +{ + Aws::SDKOptions options; + Aws::InitAPI(options); + + const char * cold_var = std::getenv("cold_start"); + if(cold_var) + cold_start_var = cold_var; + container_id = boost::uuids::to_string(boost::uuids::random_generator()()); + + aws::lambda_runtime::run_handler(handler); + + Aws::ShutdownAPI(options); + return 0; +} + diff --git a/benchmarks/wrappers/aws/cpp/key-value.cpp b/benchmarks/wrappers/aws/cpp/key-value.cpp new file mode 100644 index 00000000..3637a8c1 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/key-value.cpp @@ -0,0 +1,103 @@ + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "key-value.hpp" +#include "utils.hpp" + +KeyValue::KeyValue() +{ + Aws::Client::ClientConfiguration config; + //config.region = "eu-central-1"; + config.caFile = "/etc/pki/tls/certs/ca-bundle.crt"; + + char const TAG[] = "LAMBDA_ALLOC"; + auto credentialsProvider = Aws::MakeShared(TAG); + _client.reset(new Aws::DynamoDB::DynamoDBClient(credentialsProvider, config)); +} + +uint64_t KeyValue::download_file(Aws::String const &table, Aws::String const &key, + int &required_retries, double& read_units, bool with_backoff) +{ + Aws::DynamoDB::Model::GetItemRequest req; + + // Set up the request + req.SetTableName(table); + req.SetReturnConsumedCapacity(Aws::DynamoDB::Model::ReturnConsumedCapacity::TOTAL); + Aws::DynamoDB::Model::AttributeValue hashKey; + hashKey.SetS(key); + req.AddKey("key", hashKey); + + auto bef = timeSinceEpochMillisec(); + int retries = 0; + const int MAX_RETRIES = 1500; + + while (retries < MAX_RETRIES) { + auto get_result = _client->GetItem(req); + if (get_result.IsSuccess()) { + + // Reference the retrieved fields/values + auto result = get_result.GetResult(); + const Aws::Map& item = result.GetItem(); + if (item.size() > 0) { + uint64_t finishedTime = timeSinceEpochMillisec(); + + required_retries = retries; + // GetReadCapacityUnits returns 0? 
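+ // Likely because ReturnConsumedCapacity::TOTAL reports only the aggregate
+ // CapacityUnits for the call, so the per-request read cost is taken from
+ // GetCapacityUnits() below instead.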
+ read_units = result.GetConsumedCapacity().GetCapacityUnits(); + + return finishedTime - bef; + } + + } else { + retries += 1; + if(with_backoff) { + int sleep_time = retries; + if (retries > 100) { + sleep_time = retries * 2; + } + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } + } + } + return 0; +} + +uint64_t KeyValue::upload_file(Aws::String const &table, + Aws::String const &key, + double& write_units, + int size, unsigned char* pBuf) +{ + Aws::Utils::ByteBuffer buf(pBuf, size); + + Aws::DynamoDB::Model::PutItemRequest req; + req.SetTableName(table); + req.SetReturnConsumedCapacity(Aws::DynamoDB::Model::ReturnConsumedCapacity::TOTAL); + + Aws::DynamoDB::Model::AttributeValue av; + av.SetB(buf); + req.AddItem("data", av); + av.SetS(key); + req.AddItem("key", av); + + uint64_t bef = timeSinceEpochMillisec(); + const Aws::DynamoDB::Model::PutItemOutcome put_result = _client->PutItem(req); + if (!put_result.IsSuccess()) { + std::cout << put_result.GetError().GetMessage() << std::endl; + return 1; + } + auto result = put_result.GetResult(); + // GetWriteCapacityUnits returns 0? + write_units = result.GetConsumedCapacity().GetCapacityUnits(); + uint64_t finishedTime = timeSinceEpochMillisec(); + + return finishedTime - bef; +} diff --git a/benchmarks/wrappers/aws/cpp/key-value.hpp b/benchmarks/wrappers/aws/cpp/key-value.hpp new file mode 100644 index 00000000..e2fc48a3 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/key-value.hpp @@ -0,0 +1,31 @@ + +#include +#include +#include +#include + +#include +#include + +class KeyValue +{ + // non-copyable, non-movable + std::shared_ptr _client; +public: + + KeyValue(); + +uint64_t download_file(Aws::String const &bucket, + Aws::String const &key, + int& required_retries, + double& read_units, + bool with_backoff = false); + +uint64_t upload_file(Aws::String const &bucket, + Aws::String const &key, + double& write_units, + int size, + unsigned char* pBuf); + +}; + diff --git a/benchmarks/wrappers/aws/cpp/redis.cpp b/benchmarks/wrappers/aws/cpp/redis.cpp new file mode 100644 index 00000000..41b17c75 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/redis.cpp @@ -0,0 +1,104 @@ + +#include + +#include + +#include "redis.hpp" +#include "utils.hpp" + +Redis::Redis(std::string redis_hostname, int redis_port) +{ + _context = redisConnect(redis_hostname.c_str(), redis_port); + if (_context == nullptr || _context->err) { + if (_context) { + std::cerr << "Redis Error: " << _context->errstr << '\n'; + } else { + std::cerr << "Can't allocate redis context\n"; + } + } +} + +bool Redis::is_initialized() +{ + return _context != nullptr; +} + +Redis::~Redis() +{ + redisFree(_context); +} + +uint64_t Redis::download_file(Aws::String const &key, + int &required_retries, bool with_backoff) +{ + std::string comm = "GET " + key; + + auto bef = timeSinceEpochMillisec(); + int retries = 0; + const int MAX_RETRIES = 50000; + + while (retries < MAX_RETRIES) { + + redisReply* reply = (redisReply*) redisCommand(_context, comm.c_str()); + + if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) { + + retries += 1; + if(with_backoff) { + int sleep_time = retries; + if (retries > 100) { + sleep_time = retries * 2; + } + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } + + } else { + + uint64_t finishedTime = timeSinceEpochMillisec(); + required_retries = retries; + + freeReplyObject(reply); + return finishedTime - bef; + + } + freeReplyObject(reply); + } + return 0; +} + +uint64_t Redis::upload_file(Aws::String 
const &key, + int size, char* pBuf) +{ + std::string comm = "SET " + key + " %b"; + + + uint64_t bef = timeSinceEpochMillisec(); + redisReply* reply = (redisReply*) redisCommand(_context, comm.c_str(), pBuf, size); + uint64_t finishedTime = timeSinceEpochMillisec(); + + if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) { + std::cerr << "Failed to write in Redis!" << std::endl; + abort(); + } + freeReplyObject(reply); + + return finishedTime - bef; +} + +uint64_t Redis::delete_file(std::string const &key) +{ + std::string comm = "DEL " + key; + + uint64_t bef = timeSinceEpochMillisec(); + redisReply* reply = (redisReply*) redisCommand(_context, comm.c_str()); + uint64_t finishedTime = timeSinceEpochMillisec(); + + if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) { + std::cerr << "Couldn't delete the key!" << '\n'; + abort(); + } + freeReplyObject(reply); + + return finishedTime - bef; +} + diff --git a/benchmarks/wrappers/aws/cpp/redis.hpp b/benchmarks/wrappers/aws/cpp/redis.hpp new file mode 100644 index 00000000..f30de43f --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/redis.hpp @@ -0,0 +1,26 @@ + +#include +#include + +#include + +#include + +class Redis +{ + redisContext* _context; +public: + + Redis(std::string redis_hostname, int redis_port); + ~Redis(); + + bool is_initialized(); + + uint64_t download_file(Aws::String const &key, int &required_retries, bool with_backoff); + + uint64_t upload_file(Aws::String const &key, int size, char* pBuf); + + uint64_t delete_file(std::string const &key); + +}; + diff --git a/benchmarks/wrappers/aws/cpp/storage.cpp b/benchmarks/wrappers/aws/cpp/storage.cpp new file mode 100644 index 00000000..cb68b698 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/storage.cpp @@ -0,0 +1,94 @@ + +#include + +#include +#include +#include +#include + +#include + +#include "storage.hpp" +#include "utils.hpp" + +Storage Storage::get_client() +{ + Aws::Client::ClientConfiguration config; + //config.region = "eu-central-1"; + config.caFile = "/etc/pki/tls/certs/ca-bundle.crt"; + + std::cout << std::getenv("AWS_REGION") << std::endl; + + char const TAG[] = "LAMBDA_ALLOC"; + auto credentialsProvider = Aws::MakeShared(TAG); + Aws::S3::S3Client client(credentialsProvider, config); + return Storage(std::move(client)); +} + +std::string Storage::key_join(std::initializer_list paths) +{ + std::string path = *paths.begin(); + for (auto iter = paths.begin() + 1, end = paths.end(); iter != end; ++iter) + path.append("/").append(*iter); + return path; +} + +uint64_t Storage::download_file(Aws::String const &bucket, Aws::String const &key, + int &required_retries, bool with_backoff) +{ + + + Aws::S3::Model::GetObjectRequest request; + request.WithBucket(bucket).WithKey(key); + auto bef = timeSinceEpochMillisec(); + + int retries = 0; + const int MAX_RETRIES = 1500; + while (retries < MAX_RETRIES) { + auto outcome = this->_client.GetObject(request); + if (outcome.IsSuccess()) { + auto& s = outcome.GetResult().GetBody(); + uint64_t finishedTime = timeSinceEpochMillisec(); + // Perform NOP on result to prevent optimizations + std::stringstream ss; + ss << s.rdbuf(); + std::string first(" "); + ss.get(&first[0], 1); + required_retries = retries; + return finishedTime - bef; + } else { + retries += 1; + if(with_backoff) { + int sleep_time = retries; + if (retries > 100) { + sleep_time = retries * 2; + } + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } + } + } + return 0; + +} + +uint64_t 
Storage::upload_file(Aws::String const &bucket,
+ Aws::String const &key,
+ int size, char* pBuf)
+{
+ /**
+ * We use Boost's bufferstream to wrap the array as an IOStream. Using a lightweight streambuf wrapper, as many
+ * solutions on the internet suggest (e.g. https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory),
+ * does not work because the S3 SDK relies on properly functioning tellp(), etc. (for instance, to get the body length).
+ */
+ const std::shared_ptr<Aws::IOStream> input_data = std::make_shared<boost::interprocess::bufferstream>(pBuf, size);
+
+ Aws::S3::Model::PutObjectRequest request;
+ request.WithBucket(bucket).WithKey(key);
+ request.SetBody(input_data);
+ uint64_t bef_upload = timeSinceEpochMillisec();
+ Aws::S3::Model::PutObjectOutcome outcome = this->_client.PutObject(request);
+ if (!outcome.IsSuccess()) {
+ std::cerr << "Error: PutObject: " << outcome.GetError().GetMessage() << std::endl;
+ }
+ return bef_upload;
+}
diff --git a/benchmarks/wrappers/aws/cpp/storage.hpp b/benchmarks/wrappers/aws/cpp/storage.hpp new file mode 100644 index 00000000..c7d7519b --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/storage.hpp @@ -0,0 +1,32 @@
+
+#include
+#include
+#include
+
+#include
+#include
+
+class Storage
+{
+ Aws::S3::S3Client _client;
+public:
+
+ Storage(Aws::S3::S3Client && client):
+ _client(client)
+ {}
+
+ static Storage get_client();
+
+ std::string key_join(std::initializer_list paths);
+
+ uint64_t download_file(Aws::String const &bucket,
+ Aws::String const &key,
+ int &required_retries,
+ bool with_backoff = false);
+
+ uint64_t upload_file(Aws::String const &bucket,
+ Aws::String const &key,
+ int size, char* pBuf);
+
+};
+
diff --git a/benchmarks/wrappers/aws/cpp/tcp.cpp b/benchmarks/wrappers/aws/cpp/tcp.cpp new file mode 100644 index 00000000..0fe492e2 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/tcp.cpp @@ -0,0 +1,90 @@
+
+#include
+#include
+
+#include
+#include
+
+#include
+
+#include "tcp.hpp"
+#include "utils.hpp"
+
+void TCP::connect_producer(int num_consumers)
+{
+ _sockets.resize(num_consumers);
+ for (int i = 0; i < num_consumers; i++) {
+ std::string prod_pairing_key = _pairing_key + "_" + std::to_string(i);
+ std::cerr << "Producer begins pairing " << prod_pairing_key << '\n';
+ _sockets[i] = pair(prod_pairing_key, _address);
+ }
+ std::cerr << "Successful pairing on all consumers" << '\n';
+}
+
+void TCP::connect_consumer(int id)
+{
+ _sockets.resize(1);
+ std::string prod_pairing_key = _pairing_key + "_" + std::to_string(id);
+ std::cerr << "Begin pairing " << prod_pairing_key << '\n';
+ _sockets[0] = pair(prod_pairing_key, _address);
+ std::cerr << "Successful pairing " << prod_pairing_key << '\n';
+
+ struct timeval timeout;
+ timeout.tv_sec = 10;
+ timeout.tv_usec = 0;
+
+ if(setsockopt(_sockets[0], SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) < 0)
+ std::cerr << "Couldn't set timeout!" << std::endl;
+
+ if(setsockopt(_sockets[0], SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof timeout) < 0)
+ std::cerr << "Couldn't set timeout!"
<< std::endl; +} + +TCP::~TCP() +{ + for(int socket : _sockets) + close(socket); +} + +uint64_t TCP::download_file(int id, int size, char* recv_buffer) +{ + uint64_t bef = timeSinceEpochMillisec(); + bool failure = false; + + int recv_bytes = 0; + while (recv_bytes < size) { + int n = recv(_sockets[id], recv_buffer + recv_bytes, size - recv_bytes, 0); + if (n > 0) { + recv_bytes += n; + } + if (n == -1) { + std::cout << "Error: " << errno << std::endl; + failure = true; + break; + } + } + uint64_t finishedTime = timeSinceEpochMillisec(); + + return failure ? 0 : finishedTime - bef; +} + +uint64_t TCP::upload_file(int id, int size, char* pBuf) +{ + uint64_t bef = timeSinceEpochMillisec(); + bool failure = false; + + int sent_bytes = 0; + while (sent_bytes < size) { + int bytes = send(_sockets[id], pBuf + sent_bytes, size - sent_bytes, 0); + sent_bytes += bytes; + if (bytes == -1) { + failure = true; + std::cerr << "Failed sending! " << errno << std::endl; + break; + } + } + uint64_t finishedTime = timeSinceEpochMillisec(); + + return failure ? 0 : finishedTime - bef; +} + diff --git a/benchmarks/wrappers/aws/cpp/tcp.hpp b/benchmarks/wrappers/aws/cpp/tcp.hpp new file mode 100644 index 00000000..31698376 --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/tcp.hpp @@ -0,0 +1,28 @@ + +#include +#include +#include + +class TCP +{ + std::string _address; + std::string _pairing_key; + std::vector _sockets; +public: + + TCP(std::string hole_puncher_ip, std::string pairing_key): + _address(hole_puncher_ip), + _pairing_key(pairing_key) + {} + ~TCP(); + + void connect_consumer(int id); + void connect_producer(int consumers); + + + uint64_t download_file(int id, int size, char* recv_buffer); + + uint64_t upload_file(int id, int size, char* pBuf); + +}; + diff --git a/benchmarks/wrappers/aws/cpp/utils.cpp b/benchmarks/wrappers/aws/cpp/utils.cpp new file mode 100644 index 00000000..d3d9207e --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/utils.cpp @@ -0,0 +1,10 @@ + +#include "utils.hpp" + +uint64_t timeSinceEpochMillisec() +{ + auto now = std::chrono::high_resolution_clock::now(); + auto time = now.time_since_epoch(); + return std::chrono::duration_cast< std::chrono::microseconds >(time).count(); +} + diff --git a/benchmarks/wrappers/aws/cpp/utils.hpp b/benchmarks/wrappers/aws/cpp/utils.hpp new file mode 100644 index 00000000..0fff5f8a --- /dev/null +++ b/benchmarks/wrappers/aws/cpp/utils.hpp @@ -0,0 +1,5 @@ + +#include + +uint64_t timeSinceEpochMillisec(); + diff --git a/config/example.json b/config/example.json index dc4da9ad..d0837634 100644 --- a/config/example.json +++ b/config/example.json @@ -38,6 +38,18 @@ "function_copy_idx": 0, "repetitions": 5, "sleep": 1 + }, + "communication-p2p": { + "type": "storage", + "sizes": [1, 512, 1024, 16384, 131072, 1048576], + "invocations": { + "warmup": 5, + "invocations_per_round": [10, 50, 100, 200, 500], + "total": 1000 + }, + "tcpuncher": { + "address": "" + } } }, "deployment": { diff --git a/config/systems.json b/config/systems.json index c38f1233..36cd661e 100644 --- a/config/systems.json +++ b/config/systems.json @@ -62,6 +62,19 @@ "uuid": "3.4.0" } } + }, + "cpp": { + "base_images": { + "all": "amazon/aws-lambda-provided:al2.2022.04.27.09" + }, + "dependencies": [ + "runtime", "sdk", "boost", "hiredis", "tcpunch" + ], + "versions": ["all"], + "images": ["build"], + "deployment": { + "files": [ "handler.cpp", "key-value.cpp", "key-value.hpp", "storage.cpp", "storage.hpp", "redis.hpp", "redis.cpp", "tcp.hpp", "tcp.cpp", "utils.cpp", "utils.hpp"] + } } } }, 
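Note: every C++ benchmark in this patch plugs into the wrapper files listed above by defining a single entry point that handler.cpp forward-declares and invokes. A minimal sketch of that entry point follows; the tuple's template arguments are inferred from how handler.cpp consumes the returned value, and the "sleep" field simply mirrors the 010.sleep benchmark, so treat this as an illustration rather than part of the diff.

    #include <string>
    #include <tuple>

    #include <aws/core/utils/json/JsonSerializer.h>

    // Entry point called by benchmarks/wrappers/aws/cpp/handler.cpp.
    // Returns the benchmark result as JSON plus an exit code (0 = success).
    std::tuple<Aws::Utils::Json::JsonValue, int> function(Aws::Utils::Json::JsonView json)
    {
        int sleep = json.GetInteger("sleep"); // produced by the benchmark's input.py

        Aws::Utils::Json::JsonValue val;
        val.WithString("result", std::to_string(sleep));
        return std::make_tuple(val, 0);
    }

The wrapper then nests this JSON under "body"/"result", adds timing and cold-start metadata, and reports a failed invocation whenever the exit code is non-zero.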
diff --git a/docker/aws/cpp/Dockerfile.build b/docker/aws/cpp/Dockerfile.build new file mode 100755 index 00000000..bc188020 --- /dev/null +++ b/docker/aws/cpp/Dockerfile.build @@ -0,0 +1,34 @@ + +ARG BASE_REPOSITORY +ARG BASE_IMAGE +FROM ${BASE_REPOSITORY}:dependencies-sdk.aws.cpp.all as sdk +FROM ${BASE_REPOSITORY}:dependencies-boost.aws.cpp.all as boost +FROM ${BASE_REPOSITORY}:dependencies-hiredis.aws.cpp.all as hiredis +FROM ${BASE_REPOSITORY}:dependencies-runtime.aws.cpp.all as runtime +FROM ${BASE_REPOSITORY}:dependencies-tcpunch.aws.cpp.all as tcpunch + +FROM ${BASE_IMAGE} as builder + +RUN yum install -y cmake3 curl libcurl libcurl-devel git gcc gcc-c++ make tar gzip zip zlib-devel openssl-devel openssl-static +ENV GOSU_VERSION 1.14 +# https://github.com/tianon/gosu/releases/tag/1.14 +# key https://keys.openpgp.org/search?q=tianon%40debian.org +RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-amd64" \ + && chmod +x /usr/local/bin/gosu +RUN mkdir -p /sebs/ +COPY docker/entrypoint.sh /sebs/entrypoint.sh +COPY docker/cpp_installer.sh /sebs/installer.sh +RUN chmod +x /sebs/entrypoint.sh +RUN chmod +x /sebs/installer.sh + +COPY --from=sdk /opt /opt +COPY --from=boost /opt /opt +COPY --from=runtime /opt /opt +COPY --from=hiredis /opt /opt +COPY --from=tcpunch /opt /opt + +# useradd and groupmod is installed in /usr/sbin which is not in PATH +ENV PATH=/usr/sbin:$PATH +CMD /bin/bash /sebs/installer.sh +ENTRYPOINT ["/sebs/entrypoint.sh"] + diff --git a/docker/aws/cpp/Dockerfile.dependencies-boost b/docker/aws/cpp/Dockerfile.dependencies-boost new file mode 100755 index 00000000..ab987b9e --- /dev/null +++ b/docker/aws/cpp/Dockerfile.dependencies-boost @@ -0,0 +1,19 @@ +ARG BASE_IMAGE +FROM ${BASE_IMAGE} as builder +ARG WORKERS +ENV WORKERS=${WORKERS} + + +RUN yum install -y cmake curl libcurl libcurl-devel git gcc gcc-c++ make tar gzip which python-devel +RUN curl -LO https://boostorg.jfrog.io/artifactory/main/release/1.79.0/source/boost_1_79_0.tar.gz\ + && tar -xf boost_1_79_0.tar.gz && cd boost_1_79_0\ + && echo "using gcc : : $(which gcc10-c++) ;" >> tools/build/src/user-config.jam\ + && ./bootstrap.sh --prefix=/opt\ + && ./b2 -j${WORKERS} --prefix=/opt cxxflags="-fPIC" link=static install +#RUN curl -LO https://boostorg.jfrog.io/artifactory/main/release/1.79.0/source/boost_1_79_0.tar.gz\ +# && tar -xf boost_1_79_0.tar.gz && cd boost_1_79_0 && ./bootstrap.sh --prefix=/opt\ +# && ./b2 -j${WORKERS} --prefix=/opt cxxflags="-fPIC" link=static install + +FROM ${BASE_IMAGE} + +COPY --from=builder /opt /opt diff --git a/docker/aws/cpp/Dockerfile.dependencies-hiredis b/docker/aws/cpp/Dockerfile.dependencies-hiredis new file mode 100755 index 00000000..4842a22d --- /dev/null +++ b/docker/aws/cpp/Dockerfile.dependencies-hiredis @@ -0,0 +1,12 @@ +ARG BASE_IMAGE +FROM ${BASE_IMAGE} as builder +ARG WORKERS +ENV WORKERS=${WORKERS} + +RUN yum install -y git make gcc gcc-c++ +RUN git clone https://github.com/redis/hiredis.git && cd hiredis\ + && PREFIX=/opt make -j${WORKERS} install + +FROM ${BASE_IMAGE} + +COPY --from=builder /opt /opt diff --git a/docker/aws/cpp/Dockerfile.dependencies-runtime b/docker/aws/cpp/Dockerfile.dependencies-runtime new file mode 100755 index 00000000..ed570ec5 --- /dev/null +++ b/docker/aws/cpp/Dockerfile.dependencies-runtime @@ -0,0 +1,14 @@ +ARG BASE_IMAGE +FROM ${BASE_IMAGE} as builder +ARG WORKERS +ENV WORKERS=${WORKERS} + + +RUN yum install -y cmake3 curl libcurl libcurl-devel git gcc gcc-c++ make tar gzip +RUN 
git clone https://github.com/awslabs/aws-lambda-cpp.git\ + && cmake3 -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=/opt -B aws-lambda-cpp/build -S aws-lambda-cpp\ + && cmake3 --build aws-lambda-cpp/build --parallel ${WORKERS} --target install + +FROM ${BASE_IMAGE} + +COPY --from=builder /opt /opt diff --git a/docker/aws/cpp/Dockerfile.dependencies-sdk b/docker/aws/cpp/Dockerfile.dependencies-sdk new file mode 100755 index 00000000..f1c44f53 --- /dev/null +++ b/docker/aws/cpp/Dockerfile.dependencies-sdk @@ -0,0 +1,14 @@ +ARG BASE_IMAGE +FROM ${BASE_IMAGE} as builder +ARG WORKERS +ENV WORKERS=${WORKERS} + +RUN yum install -y cmake3 curl libcurl libcurl-devel git gcc gcc-c++ make zlib-devel openssl-devel +RUN git clone --recurse-submodules https://github.com/aws/aws-sdk-cpp.git\ + && cd aws-sdk-cpp && mkdir build && cd build\ + && cmake3 -DCMAKE_BUILD_TYPE=Release -DBUILD_ONLY="s3;dynamodb;sqs" -DENABLE_TESTING=OFF -DCMAKE_INSTALL_PREFIX=/opt/ ..\ + && cmake3 --build . --parallel ${WORKERS} --target install + +FROM ${BASE_IMAGE} + +COPY --from=builder /opt /opt diff --git a/docker/aws/cpp/Dockerfile.dependencies-tcpunch b/docker/aws/cpp/Dockerfile.dependencies-tcpunch new file mode 100755 index 00000000..e9622858 --- /dev/null +++ b/docker/aws/cpp/Dockerfile.dependencies-tcpunch @@ -0,0 +1,14 @@ +ARG BASE_IMAGE +FROM ${BASE_IMAGE} as builder +ARG WORKERS +ENV WORKERS=${WORKERS} + + +RUN yum install -y cmake3 curl libcurl libcurl-devel git gcc gcc-c++ make tar gzip which python-devel +RUN git clone https://github.com/spcl/tcpunch.git && cd tcpunch \ + && cmake3 -DCMAKE_INSTALL_PREFIX=/opt -DCMAKE_BUILD_TYPE=Release -S client -B build \ + && cmake3 --build build --target install + +FROM ${BASE_IMAGE} + +COPY --from=builder /opt /opt diff --git a/docker/aws/python/Dockerfile.build b/docker/aws/python/Dockerfile.build index 960fc300..f41752ed 100755 --- a/docker/aws/python/Dockerfile.build +++ b/docker/aws/python/Dockerfile.build @@ -4,7 +4,7 @@ ARG VERSION ENV PYTHON_VERSION=${VERSION} # useradd, groupmod -RUN yum install -y shadow-utils +RUN yum install -y cmake curl libcurl libcurl-devel ENV GOSU_VERSION 1.14 # https://github.com/tianon/gosu/releases/tag/1.14 # key https://keys.openpgp.org/search?q=tianon%40debian.org diff --git a/docker/cpp_installer.sh b/docker/cpp_installer.sh new file mode 100644 index 00000000..fe237d9b --- /dev/null +++ b/docker/cpp_installer.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +mkdir -p /mnt/function/build +cmake3 -DCMAKE_BUILD_TYPE=Release -S /mnt/function/ -B /mnt/function/build > /mnt/function/build/configuration.log +VERBOSE=1 cmake3 --build /mnt/function/build --target aws-lambda-package-benchmark > /mnt/function/build/compilation.log + diff --git a/docs/platforms.md b/docs/platforms.md index ff6d22d6..63f99743 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -6,12 +6,17 @@ Furthermore, we support the open source FaaS system OpenWhisk. ## AWS Lambda AWS provides one year of free services, including a significant amount of computing time in AWS Lambda. -To work with AWS, you need to provide access and secret keys to a role with permissions -sufficient to manage functions and S3 resources. +To work with AWS, you need to provide access and secret keys with permissions +sufficient to manage functions, S3 resources. +When working with communication benchmarks, it might be necessary to use DynamoDB as well. Additionally, the account must have `AmazonAPIGatewayAdministrator` permission to set up automatically AWS HTTP trigger. 
+ You can provide a [role](https://docs.aws.amazon.com/lambda/latest/dg/lambda-intro-execution-role.html) with permissions to access AWS Lambda and S3; otherwise, one will be created automatically. +To run communication benchmarks, the role needs to access DynamoDB. +Furthermore, to access AWS ElastiCache, the role needs an additional permission: +`AWSLambdaVPCAccessExecutionRole`. To use a user-defined lambda role, set the name in config JSON - see an example in `config/example.json`. **Pass the credentials as environmental variables for the first run.** They will be cached for future use. diff --git a/sebs.py b/sebs.py index ce78036c..38fd3b22 100755 --- a/sebs.py +++ b/sebs.py @@ -63,7 +63,7 @@ def simplified_common_params(func): @click.option( "--language", default=None, - type=click.Choice(["python", "nodejs"]), + type=click.Choice(["python", "nodejs", "cpp"]), help="Benchmark language", ) @click.option("--language-version", default=None, type=str, help="Benchmark language version") @@ -460,7 +460,7 @@ def experiment_invoke(experiment, **kwargs): @click.argument("experiment", type=str) # , help="Benchmark to be launched.") @click.option("--extend-time-interval", type=int, default=-1) # , help="Benchmark to be launched.") @common_params -def experment_process(experiment, extend_time_interval, **kwargs): +def experiment_process(experiment, extend_time_interval, **kwargs): ( config, output_dir, diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index 6c34af90..01aa9f3d 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -6,6 +6,7 @@ from typing import cast, Dict, List, Optional, Tuple, Type, Union # noqa import boto3 +import botocore import docker from sebs.aws.s3 import S3 @@ -19,6 +20,7 @@ from sebs.faas.function import Function, ExecutionResult, Trigger, FunctionConfig from sebs.faas.storage import PersistentStorage from sebs.faas.system import System +from sebs.types import Language class AWS(System): @@ -72,9 +74,12 @@ def initialize(self, config: Dict[str, str] = {}): def get_lambda_client(self): if not hasattr(self, "client"): + # allow for long invocations + config = botocore.config.Config( + read_timeout=900, connect_timeout=900, retries={"max_attempts": 0} + ) self.client = self.session.client( - service_name="lambda", - region_name=self.config.region, + service_name="lambda", region_name=self.config.region, config=config ) return self.client @@ -94,6 +99,7 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: self.storage = S3( self.session, self.cache_client, + self.config.resources, self.config.region, access_key=self.config.credentials.access_key, secret_key=self.config.credentials.secret_key, @@ -125,42 +131,57 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: def package_code( self, directory: str, - language_name: str, + language: Language, language_version: str, benchmark: str, is_cached: bool, ) -> Tuple[str, int]: CONFIG_FILES = { - "python": ["handler.py", "requirements.txt", ".python_packages"], - "nodejs": ["handler.js", "package.json", "node_modules"], + Language.PYTHON: ["handler.py", "requirements.txt", ".python_packages"], + Language.NODEJS: ["handler.js", "package.json", "node_modules"], } - package_config = CONFIG_FILES[language_name] - function_dir = os.path.join(directory, "function") - os.makedirs(function_dir) - # move all files to 'function' except handler.py - for file in os.listdir(directory): - if file not in package_config: - file = os.path.join(directory, file) - shutil.move(file, function_dir) - - 
# FIXME: use zipfile - # create zip with hidden directory but without parent directory - execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True, cwd=directory) - benchmark_archive = "{}.zip".format(os.path.join(directory, benchmark)) - self.logging.info("Created {} archive".format(benchmark_archive)) - - bytes_size = os.path.getsize(os.path.join(directory, benchmark_archive)) - mbytes = bytes_size / 1024.0 / 1024.0 - self.logging.info("Zip archive size {:2f} MB".format(mbytes)) - - return os.path.join(directory, "{}.zip".format(benchmark)), bytes_size + + if language in [Language.PYTHON, Language.NODEJS]: + package_config = CONFIG_FILES[language] + function_dir = os.path.join(directory, "function") + os.makedirs(function_dir) + # move all files to 'function' except handler.py + for file in os.listdir(directory): + if file not in package_config: + file = os.path.join(directory, file) + shutil.move(file, function_dir) + + # FIXME: use zipfile + # create zip with hidden directory but without parent directory + execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True, cwd=directory) + benchmark_archive = "{}.zip".format(os.path.join(directory, benchmark)) + self.logging.info("Created {} archive".format(benchmark_archive)) + + bytes_size = os.path.getsize(os.path.join(directory, benchmark_archive)) + mbytes = bytes_size / 1024.0 / 1024.0 + self.logging.info("Zip archive size {:2f} MB".format(mbytes)) + + return os.path.join(directory, "{}.zip".format(benchmark)), bytes_size + elif language == Language.CPP: + + # lambda C++ runtime build scripts create the .zip file in build directory + benchmark_archive = os.path.join(directory, "build", "benchmark.zip") + self.logging.info("Created {} archive".format(benchmark_archive)) + + bytes_size = os.path.getsize(os.path.join(directory, benchmark_archive)) + mbytes = bytes_size / 1024.0 / 1024.0 + self.logging.info("Zip archive size {:2f} MB".format(mbytes)) + + return benchmark_archive, bytes_size + else: + raise NotImplementedError() def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFunction": package = code_package.code_location benchmark = code_package.benchmark - language = code_package.language_name + language = code_package.language language_runtime = code_package.language_version timeout = code_package.benchmark_config.timeout memory = code_package.benchmark_config.memory @@ -210,7 +231,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun code_config = {"S3Bucket": code_bucket, "S3Key": code_package_name} ret = self.client.create_function( FunctionName=func_name, - Runtime="{}{}".format(language, language_runtime), + Runtime=self.cloud_runtime(language, language_runtime), Handler="handler.handler", Role=self.config.resources.lambda_role(self.session), MemorySize=memory, @@ -534,3 +555,11 @@ def wait_function_updated(self, func: LambdaFunction): waiter = self.client.get_waiter("function_updated_v2") waiter.wait(FunctionName=func.name) self.logging.info("Lambda function has been updated.") + + def cloud_runtime(self, language: Language, language_version: str): + if language in [Language.NODEJS, Language.PYTHON]: + return ("{}{}".format(language, language_version),) + elif language == Language.CPP: + return "provided.al2" + else: + raise NotImplementedError() diff --git a/sebs/aws/config.py b/sebs/aws/config.py index 849f40aa..3cdbe4e7 100644 --- a/sebs/aws/config.py +++ b/sebs/aws/config.py @@ -99,19 +99,17 @@ def serialize(self) -> dict: out = {"arn": self.arn, "endpoint": 
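Note that cloud_runtime above returns a one-element tuple for Python/Node.js (the trailing comma) and formats the Language enum object directly rather than its string value; a sketch of the presumably intended mapping, assuming Language.value holds the lowercase language name:

```python
def cloud_runtime(self, language: Language, language_version: str) -> str:
    # Managed runtimes follow the "<language><version>" convention
    # (e.g. "python3.8", "nodejs12.x"); C++ runs on the custom AL2 runtime.
    if language in (Language.NODEJS, Language.PYTHON):
        return f"{language.value}{language_version}"
    elif language == Language.CPP:
        return "provided.al2"
    raise NotImplementedError()
```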
self.endpoint} return out - def __init__(self, lambda_role: str): - super().__init__() - self._lambda_role = lambda_role + def __init__(self): + super().__init__(name="aws") + self._lambda_role = "" self._http_apis: Dict[str, AWSResources.HTTPApi] = {} self._region: Optional[str] = None + self._dynamodb_client = None @staticmethod def typename() -> str: return "AWS.Resources" - def set_region(self, region: str): - self._region = region - def lambda_role(self, boto3_session: boto3.session.Session) -> str: if not self._lambda_role: iam_client = boto3_session.client(service_name="iam") @@ -126,8 +124,11 @@ def lambda_role(self, boto3_session: boto3.session.Session) -> str: ], } role_name = "sebs-lambda-role" + # FIXME: this should be configurable attached_policies = [ "arn:aws:iam::aws:policy/AmazonS3FullAccess", + "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess", + "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole", "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", ] try: @@ -187,23 +188,28 @@ def http_api( self.logging.info(f"Using cached HTTP API {api_name}") return http_api - # FIXME: python3.7+ future annotatons @staticmethod - def initialize(dct: dict) -> Resources: - ret = AWSResources(dct["lambda-role"] if "lambda-role" in dct else "") + def initialize(res: Resources, dct: dict): + + ret = cast(AWSResources, res) + super(AWSResources, AWSResources).initialize(ret, dct) + ret._lambda_role = dct["lambda-role"] if "lambda-role" in dct else "" if "http-apis" in dct: for key, value in dct["http-apis"].items(): ret._http_apis[key] = AWSResources.HTTPApi.deserialize(value) + return ret def serialize(self) -> dict: out = { + **super().serialize(), "lambda-role": self._lambda_role, "http-apis": {key: value.serialize() for (key, value) in self._http_apis.items()}, } return out def update_cache(self, cache: Cache): + super().update_cache(cache) cache.update_config(val=self._lambda_role, keys=["aws", "resources", "lambda-role"]) for name, api in self._http_apis.items(): cache.update_config(val=api.serialize(), keys=["aws", "resources", "http-apis", name]) @@ -211,30 +217,50 @@ def update_cache(self, cache: Cache): @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: + ret = AWSResources() cached_config = cache.get_config("aws") - ret: AWSResources # Load cached values if cached_config and "resources" in cached_config: - ret = cast(AWSResources, AWSResources.initialize(cached_config["resources"])) + AWSResources.initialize(ret, cached_config["resources"]) ret.logging_handlers = handlers ret.logging.info("Using cached resources for AWS") else: # Check for new config if "resources" in config: - ret = cast(AWSResources, AWSResources.initialize(config["resources"])) + AWSResources.initialize(ret, config["resources"]) ret.logging_handlers = handlers ret.logging.info("No cached resources for AWS found, using user configuration.") else: - ret = AWSResources(lambda_role="") + AWSResources.initialize(ret, {}) ret.logging_handlers = handlers ret.logging.info("No resources for AWS found, initialize!") return ret + def _create_key_value_table(self, name: str): + + if self._dynamodb_client is None: + self._dynamodb_client = boto3.client("dynamodb", region_name=self.region) + + try: + self._dynamodb_client.create_table( + AttributeDefinitions=[{"AttributeName": "key", "AttributeType": "S"}], + TableName=name, + KeySchema=[{"AttributeName": "key", "KeyType": "HASH"}], + BillingMode="PAY_PER_REQUEST", + ) + 
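The initializer chaining used above, super(AWSResources, AWSResources).initialize(ret, dct), explicitly dispatches to the parent's static initialize before filling in provider-specific fields; a small standalone illustration of the idiom with simplified classes:

```python
class Base:
    @staticmethod
    def initialize(obj, dct: dict):
        obj.resources_id = dct.get("resources_id", "")

class Derived(Base):
    @staticmethod
    def initialize(obj, dct: dict):
        # resolve the parent's staticmethod through the MRO, as in the diff
        super(Derived, Derived).initialize(obj, dct)
        obj.lambda_role = dct.get("lambda-role", "")

resources = Derived()
Derived.initialize(resources, {"resources_id": "abc12345", "lambda-role": ""})
```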
self.logging.info(f"Waiting to create DynamoDB table {name}.") + waiter = self._dynamodb_client.get_waiter("table_exists") + waiter.wait(TableName=name) + self.logging.info(f"DynamoDB table {name} has been created.") + except self._dynamodb_client.exceptions.ResourceInUseException: + # it's ok that the table already exists + self.logging.info(f"Using existing DynamoDB table {name}.") + class AWSConfig(Config): def __init__(self, credentials: AWSCredentials, resources: AWSResources): - super().__init__() + super().__init__(name="aws") self._credentials = credentials self._resources = resources @@ -250,11 +276,9 @@ def credentials(self) -> AWSCredentials: def resources(self) -> AWSResources: return self._resources - # FIXME: use future annotations (see sebs/faas/system) @staticmethod def initialize(cfg: Config, dct: dict): - config = cast(AWSConfig, cfg) - config._region = dct["region"] + super(AWSConfig, AWSConfig).initialize(cfg, dct) @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config: @@ -273,7 +297,7 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config config_obj.logging.info("Using user-provided config for AWS") AWSConfig.initialize(config_obj, config) - resources.set_region(config_obj.region) + resources.region = config_obj.region return config_obj """ @@ -284,14 +308,13 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config """ def update_cache(self, cache: Cache): - cache.update_config(val=self.region, keys=["aws", "region"]) + super().update_cache(cache) self.credentials.update_cache(cache) self.resources.update_cache(cache) def serialize(self) -> dict: out = { - "name": "aws", - "region": self._region, + **super().serialize(), "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), } diff --git a/sebs/aws/s3.py b/sebs/aws/s3.py index 765cace3..f46360e4 100644 --- a/sebs/aws/s3.py +++ b/sebs/aws/s3.py @@ -4,6 +4,7 @@ import boto3 from sebs.cache import Cache +from sebs.faas.config import Resources from ..faas.storage import PersistentStorage @@ -28,12 +29,13 @@ def __init__( self, session: boto3.session.Session, cache_client: Cache, + resources: Resources, location: str, access_key: str, secret_key: str, replace_existing: bool, ): - super().__init__(location, cache_client, replace_existing) + super().__init__(location, cache_client, resources, replace_existing) self.client = session.client( "s3", region_name=location, @@ -45,15 +47,20 @@ def __init__( def correct_name(self, name: str) -> str: return name - def _create_bucket(self, name: str, buckets: List[str] = []): + def _create_bucket(self, name: str, buckets: List[str] = [], randomize_name: bool = False): for bucket_name in buckets: if name in bucket_name: self.logging.info( "Bucket {} for {} already exists, skipping.".format(bucket_name, name) ) return bucket_name - random_name = str(uuid.uuid4())[0:16] - bucket_name = "{}-{}".format(name, random_name) + + if randomize_name: + random_name = str(uuid.uuid4())[0:16] + bucket_name = "{}-{}".format(name, random_name) + else: + bucket_name = name + try: # this is incredible # https://github.com/boto/boto3/issues/125 diff --git a/sebs/aws/triggers.py b/sebs/aws/triggers.py index f1831459..acb99ea7 100644 --- a/sebs/aws/triggers.py +++ b/sebs/aws/triggers.py @@ -4,6 +4,8 @@ import json from typing import Dict, Optional # noqa +from botocore.exceptions import ReadTimeoutError + from sebs.aws.aws import AWS from sebs.faas.function import 
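For reference, the table-creation pattern used by _create_key_value_table above as a standalone sketch; the table and region names are illustrative:

```python
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")
table_name = "sebs-experiments-abc12345"
try:
    # on-demand table with a single string hash key, matching the schema above
    dynamodb.create_table(
        TableName=table_name,
        AttributeDefinitions=[{"AttributeName": "key", "AttributeType": "S"}],
        KeySchema=[{"AttributeName": "key", "KeyType": "HASH"}],
        BillingMode="PAY_PER_REQUEST",
    )
    dynamodb.get_waiter("table_exists").wait(TableName=table_name)
except dynamodb.exceptions.ResourceInUseException:
    # the table already exists, so reuse it
    pass
```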
ExecutionResult, Trigger @@ -37,9 +39,18 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: serialized_payload = json.dumps(payload).encode("utf-8") client = self.deployment_client.get_lambda_client() - begin = datetime.datetime.now() - ret = client.invoke(FunctionName=self.name, Payload=serialized_payload, LogType="Tail") - end = datetime.datetime.now() + + try: + begin = datetime.datetime.now() + ret = client.invoke(FunctionName=self.name, Payload=serialized_payload, LogType="Tail") + end = datetime.datetime.now() + except ReadTimeoutError: + end = datetime.datetime.now() + self.logging.error("Invocation of {} failed!".format(self.name)) + self.logging.error("Input: {}".format(serialized_payload.decode("utf-8"))) + aws_result = ExecutionResult.from_times(begin, end) + aws_result.stats.failure = True + return aws_result aws_result = ExecutionResult.from_times(begin, end) aws_result.request_id = ret["ResponseMetadata"]["RequestId"] diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index e957d693..721750a1 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -20,6 +20,7 @@ from ..faas.function import Function, FunctionConfig, ExecutionResult from ..faas.storage import PersistentStorage from ..faas.system import System +from sebs.types import Language class Azure(System): @@ -98,6 +99,7 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: self.storage = BlobStorage( self.config.region, self.cache_client, + self.config.resources, self.config.resources.data_storage_account(self.cli_instance).connection_string, replace_existing=replace_existing, ) @@ -117,7 +119,7 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: def package_code( self, directory: str, - language_name: str, + language: Language, language_version: str, benchmark: str, is_cached: bool, @@ -125,12 +127,12 @@ def package_code( # In previous step we ran a Docker container which installed packages # Python packages are in .python_packages because this is expected by Azure - EXEC_FILES = {"python": "handler.py", "nodejs": "handler.js"} + EXEC_FILES = {Language.PYTHON: "handler.py", Language.NODEJS: "handler.js"} CONFIG_FILES = { - "python": ["requirements.txt", ".python_packages"], - "nodejs": ["package.json", "node_modules"], + Language.PYTHON: ["requirements.txt", ".python_packages"], + Language.NODEJS: ["package.json", "node_modules"], } - package_config = CONFIG_FILES[language_name] + package_config = CONFIG_FILES[language] handler_dir = os.path.join(directory, "handler") os.makedirs(handler_dir) @@ -143,7 +145,7 @@ def package_code( # generate function.json # TODO: extension to other triggers than HTTP default_function_json = { - "scriptFile": EXEC_FILES[language_name], + "scriptFile": EXEC_FILES[language], "bindings": [ { "authLevel": "function", @@ -260,7 +262,7 @@ def default_function_name(self, code_package: Benchmark) -> str: code_package.benchmark, code_package.language_name, code_package.language_version, - self.config.resources_id, + self.config.resources.resources_id, ) .replace(".", "-") .replace("_", "-") diff --git a/sebs/azure/blob_storage.py b/sebs/azure/blob_storage.py index 96558ff6..90b1089b 100644 --- a/sebs/azure/blob_storage.py +++ b/sebs/azure/blob_storage.py @@ -4,6 +4,7 @@ from azure.storage.blob import BlobServiceClient from sebs.cache import Cache +from sebs.faas.config import Resources from ..faas.storage import PersistentStorage @@ -16,21 +17,31 @@ def typename() -> str: def deployment_name(): return "azure" - def 
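With timeouts now reported through stats.failure instead of an exception, a caller can retry in a loop; a hypothetical helper (not part of this change), assuming trigger is any object exposing sync_invoke:

```python
def invoke_with_retry(trigger, payload: dict, max_retries: int = 3):
    # Repeat the invocation until it succeeds or the retry budget is exhausted.
    for _ in range(max_retries):
        result = trigger.sync_invoke(payload)
        if not result.stats.failure:
            return result
    raise RuntimeError(f"Invocation failed {max_retries} times")
```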
__init__(self, region: str, cache_client: Cache, conn_string: str, replace_existing: bool): - super().__init__(region, cache_client, replace_existing) + def __init__( + self, + region: str, + cache_client: Cache, + resources: Resources, + conn_string: str, + replace_existing: bool, + ): + super().__init__(region, cache_client, resources, replace_existing) self.client: BlobServiceClient = BlobServiceClient.from_connection_string(conn_string) """ Internal implementation of creating a new container. """ - def _create_bucket(self, name: str, containers: List[str] = []) -> str: + def _create_bucket( + self, name: str, containers: List[str] = [], randomize_name: bool = False + ) -> str: for c in containers: if name in c: self.logging.info("Container {} for {} already exists, skipping.".format(c, name)) return c - random_name = str(uuid.uuid4())[0:16] - name = "{}-{}".format(name, random_name) + if randomize_name: + random_name = str(uuid.uuid4())[0:16] + name = "{}-{}".format(name, random_name) self.client.create_container(name) self.logging.info("Created container {}".format(name)) return name diff --git a/sebs/azure/config.py b/sebs/azure/config.py index 6bb5ee51..47beb7dd 100644 --- a/sebs/azure/config.py +++ b/sebs/azure/config.py @@ -133,7 +133,7 @@ def __init__( storage_accounts: List["AzureResources.Storage"] = [], data_storage_account: Optional["AzureResources.Storage"] = None, ): - super().__init__() + super().__init__(name="azure") self._resource_group = resource_group self._storage_accounts = storage_accounts self._data_storage_account = data_storage_account @@ -225,16 +225,16 @@ def _create_storage_account(self, cli_instance: AzureCLI) -> "AzureResources.Sto def update_cache(self, cache_client: Cache): cache_client.update_config(val=self.serialize(), keys=["azure", "resources"]) - # FIXME: python3.7+ future annotatons @staticmethod - def initialize(dct: dict) -> Resources: - return AzureResources( - resource_group=dct["resource_group"], - storage_accounts=[ - AzureResources.Storage.deserialize(x) for x in dct["storage_accounts"] - ], - data_storage_account=AzureResources.Storage.deserialize(dct["data_storage_account"]), - ) + def initialize(res: Resources, dct: dict): + + ret = cast(AzureResources, res) + + ret._resource_group = dct["resource_group"] + ret._storage_accounts = [ + AzureResources.Storage.deserialize(x) for x in dct["storage_accounts"] + ] + ret._data_storage_account = AzureResources.Storage.deserialize(dct["data_storage_account"]) def serialize(self) -> dict: out: Dict[str, Any] = {} @@ -250,15 +250,15 @@ def serialize(self) -> dict: def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: cached_config = cache.get_config("azure") - ret: AzureResources + ret = AzureResources() # Load cached values if cached_config and "resources" in cached_config and len(cached_config["resources"]) > 0: logging.info("Using cached resources for Azure") - ret = cast(AzureResources, AzureResources.initialize(cached_config["resources"])) + AzureResources.initialize(ret, cached_config["resources"]) else: # Check for new config if "resources" in config: - ret = cast(AzureResources, AzureResources.initialize(config["resources"])) + AzureResources.initialize(ret, config["resources"]) ret.logging_handlers = handlers ret.logging.info("No cached resources for Azure found, using user configuration.") else: @@ -270,8 +270,7 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resour class AzureConfig(Config): def __init__(self, credentials: 
AzureCredentials, resources: AzureResources): - super().__init__() - self._resources_id = "" + super().__init__(name="azure") self._credentials = credentials self._resources = resources @@ -283,23 +282,9 @@ def credentials(self) -> AzureCredentials: def resources(self) -> AzureResources: return self._resources - @property - def resources_id(self) -> str: - return self._resources_id - - # FIXME: use future annotations (see sebs/faas/system) @staticmethod def initialize(cfg: Config, dct: dict): - config = cast(AzureConfig, cfg) - config._region = dct["region"] - if "resources_id" in dct: - config._resources_id = dct["resources_id"] - else: - config._resources_id = str(uuid.uuid1())[0:8] - config.logging.info( - f"Azure: generating unique resource name for " - f"the experiments: {config._resources_id}" - ) + super(AzureConfig, AzureConfig).initialize(cfg, dct) @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config: @@ -329,16 +314,13 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config """ def update_cache(self, cache: Cache): - cache.update_config(val=self.region, keys=["azure", "region"]) - cache.update_config(val=self.resources_id, keys=["azure", "resources_id"]) + super().update_cache(cache) self.credentials.update_cache(cache) self.resources.update_cache(cache) def serialize(self) -> dict: out = { - "name": "azure", - "region": self._region, - "resources_id": self.resources_id, + **super().serialize(), "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), } diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 0c51b2cd..d1720e56 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -4,6 +4,7 @@ import os import shutil import subprocess +import textwrap from typing import Any, Callable, Dict, List, Tuple import docker @@ -11,12 +12,12 @@ from sebs.config import SeBSConfig from sebs.cache import Cache from sebs.utils import find_benchmark, project_absolute_path, LoggingBase +from sebs.types import Language from sebs.faas.storage import PersistentStorage from typing import TYPE_CHECKING if TYPE_CHECKING: from sebs.experiments.config import Config as ExperimentConfig - from sebs.faas.function import Language class BenchmarkConfig: @@ -137,7 +138,7 @@ def language_version(self): @property # noqa: A003 def hash(self): path = os.path.join(self.benchmark_path, self.language_name) - self._hash_value = Benchmark.hash_directory(path, self._deployment_name, self.language_name) + self._hash_value = Benchmark.hash_directory(path, self._deployment_name, self.language) return self._hash_value @hash.setter # noqa: A003 @@ -192,14 +193,19 @@ def __init__( """ @staticmethod - def hash_directory(directory: str, deployment: str, language: str): + def hash_directory(directory: str, deployment: str, language: Language): hash_sum = hashlib.md5() FILES = { - "python": ["*.py", "requirements.txt*"], - "nodejs": ["*.js", "package.json"], + Language.PYTHON: ["*.py", "requirements.txt*"], + Language.NODEJS: ["*.js", "package.json"], + Language.CPP: ["*.cpp", "*.hpp", "dependencies.json"], + } + WRAPPERS = { + Language.PYTHON: ["*.py"], + Language.NODEJS: ["*.js"], + Language.CPP: ["*.cpp", "*.hpp"], } - WRAPPERS = {"python": "*.py", "nodejs": "*.js"} NON_LANG_FILES = ["*.sh", "*.json"] selected_files = FILES[language] + NON_LANG_FILES for file_type in selected_files: @@ -208,13 +214,14 @@ def hash_directory(directory: str, deployment: str, language: str): with open(path, "rb") as opened_file: 
hash_sum.update(opened_file.read()) # wrappers - wrappers = project_absolute_path( - "benchmarks", "wrappers", deployment, language, WRAPPERS[language] - ) - for f in glob.glob(wrappers): - path = os.path.join(directory, f) - with open(path, "rb") as opened_file: - hash_sum.update(opened_file.read()) + for wrapper in WRAPPERS[language]: + wrappers = project_absolute_path( + "benchmarks", "wrappers", deployment, language.value, wrapper + ) + for f in glob.glob(wrappers): + path = os.path.join(directory, f) + with open(path, "rb") as opened_file: + hash_sum.update(opened_file.read()) return hash_sum.hexdigest() def serialize(self) -> dict: @@ -246,11 +253,12 @@ def query_cache(self): def copy_code(self, output_dir): FILES = { - "python": ["*.py", "requirements.txt*"], - "nodejs": ["*.js", "package.json"], + Language.PYTHON: ["*.py", "requirements.txt*"], + Language.NODEJS: ["*.js", "package.json"], + Language.CPP: ["*.cpp", "*.hpp", "dependencies.json"], } path = os.path.join(self.benchmark_path, self.language_name) - for file_type in FILES[self.language_name]: + for file_type in FILES[self.language]: for f in glob.glob(os.path.join(path, file_type)): shutil.copy2(os.path.join(path, f), output_dir) @@ -307,6 +315,51 @@ def add_deployment_package_nodejs(self, output_dir): with open(package_config, "w") as package_file: json.dump(package_json, package_file, indent=2) + def add_deployment_package_cpp(self, output_dir): + + # FIXME: Configure CMakeLists.txt dependencies + # FIXME: Configure for AWS - this should be generic + # FIXME: optional hiredis + cmake_script = """ + cmake_minimum_required(VERSION 3.9) + set(CMAKE_CXX_STANDARD 11) + project(benchmark LANGUAGES CXX) + add_executable( + ${PROJECT_NAME} "handler.cpp" "key-value.cpp" + "storage.cpp" "redis.cpp" "tcp.cpp" "utils.cpp" "main.cpp" + ) + target_include_directories(${PROJECT_NAME} PRIVATE ".") + + target_compile_features(${PROJECT_NAME} PRIVATE "cxx_std_11") + target_compile_options(${PROJECT_NAME} PRIVATE "-Wall" "-Wextra") + + find_package(aws-lambda-runtime) + target_link_libraries(${PROJECT_NAME} PRIVATE AWS::aws-lambda-runtime) + + find_package(Boost REQUIRED) + target_include_directories(${PROJECT_NAME} PRIVATE ${Boost_INCLUDE_DIRS}) + target_link_libraries(${PROJECT_NAME} PRIVATE ${Boost_LIBRARIES}) + + find_package(AWSSDK COMPONENTS s3 dynamodb core) + target_link_libraries(${PROJECT_NAME} PUBLIC ${AWSSDK_LINK_LIBRARIES}) + + find_package(PkgConfig REQUIRED) + set(ENV{PKG_CONFIG_PATH} "/opt/lib/pkgconfig") + pkg_check_modules(HIREDIS REQUIRED IMPORTED_TARGET hiredis) + + target_include_directories(${PROJECT_NAME} PUBLIC PkgConfig::HIREDIS) + target_link_libraries(${PROJECT_NAME} PUBLIC PkgConfig::HIREDIS) + + find_package(tcpunch) + target_link_libraries(${PROJECT_NAME} PUBLIC tcpunch::tcpunch) + + # this line creates a target that packages your binary and zips it up + aws_lambda_package_target(${PROJECT_NAME}) + """ + build_script = os.path.join(output_dir, "CMakeLists.txt") + with open(build_script, "w") as script_file: + script_file.write(textwrap.dedent(cmake_script)) + def add_deployment_package(self, output_dir): from sebs.faas.function import Language @@ -314,6 +367,8 @@ def add_deployment_package(self, output_dir): self.add_deployment_package_python(output_dir) elif self.language == Language.NODEJS: self.add_deployment_package_nodejs(output_dir) + elif self.language == Language.CPP: + self.add_deployment_package_cpp(output_dir) else: raise NotImplementedError @@ -370,8 +425,12 @@ def install_dependencies(self, 
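The hash_directory changes earlier in this diff reduce to hashing every file that matches the language's glob patterns; a minimal self-contained sketch of that idea (the sorting is an addition here, for an order-stable digest):

```python
import glob
import hashlib
import os

def hash_matching_files(directory: str, patterns: list) -> str:
    md5 = hashlib.md5()
    for pattern in patterns:
        for path in sorted(glob.glob(os.path.join(directory, pattern))):
            with open(path, "rb") as opened_file:
                md5.update(opened_file.read())
    return md5.hexdigest()

# e.g. for a C++ benchmark, using the patterns from the FILES table above:
# hash_matching_files("benchmarks/.../cpp",
#                     ["*.cpp", "*.hpp", "dependencies.json", "*.sh", "*.json"])
```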
output_dir): } # run Docker container to install packages - PACKAGE_FILES = {"python": "requirements.txt", "nodejs": "package.json"} - file = os.path.join(output_dir, PACKAGE_FILES[self.language_name]) + PACKAGE_FILES = { + Language.PYTHON: "requirements.txt", + Language.NODEJS: "package.json", + Language.CPP: "CMakeLists.txt", + } + file = os.path.join(output_dir, PACKAGE_FILES[self.language]) if os.path.exists(file): try: self.logging.info( @@ -455,7 +514,7 @@ def install_dependencies(self, output_dir): self.logging.info("Docker build: {}".format(line)) except docker.errors.ContainerError as e: self.logging.error("Package build failed!") - self.logging.error(e) + self.logging.error(f"Stderr: {e.stderr.decode()}") self.logging.error(f"Docker mount volumes: {volumes}") raise e @@ -464,7 +523,7 @@ def recalculate_code_size(self): return self._code_size def build( - self, deployment_build_step: Callable[[str, str, str, str, bool], Tuple[str, int]] + self, deployment_build_step: Callable[[str, Language, str, str, bool], Tuple[str, int]] ) -> Tuple[bool, str]: # Skip build if files are up to date and user didn't enforce rebuild @@ -495,7 +554,7 @@ def build( self.install_dependencies(self._output_dir) self._code_location, self._code_size = deployment_build_step( os.path.abspath(self._output_dir), - self.language_name, + self.language, self.language_version, self.benchmark, self.is_cached, diff --git a/sebs/experiments/__init__.py b/sebs/experiments/__init__.py index ff820d40..45c061f3 100644 --- a/sebs/experiments/__init__.py +++ b/sebs/experiments/__init__.py @@ -4,3 +4,4 @@ from .network_ping_pong import NetworkPingPong # noqa from .eviction_model import EvictionModel # noqa from .invocation_overhead import InvocationOverhead # noqa +from .communication_p2p import CommunicationP2P # noqa diff --git a/sebs/experiments/communication_p2p.py b/sebs/experiments/communication_p2p.py new file mode 100644 index 00000000..bcb34ff2 --- /dev/null +++ b/sebs/experiments/communication_p2p.py @@ -0,0 +1,329 @@ +from __future__ import annotations + +import concurrent +import json +import glob +import os +import uuid +from enum import Enum +from typing import TYPE_CHECKING + +import csv + +from sebs.faas.config import Resources +from sebs.faas.system import System as FaaSSystem +from sebs.experiments.experiment import Experiment +from sebs.experiments.config import Config as ExperimentConfig +from sebs.experiments.result import Result as ExperimentResult +from sebs.utils import serialize + +if TYPE_CHECKING: + from sebs import SeBS + + +class CommunicationP2P(Experiment): + class Type(str, Enum): + STORAGE = ("storage",) + KEY_VALUE = "key-value" + REDIS = "redis" + TCP = "tcp" + + @staticmethod + def deserialize(val: str) -> CommunicationP2P.Type: + for member in CommunicationP2P.Type: + if member.value == val: + return member + raise Exception(f"Unknown experiment type {val}") + + def __init__(self, config: ExperimentConfig): + super().__init__(config) + self.settings = self.config.experiment_settings(self.name()) + self.benchmarks = { + CommunicationP2P.Type.STORAGE: "051.communication.storage", + CommunicationP2P.Type.KEY_VALUE: "052.communication.key-value", + CommunicationP2P.Type.REDIS: "053.communication.redis", + CommunicationP2P.Type.TCP: "054.communication.tcp", + } + + def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): + + # deploy network test function + from sebs import SeBS # noqa + from sebs.faas.function import Trigger + + experiment_type = 
CommunicationP2P.Type.deserialize(self.settings["type"]) + benchmark_name = self.benchmarks.get(experiment_type) + assert benchmark_name is not None + self._benchmark = sebs_client.get_benchmark(benchmark_name, deployment_client, self.config) + self._function = deployment_client.get_function(self._benchmark) + + triggers = self._function.triggers(Trigger.TriggerType.LIBRARY) + self._trigger = triggers[0] + + self._storage = deployment_client.get_storage(replace_existing=True) + self._out_dir = os.path.join(sebs_client.output_dir, self.name(), experiment_type) + if not os.path.exists(self._out_dir): + os.makedirs(self._out_dir) + + self._experiment_bucket = self._storage.experiments_bucket() + self._deployment_client = deployment_client + + if experiment_type == CommunicationP2P.Type.KEY_VALUE: + + self._table_name = deployment_client.config.resources.get_key_value_table( + Resources.StorageType.EXPERIMENTS + ) + self.logging.info(f"Using key-value storage with table {self._table_name}") + + def run(self): + + for invocations in self.settings["invocations"]["invocations_per_round"]: + + type_name = self.settings["type"] + deployment_name = self._deployment_client.name() + experiment_id = f"{deployment_name}-{type_name}-{str(uuid.uuid4())[0:8]}" + bucket_key = os.path.join(self.name(), experiment_id) + result = ExperimentResult(self.config, self._deployment_client.config) + result.begin() + + input_config = { + "bucket": self._experiment_bucket, + "key": bucket_key, + "invocations": { + "warmup": self.settings["invocations"]["warmup"], + "invocations": invocations, + "with_backoff": False, + }, + } + self._additional_settings(type_name, input_config) + + for size in self.settings["sizes"]: + + self.logging.info( + f"Begin experiment {experiment_id}, with {size} size, with {invocations} " + f" invocations per function call." + ) + + input_config["size"] = size + total_iters = self.settings["invocations"]["total"] + invocations_processed = 0 + iteration = 0 + offset = 0 + errors = 0 + max_retries = 3 + + pool = concurrent.futures.ThreadPoolExecutor(2) + + while invocations_processed < total_iters: + + self.logging.info( + f"Invoking {invocations} repetitions, message offset {offset}." 
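The settings read in prepare() and run() above imply an experiment configuration shaped roughly as follows; only the field names come from the code, the values (and the redis/tcpuncher sections required by those experiment types) are illustrative:

```python
communication_p2p_settings = {
    "type": "storage",                  # one of: storage, key-value, redis, tcp
    "sizes": [1024, 65536, 1048576],    # payload sizes passed to the functions
    "invocations": {
        "warmup": 5,                    # warm-up repetitions per function call
        "invocations_per_round": [10],  # repetitions per invocation round
        "total": 100,                   # samples to collect per size
    },
    # "redis": {...} / "tcpuncher": {...} only for those experiment types
}
```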
+ ) + + current_input = input_config + current_input["invocations"]["iteration"] = iteration + current_input["invocations"]["offset"] = offset + + key = str(uuid.uuid4())[0:8] + producer_input = self._additional_settings_producer( + type_name, current_input, key + ) + consumer_input = self._additional_settings_consumer( + type_name, current_input, key + ) + + # FIXME: propert implementation in language triggers + fut = pool.submit(self._trigger.sync_invoke, producer_input) + consumer = self._trigger.sync_invoke(consumer_input) + producer = fut.result() + + if consumer.stats.failure or producer.stats.failure: + self.logging.info("One of invocations failed, repeating!") + # update offset to NOT reuse messages + offset += self.settings["invocations"]["warmup"] + invocations + errors += 1 + + if errors >= max_retries: + self.logging.error("More than three failed invocations, giving up!") + raise RuntimeError() + + continue + else: + errors += 1 + + result.add_invocation(self._function, consumer) + result.add_invocation(self._function, producer) + + invocations_processed += invocations + iteration += 1 + offset += self.settings["invocations"]["warmup"] + invocations + + self.logging.info(f"Finished {invocations_processed}/{total_iters}") + + result.end() + + results_config = { + "type": type_name, + "deployment": deployment_name, + "benchmark": input_config, + "bucket": self._experiment_bucket, + "experiment_id": experiment_id, + "samples": total_iters, + "sizes": self.settings["sizes"], + } + + file_name = f"invocations_{invocations}_results.json" + with open(os.path.join(self._out_dir, file_name), "w") as out_f: + out_f.write(serialize({"experiment": result, "results": results_config})) + + def process( + self, + sebs_client: "SeBS", + deployment_client, + directory: str, + logging_filename: str, + extend_time_interval: int = -1, + ): + storage = deployment_client.get_storage(replace_existing=True) + + files_to_read = [ + "consumer_retries_{}_{}.txt", + "producer_times_{}_{}.txt", + "producer_retries_{}_{}.txt", + ] + additional_files = { + CommunicationP2P.Type.KEY_VALUE: [ + "producer_write_units_{}_{}.txt", + "producer_read_units_{}_{}.txt", + "consumer_write_units_{}_{}.txt", + "consumer_read_units_{}_{}.txt", + ] + } + + with open(os.path.join(directory, self.name(), "result.csv"), "w") as csvfile: + + writer = csv.writer(csvfile, delimiter=",") + writer.writerow( + [ + "channel", + "size", + "invocations-lambda", + "value_type", + "value", + ] + ) + + for experiment_type in [ + CommunicationP2P.Type.STORAGE, + CommunicationP2P.Type.KEY_VALUE, + CommunicationP2P.Type.REDIS, + CommunicationP2P.Type.TCP, + ]: + + out_dir = os.path.join(directory, self.name(), experiment_type) + for f in glob.glob(os.path.join(out_dir, "*.json")): + + experiment_data = {} + with open(f) as fd: + experiment_data = json.load(fd) + + invocations = experiment_data["results"]["benchmark"]["invocations"][ + "invocations" + ] + iterations = experiment_data["results"]["benchmark"]["invocations"]["iteration"] + bucket_name = experiment_data["results"]["bucket"] + bucket_key = experiment_data["results"]["benchmark"]["key"] + sizes = experiment_data["results"]["sizes"] + + results_dir = os.path.join(out_dir, f"results_{invocations}") + os.makedirs(results_dir, exist_ok=True) + + for size in sizes: + + for i in range(iterations + 1): + + for filename in [ + *files_to_read, + *additional_files.get(experiment_type, []), + ]: + + bucket_path = "/".join( + (bucket_key, "results", filename.format(size, i)) + ) + 
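The producer/consumer pairing in run() above boils down to launching one role asynchronously while invoking the other synchronously, then joining both results; a sketch, assuming trigger exposes sync_invoke as the library triggers do:

```python
import concurrent.futures

def invoke_pair(trigger, producer_input: dict, consumer_input: dict):
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        producer_future = pool.submit(trigger.sync_invoke, producer_input)
        consumer = trigger.sync_invoke(consumer_input)  # runs concurrently
        producer = producer_future.result()
    return producer, consumer
```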
storage.download( + bucket_name, + bucket_path, + os.path.join(results_dir, filename.format(size, i)), + ) + + self.logging.info( + f"Downloaded results from storage for {size} size, " + f"{invocations} invocations run." + ) + + # Process the downloaded data + for i in range(iterations + 1): + + for filename in [ + *files_to_read, + *additional_files.get(experiment_type, []), + ]: + + path = os.path.join(results_dir, filename.format(size, i)) + + data = open(path, "r").read().split() + double_data = [float(x) for x in data] + for val in double_data[1:]: + writer.writerow( + [ + experiment_type, + size, + invocations, + filename.split("_{}")[0], + val, + ] + ) + + self.logging.info( + f"Processed results from storage for {size} size, " + f"{invocations} invocations run." + ) + + def _additional_settings(self, experiment_type: CommunicationP2P.Type, config: dict): + + if experiment_type == CommunicationP2P.Type.REDIS: + config["redis"] = self.settings["redis"] + elif experiment_type == CommunicationP2P.Type.TCP: + config["tcpuncher"] = self.settings["tcpuncher"] + + def _additional_settings_producer( + self, experiment_type: CommunicationP2P.Type, config: dict, key: str + ) -> dict: + new_config = config.copy() + new_config["role"] = "producer" + + if experiment_type == CommunicationP2P.Type.TCP: + new_config["tcpuncher"]["id"] = 0 + new_config["tcpuncher"]["pairing_key"] = key + + return new_config + + def _additional_settings_consumer( + self, experiment_type: CommunicationP2P.Type, config: dict, key: str + ) -> dict: + new_config = config.copy() + new_config["role"] = "consumer" + + if experiment_type == CommunicationP2P.Type.TCP: + new_config["tcpuncher"]["id"] = 0 + new_config["tcpuncher"]["pairing_key"] = key + + return new_config + + @staticmethod + def name() -> str: + return "communication-p2p" + + @staticmethod + def typename() -> str: + return "Experiment.CommunicationP2P" diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index a5ca3f0b..f99edc24 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -61,9 +61,10 @@ def deserialize(config: dict) -> "Config": PerfCost, InvocationOverhead, EvictionModel, + CommunicationP2P, ) - for exp in [NetworkPingPong, PerfCost, InvocationOverhead, EvictionModel]: + for exp in [NetworkPingPong, PerfCost, InvocationOverhead, EvictionModel, CommunicationP2P]: if exp.name() in config: cfg._experiment_configs[exp.name()] = config[exp.name()] diff --git a/sebs/faas/config.py b/sebs/faas/config.py index 294e7b49..16ec5754 100644 --- a/sebs/faas/config.py +++ b/sebs/faas/config.py @@ -1,12 +1,14 @@ +from __future__ import annotations + from abc import ABC from abc import abstractmethod +from enum import Enum +from typing import Dict, Optional +import uuid from sebs.cache import Cache from sebs.utils import has_platform, LoggingBase, LoggingHandlers -# FIXME: Replace type hints for static generators after migration to 3.7 -# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel - """ Credentials for FaaS system used to authorize operations on functions and other resources. 
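The result files read back in process() above all share the layout produced by the C++ benchmark: a leading sample count followed by one value per line. A standalone sketch of that parsing step, with illustrative names:

```python
def append_samples(writer, result_path: str, row_prefix: list):
    # `writer` is a csv.writer; skip the first token (the sample count),
    # then emit one CSV row per measured value.
    with open(result_path, "r") as result_file:
        tokens = result_file.read().split()
    for value in (float(token) for token in tokens[1:]):
        writer.writerow(row_prefix + [value])
```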
@@ -29,7 +31,7 @@ def __init__(self): @staticmethod @abstractmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Credentials": + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: pass """ @@ -51,8 +53,87 @@ def serialize(self) -> dict: class Resources(ABC, LoggingBase): - def __init__(self): + class StorageType(str, Enum): + DEPLOYMENT = "deployment" + BENCHMARKS = "benchmarks" + EXPERIMENTS = "experiments" + + @staticmethod + def deserialize(val: str) -> Resources.StorageType: + for member in Resources.StorageType: + if member.value == val: + return member + raise Exception(f"Unknown storage bucket type type {val}") + + def __init__(self, name: str): super().__init__() + self._name = name + self._buckets: Dict[Resources.StorageType, str] = {} + self._tables: Dict[Resources.StorageType, str] = {} + self._resources_id = "" + + @property + def resources_id(self) -> str: + return self._resources_id + + @resources_id.setter + def resources_id(self, resources_id: str): + self._resources_id = resources_id + + @property + def region(self) -> str: + return self._region + + @region.setter + def region(self, region: str): + self._region = region + + def get_storage_bucket(self, bucket_type: Resources.StorageType) -> Optional[str]: + return self._buckets.get(bucket_type) + + def get_storage_bucket_name(self, bucket_type: Resources.StorageType) -> str: + return f"sebs-{bucket_type.value}-{self._resources_id}" + + def set_storage_bucket(self, bucket_type: Resources.StorageType, bucket_name: str): + self._buckets[bucket_type] = bucket_name + + def _create_key_value_table(self, name: str): + raise NotImplementedError() + + def get_key_value_table(self, table_type: Resources.StorageType) -> str: + + table = self._tables.get(table_type) + + if table is None: + + table = self.get_key_value_table_name(table_type) + self._create_key_value_table(table) + self.set_key_value_table(table_type, table) + + return table + + def get_key_value_table_name(self, table_type: Resources.StorageType) -> str: + return f"sebs-{table_type.value}-{self._resources_id}" + + def set_key_value_table(self, table_type: Resources.StorageType, table_name: str): + self._tables[table_type] = table_name + + @staticmethod + @abstractmethod + def initialize(res: Resources, dct: dict): + if "resources_id" in dct: + res._resources_id = dct["resources_id"] + else: + res._resources_id = str(uuid.uuid1())[0:8] + res.logging.info( + f"Generating unique resource name for " f"the experiments: {res._resources_id}" + ) + if "storage_buckets" in dct: + for key, value in dct["storage_buckets"].items(): + res._buckets[Resources.StorageType.deserialize(key)] = value + if "key-value-tables" in dct: + for key, value in dct["key-value-tables"].items(): + res._tables[Resources.StorageType.deserialize(key)] = value """ Create credentials instance from user config and cached values. 
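A self-contained sketch of the deterministic naming scheme defined above; the resources_id value is illustrative:

```python
from enum import Enum

class StorageType(str, Enum):
    DEPLOYMENT = "deployment"
    BENCHMARKS = "benchmarks"
    EXPERIMENTS = "experiments"

def storage_bucket_name(bucket_type: StorageType, resources_id: str) -> str:
    # deterministic per-deployment names replace random suffixes, so repeated
    # runs reuse the same bucket (and, analogously, the same key-value table)
    return f"sebs-{bucket_type.value}-{resources_id}"

print(storage_bucket_name(StorageType.EXPERIMENTS, "abc12345"))
# -> sebs-experiments-abc12345
```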
@@ -60,7 +141,7 @@ def __init__(self): @staticmethod @abstractmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Resources": + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: pass """ @@ -69,7 +150,21 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Resou @abstractmethod def serialize(self) -> dict: - pass + out = {"resources_id": self.resources_id} + for key, value in self._buckets.items(): + out[key.value] = value + return out + + def update_cache(self, cache: Cache): + cache.update_config(val=self.resources_id, keys=[self._name, "resources", "resources_id"]) + for key, value in self._buckets.items(): + cache.update_config( + val=value, keys=[self._name, "resources", "storage_buckets", key.value] + ) + for key, value in self._tables.items(): + cache.update_config( + val=value, keys=[self._name, "resources", "key-value-tables", key.value] + ) """ @@ -79,11 +174,10 @@ def serialize(self) -> dict: class Config(ABC, LoggingBase): - - _region: str - - def __init__(self): + def __init__(self, name: str): super().__init__() + self._region = "" + self._name = name @property def region(self) -> str: @@ -101,7 +195,12 @@ def resources(self) -> Resources: @staticmethod @abstractmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Config": + def initialize(cfg: Config, dct: dict): + cfg._region = dct["region"] + + @staticmethod + @abstractmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config: from sebs.local.config import LocalConfig name = config["name"] @@ -126,10 +225,9 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Confi assert func, "Unknown config type!" return func(config[name] if name in config else config, cache, handlers) - @abstractmethod def serialize(self) -> dict: - pass + return {"name": self._name, "region": self._region} @abstractmethod def update_cache(self, cache: Cache): - pass + cache.update_config(val=self.region, keys=[self._name, "region"]) diff --git a/sebs/faas/function.py b/sebs/faas/function.py index 5b1bf748..87b01ae0 100644 --- a/sebs/faas/function.py +++ b/sebs/faas/function.py @@ -9,6 +9,7 @@ from enum import Enum from typing import Callable, Dict, List, Optional, Type, TypeVar # noqa +from sebs.types import Language, Architecture from sebs.benchmark import Benchmark from sebs.utils import LoggingBase @@ -208,6 +209,13 @@ def _http_invoke(self, payload: dict, url: str, verify_ssl: bool = True) -> Exec try: output = json.loads(data.getvalue()) + if "body" in output: + # AWS C++ trigger returns payload as a dictionary inside "body" + # but add a conversion step just in case + if isinstance(output["body"], dict): + output = output["body"] + else: + output = json.loads(output["body"]) if status_code != 200: self.logging.error("Invocation on URL {} failed!".format(url)) @@ -254,34 +262,6 @@ def deserialize(cached_config: dict) -> "Trigger": pass -class Language(Enum): - PYTHON = "python" - NODEJS = "nodejs" - - # FIXME: 3.7+ python with future annotations - @staticmethod - def deserialize(val: str) -> Language: - for member in Language: - if member.value == val: - return member - raise Exception(f"Unknown language type {member}") - - -class Architecture(Enum): - X86 = "x86" - ARM = "arm" - - def serialize(self) -> str: - return self.value - - @staticmethod - def deserialize(val: str) -> Architecture: - for member in Architecture: - if member.value == val: - return member - raise 
Exception(f"Unknown architecture type {member}") - - @dataclass class Runtime: @@ -293,8 +273,7 @@ def serialize(self) -> dict: @staticmethod def deserialize(config: dict) -> Runtime: - languages = {"python": Language.PYTHON, "nodejs": Language.NODEJS} - return Runtime(language=languages[config["language"]], version=config["version"]) + return Runtime(language=Language.deserialize(config["language"]), version=config["version"]) T = TypeVar("T", bound="FunctionConfig") diff --git a/sebs/faas/storage.py b/sebs/faas/storage.py index d3781f2e..613f3fbd 100644 --- a/sebs/faas/storage.py +++ b/sebs/faas/storage.py @@ -4,6 +4,7 @@ from abc import abstractmethod from typing import List, Tuple +from sebs.faas.config import Resources from sebs.cache import Cache from sebs.utils import LoggingBase @@ -34,7 +35,9 @@ def replace_existing(self, val: bool): def region(self): return self._region - def __init__(self, region: str, cache_client: Cache, replace_existing: bool): + def __init__( + self, region: str, cache_client: Cache, resources: Resources, replace_existing: bool + ): super().__init__() self._cache_client = cache_client self.cached = False @@ -43,6 +46,7 @@ def __init__(self, region: str, cache_client: Cache, replace_existing: bool): self.input_buckets_files: List[List[str]] = [] self._replace_existing = replace_existing self._region = region + self._cloud_resources = resources @property def input(self) -> List[str]: # noqa: A003 @@ -57,7 +61,7 @@ def correct_name(self, name: str) -> str: pass @abstractmethod - def _create_bucket(self, name: str, buckets: List[str] = []): + def _create_bucket(self, name: str, buckets: List[str] = [], randomize_name: bool = False): pass """ @@ -214,6 +218,21 @@ def allocate_buckets(self, benchmark: str, requested_buckets: Tuple[int, int]): ) self.save_storage(benchmark) + def experiments_bucket(self) -> str: + + bucket_type = Resources.StorageType.EXPERIMENTS + bucket = self._cloud_resources.get_storage_bucket(bucket_type) + if bucket is None: + self.logging.info("Initialize a new bucket for experiments results") + # FIXME: detect existing vucket + bucket = self._create_bucket( + self.correct_name(self._cloud_resources.get_storage_bucket_name(bucket_type)), + randomize_name=False, + ) + self._cloud_resources.set_storage_bucket(bucket_type, bucket) + + return bucket + """ Implements a handy routine for uploading input data by benchmarks. 
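Language and Architecture are imported from sebs.types throughout these changes, but that module is not shown in this excerpt; a plausible minimal reconstruction, based on the enums removed from sebs/faas/function.py above plus the new cpp member (an assumption, not the actual file contents):

```python
from __future__ import annotations
from enum import Enum

class Language(Enum):
    PYTHON = "python"
    NODEJS = "nodejs"
    CPP = "cpp"

    @staticmethod
    def deserialize(val: str) -> Language:
        for member in Language:
            if member.value == val:
                return member
        raise Exception(f"Unknown language type {val}")

class Architecture(Enum):
    X86 = "x86"
    ARM = "arm"

    def serialize(self) -> str:
        return self.value

    @staticmethod
    def deserialize(val: str) -> Architecture:
        for member in Architecture:
            if member.value == val:
                return member
        raise Exception(f"Unknown architecture type {val}")
```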
It should skip uploading existing files unless storage client has been @@ -233,6 +252,7 @@ def uploader_func(self, bucket_idx: int, file: str, filepath: str) -> None: """ def save_storage(self, benchmark: str): + self.cache_client.update_storage( self.deployment_name(), benchmark, diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 64923255..80621fbc 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -11,6 +11,7 @@ from sebs.faas.function import Function, Trigger, ExecutionResult from sebs.faas.storage import PersistentStorage from sebs.utils import LoggingBase +from sebs.types import Language from .config import Config """ @@ -107,7 +108,7 @@ def get_storage(self, replace_existing: bool) -> PersistentStorage: def package_code( self, directory: str, - language_name: str, + language: Language, language_version: str, benchmark: str, is_cached: bool, diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index c4624ad3..f13c3656 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -106,8 +106,8 @@ def __init__(self): super().__init__() @staticmethod - def initialize(dct: dict) -> Resources: - return GCPResources() + def initialize(res: Resources, dct: dict): + pass """ Serialize to JSON for storage in cache. @@ -118,14 +118,15 @@ def serialize(self) -> dict: @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Resources": + cached_config = cache.get_config("gcp") - ret: GCPResources + ret = GCPResources() if cached_config and "resources" in cached_config: - ret = cast(GCPResources, GCPResources.initialize(cached_config["resources"])) + GCPResources.initialize(ret, cached_config["resources"]) ret.logging_handlers = handlers ret.logging.info("Using cached resources for GCP") else: - ret = cast(GCPResources, GCPResources.initialize(config)) + GCPResources.initialize(ret, config["resources"]) ret.logging_handlers = handlers ret.logging.info("No cached resources for GCP found, using user configuration.") return ret @@ -141,17 +142,11 @@ def update_cache(self, cache: Cache): class GCPConfig(Config): - - _project_name: str - def __init__(self, credentials: GCPCredentials, resources: GCPResources): - super().__init__() + super().__init__(name="gcp") self._credentials = credentials self._resources = resources - - @property - def region(self) -> str: - return self._region + self._project_name: str = "" @property def project_name(self) -> str: @@ -216,22 +211,21 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Confi @staticmethod def initialize(cfg: Config, dct: dict): + super(GCPConfig, GCPConfig).initialize(cfg, dct) config = cast(GCPConfig, cfg) config._project_name = dct["project_name"] - config._region = dct["region"] def serialize(self) -> dict: out = { - "name": "gcp", + **super().serialize(), "project_name": self._project_name, - "region": self._region, "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), } return out def update_cache(self, cache: Cache): + super().update_cache(cache) cache.update_config(val=self.project_name, keys=["gcp", "project_name"]) - cache.update_config(val=self.region, keys=["gcp", "region"]) self.credentials.update_cache(cache) self.resources.update_cache(cache) diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index cd97ab9e..9ab790e7 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -23,6 +23,7 @@ from sebs.gcp.storage import GCPStorage from sebs.gcp.function import GCPFunction from sebs.utils import LoggingHandlers +from sebs.types import 
Language """ This class provides basic abstractions for the FaaS system. @@ -93,7 +94,9 @@ def get_storage( buckets=None, ) -> PersistentStorage: if not self.storage: - self.storage = GCPStorage(self.config.region, self.cache_client, replace_existing) + self.storage = GCPStorage( + self.config.region, self.cache_client, self.config.resources, replace_existing + ) self.storage.logging_handlers = self.logging_handlers else: self.storage.replace_existing = replace_existing @@ -132,21 +135,21 @@ def format_function_name(func_name: str) -> str: def package_code( self, directory: str, - language_name: str, + language: Language, language_version: str, benchmark: str, is_cached: bool, ) -> Tuple[str, int]: CONFIG_FILES = { - "python": ["handler.py", ".python_packages"], - "nodejs": ["handler.js", "node_modules"], + Language.PYTHON: ["handler.py", ".python_packages"], + Language.NODEJS: ["handler.js", "node_modules"], } HANDLER = { - "python": ("handler.py", "main.py"), - "nodejs": ("handler.js", "index.js"), + Language.PYTHON: ("handler.py", "main.py"), + Language.NODEJS: ("handler.js", "index.js"), } - package_config = CONFIG_FILES[language_name] + package_config = CONFIG_FILES[language] function_dir = os.path.join(directory, "function") os.makedirs(function_dir) for file in os.listdir(directory): @@ -159,7 +162,7 @@ def package_code( requirements.close() # rename handler function.py since in gcp it has to be caled main.py - old_name, new_name = HANDLER[language_name] + old_name, new_name = HANDLER[language] old_path = os.path.join(directory, old_name) new_path = os.path.join(directory, new_name) shutil.move(old_path, new_path) diff --git a/sebs/gcp/storage.py b/sebs/gcp/storage.py index b59b18e0..88d542c5 100644 --- a/sebs/gcp/storage.py +++ b/sebs/gcp/storage.py @@ -6,6 +6,7 @@ from google.api_core import exceptions from sebs.cache import Cache +from sebs.faas.config import Resources from ..faas.storage import PersistentStorage @@ -26,8 +27,10 @@ def replace_existing(self) -> bool: def replace_existing(self, val: bool): self._replace_existing = val - def __init__(self, region: str, cache_client: Cache, replace_existing: bool): - super().__init__(region, cache_client, replace_existing) + def __init__( + self, region: str, cache_client: Cache, resources: Resources, replace_existing: bool + ): + super().__init__(region, cache_client, resources, replace_existing) self.replace_existing = replace_existing self.client = gcp_storage.Client() self.cached = False @@ -35,7 +38,7 @@ def __init__(self, region: str, cache_client: Cache, replace_existing: bool): def correct_name(self, name: str) -> str: return name - def _create_bucket(self, name, buckets: List[str] = []): + def _create_bucket(self, name, buckets: List[str] = [], randomize_name: bool = False): found_bucket = False for bucket_name in buckets: if name in bucket_name: @@ -43,8 +46,13 @@ def _create_bucket(self, name, buckets: List[str] = []): break if not found_bucket: - random_name = str(uuid.uuid4())[0:16] - bucket_name = "{}-{}".format(name, random_name).replace(".", "_") + + if randomize_name: + random_name = str(uuid.uuid4())[0:16] + bucket_name = "{}-{}".format(name, random_name).replace(".", "_") + else: + bucket_name = name + self.client.create_bucket(bucket_name) logging.info("Created bucket {}".format(bucket_name)) return bucket_name diff --git a/sebs/local/config.py b/sebs/local/config.py index 5b091664..d9de6f1a 100644 --- a/sebs/local/config.py +++ b/sebs/local/config.py @@ -23,7 +23,7 @@ def deserialize(config: dict, cache: Cache, 
handlers: LoggingHandlers) -> Creden
 
 class LocalResources(Resources):
     def __init__(self, storage_cfg: Optional[MinioConfig] = None):
-        super().__init__()
+        super().__init__(name="local")
         self._storage = storage_cfg
 
     @property
@@ -33,6 +33,10 @@ def storage_config(self) -> Optional[MinioConfig]:
     def serialize(self) -> dict:
         return {}
 
+    @staticmethod
+    def initialize(res: Resources, cfg: dict):
+        pass
+
     @staticmethod
     def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
         ret = LocalResources()
diff --git a/sebs/local/deployment.py b/sebs/local/deployment.py
index d23d87d4..9593c5e7 100644
--- a/sebs/local/deployment.py
+++ b/sebs/local/deployment.py
@@ -3,6 +3,7 @@
 
 from sebs.cache import Cache
 from sebs.local.function import LocalFunction
+from sebs.local.config import LocalResources
 from sebs.storage.minio import Minio, MinioConfig
 from sebs.utils import serialize
 
@@ -30,6 +31,7 @@ def serialize(self, path: str):
                 )
             )
 
+    # FIXME: do we still use it?
     @staticmethod
     def deserialize(path: str, cache_client: Cache) -> "Deployment":
         with open(path, "r") as in_f:
@@ -40,7 +42,7 @@ def deserialize(path: str, cache_client: Cache) -> "Deployment":
         for func in input_data["functions"]:
             deployment._functions.append(LocalFunction.deserialize(func))
         deployment._storage = Minio.deserialize(
-            MinioConfig.deserialize(input_data["storage"]), cache_client
+            MinioConfig.deserialize(input_data["storage"]), cache_client, LocalResources()
         )
         return deployment
 
diff --git a/sebs/local/local.py b/sebs/local/local.py
index ad18551e..0ffe9f6a 100644
--- a/sebs/local/local.py
+++ b/sebs/local/local.py
@@ -75,7 +75,7 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage:
                     "The local deployment is missing the configuration of pre-allocated storage!"
                 )
             self.storage = Minio.deserialize(
-                self.config.resources.storage_config, self.cache_client
+                self.config.resources.storage_config, self.cache_client, self.config.resources
             )
             self.storage.logging_handlers = self.logging_handlers
         else:
diff --git a/sebs/openwhisk/config.py b/sebs/openwhisk/config.py
index dfaad3fc..16e11ad0 100644
--- a/sebs/openwhisk/config.py
+++ b/sebs/openwhisk/config.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 from sebs.cache import Cache
 from sebs.faas.config import Credentials, Resources, Config
 from sebs.utils import LoggingHandlers
@@ -23,7 +25,7 @@ def __init__(
         password: Optional[str] = None,
         registry_updated: bool = False,
     ):
-        super().__init__()
+        super().__init__(name="openwhisk")
         self._docker_registry = registry if registry != "" else None
         self._docker_username = username if username != "" else None
         self._docker_password = password if password != "" else None
@@ -60,17 +62,21 @@ def registry_updated(self) -> bool:
         return self._registry_updated
 
     @staticmethod
-    def initialize(dct: dict) -> Resources:
-        return OpenWhiskResources(dct["registry"], dct["username"], dct["password"])
+    def initialize(res: Resources, dct: dict):
+        ret = cast(OpenWhiskResources, res)
+        ret._docker_registry = dct["registry"]
+        ret._docker_username = dct["username"]
+        ret._docker_password = dct["password"]
 
     @staticmethod
     def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
         cached_config = cache.get_config("openwhisk")
-        ret: OpenWhiskResources
+        ret = OpenWhiskResources()
 
         # Check for new config - overrides but check if it's different
         if "docker_registry" in config:
-            ret = cast(OpenWhiskResources, OpenWhiskResources.initialize(config["docker_registry"]))
+
+            OpenWhiskResources.initialize(ret, config["docker_registry"])
             ret.logging.info("Using user-provided Docker registry for OpenWhisk.")
             ret.logging_handlers = handlers
 
@@ -89,10 +95,7 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resour
             and "resources" in cached_config
             and "docker" in cached_config["resources"]
         ):
-            ret = cast(
-                OpenWhiskResources,
-                OpenWhiskResources.initialize(cached_config["resources"]["docker"]),
-            )
+            OpenWhiskResources.initialize(ret, cached_config["resources"]["docker"])
             ret.logging_handlers = handlers
             ret.logging.info("Using cached Docker registry for OpenWhisk")
         else:
@@ -160,7 +163,7 @@ class OpenWhiskConfig(Config):
     cache: Cache
 
     def __init__(self, config: dict, cache: Cache):
-        super().__init__()
+        super().__init__(name="openwhisk")
         self._credentials = OpenWhiskCredentials()
         self._resources = OpenWhiskResources()
         self.shutdownStorage = config["shutdownStorage"]
@@ -184,7 +187,7 @@ def initialize(cfg: Config, dct: dict):
 
     def serialize(self) -> dict:
         return {
-            "name": "openwhisk",
+            **super().serialize(),
             "shutdownStorage": self.shutdownStorage,
             "removeCluster": self.removeCluster,
             "wskExec": self.wsk_exec,
@@ -207,6 +210,7 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config
         return res
 
     def update_cache(self, cache: Cache):
+        super().update_cache(cache)
         cache.update_config(val=self.shutdownStorage, keys=["openwhisk", "shutdownStorage"])
         cache.update_config(val=self.removeCluster, keys=["openwhisk", "removeCluster"])
         cache.update_config(val=self.wsk_exec, keys=["openwhisk", "wskExec"])
diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py
index 0337a9bb..ebe2347f 100644
--- a/sebs/openwhisk/openwhisk.py
+++ b/sebs/openwhisk/openwhisk.py
@@ -15,6 +15,7 @@
 from .config import OpenWhiskConfig
 from .function import OpenWhiskFunction, OpenWhiskFunctionConfig
 from ..config import SeBSConfig
+from sebs.types import Language
 
 
 class OpenWhisk(System):
@@ -57,7 +58,7 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage:
                     "OpenWhisk is missing the configuration of pre-allocated storage!"
                 )
             self.storage = Minio.deserialize(
-                self.config.resources.storage_config, self.cache_client
+                self.config.resources.storage_config, self.cache_client, self.config.resources
             )
             self.storage.logging_handlers = self.logging_handlers
         else:
@@ -112,7 +113,7 @@ def find_image(self, repository_name, image_tag) -> bool:
     def build_base_image(
         self,
         directory: str,
-        language_name: str,
+        language: Language,
         language_version: str,
         benchmark: str,
         is_cached: bool,
@@ -133,7 +134,7 @@ def build_base_image(
         registry_name = self.config.resources.docker_registry
         repository_name = self.system_config.docker_repository()
         image_tag = self.system_config.benchmark_image_tag(
-            self.name(), benchmark, language_name, language_version
+            self.name(), benchmark, language.value, language_version
         )
         if registry_name is not None:
             repository_name = f"{registry_name}/{repository_name}"
@@ -159,7 +160,7 @@ def build_base_image(
         build_dir = os.path.join(directory, "docker")
         os.makedirs(build_dir)
         shutil.copy(
-            os.path.join(PROJECT_DIR, "docker", self.name(), language_name, "Dockerfile.function"),
+            os.path.join(PROJECT_DIR, "docker", self.name(), language.value, "Dockerfile.function"),
             os.path.join(build_dir, "Dockerfile"),
         )
 
@@ -171,7 +172,7 @@ def build_base_image(
         with open(os.path.join(build_dir, ".dockerignore"), "w") as f:
             f.write("Dockerfile")
 
-        builder_image = self.system_config.benchmark_base_images(self.name(), language_name)[
+        builder_image = self.system_config.benchmark_base_images(self.name(), language.value)[
             language_version
         ]
         self.logging.info(f"Build the benchmark base image {repository_name}:{image_tag}.")
@@ -200,7 +201,7 @@ def build_base_image(
     def package_code(
         self,
         directory: str,
-        language_name: str,
+        language: Language,
         language_version: str,
         benchmark: str,
         is_cached: bool,
@@ -208,15 +209,15 @@ def package_code(
 
         # Regardless of Docker image status, we need to create .zip file
        # to allow registration of function with OpenWhisk
-        self.build_base_image(directory, language_name, language_version, benchmark, is_cached)
+        self.build_base_image(directory, language, language_version, benchmark, is_cached)
 
         # We deploy Minio config in code package since this depends on local
         # deployment - it cannnot be a part of Docker image
         CONFIG_FILES = {
-            "python": ["__main__.py"],
-            "nodejs": ["index.js"],
+            Language.PYTHON: ["__main__.py"],
+            Language.NODEJS: ["index.js"],
         }
-        package_config = CONFIG_FILES[language_name]
+        package_config = CONFIG_FILES[language]
 
         benchmark_archive = os.path.join(directory, f"{benchmark}.zip")
         subprocess.run(
diff --git a/sebs/openwhisk/storage.py b/sebs/openwhisk/storage.py
index d94182c4..79e8e17c 100644
--- a/sebs/openwhisk/storage.py
+++ b/sebs/openwhisk/storage.py
@@ -1,5 +1,6 @@
 import docker
 
+from sebs.faas.config import Resources
 from sebs.storage import minio
 from sebs.storage.config import MinioConfig
 from sebs.cache import Cache
@@ -10,9 +11,17 @@ class Minio(minio.Minio):
     def deployment_name() -> str:
         return "openwhisk"
 
-    def __init__(self, docker_client: docker.client, cache_client: Cache, replace_existing: bool):
-        super().__init__(docker_client, cache_client, replace_existing)
+    def __init__(
+        self,
+        docker_client: docker.client,
+        cache_client: Cache,
+        res: Resources,
+        replace_existing: bool,
+    ):
+        super().__init__(docker_client, cache_client, res, replace_existing)
 
     @staticmethod
-    def deserialize(cached_config: MinioConfig, cache_client: Cache) -> "Minio":
-        return super(Minio, Minio)._deserialize(cached_config, cache_client, Minio)
+    def deserialize(
+        cached_config: MinioConfig, cache_client: Cache, resources: Resources
+    ) -> "Minio":
+        return super(Minio, Minio)._deserialize(cached_config, cache_client, resources, Minio)
diff --git a/sebs/sebs.py b/sebs/sebs.py
index 1a7aa65f..10591743 100644
--- a/sebs/sebs.py
+++ b/sebs/sebs.py
@@ -138,6 +138,7 @@ def get_experiment(
             NetworkPingPong,
             InvocationOverhead,
             EvictionModel,
+            CommunicationP2P,
         )
 
         implementations: Dict[str, Type[Experiment]] = {
@@ -145,6 +146,7 @@
             "network-ping-pong": NetworkPingPong,
             "invocation-overhead": InvocationOverhead,
             "eviction-model": EvictionModel,
+            CommunicationP2P.name(): CommunicationP2P,
         }
         if experiment_type not in implementations:
             raise RuntimeError(f"Experiment {experiment_type} not supported!")
diff --git a/sebs/storage/minio.py b/sebs/storage/minio.py
index 6c79f05d..3c686000 100644
--- a/sebs/storage/minio.py
+++ b/sebs/storage/minio.py
@@ -9,6 +9,7 @@
 
 from sebs.cache import Cache
 from sebs.types import Storage as StorageTypes
+from sebs.faas.config import Resources
 from sebs.faas.storage import PersistentStorage
 from sebs.storage.config import MinioConfig
 
@@ -25,8 +26,14 @@ def deployment_name() -> str:
     # the location does not matter
     MINIO_REGION = "us-east-1"
 
-    def __init__(self, docker_client: docker.client, cache_client: Cache, replace_existing: bool):
-        super().__init__(self.MINIO_REGION, cache_client, replace_existing)
+    def __init__(
+        self,
+        docker_client: docker.client,
+        cache_client: Cache,
+        resources: Resources,
+        replace_existing: bool,
+    ):
+        super().__init__(self.MINIO_REGION, cache_client, resources, replace_existing)
         self._docker_client = docker_client
         self._storage_container: Optional[docker.container] = None
         self._cfg = MinioConfig()
@@ -122,7 +129,7 @@ def get_connection(self):
             http_client=Minio._define_http_client(),
         )
 
-    def _create_bucket(self, name: str, buckets: List[str] = []):
+    def _create_bucket(self, name: str, buckets: List[str] = [], randomize_name: bool = False):
         for bucket_name in buckets:
             if name in bucket_name:
                 self.logging.info(
@@ -130,7 +137,10 @@ def _create_bucket(self, name: str, buckets: List[str] = []):
                 )
                 return bucket_name
         # minio has limit of bucket name to 16 characters
-        bucket_name = "{}-{}".format(name, str(uuid.uuid4())[0:16])
+        if randomize_name:
+            bucket_name = "{}-{}".format(name, str(uuid.uuid4())[0:16])
+        else:
+            bucket_name = name
         try:
             self.connection.make_bucket(bucket_name, location=self.MINIO_REGION)
             self.logging.info("Created bucket {}".format(bucket_name))
@@ -208,9 +218,11 @@ def serialize(self) -> dict:
     T = TypeVar("T", bound="Minio")
 
     @staticmethod
-    def _deserialize(cached_config: MinioConfig, cache_client: Cache, obj_type: Type[T]) -> T:
+    def _deserialize(
+        cached_config: MinioConfig, cache_client: Cache, res: Resources, obj_type: Type[T]
+    ) -> T:
         docker_client = docker.from_env()
-        obj = obj_type(docker_client, cache_client, False)
+        obj = obj_type(docker_client, cache_client, res, False)
         obj._cfg = cached_config
         if cached_config.instance_id:
             instance_id = cached_config.instance_id
@@ -226,5 +238,5 @@ def _deserialize(cached_config: MinioConfig, cache_client: Cache, obj_type: Type
         return obj
 
     @staticmethod
-    def deserialize(cached_config: MinioConfig, cache_client: Cache) -> "Minio":
-        return Minio._deserialize(cached_config, cache_client, Minio)
+    def deserialize(cached_config: MinioConfig, cache_client: Cache, res: Resources) -> "Minio":
+        return Minio._deserialize(cached_config, cache_client, res, Minio)
diff --git a/sebs/types.py b/sebs/types.py
index 43574337..998ea542 100644
--- a/sebs/types.py
+++ b/sebs/types.py
@@ -1,16 +1,45 @@
+from __future__ import annotations
 from enum import Enum
 
 
 class Platforms(str, Enum):
-    AWS = ("aws",)
-    AZURE = ("azure",)
-    GCP = ("gcp",)
-    LOCAL = ("local",)
+    AWS = "aws"
+    AZURE = "azure"
+    GCP = "gcp"
+    LOCAL = "local"
     OPENWHISK = "openwhisk"
 
 
 class Storage(str, Enum):
-    AWS_S3 = ("aws-s3",)
-    AZURE_BLOB_STORAGE = ("azure-blob-storage",)
-    GCP_STORAGE = ("google-cloud-storage",)
+    AWS_S3 = "aws-s3"
+    AZURE_BLOB_STORAGE = "azure-blob-storage"
+    GCP_STORAGE = "google-cloud-storage"
     MINIO = "minio"
+
+
+class Language(str, Enum):
+    PYTHON = "python"
+    NODEJS = "nodejs"
+    CPP = "cpp"
+
+    @staticmethod
+    def deserialize(val: str) -> Language:
+        for member in Language:
+            if member.value == val:
+                return member
+        raise Exception(f"Unknown language type {val}")
+
+
+class Architecture(str, Enum):
+    X86 = "x86"
+    ARM = "arm"
+
+    def serialize(self) -> str:
+        return self.value
+
+    @staticmethod
+    def deserialize(val: str) -> Architecture:
+        for member in Architecture:
+            if member.value == val:
+                return member
+        raise Exception(f"Unknown architecture type {val}")
diff --git a/tools/build_docker_images.py b/tools/build_docker_images.py
index 8f1eb320..109b9ecd 100755
--- a/tools/build_docker_images.py
+++ b/tools/build_docker_images.py
@@ -12,8 +12,10 @@
 parser.add_argument(
     "--deployment", default=None, choices=["local", "aws", "azure", "gcp"], action="store"
 )
-parser.add_argument("--type", default=None, choices=["build", "run", "manage"], action="store")
-parser.add_argument("--language", default=None, choices=["python", "nodejs"], action="store")
+parser.add_argument("--type", default=None, choices=["build", "dependencies", "run", "manage"], action="store")
+parser.add_argument("--type-tag", default=None, type=str, action="store")
+parser.add_argument("--language", default=None, choices=["python", "nodejs", "cpp"], action="store")
+parser.add_argument('--parallel', default=1, type=int, action='store')
 args = parser.parse_args()
 config = json.load(open(os.path.join(PROJECT_DIR, "config", "systems.json"), "r"))
 client = docker.from_env()
@@ -40,6 +42,8 @@ def build(image_type, system, language=None, version=None, version_name=None):
     # if we pass an integer, the build will fail with 'connection reset by peer'
     buildargs = {
         "VERSION": version,
+        'WORKERS': str(args.parallel),
+        'BASE_REPOSITORY': config["general"]["docker_repository"]
     }
     if version:
         buildargs["BASE_IMAGE"] = version_name
@@ -50,7 +54,6 @@
     )
     client.images.build(path=PROJECT_DIR, dockerfile=dockerfile, buildargs=buildargs, tag=target)
 
-
 def build_language(system, language, language_config):
     configs = []
     if "base_images" in language_config:
@@ -74,6 +77,22 @@ def build_systems(system, system_config):
             build(args.type, system)
         else:
             print(f"Skipping manage image for {system}")
+    elif args.type == "dependencies":
+        if args.language:
+            if "dependencies" in system_config["languages"][args.language]:
+                language_config = system_config["languages"][args.language]
+                # for all dependencies
+                if args.type_tag:
+                    # for all image versions
+                    for version, base_image in language_config["base_images"].items():
+                        build(f"{args.type}-{args.type_tag}", system, args.language, version, base_image)
+                else:
+                    for dep in system_config["languages"][args.language]["dependencies"]:
+                        # for all image versions
+                        for version, base_image in language_config["base_images"].items():
+                            build(f"{args.type}-{dep}", system, args.language, version, base_image)
+        else:
+            raise RuntimeError('Language must be specified for dependencies')
     else:
         if args.language:
            build_language(system, args.language, system_config["languages"][args.language])
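Usage note (not part of the patch above): a minimal sketch of how the Language and Architecture enums added in sebs/types.py are expected to behave, assuming the patched sebs.types module is importable; member names, string values, and error messages are taken directly from the diff.

# Minimal usage sketch for the new enums (assumes the patched sebs.types is importable).
from sebs.types import Architecture, Language

# deserialize() maps a serialized string back to the corresponding enum member.
assert Language.deserialize("cpp") is Language.CPP
assert Language.CPP.value == "cpp"

assert Architecture.deserialize("arm") is Architecture.ARM
assert Architecture.ARM.serialize() == "arm"

# Unknown values raise, as implemented in both deserialize() methods.
try:
    Language.deserialize("rust")
except Exception as err:
    print(err)  # Unknown language type rust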