I have been learning F# by reading the Expert F# 4.0 book by Don Syme, Adam Granicz, et al. I am really enjoying the concurrency chapter and decided to take on a small challenge involving agents (MailboxProcessor). I settled on the following requirements:
- 1 consumer agent
  - with two-way communication
  - accepts arrays of doubles and stores their sum and count internally
  - allows others to fetch the current average
- 4 producer agents
  - generate arrays of doubles
  - send them to the consumer agent
  - fetch the current sum and count
  - wait for a specific interval, then generate the next one
Stats and Messages
Let's start by defining a `Stats` type that will store our sum and count:
```fsharp
type Stats =
    { Count: int
      Sum: double }
```
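The requirements ask for the current average, while `Stats` only holds the sum and count. A minimal sketch of deriving it, assuming a hypothetical `average` helper (not part of the original code) that returns `None` until something has been counted:

```fsharp
type Stats =
    { Count: int
      Sum: double }

// Hypothetical helper, not in the original code: the running average,
// or None while nothing has been counted yet, to avoid dividing by zero.
let average (stats: Stats) : double option =
    if stats.Count = 0 then None
    else Some(stats.Sum / double stats.Count)

// Example using the final stats reported at the end of the run.
let finalStats = { Count = 600; Sum = 50300.0 }
printfn "Average: %A" (average finalStats)
```

Returning an option keeps the "no data yet" case explicit instead of producing `nan` from `0.0 / 0.0`.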
Next is the message type that our `StatsAgent` will accept in order to interact with the outside world:
```fsharp
type Msg =
    | Incr of double
    | Fetch of AsyncReplyChannel<Stats>
    | Die
```
I don't have any particular comments on the above. These types are pretty similar to the ones in the F# WikiBook mentioned earlier, except for the `Fetch` reply type, which I defined differently. By the way, `Fetch` is what allows two-way communication; otherwise the consumer would be just a sink. `Die` simply stops the consumer when needed.
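To make the two-way flow concrete, here is a minimal standalone sketch of the same reply-channel pattern, using a hypothetical `CounterMsg` type rather than `Msg`: the caller hands the agent a reply channel inside the message, and the agent answers through `Reply`.

```fsharp
// Hypothetical message type for a tiny counter agent, used only to show the reply-channel pattern.
type CounterMsg =
    | Add of int
    | Get of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | Get reply ->
                    // Answer the caller through the channel it sent, then keep looping.
                    reply.Reply(total)
                    return! loop total
            }
        loop 0)

counter.Post(Add 2)
counter.Post(Add 3)

// PostAndReply creates the channel, passes it to the lambda to build the message,
// posts that message, and blocks until the agent calls Reply.
let current = counter.PostAndReply(fun channel -> Get channel)
printfn "Current total: %d" current
```

Because the mailbox handles messages one at a time in arrival order, the `Get` sees both `Add` messages posted before it, so this prints `Current total: 5`.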
Consumer
To create a consumer agent, we just need `let agent = MailboxProcessor.Start(...)`. However, I will instead define a type and hide the agent inside it, for encapsulation. I have been reading Clean Code by Robert C. Martin, and he points out that we should define the interface our code exposes to outsiders. In the current case, I have a particular use in mind for `MailboxProcessor`. If I just instantiate it and pass it around, other parts of my code may use it incorrectly. I think you get the point; here is the code:
```fsharp
open System // for IDisposable

type StatsAgent() =
    // The agent is private; callers only see the members below.
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop stats =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Incr(x) -> return! loop { Count = stats.Count + 1; Sum = stats.Sum + x }
                    | Fetch(replyChannel) ->
                        // Send the current state back to the caller.
                        replyChannel.Reply(stats)
                        return! loop stats
                    | Die -> return ()
                }
            loop { Count = 0; Sum = 0.0 })

    member _.Incr(x) = agent.Post(Incr x)
    member _.Fetch() = agent.PostAndReply(Fetch)
    member _.Die() = agent.Post(Die)

    interface IDisposable with
        member this.Dispose() = this.Die()
```
I wrote a simple main function to call the above code and it looked alright, showing the correct sums and counts. For the exact code, check the Add StatsAgent commit.
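The main function itself lives in that commit. A hypothetical stand-in that exercises the agent in a similar way might look like this; it repeats the definitions so it runs on its own, and `runDemo` is an illustrative name:

```fsharp
open System

type Stats =
    { Count: int
      Sum: double }

type Msg =
    | Incr of double
    | Fetch of AsyncReplyChannel<Stats>
    | Die

type StatsAgent() =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop stats =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Incr x -> return! loop { Count = stats.Count + 1; Sum = stats.Sum + x }
                    | Fetch replyChannel ->
                        replyChannel.Reply(stats)
                        return! loop stats
                    | Die -> return ()
                }
            loop { Count = 0; Sum = 0.0 })

    member _.Incr(x) = agent.Post(Incr x)
    member _.Fetch() = agent.PostAndReply(Fetch)
    member _.Die() = agent.Post(Die)

    interface IDisposable with
        member this.Dispose() = this.Die()

// Hypothetical stand-in for the "simple main function": post 1..10, then fetch.
let runDemo () =
    // `use` posts Die through Dispose when the function returns.
    use statsAgent = new StatsAgent()
    for i in 1 .. 10 do
        statsAgent.Incr(double i)
    // The mailbox processes messages in order, so Fetch sees all ten increments.
    statsAgent.Fetch()

let stats = runDemo ()
printfn "Count = %d, Sum = %.1f" stats.Count stats.Sum
```

This prints `Count = 10, Sum = 55.0`, which matches $10 \cdot 11 / 2 = 55$.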
Producers
To make it easy to verify correctness by hand, each producer just produces the numbers from 1 up to a threshold. A producer posts one number every N milliseconds, and we can have as many producers as we want. To encode that, I used the following types:
```fsharp
type NumberProducer =
    { TotalNumbers: int
      SleepIntervalMilliseconds: int }

type Poster =
    { Producers: NumberProducer [] }
```
After that, I created the producer agent, along with auxiliary code to run several producers in parallel:
```fsharp
type PosterAgent(statsAgent: StatsAgent, poster: Poster) =
    // Posts 1..TotalNumbers to the stats agent, pausing between numbers.
    let produce (producerId: int, numberProducer: NumberProducer) =
        async {
            for i in 1 .. numberProducer.TotalNumbers do
                statsAgent.Incr(double i)
                // Async.Sleep releases the thread while waiting, unlike Thread.Sleep.
                do! Async.Sleep(numberProducer.SleepIntervalMilliseconds)
            printfn "Producer %d finished; Current stats: %A" producerId (statsAgent.Fetch())
        }

    member _.Execute() =
        poster.Producers
        |> Array.mapi (fun producerId numberProducer -> produce (producerId, numberProducer))
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore
```
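`StatsAgent.Fetch` uses `PostAndReply`, which blocks the calling thread until the agent answers, even when it is called inside an `async` workflow. `MailboxProcessor` also offers `PostAndAsyncReply`, which returns an `Async` that can be awaited with `let!`. A sketch of that alternative, assuming a hypothetical `AsyncStatsAgent` with a `FetchAsync` member (not part of the original design), with the producer parameters passed directly instead of through `NumberProducer`:

```fsharp
type Stats = { Count: int; Sum: double }

type Msg =
    | Incr of double
    | Fetch of AsyncReplyChannel<Stats>

// Hypothetical variant of StatsAgent that exposes a non-blocking fetch.
type AsyncStatsAgent() =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop stats =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Incr x -> return! loop { Count = stats.Count + 1; Sum = stats.Sum + x }
                    | Fetch reply ->
                        reply.Reply(stats)
                        return! loop stats
                }
            loop { Count = 0; Sum = 0.0 })

    member _.Incr(x) = agent.Post(Incr x)
    // Returns Async<Stats>; callers await it instead of blocking a thread.
    member _.FetchAsync() = agent.PostAndAsyncReply(Fetch)

// Hypothetical producer using the non-blocking fetch.
let produce (statsAgent: AsyncStatsAgent) (producerId: int) (total: int) (sleepMs: int) =
    async {
        for i in 1 .. total do
            statsAgent.Incr(double i)
            do! Async.Sleep(sleepMs)
        // let! suspends this workflow until the agent replies.
        let! stats = statsAgent.FetchAsync()
        printfn "Producer %d finished; Current stats: %A" producerId stats
    }

let statsAgent = AsyncStatsAgent()

[ produce statsAgent 0 10 5; produce statsAgent 1 20 3 ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// All Incr messages were posted before this fetch, so it sees every number.
let finalStats = statsAgent.FetchAsync() |> Async.RunSynchronously
printfn "Final stats: %A" finalStats
```

The trade-off is that callers outside an `async` workflow must run the result themselves, as the last lines do with `Async.RunSynchronously`.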
Running it all
I defined 4 producers: two generating 100 numbers, one every 50 ms, and two generating 200 numbers, one every 30 ms:
```fsharp
let producers =
    [| { TotalNumbers = 100
         SleepIntervalMilliseconds = 50 }
       { TotalNumbers = 100
         SleepIntervalMilliseconds = 50 }
       { TotalNumbers = 200
         SleepIntervalMilliseconds = 30 }
       { TotalNumbers = 200
         SleepIntervalMilliseconds = 30 } |]

let poster = { Producers = producers }
```
The output ended up being something like:
```
Producer 1 finished; Current stats: { Count = 524
                                      Sum = 36506.0 }
Producer 0 finished; Current stats: { Count = 524
                                      Sum = 36506.0 }
Producer 2 finished; Current stats: { Count = 600
                                      Sum = 50300.0 }
Producer 3 finished; Current stats: { Count = 600
                                      Sum = 50300.0 }
Program execution took 6624.670500ms
```
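From the above, we can see that execution indeed occurred concurrently: sequential execution would have taken around 22 s (2 × 100 × 50 ms + 2 × 200 × 30 ms), while the total time was about 6.6 s, just above the 6 s needed by each of the slowest producers (200 × 30 ms). The final count is also correct: 2 × 100 + 2 × 200 = 600 numbers.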
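The timing argument can be checked with a bit of arithmetic. A small sketch, assuming each producer's running time is dominated by its sleeps (one per number) and ignoring scheduling overhead:

```fsharp
// (TotalNumbers, SleepIntervalMilliseconds) for the four producers defined above.
let producerConfigs = [ (100, 50); (100, 50); (200, 30); (200, 30) ]

// Each producer sleeps once per number, so on its own it runs for about n * interval ms.
let durations = producerConfigs |> List.map (fun (n, ms) -> n * ms)

// Run one after another, all the sleeps add up.
let sequentialMs = List.sum durations
// Run concurrently, the wall-clock time cannot drop below the slowest producer.
let concurrentLowerBoundMs = List.max durations
// Every producer posts each of its numbers exactly once.
let totalCount = producerConfigs |> List.sumBy fst

printfn "sequential: %d ms, concurrent: at least %d ms, count: %d" sequentialMs concurrentLowerBoundMs totalCount
```

This prints `sequential: 22000 ms, concurrent: at least 6000 ms, count: 600`; the measured 6624 ms sits just above the concurrent lower bound.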
As for the final sum, we can use the formula for the sum of the first $n$ terms of an arithmetic progression with first term $a_1$ and last term $a_n$:
$$ S_n = \frac{n(a_1+a_n)}{2} $$
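Here each producer emits $1, 2, \dots, n$, so $a_1 = 1$ and $a_n = n$. For the two producer sizes used above:

$$
\begin{aligned}
S_{100} &= \frac{100\,(1+100)}{2} = 5050, \\
S_{200} &= \frac{200\,(1+200)}{2} = 20100, \\
S_{\text{total}} &= 2\,S_{100} + 2\,S_{200} = 10100 + 40200 = 50300.
\end{aligned}
$$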
Applying it to each producer and summing over all four gives exactly 50300, matching the final `Sum`. For the final code, check this commit.
I appreciate any comments and suggestions.