// parfu_boss_functions.cc
////////////////////////////////////////////////////////////////////////////////
//
// University of Illinois/NCSA Open Source License
// http://otm.illinois.edu/disclose-protect/illinois-open-source-license
//
// Parfu is copyright (c) 2017-2022,
// by The Trustees of the University of Illinois.
// All rights reserved.
//
// Parfu was developed by:
// The University of Illinois
// The National Center For Supercomputing Applications (NCSA)
// Blue Waters Science and Engineering Applications Support Team (SEAS)
// Craig P Steffen <[email protected]>
// Roland Haas <[email protected]>
//
// https://github.com/ncsa/parfu_archive_tool
// http://www.ncsa.illinois.edu/People/csteffen/parfu/
//
// For full license text see the LICENSE file provided with the source
// distribution.
//
////////////////////////////////////////////////////////////////////////////////
#include "parfu_main.hh"
// rank 0 sending instruction messages to all other ranks
int parfu_broadcast_order(string instruction,
                          string message){
  // this function specifically assumes that rank 0 is
  // sending the message.
  // instruction is a string containing a single letter;
  // message is the bulk of the message.
  int message_length;
  int mpi_return_val;
  string message_contents = string("");
  message_contents.append(instruction);
  message_contents.append(message);
  // The +1 accounts for the terminating null of a C string. We're not
  // currently using C string functions to parse these messages, but we
  // might, and transmitting the null keeps this buffer safe for those
  // functions in case we change our minds.
  message_length = message_contents.size()+1;
  mpi_return_val = MPI_Bcast(&message_length,1,MPI_INT,0,MPI_COMM_WORLD);
  mpi_return_val += MPI_Bcast(((void*)(message_contents.data())),
                              message_contents.size()+1,
                              MPI_CHAR,0,MPI_COMM_WORLD);
  return mpi_return_val;
}
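
// Illustrative sketch (NOT part of the original source): the worker-side
// counterpart to parfu_broadcast_order() above. MPI_Bcast is collective,
// so every non-zero rank must post the same two broadcasts in the same
// order: first the length, then the payload. The function name and the
// instruction/message unpacking are assumptions for illustration only.
int parfu_receive_broadcast_order_sketch(string &instruction,
                                         string &message){
  int message_length=0;
  int mpi_return_val;
  // matches the length broadcast from rank 0
  mpi_return_val = MPI_Bcast(&message_length,1,MPI_INT,0,MPI_COMM_WORLD);
  // the buffer size includes the transmitted null terminator
  vector<char> receive_buffer(message_length);
  mpi_return_val += MPI_Bcast(receive_buffer.data(),message_length,
                              MPI_CHAR,0,MPI_COMM_WORLD);
  // the first character is the one-letter instruction;
  // the rest (up to the null) is the message body
  instruction = string(1,receive_buffer.at(0));
  message = string(receive_buffer.data()+1);
  return mpi_return_val;
}
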
// rank 0 sending an individual instruction to a single destination rank
int parfu_send_order_to_rank(int dest_rank,
                             int tag,
                             string instruction,
                             string message){
  // this function specifically assumes that rank 0 is
  // sending the message.
  // instruction is a string containing a single letter;
  // message is the bulk of the message.
  int message_length;
  int mpi_return_val;
  string message_contents = string("");
  message_contents.append(instruction);
  message_contents.append(message);
  // The +1 accounts for the terminating null of a C string, as in
  // parfu_broadcast_order() above.
  message_length = message_contents.size()+1;
  mpi_return_val = MPI_Send(&message_length,1,MPI_INT,dest_rank,tag,MPI_COMM_WORLD);
  mpi_return_val += MPI_Send(((void*)(message_contents.data())),
                             message_contents.size()+1,
                             MPI_CHAR,dest_rank,tag,MPI_COMM_WORLD);
  return mpi_return_val;
}
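
// Illustrative sketch (NOT part of the original source): a worker-side
// receive matching parfu_send_order_to_rank() above. Rank 0 sends two
// point-to-point messages under the same tag: the length as an int, then
// the null-terminated payload. The function name is an assumption.
int parfu_receive_order_sketch(int tag,
                               string &instruction,
                               string &message){
  int message_length=0;
  int mpi_return_val;
  mpi_return_val = MPI_Recv(&message_length,1,MPI_INT,
                            0,tag,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
  vector<char> receive_buffer(message_length);
  mpi_return_val += MPI_Recv(receive_buffer.data(),message_length,MPI_CHAR,
                             0,tag,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
  instruction = string(1,receive_buffer.at(0));
  message = string(receive_buffer.data()+1);
  return mpi_return_val;
}
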
#define INT_STRING_BUFFER_SIZE (20)
int push_out_all_orders(vector <string> *transfer_order_list,
                        unsigned int total_ranks){
  unsigned int next_order=0;
  unsigned int next_rank=1;
  unsigned int total_orders = transfer_order_list->size();
  char *return_receive_buffer=nullptr;
  string return_receive_string;
  int mpi_return_val;
  // MPI_Status message_status;
  int worker_rank_received;
  // I assume any rank number printed to a string
  // will fit within INT_STRING_BUFFER_SIZE characters
  return_receive_buffer=(char*)malloc(INT_STRING_BUFFER_SIZE);
  // First we distribute initial orders to ranks.
  // We start at order index 0 but at rank 1, because
  // *we* are rank zero.
  cerr << "POAO: A ranks:" << total_ranks << " orders:" << total_orders << "\n";
  while( (next_rank < total_ranks) &&
         (next_order < total_orders) ){
    cerr << "POAO ph1: send order " << next_order << "\n";
    parfu_send_order_to_rank(next_rank,
                             0, // MPI_Send tag=0
                             string("C"), // C for "create" mode
                             (transfer_order_list->at(next_order)));
    // cerr << "debugL1: sending order " << next_order << "\n";
    // update loop counters
    next_order++;
    next_rank++;
  }
  // We've distributed order sets to ranks until we ran out of one or the
  // other. What we do next depends on whether we've run out of ranks, run
  // out of order sets, or neither.
  // If all the orders have been sent, then we don't need to worry about
  // sending any more; we just wait for the returns from each of the order
  // sets, then exit. This works whether or not the ranks have been
  // exhausted, because we only post as many receives as there are orders,
  // so the ranks that never received any orders won't be sending us a
  // completion message.
  cerr << "POAO next order:" << next_order << " next rank:" << next_rank << "\n";
  if(next_order >= total_orders){
    cerr << "POAO: sent all orders, so wait for finished responses; next order:" << next_order << "\n";
    for(unsigned i=0;i<total_orders;i++){
      if((mpi_return_val = MPI_Recv((void*)(return_receive_buffer),
                                    INT_STRING_BUFFER_SIZE,MPI_CHAR,
                                    MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,
                                    MPI_STATUS_IGNORE))!=MPI_SUCCESS){
        cerr << "push_out_all_orders: MPI_Recv returned " << mpi_return_val << "!\n";
      }
      // cerr << "individual RX: got buffer.\n";
    } // for(unsigned i=0;i<total_orders;i++)
  } // if(next_order >= total_orders)
  else{
    // In this case we have orders left over, which means that every single
    // worker rank got an order. So starting here, all worker ranks are
    // busy, and we have leftover order sets to hand out.
    // As each busy worker rank finishes and reports back that it's done,
    // we hand it a new work item, for as long as we have any left.
    while(next_order < total_orders){
      cerr << "POAO: next order:" << next_order << " out of total:" << total_orders << "\n";
      if((mpi_return_val = MPI_Recv((void*)(return_receive_buffer),
                                    INT_STRING_BUFFER_SIZE,MPI_CHAR,
                                    MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,
                                    MPI_STATUS_IGNORE))!=MPI_SUCCESS){
        cerr << "push_out_all_orders: MPI_Recv returned " << mpi_return_val << "!\n";
      }
      // cerr << "debugL1: sending order " << next_order << "\n";
      return_receive_string=string(return_receive_buffer);
      // cerr << "debug: indicated rank:" << return_receive_string << "\n";
      worker_rank_received=stoi(return_receive_string);
      parfu_send_order_to_rank(worker_rank_received,
                               0,
                               string("C"), // the "create" mode is baked in;
                                            // we may want to make this an input parameter
                               (transfer_order_list->at(next_order)));
      cerr << "POAO: sent order " << next_order << " to rank " << worker_rank_received << "\n";
      // loop cleanup
      next_order++;
    } // while
    // At this point, all work items have been distributed, so we just
    // wait for all the workers to report that they're finished.
    // [TODO perhaps we should move writing the catalog to here?]
    // VERY IMPORTANT!!! Index "i" MUST start at 1, NOT ZERO, here.
    // Otherwise we end up waiting for one more receive than we're ever
    // going to get, and it deadlocks.
    for(unsigned i=1;i<total_ranks;i++){
      if((mpi_return_val = MPI_Recv((void*)(return_receive_buffer),
                                    INT_STRING_BUFFER_SIZE,MPI_CHAR,
                                    MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,
                                    MPI_STATUS_IGNORE))!=MPI_SUCCESS){
        cerr << "push_out_all_orders: MPI_Recv returned " << mpi_return_val << "!\n";
      } // if((mpi_return_val...
    } // for(i=1
  } // else
  // release the receive buffer malloc()ed above
  free(return_receive_buffer);
  return 0;
}
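
// Illustrative sketch (NOT part of the original source): the completion
// report a worker would send when it finishes an order, matching the
// MPI_Recv()/stoi() pairing in push_out_all_orders() above. The worker
// reports its own rank as a null-terminated decimal string in a buffer of
// INT_STRING_BUFFER_SIZE characters. The function name is an assumption.
int parfu_report_order_complete_sketch(void){
  int my_rank;
  char rank_buffer[INT_STRING_BUFFER_SIZE]={0};
  MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
  // stoi() on the boss side parses this string back into a rank number
  string rank_string = to_string(my_rank);
  rank_string.copy(rank_buffer,INT_STRING_BUFFER_SIZE-1);
  // send the whole fixed-size buffer so it fits the boss's
  // fixed-size MPI_Recv
  return MPI_Send(rank_buffer,INT_STRING_BUFFER_SIZE,MPI_CHAR,
                  0,0,MPI_COMM_WORLD);
}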