Concurrent Programming in fsharp Using Hopac - Part 5


In the last blog post, we learned how Alt in Hopac works and its applications. In this blog post, we are going to apply what we learned so far by creating a Ticker.

Using ticker we can do something repeatedly at regular intervals.

To implement a Ticker in Hopac, we have to know one more abstraction in Hopac called IVar. So, in the first section, we are going to learn IVar and then we'll use it to implement Ticker.

Let's dive in

The IVar type

In a nutshell, IVar represents a write-once variable.

Write once variables are designed for and most commonly used for getting replies from concurrent servers and asynchronous operations, but can also be useful for other purposes such as for one-shot events and for implementing incremental, but immutable, concurrent data structures.

Because it is common to need to be able to communicate either an expected successful result or an exceptional failure in typical use cases of write once variables, direct mechanisms are provided for both. The implementation is optimized in such a way that the ability to report an exceptional failure does not add overhead to the expected successful usage scenarios.

Write once variables are lightweight objects and it is typical to always just create a new write once variable when one is needed. In most cases, a write once variable will be slightly more lightweight than a channel. This is possible because write once variables do not support simple rendezvous like channels do. When simple rendezvous is necessary, a channel should be used instead. - Hopac Documentation

The IVar<'a> type is a subclass of Promise<'a> which in turn a subclass of Alt<'a>.

Let's see IVar in action to learn it better

open Hopac 

// IVar<'a> -> 'a -> unit
let fillAndRead iVar value =
  IVar.fill iVar value // Job<unit> 1
  |> Job.bind (fun _ -> iVar) // Job<unit> 2
  |> (printfn "%A") // Job<unit>
  |> run // unit

As the name indicates, the fillAndRead is a function fills the given write-once variable iVar with the provided value, and then reads the value from the iVar. Finally the prints the value.

1 The IVar.fill function creates a job that writes the given value to the given write-once variable.

val fill: IVar<'x> -> 'x -> Job<unit>

2 The function creates an alternative that becomes available when the write-once variable had a value.

val read: IVar<'x> -> Alt<'x>

Executing the fillAndRead function as below

fillAndRead (IVar<bool>()) true
fillAndRead (IVar<bool>()) false

will give the following output

> fillAndRead (IVar<bool>()) true
- fillAndRead (IVar<bool>()) false
- ;;
val it : unit = ()

The IVar.fill function would return an exception if the IVar variable were already filled.

We can verify this by defining a new IVar value intIVar and using this to call the fillAndRead twice

let intIVar = IVar<int>()
fillAndRead intIVar 42

Executing the above two lines will yield the below output

> let intIVar = IVar<int>()
- fillAndRead intIVar 42
- ;;
val intIVar : IVar<int>
val it : unit = ()

And then calling fillAndRead function with intIVar will have the following output

> fillAndRead intIVar 10
- ;;
Unhandled exception: System.Exception: IVar full
No other causes.
val it : unit = ()

If we don't want any exception to be thrown while filling a IVar value, we can make use of IVar.tryFill function, which creates a job that tries to write the given value to the given write-once variable. No operation takes place, and no error is reported in case of the write-once variable has already been written.

val tryFill: IVar<'x> -> 'x -> Job<unit>

Let's write a another function tryFillAndRead which exactly does what fillAndRead function did except the usage of IVar.fill function.

let tryFillAndRead iVar value =
  IVar.tryFill iVar value
  |> Job.bind (fun _ -> iVar)
  |> (printfn "%A")
  |> run

With this, if we try the above example with the tryFillAndRead function

let anotherIntIVar = IVar<int>()
tryFillAndRead anotherIntIVar 42
tryFillAndRead anotherIntIVar 10

We won't get any error and the value that we use to fill for the second time will be silently ignored.

> let anotherIntIVar = IVar<int>()
- tryFillAndRead anotherIntIVar 42
- tryFillAndRead anotherIntIVar 10
- ;;
val anotherIntIVar : IVar<int>
val it : unit = ()

