Skip to content

Commit

Permalink
refactor: Remove pipeline abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Jan 30, 2024
1 parent 61d16d0 commit 95a535c
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 138 deletions.
1 change: 0 additions & 1 deletion src/HttpHandler/Builder.fs → src/Builder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace Oryx

open Oryx.Pipeline

type RequestBuilder() =
member _.Zero() : HttpHandler<unit> = httpRequest
Expand Down
120 changes: 52 additions & 68 deletions src/Pipeline/Core.fs → src/Core.fs
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
// Copyright 2020 Cognite AS
// SPDX-License-Identifier: Apache-2.0

namespace Oryx.Pipeline
namespace Oryx

open System.Threading.Tasks

open FSharp.Control.TaskBuilder
open FsToolkit.ErrorHandling
open Oryx

type IAsyncNext<'TContext, 'TSource> =
abstract member OnSuccessAsync: ctx: 'TContext * content: 'TSource -> Task<unit>
abstract member OnErrorAsync: ctx: 'TContext * error: exn -> Task<unit>
abstract member OnCancelAsync: ctx: 'TContext -> Task<unit>
type IHttpNext<'TSource> =
abstract member OnSuccessAsync: ctx: HttpContext * content: 'TSource -> Task<unit>
abstract member OnErrorAsync: ctx: HttpContext * error: exn -> Task<unit>
abstract member OnCancelAsync: ctx: HttpContext -> Task<unit>

type Pipeline<'TContext, 'TSource> = IAsyncNext<'TContext, 'TSource> -> Task<unit>
type HttpHandler<'TResult> = IHttpNext<'TResult> -> Task<unit>

module Core =
/// Swap first with last arg so we can pipe onSuccess
let swapArgs fn = fun a b c -> fn c a b

/// A next continuation for observing the final result.
let finish<'TContext, 'TResult> (tcs: TaskCompletionSource<'TResult>) : IAsyncNext<'TContext, 'TResult> =
let finish<'TResult> (tcs: TaskCompletionSource<'TResult>) : IHttpNext<'TResult> =

{ new IAsyncNext<'TContext, 'TResult> with
{ new IHttpNext<'TResult> with
member x.OnSuccessAsync(_, response) = task { tcs.SetResult response }
member x.OnErrorAsync(ctx, error) = task { tcs.SetException error }
member x.OnCancelAsync(ctx) = task { tcs.SetCanceled() } }

/// Run the HTTP handler in the given context. Returns content and throws exception if any error occured.
let runUnsafeAsync<'TContext, 'TResult> (handler: Pipeline<'TContext, 'TResult>) : Task<'TResult> =
let runUnsafeAsync<'TResult> (handler: HttpHandler<'TResult>) : Task<'TResult> =
let tcs = TaskCompletionSource<'TResult>()

task {
Expand All @@ -38,7 +37,7 @@ module Core =
}

/// Run the HTTP handler in the given context. Returns content as result type.
let runAsync<'TContext, 'TResult> (handler: Pipeline<'TContext, 'TResult>) : Task<Result<'TResult, exn>> =
let runAsync<'TResult> (handler: HttpHandler<'TResult>) : Task<Result<'TResult, exn>> =
task {
try
let! value = runUnsafeAsync handler
Expand All @@ -48,18 +47,15 @@ module Core =
}

/// Produce the given content.
let singleton<'TContext, 'TSource> (ctx: 'TContext) (content: 'TSource) : Pipeline<'TContext, 'TSource> =
let singleton<'TSource> (ctx: HttpContext) (content: 'TSource) : HttpHandler<'TSource> =
fun next -> next.OnSuccessAsync(ctx, content)

/// Map the content of the middleware.
let map<'TContext, 'TSource, 'TResult>
(mapper: 'TSource -> 'TResult)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TResult> =
let map<'TSource, 'TResult> (mapper: 'TSource -> 'TResult) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> =

