Hi, I'm Ataias Reis

Programmer, made in Brazil, Alma mater University of Brasilia

One Consumer, Multiple Producers in F#

I have been learning F# by reading the Expert F# 4.0 book by Don Syme, Adam Granicz, et al. I am really enjoying the concurrency chapter and decided to take on a small challenge involving agents (MailboxProcessor). I settled on the following requirements:

1 consumer agent

  • with two-way communication

  • accepts arrays of doubles and stores their sum and count internally

  • allows others to fetch the current average

4 producer agents

  • generate arrays of doubles

  • send them to the consumer agent

  • fetch the current sum and count

  • wait for a specific interval, then generate the next one

Stats and Messages

Let's start by defining a Stats type that will store our sum and count:

type Stats =
 { Count: int;
   Sum: double; }

Next comes the message type that our StatsAgent will accept in order to interact with the outside world:

type Msg =
 | Incr of double
 | Fetch of AsyncReplyChannel<Stats>
 | Die

I don't have any particular comments on the above. They are pretty similar to the F# WikiBook mentioned earlier in a link, except for the Fetch return type, which I defined differently. By the way, this Fetch is what will allow us to have two-way communication; otherwise the consumer would be just a sink. The Die case is just there to close the consumer when needed.


To create a consumer agent, we just need to write let agent = MailboxProcessor.Start(...). However, I will instead define a type and hide the agent inside it for the sake of encapsulation. I have been reading Clean Code by Robert C. Martin, and he points out that we should define the interface our code exposes to outsiders. In the current case, I have a particular use in mind for MailboxProcessor. If I just instantiate it and pass it around, other parts of my code may use it incorrectly. You get the point, so here is the code:

open System // for IDisposable

type StatsAgent() =

    let agent =
        MailboxProcessor.Start(fun inbox ->
            // The running stats are threaded through the recursive loop
            let rec loop stats =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Incr(x) ->
                        return! loop
                                    { Count = stats.Count + 1
                                      Sum = stats.Sum + x }
                    | Fetch(replyChannel) ->
                        // Send the current stats back to the caller
                        replyChannel.Reply(stats)
                        return! loop stats
                    | Die -> return ()
                }
            loop
                { Count = 0
                  Sum = 0.0 })

    interface IDisposable with
        member this.Dispose() = this.Die()

    member _.Incr(x) = agent.Post(Incr x)
    member _.Fetch() = agent.PostAndReply(Fetch)
    member _.Die() = agent.Post(Die)

I wrote a simple main function to call the above code and it looked alright, showing the correct sums and counts. For the exact code, check the Add StatsAgent commit.


To make it simple to manually verify correctness, each producer will just produce numbers from 1 up to a threshold. A producer will post a number every N milliseconds, and we can have as many producers as we want. To encode that, the following types were used:

type NumberProducer =
    { TotalNumbers: int
      SleepIntervalMilliseconds: int }

type Poster =
    { Producers: NumberProducer [] }

After that, I created the producer, along with auxiliary code to run several producers in parallel:

type PosterAgent(statsAgent, poster) =
    let (poster: Poster) = poster
    let (statsAgent: StatsAgent) = statsAgent

    let produce (id, numberProducer) =
        async {
            for i in 1 .. numberProducer.TotalNumbers do
                statsAgent.Incr(double i)
                // Wait for the configured interval before posting the next number
                do! Async.Sleep(numberProducer.SleepIntervalMilliseconds)
            printfn "Producer %d finished; Current stats: %A" id (statsAgent.Fetch())
        }

    member _.Execute() =
        let producers =
            [ for id in 0 .. poster.Producers.Length - 1 ->
                produce (id, poster.Producers.[id]) ]
        // Run all producers concurrently and block until every one has finished
        Async.RunSynchronously(Async.Parallel producers) |> ignore

Running it all

I defined 4 producers: two generate 100 numbers each, one every 50ms, and the other two generate 200 numbers each, one every 30ms:

let producers =
    [| { TotalNumbers = 100
         SleepIntervalMilliseconds = 50 }
       { TotalNumbers = 100
         SleepIntervalMilliseconds = 50 }
       { TotalNumbers = 200
         SleepIntervalMilliseconds = 30 }
       { TotalNumbers = 200
         SleepIntervalMilliseconds = 30 } |]

let poster = { Producers = producers }

The output ended up being something like:

Producer 1 finished; Current stats: { Count = 524
Sum = 36506.0 }
Producer 0 finished; Current stats: { Count = 524
Sum = 36506.0 }
Producer 2 finished; Current stats: { Count = 600
Sum = 50300.0 }
Producer 3 finished; Current stats: { Count = 600
Sum = 50300.0 }
Program execution took 6624.670500ms

From the above, we can see execution indeed occurred concurrently, as synchronous execution would have taken around 22s, and our total time was almost 7s. The final count is also correct, 600 numbers.
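The 22s estimate comes from adding up every sleep interval as if the producers ran one after another. It is a rough figure that ignores all other overhead:

$$ 2 \times (100 \times 50\,\text{ms}) + 2 \times (200 \times 30\,\text{ms}) = 10\,\text{s} + 12\,\text{s} = 22\,\text{s} $$

Run concurrently, the lower bound is instead set by the slowest producer, $200 \times 30\,\text{ms} = 6\,\text{s}$, which is in line with the measured 6.6s.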

As for the actual final summed value, we can use the formula to sum an arithmetic progression:

$$ S_n = \frac{n(a_1+a_n)}{2} $$

Using the above formula, we get exactly 50300. For the final code, check this commit.
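Spelled out for the configuration above, where two producers count from 1 to 100 and two count from 1 to 200, the arithmetic is:

$$ S_{100} = \frac{100(1+100)}{2} = 5050, \qquad S_{200} = \frac{200(1+200)}{2} = 20100 $$

$$ 2 S_{100} + 2 S_{200} = 10100 + 40200 = 50300 $$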

I appreciate any comments and suggestions.
