January 13, 2011

Rx.net ObserveOn performance

I have been benchmarking how fast small messages can be pushed through Rx when the message producer and the message consumer are on different threads, with one producer and possibly many consumers.

Rx has a useful operator called ObserveOn, which lets you process messages on the thread pool. However, it seems to have a couple of shortcomings.

1. It does not process several messages concurrently. Although it uses a thread pool, it processes messages serially. So if the consumer's post-processing of each message is time consuming, you may want fork-and-join semantics instead: spin off N tasks (where N is the size of the thread pool), then assemble the results in the right order and forward them. Preserving the order is very important for correct Rx operation.

2. Rx seems to be slow! I wrote a little benchmark that pushes small messages as fast as it can (the messages are preallocated, so there is little memory allocator overhead in the test timing). Then I implemented a synchronized queue (using ReaderWriterLockSlim and Queue) to do the same produce/consume logic, and the results are dramatically different. Tests were done on a 2-core, 3 GHz machine.

With Rx I am getting around 220k msg/sec. With my hand-coded queue I am getting a whopping (compared to Rx) 4M msg/sec, which is roughly 18x faster than Rx.

Source code for the benchmark is here (note to self, clean up):
https://gist.github.com/90983196d1a45e91d11e
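
The fork-and-join idea from item 1 can be sketched outside of Rx. Below is a minimal sketch in Erlang (the language used for the ring benchmarks on this page); it is not part of the benchmark source linked above. The module name pmap_sketch and the function pmap/2 are made up for illustration, and for simplicity it spawns one process per item instead of capping the number of workers at a pool size N. The point it shows is the join step: results are collected in input order, regardless of which worker finishes first.

```erlang
-module(pmap_sketch).
-export([pmap/2]).

%% Hypothetical helper, for illustration only.
%% Fork: spawn one worker per item; each worker tags its reply with a unique ref.
%% Join: receive the replies by walking the refs in input order, so the result
%% list keeps the original order no matter which worker finishes first.
pmap(F, Items) ->
    Parent = self(),
    Refs = [begin
                Ref = make_ref(),
                spawn(fun () -> Parent ! {Ref, F(Item)} end),
                Ref
            end || Item <- Items],
    [receive {Ref, Result} -> Result end || Ref <- Refs].
```

For example, pmap_sketch:pmap(fun (X) -> X * X end, [1, 2, 3]) returns [1, 4, 9]. Note that this sketch has no error handling: if a worker crashes, the caller waits forever for its reply.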

January 3, 2011

Erlang Ring Benchmark Part 3

In Ring Benchmark Part 2 I presented a more compact solution to the ring benchmark. Its performance is about the same as the original solution, which used pure recursion to set up the ring. Both Part 1 and Part 2 waited for each message to go all the way around the ring before injecting the next one. In this version we inject all M messages (M-1 down to 0) as soon as the ring is set up, which considerably improves throughput. The second spawn call in start/2 below, just before wait(), is where we spawn a process to inject the messages into the ring.

Timings Across Parts 1-3

  • Part 1: N=1000, M=1000, Run Time = 600 ms
  • Part 2: N=1000, M=1000, Run Time = 600 ms
  • Part 3: N=1000, M=1000, Run Time = 400 ms

Code


-module(ring3_2).
-include("debug.hrl").
-export([start/2]).
%% ring benchmark using lists methods
start(N, M) ->
    io:format("Starting ring benchmark on pid=~p, N = ~p, M = ~p.~n", [self(), N, M]),
    statistics(wall_clock),
    statistics(runtime),
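    %% Build the ring back to front: each new process forwards to the one
    %% spawned before it, and the first one forwards to this (main) process.
    Last = lists:foldl(
        fun (X, Pid) -> spawn(fun () -> loop(X, Pid) end) end,
        self(),
        lists:seq(N-1, 1, -1)),
    %% Inject all M messages (M-1 down to 0) from a separate process, so the
    %% main process can start waiting right away.
    spawn(fun () ->
        lists:foreach(fun (X) -> Last ! X end, lists:seq(M-1, 0, -1)) end),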
    wait(),
    {_, WC} = statistics(wall_clock),
    {_, RT} = statistics(runtime),
    io:format("Done running ring benchmark in Wall Clock = ~p, Runtime = ~p.~n", [WC, RT]).

%% Main process: drain messages coming off the ring until the final one (0) arrives.
wait() ->
    receive
        Xm when Xm =:= 0 ->
            ?DEBUG("main node done waiting for x = ~p.~n", [Xm]),
            void;
        Xm ->
            ?DEBUG("main node done waiting for x = ~p.~n", [Xm]),
            wait()
    end.

%% Ring node: forward every message to the next pid; exit after forwarding 0.
loop(Nth, Pid) ->
    receive
        X when X =:= 0 ->
            ?DEBUG("~p dying.~n", [Nth]),
            Pid ! X;
        X ->
            ?DEBUG("~p got message ~p.~n", [Nth, X]),
            Pid ! X,
            loop(Nth, Pid)
    end.
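
The ring listings include debug.hrl, which is not shown in these posts. Here is a minimal sketch of what such a header might contain, assuming ?DEBUG simply wraps io:format and expands to a no-op unless a macro is defined at compile time. The macro name debug is an assumption, not something taken from the original header.

```erlang
%% debug.hrl (hypothetical; the real header is not shown in the post)
-ifdef(debug).
%% Tracing on: print the formatted message.
-define(DEBUG(Format, Args), io:format(Format, Args)).
-else.
%% Tracing off: expand to a constant so the calls cost nothing while timing.
-define(DEBUG(Format, Args), ok).
-endif.
```

With a header like this, compiling with erlc -Ddebug ring3_2.erl would turn the trace output on. Without the flag the calls expand to ok, and the compiler may warn about variables that are only used in trace calls.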

Erlang Ring Benchmark Part 2

Using functions from the lists module (lists:foldl and lists:foreach) instead of hand-written recursion results in more compact code.

-module(ring3).
-include("debug.hrl").
-export([start/2]).
%% ring benchmark using lists methods
start(N, M) ->
    io:format("Starting ring benchmark on pid=~p, N = ~p, M = ~p.~n", [self(), N, M]),
    statistics(wall_clock),
    statistics(runtime),
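    %% Build the ring back to front: each new process forwards to the one
    %% spawned before it, and the first one forwards to this (main) process.
    Last = lists:foldl(
        fun (X, Pid) -> spawn(fun () -> loop(X, Pid) end) end,
        self(),
        lists:seq(N-1, 1, -1)),
    %% Send M messages (M-1 down to 0), one at a time; send/2 blocks until
    %% each message has gone all the way around the ring.
    lists:foreach(fun (X) -> send(X, Last) end, lists:seq(M-1, 0, -1)),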
    {_, WC} = statistics(wall_clock),
    {_, RT} = statistics(runtime),
    io:format("Done running ring benchmark in Wall Clock = ~p, Runtime = ~p.~n", [WC, RT]).

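%% Send one message into the ring and block until it comes back around.
send(X, Pid) ->
    ?DEBUG("sending X=~p to ring.~n", [X]),
    Pid ! X,
    receive
        Xm -> ?DEBUG("main node done waiting for x = ~p.~n", [Xm])
    end.

%% Ring node: forward every message to the next pid; exit after forwarding 0.
loop(Nth, Pid) ->
    receive
        X when X =:= 0 ->
            ?DEBUG("~p dying.~n", [Nth]),
            Pid ! X;
        X ->
            ?DEBUG("~p got message.~n", [Nth]),
            Pid ! X,
            loop(Nth, Pid)
    end.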