Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make wrappers work with sporadic apps #5410

Merged
merged 5 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 96 additions & 6 deletions api/boinc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
Expand Down Expand Up @@ -144,6 +146,10 @@ using std::vector;
// CPPFLAGS=-DGETRUSAGE_IN_TIMER_THREAD
#endif

// Anything shared between the worker and timer thread
// must be declared volatile to ensure that writes in one thread
// are seen immediately by the other.

const char* api_version = "API_VERSION_" PACKAGE_VERSION;
static APP_INIT_DATA aid;
static FILE_LOCK file_lock;
Expand All @@ -155,7 +161,7 @@ static volatile double last_checkpoint_cpu_time;
static volatile bool ready_to_checkpoint = false;
static volatile int in_critical_section = 0;
static volatile double last_wu_cpu_time;
static volatile bool standalone = false;
static volatile bool standalone = true;
static volatile double initial_wu_cpu_time;
static volatile bool have_new_trickle_up = false;
static volatile bool have_trickle_down = true;
Expand Down Expand Up @@ -194,7 +200,9 @@ char remote_desktop_addr[256];
bool send_remote_desktop_addr = false;
int app_min_checkpoint_period = 0;
// min checkpoint period requested by app
SPORADIC_AC_STATE ac_state;
static volatile SPORADIC_AC_STATE ac_state;
static volatile int ac_fd, ca_fd;
static volatile bool do_sporadic_files;

#define TIMER_PERIOD 0.1
// Sleep interval for timer thread;
Expand Down Expand Up @@ -518,6 +526,55 @@ static bool client_dead() {
return false;
}

// called once/sec in timer thread.
// Copy sporadic app messages to/from files (for wrappers)
//
static void sporadic_files() {
static time_t last_ac_mod_time = 0;
static SPORADIC_CA_STATE last_ca_state = CA_NONE;
char buf[256];

// if C->A state has changed, write to file
//
if (last_ca_state != boinc_status.ca_state) {
sprintf(buf, "%d\n", boinc_status.ca_state);
lseek(ca_fd, 0, SEEK_SET);
if (write(ca_fd, buf, sizeof(buf))) {}
// one way to avoid warnings
last_ca_state = boinc_status.ca_state;
}

// check if app has updated file with A->C state
//
struct stat sbuf;
int ret = fstat(ac_fd, &sbuf);
if (!ret) {
#ifdef _WIN32
time_t t = sbuf.st_mtime;
#else
time_t t = sbuf.st_mtim.tv_sec;
#endif
if (t != last_ac_mod_time) {
lseek(ac_fd, 0, SEEK_SET);
int nc = read(ac_fd, buf, sizeof(buf));
if (nc>0) {
int val;
buf[nc] = 0;
int n = sscanf(buf, "%d", &val);
if (n == 1) {
ac_state = (SPORADIC_AC_STATE)val;
} else {
ac_state = AC_NONE;
fprintf(stderr, "API: error parsing AC state: %s\n", buf);
}
last_ac_mod_time = t;
} else {
fprintf(stderr, "API: error reading AC state: %d\n", nc);
}
}
}
}

#ifndef _WIN32
// For multithread apps on Unix, the main process executes the following.
//
Expand Down Expand Up @@ -687,6 +744,7 @@ int boinc_init_options_general(BOINC_OPTIONS& opt) {
}
}

