-

   rss_rss_hh_new

 - e-mail

 

 -

 LiveInternet.ru:
: 17.03.2011
:
:
: 51

:


[ ] Akka.Net, F#

, 27 2017 . 19:51 +
, F# , C#.

F#, C#, Microsoft.
WTF , .. .



, Akka.NET, . , , - ( ), - - . ElasticDB, InfluxDB Grafana Kibana .

, , . , .

:

  1. App.Metrics
  2. , Akka
  3. ,


App.Metrics 6 :

  • Counters
  • Apdex
  • Gauges
  • Histograms
  • Meters
  • Timers

, :)
( , , ).

, ( ) EventStream ( Akka.Net).

, - :

    type IMetricsTimer = 
        abstract member Measure : Amount        -> unit
        abstract member Measure : Amount * Item -> unit

, / , :

    type IMetricsCounter = 
        abstract member Decrement : unit          -> unit
        abstract member Decrement : Amount        -> unit
        abstract member Decrement : Amount * Item -> unit
        abstract member Increment : unit          -> unit
        abstract member Increment : Amount        -> unit
        abstract member Increment : Amount * Item -> unit

:

    type DecrementCounterCommand = 
        { CounterId       : CounterId
          DecrementAmount : Amount
          Item            : Item }

    type CreateCounterCommand = 
        { CounterId             : CounterId
          Context               : ContextName
          Name                  : MetricName
          MeasurementUnit       : MeasurementUnit
          ReportItemPercentages : bool
          ReportSetItems        : bool
          ResetOnReporting      : bool }

, , -. Discriminated Union:

    type MetricsMessage =
        | DecrementCounter of DecrementCounterCommand
        | IncrementCounter of IncrementCounterCommand
        | MarkMeter        of MarkMeterCommand
        | MeasureTime      of MeasureTimeCommand
        | CreateCounter    of CreateCounterCommand
        | CreateMeter      of CreateMeterCommand
        | CreateTimer      of CreateTimerCommand

. , .. .

