Skip to content

Commit

Permalink
Merge pull request #5410 from BOINC/dpa_sporadic_wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
AenBleidd authored Oct 28, 2023
2 parents f9e6855 + 43efa3a commit b8c58bc
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 71 deletions.
104 changes: 98 additions & 6 deletions api/boinc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
Expand Down Expand Up @@ -144,6 +146,10 @@ using std::vector;
// CPPFLAGS=-DGETRUSAGE_IN_TIMER_THREAD
#endif

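// Anything shared between the worker and timer thread
// must be declared volatile so that the compiler re-reads it
// from memory on every access instead of caching it.
// NOTE: volatile provides neither atomicity nor memory ordering;
// don't rely on it for anything beyond simple flags and values.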

const char* api_version = "API_VERSION_" PACKAGE_VERSION;
static APP_INIT_DATA aid;
static FILE_LOCK file_lock;
Expand All @@ -155,7 +161,7 @@ static volatile double last_checkpoint_cpu_time;
static volatile bool ready_to_checkpoint = false;
static volatile int in_critical_section = 0;
static volatile double last_wu_cpu_time;
static volatile bool standalone = false;
static volatile bool standalone = true;
static volatile double initial_wu_cpu_time;
static volatile bool have_new_trickle_up = false;
static volatile bool have_trickle_down = true;
Expand Down Expand Up @@ -194,7 +200,9 @@ char remote_desktop_addr[256];
bool send_remote_desktop_addr = false;
int app_min_checkpoint_period = 0;
// min checkpoint period requested by app
SPORADIC_AC_STATE ac_state;
static volatile SPORADIC_AC_STATE ac_state;
static volatile int ac_fd, ca_fd;
static volatile bool do_sporadic_files;

#define TIMER_PERIOD 0.1
// Sleep interval for timer thread;
Expand Down Expand Up @@ -518,6 +526,57 @@ static bool client_dead() {
return false;
}

// called once/sec in timer thread.
// Copy sporadic app messages to/from files (for wrappers)
//
static void sporadic_files() {
static time_t last_ac_mod_time = 0;
static SPORADIC_CA_STATE last_ca_state = CA_NONE;
char buf[256];

// if C->A state has changed, write to file
//
if (last_ca_state != boinc_status.ca_state) {
sprintf(buf, "%d\n", boinc_status.ca_state);
lseek(ca_fd, 0, SEEK_SET);
if (write(ca_fd, buf, sizeof(buf))) {}
// one way to avoid warnings
last_ca_state = boinc_status.ca_state;
}

// check if app has updated file with A->C state
//
struct stat sbuf;
int ret = fstat(ac_fd, &sbuf);
if (!ret) {
#ifdef _WIN32
time_t t = sbuf.st_mtime;
#elif defined(__APPLE__)
time_t t = sbuf.st_mtimespec.tv_sec;
#else
time_t t = sbuf.st_mtim.tv_sec;
#endif
if (t != last_ac_mod_time) {
lseek(ac_fd, 0, SEEK_SET);
int nc = read(ac_fd, buf, sizeof(buf));
if (nc>0) {
int val;
buf[nc] = 0;
int n = sscanf(buf, "%d", &val);
if (n == 1) {
ac_state = (SPORADIC_AC_STATE)val;
} else {
ac_state = AC_NONE;
fprintf(stderr, "API: error parsing AC state: %s\n", buf);
}
last_ac_mod_time = t;
} else {
fprintf(stderr, "API: error reading AC state: %d\n", nc);
}
}
}
}

#ifndef _WIN32
// For multithread apps on Unix, the main process executes the following.
//
Expand Down Expand Up @@ -687,6 +746,7 @@ int boinc_init_options_general(BOINC_OPTIONS& opt) {
}
}

standalone = false;
retval = boinc_parse_init_data_file();
if (retval) {
standalone = true;
Expand Down Expand Up @@ -770,8 +830,10 @@ int boinc_finish_message(int status, const char* msg, bool is_notice) {
boinc_msg_prefix(buf, sizeof(buf)), status
);
finishing = true;
boinc_sleep(2.0); // let the timer thread send final messages
boinc_disable_timer_thread = true; // then disable it
if (!standalone) {
boinc_sleep(2.0); // let the timer thread send final messages
boinc_disable_timer_thread = true; // then disable it
}

if (options.main_program) {
FILE* f = fopen(BOINC_FINISH_CALLED_FILE, "w");
Expand Down Expand Up @@ -882,6 +944,28 @@ int boinc_is_standalone() {
return 0;
}

int boinc_sporadic_dir(const char* dir) {
char buf[MAXPATHLEN];

do_sporadic_files = true;
sprintf(buf, "%s/ac", dir);
ac_fd = open(buf, O_CREAT|O_RDONLY, 0666);
if (ac_fd < 0) {
fprintf(stderr, "can't open sporadic file %s\n", buf);
do_sporadic_files = false;
}
sprintf(buf, "%s/ca", dir);
ca_fd = open(buf, O_CREAT|O_WRONLY, 0666);
if (ca_fd < 0) {
fprintf(stderr, "can't open sporadic file %s\n", buf);
do_sporadic_files = false;
}
if (!do_sporadic_files) return ERR_FOPEN;
boinc_status.ca_state = CA_DONT_COMPUTE;
ac_state = AC_NONE;
return 0;
}

// called from the timer thread if we need to exit,
// e.g. quit message from client, or client has gone away
//
Expand Down Expand Up @@ -993,6 +1077,10 @@ int boinc_report_app_status_aux(
snprintf(buf, sizeof(buf), "<bytes_received>%f</bytes_received>\n", _bytes_received);
safe_strcat(msg_buf, buf);
}
if (ac_state) {
sprintf(buf, "<sporadic_ac>%d</sporadic_ac>\n", ac_state);
strlcat(msg_buf, buf, sizeof(msg_buf));
}
#ifdef MSGS_FROM_FILE
if (fout) {
fputs(msg_buf, fout);
Expand Down Expand Up @@ -1058,7 +1146,7 @@ static int suspend_activities(bool called_from_worker) {
suspend_or_resume_descendants(false);
}
// if called from worker thread, sleep until suspension is over
// if called from time thread, don't need to do anything;
// if called from timer thread, don't need to do anything;
// suspension is done by signal handler in worker thread
//
if (called_from_worker) {
Expand Down Expand Up @@ -1365,6 +1453,10 @@ static void timer_handler() {
app_client_shm->shm->graphics_reply.send_msg(buf);
send_remote_desktop_addr = false;
}

if (do_sporadic_files) {
sporadic_files();
}
}

#ifdef _WIN32
Expand Down Expand Up @@ -1489,7 +1581,7 @@ int start_timer_thread() {

// called in the worker thread.
// set up a handler for SIGALRM.
// If Android, we'll get signals from the time thread.
// If Android, we'll get signals from the timer thread.
// otherwise, set an interval timer to deliver signals
//
static int start_worker_signals() {
Expand Down
1 change: 1 addition & 0 deletions api/boinc_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ extern int boinc_finish_message(
);
extern void boinc_sporadic_set_ac_state(SPORADIC_AC_STATE);
extern SPORADIC_CA_STATE boinc_sporadic_get_ca_state();
extern int boinc_sporadic_dir(const char*);

/////////// API ENDS HERE

Expand Down
4 changes: 2 additions & 2 deletions client/app_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ void ACTIVE_TASK_SET::send_heartbeats() {
if (log_flags.heartbeat_debug) {
if (sent) {
msg_printf(atp->result->project, MSG_INFO,
"[heartbeat] Heartbeat sent to task %s",
atp->result->name
"[heartbeat] Heartbeat sent to task %s: %s",
atp->result->name, buf
);
} else {
msg_printf(atp->result->project, MSG_INFO,
Expand Down
3 changes: 3 additions & 0 deletions client/app_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ void CLIENT_STATE::app_test_init() {
// can put other stuff here like
av->avg_ncpus = 1;
av->flops = 1e9;
#if 0
av->gpu_ram = 1e7;
av->gpu_usage.rsc_type = PROC_TYPE_NVIDIA_GPU;
av->gpu_usage.usage = 1;
#endif
app_versions.push_back(av);

WORKUNIT *wu = new WORKUNIT;
Expand All @@ -77,6 +79,7 @@ void CLIENT_STATE::app_test_init() {
wu->rsc_fpops_bound = 1e12;
wu->rsc_memory_bound = 1e9;
wu->rsc_disk_bound = 1e9;
wu->command_line = "--sporadic";
workunits.push_back(wu);

RESULT *res = new RESULT;
Expand Down
10 changes: 6 additions & 4 deletions client/cpu_sched.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,10 +1626,12 @@ ACTIVE_TASK* CLIENT_STATE::get_task(RESULT* rp) {
ACTIVE_TASK *atp = lookup_active_task_by_result(rp);
if (!atp) {
atp = new ACTIVE_TASK;
int retval = atp->get_free_slot(rp);
if (retval) {
delete atp;
return NULL;
if (!rp->project->app_test) {
int retval = atp->get_free_slot(rp);
if (retval) {
delete atp;
return NULL;
}
}
atp->init(rp);
active_tasks.active_tasks.push_back(atp);
Expand Down
1 change: 1 addition & 0 deletions client/cs_cmdline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static void print_options(char* prog) {
" --abort_jobs_on_exit when client exits, abort and report jobs\n"
" --allow_remote_gui_rpc allow remote GUI RPC connections\n"
" --allow_multiple_clients allow >1 instances per host\n"
" --app_test F run a simulated job with the given app\n"
" --attach_project <URL> <key> attach to a project\n"
" --check_all_logins for idle detection, check remote logins too\n"
" --daemon run as daemon (Unix)\n"
Expand Down
1 change: 1 addition & 0 deletions client/cs_sporadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
SPORADIC_RESOURCES sporadic_resources;

void SPORADIC_RESOURCES::print() {
if (!ncpus_used) return;
msg_printf(NULL, MSG_INFO, "Sporadic resources:");
msg_printf(NULL, MSG_INFO, " %f CPUs", ncpus_used);
msg_printf(NULL, MSG_INFO, " %f MB RAM", mem_used/MEGA);
Expand Down
77 changes: 68 additions & 9 deletions samples/sporadic/sporadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,29 @@
// when OK, compute for NCOMP secs
// suspend as needed
//
// computing is embedded in loop.
// computing is embedded in the loop.
// in a real app you'd want to use threads

// by default this uses the BOINC API for communicating sporadic state.
// --wrapped: use files instead (run under wrapper)

#define NWAIT 10
#define NCOMP 10

#include <sys/types.h>
#include <sys/stat.h>
#ifndef _WIN32
#include <unistd.h>
#endif
#include <fcntl.h>
#include <stdio.h>

#include "boinc_api.h"
#include "util.h"
#include "common_defs.h"

void boinc_sporadic_set_ac_state(SPORADIC_AC_STATE);
SPORADIC_CA_STATE boinc_sporadic_get_ca_state();
bool wrapped = false;
int ac_fd, ca_fd;

void compute_one_sec() {
double start = dtime();
Expand All @@ -31,22 +42,70 @@ void compute_one_sec() {
}
}

int main(int, char**) {
boinc_init();
// Report our A->C state.
// Not wrapped: pass it to the BOINC API.
// Wrapped: if it differs from the last state written,
// write it (decimal integer + newline) at the start of the file ac_fd.
// A failed write is reported on stderr and is not remembered,
// so a later call with the same state tries again.
//
void set_ac_state(SPORADIC_AC_STATE ac_state) {
    static SPORADIC_AC_STATE last = AC_NONE;

    if (!wrapped) {
        boinc_sporadic_set_ac_state(ac_state);
        last = ac_state;
        return;
    }
    if (ac_state == last) {
        return;
    }

    char buf[256];
    int len = snprintf(buf, sizeof(buf), "%d\n", (int)ac_state);
    int nw = -1;
    if (lseek(ac_fd, 0, SEEK_SET) >= 0) {
        nw = (int)write(ac_fd, buf, (size_t)len);
    }
    if (nw != len) {
        fprintf(stderr, "can't write AC state\n");
        return;
    }
    last = ac_state;
}

// Return the current C->A state.
// Not wrapped: get it from the BOINC API.
// Wrapped: read the start of the file ca_fd and parse an integer from it;
// if the file can't be read or doesn't start with an integer,
// write a message to stderr and exit(1).
//
SPORADIC_CA_STATE get_ca_state() {
    if (!wrapped) {
        return boinc_sporadic_get_ca_state();
    }

    // could check mod time; don't bother
    char buf[256];
    int nc = -1;
    if (lseek(ca_fd, 0, SEEK_SET) >= 0) {
        // leave room for the terminating NUL
        nc = (int)read(ca_fd, buf, sizeof(buf) - 1);
    }
    if (nc > 0) {
        // terminate what was actually read, so that sscanf()
        // never looks at uninitialized bytes
        buf[nc] = 0;
        int s;
        if (sscanf(buf, "%d", &s) == 1) {
            return (SPORADIC_CA_STATE)s;
        }
    }
    fprintf(stderr, "can't read CA state\n");
    exit(1);
}

int main(int argc, char** argv) {
SPORADIC_CA_STATE ca_state;
SPORADIC_AC_STATE ac_state;

for (int i=1; i<argc; i++) {
if (!strcmp(argv[i], "--wrapped")) {
wrapped = true;
}
}

if (wrapped) {
ca_fd = open("ca", O_RDONLY);
ac_fd = open("ac", O_WRONLY);
if (ca_fd<0 || ac_fd<0) {
fprintf(stderr, "can't open files\n");
exit(1);
}
} else {
boinc_init();
}

fprintf(stderr, "starting\n");
while (true) {
// wait for a bit
ac_state = AC_DONT_WANT_COMPUTE;
boinc_sporadic_set_ac_state(ac_state);
set_ac_state(ac_state);
for (int i=0; i<NWAIT; i++) {
fprintf(stderr, "sleep - don't want to compute\n");
boinc_sleep(1);
}
// wait until client says we can possibly compute
while (1) {
ca_state = boinc_sporadic_get_ca_state();
ca_state = get_ca_state();
if (ca_state != CA_COULD_COMPUTE) {
fprintf(stderr, "sleep - waiting for COULD_COMPUTE\n");
boinc_sleep(1);
Expand All @@ -56,11 +115,11 @@ int main(int, char**) {
}
// tell the client we want to compute
ac_state = AC_WANT_COMPUTE;
boinc_sporadic_set_ac_state(ac_state);
set_ac_state(ac_state);
int n = NCOMP;
while (true) {
// compute only if client says so
ca_state = boinc_sporadic_get_ca_state();
ca_state = get_ca_state();
fprintf(stderr, "CA state: %d\n", ca_state);
if (ca_state == CA_COMPUTING) {
fprintf(stderr, "computing 1 sec\n");
Expand Down
Loading

0 comments on commit b8c58bc

Please sign in to comment.