standalone = false;
retval = boinc_parse_init_data_file();
if (retval) {
standalone = true;
Expand Down Expand Up @@ -770,8 +828,10 @@ int boinc_finish_message(int status, const char* msg, bool is_notice) {
boinc_msg_prefix(buf, sizeof(buf)), status
);
finishing = true;
boinc_sleep(2.0); // let the timer thread send final messages
boinc_disable_timer_thread = true; // then disable it
if (!standalone) {
boinc_sleep(2.0); // let the timer thread send final messages
boinc_disable_timer_thread = true; // then disable it
}

if (options.main_program) {
FILE* f = fopen(BOINC_FINISH_CALLED_FILE, "w");
Expand Down Expand Up @@ -882,6 +942,28 @@ int boinc_is_standalone() {
return 0;
}

int boinc_sporadic_dir(const char* dir) {
char buf[MAXPATHLEN];

do_sporadic_files = true;
sprintf(buf, "%s/ac", dir);
ac_fd = open(buf, O_CREAT|O_RDONLY, 0666);
if (ac_fd < 0) {
fprintf(stderr, "can't open sporadic file %s\n", buf);
do_sporadic_files = false;
}
sprintf(buf, "%s/ca", dir);
ca_fd = open(buf, O_CREAT|O_WRONLY, 0666);
if (ca_fd < 0) {
fprintf(stderr, "can't open sporadic file %s\n", buf);
do_sporadic_files = false;
}
if (!do_sporadic_files) return ERR_FOPEN;
boinc_status.ca_state = CA_DONT_COMPUTE;
ac_state = AC_NONE;
return 0;
}

// called from the timer thread if we need to exit,
// e.g. quit message from client, or client has gone away
//
Expand Down Expand Up @@ -993,6 +1075,10 @@ int boinc_report_app_status_aux(
snprintf(buf, sizeof(buf), "<bytes_received>%f</bytes_received>\n", _bytes_received);
safe_strcat(msg_buf, buf);
}
if (ac_state) {
sprintf(buf, "<sporadic_ac>%d</sporadic_ac>\n", ac_state);
strlcat(msg_buf, buf, sizeof(msg_buf));
}
#ifdef MSGS_FROM_FILE
if (fout) {
fputs(msg_buf, fout);
Expand Down Expand Up @@ -1058,7 +1144,7 @@ static int suspend_activities(bool called_from_worker) {
suspend_or_resume_descendants(false);
}
// if called from worker thread, sleep until suspension is over
// if called from time thread, don't need to do anything;
// if called from timer thread, don't need to do anything;
// suspension is done by signal handler in worker thread
//
if (called_from_worker) {
Expand Down Expand Up @@ -1365,6 +1451,10 @@ static void timer_handler() {
app_client_shm->shm->graphics_reply.send_msg(buf);
send_remote_desktop_addr = false;
}

if (do_sporadic_files) {
sporadic_files();
}
}

#ifdef _WIN32
Expand Down Expand Up @@ -1489,7 +1579,7 @@ int start_timer_thread() {

// called in the worker thread.
// set up a handler for SIGALRM.
// If Android, we'll get signals from the time thread.
// If Android, we'll get signals from the timer thread.
// otherwise, set an interval timer to deliver signals
//
static int start_worker_signals() {
Expand Down
1 change: 1 addition & 0 deletions api/boinc_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ extern int boinc_finish_message(
);
extern void boinc_sporadic_set_ac_state(SPORADIC_AC_STATE);
extern SPORADIC_CA_STATE boinc_sporadic_get_ca_state();
extern int boinc_sporadic_dir(const char*);

/////////// API ENDS HERE

Expand Down
4 changes: 2 additions & 2 deletions client/app_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ void ACTIVE_TASK_SET::send_heartbeats() {
if (log_flags.heartbeat_debug) {
if (sent) {
msg_printf(atp->result->project, MSG_INFO,
"[heartbeat] Heartbeat sent to task %s",
atp->result->name
"[heartbeat] Heartbeat sent to task %s: %s",
atp->result->name, buf
);
} else {
msg_printf(atp->result->project, MSG_INFO,
Expand Down
3 changes: 3 additions & 0 deletions client/app_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ void CLIENT_STATE::app_test_init() {
// can put other stuff here like
av->avg_ncpus = 1;
av->flops = 1e9;
#if 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this change intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

av->gpu_ram = 1e7;
av->gpu_usage.rsc_type = PROC_TYPE_NVIDIA_GPU;
av->gpu_usage.usage = 1;
#endif
app_versions.push_back(av);

WORKUNIT *wu = new WORKUNIT;
Expand All @@ -77,6 +79,7 @@ void CLIENT_STATE::app_test_init() {
wu->rsc_fpops_bound = 1e12;
wu->rsc_memory_bound = 1e9;
wu->rsc_disk_bound = 1e9;
wu->command_line = "--sporadic";
workunits.push_back(wu);

RESULT *res = new RESULT;
Expand Down
10 changes: 6 additions & 4 deletions client/cpu_sched.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,10 +1626,12 @@ ACTIVE_TASK* CLIENT_STATE::get_task(RESULT* rp) {
ACTIVE_TASK *atp = lookup_active_task_by_result(rp);
if (!atp) {
atp = new ACTIVE_TASK;
int retval = atp->get_free_slot(rp);
if (retval) {
delete atp;
return NULL;
if (!rp->project->app_test) {
int retval = atp->get_free_slot(rp);
if (retval) {
delete atp;
return NULL;
}
}
atp->init(rp);
active_tasks.active_tasks.push_back(atp);
Expand Down
1 change: 1 addition & 0 deletions client/cs_cmdline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static void print_options(char* prog) {
" --abort_jobs_on_exit when client exits, abort and report jobs\n"
" --allow_remote_gui_rpc allow remote GUI RPC connections\n"
" --allow_multiple_clients allow >1 instances per host\n"
" --app_test F run a simulated job with the given app\n"
" --attach_project <URL> <key> attach to a project\n"
" --check_all_logins for idle detection, check remote logins too\n"
" --daemon run as daemon (Unix)\n"
Expand Down
1 change: 1 addition & 0 deletions client/cs_sporadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
SPORADIC_RESOURCES sporadic_resources;

void SPORADIC_RESOURCES::print() {
if (!ncpus_used) return;
msg_printf(NULL, MSG_INFO, "Sporadic resources:");
msg_printf(NULL, MSG_INFO, " %f CPUs", ncpus_used);
msg_printf(NULL, MSG_INFO, " %f MB RAM", mem_used/MEGA);
Expand Down
75 changes: 66 additions & 9 deletions samples/sporadic/sporadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,27 @@
// when OK, compute for NCOMP secs
// suspend as needed
//
// computing is embedded in loop.
// computing is embedded in the loop.
// in a real app you'd want to use threads

// by default this uses the BOINC API for communicating sporadic state.
// --wrapped: use files instead (run under wrapper)

#define NWAIT 10
#define NCOMP 10

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>

#include "boinc_api.h"
#include "util.h"
#include "common_defs.h"

void boinc_sporadic_set_ac_state(SPORADIC_AC_STATE);
SPORADIC_CA_STATE boinc_sporadic_get_ca_state();
bool wrapped = false;
int ac_fd, ca_fd;

void compute_one_sec() {
double start = dtime();
Expand All @@ -31,22 +40,70 @@ void compute_one_sec() {
}
}

int main(int, char**) {
boinc_init();
// Tell the client the app's sporadic state (A->C).
// If not wrapped, pass it to the BOINC API.
// If wrapped, write it (as a decimal line) at the start of the "ac" file,
// but only when it differs from the last value successfully written.
//
void set_ac_state(SPORADIC_AC_STATE ac_state) {
    static SPORADIC_AC_STATE last = AC_NONE;

    if (!wrapped) {
        boinc_sporadic_set_ac_state(ac_state);
        return;
    }
    if (ac_state == last) return;

    char buf[256];
    int len = snprintf(buf, sizeof(buf), "%d\n", (int)ac_state);
    lseek(ac_fd, 0, SEEK_SET);
    if (len <= 0 || write(ac_fd, buf, len) != len) {
        // leave 'last' unchanged so the next call retries the write
        //
        fprintf(stderr, "can't write AC state\n");
        return;
    }
    last = ac_state;
}

// Get the client's sporadic state (C->A).
// If not wrapped, get it from the BOINC API.
// If wrapped, read and parse it from the start of the "ca" file;
// exit(1) if the file can't be read or doesn't contain an integer.
//
SPORADIC_CA_STATE get_ca_state() {
    if (!wrapped) {
        return boinc_sporadic_get_ca_state();
    }

    // could check mod time; don't bother
    char buf[256];
    lseek(ca_fd, 0, SEEK_SET);
    // leave room for the terminating NUL; sscanf() needs a terminated string
    //
    int nc = read(ca_fd, buf, sizeof(buf)-1);
    if (nc > 0) {
        buf[nc] = 0;
        int s;
        int n = sscanf(buf, "%d", &s);
        if (n==1) return (SPORADIC_CA_STATE)s;
    }
    fprintf(stderr, "can't read CA state\n");
    exit(1);
}

int main(int argc, char** argv) {
SPORADIC_CA_STATE ca_state;
SPORADIC_AC_STATE ac_state;

for (int i=1; i<argc; i++) {
if (!strcmp(argv[i], "--wrapped")) {
wrapped = true;
}
}

if (wrapped) {
ca_fd = open("ca", O_RDONLY);
ac_fd = open("ac", O_WRONLY);
if (ca_fd<0 || ac_fd<0) {
fprintf(stderr, "can't open files\n");
exit(1);
}
} else {
boinc_init();
}

fprintf(stderr, "starting\n");
while (true) {
// wait for a bit
ac_state = AC_DONT_WANT_COMPUTE;
boinc_sporadic_set_ac_state(ac_state);
set_ac_state(ac_state);
for (int i=0; i<NWAIT; i++) {
fprintf(stderr, "sleep - don't want to compute\n");
boinc_sleep(1);
}
// wait until client says we can possibly compute
while (1) {
ca_state = boinc_sporadic_get_ca_state();
ca_state = get_ca_state();
if (ca_state != CA_COULD_COMPUTE) {
fprintf(stderr, "sleep - waiting for COULD_COMPUTE\n");
boinc_sleep(1);
Expand All @@ -56,11 +113,11 @@ int main(int, char**) {
}
// tell the client we want to compute
ac_state = AC_WANT_COMPUTE;
boinc_sporadic_set_ac_state(ac_state);
set_ac_state(ac_state);
int n = NCOMP;
while (true) {
// compute only if client says so
ca_state = boinc_sporadic_get_ca_state();
ca_state = get_ca_state();
fprintf(stderr, "CA state: %d\n", ca_state);
if (ca_state == CA_COMPUTING) {
fprintf(stderr, "computing 1 sec\n");
Expand Down
Loading
Loading