[ ] Akka.Net, F# |
/// Timer handle handed out to metric producers.
type IMetricsTimer =
    /// Records a measurement of the given amount.
    abstract Measure : Amount -> unit
    /// Records a measurement of the given amount, attributed to an item.
    abstract Measure : Amount * Item -> unit
/// Counter handle handed out to metric producers.
/// Each operation comes in three overloads: no argument, an explicit amount,
/// and an explicit amount attributed to an item.
type IMetricsCounter =
    /// Decrements the counter without an explicit amount.
    abstract Decrement : unit -> unit
    /// Decrements the counter by the given amount.
    abstract Decrement : Amount -> unit
    /// Decrements the counter by the given amount for the given item.
    abstract Decrement : Amount * Item -> unit
    /// Increments the counter without an explicit amount.
    abstract Increment : unit -> unit
    /// Increments the counter by the given amount.
    abstract Increment : Amount -> unit
    /// Increments the counter by the given amount for the given item.
    abstract Increment : Amount * Item -> unit
/// Payload of the `DecrementCounter` message.
type DecrementCounterCommand =
    {
        /// Identifies the counter to decrement.
        CounterId : CounterId
        /// Amount to subtract.
        DecrementAmount : Amount
        /// Item the decrement is attributed to.
        Item : Item
    }
/// Payload of the `CreateCounter` message: everything needed to register a counter.
type CreateCounterCommand =
    {
        /// Identifier later used to address the counter.
        CounterId : CounterId
        /// Metrics context the counter belongs to.
        Context : ContextName
        /// Name of the counter.
        Name : MetricName
        /// Unit the counter is measured in.
        MeasurementUnit : MeasurementUnit
        // NOTE(review): the three flags below are presumably forwarded to the
        // metrics library's counter options; that code is not visible here.
        ReportItemPercentages : bool
        ReportSetItems : bool
        ResetOnReporting : bool
    }
/// Messages exchanged between metric producers and the recorder.
/// The first four cases report a measurement; the `Create*` cases register a metric.
type MetricsMessage =
    // measurements
    | DecrementCounter of DecrementCounterCommand
    | IncrementCounter of IncrementCounterCommand
    | MarkMeter of MarkMeterCommand
    | MeasureTime of MeasureTimeCommand
    // registrations
    | CreateCounter of CreateCounterCommand
    | CreateMeter of CreateMeterCommand
    | CreateTimer of CreateTimerCommand
/// Builds an `IMetricsMeter` for `meterId` that reports every mark by
/// publishing a `MarkMeter` message on `evtStream`.
let private createMeter (evtStream: EventStream) meterId =
    { new IMetricsMeter with
        // Amount only: no item is attached.
        member this.Mark (amount: Amount) =
            this.Mark (amount, Item None)
        // Item only: marks a single occurrence.
        member this.Mark (item: Item) =
            this.Mark (Amount 1L, item)
        // Both single-argument overloads funnel into this one.
        member this.Mark (amount, item) =
            evtStream.Publish <| MarkMeter { MeterId = meterId; Amount = amount; Item = item } }
// C#-style sketch of a createMeter that returns an IMetricsMeter implementation.
// NOTE(review): appears to be an illustrative counterpart of the F# object
// expression above, not compilable code: a class is declared inside a method
// body and the MarkMeter initializer is elided.
private IMetricsMeter createMeter(EventStream evtStream, MeterId meterId)
{
private class TempClass : IMetricsMeter
{
// Amount only: forwards with an empty item.
public void Mark(long amount)
{
Mark(amount, "");
}
// Item only: forwards with an amount of 1.
public void Mark(string item)
{
Mark(1, item);
}
// Publishes the mark on the event stream.
public void Mark(long amount, string item)
{
evtStream.Publish(new MarkMeter {...});//omitted
}
}
return new TempClass();
}
/// Announces a new meter by publishing a `CreateMeter` command on the event
/// stream, then returns a meter handle bound to the same meter id.
member this.CreateMeter (name, measureUnit, rateUnit) =
    let meterId = MeterId (toId name)
    evtStream.Publish
        (CreateMeter
            { MeterId = meterId
              Context = context
              Name = name
              MeasurementUnit = measureUnit
              RateUnit = rateUnit })
    createMeter evtStream meterId
type IActorContext with
    /// Creates a metrics adapter for `context` on top of the actor system's event stream.
    member actorContext.GetMetricsProducer context =
        let eventStream = actorContext.System.EventStream
        createAdapter eventStream context