The last thing that we need to explore before getting into the implementation of Ticker is, the return type of function is Alt<'a>.

So, we can use it to choose use between any other Alt.

Say, for example, if we want a IVar value to be available in a certain amount of time and do something else if it didn't become available, we can achieve it like this.

open Hopac.Infixes

// int -> IVar<'a> -> unit
let readOrTimeout delayInMillis iVar =

  let timeOutAlt = // Alt<unit>
    timeOutMillis delayInMillis 
    |> Alt.afterFun (fun _ -> printfn "time out!") 1
  let readAlt = // Alt<unit> iVar
    |> Alt.afterFun (printfn "%A") 2
  timeOutAlt <|> readAlt 3
  |> run

1 Alt<unit> that prints a time-out after given time delay

2 Prints the value of iVar as soon as it is available

3 Chooses between timeOutAlt and readAlt and commits to whatever completes first

If we fill a iVar value and run this function

let yetAnotherIntIVar = IVar<int>()
tryFillAndRead yetAnotherIntIVar 10
readOrTimeout 1000 intIVar

We can see the value of IVar in the output.

> let yetAnotherIntIVar = IVar<int>()
- tryFillAndRead yetAnotherIntIVar 10
- readOrTimeout 1000 intIVar
- ;;
val yetAnotherIntIVar : IVar<int>
val it : unit = ()

If we try the readOrTimeout function, with a new IVar value, we will see the time-out message

#time "on"
readOrTimeout 2000 (IVar<unit>())
#time "off"
--> Timing now on
time out!
Real: 00:00:02.001, CPU: 00:00:00.004, GC gen0: 0, gen1: 0
val it : unit = ()
--> Timing now off

With this, we are wrapping up our exploration on IVar.

The Ticker Type

The Ticker type that we are going to implement will have the following signature.

type Ticker =
    new : timeSpan:TimeSpan -> Ticker 1
    member Stop : unit -> unit 2
    member C : Ch<DateTimeOffset> 3

1 The Ticker type is a class with a constructor that takes a TimeSpan to specify the interval.

2 It is going to have a Stop function which stops the ticker.

3 The member C exposes a channel which gives a DateTimeOffset value at regular intervals specified by the TimeSpan.

We are going to hide the complexity of a ticker behind this Ticker type.

open Hopac
open Hopac.Infixes
open System

type Ticker (timeSpan : TimeSpan) =
  // Ch<DateTimeOffset>
  let tickCh = Ch<DateTimeOffset>() 1
  // IVar<unit>
  let cancelled = IVar() 2

  // unit -> Alt<unit>
  let tick () = 3
    Ch.give tickCh DateTimeOffset.Now

  // unit -> Alt<unit>
  let rec loop () = 4
    // Alt<unit>
    let tickerLoop = 
      timeOut timeSpan
      |> Alt.afterJob tick
      |> Alt.afterJob loop
    tickerLoop <|> cancelled

  do start (loop()) 5

  // unit -> unit 
  member __.Stop() = 
    IVar.tryFill cancelled () |> start 6

  // Ch<DateTimeOffset>
  member __.C 
    with get() = tickCh 7

1 A Ticker channel tickCh initialised as private value.

2 A IVar private value cancelled to keep track of the Ticker's state

3 The tick function defines what to do on a tick. We are getting the current date time offset and give it to the outside world via the tickCh.

4 The recursive function loop is the crux of our Ticker implementation that defines a loop function that will keep calling the tick function at the specified interval timeSpan until a value available on cancelled IVar.

The timeOut function is similar to timeOutMillis function except it takes a TimeSpan instead of milliseconds as an int.

5 We are kickstarting the loop function here. It will be invoked during the execution of the constructor.

6 The Stop method tries to fill the IVar value cancelled, and it starts at a new job.

