-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdelegationManager.ml
289 lines (249 loc) · 10.5 KB
/
delegationManager.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
(************************************************************************)
(* * The Coq Proof Assistant / The Coq Development Team *)
(* v * INRIA, CNRS and contributors - Copyright 1999-2019 *)
(* <O___,, * (see CREDITS file for the list of authors) *)
(* \VV/ **************************************************************)
(* // * This file is distributed under the terms of the *)
(* * GNU Lesser General Public License Version 2.1 *)
(* * (see LICENSE file for the text of the license) *)
(************************************************************************)
(* Debug channel of this module, registered as "vscoq.delegationManager". *)
let debug_delegation_manager =
  CDebug.create ~name:"vscoq.delegationManager" ()

(* [log msg] emits [msg], prefixed by the current PID in brackets, on
   [debug_delegation_manager]. Formatting is deferred to the thunk handed to
   the debug channel. *)
let log msg =
  let render () =
    Pp.str (Format.asprintf "[%d] %s" (Unix.getpid ()) msg)
  in
  debug_delegation_manager render
(* Sentences are identified by Coq state ids. *)
type sentence_id = Stateid.t
(* Connection to a peer process (master <-> worker). Everywhere in this file
   both fields are set to the same connected socket. *)
type link = {
write_to : Unix.file_descr;
read_from: Unix.file_descr;
}
(* Outcome of executing a sentence. Both cases may carry a Vernacstate.t; for
   [Error] it is the state to use for resiliency (presumably to keep going
   after the failure — TODO confirm).
   NOTE(review): if values of this type are marshaled between processes (not
   visible in this file), constructor order is part of the wire format. *)
type execution_status =
| Success of Vernacstate.t option
| Error of string Loc.located * Vernacstate.t option (* State to use for resiliency *)
(* [write_value link x] marshals [x] and writes all of its bytes to
   [link.write_to], then flushes every OCaml output channel.
   Raises [Unix.Unix_error] if the write fails. *)
let write_value { write_to; _ } x =
  let payload = Marshal.to_bytes x [] in
  let size = Bytes.length payload in
  log (Printf.sprintf "marshaling %d bytes" size);
  let written = Unix.write write_to payload 0 size in
  (* Unix.write repeats the underlying write until every byte is written or
     an error is raised, so this is only a sanity check. *)
  assert (written = size);
  log "marshaling done";
  flush_all ()
(* [abort_on_unix_error f x] evaluates [f x] and returns its result. If that
   evaluation raises [Unix.Unix_error], the failing system call, its argument
   and the error message are printed on stderr and the process exits with
   status 3. *)
let abort_on_unix_error f x =
  match f x with
  | result -> result
  | exception Unix.Unix_error (err, syscall, arg) ->
    Printf.eprintf "Error: %s: %s: %s\n%!" syscall arg (Unix.error_message err);
    exit 3
(* Interface of a kind of job that can be delegated to worker processes. *)
module type Job = sig
  (* Job value received by a separately started worker binary
     (see setup_plumbing in MakeWorker). *)
  type t

  (* Values a worker sends back to the master. *)
  type update_request

  (* Name of this kind of job: used in the worker debug channel name and in
     the command-line option given to worker binaries. *)
  val name : string

  (* Executable started with Unix.create_process on non-Unix systems. *)
  val binary_name : string

  (* Sizes the pool of worker tokens in MakeWorker. *)
  val pool_size : int

  (* [appendFeedback sid fb] wraps feedback [fb] about sentence [sid] as an
     update for the master. *)
  val appendFeedback : sentence_id -> (Feedback.level * Loc.t option * Pp.t) -> update_request
end
(* A job id is typically created well before its worker is spawned, so we
   allocate a slot for the PID and fill it in later. The sentence_id is used
   for error reporting (e.g. failure to spawn). *)
type job_id = sentence_id * int option ref

(* [mk_job_id sid] pairs [sid] with a fresh, still empty, PID slot. *)
let mk_job_id sid =
  let pid_slot = ref None in
  (sid, pid_slot)
(* [cancel_job job] kills the worker process of [job] with SIGKILL, if one
   has been spawned. A worker that no longer exists (ESRCH, e.g. it already
   finished and was reaped) is not an error: cancelling it is a no-op.
   Sys.sigkill is used rather than the raw number 9 because Unix.kill only
   emulates Sys.sigkill on Windows.
   NOTE(review): if the PID was reaped and recycled, this may signal an
   unrelated process — consider clearing the slot when the worker ends. *)
let cancel_job (_,id) =
  match !id with
  | None -> ()
  | Some pid ->
    (try Unix.kill pid Sys.sigkill
     with Unix.Unix_error (Unix.ESRCH, _, _) -> ())
(* TODO: this queue should not live here, it should be "per URI"; it stays
   here because we want to keep here the conversion (STM) feedback -> (LSP)
   feedback. *)
(* Feedback collected in the master process, consumed via [local_feedback]. *)
let master_feedback_queue :
  (sentence_id * (Feedback.level * Loc.t option * Pp.t)) Queue.t =
  Queue.create ()
(* [install_feedback send] registers a Coq feedback feeder and returns the
   handle given by Feedback.add_feeder. Debug messages are printed on stderr;
   other messages and AddedAxiom notifications are passed to [send] together
   with their span id; any other feedback is ignored here. *)
let install_feedback send =
  let forward fb =
    let sid = fb.Feedback.span_id in
    match fb.Feedback.contents with
    | Feedback.Message (Feedback.Debug, _loc, msg) ->
      (* Printing instead of sending is crucial to avoid a busy loop: a
         feedback triggers an event, so if SEL debug is on, processing the
         debug print would itself produce another feedback. *)
      Printf.eprintf "%s\n" (Pp.string_of_ppcmds msg)
    | Feedback.Message (level, loc, msg) -> send (sid, (level, loc, msg))
    | Feedback.AddedAxiom ->
      send (sid, (Feedback.Warning, None, Pp.str "axiom added"))
    | _ -> () (* STM feedbacks are handled differently *)
  in
  Feedback.add_feeder forward
(* Feeder installed when this module is initialized: forwarded feedback is
   queued on [master_feedback_queue]. Worker processes remove it again (see
   install_feedback_worker). *)
let master_feeder =
  install_feedback (fun entry -> Queue.push entry master_feedback_queue)

(* Sel event producing the feedback entries queued by [master_feeder]. *)
let local_feedback : (sentence_id * (Feedback.level * Loc.t option * Pp.t)) Sel.event =
  Sel.on_queue master_feedback_queue (fun entry -> entry)
(* [MakeWorker (Job)] runs jobs of kind [Job] in separate worker processes
   and exposes their lifecycle as Sel events. On Unix workers are forked;
   elsewhere [Job.binary_name] is started with Unix.create_process. In both
   cases the worker connects back to a loopback TCP socket opened by the
   master. *)
module MakeWorker (Job : Job) = struct

  let debug_worker = CDebug.create ~name:("vscoq.Worker." ^ Job.name) ()

  (* [log_worker msg] prints [msg], prefixed with the current PID, on the
     debug channel of this kind of worker. *)
  let log_worker msg = debug_worker Pp.(fun () ->
    str @@ Format.asprintf " [%d] %s" (Unix.getpid ()) msg)

  (* Redirect this process' Coq feedback to [link]: the master feeder is
     removed and forwarded messages are written to [link] as
     [Job.appendFeedback] values. Must only be called in worker processes,
     otherwise the master's own feedback is lost. *)
  let install_feedback_worker link =
    Feedback.del_feeder master_feeder;
    ignore(install_feedback (fun (id,fb) -> write_value link (Job.appendFeedback id fb)))

  (* This is the lifetime of a delegation: there is one start event, many
     progress events, then one ending event. *)
  type delegation =
    | WorkerStart : job_id * 'job * ('job -> send_back:(Job.update_request -> unit) -> unit) * string -> delegation
    | WorkerProgress of { link : link; update_request : Job.update_request }
    | WorkerEnd of (int * Unix.process_status)
    | WorkerIOError of exn

  let pr_event = function
    | WorkerEnd _ -> Pp.str "WorkerEnd"
    | WorkerIOError _ -> Pp.str "WorkerIOError"
    | WorkerProgress _ -> Pp.str "WorkerProgress"
    | WorkerStart _ -> Pp.str "WorkerStart"

  type events = delegation Sel.event list
  type role = Master | Worker of link

  (* The pool is just a queue of tokens, one per worker allowed to run at the
     same time, hence exactly [Job.pool_size] of them. A token is taken by
     [worker_available] and given back when the worker dies ([WorkerEnd]) or
     fails to spawn ([spawn_failed]). *)
  let pool = Queue.create ()
  let () = for _i = 1 to Job.pool_size do Queue.push () pool done

  (* In order to create a job we enqueue this event. It presumably fires when
     both [jobs] and [pool] are non-empty, consuming one job and one token. *)
  let worker_available ~jobs ~fork_action : delegation Sel.event =
    Sel.on_queues jobs pool (fun (job_id, job) () ->
      WorkerStart (job_id,job,fork_action,Job.binary_name))

  (* When a worker is spawned we enqueue this event, since it will eventually
     die. *)
  let worker_ends pid : delegation Sel.event =
    Sel.on_death_of ~pid (fun reason -> WorkerEnd(pid,reason))

  (* When a worker is spawned we enqueue this event, since it will eventually
     make progress. [handle_event] re-arms it after each progress.
     NOTE(review): after [WorkerIOError] the link's descriptor is not closed
     here — confirm whether Sel takes care of it. *)
  let worker_progress link : delegation Sel.event =
    Sel.on_ocaml_value link.read_from (function
      | Error e -> WorkerIOError e
      | Ok update_request -> WorkerProgress { link; update_request; })

  (* ************ spawning *************************************************** *)

  (* Null device of the platform, used as the worker's stdin. *)
  let null_device = if Sys.win32 then "NUL" else "/dev/null"

  (* Close [fd] ignoring errors; only used on cleanup paths. *)
  let close_quietly fd = try Unix.close fd with Unix.Unix_error _ -> ()

  (* Close the descriptor held by [r], if any, and forget it. *)
  let release r =
    (match !r with Some fd -> close_quietly fd | None -> ());
    r := None

  (* Every [WorkerStart] consumed one pool token, which must come back even
     when spawning fails. If a process was started, kill it: its [WorkerEnd]
     (returned here, to be enqueued) gives the token back. Otherwise no
     [WorkerEnd] will ever come, so give the token back right away. *)
  let spawn_failed job_id msg =
    match !job_id with
    | Some pid ->
      (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
      Error (msg, [worker_ends pid])
    | None ->
      Queue.push () pool;
      Error (msg, [])

  (* Wait at most [timeout] seconds for a connection on the listening socket
     [sr]; select is retried (with the remaining time) if interrupted. *)
  let accept_timeout ?(timeout=2.0) sr =
    let deadline = Unix.gettimeofday () +. timeout in
    let rec wait remaining =
      match Unix.select [sr] [] [] remaining with
      | [], _, _ -> None
      | _ -> Some (Unix.accept sr)
      | exception Unix.Unix_error (Unix.EINTR, _, _) ->
        let remaining = deadline -. Unix.gettimeofday () in
        if remaining > 0. then wait remaining else None
    in
    wait timeout

  (* Fork a worker connected back to us through a loopback socket. In the
     master the result is [Ok (Master, events)] with the events watching the
     worker, or [Error] (see [spawn_failed]). In the child it is
     [Ok (Worker link, [])]; the child never gets [Error]: if it cannot
     connect back it exits. *)
  let fork_worker : job_id -> (role * events, string * events) result = fun (_, job_id) ->
    let open Unix in
    (* Descriptors to close if an exception escapes; [None] once closed or
       handed off. *)
    let listener_fd = ref None and null_fd = ref None in
    try
      let listener = socket PF_INET SOCK_STREAM 0 in
      listener_fd := Some listener;
      bind listener (ADDR_INET (inet_addr_loopback, 0));
      listen listener 1;
      let address = getsockname listener in
      log @@ "forking...";
      (* Flush OCaml buffers so the child does not output them a second time. *)
      flush_all ();
      let null = openfile null_device [O_RDWR] 0o640 in
      null_fd := Some null;
      let pid = fork () in
      if pid = 0 then begin
        (* Child process. Forget the master's cleanup bookkeeping: those
           descriptors are closed below and their numbers may be reused. *)
        listener_fd := None;
        null_fd := None;
        let connect_back () =
          dup2 null stdin;
          close null;
          close listener;
          log_worker @@ "borning...";
          let sock = socket PF_INET SOCK_STREAM 0 in
          connect sock address;
          { write_to = sock; read_from = sock }
        in
        (* The child must never return into the master's event loop, so a
           Unix error here terminates it (exit status 3). *)
        let link = abort_on_unix_error connect_back () in
        install_feedback_worker link;
        Ok (Worker link, [])
      end else begin
        (* Parent process. *)
        job_id := Some pid;
        release null_fd;
        match accept_timeout listener with
        | None ->
          release listener_fd;
          log @@ Printf.sprintf "forked pid %d did not connect back" pid;
          spawn_failed job_id "worker did not connect back"
        | Some (worker, _worker_addr) ->
          release listener_fd;
          log @@ Printf.sprintf "forked pid %d called back" pid;
          let link = { write_to = worker; read_from = worker } in
          Ok (Master, [worker_progress link; worker_ends pid])
      end
    with Unix_error (e, f, p) ->
      release listener_fd;
      release null_fd;
      spawn_failed job_id (f ^ ": " ^ p ^ ": " ^ error_message e)
  ;;

  let option_name = "-" ^ Str.global_replace (Str.regexp_string " ") "." Job.name ^ "_master_address"

  (* Start [procname] as a worker, passing it the port to connect back to,
     then send it [job]. Returns the events watching the worker, or [Error]
     (see [spawn_failed]). The worker side is [setup_plumbing]. *)
  let create_process_worker procname (_,job_id) job =
    let open Unix in
    (* Descriptors to close if an exception escapes; [None] once closed or
       handed off. *)
    let listener_fd = ref None and null_fd = ref None and conn_fd = ref None in
    try
      let listener = socket PF_INET SOCK_STREAM 0 in
      listener_fd := Some listener;
      bind listener (ADDR_INET (inet_addr_loopback, 0));
      listen listener 1;
      let port = match getsockname listener with
        | ADDR_INET (_, port) -> port
        | ADDR_UNIX _ -> assert false in
      let null = openfile null_device [O_RDWR] 0o640 in
      null_fd := Some null;
      let extra_flags = if CDebug.get_flags () = "all" then [|"-debug"|] else [||] in
      let args = Array.append [|procname;option_name;string_of_int port|] extra_flags in
      let pid = create_process procname args null stdout stderr in
      (* The worker has its own copy of [null]; ours is no longer needed. *)
      release null_fd;
      job_id := Some pid;
      log @@ Printf.sprintf "created worker %d, waiting on port %d" pid port;
      match accept_timeout listener with
      | Some (worker, _worker_addr) ->
        release listener_fd;
        conn_fd := Some worker;
        (* Feedback redirection is done by the worker itself (setup_plumbing):
           doing it here would hijack the master's own feedback. *)
        let link = { write_to = worker; read_from = worker } in
        log @@ "sending job";
        write_value link job;
        log @@ "sent";
        (* From now on [worker_progress] owns the connection. *)
        conn_fd := None;
        Ok [worker_progress link; worker_ends pid]
      | None ->
        release listener_fd;
        log @@ Printf.sprintf "child process %d did not connect back" pid;
        spawn_failed job_id "worker did not connect back"
    with Unix_error (e, f, p) ->
      release listener_fd;
      release null_fd;
      release conn_fd;
      spawn_failed job_id (f ^ ": " ^ p ^ ": " ^ error_message e)

  (* **************** /spawning ********************************************** *)

  (* Process one delegation event. Returns an optional update for the caller
     together with the events to enqueue next. *)
  let handle_event = function
    | WorkerIOError e ->
      log @@ "worker IO Error: " ^ Printexc.to_string e;
      (None, [])
    | WorkerEnd (pid, _status) ->
      log @@ Printf.sprintf "worker %d went on holidays" pid;
      (* Give the worker's token back to the pool. *)
      Queue.push () pool;
      (None, [])
    | WorkerProgress { link; update_request } ->
      log "worker progress";
      (* Re-arm the listener for the next value sent by this worker. *)
      (Some update_request, [worker_progress link])
    | WorkerStart (job_id,job,action,procname) ->
      log "worker starts";
      let spawn_error msg cleanup_events =
        log @@ "worker did not spawn: " ^ msg;
        (Some (Job.appendFeedback (fst job_id) (Feedback.Error, None, Pp.str msg)), cleanup_events)
      in
      if Sys.os_type = "Unix" then begin
        match fork_worker job_id with
        | Ok (Master, events) ->
          log "worker spawned (fork)";
          (None, events)
        | Ok (Worker link, _) ->
          (* Forked child: run the job and exit, never returning into the
             master's event loop. [send_back] is eta-expanded so that Unix
             errors raised by the actual writes are caught. *)
          let send_back v = abort_on_unix_error (write_value link) v in
          (match action job ~send_back with
           | () -> exit 0
           | exception e ->
             log_worker @@ "job failed: " ^ Printexc.to_string e;
             exit 1)
        | Error (msg, cleanup_events) -> spawn_error msg cleanup_events
      end else begin
        match create_process_worker procname job_id job with
        | Ok events ->
          log "worker spawned (create_process)";
          (None, events)
        | Error (msg, cleanup_events) -> spawn_error msg cleanup_events
      end

  (* the only option is the socket port *)
  type options = int

  (* Worker-binary side of [create_process_worker]: connect to the master on
     [port], redirect this process' feedback to it and receive the job.
     Returns the function used to send values back, and the job. Exits with
     status 1 on failure. *)
  let setup_plumbing port =
    try
      let open Unix in
      let chan = socket PF_INET SOCK_STREAM 0 in
      let address = ADDR_INET (inet_addr_loopback,port) in
      log_worker @@ "connecting to " ^ string_of_int port;
      connect chan address;
      let link = { read_from = chan; write_to = chan } in
      (* As in a forked worker, our feedback goes to the master. *)
      install_feedback_worker link;
      (* Unix.read_value does not exist, we use Sel *)
      match Sel.wait [Sel.on_ocaml_value chan (fun x -> x)] with
      | [Ok (job : Job.t)], _ ->
        (* Eta-expanded so that errors of the actual writes are caught. *)
        let send_back v = abort_on_unix_error (write_value link) v in
        (send_back, job)
      | [Error exn], _ ->
        log_worker @@ "error receiving job: " ^ Printexc.to_string exn;
        exit 1
      | _ -> assert false
    with Unix.Unix_error(code,syscall,param) ->
      log_worker @@ Printf.sprintf "error starting: %s: %s: %s" syscall param (Unix.error_message code);
      exit 1

  (* Parse the command line of a worker binary, which must be exactly
     [option_name PORT]. Exits with status 2 otherwise. *)
  let parse_options extra_args =
    match extra_args with
    | [ o ; port ] when o = option_name ->
      begin match int_of_string_opt port with
      | Some port -> port, []
      | None ->
        Printf.eprintf "invalid port: %s\n%!" port;
        exit 2
      end
    | _ ->
      Printf.eprintf "unknown arguments: %s\n%!" (String.concat " " extra_args);
      exit 2

  let log = log_worker
end