/// Web API controller that exposes the current metrics snapshot.
type public MetricController(metrics: IMetrics) =
inherit ApiController()
// NOTE(review): the two `[]` lines below look like attributes whose contents
// were lost (probably routing / HTTP-verb attributes); confirm against the
// original file.
[]
[]
// Returns `metrics.Snapshot.Get()` wrapped in the controller's `Ok` result.
member __.GetMetrics() =
__.Ok(metrics.Snapshot.Get())
/// Actor body that receives `MetricsMessage`s from the event stream and applies
/// them to the metrics registry behind `metrics`.
let createRecorder (metrics: IMetrics) (mailbox: Actor<_>) =
    let self = mailbox.Self
    // NOTE(review): the generic arguments of these dictionaries are missing in
    // this copy of the file; they map metric ids to the instances created below.
    let counters = new Dictionary()
    let meters = new Dictionary()
    let timers = new Dictionary()
    // ...
    let handle = function
        | DecrementCounter evt ->
            match counters.TryGetValue evt.CounterId with
            // Messages for counters that were never created are dropped.
            | (false, _) -> ()
            | (true, c) ->
                let (Amount am) = evt.DecrementAmount
                match evt.Item with
                | Item (Some i) -> c.Decrement (i, am)
                | Item None -> c.Decrement (am)
        | CreateMeter cmd ->
            match meters.TryGetValue cmd.MeterId with
            | (false, _) ->
                let (ContextName ctxName) = cmd.Context
                let (MetricName name) = cmd.Name
                let options = new MeterOptions(
                                Context = ctxName,
                                MeasurementUnit = toUnit cmd.MeasurementUnit,
                                Name = name,
                                RateUnit = toTimeUnit cmd.RateUnit)
                let m = metrics.Provider.Meter.Instance options
                meters.Add(cmd.MeterId, m)
            // A meter with this id already exists: keep the existing one.
            | _ -> ()
        // match
        // NOTE(review): the remaining MetricsMessage cases are elided in this copy.
    // Subscribe to the union type handled above; the type argument had been
    // lost, leaving a bare `typedefof`.
    subscribe typedefof<MetricsMessage> self mailbox.Context.System.EventStream |> ignore
    let rec loop() = actor {
        let! msg = mailbox.Receive()
        handle msg
        return! loop()
    }
    loop()
/// Endpoint settings for the metrics HTTP API.
type IMetricApiConfig =
    /// Host name used to build the listen URI.
    abstract Host : string
    /// TCP port used to build the listen URI.
    abstract Port : int
/// Messages understood by the metrics API reader actor.
type ApiMessage =
    /// Requests that the HTTP host be (re)started.
    | ReStartApiMessage
/// Actor body that hosts the metrics Web API at `http://{Host}:{Port}`.
/// `resolver` is installed as the Web API dependency resolver.
let createReader (config: IMetricApiConfig) resolver (mailbox: Actor<_>) =
    // OWIN start-up callback: configures JSON formatting and attribute routing.
    let startUp (app: IAppBuilder) =
        let httpConfig = new HttpConfiguration(DependencyResolver = resolver)
        httpConfig.Formatters.JsonFormatter.SerializerSettings.Converters.Add(new MetricDataConverter())
        httpConfig.Formatters.JsonFormatter.Indent <- true
        httpConfig.MapHttpAttributeRoutes()
        httpConfig.EnsureInitialized()
        app.UseWebApi(httpConfig) |> ignore
    let uri = sprintf "http://%s:%d" config.Host config.Port
    // Starts as a no-op disposable so the first restart can dispose unconditionally.
    let mutable api = {new IDisposable with member this.Dispose() = ()}
    let handleMsg (ReStartApiMessage) =
        api.Dispose()
        api <- WebApp.Start(uri, startUp)
    // Read `api` when the actor stops. `mailbox.Defer api.Dispose` would bind
    // to the placeholder held at registration time and leave a later-started
    // host undisposed.
    mailbox.Defer (fun () -> api.Dispose())
    // NOTE(review): the block ends here in this copy; the message loop that
    // calls handleMsg is not visible.
    mailbox.Self
/// One piece of information collected about a handled message.
type Fragment =
    /// Name of the operation that handled the message.
    | OperationName of string
    /// Stopwatch time spent inside the operation.
    | OperationDuration of TimeSpan
    /// Time between `ReceivedOn` and completion of the operation.
    | TotalDuration of TimeSpan
    /// Timestamp at which the message was received.
    | ReceivedOn of DateTimeOffset
    /// Runtime type of the handled message.
    | MessageType of Type
    /// Exception recorded for the operation.
    | Exception of exn
