Playing with Elixir streams in IEx
- elixir
- erlang
- stream
Today I wanted to play with Elixir Stream to mimic the ideas of Observable collections from the Rx (Reactive Extension) concepts. The best playground is the IEx, the interactive shell for Elixir. However, in order to achieve the results that I wanted there was a lot of details that you need to know before having fun with it.
I'm glad that I had some help from @rcillo a co-worker of mine that have been studying Elixir for quite some time. We paired to achieve the results that we wanted. It took just 10~15min to get the desired result.
So what are we going to see in this posts:
- running multiples shells
- naming process
- IO device
So, let's start!
Elixir version used 0.15.1
The Elixir stream code snippet
The concept that I wanted to try is use the GenEvent stream support and the for syntax. This is the snippet:
{:ok, manager} = GenEvent.start_link
stream = GenEvent.stream(manager)
for x <- stream, do: IO.puts(x)
manager.notify("Hello World")
Pretty simple. I'm not going to dig each line of the snippet, the basic idea is:
given a stream of events, for each event published I want to print it on the screen
Side note
(You can advance to the next section if you want)
I'm passionate for new programming languages learning new ones is a way to always keep thinking different. The for notation in Elixir is available in other languages. (Each have different details in implementation, but all proposes the same feature)
Haskell have the do-notation and list-comprehension
main = do
x <- doSomeComputation
y <- doSomeComputation
return (x, y)
-- list comprehension
[x | x <- [1...5]]In Scala we would have:
for {
x <- doSomeComputation
y <- doSomeComputation
} yield (x, y)And C# LINQ also supports it
from x in doSomeComputation
from y in doSomeComputation
select new { x, y }So, go check this stuff if you never saw them.
Running on IEx
So, to run the IEx you just do
> iexThen you can start writing Elixir code on it. So we the code that we want to test, but it didn't worked as expected.
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.172.0>}
iex> stream = GenEvent.stream(manager)
%GenEvent.Stream{duration: :infinity, id: nil, manager: :eventor, mode: :ack, timeout: :infinity}
iex> for x <- stream, do: IO.puts(stream)
BOOM! console freezesWell, once you write the for code, the console is blocked because the stream is infinity. The <- notation can be considered as a generator. You can read it as "for each x generated by stream, do...". We could stop it by stopping the GenEvent process. But, how to do that if the console is blocked?
Spawning local shells
I didn't know about it, but @rcillo showed me some tricks. You can type the C-g (control-g) and you will get a prompt where you can type some commands.
User switch command
--> h
c [nn] - connect to job
i [nn] - interrupt job
k [nn] - kill job
j - list all jobs
s [shell] - start local shell
r [node [shell]] - start remote shell
q - quit erlang
? | h - this messageType h to get some helps. If you type j you will get all shells running with its job number.
--> j
1 {erlang,apply,[#Fun<Elixir.IEx.CLI.0.105530432>,[]]}
2 {'Elixir.IEx',start,[]}
3* {'Elixir.IEx',start,[]}With this you are able to switch between shells by using c <job-number>.
In order to spawn a new shell with IEx, you do:
--> s 'Elixir.IEx'Note that it is with single quotes (double quotes will fail). In case the shell is seems not to be responsive, just hit ENTER and you it will show the iex> prompt.
So, now that you have two shells, you just go and try:
iex> GenEvent.notify(manager, "hello world")
** (RuntimeError) undefined function: manager/0Well, it didn't work. How can we fix this? We need to register the process with a name.
Registering processes
Erlang provides a register/2 method that allows you bind a PID process to a name, this way we will have access to it.
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.185.0>}
iex> :erlang.register(:eventor, manager)
true
iex> :erlang.whereis(:eventor)
#PID<0.185.0>Ok, now we are able to just GenEvent.notify(:eventor, "Hello World") from both shells.
But, you just send notify messages but nothing prints in the console. You might think that it will the test the shell that is listening for events, but it don't. What is the problem? In order to print the texts in console there is the concept of devices in Erlang (that I will not discuss it here, because I don't know anything about it). But, let's think that it is similar to the STDOUT. The problem is that the shell with for snippet is blocked, and the IO.puts there can't flush its content, and your second shell have a different "device".
We need to specify which device do we want the IO.puts to print out.
Specifying the device where IO.puts prints out
If you check the IO.puts doc you will see the following doc:
puts(device \\ :erlang.group_leader(), item)So, we can specify the first argument the device that we want to output.
Each shell, has its own :erlang.group_leader(), what we will do is register the group_leader that we want the text to output.
iex> :erlang.register(:globalio, :erlang.group_leader())
true
iex> IO.puts(:globalio, "Hello World")
Hello WorldSo what we have to do is just rewrite the for code with the following code:
iex> for x <- stream, do: IO.puts(:globalio, x)Now, every notify event that you send will be printed out on the console! Yay =)
Summary
We saw a lot of things, so let's reproduce the steps in order. Considering the iex(n)> to identify each shell.
iex(1)> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.185.0>}
iex(1)> :erlang.register(:eventor, manager)
true
Type C-g
s 'Elixir.IEx'
iex(2)> :erlang.register(:globalio, :erlang.group_leader())
true
Type C-g
--> c 1
iex(1)> stream = GenEvent.stream(manager)
%GenEvent.Stream{duration: :infinity, id: nil, manager: :eventor, mode: :ack, timeout: :infinity}
iex(1)> for x <- stream, do: IO.puts(:globalio, x)
Type C-g
--> c 2
iex(2)> GenEvent.notify(:eventor, "Hello World")
Hello World
:okOk, now you can keep exploring the Stream API in Elixir. Have fun!