7 Finally, we are exposing the tickCh as a getter member C.

The Ticker type in action

Use Case #1

Our first use case is, executing a function repeatedly at an interval of x seconds for n times.

This use case can be implemented as

// int -> int -> (DateTimeOffset -> unit) -> unit
let useCase1 nTimes (xSeconds : int) f =
  let timeSpan = 
    xSeconds |> float |> TimeSpan.FromSeconds 
  let ticker = new Ticker(timeSpan)

  ticker.C // Ch<DateTimeOffset>
  |> Alt.afterFun f // Alt<unit> 1
  |> Job.forN nTimes // Job<unit> 2
  |> run

1 Upon a tick on the Ticker channel, we are calling the provided function f.

2 The Job.forN function creates a job that runs the given job sequentially the given number of times.

val forN: int -> Job<unit> -> Job<unit>

Here the given job is receiving value on the ticker channel and invoking the function f with the received value.

Let's try this out in fsharp interactive to print the Ticker's tick time.

> useCase1 5 2 (printfn "%A");;
21-03-2018 19:24:48 +05:30
21-03-2018 19:24:50 +05:30
21-03-2018 19:24:52 +05:30
21-03-2018 19:24:54 +05:30
21-03-2018 19:24:56 +05:30
val it : unit = ()

Use Case #2

The second use case is running two tickers at the different time interval for x milliseconds overall.

As a first step, let's create a ticker function that creates a Ticker which ticks at given seconds

// int -> Ticker
let ticker seconds =
  |> float 
  |> TimeSpan.FromSeconds
  |> Ticker

Then implement use case #2 as below

// int -> int -> int -> unit
let useCase2 t1Interval t2Interval xMillis =
  let ticker1 = ticker t1Interval
  let ticker2 = ticker t2Interval

  let onTick tCh name loop = 1
      |> Alt.afterFun (fun t -> printfn "[%s] at %A" name t)
      |> Alt.afterJob loop
  // unit -> Alt<'a>
  let rec loop () = 2
    onTick ticker1.C "T1" loop <|> onTick ticker2.C "T2" loop

  printfn "Starts at %A" (DateTimeOffset.Now)
  start (loop()) 3

  let onTimeOut _ = 4

  timeOutMillis xMillis 5
  |> Alt.afterFun onTimeOut
  |> run
  printfn "Ends at %A" (DateTimeOffset.Now)

1 The onTick function takes a channel, the ticker's name and a loop function. Upon receiving the DateTimeOffset value on the ticker, it prints it along with the ticker's name and then calls the provide loop function

2 The recursive function loop chooses between two tickers channel and commits to the one that has a value and then calling itself via the onTick function.

3 We are starting the Job returned by the loop function as a separate job.

4 The onTimeOut function stops the tickers upon timeout.

5 Finally, we wait for the given x milliseconds and call the onTimeOut function after the timeout.

Executing this F# interactive will give us the similar output

> useCase2 1 2 5000;;
Starts at 21-03-2018 19:46:36 +05:30
[T1] at 21-03-2018 19:46:37 +05:30
[T2] at 21-03-2018 19:46:38 +05:30
[T1] at 21-03-2018 19:46:38 +05:30
[T1] at 21-03-2018 19:46:39 +05:30
[T2] at 21-03-2018 19:46:40 +05:30
[T1] at 21-03-2018 19:46:40 +05:30
[T1] at 21-03-2018 19:46:41 +05:30
Ends at 21-03-2018 19:46:41 +05:30
val it : unit = ()


In this blog post, we built a Ticker abstraction using the fundamental ideas provided by Hopac. And then using the ticker, we saw how to implement two different use cases.

Just like composing functions together here we are composing the jobs and implementing complicated stuff with less code.

You can find the source code associated with this blog post on GitHub

If you like my content, you can extend your support by buying me a coffee. Thanks!
Buy Me A Coffee


comments powered by Disqus