Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Starts periodic schedule automatically #6212

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 54 additions & 17 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()

let stopping = Atomic.make false

let (loop_thread : Thread.t option ref) = ref None

module Clock = struct
let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9))

Expand All @@ -50,23 +54,18 @@ module Clock = struct
Mtime.min_stamp
end

let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
in
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let remove_from_queue name =
let loop_stop () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when ev.name = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index
match !loop_thread with
| Some thread ->
Atomic.set stopping true ;
Delay.signal delay ;
Delay.signal delay ;
Thread.join thread ;
Atomic.set stopping false ;
loop_thread := None
| None ->
()

let add_periodic_pending () =
with_lock lock @@ fun () ->
Expand All @@ -84,7 +83,7 @@ let add_periodic_pending () =
let loop () =
debug "%s started" __MODULE__ ;
try
while true do
while not (Atomic.get stopping) do
let now = Mtime_clock.now () in
let deadline, item =
with_lock lock @@ fun () ->
Expand Down Expand Up @@ -133,3 +132,41 @@ let loop () =
error
"Scheduler thread died! This daemon will no longer function well and \
should be restarted."

let loop_start wrapper =
loop_stop () ;
with_lock lock @@ fun () ->
match !loop_thread with
| Some _thread ->
()
| None ->
let loop =
match wrapper with
| Some wrapper ->
fun () -> wrapper loop
| None ->
loop
in
loop_thread := Some (Thread.create loop ()) ;
()

let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
in
with_lock lock (fun () ->
Ipq.add queue item ;
if !loop_thread = None then loop_thread := Some (Thread.create loop ())
) ;
Delay.signal delay

let remove_from_queue name =
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when ev.name = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index
4 changes: 2 additions & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
val remove_from_queue : string -> unit
(** Remove a scheduled item by name *)

val loop : unit -> unit
(** The scheduler's main loop, started by {!Xapi} on start-up. *)
val loop_start : ((unit -> unit) -> unit) option -> unit
(** Start the scheduler's main loop. *)
26 changes: 16 additions & 10 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@

module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let started = Atomic.make false

let start_schedule () =
if not (Atomic.exchange started true) then
Thread.create Scheduler.loop () |> ignore

let send event data = Event.(send event data |> sync)

let receive event = Event.(receive event |> sync)
Expand All @@ -35,7 +29,6 @@ let test_single () =
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "result" true (receive finished)

let test_remove_self () =
Expand All @@ -49,7 +42,6 @@ let test_remove_self () =
) ;
send which "self"
) ;
start_schedule () ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "self" (receive which) ;
Alcotest.(check string) "same event name" "stop" (receive which) ;
Expand All @@ -61,7 +53,6 @@ let test_empty () =
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "finished" true (receive finished) ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
Expand All @@ -79,7 +70,6 @@ let test_wakeup () =
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
send which "long"
) ;
start_schedule () ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
let cnt = Mtime_clock.counter () in
Expand All @@ -92,12 +82,28 @@ let test_wakeup () =
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 150 elapsed_ms

let test_start () =
let which = Event.new_channel () in
Scheduler.add_to_queue "done" Scheduler.OneShot 0.05 (fun () ->
send which "done"
) ;
let wrapper f = send which "wrapper" ; f () in
Scheduler.loop_start (Some wrapper) ;
Alcotest.(check string) "same event name" "wrapper" (receive which) ;
Scheduler.loop_start (Some wrapper) ;
Alcotest.(check string) "same event name" "wrapper" (receive which) ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "done" (receive which) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 100 elapsed_ms

let tests =
[
("test_single", `Quick, test_single)
; ("test_remove_self", `Quick, test_remove_self)
; ("test_empty", `Quick, test_empty)
; ("test_wakeup", `Quick, test_wakeup)
; ("test_start", `Quick, test_start)
]

let () = Alcotest.run "Scheduler" [("generic", tests)]
1 change: 0 additions & 1 deletion ocaml/tests/common/test_event_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ let start_periodic_scheduler () =
else (
Scheduler.add_to_queue "dummy" (Scheduler.Periodic 60.0) 0.0 (fun () -> ()) ;
Xapi_event.register_hooks () ;
ignore (Thread.create Scheduler.loop ()) ;
ps_start := true
) ;
Mutex.unlock scheduler_mutex
Expand Down
12 changes: 10 additions & 2 deletions ocaml/xapi/xapi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,13 @@ let server_init () =
)
in
try
let scheduler_name = "Periodic scheduler" in
Server_helpers.exec_with_new_task "server_init" (fun __context ->
let scheduler_wrapper func =
Server_helpers.exec_with_subtask ~__context scheduler_name
(fun ~__context -> Startup.thread_exn_wrapper scheduler_name func
)
in
Startup.run ~__context
[
("XAPI SERVER STARTING", [], print_server_starting_message)
Expand Down Expand Up @@ -1113,8 +1119,10 @@ let server_init () =
, bring_up_management_if ~__context
)
; ( "Starting periodic scheduler"
, [Startup.OnThread]
, Xapi_stdext_threads_scheduler.Scheduler.loop
, []
, fun () ->
Xapi_stdext_threads_scheduler.Scheduler.loop_start
(Some scheduler_wrapper)
)
; ( "Synchronising host configuration files"
, []
Expand Down
Loading