fun next ->
//fun ctx content -> succes ctx (mapper content)
{ new IAsyncNext<'TContext, 'TSource> with
//fun ctx content -> success ctx (mapper content)
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
try
next.OnSuccessAsync(ctx, mapper content)
Expand All @@ -71,12 +67,12 @@ module Core =
|> source

/// Bind the content of the middleware.
let bind<'TContext, 'TSource, 'TResult>
(fn: 'TSource -> Pipeline<'TContext, 'TResult>)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TResult> =
let bind<'TSource, 'TResult>
(fn: 'TSource -> HttpHandler<'TResult>)
(source: HttpHandler<'TSource>)
: HttpHandler<'TResult> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
task {
let handler = fn content
Expand All @@ -87,19 +83,19 @@ module Core =
member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }
|> source

let concurrent<'TContext, 'TSource, 'TResult>
(merge: 'TContext list -> 'TContext)
(handlers: seq<Pipeline<'TContext, 'TResult>>)
: Pipeline<'TContext, 'TResult list> =
let concurrent<'TSource, 'TResult>
(merge: HttpContext list -> HttpContext)
(handlers: seq<HttpHandler<'TResult>>)
: HttpHandler<'TResult list> =
fun next ->
task {
let res: Result<'TContext * 'TResult, 'TContext * exn> array =
let res: Result<HttpContext * 'TResult, HttpContext * exn> array =
Array.zeroCreate (Seq.length handlers)

let obv n ctx content = task { res.[n] <- Ok(ctx, content) }

let obv n =
{ new IAsyncNext<'TContext, 'TResult> with
{ new IHttpNext<'TResult> with
member _.OnSuccessAsync(ctx, content) = task { res.[n] <- Ok(ctx, content) }
member _.OnErrorAsync(ctx, err) = task { res.[n] <- Error(ctx, err) }
member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }
Expand All @@ -119,16 +115,16 @@ module Core =
}

/// Run list pipelines sequentially.
let sequential<'TContext, 'TSource, 'TResult>
(merge: 'TContext list -> 'TContext)
(handlers: seq<Pipeline<'TContext, 'TResult>>)
: Pipeline<'TContext, 'TResult list> =
let sequential<'TSource, 'TResult>
(merge: HttpContext list -> HttpContext)
(handlers: seq<HttpHandler<'TResult>>)
: HttpHandler<'TResult list> =
fun next ->
task {
let res = ResizeArray<Result<'TContext * 'TResult, 'TContext * exn>>()
let res = ResizeArray<Result<HttpContext * 'TResult, HttpContext * exn>>()

let obv =
{ new IAsyncNext<'TContext, 'TResult> with
{ new IHttpNext<'TResult> with
member _.OnSuccessAsync(ctx, content) = task { Ok(ctx, content) |> res.Add }
member _.OnErrorAsync(ctx, err) = task { res.Add(Error(ctx, err)) }
member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }
Expand All @@ -147,13 +143,13 @@ module Core =
}

/// Chunks a sequence of Middlewares into a combination of sequential and concurrent batches.
let chunk<'TContext, 'TSource, 'TResult>
(merge: 'TContext list -> 'TContext)
let chunk<'TSource, 'TResult>
(merge: HttpContext list -> HttpContext)
(chunkSize: int)
(maxConcurrency: int)
(handler: seq<'TSource> -> Pipeline<'TContext, seq<'TResult>>)
(handler: seq<'TSource> -> HttpHandler<seq<'TResult>>)
(items: seq<'TSource>)
: Pipeline<'TContext, seq<'TResult>> =
: HttpHandler<seq<'TResult>> =
items
|> Seq.chunkBySize chunkSize
|> Seq.chunkBySize maxConcurrency
Expand All @@ -163,24 +159,24 @@ module Core =
|> map (Seq.ofList >> Seq.collect (Seq.collect id))

/// Handler that skips (ignores) the content and outputs unit.
let ignoreContent<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, unit> =
let ignoreContent<'TSource> (source: HttpHandler<'TSource>) : HttpHandler<unit> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) = next.OnSuccessAsync(ctx, ())
member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn)
member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }
|> source

let cache<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TSource> =
let mutable cache: ('TContext * 'TSource) option = None
let cache<'TSource> (source: HttpHandler<'TSource>) : HttpHandler<'TSource> =
let mutable cache: (HttpContext * 'TSource) option = None

fun next ->
task {
match cache with
| Some(ctx, content) -> return! next.OnSuccessAsync(ctx, content)
| _ ->
return!
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
task {
cache <- Some(ctx, content)
Expand All @@ -196,16 +192,13 @@ module Core =
let never _ = task { () }

/// Completes the current request.
let empty<'TContext> (ctx: 'TContext) : Pipeline<'TContext, unit> =
let empty (ctx: HttpContext) : HttpHandler<unit> =
fun next -> next.OnSuccessAsync(ctx, ())

/// Filter content using a predicate function.
let filter<'TContext, 'TSource>
(predicate: 'TSource -> bool)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TSource> =
let filter<'TSource> (predicate: 'TSource -> bool) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, value) =
task {
if predicate value then
Expand All @@ -217,12 +210,9 @@ module Core =
|> source

/// Validate content using a predicate function. Same as filter ut produces an error if validation fails.
let validate<'TContext, 'TSource>
(predicate: 'TSource -> bool)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TSource> =
let validate<'TSource> (predicate: 'TSource -> bool) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, value) =
if predicate value then
next.OnSuccessAsync(ctx, value)
Expand All @@ -234,25 +224,22 @@ module Core =
|> source

/// Retrieves the content.
let await<'TContext, 'TSource> () (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TSource> =
source |> map<'TContext, 'TSource, 'TSource> id
let await<'TSource> () (source: HttpHandler<'TSource>) : HttpHandler<'TSource> =
source |> map<'TSource, 'TSource> id

/// Returns the current environment.
let ask<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TContext> =
let ask<'TSource> (source: HttpHandler<'TSource>) : HttpHandler<HttpContext> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, _) = next.OnSuccessAsync(ctx, ctx)
member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn)
member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }
|> source

/// Update (asks) the context.
let update<'TContext, 'TSource>
(update: 'TContext -> 'TContext)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TSource> =
let update<'TSource> (update: HttpContext -> HttpContext) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
next.OnSuccessAsync(update ctx, content)

Expand All @@ -261,8 +248,5 @@ module Core =
|> source

/// Replaces the value with a constant.
let replace<'TContext, 'TSource, 'TResult>
(value: 'TResult)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TResult> =
let replace<'TSource, 'TResult> (value: 'TResult) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> =
map (fun _ -> value) source
50 changes: 22 additions & 28 deletions src/Pipeline/Error.fs → src/Error.fs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
// Copyright 2020 Cognite AS
// SPDX-License-Identifier: Apache-2.0

namespace Oryx.Pipeline
namespace Oryx

open System

open FSharp.Control.TaskBuilder
open Oryx

module Error =
/// Handler for protecting the pipeline from exceptions and protocol violations.
let protect<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TSource> =
/// Handler for protecting the HttpHandler from exceptions and protocol violations.
let protect<'TSource> (source: HttpHandler<'TSource>) : HttpHandler<'TSource> =
fun next ->
let mutable stopped = false

{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
task {
match stopped with
Expand Down Expand Up @@ -47,12 +47,12 @@ module Error =
|> source

/// Handler for catching errors and then delegating to the error handler on what to do.
let catch<'TContext, 'TSource>
(errorHandler: 'TContext -> exn -> Pipeline<'TContext, 'TSource>)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TSource> =
let catch<'TSource>
(errorHandler: HttpContext -> exn -> HttpHandler<'TSource>)
(source: HttpHandler<'TSource>)
: HttpHandler<'TSource> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
task {
try
Expand All @@ -79,22 +79,22 @@ module Error =
| Error
| Panic

/// Choose from a list of pipelines to use. The first middleware that succeeds will be used. Handlers will be
/// Choose from a list of HttpHandlers to use. The first middleware that succeeds will be used. Handlers will be
/// tried until one does not produce any error, or a `PanicException`.
let choose<'TContext, 'TSource, 'TResult>
(handlers: (Pipeline<'TContext, 'TSource> -> Pipeline<'TContext, 'TResult>) list)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TResult> =
let choose<'TSource, 'TResult>
(handlers: (HttpHandler<'TSource> -> HttpHandler<'TResult>) list)
(source: HttpHandler<'TSource>)
: HttpHandler<'TResult> =
fun next ->
let exns: ResizeArray<exn> = ResizeArray()

{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
let mutable state = ChooseState.Error

task {
let obv =
{ new IAsyncNext<'TContext, 'TResult> with
{ new IHttpNext<'TResult> with
member _.OnSuccessAsync(ctx, content) =
task {
exns.Clear() // Clear to avoid buildup of exceptions in streaming scenarios.
Expand Down Expand Up @@ -122,7 +122,7 @@ module Error =

member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }

/// Proces handlers until `NoError` or `Panic`.
/// Process handlers until `NoError` or `Panic`.
for handler in handlers do
if state = ChooseState.Error then
state <- ChooseState.NoError
Expand All @@ -136,7 +136,7 @@ module Error =
return! next.OnErrorAsync(ctx, AggregateException(exns))
| ChooseState.Error, exns when exns.Count = 1 -> return! next.OnErrorAsync(ctx, exns.[0])
| ChooseState.Error, _ ->
return! next.OnErrorAsync(ctx, SkipException "Choose: No hander matched")
return! next.OnErrorAsync(ctx, SkipException "Choose: No handler matched")
| ChooseState.NoError, _ -> ()
}

Expand All @@ -151,25 +151,19 @@ module Error =
|> source

/// Error handler for forcing error. Use with e.g `req` computational expression if you need to "return" an error.
let fail<'TContext, 'TSource, 'TResult>
(err: Exception)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TResult> =
let fail<'TSource, 'TResult> (err: Exception) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) = next.OnErrorAsync(ctx, err)
member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn)
member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) }
|> source

/// Error handler for forcing a panic error. Use with e.g `req` computational expression if you need break out of
/// the any error handling e.g `choose` or `catch`•.
let panic<'TContext, 'TSource, 'TResult>
(err: Exception)
(source: Pipeline<'TContext, 'TSource>)
: Pipeline<'TContext, 'TResult> =
let panic<'TSource, 'TResult> (err: Exception) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> =
fun next ->
{ new IAsyncNext<'TContext, 'TSource> with
{ new IHttpNext<'TSource> with
member _.OnSuccessAsync(ctx, content) =
next.OnErrorAsync(ctx, PanicException(err))

Expand Down
File renamed without changes.
Loading

0 comments on commit 95a535c

Please sign in to comment.