:

    let private createMeter (evtStream: EventStream) meterId = 
        { new IMetricsMeter with

              member this.Mark amount = 
                  this.Mark (amount, Item None)

              member this.Mark item = 
                  this.Mark (Amount 1L, item)

              member this.Mark (amount, item) = 
                  evtStream.Publish <| MarkMeter { MeterId = meterId; Amount = amount; Item = item }

C# :

        private IMetricsMeter createMeter(EventStream evtStream, MeterId meterId)
        {
            private class TempClass : IMetricsMeter
            {
                public void Mark(long amount)
                {
                    Mark(amount, "");
                }

                public void Mark(string item)
                {
                    Mark(1, item);
                }

                public void Mark(long amount, string item)
                {
                    evtStream.Publish(new MarkMeter {...});//omitted
                }
            }
            return new TempClass();
        }

, , .. . F# .

, MeterId.

IMetricsAdapter, .. :

            member this.CreateMeter (name, measureUnit, rateUnit) = 
                let cmd = 
                    { MeterId         = MeterId (toId name)
                      Context         = context
                      Name            = name
                      MeasurementUnit = measureUnit
                      RateUnit        = rateUnit }
                evtStream.Publish <| CreateMeter cmd
                createMeter evtStream cmd.MeterId

, createMeter evtStream cmd.MeterId.
, IMetricsMeter.

ActorSystem, IMetricsAdapter :

    type IActorContext with
        member x.GetMetricsProducer context = 
            createAdapter x.System.EventStream context


:

  • MetricsMessage / .
  • WebApi , GET .

ApiController, :

    type public MetricController(metrics: IMetrics) = 
        inherit ApiController()

        []
        []
        member __.GetMetrics() =
            __.Ok(metrics.Snapshot.Get())

, MetricsMessage EventStream - . IMetrics , Dictionary.

ConcurrentDictionary, ? . race condition, .

    let createRecorder (metrics: IMetrics) (mailbox: Actor<_>) = 
        let self = mailbox.Self

        let counters = new Dictionary()
        let meters   = new Dictionary()
        let timers   = new Dictionary()
        //    ...

        let handle = function
            | DecrementCounter evt ->
                match counters.TryGetValue evt.CounterId with
                | (false, _) -> ()
                | (true,  c) ->
                    let (Amount am) = evt.DecrementAmount
                    match evt.Item with
                    | Item (Some i) -> c.Decrement (i, am)
                    | Item None     -> c.Decrement (am)
            | CreateMeter cmd ->
                match meters.TryGetValue cmd.MeterId with
                | (false, _) ->
                    let (ContextName ctxName) = cmd.Context
                    let (MetricName name)     = cmd.Name
                    let options = new MeterOptions(
                                        Context         = ctxName, 
                                        MeasurementUnit = toUnit cmd.MeasurementUnit, 
                                        Name            = name,
                                        RateUnit        = toTimeUnit cmd.RateUnit)
                    let m = metrics.Provider.Meter.Instance options
                    meters.Add(cmd.MeterId, m)
                | _ -> ()
           //    match 

        subscribe typedefof self mailbox.Context.System.EventStream |> ignore

        let rec loop() = actor {
            let! msg = mailbox.Receive()
            handle msg
            return! loop()
        }
        loop()


, MetricsMessage, MetricsMessage .

:

  1. ( )
  2. Id ( (bool, obj), TryGetValue F#
  3. ,

, Owin .
, IDependencyResolver. , , Dispose() API . , .. , mutable state.

    type IMetricApiConfig = 
        abstract member Host: string
        abstract member Port: int
    
    type ApiMessage = ReStartApiMessage

    let createReader (config: IMetricApiConfig) resolver (mailbox: Actor<_>) =
        let startUp (app: IAppBuilder) = 
            let httpConfig = new HttpConfiguration(DependencyResolver = resolver)
            httpConfig.Formatters.JsonFormatter.SerializerSettings.Converters.Add(new MetricDataConverter())
            httpConfig.Formatters.JsonFormatter.Indent <- true
            httpConfig.MapHttpAttributeRoutes()
            httpConfig.EnsureInitialized()
            app.UseWebApi(httpConfig) |> ignore

        let uri = sprintf "http://%s:%d" config.Host config.Port
        let mutable api = {new IDisposable with member this.Dispose() = ()}

        let handleMsg (ReStartApiMessage) = 
            api.Dispose()
            api <- WebApp.Start(uri, startUp)

        mailbox.Defer api.Dispose
        mailbox.Self 

api.Dispose mailbox.Defer. api object expression, IDisposable .


Akka.Net ( ILoggingAdapter), ( , ).

union.

type Fragment =
    | OperationName     of string
    | OperationDuration of TimeSpan
    | TotalDuration     of TimeSpan
    | ReceivedOn        of DateTimeOffset
    | MessageType       of Type
    | Exception         of exn

:

type ILogBuilder = 
    abstract OnOperationBegin:     unit     -> unit
    abstract OnOperationCompleted: unit     -> unit
    abstract Set:                  LogLevel -> unit
    abstract Set:                  Fragment -> unit
    abstract Fail:                 exn      -> unit
    abstract Supress:              unit     -> unit
    abstract TryGet:               Fragment -> Fragment option

:

type LogBuilder(logger: ILoggingAdapter) = 
    let logFragments = new Dictionary()
    let stopwatch    = new Stopwatch()
    let mutable logLevel = LogLevel.DebugLevel
    interface ILogBuilder with
        // 

, Dictionary? , LogBuilder . .

:

        let set fragment = 
            logFragments.[fragment.GetType()] <- fragment

        member x.OnOperationBegin() =   
            stopwatch.Start()

        member this.Fail e = 
            logLevel <- LogLevel.ErrorLevel
            set <| Exception e

        member this.OnOperationCompleted() = 
            stopwatch.Stop()
            set <| OperationDuration stopwatch.Elapsed

            match tryGet <| ReceivedOn DateTimeOffset.MinValue with
            | Some (ReceivedOn date) -> set <| TotalDuration (DateTimeOffset.UtcNow - date)
            | _ -> ()

            match status with
            | Active ->
                match (logLevel) with
                | LogLevel.DebugLevel   -> logger.Debug(message())
                | LogLevel.InfoLevel    -> logger.Info(message())
                | LogLevel.WarningLevel -> logger.Warning(message())
                | LogLevel.ErrorLevel   -> logger.Error(message())
                | x                     -> failwith(sprintf "Log level %s is not supported" <| string x)
            | Supressed -> ()

OnOperationCompleted():

  • OperationDuration
  • ReceivedOn ( ), TotalDuration
  • ( Supress()), Akka message(), , - Fragments


, , .

? :

  • . , .. FuncActor
  • ( )
  • ,
  • ,
  • , ,
  • ,

Linq.Expressions. QuotationExpressions F# , .. . , .

, :

type Expr<'T,'TLog when 'TLog :> ILogBuilder> = Expression, 'T, 'TLog>>

type Wrap =
    static member Handler(e:  Expression, 'T, #ILogBuilder>>) = e

let toExprName (expr: Expr<_,_>) = 
    match expr.Body with
    | :? MethodCallExpression as methodCall -> methodCall.Method.Name
    | x -> x.ToString()

Expr , Action ( , ), ( - ).

Wrap.Handler(Expr) F# fun mb msg log -> (), Linq.Expressions.

toExprName , , (MethodCallExpression) .
fun mb msg log -> handleMsg msg toExprName handleMsg.

. :

let loggerActor<'TMsg> (handler: Expr<'TMsg,_>) (mailbox: Actor<'TMsg>) =
    let exprName = handler |> toExprName
    let metrics  = mailbox.Context.GetMetricsProducer (ContextName exprName)
    let logger   = mailbox.Log.Value

handler, .. mailbox Akka (partial application).

ActorSystem IMetricsAdapter metrics. Akka logger.

:

    let errorMeter      = metrics.CreateMeter   (MetricName "Error Rate",              Errors)
    let instanceCounter = metrics.CreateCounter (MetricName "Instances Counter",       Items)
    let messagesMeter   = metrics.CreateMeter   (MetricName "Message Processing Rate", Items)
    let operationsTimer = metrics.CreateTimer   (MetricName "Operation Durations",     Requests, MilliSeconds, MilliSeconds)

    instanceCounter.Increment()
    mailbox.Defer instanceCounter.Decrement

, instanceCounter .

, .

, , , :

    let completeOperation (msgType: Type) (logger: #ILogBuilder) =
        logger.Set (OperationName exprName)
        logger.OnOperationCompleted()

        match logger.TryGet(OperationDuration TimeSpan.Zero) with
        | Some(OperationDuration dur) -> 
            operationsTimer.Measure(Amount (int64 dur.TotalMilliseconds), Item (Some exprName))
        | _ -> ()

        messagesMeter.Mark(Item (Some msgType.Name))

:

    let registerExn (msgType: Type) e (logger: #ILogBuilder) = 
        errorMeter.Mark(Item (Some msgType.Name))
        logger.Fail e

. :

    let wrapHandler handler mb (logBuilder: unit -> #ILogBuilder) =
        let innherHandler mb msg  =
            let logger = logBuilder()
            let msgType = msg.GetType()
            logger.Set (MessageType msgType)
            try
                try
                    logger.OnOperationBegin()
                    handler mb msg logger
                with
                | e -> registerExn msgType e logger; reraise()
            finally
                completeOperation msgType logger
        innherHandler mb

wrapHandler . C# :

Func wrapHandler(
    Func handler, 
    TMailbox mb, 
    Func logBuilder) 
where TLogBuilder: ILogBuilder

.

wrapHandler , TMsg TResults. :

  • handler , ( )

Expression Action :

    let wrapExpr (expr: Expr<_,_>) mailbox logger = 
        let action = expr.Compile()
        wrapHandler 
            (fun mb msg log -> action.Invoke(mailbox, msg, log))
            mailbox
            (fun () -> new LogBuilder(logger))

Expression, wrapHandler , LogBuilder().

. C# :

Action wrapExpr(
    Expr expr, 
    Actor mb, 
    ILoggingAdapterlogger)

TMsg .

:)
    let rec loop() = 
        actor {
            let! msg = mailbox.Receive()
            wrapExpr handler mailbox akkaLogger msg
            return! loop()
        }
    loop()

wrapExpr handler mailbox akkaLogger, , Action, .. , unit (void c#).

msg msg .

!

?


.
, , .

:

type ActorMessages =
    | Wait of int
    | Stop

let waitProcess = function
    | Wait d -> Async.Sleep d |> Async.RunSynchronously
    | Stop   -> ()


loggerActor :

let spawnWaitWorker() =
    loggerActor <| Wrap.Handler(fun mb msg log -> waitProcess msg)

let waitWorker = spawn system "worker-wait"  <| spawnWaitWorker()
waitWorker 

:

let failOrStopProcess (mailbox: Actor<_>) msg (log: ILogBuilder) =
    try
        match msg with
        | Wait d -> failwith "can't wait!"
        | Stop   -> mailbox.Context.Stop mailbox.Self
    with
        | e -> log.Fail e

let spawnFailOrStopWorker() =
    loggerActor <| Wrap.Handler(fun mb msg log -> failOrStopProcess mb msg log)

let failOrStopWorker = spawn system "worker-vocal"  <| spawnFailOrStopWorker()
failOrStopWorker 

EntryPoint , ActorSystem, , .

Program.fs
open Akka.FSharp
open SimpleInjector
open App.Metrics;
open Microsoft.Extensions.DependencyInjection
open SimpleInjector.Integration.WebApi
open System.Reflection
open System
open Metrics.MetricActors
open ExampleActors

let createSystem = 
    let configStr = System.IO.File.ReadAllText("system.json")
    System.create "system-for-metrics" (Configuration.parse(configStr))

let createMetricActors system container = 
    let dependencyResolver = new SimpleInjectorWebApiDependencyResolver(container)
    let apiConfig = 
        { new IMetricApiConfig with
            member x.Host = "localhost"
            member x.Port = 10001 }
    
    let metricsReaderSpawner = createReader apiConfig dependencyResolver
    let metricsReader = spawn system "metrics-reader" metricsReaderSpawner

    let metricsRecorderSpawner = createRecorder (container.GetInstance())
    let metricsRecorder = spawn system "metrics-recorder" metricsRecorderSpawner
    ()

type Container with
    member x.AddMetrics() = 
        let serviceCollection  = new ServiceCollection()
        let entryAssemblyName  = Assembly.GetEntryAssembly().GetName()
        let metricsHostBuilder = serviceCollection.AddMetrics(entryAssemblyName)

        serviceCollection.AddLogging() |> ignore
        let provider = serviceCollection.BuildServiceProvider()

        x.Register(fun () -> provider.GetRequiredService())

[]
let main argv = 
    let container = new Container()
    let system = createSystem

    container.RegisterSingleton system
    container.AddMetrics()
    container.Verify()

    createMetricActors system container

    let waitWorker1      = spawn system "worker-wait1"  <| spawnWaitWorker()
    let waitWorker2      = spawn system "worker-wait2"  <| spawnWaitWorker()
    let waitWorker3      = spawn system "worker-wait3"  <| spawnWaitWorker()
    let waitWorker4      = spawn system "worker-wait4"  <| spawnWaitWorker()

    let failWorker       = spawn system "worker-fail"   <| spawnFailWorker()
    let waitOrStopWorker = spawn system "worker-silent" <| spawnWaitOrStopWorker()
    let failOrStopWorker = spawn system "worker-vocal"  <| spawnFailOrStopWorker()

    waitWorker1  ignore

    0


!

localhost:10001/metrics, json, . waitProcess:

{
      "Context": "waitProcess",
      "Counters": [
        {
          "Name": "Instances Counter",
          "Unit": "items",
          "Count": 4
        }
      ],
      "Meters": [
        {
          "Name": "Message Processing Rate",
          "Unit": "items",
          "Count": 4,
          "FifteenMinuteRate": 35.668327519112893,
          "FiveMinuteRate": 35.01484385742755,
          "Items": [
            {
              "Count": 4,
              "FifteenMinuteRate": 0.0,
              "FiveMinuteRate": 0.0,
              "Item": "Wait",
              "MeanRate": 13.082620551464204,
              "OneMinuteRate": 0.0,
              "Percent": 100.0
            }
          ],
          "MeanRate": 13.082613248856632,
          "OneMinuteRate": 31.356094372926623,
          "RateUnit": "min"
        }
      ],
      "Timers": [
        {
          "Name": "Operation Durations",
          "Unit": "req",
          "ActiveSessions": 0,
          "Count": 4,
          "DurationUnit": "ms",
          "Histogram": {
            "LastUserValue": "waitProcess",
            "LastValue": 8001.0,
            "Max": 8001.0,
            "MaxUserValue": "waitProcess",
            "Mean": 3927.1639786164278,
            "Median": 5021.0,
            "Min": 1078.0,
            "MinUserValue": "waitProcess",
            "Percentile75": 8001.0,
            "Percentile95": 8001.0,
            "Percentile98": 8001.0,
            "Percentile99": 8001.0,
            "Percentile999": 8001.0,
            "SampleSize": 4,
            "StdDev": 2932.0567172627871,
            "Sum": 15190.0
          },
          "Rate": {
            "FifteenMinuteRate": 0.00059447212531854826,
            "FiveMinuteRate": 0.00058358073095712587,
            "MeanRate": 0.00021824579927905906,
            "OneMinuteRate": 0.00052260157288211038
          }
        }
      ]
    }


, :

  • 4 workProcess
  • 4 Wait
  • 5021

.


, , ( , ), .

- , C#, ( , Receive() ).

F# , .. , null .

.

!
Original source: habrahabr.ru (comments, light).

https://habrahabr.ru/post/334296/

:  

: [1] []
 

:
: 

: ( )

:

  URL