Streams in Elixir are lazy enumerables. You can build up a series of transformations that aren’t actually run until you call either Stream.run(stream) or an Enum function on the stream. This makes a stream a powerful interface to hand to calling code: callers can not only add further stream transformations but also easily control when the stream is evaluated.
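Here is a quick illustration of that laziness. The values and variable names are arbitrary, and the IO.puts call is there only to show when the work actually happens.

```elixir
# Defining the pipeline does no work yet: nothing is printed here.
doubled =
  1..3
  |> Stream.map(fn x ->
    IO.puts("doubling #{x}")
    x * 2
  end)

# Calling code can keep adding transformations before anything runs.
big = Stream.filter(doubled, fn x -> x > 2 end)

# Only now are the elements computed, and the "doubling ..." lines are printed.
IO.inspect(Enum.to_list(big))

# Stream.run/1 also forces evaluation, but discards the results and returns :ok.
:ok = Stream.run(doubled)
```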
The most flexible way to generate a stream is Stream.resource. Its signature is Stream.resource(start_fun, next_fun, after_fun), and it returns a stream.
This signature can be confusing, so let me explain. Most of the meat of Stream.resource is in next_fun, so let’s start there.
next_fun takes what the docs call “acc” (short for accumulator) and should return a tuple containing the next elements of the stream, as a list, along with the accumulator to pass to the next call of next_fun. The list usually holds a single element, but it can hold any number of elements, including none, which is why it is a list rather than a bare value. We’ll see some examples of next_fun in a bit.
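Because the first element of the returned tuple is a list, a single call to next_fun can emit several elements or none at all. The following small illustration uses a stream of our own (repeats is just an illustrative name) that emits the counter n repeated n times.

```elixir
repeats =
  Stream.resource(
    # start_fun: the counter starts at 0
    fn -> 0 end,
    # next_fun: emit n copies of n, so the very first call emits nothing
    fn n -> {List.duplicate(n, n), n + 1} end,
    # after_fun: nothing to clean up
    fn _n -> :ok end
  )

Enum.take(repeats, 6)
# => [1, 2, 2, 3, 3, 3]
```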
start_fun is a function that takes no arguments and returns the accumulator for the first call of next_fun.
after_fun is called with the final accumulator once the stream is done, including when the consumer stops early (for example with Enum.take). It is meant to clean up any open resources, such as closing a file that was being read by the stream.
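The following minimal sketch shows all three functions working together in a stream that counts up from 1. The name counter is illustrative, and the IO.puts calls exist only to show when start_fun and after_fun run. Even though Enum.take stops after three elements, after_fun is still called with the last accumulator.

```elixir
counter =
  Stream.resource(
    # start_fun: no arguments, returns the first accumulator
    fn ->
      IO.puts("starting")
      1
    end,
    # next_fun: emit the current number and pass the next one along
    fn n -> {[n], n + 1} end,
    # after_fun: receives the last accumulator once the stream stops
    fn n -> IO.puts("after_fun called with #{n}") end
  )

# Prints "starting", then the after_fun message, and returns [1, 2, 3].
Enum.take(counter, 3)
```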
Let’s look at some examples. First, we can create a stream that will produce each prime number, from two to infinity:
The start_fun just sets up an empty list as the accumulator, and the after_fun does nothing (since we have nothing to clean up). The next_fun is extracted into a separate function, next_prime, to take better advantage of pattern matching.
First off, if the accumulator is empty, next_prime returns {[2], [2]}, which emits 2 as the first element and sets [2] as the accumulator. We don’t set [2] as the accumulator in the start_fun, because then 2 wouldn’t be returned as the first prime.
If the accumulator isn’t empty, we keep adding 1 to the last prime we’ve found until we reach a number that isn’t divisible by any of the previous primes. That number must be prime, so we return it and also add it to the accumulator as an additional prime.
This can be used like this:
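Putting the pieces described above together, a minimal sketch of the prime stream and its use might look like the following. The module name Primes, the helper find_next, and the choice to keep the newest prime at the head of the list are assumptions for illustration. Only next_prime and the behavior of the three functions come from the description.

```elixir
defmodule Primes do
  # Hypothetical sketch of the prime stream described in the text.
  def stream do
    Stream.resource(
      fn -> [] end,            # start_fun: no primes found yet
      &next_prime/1,           # next_fun
      fn _primes -> :ok end    # after_fun: nothing to clean up
    )
  end

  # Nothing in the accumulator yet: emit 2 and remember it.
  defp next_prime([]), do: {[2], [2]}

  # Otherwise search upward from the last prime found (kept at the head).
  defp next_prime([last | _] = primes) do
    next = find_next(last + 1, primes)
    {[next], [next | primes]}
  end

  # Keep adding 1 until the candidate isn't divisible by any earlier prime.
  defp find_next(candidate, primes) do
    if Enum.any?(primes, &(rem(candidate, &1) == 0)) do
      find_next(candidate + 1, primes)
    else
      candidate
    end
  end
end

Primes.stream() |> Enum.take(10)
# => [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```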
I think calling the second value returned by next_fun the “accumulator” is misleading, though. Let’s look at another example:
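A sketch of a Fibonacci stream in the same style might look like the following. The module name Fibonacci is hypothetical, and the sketch assumes the sequence starts at 0. After the first two calls, the accumulator holds only the two most recent numbers.

```elixir
defmodule Fibonacci do
  # Hypothetical sketch; assumes the sequence starts at 0.
  def stream do
    Stream.resource(
      fn -> [] end,             # start_fun: nothing generated yet
      &next_fibonacci/1,        # next_fun
      fn _ -> :ok end           # after_fun: nothing to clean up
    )
  end

  # First call: emit 0.
  defp next_fibonacci([]), do: {[0], [0]}
  # Second call: emit 1; the context now holds the two most recent numbers.
  defp next_fibonacci([0]), do: {[1], [0, 1]}
  # Every later call: emit the sum and keep only the last two numbers.
  defp next_fibonacci([a, b]), do: {[a + b], [b, a + b]}
end

Fibonacci.stream() |> Enum.take(10)
# => [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```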
The next_fibonacci function returns only the previous two Fibonacci numbers as the “accumulator”, because those are the only two needed to calculate the next one. I think it is better to think of the “accumulator” as the context of previously generated results, whether or not it actually accumulates anything.
Generating an infinite sequence of Fibonacci numbers or primes is all well and good, but how can we use Stream.resource to do something real?
Take a look at an asynchronous map module I use in a project of mine:
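The following is a minimal sketch of the approach described next, not the module from the project itself. The module name AsyncMap and the use of a unique ref to tag messages are assumptions. The rest follows the description: one Task.start_link per item, a stream that receives one message per item, and a count as the accumulator.

```elixir
defmodule AsyncMap do
  # Hypothetical sketch; only async_map's behavior comes from the text.
  def async_map(items, fun) do
    parent = self()
    # Assumption: a unique ref tags our messages so unrelated ones are ignored.
    ref = make_ref()
    count = length(items)

    # One linked process per item; each sends its result back to the caller.
    Enum.each(items, fn item ->
      Task.start_link(fn -> send(parent, {ref, fun.(item)}) end)
    end)

    Stream.resource(
      # start_fun: no messages received yet
      fn -> 0 end,
      # next_fun: the accumulator counts messages received so far
      fn
        ^count ->
          {:halt, count}

        received ->
          receive do
            {^ref, result} -> {[result], received + 1}
          end
      end,
      # after_fun: nothing to clean up in this sketch
      fn _received -> :ok end
    )
  end
end

# Results come back in the order the tasks finish, not the input order.
AsyncMap.async_map([3, 1, 2], fn n ->
  Process.sleep(n * 100)
  n * 10
end)
|> Enum.to_list()
|> IO.inspect()
```

Because the tasks send their results to the process that called async_map, the stream in this sketch has to be consumed by that same process.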
This module exposes an async_map function that is meant to work like Enum.map, but runs the mapping function on each item asynchronously, in parallel. It then emits each result into the stream as soon as its call finishes, so results arrive in completion order rather than input order, and the whole list is processed in roughly the time it takes the slowest call to finish. I use it to make many HTTP requests at once.
One thing to note is that, instead of returning {[next], acc}, next_fun can return {:halt, acc} to indicate the end of the stream.
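As a small illustration of {:halt, acc} on its own (an example of our own, not from the project above), the following finite countdown stream halts once its accumulator reaches zero. Because it ends by itself, it can be passed straight to Enum.to_list.

```elixir
countdown =
  Stream.resource(
    # start_fun: count down from 3
    fn -> 3 end,
    fn
      # Reaching 0 ends the stream instead of emitting another element.
      0 -> {:halt, 0}
      n -> {[n], n - 1}
    end,
    # after_fun: nothing to clean up
    fn _ -> :ok end
  )

Enum.to_list(countdown)
# => [3, 2, 1]
```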
The basic logic is to use Task.start_link to start a process for each item in the list that will send its result back to the calling process. Then it starts a stream that waits to receive messages from those processes. Once it has received as many messages as there were items in the list, it ends the stream. Here the “accumulator” is just a count of the messages received so far.
I hope this description of using Stream.resource helps demystify creating streams in your Elixir apps.