Introduction

Streams are composable, lazy enumerables. Any enumerable that generates elements one by one during enumeration is called a stream.

Streams are great for composing functions without actually materializing the computation. As stated by the official docs, a stream can be used in place of standard enumerables and is especially useful for large or infinite collections.
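As a small illustration of that laziness, the sketch below builds a pipeline over an infinite stream and only evaluates it when Enum.take/2 asks for elements. The variable names are made up for the example.

```elixir
# Nothing is computed while the pipeline is being built.
evens_squared =
  Stream.iterate(1, &(&1 + 1))
  |> Stream.filter(&(rem(&1, 2) == 0))
  |> Stream.map(&(&1 * &1))

# Enum.take/2 forces evaluation, and only for as many elements as requested.
first_three = Enum.take(evens_squared, 3)
# first_three == [4, 16, 36]
```

Replacing the Stream calls with their Enum counterparts would never return here, since Enum.filter/2 would try to walk the whole infinite sequence first.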

Processing a stream item by item is probably the most common operation, but sometimes you want to aggregate, keep an accumulator to track state, or even emit the accumulator after aggregating items from the stream. This article explores some options.

The problem

Recently I encountered a problem where items were pulled from a RocksDB iterator into a stream, and the resulting stream was to be processed further. For those not familiar with RocksDB, it’s an embeddable, LSM-based key-value store. Without going too deep into specifics (as it’s outside the scope of this article): in RocksDB keys are stored in sorted order, and iterating through a certain key space can be done by using prefix iteration. In this application keys are encoded roughly as <key>:<revision>, and given a prefix we want to find the highest revision for each key within that prefix.

Given a stream of key-value tuples {binary(), binary()}, the goal is to provide functions that emit these tuples based on a uniqueness check of the key. More specifically, each “base” key can appear multiple times, but in different revisions. We are (almost) always looking for the latest revision of a given key. The RocksDB iterator starts at a given revision and is processed in either ascending or descending order, i.e. forward or reverse iteration over the keys.

For simplicity we’ll use lists of key-value tuples to test our implementation.

The following key module provides functions used to encode/decode keys:

defmodule Key do
  @delimiter ":"

  @spec encode(binary(), non_neg_integer()) :: binary()
  def encode(key, rev), do: key <> @delimiter <> <<rev::integer-size(32)>>

  @spec decode(binary()) :: binary()
  def decode(key) do
    key_end = byte_size(key) - (4 + byte_size(@delimiter))

    :binary.part(key, {0, key_end})
  end
end

For instance, encoding the key /key with revision 1:

iex(1)> Key.encode("/key", 1)
<<47, 107, 101, 121, 58, 0, 0, 0, 1>>

Any keys with the same prefix will end up “grouped” together, i.e.:

"/namespace/key:3"
"/namespace/key:2"
"/namespace/key:1"
"/namespace/another_key:2"
"/namespace/another_key:1"
"/another_namespace/key:2"
"/another_namespace/key:1"

Iterating through the prefix "/namespace" should yield the following two keys:

"/namespace/key:3"
"/namespace/another_key:2"
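To see why the encoding groups revisions together, here is a minimal sketch that sorts a few encoded keys and filters them by prefix. It assumes that a plain bytewise sort (Enum.sort/1 on binaries) is a fair stand-in for the store's key ordering; the encode function simply mirrors Key.encode/2 so the snippet runs on its own.

```elixir
# Mirrors Key.encode/2 from above so the snippet is self-contained.
encode = fn key, rev -> key <> ":" <> <<rev::integer-size(32)>> end

keys = [
  encode.("/namespace/key", 1),
  encode.("/another_namespace/key", 2),
  encode.("/namespace/key", 3),
  encode.("/namespace/another_key", 2),
  encode.("/namespace/key", 2)
]

# Assumption: bytewise ordering stands in for the store's comparator.
sorted = Enum.sort(keys)

# Keep only keys under the prefix. Revisions of one base key end up adjacent
# and ascending, because the revision is a fixed-width big-endian integer.
in_prefix = Enum.filter(sorted, &match?(<<"/namespace/", _::binary>>, &1))
```

Here in_prefix holds /namespace/another_key at revision 2 followed by /namespace/key at revisions 1, 2 and 3, which is the ascending counterpart of the listing above.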

Desc. with Stream.transform/3

If the iterator is processing in reverse, i.e. we start at the largest key within a prefix, we know that the first encounter of a key is the one that should be emitted. Thus we can use Stream.transform/3 with a map as the accumulator, skipping over keys we have already encountered:

defmodule Iterator do
  @type order :: :asc | :desc

  @spec uniq(Enumerable.t(), order()) :: Enumerable.t()
  def uniq(enum, :desc) do
    Stream.transform(enum, %{}, fn {k, _v} = kv, acc ->
      key = Key.decode(k)

      if Map.has_key?(acc, key) do
        {[], acc}
      else
        {[kv], Map.put(acc, key, true)}
      end
    end)
  end
end

For each item the key is decoded and checked against the accumulator. If the key has already been emitted, the pair is skipped; otherwise the key-value pair is emitted and the key is recorded in the accumulator.
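For comparison, the standard library's Stream.uniq_by/2 keeps the first element for each distinct result of a function, which appears to cover the same ground as the :desc clause. The sketch below is an alternative under that assumption, not the implementation used in this article; encode and decode are local stand-ins for the Key functions.

```elixir
# Local stand-ins for Key.encode/2 and Key.decode/1.
encode = fn key, rev -> key <> ":" <> <<rev::integer-size(32)>> end
# Drop the 1-byte delimiter and the 4-byte revision from the end.
decode = fn key -> :binary.part(key, {0, byte_size(key) - 5}) end

kvs = [
  {encode.("/key", 2), "2"},
  {encode.("/key", 1), "1"},
  {encode.("/other_key", 1), "1"}
]

# The first pair seen for each base key wins, later ones are dropped.
latest =
  kvs
  |> Stream.uniq_by(fn {k, _v} -> decode.(k) end)
  |> Enum.to_list()
```

Writing the accumulator by hand with Stream.transform/3 is still worthwhile here, because the same shape carries over to the cases below where the first occurrence is not the one we want.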

We can write a simple test to verify:

defmodule IteratorTest do
  use ExUnit.Case

  describe "uniq/2" do
    test ":desc emits unique keys with the highest rev" do
      kvs = [
        {Key.encode("/key", 2), "2"},
        {Key.encode("/key", 1), "1"},
        {Key.encode("/other_key", 3), "3"},
        {Key.encode("/other_key", 2), "2"},
        {Key.encode("/other_key", 1), "1"}
      ]

      result =
        kvs
        |> Iterator.uniq(:desc)
        |> Enum.to_list()

      assert length(result) == 2
      assert result == [Enum.at(kvs, 0), Enum.at(kvs, 2)]
    end
  end
end

Asc. with Stream.transform/3

When processing in a forward direction we must keep track of the key we are currently processing. Assuming keys are sorted, we iterate over one “base” key at a time. Therefore we know that when we reach a new key, the last entry of the previous key should be emitted, and we save the latest key-value pair until we reach the next key, and so on.

There’s a problem with this: what happens with the last key? To emit the last pair we must know that the stream is done, so that we can emit whatever is left in the accumulator. This can be achieved by using a token.

We can represent the accumulator as a tuple containing the current key and the latest key-value pair, and to know when the stream is done we’ll append a :halt token to the end of the stream using Stream.concat/2:

defmodule Iterator do
  @type order :: :asc | :desc

  @spec uniq(Enumerable.t(), order()) :: Enumerable.t()
  ...
  def uniq(stream, :asc) do
    stream
    |> Stream.concat([:halt])
    |> Stream.transform({nil, nil}, &transform/2)
  end

  # Empty stream
  defp transform(:halt, {nil, nil} = acc) do
    {[], acc}
  end

  defp transform({k, _v} = kv, {nil, nil}) do
    {[], {Key.decode(k), kv}}
  end

  defp transform(:halt, {_key, acc_kv} = acc) do
    {[acc_kv], acc}
  end

  defp transform({k, _v} = kv, {current_key, acc}) do
    key = Key.decode(k)

    if key == current_key do
      {[], {key, kv}}
    else
      {[acc], {key, kv}}
    end
  end
end
  1. Stream.concat/2 adds the :halt token to the end of the stream.
  2. The first transform/2 clause handles an empty enumerable: the token arrives while the accumulator is still {nil, nil}, so nothing is emitted.
  3. Start condition. When we don’t have any keys yet, just store the first one and continue.
  4. Once we encounter the :halt token, we emit the kv-pair stored in the accumulator before the stream ends.
  5. Decode the key and check if it’s the same as the current one. If so, save the kv-pair; otherwise emit the stored kv-pair and keep the new key and kv-pair.
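The steps above can also be sketched without a token by using Stream.chunk_while/4, whose after function is called once the input is exhausted. This is a swapped-in technique rather than the approach taken in this article, and encode, decode, chunk_fun, after_fun and latest_asc are names made up for the example.

```elixir
# Local stand-ins for Key.encode/2 and Key.decode/1.
encode = fn key, rev -> key <> ":" <> <<rev::integer-size(32)>> end
decode = fn key -> :binary.part(key, {0, byte_size(key) - 5}) end

# Keep the latest pair per base key; reaching a new base key emits the stored pair.
chunk_fun = fn
  {k, _v} = kv, nil ->
    {:cont, {decode.(k), kv}}

  {k, _v} = kv, {current_key, last_kv} ->
    key = decode.(k)

    if key == current_key do
      {:cont, {key, kv}}
    else
      {:cont, last_kv, {key, kv}}
    end
end

# Runs once at end of input, so the last stored pair is emitted without a :halt token.
after_fun = fn
  nil -> {:cont, nil}
  {_key, last_kv} -> {:cont, last_kv, nil}
end

latest_asc = fn enum -> Stream.chunk_while(enum, nil, chunk_fun, after_fun) end

kvs = [
  {encode.("/key", 1), "1"},
  {encode.("/key", 2), "2"},
  {encode.("/other_key", 1), "1"}
]

result = kvs |> latest_asc.() |> Enum.to_list()
```

One trade-off: the token approach works with any function that only sees elements, while Stream.chunk_while/4 relies on the dedicated end-of-input callback.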

And the test:

defmodule IteratorTest do
  use ExUnit.Case

  describe "uniq/2" do
    test ":asc emits unique keys with the highest rev" do
      kvs = [
        {Key.encode("/key", 1), "1"},
        {Key.encode("/key", 2), "2"},
        {Key.encode("/other_key", 1), "1"},
        {Key.encode("/other_key", 2), "2"},
        {Key.encode("/other_key", 3), "3"}
      ]

      result =
        kvs
        |> Iterator.uniq(:asc)
        |> Enum.to_list()

      assert length(result) == 2
      assert result == [Enum.at(kvs, 1), Enum.at(kvs, 4)]
    end
  end
end

Arbitrary order using a token

In the previous examples it was assumed that the stream we are processing comes in sorted order, but for completeness we’ll process an unsorted stream as well.

Processing the stream in this way is similar to processing it in a forward direction, i.e. we begin at some given revision and move forward in the stream. However, we must process the entire stream before we can determine whether we’ve seen all occurrences of a key. We also need to track the highest revision encountered so far for each key in order to compare.

defmodule Key do
  @delimiter ":"

  @spec encode(binary(), non_neg_integer()) :: binary()
  def encode(key, rev), do: key <> @delimiter <> <<rev::integer-size(32)>>

  @spec decode(binary()) :: binary()
  def decode(key) do
    key_end = byte_size(key) - (4 + byte_size(@delimiter))

    :binary.part(key, {0, key_end})
  end

  @spec decode_split(binary()) :: {binary(), non_neg_integer()}
  def decode_split(key) do
    # Slice by position rather than splitting on the delimiter, so a base key
    # that itself contains ":" still decodes correctly.
    {decode(key), decode_rev(key)}
  end

  @spec decode_rev(binary()) :: non_neg_integer()
  def decode_rev(key) do
    <<rev::integer-size(32)>> = :binary.part(key, {byte_size(key), -4})
    rev
  end
end

Above, we add some convenience functions that allow us to split the key into a {key, rev} tuple. The :unsorted clause of Iterator.uniq/2 uses them:

defmodule Iterator do
  @type order :: :asc | :desc | :unsorted

  ...
  @spec uniq(Enumerable.t(), order()) :: Enumerable.t()
  def uniq(stream, :unsorted) do
    stream
    |> Stream.concat([:halt])
    |> Stream.transform(%{}, &transform_unsorted/2)
  end

  defp transform_unsorted({k, _v} = kv, acc) do
    {key, rev} = Key.decode_split(k)

    {_value, acc} =
      Map.get_and_update(acc, key, fn
        {current_rev, _kv} = value when current_rev >= rev -> {value, value}
        value -> {value, {rev, kv}}
      end)

    {[], acc}
  end

  defp transform_unsorted(:halt, acc) do
    kvs =
      acc
      |> Map.values()
      |> Enum.map(fn {_rev, kv} -> kv end)
    {kvs, acc}
  end
end
  1. Split the key into a {key, rev} tuple.
  2. Map.get_and_update/3 allows us to get and update the value in one pass. If the revision in the map is the same as or more recent than the revision we’re currently processing, we keep the stored entry. Otherwise we set the kv-pair for the key, which also handles the case where the key is not present in the map.
  3. When the :halt token is received, the map values are materialized and emitted.

NOTE Be careful when doing operations like this: nothing is emitted until the :halt token arrives, and for an infinite stream the accumulator grows without bound.
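Because nothing can be emitted before the whole input has been seen, the unsorted case can also be written eagerly. The following is a rough sketch of that with Enum.reduce/3, assuming the input is finite; encode, decode_split and latest_by_key are hypothetical names local to the example.

```elixir
# Local stand-ins for Key.encode/2 and Key.decode_split/1.
encode = fn key, rev -> key <> ":" <> <<rev::integer-size(32)>> end

decode_split = fn key ->
  size = byte_size(key) - 5
  <<base::binary-size(size), ":", rev::integer-size(32)>> = key
  {base, rev}
end

latest_by_key = fn kvs ->
  kvs
  |> Enum.reduce(%{}, fn {k, _v} = kv, acc ->
    {key, rev} = decode_split.(k)

    case acc do
      # An equal or newer revision is already stored: keep it.
      %{^key => {current_rev, _kv}} when current_rev >= rev -> acc
      # Unknown key or older stored revision: store this pair.
      _ -> Map.put(acc, key, {rev, kv})
    end
  end)
  |> Map.values()
  |> Enum.map(fn {_rev, kv} -> kv end)
end

kvs = [
  {encode.("/key", 1), "1"},
  {encode.("/other_key", 3), "3"},
  {encode.("/key", 2), "2"},
  {encode.("/other_key", 2), "2"}
]

result = latest_by_key.(kvs)
```

The streaming version remains useful when the result feeds further lazy stages, but the memory cost is the same in both: one entry per distinct base key.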

And the test to verify:

defmodule IteratorTest do
  use ExUnit.Case

  describe "uniq/2" do
    test ":unsorted emits unique keys with the highest rev" do
      kvs = [
        {Key.encode("/key", 1), "1"},
        {Key.encode("/other_key", 1), "1"},
        {Key.encode("/other_key", 3), "3"},
        {Key.encode("/key", 2), "2"},
        {Key.encode("/other_key", 2), "2"}
      ]

      result =
        kvs
        |> Iterator.uniq(:unsorted)
        |> Enum.to_list()

      assert length(result) == 2
      # Map.values/1 makes no ordering promise, so compare order-independently.
      assert Enum.sort(result) == Enum.sort([Enum.at(kvs, 3), Enum.at(kvs, 2)])
    end
  end
end

Wrapping up

Stream.transform/3 is a good candidate when you need to keep some state throughout a streamed computation. Similarly, Stream.transform/4 or Stream.resource/3 can be used if some kind of setup / teardown of resources is required.
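To make the setup / teardown idea concrete, here is a minimal sketch of Stream.resource/3 reading lines from an in-memory StringIO device. The device merely stands in for a real resource such as a database iterator, which is an assumption made for the example.

```elixir
lines =
  Stream.resource(
    # Setup: runs when enumeration starts.
    fn ->
      {:ok, device} = StringIO.open("a\nb\nc\n")
      device
    end,
    # Next: emit one line per step and halt at end of input.
    fn device ->
      case IO.read(device, :line) do
        :eof -> {:halt, device}
        line -> {[String.trim_trailing(line, "\n")], device}
      end
    end,
    # Teardown: runs when the stream finishes or is halted early.
    fn device -> StringIO.close(device) end
  )

first_two = Enum.take(lines, 2)
# first_two == ["a", "b"]
```

Each enumeration runs the setup function again, so taking two lines and later enumerating the whole stream opens and closes two separate devices.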

We covered:

  • Processing the stream plainly with an accumulator to track already processed items.
  • Using a token together with an accumulator to track end-of-stream, emitting items continuously.
  • Using a token with an accumulator to track end-of-stream and emitting the accumulator at the end.