/// Collects log fragments for one handled message.
type ILogBuilder =
    /// Called right before the wrapped operation runs.
    abstract member OnOperationBegin : unit -> unit
    /// Called once the wrapped operation has finished.
    abstract member OnOperationCompleted : unit -> unit
    /// Sets the log level for the entry.
    abstract member Set : LogLevel -> unit
    /// Stores a fragment.
    abstract member Set : Fragment -> unit
    /// Records a failure.
    abstract member Fail : exn -> unit
    /// Suppresses the entry (member name spelling kept as declared).
    abstract member Supress : unit -> unit
    /// Looks up a stored fragment; callers pass a fragment of the wanted case.
    abstract member TryGet : Fragment -> Fragment option
/// ILogBuilder implementation that stores fragments in a dictionary and
/// writes through an Akka `ILoggingAdapter` when the operation completes.
// NOTE(review): this block is abridged. `tryGet`, `status` and `message` are
// used below but not defined in the visible lines, and `let set` appears after
// the `interface` line, which suggests lines were dropped or reordered.
type LogBuilder(logger: ILoggingAdapter) =
let logFragments = new Dictionary()
let stopwatch = new Stopwatch()
// Entries are logged at debug level unless changed (Fail raises it to error).
let mutable logLevel = LogLevel.DebugLevel
interface ILogBuilder with
//
// Stores the fragment under its runtime type, replacing any earlier entry.
let set fragment =
logFragments.[fragment.GetType()] <- fragment
member x.OnOperationBegin() =
stopwatch.Start()
// Records the exception and raises the level to error.
member this.Fail e =
logLevel <- LogLevel.ErrorLevel
set <| Exception e
// Stops the stopwatch, stores the durations, then logs unless suppressed.
member this.OnOperationCompleted() =
stopwatch.Stop()
set <| OperationDuration stopwatch.Elapsed
// TotalDuration is only stored when a ReceivedOn fragment is found.
match tryGet <| ReceivedOn DateTimeOffset.MinValue with
| Some (ReceivedOn date) -> set <| TotalDuration (DateTimeOffset.UtcNow - date)
| _ -> ()
match status with
| Active ->
match (logLevel) with
| LogLevel.DebugLevel -> logger.Debug(message())
| LogLevel.InfoLevel -> logger.Info(message())
| LogLevel.WarningLevel -> logger.Warning(message())
| LogLevel.ErrorLevel -> logger.Error(message())
| x -> failwith(sprintf "Log level %s is not supported" <| string x)
| Supressed -> ()
/// Expression tree of a message handler taking (mailbox, message, log builder).
/// The first type argument (`System.Action<Actor<'T>, ...`) had been lost,
/// leaving an unbalanced `Expression, 'T, 'TLog>>`.
type Expr<'T,'TLog when 'TLog :> ILogBuilder> = Expression<System.Action<Actor<'T>, 'T, 'TLog>>
/// Helper that turns a handler lambda into an expression tree: F# converts a
/// lambda passed as a method argument to the `Expression<...>` parameter type.
type Wrap =
    /// Returns the expression unchanged; the lambda-to-expression conversion is the point.
    /// The parameter type's first type argument had been lost and is restored here.
    static member Handler(e: Expression<System.Action<Actor<'T>, 'T, #ILogBuilder>>) = e
/// Derives a display name from a handler expression: the called method's name
/// when the body is a method call, otherwise the body's string form.
let toExprName (expr: Expr<_,_>) =
    let body = expr.Body
    match body with
    | :? MethodCallExpression as call -> call.Method.Name
    | _ -> body.ToString()
/// Actor body that wraps `handler` with logging and metrics.
/// The handler's expression name is used as the metrics context.
let loggerActor<'TMsg> (handler: Expr<'TMsg,_>) (mailbox: Actor<'TMsg>) =
    let exprName = handler |> toExprName
    let metrics = mailbox.Context.GetMetricsProducer (ContextName exprName)
    // Bound as `akkaLogger`, the name the message loop below passes to
    // wrapExpr; the original bound it as `logger`, which the loop never used.
    let akkaLogger = mailbox.Log.Value
    let errorMeter = metrics.CreateMeter (MetricName "Error Rate", Errors)
    let instanceCounter = metrics.CreateCounter (MetricName "Instances Counter", Items)
    let messagesMeter = metrics.CreateMeter (MetricName "Message Processing Rate", Items)
    let operationsTimer = metrics.CreateTimer (MetricName "Operation Durations", Requests, MilliSeconds, MilliSeconds)
    // Count this instance now and un-count it when the actor stops.
    instanceCounter.Increment()
    mailbox.Defer instanceCounter.Decrement
/// Finishes the log entry for one message, feeds the measured duration (if
/// any) into the operations timer, and marks the message meter.
let completeOperation (msgType: Type) (logger: #ILogBuilder) =
    logger.Set (OperationName exprName)
    logger.OnOperationCompleted()
    let recorded = logger.TryGet(OperationDuration TimeSpan.Zero)
    match recorded with
    | Some(OperationDuration elapsed) ->
        let millis = Amount (int64 elapsed.TotalMilliseconds)
        operationsTimer.Measure(millis, Item (Some exprName))
    | _ -> ()
    let processedItem = Item (Some msgType.Name)
    messagesMeter.Mark(processedItem)
/// Marks the error meter for the message type and records the failure on the log builder.
let registerExn (msgType: Type) e (logger: #ILogBuilder) =
    let failedItem = Item (Some msgType.Name)
    errorMeter.Mark(failedItem)
    logger.Fail e
/// Wraps `handler` so that each message gets a fresh log builder, failures
/// are registered and rethrown, and the operation is always completed.
let wrapHandler handler mb (logBuilder: unit -> #ILogBuilder) =
    let handleMessage mailbox message =
        let log = logBuilder()
        let messageType = message.GetType()
        log.Set (MessageType messageType)
        try
            try
                log.OnOperationBegin()
                handler mailbox message log
            with
            | ex ->
                registerExn messageType ex log
                reraise()
        finally
            // Runs on success and on failure.
            completeOperation messageType log
    handleMessage mb
// C#-style signature sketch of wrapHandler.
// NOTE(review): the generic arguments of every `Func` are missing in this copy.
Func wrapHandler(
Func handler,
TMailbox mb,
Func logBuilder)
where TLogBuilder: ILogBuilder
/// Compiles the handler expression and wraps it with `wrapHandler`, giving
/// each message a new `LogBuilder` over `logger`.
let wrapExpr (expr: Expr<_,_>) mailbox logger =
    let compiled = expr.Compile()
    // The compiled delegate is always invoked with the outer `mailbox`.
    let invoke _ msg log = compiled.Invoke(mailbox, msg, log)
    let newLogBuilder () = new LogBuilder(logger)
    wrapHandler invoke mailbox newLogBuilder
Action wrapExpr(
Expr expr,
Actor mb,
ILoggingAdapter logger)
// Build the wrapped handler once. wrapExpr compiles the expression tree, and
// calling it inside the loop recompiled the handler for every message.
let handle = wrapExpr handler mailbox akkaLogger
/// Message loop: each received message goes through the wrapped handler.
let rec loop() =
    actor {
        let! msg = mailbox.Receive()
        handle msg
        return! loop()
    }
loop()
/// Messages understood by the example worker actors.
type ActorMessages =
    /// Asks the worker to wait; `waitProcess` passes the value to `Async.Sleep`.
    | Wait of int
    /// Asks the worker to stop.
    | Stop
/// Example handler: blocks for the requested delay on `Wait`, ignores `Stop`.
let waitProcess msg =
    match msg with
    | Stop -> ()
    | Wait delay -> Async.RunSynchronously (Async.Sleep delay)
/// Actor body for a worker that runs `waitProcess` under the logging/metrics wrapper.
let spawnWaitWorker() =
    let handler = Wrap.Handler(fun mb msg log -> waitProcess msg)
    loggerActor handler
// Spawn a wait worker and echo its reference.
let waitWorker = spawn system "worker-wait" (spawnWaitWorker())
waitWorker
/// Example handler: fails on `Wait`, stops the actor on `Stop`.
/// Failures are caught here and reported through `log.Fail`, so they do not
/// escape this function.
let failOrStopProcess (mailbox: Actor<_>) msg (log: ILogBuilder) =
    try
        match msg with
        | Stop -> mailbox.Context.Stop mailbox.Self
        | Wait _ -> failwith "can't wait!"
    with
    | ex -> log.Fail ex
/// Actor body for a worker that runs `failOrStopProcess` under the logging/metrics wrapper.
let spawnFailOrStopWorker() =
    let handler = Wrap.Handler(fun mb msg log -> failOrStopProcess mb msg log)
    loggerActor handler
// Spawn a fail-or-stop worker and echo its reference.
let failOrStopWorker = spawn system "worker-vocal" (spawnFailOrStopWorker())
failOrStopWorker
open Akka.FSharp
open SimpleInjector
open App.Metrics;
open Microsoft.Extensions.DependencyInjection
open SimpleInjector.Integration.WebApi
open System.Reflection
open System
open Metrics.MetricActors
open ExampleActors
/// The actor system, configured from `system.json`.
/// This is a value, not a function: the file is read when the binding is evaluated.
let createSystem =
    "system.json"
    |> System.IO.File.ReadAllText
    |> Configuration.parse
    |> System.create "system-for-metrics"
/// Spawns the two metrics actors: the HTTP reader on localhost:10001 and the
/// recorder backed by the metrics instance resolved from `container`.
let createMetricActors system container =
    let resolver = new SimpleInjectorWebApiDependencyResolver(container)
    let apiConfig =
        { new IMetricApiConfig with
            member cfg.Host = "localhost"
            member cfg.Port = 10001 }
    // The actor references are not needed by the caller.
    createReader apiConfig resolver
    |> spawn system "metrics-reader"
    |> ignore
    createRecorder (container.GetInstance())
    |> spawn system "metrics-recorder"
    |> ignore
type Container with
    /// Builds an App.Metrics service provider named after the entry assembly
    /// and registers `IMetrics` in the container, resolved from that provider.
    member x.AddMetrics() =
        let serviceCollection = new ServiceCollection()
        let entryAssemblyName = Assembly.GetEntryAssembly().GetName()
        // The returned host builder is not used further.
        serviceCollection.AddMetrics(entryAssemblyName) |> ignore
        serviceCollection.AddLogging() |> ignore
        let provider = serviceCollection.BuildServiceProvider()
        // The service type had been lost from both calls, so nothing could be
        // inferred. IMetrics is what the recorder and the controller take.
        x.Register<IMetrics>(fun () -> provider.GetRequiredService<IMetrics>())
/// Program entry point: wires the container, starts the metrics actors and
/// spawns the example workers.
// The attribute had been reduced to `[]`; restored so `main` is the entry point.
[<EntryPoint>]
let main argv =
    let container = new Container()
    let system = createSystem
    container.RegisterSingleton system
    container.AddMetrics()
    container.Verify()
    createMetricActors system container
    let waitWorker1 = spawn system "worker-wait1" <| spawnWaitWorker()
    let waitWorker2 = spawn system "worker-wait2" <| spawnWaitWorker()
    let waitWorker3 = spawn system "worker-wait3" <| spawnWaitWorker()
    let waitWorker4 = spawn system "worker-wait4" <| spawnWaitWorker()
    let failWorker = spawn system "worker-fail" <| spawnFailWorker()
    let waitOrStopWorker = spawn system "worker-silent" <| spawnWaitOrStopWorker()
    let failOrStopWorker = spawn system "worker-vocal" <| spawnFailOrStopWorker()
    // NOTE(review): the next line is damaged in this copy. Everything between a
    // `<!` send on waitWorker1 and a trailing `|> ignore` appears to be
    // missing; restore the message sends from the original file.
    waitWorker1 ignore
    0
{
"Context": "waitProcess",
"Counters": [
{
"Name": "Instances Counter",
"Unit": "items",
"Count": 4
}
],
"Meters": [
{
"Name": "Message Processing Rate",
"Unit": "items",
"Count": 4,
"FifteenMinuteRate": 35.668327519112893,
"FiveMinuteRate": 35.01484385742755,
"Items": [
{
"Count": 4,
"FifteenMinuteRate": 0.0,
"FiveMinuteRate": 0.0,
"Item": "Wait",
"MeanRate": 13.082620551464204,
"OneMinuteRate": 0.0,
"Percent": 100.0
}
],
"MeanRate": 13.082613248856632,
"OneMinuteRate": 31.356094372926623,
"RateUnit": "min"
}
],
"Timers": [
{
"Name": "Operation Durations",
"Unit": "req",
"ActiveSessions": 0,
"Count": 4,
"DurationUnit": "ms",
"Histogram": {
"LastUserValue": "waitProcess",
"LastValue": 8001.0,
"Max": 8001.0,
"MaxUserValue": "waitProcess",
"Mean": 3927.1639786164278,
"Median": 5021.0,
"Min": 1078.0,
"MinUserValue": "waitProcess",
"Percentile75": 8001.0,
"Percentile95": 8001.0,
"Percentile98": 8001.0,
"Percentile99": 8001.0,
"Percentile999": 8001.0,
"SampleSize": 4,
"StdDev": 2932.0567172627871,
"Sum": 15190.0
},
"Rate": {
"FifteenMinuteRate": 0.00059447212531854826,
"FiveMinuteRate": 0.00058358073095712587,
"MeanRate": 0.00021824579927905906,
"OneMinuteRate": 0.00052260157288211038
}
}
]
}