Expiring Items from a Map in Elixir
I’ve got a side project that I’ve been using to teach myself some of the ins and outs of Elixir. So far the experience has been full of ups and downs — moments of joy and moments of frustration, but this is to be expected.
I’ve got a GenServer that I’m using to maintain a cache of events that I receive from a subscription (I allude to this in my previous post). I’ve been playing with various list comprehension and functional syntax in order to allow clients to get meaningful information out of this server.
I discovered that I want to expire some of the items in this map on a timer. Every 35 seconds or so I want to go through the cache and remove any item in the map that has a timestamp older than 30 seconds. I’m subscribing to heartbeats and I want to purge old heartbeats so that the absence of a heartbeat can be used as a warning condition for me.
My inner battle-weary, production-sore developer cringes at this idea. I’m getting events in the background, and now I want to purge some of those events, also in the background? How am I going to avoid destroying my state, overwriting it, or, worse yet, getting into a deadlock? I’ve literally been awake at 2am on a Sunday morning trying to figure out why a server was at a cold dead stop because two background threads were each waiting on the other to unlock a critical section. If you haven’t experienced this particular type of pain, count your blessings.
If I was doing this in Go I would set up a singleton struct pointer and I would send it incoming events on a channel. Every 30 seconds I would send a “brace yourself, it’s time for the purge” message on a channel to that same thing. I have to use the same channel or I have to coordinate blocking two channels with a select so that I will only either add an event or conduct a purge sweep, but never cross the streams. It’s not the most elegant, but it sure as hell beats trying to do it in some other languages (they know who they are).
This is when the beauty of Elixir starts to sink in a little more. I remember that my GenServer module is actually a process. I also remember that every communication with that process is done through messages and that Elixir will take care of concurrency for me. Further, and most importantly, I remember that my server’s state is immutable. The response to every server invocation is a return tuple that includes modified state, which is then fed to the next invocation of my function within that process.
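To make that state-threading idea concrete, here is a minimal sketch of a GenServer whose state is a single integer. The module name CounterSketch and its functions are hypothetical and exist only for this illustration; the point is that each callback receives the current state and hands back the next one in its return tuple.

```elixir
defmodule CounterSketch do
  use GenServer

  # Client API (hypothetical names, for illustration only)
  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial)
  def increment(pid), do: GenServer.cast(pid, :increment)
  def value(pid), do: GenServer.call(pid, :value)

  # Server callbacks: each one receives the current state and returns the next
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count) do
    # No mutation: the value in the tuple becomes the state for the next message
    {:noreply, count + 1}
  end

  @impl true
  def handle_call(:value, _from, count) do
    # Reply with the count and pass the state along unchanged
    {:reply, count, count}
  end
end

{:ok, pid} = CounterSketch.start_link()
CounterSketch.increment(pid)
CounterSketch.increment(pid)
IO.inspect(CounterSketch.value(pid)) # prints 2
```

The process works through its mailbox one message at a time, so both increments are applied before the call that reads the value is answered.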
So I start off my GenServer something like this:
defmodule Sauron.HeartbeatCache do
  use GenServer
  require Logger

  @polling_interval 35*1000
  @max_allowed_hb_age 30
  @separator "|"

  def start_link() do
    GenServer.start_link(__MODULE__, %{}, [name: :heartbeats])
  end
  ...
end
In response to a call to start_link, the server process then invokes the init function:
def init(heartbeats) do
  Logger.info "Initialized heartbeat map"
  Process.send_after(self(), :expirebeats, @polling_interval)
  {:ok, heartbeats}
end
The cool part here is the Process.send_after line. I’m basically saying “send a message to myself containing the atom :expirebeats after the period defined by polling_interval.” When the client API calls my service, it invokes the handle_cast and handle_call functions, but when I send myself messages (or I get them from child processes as a supervisor/monitor, etc.) I get them through handle_info. So now I can run through my map and expire what I don’t need. Remember that I’m not mutating state, I’m simply returning a new map, one that does not contain the expired items:
def handle_info(:expirebeats, state) do
  beforelen = length(Map.keys(state))
  now = DateTime.utc_now |> DateTime.to_unix
  drop_keys =
    Enum.filter(Map.keys(state),
      &( (now - Map.get(state, &1, 0)) >= @max_allowed_hb_age))
  state = Map.drop(state, drop_keys)
  Logger.info "Checked for expired heartbeats. From #{beforelen} to #{length(Map.keys(state))} items."
  Process.send_after(self(), :expirebeats, @polling_interval)
  {:noreply, state}
end
There is some refactoring that I think I need to do here. The map I’m sifting through has values that are timestamps. So I’m collecting a list of keys to drop if the value of that item in the original map is ≥ 30 seconds old (max_allowed_hb_age). Then I invoke Map.drop to drop the offending keys and return a new map sans the expired items.
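One possible shape for that refactoring is to treat the purge as a pure function over the map, so it can be tried out without a running process. This is only a sketch: PurgeSketch and expire/3 are made-up names, and the timestamps below are arbitrary Unix-style seconds chosen for the example.

```elixir
defmodule PurgeSketch do
  # Hypothetical helper, not part of the original module.
  # Keeps only the entries whose timestamp is younger than max_age seconds.
  def expire(heartbeats, now, max_age) do
    heartbeats
    |> Enum.reject(fn {_key, timestamp} -> now - timestamp >= max_age end)
    |> Map.new()
  end
end

beats = %{"a" => 100, "b" => 125, "c" => 71}
fresh = PurgeSketch.expire(beats, 130, 30)

IO.inspect(fresh)           # prints %{"b" => 125}
IO.inspect(map_size(fresh)) # prints 1
```

Rejecting key/value pairs directly avoids the separate pass that collects keys, and map_size/1 gives the item count without building a list of keys first. Elixir 1.13 and later also provide Map.reject/2, which would remove the need for the Map.new/1 step.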
Immediately before returning, I just fire off another Process.send_after that will ultimately trigger the same function after the given time delay. I’m also using the logger here so I can watch in the console and see the size of the heartbeats collection go down. Ultimately, if the sources of the heartbeats stop sending me messages, I’ll end up with an empty map.
To recap: I have one process subscribing to heartbeat messages on a topic. I have another process that is my GenServer representing a heartbeat cache. The subscriber process sends new events to the heartbeat cache while a background timer purges expired items from that same map. Because the GenServer handles one message at a time, there are no locks, no waits, and no critical sections.
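The whole arrangement can be sketched end to end in a few dozen lines. Everything here is an assumption made for the example rather than the real module: the name CacheSketch, the record/2 and sources/1 functions, the :interval_ms and :max_age_ms options, and the use of System.monotonic_time/1 in milliseconds (instead of Unix seconds) so the demo can run with very short intervals.

```elixir
defmodule CacheSketch do
  use GenServer

  # All names and options in this module are assumptions for illustration.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # What a subscriber process might call for each heartbeat it receives
  def record(pid, source), do: GenServer.cast(pid, {:heartbeat, source})

  # Lists the sources that currently have a live heartbeat
  def sources(pid), do: GenServer.call(pid, :sources)

  @impl true
  def init(opts) do
    state = %{
      beats: %{},
      interval: Keyword.get(opts, :interval_ms, 35_000),
      max_age: Keyword.get(opts, :max_age_ms, 30_000)
    }

    # Schedule the first purge sweep
    Process.send_after(self(), :expirebeats, state.interval)
    {:ok, state}
  end

  @impl true
  def handle_cast({:heartbeat, source}, state) do
    now = System.monotonic_time(:millisecond)
    {:noreply, %{state | beats: Map.put(state.beats, source, now)}}
  end

  @impl true
  def handle_call(:sources, _from, state) do
    {:reply, Map.keys(state.beats), state}
  end

  @impl true
  def handle_info(:expirebeats, state) do
    now = System.monotonic_time(:millisecond)

    fresh =
      state.beats
      |> Enum.reject(fn {_source, seen_at} -> now - seen_at >= state.max_age end)
      |> Map.new()

    # Schedule the next sweep before handing back the new state
    Process.send_after(self(), :expirebeats, state.interval)
    {:noreply, %{state | beats: fresh}}
  end
end

{:ok, cache} = CacheSketch.start_link(interval_ms: 50, max_age_ms: 100)
CacheSketch.record(cache, "sensor-1")
IO.inspect(CacheSketch.sources(cache))

# Wait long enough for several sweeps to run with no new heartbeats
Process.sleep(300)
IO.inspect(CacheSketch.sources(cache))
```

The first inspect should show the freshly recorded source, and the second should show an empty list once a sweep has found the entry older than the maximum age. Heartbeats and purge messages land in the same mailbox, so each handler always sees a consistent map.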
I can’t remember the last time I had to worry so little about concurrency management during a task like this. I still feel like a total newbie when it comes to the syntax (I think my “collect and purge” code could be much more elegant), but within just a day or so of starting real-world Elixir, I’ve been able to do something that I would probably get wrong writing in Go or C# on the first try.