diff --git a/doc/content/xapi-guard/_index.md b/doc/content/xapi-guard/_index.md
index bcafb968b07..433f92f9d5e 100644
--- a/doc/content/xapi-guard/_index.md
+++ b/doc/content/xapi-guard/_index.md
@@ -17,6 +17,8 @@ Principles
 2. Xenopsd is able to control xapi-guard through message switch, this access is not limited.
 3. Listening to domain socket is restored whenever the daemon restarts to minimize disruption of running domains.
+4. Disruptions to requests when xapi is unavailable are minimized.
+   The startup procedure is not blocked by the availability of xapi, and write requests from domains must not fail because xapi is unavailable.
 Overview
@@ -26,3 +28,71 @@ Xapi-guard forwards calls from domains to xapi to persist UEFI variables, and up
 To do this, it listens to 1 socket per service (varstored, or swtpm) per domain.
 To create these sockets before the domains are running, it listens to a message-switch socket.
 This socket listens to calls from xenopsd, which orchestrates the domain creation.
+
+To protect the domains from xapi being transiently unavailable, xapi-guard provides an on-disk cache for vTPM writes.
+This cache acts as a buffer and stores the requests temporarily until xapi can be contacted again.
+This situation usually happens when xapi is being restarted as part of an update.
+SWTPM, the vTPM daemon, reads the contents of the TPM from xapi-guard on startup, suspend, and resume.
+During normal operation SWTPM does not send read requests to xapi-guard.
+
+Structure
+---------
+
+The cache module consists of two Lwt threads: one that writes to disk, and another that reads from disk.
+The writer is triggered when a VM writes to the vTPM.
+It never blocks if xapi is unreachable: it responds as soon as the data has been stored, either by xapi or on the local disk, so that the VM receives a timely response to the write request.
+Depending on the state, both threads try to send requests to xapi, attempting to write all the cached data back so that the cache can stop being used.
+The threads communicate through a bounded queue; this limits the amount of memory used.
+This queue is a performance optimisation: the writer informs the reader of the exact names of the cache files, so that the reader does not need to list the cache directory.
+A full queue does not mean data loss, only a loss of performance; vTPM writes are still cached.
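+
+The sketch below is a minimal, illustrative example (not part of the daemon) of how a consumer wires the two threads together through the `Disk_cache.setup` entry point added by this change; `xapi_read`, `xapi_push` and `run` are stand-in names, the real callbacks in the daemon are `Server_interface.read_vtpm` and `push_vtpm`:
+
+```ocaml
+let ( let* ) = Lwt.bind
+
+(* Stand-in callbacks that would normally talk to xapi *)
+let xapi_read (_uuid, _time, _key) = Lwt_result.return "tpm-state-from-xapi"
+
+let xapi_push (_uuid, _time, _key) _contents = Lwt_result.return ()
+
+let run () =
+  (* [setup] returns the writer context and the reader (watcher) loop *)
+  let* with_cache, watch =
+    Xapi_guard.Disk_cache.(setup Swtpm xapi_read xapi_push)
+  in
+  (* reader: loops forever, draining the on-disk cache back to xapi *)
+  let reader = watch () in
+  (* writer: each vTPM request runs inside [with_cache] and receives read and
+     write functions that go to xapi directly or fall back to the disk cache *)
+  let writer =
+    with_cache @@ fun (_read_tpm, write_tpm) ->
+    let key = Xapi_guard.Types.Tpm.deserialize_key 0 in
+    write_tpm (Uuidm.v `V4, Mtime_clock.now (), key) "tpm-state-blob"
+  in
+  let* _ = Lwt.all [reader; writer] in
+  Lwt.return_unit
+```
+
+Which path a write actually takes depends on the cache's internal state.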
+ +This means that the cache operates in three modes: +- Direct: during normal operation the disk is not used at all +- Engaged: both threads use the queue to order events +- Disengaged: A thread dumps request to disk while the other reads the cache + until it's empty + +```mermaid +--- +title: Cache State +--- +stateDiagram-v2 + Disengaged + note right of Disengaged + Writer doesn't add requests to queue + Reader reads from cache and tries to push to xapi + end note + Direct + note left of Direct + Writer bypasses cache, send to xapi + Reader waits + end note + Engaged + note right of Engaged + Writer writes to cache and adds requests to queue + Reader reads from queue and tries to push to xapi + end note + + [*] --> Disengaged + + Disengaged --> Disengaged : Reader pushed pending TPMs to xapi, in the meantime TPMs appeared in the cache + Disengaged --> Direct : Reader pushed pending TPMs to xapi, cache is empty + + Direct --> Direct : Writer receives TPM, sent to xapi + Direct --> Engaged : Writer receives TPM, error when sent to xapi + + Engaged --> Direct : Reader sent TPM to xapi, finds an empty queue + Engaged --> Engaged : Writer receives TPM, queue is not full + Engaged --> Disengaged : Writer receives TPM, queue is full +``` + +Startup +------ + +At startup, there's a dedicated routine to transform the existing contents of the cache. +This is currently done because the timestamp reference change on each boot. +This means that the existing contents might have timestamps considered more recent than timestamps of writes coming from running events, leading to missing content updates. +This must be avoided and instead the updates with offending timestamps are renamed to a timestamp taken from the current timestamp, ensuring a consistent +ordering. +The routine is also used to keep a minimal file tree: unrecognised files are deleted, temporary files created to ensure atomic writes are left untouched, and empty directories are deleted. +This mechanism can be changed in the future to migrate to other formats. diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml new file mode 100644 index 00000000000..0f0a6e2c248 --- /dev/null +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -0,0 +1,604 @@ +(* Copyright (C) Cloud Software Group, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +module D = Debug.Make (struct let name = __MODULE__ end) + +let ( // ) = Filename.concat + +let runtime_data = "/var/lib" // "xapi-guard" + +let ( let* ) = Lwt.bind + +let ( let@ ) f x = f x + +let with_lock = Lwt_mutex.with_lock + +type t = Uuidm.t * Mtime.t * Types.Tpm.key + +let cache_of service = runtime_data // Types.Service.to_string service + +let fistpoint () = + let name = "/tmp/fist_disable_xapi_guard_cache" in + Lwt.catch + (fun () -> + let* () = Lwt_unix.access name [Unix.F_OK] in + Lwt.return true + ) + (fun _ -> Lwt.return false) + +let files_in dir ~otherwise = + Lwt.catch + (fun () -> + let* listing = Lwt_unix.files_of_directory dir |> Lwt_stream.to_list in + List.filter_map + (function "." | ".." 
-> None | name -> Some (dir // name)) + listing + |> Lwt.return + ) + otherwise + +let unlink_safe file = + let __FUN = __FUNCTION__ in + Lwt.catch + (fun () -> Lwt_unix.unlink file) + (function + | Unix.(Unix_error (ENOENT, _, _)) -> + Lwt.pause () + | e -> + D.info "%s: error %s when deleting %s, ignoring" __FUN + (Printexc.to_string e) file ; + Lwt.pause () + ) + +type valid_file = t * string + +type file = + | Latest of valid_file + | Outdated of valid_file + | Temporary of string + | Invalid of string + +let path_of_key root (uuid, timestamp, key) = + root + // Uuidm.to_string uuid + // Types.Tpm.(serialize_key key |> string_of_int) + // Mtime.(to_uint64_ns timestamp |> Int64.to_string) + +let key_of_path path = + let ( let* ) = Option.bind in + let key_dir = Filename.(dirname path) in + let* uuid = Filename.(basename (dirname key_dir)) |> Uuidm.of_string in + let* key = + Filename.basename key_dir + |> int_of_string_opt + |> Option.map Types.Tpm.deserialize_key + in + let* timestamp = + Filename.basename path + |> Int64.of_string_opt + |> Option.map Mtime.of_uint64_ns + in + Some ((uuid, timestamp, key), path) + +let path_is_temp path = + let pathlen = String.length path in + String.ends_with ~suffix:".pre" path + && key_of_path (String.sub path 0 (pathlen - 4)) |> Option.is_some + +let temp_of_path path = path ^ ".pre" + +let sort_updates contents = + let classify elem = + match key_of_path elem with + | None -> + let file = + if path_is_temp elem then + Temporary elem + else + Invalid elem + in + Either.Right file + | Some valid_file -> + Either.Left valid_file + in + let valid_files, invalid = List.partition_map classify contents in + + let valid = + let ordered = + List.fast_sort + (fun ((_, x, _), _) ((_, y, _), _) -> Mtime.compare y x) + valid_files + in + match ordered with + | [] -> + [] + | latest :: outdated -> + Latest latest :: List.map (fun outdated -> Outdated outdated) outdated + in + List.concat [valid; invalid] + +let get_all_contents root = + let empty = Fun.const (Lwt.return []) in + let contents_of_key key = + let* contents = files_in key ~otherwise:empty in + Lwt.return (sort_updates contents) + in + let* tpms = files_in root ~otherwise:empty in + let* files = + Lwt_list.map_p + (fun tpm -> + let* keys = files_in tpm ~otherwise:empty in + Lwt_list.map_p contents_of_key keys + ) + tpms + in + Lwt.return List.(concat (concat files)) + +(** Warning, may raise Unix.Unix_error *) +let read_from ~filename = + let flags = Unix.[O_RDONLY] in + let perm = 0o000 in + Lwt_io.with_file ~flags ~perm ~mode:Input filename Lwt_io.read + +let persist_to ~filename:f_path ~contents = + let atomic_write_to_file ~perm f = + let tmp_path = temp_of_path f_path in + let dirname = Filename.dirname f_path in + let flags = Unix.[O_WRONLY; O_CREAT; O_SYNC] in + let* fd_tmp = Lwt_unix.openfile tmp_path flags perm in + let* () = + Lwt.finalize + (fun () -> + (* do not close fd when closing the channel, avoids double-closing the fd *) + let close () = Lwt.return_unit in + let chan = Lwt_io.of_fd ~mode:Output ~close fd_tmp in + let* () = + Lwt.finalize (fun () -> f chan) (fun () -> Lwt_io.close chan) + in + Lwt_unix.fsync fd_tmp + ) + (fun () -> Lwt_unix.close fd_tmp) + in + let* () = Lwt_unix.rename tmp_path f_path in + let* fd_dir = Lwt_unix.openfile dirname [O_RDONLY] 0 in + Lwt.finalize + (fun () -> Lwt_unix.fsync fd_dir) + (fun () -> Lwt_unix.close fd_dir) + in + let write out_chan = Lwt_io.write out_chan contents in + atomic_write_to_file ~perm:0o600 write + +(** - Direct: request doesn't 
pass through the cache + - Engaged: both side coordinate through the queue, writer ends the mode + when the queue has been filled. + - Disengaged: writer ignores the queue, reader empties it and the cache; + then it changes the mode to engaged. +*) +type state = Direct | Engaged | Disengaged + +type channel = { + queue: t Lwt_bounded_stream.t + ; push: t option -> unit option + ; lock: Lwt_mutex.t (* lock for the states *) + ; mutable state: state +} + +(* + Notes: + - uses Mtime.t to force usage of monotonic time + - This means that between runs (and reboots) cached stated is lost if not + persisted first. + IDEA: carryover: read contents of cache and "convert it" to the current run + + TODO: + - Exponential backoff on xapi push error + - Limit error logging on xapi push error: once per downtime is enough + *) + +module Writer : sig + val with_cache : + direct: + (t -> (string, exn) Lwt_result.t) + * (t -> string -> (unit, exn) Lwt_result.t) + -> Types.Service.t + -> channel + -> ((t -> string Lwt.t) * (t -> string -> unit Lwt.t) -> 'a Lwt.t) + -> 'a Lwt.t + (** [with_cache ~direct typ queue context] creates a cache for content of + type [typ]. The cache is readable and writable through the function + [context], which is provided a reading and writing functions [direct]. + It uses [channel] to push events to + + Example: + Xapi_guard.Disk_cache.(Writer.with_cache ~direct:(read, upload) Tpm channel) + @@ fun read_tpm, write_tpm -> write_tpm (uuid, time, key) contents + *) +end = struct + let mkdir_p ?(perm = 0o755) path = + let rec loop acc path = + let create_dir () = Lwt_unix.mkdir path perm in + let create_subdirs () = Lwt_list.iter_s (fun (_, f) -> f ()) acc in + Lwt.try_bind create_dir create_subdirs (function + | Unix.(Unix_error (EEXIST, _, _)) -> + (* create directories, parents first *) + create_subdirs () + | Unix.(Unix_error (ENOENT, _, _)) -> + let parent = Filename.dirname path in + loop ((path, create_dir) :: acc) parent + | exn -> + let msg = + Printf.sprintf {|Could not create directory "%s" because: %s|} + path (Printexc.to_string exn) + in + Lwt.fail (Failure msg) + ) + in + loop [] path + + let files_in_existing dir = + let create_dir = function + | Unix.(Unix_error (ENOENT, _, _)) -> + let* () = mkdir_p dir ~perm:0o700 in + Lwt.return [] + | e -> + raise e + in + files_in dir ~otherwise:create_dir + + let fail exn = + Debug.log_backtrace exn (Backtrace.get exn) ; + Lwt_result.fail exn + + let read_contents ~direct root (uuid, now, key) = + let read_remote () = + let read, _ = direct in + let* result = + Lwt.try_bind + (fun () -> read (uuid, now, key)) + (function + | Ok contents -> Lwt_result.return contents | Error exn -> fail exn + ) + fail + in + match result with + | Ok contents -> + Lwt.return contents + | Error exn -> + raise exn + in + + let key_str = Types.Tpm.(serialize_key key |> string_of_int) in + let key_dir = root // Uuidm.(to_string uuid) // key_str in + + (* 1. Get updates *) + let* contents = files_in key_dir ~otherwise:(fun _ -> Lwt.return []) in + let updates = sort_updates contents in + + (* 2. Pick latest *) + let only_latest = function + | Latest (_, p) -> + Either.Left p + | Temporary p | Outdated (_, p) | Invalid p -> + Right p + in + let latest, _ = List.partition_map only_latest updates in + + (* 3. 
fall back to remote read if needed *) + let get_contents path = + Lwt.catch (fun () -> read_from ~filename:path) (fun _ -> read_remote ()) + in + + match latest with path :: _ -> get_contents path | [] -> read_remote () + + let write_contents ~direct root queue (uuid, now, key) contents = + let __FUN = __FUNCTION__ in + + let _, direct = direct in + let key_str = Types.Tpm.(serialize_key key |> string_of_int) in + let key_dir = root // Uuidm.(to_string uuid) // key_str in + (* 1. Record existing requests in cache *) + let* outdated_contents = files_in_existing key_dir in + + let filename = key_dir // (Mtime.to_uint64_ns now |> Int64.to_string) in + + (* 2. Try to push the changes, if possible. If it's not possible because of + the mode or a failure, write new timestamped content to cache, + atomically; and finally notify the other side if needed *) + (* Note that all queue operations must use while holding its mutex *) + let persist () = persist_to ~filename ~contents in + let persist_and_push () = + let push () = + match queue.push (Some (uuid, now, key)) with + | Some () -> + Lwt.return_unit + | None -> + (* Queue is full, change mode to ignore queue *) + queue.state <- Disengaged ; + Lwt.return_unit + in + let* () = persist () in + push () + in + let engage_and_persist exn = + queue.state <- Engaged ; + D.info "%s: Error on push. Reason: %s" __FUN (Printexc.to_string exn) ; + let* () = persist_and_push () in + Lwt_result.return () + in + let read_state_and_push on_exception () = + match queue.state with + | Direct -> + let* result = + Lwt.try_bind + (fun () -> direct (uuid, now, key) contents) + (function + | Ok () -> Lwt_result.return () | Error exn -> on_exception exn + ) + on_exception + in + Lwt.return result + | Engaged -> + let* () = persist_and_push () in + Lwt_result.return () + | Disengaged -> + let* () = persist () in + Lwt_result.return () + in + let* cache_disabled = fistpoint () in + let on_exception = if cache_disabled then fail else engage_and_persist in + + let* result = with_lock queue.lock (read_state_and_push on_exception) in + let* () = + match result with Ok () -> Lwt.return_unit | Error exn -> raise exn + in + + (* 4. Delete previous requests from filesystem *) + let* _ = Lwt_list.map_p unlink_safe outdated_contents in + Lwt.return_unit + + let with_cache ~direct typ queue f = + let root = cache_of typ in + let* () = mkdir_p root ~perm:0o700 in + f (read_contents ~direct root, write_contents ~direct root queue) +end + +module Watcher : sig + val watch : + direct:(t -> string -> (unit, exn) Lwt_result.t) + -> Types.Service.t + -> channel + -> unit + -> unit Lwt.t +end = struct + type push_cache = File of valid_file | Update_all | Wait + + (* Outdated and invalid files can be deleted, keep temporary files just in case + they need to be recovered *) + let discarder = function + | Latest _ as f -> + Either.Left f + | Temporary _ as f -> + Left f + | Outdated (_, p) -> + Right p + | Invalid p -> + Right p + + let get_latest_and_delete_rest root = + let* files = get_all_contents root in + let keep, to_delete = List.partition_map discarder files in + let* () = Lwt_list.iter_p unlink_safe to_delete in + (* Ignore temporaty files *) + let latest = + List.filter_map (function Latest f -> Some f | _ -> None) keep + in + Lwt.return latest + + let retry_push push (uuid, timestamp, key) contents = + let __FUN = __FUNCTION__ in + let push' () = push (uuid, timestamp, key) contents in + let rec retry k = + let on_error e = + D.info "%s: Error on push, attempt %i. 
Reason: %s" __FUN k + (Printexc.to_string e) ; + let* () = Lwt_unix.sleep 0.1 in + retry (k + 1) + in + Lwt.try_bind push' + (function Ok () -> Lwt.return_unit | Error e -> on_error e) + on_error + in + retry 1 + + let push_file push (key, path) = + let __FUN = __FUNCTION__ in + let on_error = function + | Unix.(Unix_error (ENOENT, _, _)) -> + Lwt.return_unit + | exn -> + D.info "%s: error when reading '%s': %s" __FUN path + Printexc.(to_string exn) ; + Lwt.return_unit + in + + Lwt.try_bind + (fun () -> read_from ~filename:path) + (fun contents -> + let* () = retry_push push key contents in + unlink_safe path + ) + on_error + + let push_files push files = Lwt_list.iter_s (push_file push) (List.rev files) + + let update_all queue push root = + let __FUN = __FUNCTION__ in + let* contents = get_latest_and_delete_rest root in + let* () = push_files push contents in + let@ () = with_lock queue.lock in + let* contents = get_latest_and_delete_rest root in + let* () = + match contents with + | [] -> + queue.state <- Direct ; + D.debug "%s: Cache clean; Going direct" __FUN ; + Lwt.return_unit + | _ -> + Lwt.return_unit + in + Lwt.return_unit + + let resolve queue push root = function + | File file -> ( + let* () = push_file push file in + let@ () = with_lock queue.lock in + match queue.state with + | Direct | Disengaged -> + Lwt.return_unit + | Engaged -> + let () = + if Lwt_bounded_stream.size queue.queue = 0 then + queue.state <- Direct + in + Lwt.return_unit + ) + | Update_all -> + update_all queue push root + | Wait -> + (* Do not busy loop when the system can cope with the requests *) + Lwt_unix.sleep 0.2 + + let watch ~direct typ queue = + let root = cache_of typ in + let __FUN = __FUNCTION__ in + let rec loop () = + (* When the pushing side is disengaged it doesn't push events to the + queue, this means that trying to drain it completely would leave the + pulling side locked waiting when the queue is empty. + - Read the number of elements in the queue while draining it and + then switch to read the contents from the cache; or + - Switch immediately to reading the contents from cache and ignore + the contents of the queue by calling an specialized method in the + queue module to drain it. 
+ *) + let get_action () = + let@ () = with_lock queue.lock in + match queue.state with + | Disengaged when Lwt_bounded_stream.size queue.queue < 1 -> + let* () = Lwt.pause () in + Lwt.return Update_all + | Direct -> + let* () = Lwt.pause () in + Lwt.return Wait + | _ -> ( + let* elem = Lwt_bounded_stream.get queue.queue in + match elem with + | None -> + raise (Failure "Other side closed channel, cannot continue") + | Some elem -> + Lwt.return (File (elem, path_of_key root elem)) + ) + in + let* action = get_action () in + let* () = resolve queue direct root action in + loop () + in + loop +end + +(** Module use to change the cache contents before the reader and writer start + running *) +module Setup : sig + val retime_cache_contents : Types.Service.t -> unit Lwt.t +end = struct + type file_action = + | Keep of file + | Delete of string + | Move of {from: string; into: string} + + let get_fs_action root now = function + | Latest ((uuid, timestamp, key), from) as latest -> + if Mtime.is_later ~than:now timestamp then + let timestamp = now in + let into = path_of_key root (uuid, timestamp, key) in + Move {from; into} + else + Keep latest + | Temporary _ as temp -> + Keep temp + | Invalid p | Outdated (_, p) -> + Delete p + + let commit __FUN = function + | Keep (Temporary p) -> + D.warn "%s: Found temporary file, ignoring '%s'" __FUN p ; + Lwt.return_unit + | Keep _ -> + Lwt.return_unit + | Delete p -> + D.info "%s: Deleting '%s'" __FUN p ; + Lwt_unix.unlink p + | Move {from; into} -> + D.info "%s: Moving '%s' to '%s'" __FUN from into ; + Lwt_unix.rename from into + + let rec delete_empty_dirs ~delete_root root = + (* Delete subdirectories, then *) + let* files = files_in root ~otherwise:(fun _ -> Lwt.return []) in + let* () = + Lwt_list.iter_p + (fun path -> + let* {st_kind; _} = Lwt_unix.stat path in + match st_kind with + | S_DIR -> + delete_empty_dirs ~delete_root:true path + | _ -> + Lwt.return_unit + ) + files + in + if not delete_root then + Lwt.return_unit + else + let* files = files_in root ~otherwise:(fun _ -> Lwt.return []) in + Lwt.catch + (fun () -> + if files = [] then + Lwt_unix.rmdir root + else + Lwt.return_unit + ) + (fun _ -> Lwt.return_unit) + + (* The code assumes it's the only with access to the disk cache while running *) + let retime_cache_contents typ = + let now = Mtime_clock.now () in + let root = cache_of typ in + let* contents = get_all_contents root in + let* () = + contents + |> List.map (get_fs_action root now) + |> Lwt_list.iter_p (commit __FUNCTION__) + in + delete_empty_dirs ~delete_root:false root +end + +let setup typ read write = + let* () = Setup.retime_cache_contents typ in + let queue, push = Lwt_bounded_stream.create 2 in + let lock = Lwt_mutex.create () in + let q = {queue; push; lock; state= Disengaged} in + Lwt.return + ( Writer.with_cache ~direct:(read, write) typ q + , Watcher.watch ~direct:write typ q + ) diff --git a/ocaml/xapi-guard/lib/disk_cache.mli b/ocaml/xapi-guard/lib/disk_cache.mli new file mode 100644 index 00000000000..08c345615f5 --- /dev/null +++ b/ocaml/xapi-guard/lib/disk_cache.mli @@ -0,0 +1,31 @@ +(* Copyright (C) Cloud Software Group, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +(** [t] t is the minimal type to recognise elements in a cache. This does not + contain the contents of the elements being contained, only the metadata *) +type t = Uuidm.t * Mtime.t * Types.Tpm.key + +val setup : + Types.Service.t + -> (t -> (string, exn) Lwt_result.t) + -> (t -> string -> (unit, exn) Lwt_result.t) + -> ( ( ((t -> string Lwt.t) * (t -> string -> unit Lwt.t) -> 'a Lwt.t) + -> 'a Lwt.t + ) + * (unit -> unit Lwt.t) + ) + Lwt.t +(** [setup service read_callback push_callback] Returns a local disk buffer for + [service] which will use [push_callback] to push the elements to their + final destination and [read_callback] to read elements if they are not in + the buffer. *) diff --git a/ocaml/xapi-guard/lib/dune b/ocaml/xapi-guard/lib/dune index 87d10e7766e..052810ead5f 100644 --- a/ocaml/xapi-guard/lib/dune +++ b/ocaml/xapi-guard/lib/dune @@ -20,10 +20,13 @@ ) (library (name xapi_guard) - (modules dorpc types) + (modules dorpc types disk_cache lwt_bounded_stream) (libraries rpclib.core + inotify + inotify.lwt lwt + lwt.unix uri xapi-backtrace xapi-consts diff --git a/ocaml/xapi-guard/lib/lwt_bounded_stream.ml b/ocaml/xapi-guard/lib/lwt_bounded_stream.ml new file mode 100644 index 00000000000..90efe83758b --- /dev/null +++ b/ocaml/xapi-guard/lib/lwt_bounded_stream.ml @@ -0,0 +1,48 @@ +(* + * Copyright (c) 2012 Citrix Systems + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+ *) + +let ( let* ) = Lwt.bind + +type 'a t = {stream: 'a Lwt_stream.t; capacity: int; size: int ref} + +let create capacity = + let stream, stream_push = Lwt_stream.create () in + let t = {stream; capacity; size= ref 0} in + let push = function + | Some _ when !(t.size) > t.capacity -> + None + | None -> + stream_push None ; Some () + | elem -> + stream_push elem ; incr t.size ; Some () + in + (t, push) + +let size {size; _} = !size + +let get_available t = + let all = Lwt_stream.get_available t.stream in + t.size := !(t.size) - List.length all ; + all + +let get t = + let* elem = Lwt_stream.get t.stream in + decr t.size ; Lwt.return elem + +let nget n t = + let* all = Lwt_stream.nget n t.stream in + t.size := !(t.size) - List.length all ; + Lwt.return all diff --git a/ocaml/xapi-guard/lib/lwt_bounded_stream.mli b/ocaml/xapi-guard/lib/lwt_bounded_stream.mli new file mode 100644 index 00000000000..b2b310f77e0 --- /dev/null +++ b/ocaml/xapi-guard/lib/lwt_bounded_stream.mli @@ -0,0 +1,34 @@ +(* + * Copyright (c) 2012 Citrix Systems + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +(** Similar to Lwt_stream.bounded_push except threads never block in push() *) +type 'a t + +val create : int -> 'a t * ('a option -> unit option) +(** [create capacity] creates a stream which can contain at most + [capacity] elements *) + +val get_available : 'a t -> 'a list +(** [get_available t] returns all available elements from [t] without blocking *) + +val get : 'a t -> 'a option Lwt.t +(** [get t] returns an element from [t] *) + +val nget : int -> 'a t -> 'a list Lwt.t +(** [nget n t] returns [n] elements from [t] *) + +val size : 'a t -> int +(** [size t] return the number of enqueued elements *) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index 4bcaae8a387..0884c2bf1b2 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -18,6 +18,7 @@ open Lwt.Syntax module D = Debug.Make (struct let name = __MODULE__ end) open D +module Tpm = Xapi_guard.Types.Tpm type rpc_t = Rpc.t @@ -50,8 +51,8 @@ let () = * this is only needed for syscalls that would otherwise block *) Lwt_unix.set_pool_size 16 -let with_xapi ~cache f = - Lwt_unix.with_timeout 120. (fun () -> SessionCache.with_session cache f) +let with_xapi ~cache ?(timeout = 120.) f = + Lwt_unix.with_timeout timeout (fun () -> SessionCache.with_session cache f) let serve_forever_lwt path callback = let conn_closed _ = () in @@ -99,109 +100,70 @@ let serve_forever_lwt_callback rpc_fn path _ req body = in Cohttp_lwt_unix.Server.respond_string ~status:`Method_not_allowed ~body () -(* The TPM has 3 kinds of states *) -type state = { - permall: string (** permanent storage *) - ; savestate: string (** for ACPI S3 *) - ; volatilestate: string (** for snapshot/migration/etc. 
*) -} - -let split_char = ' ' - -let join_string = String.make 1 split_char - -let deserialize t = - match String.split_on_char split_char t with - | [permall] -> - (* backwards compat with reading tpm2-00.permall *) - {permall; savestate= ""; volatilestate= ""} - | [permall; savestate; volatilestate] -> - {permall; savestate; volatilestate} - | splits -> - Fmt.failwith "Invalid state: too many splits %d" (List.length splits) - -let serialize t = - (* it is assumed that swtpm has already base64 encoded this *) - String.concat join_string [t.permall; t.savestate; t.volatilestate] - -let lookup_key key t = - match key with - | "/tpm2-00.permall" -> - t.permall - | "/tpm2-00.savestate" -> - t.savestate - | "/tpm2-00.volatilestate" -> - t.volatilestate - | s -> - Fmt.invalid_arg "Unknown TPM state key: %s" s - -let update_key key state t = - if String.contains state split_char then - Fmt.invalid_arg - "State to be stored (%d bytes) contained forbidden separator: %c" - (String.length state) split_char ; - match key with - | "/tpm2-00.permall" -> - {t with permall= state} - | "/tpm2-00.savestate" -> - {t with savestate= state} - | "/tpm2-00.volatilestate" -> - {t with volatilestate= state} - | s -> - Fmt.invalid_arg "Unknown TPM state key: %s" s +let with_xapi_vtpm ~cache vm_uuid = + let vm_uuid_str = Uuidm.to_string vm_uuid in + let* vm = + with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_by_uuid ~uuid:vm_uuid_str + in + let* vTPMs = with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_VTPMs ~self:vm in + match vTPMs with + | [] -> + D.warn + "%s: received a request from a VM that has no VTPM associated, \ + ignoring request" + __FUNCTION__ ; + let msg = + Printf.sprintf "No VTPM associated with VM %s, nothing to do" + vm_uuid_str + in + raise (Failure msg) + | self :: _ -> + Lwt.return self + +let push_vtpm ~cache (vm_uuid, _timestamp, key) contents = + let* self = with_xapi_vtpm ~cache vm_uuid in + let* old_contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let contents = + Tpm.(old_contents |> deserialize |> update key contents |> serialize) + in + let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in + Lwt_result.return () -let empty = "" +let read_vtpm ~cache (vm_uuid, _timestamp, key) = + let* self = with_xapi_vtpm ~cache vm_uuid in + let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let body = Tpm.(contents |> deserialize |> lookup ~key) in + Lwt_result.return body -let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = - let get_vtpm_ref () = - let* vm = - with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_by_uuid ~uuid:vm_uuid - in - let* vTPMs = with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_VTPMs ~self:vm in - match vTPMs with - | [] -> - D.warn - "%s: received a request from a VM that has no VTPM associated, \ - ignoring request" - __FUNCTION__ ; - let msg = - Printf.sprintf "No VTPM associated with VM %s, nothing to do" vm_uuid - in - raise (Failure msg) - | self :: _ -> - let* uuid = with_xapi ~cache @@ Xen_api_lwt_unix.VTPM.get_uuid ~self in - with_xapi ~cache @@ VTPM.get_by_uuid ~uuid - in +let serve_forever_lwt_callback_vtpm ~cache mutex (read, persist) vm_uuid _ req + body = let uri = Cohttp.Request.uri req in + let timestamp = Mtime_clock.now () in (* in case the connection is interrupted/etc. 
we may still have pending operations, so use a per vTPM mutex to ensure we really only have 1 pending operation at a time for a vTPM *) Lwt_mutex.with_lock mutex @@ fun () -> (* TODO: some logging *) match (Cohttp.Request.meth req, Uri.path uri) with - | `GET, key when key <> "/" -> - let* self = get_vtpm_ref () in - let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in - let body = contents |> deserialize |> lookup_key key in + | `GET, path when path <> "/" -> + let key = Tpm.key_of_swtpm path in + let* body = read (vm_uuid, timestamp, key) in let headers = Cohttp.Header.of_list [("Content-Type", "application/octet-stream")] in Cohttp_lwt_unix.Server.respond_string ~headers ~status:`OK ~body () - | `PUT, key when key <> "/" -> + | `PUT, path when path <> "/" -> let* body = Cohttp_lwt.Body.to_string body in - let* self = get_vtpm_ref () in - let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in - let contents = - contents |> deserialize |> update_key key body |> serialize - in - let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in + let key = Tpm.key_of_swtpm path in + let* () = persist (vm_uuid, timestamp, key) body in Cohttp_lwt_unix.Server.respond ~status:`No_content ~body:Cohttp_lwt.Body.empty () - | `DELETE, key when key <> "/" -> - let* self = get_vtpm_ref () in + | `DELETE, path when path <> "/" -> + let* self = with_xapi_vtpm ~cache vm_uuid in let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let key = Tpm.key_of_swtpm path in let contents = - contents |> deserialize |> update_key key empty |> serialize + Tpm.(contents |> deserialize |> update key empty_state |> serialize) in let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in Cohttp_lwt_unix.Server.respond ~status:`No_content @@ -211,10 +173,11 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = Cohttp_lwt_unix.Server.respond_string ~status:`Method_not_allowed ~body () (* Create a restricted RPC function and socket for a specific VM *) -let make_server_varstored ~cache path vm_uuid = +let make_server_varstored _persist ~cache path vm_uuid = + let vm_uuid_str = Uuidm.to_string vm_uuid in let module Server = Xapi_idl_guard_varstored.Interface.RPC_API (Rpc_lwt.GenServer ()) in - let get_vm_ref () = with_xapi ~cache @@ VM.get_by_uuid ~uuid:vm_uuid in + let get_vm_ref () = with_xapi ~cache @@ VM.get_by_uuid ~uuid:vm_uuid_str in let ret v = (* TODO: maybe map XAPI exceptions *) Lwt.bind v Lwt.return_ok |> Rpc_lwt.T.put @@ -236,7 +199,7 @@ let make_server_varstored ~cache path vm_uuid = (let* (_ : _ Ref.t) = with_xapi ~cache @@ Message.create ~name:"VM_SECURE_BOOT_FAILED" ~priority ~cls:`VM - ~obj_uuid:vm_uuid ~body + ~obj_uuid:vm_uuid_str ~body in Lwt.return_unit ) @@ -253,7 +216,9 @@ let make_server_varstored ~cache path vm_uuid = serve_forever_lwt_callback (Rpc_lwt.server Server.implementation) path |> serve_forever_lwt path -let make_server_vtpm_rest ~cache path vm_uuid = +let make_server_vtpm_rest read_write ~cache path vm_uuid = let mutex = Lwt_mutex.create () in - let callback = serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid in + let callback = + serve_forever_lwt_callback_vtpm ~cache mutex read_write vm_uuid + in serve_forever_lwt path callback diff --git a/ocaml/xapi-guard/lib/types.ml b/ocaml/xapi-guard/lib/types.ml index 7b705c89a01..3f2b41c7682 100644 --- a/ocaml/xapi-guard/lib/types.ml +++ b/ocaml/xapi-guard/lib/types.ml @@ -3,3 +3,80 @@ module Service = struct let to_string = function Varstored -> "Varstored" | Swtpm -> "Swtpm" end + 
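+(* Conventions used by this module, derived from [serialize]/[deserialize] and
+   [serialize_key] below: a whole TPM state is the three sections joined by a
+   single space (each section is already base64-encoded by swtpm), and a [key]
+   is persisted as an integer index: 0 = Perm, 1 = Save, 2 = Volatile. *)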
+module Tpm = struct + (* The TPM has 3 kinds of states *) + type t = { + permall: string (** permanent storage *) + ; savestate: string (** for ACPI S3 *) + ; volatilestate: string (** for snapshot/migration/etc. *) + } + + type key = Perm | Save | Volatile + + let key_of_swtpm = function + | "/tpm2-00.permall" -> + Perm + | "/tpm2-00.savestate" -> + Save + | "/tpm2-00.volatilestate" -> + Volatile + | s -> + Fmt.invalid_arg "Unknown TPM state key: %s" s + + let serialize_key = function Perm -> 0 | Save -> 1 | Volatile -> 2 + + let deserialize_key = function + | 0 -> + Perm + | 1 -> + Save + | 2 -> + Volatile + | s -> + Fmt.invalid_arg "Unknown TPM state key: %i" s + + let empty_state = "" + + let empty = {permall= ""; savestate= ""; volatilestate= ""} + + let split_char = ' ' + + let join_string = String.make 1 split_char + + let deserialize t = + match String.split_on_char split_char t with + | [permall] -> + (* backwards compat with reading tpm2-00.permall *) + {permall; savestate= ""; volatilestate= ""} + | [permall; savestate; volatilestate] -> + {permall; savestate; volatilestate} + | splits -> + Fmt.failwith "Invalid state: too many splits %d" (List.length splits) + + let serialize t = + (* it is assumed that swtpm has already base64 encoded this *) + String.concat join_string [t.permall; t.savestate; t.volatilestate] + + let lookup ~key t = + match key with + | Perm -> + t.permall + | Save -> + t.savestate + | Volatile -> + t.volatilestate + + let update key state t = + if String.contains state split_char then + Fmt.invalid_arg + "State to be stored (%d bytes) contained forbidden separator: %c" + (String.length state) split_char ; + match key with + | Perm -> + {t with permall= state} + | Save -> + {t with savestate= state} + | Volatile -> + {t with volatilestate= state} +end diff --git a/ocaml/xapi-guard/lib/types.mli b/ocaml/xapi-guard/lib/types.mli index 6bb036826d7..f210ea8c96a 100644 --- a/ocaml/xapi-guard/lib/types.mli +++ b/ocaml/xapi-guard/lib/types.mli @@ -5,3 +5,32 @@ module Service : sig val to_string : t -> string end + +module Tpm : sig + (** TPMs have 3 kind of states *) + type t + + (** key to access a single state *) + type key + + val key_of_swtpm : string -> key + (** [key_of_swtpm path] returns a state key represented by [path]. These paths + are parts of the requests generated by SWTPM and may contain slashes *) + + val deserialize_key : int -> key + + val serialize_key : key -> int + (** [serialize key] returns the state key represented by [key]. 
*) + + val empty : t + + val empty_state : string + + val deserialize : string -> t + + val serialize : t -> string + + val update : key -> string -> t -> t + + val lookup : key:key -> t -> string +end diff --git a/ocaml/xapi-guard/src/dune b/ocaml/xapi-guard/src/dune index dfbf8e9a9e4..baac1d24101 100644 --- a/ocaml/xapi-guard/src/dune +++ b/ocaml/xapi-guard/src/dune @@ -1,5 +1,6 @@ (executable (name main) + (modules main) (libraries astring cmdliner diff --git a/ocaml/xapi-guard/src/main.ml b/ocaml/xapi-guard/src/main.ml index b80bf354516..9fb40aa038b 100644 --- a/ocaml/xapi-guard/src/main.ml +++ b/ocaml/xapi-guard/src/main.ml @@ -18,6 +18,8 @@ open Xapi_guard_server module Types = Xapi_guard.Types module SessionCache = Xen_api_lwt_unix.SessionCache +let ( let@ ) f x = f x + let daemon_name = "xapi-guard" module D = Debug.Make (struct let name = daemon_name end) @@ -97,7 +99,7 @@ let () = Xen_api_lwt_unix.SessionCache.destroy cache ) -let listen_for_vm {Persistent.vm_uuid; path; gid; typ} = +let listen_for_vm read_write {Persistent.vm_uuid; path; gid; typ} = let make_server = match typ with | Varstored -> @@ -110,19 +112,25 @@ let listen_for_vm {Persistent.vm_uuid; path; gid; typ} = (Types.Service.to_string typ) path vm_uuid_str ; let* () = safe_unlink path in - let* stop_server = make_server ~cache path vm_uuid_str in + let* stop_server = make_server read_write ~cache path vm_uuid in let* () = log_fds () in Hashtbl.add sockets path (stop_server, (vm_uuid, gid, typ)) ; let* () = Lwt_unix.chmod path 0o660 in Lwt_unix.chown path 0 gid -let resume () = +let resume ~vtpm_read_write ~uefi_read_write () = let* vms = Persistent.loadfrom recover_path in - let+ () = Lwt_list.iter_p listen_for_vm vms in + let listen_to_vm = function + | Persistent.{typ= Varstored; _} as vm -> + listen_for_vm uefi_read_write vm + | Persistent.{typ= Swtpm; _} as vm -> + listen_for_vm vtpm_read_write vm + in + let+ () = Lwt_list.iter_p listen_to_vm vms in D.debug "%s: completed" __FUNCTION__ (* caller here is trusted (xenopsd through message-switch) *) -let depriv_varstored_create dbg vm_uuid gid path = +let depriv_varstored_create write_push dbg vm_uuid gid path = if Hashtbl.mem sockets path then Lwt.return_error (Xapi_idl_guard_privileged.Interface.InternalError @@ -134,7 +142,9 @@ let depriv_varstored_create dbg vm_uuid gid path = @@ ( D.debug "[%s] creating deprivileged socket at %s, owned by group %d" dbg path gid ; - let* () = listen_for_vm {Persistent.path; vm_uuid; gid; typ= Varstored} in + let* () = + listen_for_vm write_push {Persistent.path; vm_uuid; gid; typ= Varstored} + in store_args sockets ) @@ -156,7 +166,7 @@ let depriv_varstored_destroy dbg gid path = D.debug "[%s] stopped server for gid %d and removed socket" dbg gid ; Lwt.return_unit -let depriv_swtpm_create dbg vm_uuid gid path = +let depriv_swtpm_create read_write dbg vm_uuid gid path = if Hashtbl.mem sockets path then Lwt.return_error (Xapi_idl_guard_privileged.Interface.InternalError @@ -168,7 +178,9 @@ let depriv_swtpm_create dbg vm_uuid gid path = @@ ( D.debug "[%s] creating deprivileged socket at %s, owned by group %d" dbg path gid ; - let* () = listen_for_vm {Persistent.path; vm_uuid; gid; typ= Swtpm} in + let* () = + listen_for_vm read_write {Persistent.path; vm_uuid; gid; typ= Swtpm} + in store_args sockets ) @@ -197,6 +209,9 @@ let depriv_swtpm_destroy dbg gid path = Lwt.return_unit (* TODO: these 2 APIs need to be updated to go through the generic interface *) +(* These 2 functions are only reachable from message-switch. 
They are part of + the control plane and be called when xapi controls the lifecycle of a VM, so + it's OK to assume it's available. *) let vtpm_set_contents dbg vtpm_uuid contents = let open Xen_api_lwt_unix in @@ -215,55 +230,89 @@ let vtpm_get_contents _dbg vtpm_uuid = @@ let* self = Server_interface.with_xapi ~cache @@ VTPM.get_by_uuid ~uuid in Server_interface.with_xapi ~cache @@ VTPM.get_contents ~self -let rpc_fn = +let rpc_fn ~vtpm_read_write ~uefi_read_write = let module Server = Xapi_idl_guard_privileged.Interface.RPC_API (Rpc_lwt.GenServer ()) in (* bind APIs *) - Server.varstore_create depriv_varstored_create ; + Server.varstore_create (depriv_varstored_create uefi_read_write) ; Server.varstore_destroy depriv_varstored_destroy ; - Server.vtpm_create depriv_swtpm_create ; + Server.vtpm_create (depriv_swtpm_create vtpm_read_write) ; Server.vtpm_destroy depriv_swtpm_destroy ; Server.vtpm_set_contents vtpm_set_contents ; Server.vtpm_get_contents vtpm_get_contents ; Rpc_lwt.server Server.implementation -let process body = +let process ~vtpm_read_write ~uefi_read_write body = let+ response = Xapi_guard.Dorpc.wrap_rpc Xapi_idl_guard_privileged.Interface.E.error (fun () -> let call = Jsonrpc.call_of_string body in D.debug "Received request from message-switch, method %s" call.Rpc.name ; - rpc_fn call + rpc_fn ~vtpm_read_write ~uefi_read_write call ) in Jsonrpc.string_of_response response +let retry_forever fname f = + let rec loop () = + let* () = + Lwt.catch f (function exn -> + D.info "%s failed with %s, retrying..." fname (Printexc.to_string exn) ; + Lwt_unix.sleep 0.5 + ) + in + (loop [@tailcall]) () + in + loop () + +let cache_reader with_watcher = retry_forever "cache watcher" with_watcher + let make_message_switch_server () = + let* with_swtpm_cache, with_watch = + Xapi_guard.Disk_cache.( + setup Swtpm + Server_interface.(read_vtpm ~cache) + Server_interface.(push_vtpm ~cache) + ) + in let open Message_switch_lwt.Protocol_lwt in let wait_server, server_stopped = Lwt.task () in - let* result = - Server.listen ~process ~switch:!Xcp_client.switch_path - ~queue:Xapi_idl_guard_privileged.Interface.queue_name () + let@ vtpm_read_write = with_swtpm_cache in + let uefi_read_write = + (* This is unused for the time being, added to be consistent with both + interfaces *) + ((fun _ -> Lwt.return ""), fun _ _ -> Lwt.return_unit) in - match Server.error_to_msg result with - | Ok t -> - Lwt_switch.add_hook (Some Server_interface.shutdown) (fun () -> - D.debug "Stopping message-switch queue server" ; - let+ () = Server.shutdown ~t () in - Lwt.wakeup server_stopped () - ) ; - (* best effort resume *) - let* () = - Lwt.catch resume (fun e -> - D.log_backtrace () ; - D.warn "Resume failed: %s" (Printexc.to_string e) ; - Lwt.return_unit - ) - in - wait_server - | Error (`Msg m) -> - Lwt.fail_with - (Printf.sprintf "Failed to listen on message-switch queue: %s" m) + let server = + let* result = + Server.listen + ~process:(process ~vtpm_read_write ~uefi_read_write) + ~switch:!Xcp_client.switch_path + ~queue:Xapi_idl_guard_privileged.Interface.queue_name () + in + match Server.error_to_msg result with + | Ok t -> + Lwt_switch.add_hook (Some Server_interface.shutdown) (fun () -> + D.debug "Stopping message-switch queue server" ; + let+ () = Server.shutdown ~t () in + Lwt.wakeup server_stopped () + ) ; + (* best effort resume *) + let* () = + Lwt.catch (resume ~vtpm_read_write ~uefi_read_write) (fun e -> + D.log_backtrace () ; + D.warn "Resume failed: %s" (Printexc.to_string e) ; + Lwt.return_unit + ) + 
in + wait_server + | Error (`Msg m) -> + Lwt.fail_with + (Printf.sprintf "Failed to listen on message-switch queue: %s" m) + in + let reader = cache_reader with_watch in + let* _ = Lwt.all [server; reader] in + Lwt.return_unit let main log_level = Debug.set_level log_level ; diff --git a/ocaml/xapi-guard/test/cache_test.ml b/ocaml/xapi-guard/test/cache_test.ml new file mode 100644 index 00000000000..97b144839a6 --- /dev/null +++ b/ocaml/xapi-guard/test/cache_test.ml @@ -0,0 +1,204 @@ +let ( let@ ) f x = f x + +let ( let* ) = Lwt.bind + +module Tpm = Xapi_guard.Types.Tpm + +module TPMs = struct + let writes_created = Atomic.make 1 + + let reads_created = Atomic.make 1 + + let request_persist uuid write = + let __FUN = __FUNCTION__ in + + let key = Tpm.deserialize_key (Random.int 3) in + + let time = Mtime_clock.now () in + let serial_n = Atomic.fetch_and_add writes_created 1 in + let contents = + Printf.sprintf "contents %s" (Mtime.to_uint64_ns time |> Int64.to_string) + in + let* () = + Logs_lwt.app (fun m -> + m "%s: Write № %i requested: %a/%i/%a" __FUN serial_n Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp time + ) + in + write (uuid, time, key) contents + + let request_read uuid read = + let __FUN = __FUNCTION__ in + + let key = Tpm.deserialize_key (Random.int 3) in + + let time = Mtime_clock.now () in + let serial_n = Atomic.fetch_and_add reads_created 1 in + let* () = + Logs_lwt.app (fun m -> + m "%s: Read № %i requested: %a/%i/%a" __FUN serial_n Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp time + ) + in + let* () = Lwt_unix.sleep 0.05 in + read (uuid, time, key) +end + +let lwt_reporter () = + let buf_fmt ~like = + let b = Buffer.create 512 in + ( Fmt.with_buffer ~like b + , fun () -> + let m = Buffer.contents b in + Buffer.reset b ; m + ) + in + let app, app_flush = buf_fmt ~like:Fmt.stdout in + let dst, dst_flush = buf_fmt ~like:Fmt.stderr in + let reporter = Logs_fmt.reporter ~app ~dst () in + let report src level ~over k msgf = + let k () = + let write () = + match level with + | Logs.App -> + Lwt_io.write Lwt_io.stdout (app_flush ()) + | _ -> + Lwt_io.write Lwt_io.stderr (dst_flush ()) + in + let unblock () = over () ; Lwt.return_unit in + Lwt.finalize write unblock |> Lwt.ignore_result ; + k () + in + reporter.Logs.report src level ~over:(fun () -> ()) k msgf + in + {Logs.report} + +let setup_log level = + Logs.set_level level ; + Logs.set_reporter (lwt_reporter ()) ; + () + +let ok = Lwt_result.ok + +let retry_forever fname f = + let rec loop () = + let* () = + Lwt.catch f (function exn -> + let* () = + Logs_lwt.app (fun m -> + m "%s failed with %s, retrying..." 
fname (Printexc.to_string exn) + ) + in + Lwt_unix.sleep 0.5 + ) + in + (loop [@tailcall]) () + in + loop () + +let max_writes = 128 + +let max_reads = 500_000 + +let received_writes = ref 0 + +let received_reads = ref 0 + +let throttled_reads = Mtime.Span.(200 * ms) + +let failing_writes_period = Mtime.Span.(500 * ms) + +let epoch = Mtime_clock.now () + +let should_fail () : bool = + let rec polarity elapsed = + if Mtime.Span.compare elapsed failing_writes_period < 0 then + true + else + not (polarity Mtime.Span.(abs_diff elapsed failing_writes_period)) + in + let elapsed = Mtime.span epoch (Mtime_clock.now ()) in + polarity elapsed + +let log_write (uuid, timestamp, key) content = + let __FUN = __FUNCTION__ in + let ( let* ) = Lwt_result.bind in + let maybe_fail () = + if should_fail () then + Lwt_result.fail + (failwith (Printf.sprintf {|oops, could not write '%s'|} content)) + else + Lwt_result.return () + in + let* () = maybe_fail () in + received_writes := !received_writes + 1 ; + Logs_lwt.app (fun m -> + m "%s Write № %i detected: %a/%i/%a" __FUN !received_writes Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp timestamp + ) + |> ok + +let log_read (uuid, timestamp, key) = + let __FUN = __FUNCTION__ in + let ( let* ) = Lwt_result.bind in + received_reads := !received_reads + 1 ; + let* () = + Logs_lwt.app (fun m -> + m "%s Read to source № %i detected: %a/%i/%a" __FUN !received_reads + Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp timestamp + ) + |> ok + in + Lwt_result.return "yes" + +let to_cache with_read_writes = + let __FUN = __FUNCTION__ in + let elapsed = Mtime_clock.counter () in + let persist uuid (_, write_tpm) = TPMs.request_persist uuid write_tpm in + let read uuid (read_tpm, _) = + let* contents = TPMs.request_read uuid read_tpm in + Logs_lwt.app (fun m -> m "%s Read received: '%s'" __FUN contents) + in + let rec loop_and_stop f name uuid max sent = + let sent = sent + 1 in + let@ read_write = with_read_writes in + let* () = f uuid read_write in + if sent >= max then + Logs_lwt.app (fun m -> + m "%s: Stopping requests after %i %ss" __FUN sent name + ) + else if Mtime.Span.compare (Mtime_clock.count elapsed) throttled_reads > 0 + then + let* () = Lwt_unix.sleep 0.1 in + loop_and_stop f name uuid max sent + else + let* () = Lwt.pause () in + loop_and_stop f name uuid max sent + in + let vms = List.init 4 (fun _ -> Uuidm.(v `V4)) in + + List.concat + [ + List.map (fun uuid -> loop_and_stop persist "write" uuid max_writes 0) vms + ; List.map (fun uuid -> loop_and_stop read "read" uuid max_reads 0) vms + ] + +let from_cache with_watcher = retry_forever "watcher" with_watcher + +let main () = + let* with_read_writes, with_watcher = + Xapi_guard.Disk_cache.(setup Swtpm log_read log_write) + in + let reader = from_cache with_watcher in + let writers = to_cache with_read_writes in + let* _ = Lwt.all (reader :: writers) in + Lwt.return_unit + +let () = + setup_log @@ Some Logs.Debug ; + Lwt_main.run (main ()) diff --git a/ocaml/xapi-guard/test/cache_test.mli b/ocaml/xapi-guard/test/cache_test.mli new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ocaml/xapi-guard/test/dune b/ocaml/xapi-guard/test/dune index 934256a9f7a..e082a47a690 100644 --- a/ocaml/xapi-guard/test/dune +++ b/ocaml/xapi-guard/test/dune @@ -1,6 +1,7 @@ (test (name xapi_guard_test) (modes exe) + (modules (:standard \ cache_test)) (libraries alcotest alcotest-lwt @@ -17,3 +18,18 @@ xen-api-client-lwt) (package varstored-guard) ) + +(executable + (name cache_test) + (modules cache_test) + 
(libraries + logs + logs.fmt + logs.lwt + lwt + lwt.unix + mtime + mtime.clock.os + uuidm + xapi_guard) + (preprocess (pps ppx_deriving_rpc))) diff --git a/ocaml/xapi-guard/test/xapi_guard_test.ml b/ocaml/xapi-guard/test/xapi_guard_test.ml index 41ce8f6e347..86efb713d29 100644 --- a/ocaml/xapi-guard/test/xapi_guard_test.ml +++ b/ocaml/xapi-guard/test/xapi_guard_test.ml @@ -60,7 +60,9 @@ let xapi_rpc call = | _ -> Fmt.failwith "XAPI RPC call %s not expected in test" call.Rpc.name -let vm_uuid = Uuidx.(to_string (make ())) +let vm_uuid = Uuidm.v `V4 + +let vm_uuid_str = Uuidm.to_string vm_uuid let () = let old_hook = !Lwt.async_exception_hook in @@ -78,9 +80,10 @@ let with_rpc f switch () = in (Lwt_switch.add_hook (Some switch) @@ fun () -> SessionCache.destroy cache) ; let path = Filename.concat tmp "socket" in + let push_nothing _ = Lwt_result.return () in (* Create an internal server on 'path', the socket that varstored would connect to *) let* stop_server = - Server_interface.make_server_varstored ~cache path vm_uuid + Server_interface.make_server_varstored push_nothing ~cache path vm_uuid in (* rpc simulates what varstored would do *) let uri = Uri.make ~scheme:"file" ~path () |> Uri.to_string in @@ -101,7 +104,7 @@ let with_rpc f switch () = let dict = Alcotest.(list @@ pair string string) let test_change_nvram ~rpc ~session_id () = - let* self = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid in + let* self = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid_str in let* nvram0 = VM.get_NVRAM ~rpc ~session_id ~self in Alcotest.(check' dict) ~msg:"nvram initial" ~expected:[] ~actual:nvram0 ; let contents = "nvramnew" in @@ -131,7 +134,7 @@ let test_bad_set_nvram ~rpc ~session_id () = let* () = VM.set_NVRAM_EFI_variables ~rpc ~session_id ~self:vm_bad ~value:"bad" in - let* vm_ref = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid in + let* vm_ref = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid_str in let* nvram = VM.get_NVRAM ~rpc ~session_id ~self:vm_ref in Alcotest.(check' dict) ~msg:"only managed to change own nvram" ~actual:nvram diff --git a/ocaml/xapi/xapi_fist.ml b/ocaml/xapi/xapi_fist.ml index 7798713e494..4f211185a92 100644 --- a/ocaml/xapi/xapi_fist.ml +++ b/ocaml/xapi/xapi_fist.ml @@ -160,3 +160,5 @@ let int_seed name : int option = let exchange_certificates_in_pool () : int option = let name = "exchange_certificates_in_pool" in int_seed name + +let disable_xapi_guard_cache () = fistpoint "disable_xapi_guard_cache"