Workers and Concurrency in Ballerina

The other day I was thinking about what I was going to do for the next post in my series of posts on building a gRPC service in Ballerina. While I was planning that out I decided that I wanted to take a detour and explore some of the core features of the language not specific to building services.

If I were to say that Ballerina appears to be a “good language for building services” that might be accurate, but that would be like saying a Tesla is a “good vehicle for transporting groceries”. Certainly it’s an accurate statement, but it fails to mention some of the most compelling features.

First, I want to draw a distinction between the concepts of asynchronous and concurrent, at least with regard to features in the Ballerina language. When you want to do something asynchronously in Ballerina, you’re going to start that thing and then await the response. In other words, you’ll be using futures.

For example, in Ballerina you can use syntax like this:

future<WeatherForecast> forecast = start fetchForecast(10001);
// do something else
// await the future...
WeatherForecast|error f = await forecast;

If you’ve used any language with async-await syntax, this should look quite familiar. For example, a near equivalent of start in the Go language is the go keyword. Rust is getting a similar async-await syntax in an upcoming release, and C# has had first-class asynchronous keywords for quite some time.

But that’s asynchronous programming, and in this post I want to talk about concurrency. Every language has its own way of allowing a developer to specify that they want an application to divide its execution time among multiple tasks.

In Go, you have goroutines, which are lightweight “green” threads. Elixir and Erlang use processes, and with Rust, Java, and C# you get traditional OS level threads (though there are libraries available for co-routines as well).

In Ballerina, the unit of concurrency is called the worker. What I find fascinating is that every Ballerina function contains a default worker. In other words, the concept of workers is native and deeply ingrained in the system — you can’t actually do anything in Ballerina without the work being within the context of some worker. This idea is subtle and powerful, and is a foundation that enables some pretty powerful capabilities.

To create a worker in Ballerina, you just declare a named worker block:

worker bob {
// do something

Much like the way Go uses channels as a synchronization primitive to transmit information between goroutines, Ballerina lets you send and receive messages to/from workers:

// send a tuple message to bob
worker alice {
("Ballerina", "is awesome") -> bob;
worker bob {
(string, string) fromAlice;
fromAlice <- alice;

This message passing syntax makes me happy. I loved the concept of message passing when I saw it in Smalltalk, I used it excessively when building applications with Akka, and I routinely (see what I did there??) use channels with goroutines in my Go code.

If you’ve been following along with my recent posts, then you’d be right if you thought these workers would show up as individual entities in the Ballerina diagrams that you can generate from the code (I’ll show one in a bit).

What might not be obvious is that, written this way, these workers execute sequentially. The Ballerina compiler is actually going to check and prevent you from compiling a worker-to-worker interaction that could result in a deadlock (yay for deadlock-free code!).

It’s this subtlety that originally stymied my attempt to build a sample to demonstrate concurrency and workers. I wanted to create one worker that read lines of text from the user and another worker that manipulated that text and then echoed it back out to the user via stdout. To get these two workers to operate concurrently I used a fork block and wrote code that looked something like this:

fork {
worker stdin_read {
while(true) {
io:println("Type something:");
string s = io:readln(());
s -> stdout_write;
worker stdout_write {
string msg;
while (true) {
msg <- stdin_read;
io:println("[ECHO] " + msg);
} join (all) (map results) { }

You can try and compile this code, but it won’t work. It doesn’t (in the current version) complain about a potential deadlock, it mentions something about worker send operations needing be top-level operations (I’m assuming error messages will get better as the compiler is improved).

This code looks pretty harmless, but anybody with much experience (or suffering, as the case may be) with multi-threaded development can see how if anything goes wrong in either of these workers, the entire system will grind to a halt in a deadlock.

The above code represents my lack of understanding of some of the primitives available in the Ballerina language as I tried to apply my experience from other languages to this new syntax. After getting some help from the community, I found a solution.

What I needed is an intermediary, something that would act as a third gear in the mesh to prevent the deadlocking: enter the Ballerina stream. I don’t have enough space in this blog post to go into an enormous amount of detail, but Ballerina streams are super powerful. In addition to providing an in-memory, thread-safe publish/subscribe mechanism, they also come complete with their own CEP (Complex Event Processing)-style query language, including the ability to do time windowing!

What I decided to do was refactor my code so that one worker reads from the user and then publishes that string on a stream. I then have a couple other workers that are subscribed to the same stream. Not only did this unlock the gears and prevent a deadlock and make the compiler happy, but I don’t have to use a while loop in the consumer worker.

In this code sample I’m also using Ballerina’s lambda syntax to provide an inline anonymous function that subscribes to messages in the stream. Streams can be free form or they can be type-constrained like the one I’m using. Seeing syntax like stream<T> variable; makes my inner event-sourcing nerd giggle with glee.

Another thing that differentiates Ballerina streams from channels in Rust (and other languages) is that the stock Rust channels are mpsc (Multiple Producer Single Consumer). As you can see in this code, streams can have multiple publishers and multiple consumers.

So let’s run this and see what happens:

$ ballerina run workers.bal 
Type something:
testy mctesto
Type something:
[ECHO 2] testy mctesto
[ECHO 1] testy mctesto
Type something:
[ECHO 1] alice
[ECHO 2] alice
Type something:
[ECHO 1] bob
[ECHO 2] bob

If you look at the output from running this application, you can see the concurrency in action as the output is interleaved randomly. And of course, no Ballerina blog post wouldn’t be complete without an auto-generated diagram (note the default worker in addition to the three I explicitly created):

Concurrency in Ballerina with Workers and a Stream

I am looking forward to getting a chance to exercise these features more, but from what I’ve seen thus far, concurrency in Ballerina looks pretty powerful and with very simple syntax.

In relentless pursuit of elegant simplicity. Tinkerer, writer of tech, fantasy, and sci-fi. Converting napkin drawings into code for @CapitalOne