Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add headers to PM correlation func #264

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,106 +1,106 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Linq;
using NBB.Core.Effects;

namespace NBB.ProcessManager.Definition.Builder
{
public abstract class AbstractDefinition<TData> : IDefinition<TData>
{
private readonly List<IEventActivitySet<TData>> _eventActivities = new();
private readonly Dictionary<Type, EventCorrelation<object, TData>> _eventCorrelations = new();


using NBB.Core.Effects;
using System;
using System.Collections.Generic;
using System.Linq;
namespace NBB.ProcessManager.Definition.Builder
{
public abstract class AbstractDefinition<TData> : IDefinition<TData>
{
private readonly List<IEventActivitySet<TData>> _eventActivities = new();
private readonly Dictionary<Type, EventCorrelation<object, TData>> _eventCorrelations = new();
public IEventActivitySetBuilder<TEvent, TData> StartWith<TEvent>(EventPredicate<TEvent, TData> predicate = null)
{
var ea = new EventActivitySet<TEvent, TData>(true, predicate);
_eventActivities.Add(ea);
return new EventActivitySetBuilder<TEvent, TData>(ea);
}

{
var ea = new EventActivitySet<TEvent, TData>(true, predicate);
_eventActivities.Add(ea);
return new EventActivitySetBuilder<TEvent, TData>(ea);
}
public IEventActivitySetBuilder<TEvent, TData> When<TEvent>(EventPredicate<TEvent, TData> predicate = null)
{
var ea = new EventActivitySet<TEvent, TData>(false, predicate);
_eventActivities.Add(ea);
return new EventActivitySetBuilder<TEvent, TData>(ea);
}

public void Event<TEvent>(Action<EventCorrelationBuilder<TEvent, TData>> configureEventCorrelation)
{
Preconditions.NotNull(configureEventCorrelation, nameof(configureEventCorrelation));

var configurator = new EventCorrelationBuilder<TEvent, TData>();
configureEventCorrelation(configurator);
var correl = configurator.Build();
_eventCorrelations.Add(typeof(TEvent), new EventCorrelation<object, TData>(@event => correl.CorrelationFilter((TEvent) @event)));
}

Func<TEvent, object> IDefinition<TData>.GetCorrelationFilter<TEvent>()
{
if (_eventCorrelations.ContainsKey(typeof(TEvent)))
{
var func = _eventCorrelations[typeof(TEvent)];
return @event => func.CorrelationFilter(@event);
}

return null;
}

IEnumerable<EventType> IDefinition.GetEventTypes() => _eventActivities.Select(x => new EventType(x.EventType, _eventActivities.Count(e => e.GetType() == x.GetType() && e.StartsProcess) == 1)).Distinct();

EffectFunc<TEvent, TData> IDefinition<TData>.GetEffectFunc<TEvent>()
{
var func = _eventActivities
.Where(x => x.EventType == typeof(TEvent))
.Select(x => x.EffectFunc)
.DefaultIfEmpty()
.Aggregate(EffectFuncs.Sequential);

return (@event, data) => func?.Invoke(@event, data) ?? Effect.Pure();
}

SetStateFunc<TEvent, TData> IDefinition<TData>.GetSetStateFunc<TEvent>()
{
var func = _eventActivities
.Where(x => x.EventType == typeof(TEvent))
.Select(x => x.SetStateFunc)
.DefaultIfEmpty()
.Aggregate((func1, func2) => (@event, data) =>
{
var newData = func1(@event, data);
return func2(@event, new InstanceData<TData>(data.InstanceId, newData));
}
);

return (@event, data) => func == null ? data.Data : func.Invoke(@event, data);
}

EventPredicate<TEvent, TData> IDefinition<TData>.GetStarterPredicate<TEvent>()
{
var act = _eventActivities.SingleOrDefault(x => x.EventType == typeof(TEvent) && x.StartsProcess);
if (act == null)
return (@event, data) => false;

return (@event, data) => act.StarterPredicate?.Invoke(@event, data) ?? true;
}

EventPredicate<TEvent, TData> IDefinition<TData>.GetCompletionPredicate<TEvent>()
{
var acts = _eventActivities
.Where(x => x.EventType == typeof(TEvent) && x.CompletesProcess)
.ToList();

if (acts.Count == 0)
return (@event, data) => false;

var func = acts.Select(x => x.CompletionPredicate)
.DefaultIfEmpty()
.Aggregate((func1, func2) => (@event, data) => func2(@event, data));

return (@event, data) => func?.Invoke(@event, data) ?? true;
}
}
}
{
var ea = new EventActivitySet<TEvent, TData>(false, predicate);
_eventActivities.Add(ea);
return new EventActivitySetBuilder<TEvent, TData>(ea);
}
public void Event<TEvent>(Action<EventCorrelationBuilder<TEvent, TData>> configureEventCorrelation)
{
Preconditions.NotNull(configureEventCorrelation, nameof(configureEventCorrelation));
var configurator = new EventCorrelationBuilder<TEvent, TData>();
configureEventCorrelation(configurator);
var correl = configurator.Build();
_eventCorrelations.Add(typeof(TEvent), new EventCorrelation<object, TData>((@event, headers) => correl.CorrelationFilter((TEvent)@event, headers)));
}
Func<TEvent, IDictionary<string, string>, object> IDefinition<TData>.GetCorrelationFilter<TEvent>()
{
if (_eventCorrelations.ContainsKey(typeof(TEvent)))
{
var func = _eventCorrelations[typeof(TEvent)];
return (@event, headers) => func.CorrelationFilter(@event, headers);
}
return null;
}
IEnumerable<EventType> IDefinition.GetEventTypes() => _eventActivities.Select(x => new EventType(x.EventType, _eventActivities.Count(e => e.GetType() == x.GetType() && e.StartsProcess) == 1)).Distinct();
EffectFunc<TEvent, TData> IDefinition<TData>.GetEffectFunc<TEvent>()
{
var func = _eventActivities
.Where(x => x.EventType == typeof(TEvent))
.Select(x => x.EffectFunc)
.DefaultIfEmpty()
.Aggregate(EffectFuncs.Sequential);
return (@event, data) => func?.Invoke(@event, data) ?? Effect.Pure();
}
SetStateFunc<TEvent, TData> IDefinition<TData>.GetSetStateFunc<TEvent>()
{
var func = _eventActivities
.Where(x => x.EventType == typeof(TEvent))
.Select(x => x.SetStateFunc)
.DefaultIfEmpty()
.Aggregate((func1, func2) => (@event, data) =>
{
var newData = func1(@event, data);
return func2(@event, new InstanceData<TData>(data.InstanceId, newData));
}
);
return (@event, data) => func == null ? data.Data : func.Invoke(@event, data);
}
EventPredicate<TEvent, TData> IDefinition<TData>.GetStarterPredicate<TEvent>()
{
var act = _eventActivities.SingleOrDefault(x => x.EventType == typeof(TEvent) && x.StartsProcess);
if (act == null)
return (@event, data) => false;
return (@event, data) => act.StarterPredicate?.Invoke(@event, data) ?? true;
}
EventPredicate<TEvent, TData> IDefinition<TData>.GetCompletionPredicate<TEvent>()
{
var acts = _eventActivities
.Where(x => x.EventType == typeof(TEvent) && x.CompletesProcess)
.ToList();
if (acts.Count == 0)
return (@event, data) => false;
var func = acts.Select(x => x.CompletionPredicate)
.DefaultIfEmpty()
.Aggregate((func1, func2) => (@event, data) => func2(@event, data));
return (@event, data) => func?.Invoke(@event, data) ?? true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
// This source code is licensed under the MIT license.

using System;
using System.Collections.Generic;

namespace NBB.ProcessManager.Definition.Builder
{
public class EventCorrelation<TEvent, TData>
{
public Func<TEvent, object> CorrelationFilter { get; set; }
public Func<TEvent, IDictionary<string, string>, object> CorrelationFilter { get; set; }

public EventCorrelation(Func<TEvent, object> correlationFilter)
public EventCorrelation(Func<TEvent, IDictionary<string, string>, object> correlationFilter)
{
CorrelationFilter = correlationFilter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,33 @@
// This source code is licensed under the MIT license.

using System;
using System.Collections.Generic;

namespace NBB.ProcessManager.Definition.Builder
{
public class EventCorrelationBuilder<TEvent, TData>
{
private Func<TEvent, object> _correlationFilter;
private Func<TEvent, IDictionary<string, string>, object> _correlationFilter;

public EventCorrelationBuilder<TEvent, TData> CorrelateById(Func<TEvent, Guid> selector)
{
_correlationFilter = @event => selector(@event);
_correlationFilter = (@event, headers) => selector(@event);
return this;
}

public EventCorrelationBuilder<TEvent, TData> CorrelateById(Func<TEvent, IDictionary<string, string>, Guid> selector)
{
_correlationFilter = (@event, headers) => selector(@event, headers);
return this;
}

public EventCorrelationBuilder<TEvent, TData> CorrelateById<T>(Func<TEvent, T> selector) where T : struct
{
_correlationFilter = @event => selector(@event);
_correlationFilter = (@event, headers) => selector(@event);
return this;
}

public EventCorrelationBuilder<TEvent, TData> CorrelateBy<T>(Func<TEvent, T> selector) where T : class
public EventCorrelationBuilder<TEvent, TData> CorrelateBy<T>(Func<TEvent, IDictionary<string, string>, T> selector) where T : class
{
_correlationFilter = selector;
return this;
Expand Down
66 changes: 33 additions & 33 deletions src/Orchestration/NBB.ProcessManager.Definition/IDefinition.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using System;
using System.Collections.Generic;

namespace NBB.ProcessManager.Definition
{
public interface IDefinition<TData> : IDefinition
{
EffectFunc<TEvent, TData> GetEffectFunc<TEvent>();
SetStateFunc<TEvent, TData> GetSetStateFunc<TEvent>();
Func<TEvent, object> GetCorrelationFilter<TEvent>();
EventPredicate<TEvent, TData> GetStarterPredicate<TEvent>();
EventPredicate<TEvent, TData> GetCompletionPredicate<TEvent>();
}

public interface IDefinition
{
IEnumerable<EventType> GetEventTypes();
}

public static class IDefinitionExtensions
{
public static bool IsObsolete(this IDefinition definition)
=> definition.GetType().HasAttribute(typeof(ObsoleteProcessAttribute));
}

using System;
using System.Collections.Generic;
namespace NBB.ProcessManager.Definition
{
public interface IDefinition<TData> : IDefinition
{
EffectFunc<TEvent, TData> GetEffectFunc<TEvent>();
SetStateFunc<TEvent, TData> GetSetStateFunc<TEvent>();
Func<TEvent, IDictionary<string,string>, object> GetCorrelationFilter<TEvent>();
EventPredicate<TEvent, TData> GetStarterPredicate<TEvent>();
EventPredicate<TEvent, TData> GetCompletionPredicate<TEvent>();
}
public interface IDefinition
{
IEnumerable<EventType> GetEventTypes();
}
public static class IDefinitionExtensions
{
public static bool IsObsolete(this IDefinition definition)
=> definition.GetType().HasAttribute(typeof(ObsoleteProcessAttribute));
}
/// <summary>
/// New process instances cannot be started for this definition.
/// Obsolete versions of the process definitions are retained because they are required for completing already started processes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class ObsoleteProcessAttribute : Attribute
{
}

public record struct EventType(Type Type, bool OnlyStartsProcess);
}
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class ObsoleteProcessAttribute : Attribute
{
}
public record struct EventType(Type Type, bool OnlyStartsProcess);
}
Loading