diff --git a/src/not-so-smart/fetch.ml b/src/not-so-smart/fetch.ml index 585a4fb17..fbd091ded 100644 --- a/src/not-so-smart/fetch.ml +++ b/src/not-so-smart/fetch.ml @@ -75,8 +75,6 @@ struct List.fold_left fold [] have |> List.split module V1 = struct - module Smart_flow = State_flow.Make (Smart) - let fetch ?(uses_git_transport = false) ?(push_stdout = ignore) ?(push_stderr = ignore) ~capabilities ?deepen ?want:(refs = `None) ~host path flow store access fetch_cfg push_pack = @@ -106,7 +104,7 @@ struct let ctx = Smart.Context.make ~client_caps in - Smart_flow.run sched io_raise io flow (prelude ctx) |> prj + State_flow.run sched io_raise Smart.pp_error io flow (prelude ctx) |> prj >>= fun (uids, refs) -> let hex = { Neg.to_hex = Uid.to_hex; of_hex = Uid.of_hex; compare = Uid.compare } @@ -131,7 +129,9 @@ struct if res < 0 then Log.warn (fun m -> m "No common commits"); let rec read_pack () = Log.debug (fun m -> m "Reading PACK file..."); - Smart_flow.run sched io_raise io flow (recv_pack_state ctx) |> prj + State_flow.run sched io_raise Smart.pp_error io flow + (recv_pack_state ctx) + |> prj >>= fun should_continue -> if should_continue then read_pack () else return () in @@ -140,8 +140,6 @@ struct end module V2 = struct - module State_flow = State_flow.Make (Wire_proto_v2) - let connect ?(uses_git_transport = false) ~host ~path ctx = let open Wire_proto_v2.Syntax in let return = Wire_proto_v2.return in @@ -162,7 +160,9 @@ struct let* () = Wire_proto_v2.send ctx Flush () in Wire_proto_v2.return caps in - State_flow.run sched io_raise io flow (get_caps ctx) |> prj + State_flow.run sched io_raise Wire_proto_v2.pp_error io flow + (get_caps ctx) + |> prj let ls_refs_request ?(uses_git_transport = false) ~host ~path ctx flow req = let ls_refs_resp = @@ -172,6 +172,7 @@ struct let* () = Wire_proto_v2.send ctx Ls_refs_req (`Client_caps caps, req) in Wire_proto_v2.recv ctx Ls_refs_res in - State_flow.run sched io_raise io flow ls_refs_resp |> prj + State_flow.run sched io_raise Wire_proto_v2.pp_error io flow ls_refs_resp + |> prj end end diff --git a/src/not-so-smart/find_common.ml b/src/not-so-smart/find_common.ml index d425ad1cf..5407c90c5 100644 --- a/src/not-so-smart/find_common.ml +++ b/src/not-so-smart/find_common.ml @@ -61,8 +61,6 @@ let io_monad (type t) { bind; return } = with Smart.v1 and implement a state of the art synchronisation algorithm, I translated as is [fetch-pack.c:find_common] in OCaml. *) -module Smart_flow = State_flow.Make (Smart) - let tips (type t) scheduler access store negotiator = let open (val io_monad scheduler : Io_monad with type s = t) in access.locals store >>= fun ref_lst -> @@ -76,13 +74,15 @@ let consume_shallow_list (type t) scheduler io flow cfg deepen { of_hex; _ } ctx = let open (val io_monad scheduler : Io_monad with type s = t) in if cfg.stateless && Option.is_some deepen then - Smart_flow.run scheduler raise io flow Smart.(recv ctx shallows) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(recv ctx shallows) >>| fun shallows -> List.map (Smart.Shallow.map ~f:of_hex) shallows else return [] let handle_shallow (type t) scheduler io flow { of_hex; _ } access store ctx = let open (val io_monad scheduler : Io_monad with type s = t) in - Smart_flow.run scheduler raise io flow Smart.(recv ctx shallows) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(recv ctx shallows) >>= fun shallows -> let shallows = List.map (Smart.Shallow.map ~f:of_hex) shallows in fold_left_s shallows ~init:() ~f:(fun () -> function @@ -115,13 +115,14 @@ let find_common (type t) scheduler io flow cfg >>= function | [] -> Log.debug (fun m -> m "Nothing to download."); - Smart_flow.run scheduler raise io flow Smart.(send ctx flush ()) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(send ctx flush ()) >>= fun () -> return `Close | (uid, _) :: others as refs -> Log.debug (fun m -> m "We want %d commit(s)." (List.length refs)); access.shallowed store >>= fun shallowed -> let shallowed = List.map to_hex shallowed in - Smart_flow.run scheduler raise io flow + State_flow.run scheduler raise Smart.pp_error io flow Smart.( let uid = to_hex uid in let others = List.map (fun (uid, _) -> to_hex uid) others in @@ -164,7 +165,8 @@ let find_common (type t) scheduler io flow cfg m "count: %d, in-vain: %d, flush-at: %d.\n%!" !count !in_vain !flush_at); if !flush_at <= !count then ( - Smart_flow.run scheduler raise io flow Smart.(send ctx flush ()) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(send ctx flush ()) >>= fun () -> incr flushes; flush_at := next_flush stateless !count; @@ -173,7 +175,8 @@ let find_common (type t) scheduler io flow cfg consume_shallow_list scheduler io flow cfg None hex ctx >>= fun _shallows -> let rec loop () = - Smart_flow.run scheduler raise io flow Smart.(recv ctx ack) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(recv ctx ack) >>| Smart.Negotiation.map ~f:of_hex >>= fun ack -> match ack with @@ -238,7 +241,7 @@ let find_common (type t) scheduler io flow cfg Log.debug (fun m -> m "Negotiation (got ready: %b, no-done: %b)." !got_ready no_done); (if (not !got_ready) || not no_done then - Smart_flow.run scheduler raise io flow + State_flow.run scheduler raise Smart.pp_error io flow Smart.(send ctx negotiation_done ()) else return ()) >>= fun () -> @@ -247,14 +250,16 @@ let find_common (type t) scheduler io flow cfg incr flushes); (if (not !got_ready) || not no_done then ( Log.debug (fun m -> m "Negotiation is done!"); - Smart_flow.run scheduler raise io flow Smart.(recv ctx shallows) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(recv ctx shallows) >>= fun _shallows -> return ()) else return ()) >>= fun () -> let rec go () = if !flushes > 0 || cfg.multi_ack = `Some || cfg.multi_ack = `Detailed then ( - Smart_flow.run scheduler raise io flow Smart.(recv ctx ack) + State_flow.run scheduler raise Smart.pp_error io flow + Smart.(recv ctx ack) >>| Smart.Negotiation.map ~f:of_hex >>= fun ack -> match ack with diff --git a/src/not-so-smart/push.ml b/src/not-so-smart/push.ml index 7d0b56666..aa6264408 100644 --- a/src/not-so-smart/push.ml +++ b/src/not-so-smart/push.ml @@ -39,8 +39,6 @@ struct pp_error = Flow.pp_error; } - module Smart_flow = State_flow.Make (Smart) - let push ?(uses_git_transport = true) ~capabilities:client_caps cmds ~host path flow store access push_cfg pack = let fiber ctx = @@ -57,7 +55,7 @@ struct return (Smart.Advertised_refs.map ~fuid:Uid.of_hex ~fref:Ref.v v) in let ctx = Smart.Context.make ~client_caps in - Smart_flow.run sched fail io flow (fiber ctx) |> prj + State_flow.run sched fail Smart.pp_error io flow (fiber ctx) |> prj >>= fun advertised_refs -> Pck.commands sched ~capabilities:(Smart.Advertised_refs.capabilities advertised_refs) @@ -66,10 +64,12 @@ struct |> prj >>= function | None -> - Smart_flow.run sched fail io flow Smart.(send ctx flush ()) |> prj + State_flow.run sched fail Smart.pp_error io flow + Smart.(send ctx flush ()) + |> prj >>= fun () -> return () | Some cmds -> ( - Smart_flow.run sched fail io flow + State_flow.run sched fail Smart.pp_error io flow Smart.( send ctx commands (Commands.map ~fuid:Uid.to_hex ~fref:Ref.to_string cmds)) @@ -101,14 +101,16 @@ struct Log.debug (fun m -> m "report-status capability: %b." report_status); if report_status then - Smart_flow.run sched fail io flow Smart.(recv ctx status) + State_flow.run sched fail Smart.pp_error io flow + Smart.(recv ctx status) |> prj >>| Smart.Status.map ~f:Ref.v else let cmds = List.map R.ok (Smart.Commands.commands cmds) in return (Smart.Status.v cmds) | Some payload -> - Smart_flow.run sched fail io flow Smart.(send ctx pack payload) + State_flow.run sched fail Smart.pp_error io flow + Smart.(send ctx pack payload) |> prj >>= fun () -> go () in diff --git a/src/not-so-smart/smart.ml b/src/not-so-smart/smart.ml index b462d549b..4b5690111 100644 --- a/src/not-so-smart/smart.ml +++ b/src/not-so-smart/smart.ml @@ -50,6 +50,10 @@ module Value = struct type error = [ Protocol.Encoder.error | Protocol.Decoder.error ] + let pp_error ppf = function + | #Protocol.Encoder.error as err -> Protocol.Encoder.pp_error ppf err + | #Protocol.Decoder.error as err -> Protocol.Decoder.pp_error ppf err + let encode : type a. encoder -> a send -> a -> (unit, [> Encoder.error ]) State.t = fun encoder w v -> @@ -137,10 +141,6 @@ let send_advertised_refs : _ send = Advertised_refs include State.Scheduler (Value) -let pp_error ppf = function - | #Protocol.Encoder.error as err -> Protocol.Encoder.pp_error ppf err - | #Protocol.Decoder.error as err -> Protocol.Decoder.pp_error ppf err - module Unsafe = struct let write context packet = let encoder = Context.encoder context in diff --git a/src/not-so-smart/state.ml b/src/not-so-smart/state.ml index 371e23776..231ba7249 100644 --- a/src/not-so-smart/state.ml +++ b/src/not-so-smart/state.ml @@ -29,6 +29,7 @@ module type VALUE = sig type encoder type decoder + val pp_error : error Fmt.t val encode : encoder -> 'a send -> 'a -> (unit, error) t val decode : decoder -> 'a recv -> ('a, error) t end @@ -77,6 +78,8 @@ module Scheduler struct type error = Value.error + let pp_error = Value.pp_error + let bind : ('a, 'err) t -> f:('a -> ('b, 'err) t) -> ('b, 'err) t = let rec bind' m ~f = match m with diff --git a/src/not-so-smart/state.mli b/src/not-so-smart/state.mli index 9b382edbf..e864d2bfc 100644 --- a/src/not-so-smart/state.mli +++ b/src/not-so-smart/state.mli @@ -28,6 +28,7 @@ module type VALUE = sig type encoder type decoder + val pp_error : error Fmt.t val encode : encoder -> 'a send -> 'a -> (unit, error) t val decode : decoder -> 'a recv -> ('a, error) t end @@ -57,6 +58,7 @@ module Scheduler and type decoder = Context.decoder) : sig type error = Value.error + val pp_error : error Fmt.t val return : 'v -> ('v, 'err) t val bind : ('a, 'err) t -> f:('a -> ('b, 'err) t) -> ('b, 'err) t val ( >>= ) : ('a, 'err) t -> ('a -> ('b, 'err) t) -> ('b, 'err) t diff --git a/src/not-so-smart/state_flow.ml b/src/not-so-smart/state_flow.ml index a9bb6ef67..f32ac3796 100644 --- a/src/not-so-smart/state_flow.ml +++ b/src/not-so-smart/state_flow.ml @@ -7,70 +7,60 @@ let io_buffer_size = 65536 type ('a, 's) raise = exn -> ('a, 's) io -module Make (Read_write : sig - type ('a, 'err) t = ('a, 'err) State.t - type error +let run : + type fl s err. + s scheduler -> + ('a, s) raise -> + err Fmt.t -> + (fl, 'error, s) flow -> + fl -> + ('res, [ `Protocol of err ]) State.t -> + ('res, s) io = + fun scheduler io_raise pp_error flow_ops flow state -> + let { bind; return } = scheduler in + let ( >>= ) = bind in - val pp_error : error Fmt.t -end) = -struct - type nonrec error = Read_write.error + let failwithf fmt = + Format.kasprintf (fun err -> io_raise (Failure err)) fmt + in - let run : - type fl s. - s scheduler -> - ('a, s) raise -> - (fl, 'error, s) flow -> - fl -> - ('res, [ `Protocol of error ]) Read_write.t -> - ('res, s) io = - fun scheduler io_raise flow_ops flow state -> - let { bind; return } = scheduler in - let ( >>= ) = bind in + let cbuff = Cstruct.create io_buffer_size in - let failwithf fmt = - Format.kasprintf (fun err -> io_raise (Failure err)) fmt - in + let rec unwrap = function + | State.Return v -> + Log.debug (fun m -> m "got return "); + return v + | Error (`Protocol err) -> + Log.err (fun m -> m "Got a protocol error: %a." pp_error err); + failwithf "%a" pp_error err + | Read { k; buffer; off; len; eof } -> ( + let rd_n_bytes = min (Cstruct.len cbuff) len in + Log.debug (fun m -> m "Start to read %d byte(s)." rd_n_bytes); + flow_ops.recv flow (Cstruct.sub cbuff 0 rd_n_bytes) >>= function + | Ok `End_of_flow -> + Log.debug (fun m -> m "Got end of input."); + unwrap (eof ()) + | Ok (`Input len) -> + Log.debug (fun m -> m "Got %d/%d byte(s)." len rd_n_bytes); + Cstruct.blit_to_bytes cbuff 0 buffer off len; + unwrap (k len) + | Error err -> + Log.err (fun m -> m "Got an error: %a." flow_ops.pp_error err); + failwithf "%a" flow_ops.pp_error err) + | Write { k; buffer; off; len } -> + (* TODO: almost always we can write in one go instead of calling a loop, + so we should try writing and call loop if we aren't done *) + let rec loop tmp = + if Cstruct.is_empty tmp then unwrap (k len) + else + flow_ops.send flow tmp >>= function + | Ok shift -> + Log.debug (fun m -> + m "Wrote %d byte(s). %s" shift (Cstruct.to_string tmp)); + loop (Cstruct.shift tmp shift) + | Error err -> failwithf "%a" flow_ops.pp_error err + in + Cstruct.of_string buffer ~off ~len |> loop + in - let cbuff = Cstruct.create io_buffer_size in - - let rec unwrap = function - | State.Return v -> - Log.debug (fun m -> m "got return "); - return v - | Error (`Protocol err) -> - Log.err (fun m -> - m "Got a protocol error: %a." Read_write.pp_error err); - failwithf "%a" Read_write.pp_error err - | Read { k; buffer; off; len; eof } -> ( - let rd_n_bytes = min (Cstruct.len cbuff) len in - Log.debug (fun m -> m "Start to read %d byte(s)." rd_n_bytes); - flow_ops.recv flow (Cstruct.sub cbuff 0 rd_n_bytes) >>= function - | Ok `End_of_flow -> - Log.debug (fun m -> m "Got end of input."); - unwrap (eof ()) - | Ok (`Input len) -> - Log.debug (fun m -> m "Got %d/%d byte(s)." len rd_n_bytes); - Cstruct.blit_to_bytes cbuff 0 buffer off len; - unwrap (k len) - | Error err -> - Log.err (fun m -> m "Got an error: %a." flow_ops.pp_error err); - failwithf "%a" flow_ops.pp_error err) - | Write { k; buffer; off; len } -> - (* TODO: almost always we can write in one go instead of calling a loop, - so we should try writing and call loop if we aren't done *) - let rec loop tmp = - if Cstruct.is_empty tmp then unwrap (k len) - else - flow_ops.send flow tmp >>= function - | Ok shift -> - Log.debug (fun m -> - m "Wrote %d byte(s). %s" shift (Cstruct.to_string tmp)); - loop (Cstruct.shift tmp shift) - | Error err -> failwithf "%a" flow_ops.pp_error err - in - Cstruct.of_string buffer ~off ~len |> loop - in - - unwrap state -end + unwrap state diff --git a/src/not-so-smart/state_flow.mli b/src/not-so-smart/state_flow.mli index bab268386..75e05f2ea 100644 --- a/src/not-so-smart/state_flow.mli +++ b/src/not-so-smart/state_flow.mli @@ -4,21 +4,11 @@ val io_buffer_size : int type ('a, 's) raise = exn -> ('a, 's) Sigs.io -module Make : functor - (Read_write : sig - type ('a, 'err) t = ('a, 'err) State.t - type error - - val pp_error : error Fmt.t - end) - -> sig - type nonrec error = Read_write.error - - val run : - 's Sigs.scheduler -> - ('res, 's) raise -> - ('fl, 'error, 's) Sigs.flow -> - 'fl -> - ('res, [ `Protocol of error ]) State.t -> - ('res, 's) Sigs.io -end +val run : + 's Sigs.scheduler -> + ('res, 's) raise -> + 'err Fmt.t -> + ('fl, 'error, 's) Sigs.flow -> + 'fl -> + ('res, [ `Protocol of 'err ]) State.t -> + ('res, 's) Sigs.io diff --git a/src/not-so-smart/wire_proto_v2.ml b/src/not-so-smart/wire_proto_v2.ml index 0fa3c7063..691ade9bb 100644 --- a/src/not-so-smart/wire_proto_v2.ml +++ b/src/not-so-smart/wire_proto_v2.ml @@ -38,6 +38,12 @@ module Value = struct type encoder = Pkt_line.Encoder.encoder type decoder = Pkt_line.Decoder.decoder + let pp_error ppf = function + | #Proto_vals_v2.Encoder.error as err -> + Proto_vals_v2.Encoder.pp_error ppf err + | #Proto_vals_v2.Decoder.error as err -> + Proto_vals_v2.Decoder.pp_error ppf err + let encode : type a. encoder -> a send -> a -> (unit, error) State.t = fun encoder w v -> let encoder_state = @@ -78,9 +84,3 @@ module Value = struct end include State.Scheduler (Value) - -let pp_error ppf = function - | #Proto_vals_v2.Encoder.error as err -> - Proto_vals_v2.Encoder.pp_error ppf err - | #Proto_vals_v2.Decoder.error as err -> - Proto_vals_v2.Decoder.pp_error ppf err