Concurrent programming has never been more important than it is today. Multi-core computers are commonplace - people carry them around in their pockets! As the number of processors on board goes up, people expect their software to run faster. Often this is not the case. Software designed for sequential computers cannot take advantage of an increased number of processor cores. Software written using multithreading libraries may not give the best performance either, because spawning new system threads and context switching between them are costly operations. Threads often communicate by sharing memory, which can lead to data races if access to those locations is not carefully controlled and coordinated. Sharing memory can also add to the execution time, because the multi-level caches of the different cores have to be kept in sync.

It is obvious that for the new generation of multi-core hardware architectures, new approaches to software design are needed. We need languages that cleanly support modeling concurrency. But before we talk about how to actually write concurrent programs, we need to precisely define what a concurrent program really is. We also need to understand the benefits of making programs concurrent.

Concurrency vs Parallelism

In everyday conversation, concurrency and parallelism are often used to mean the same thing - operating or executing at the same time. But in programming, these terms need more precise definitions. Concurrency is a way to structure a program around several autonomous activities. A single-core computer can run a concurrent program by time-sharing between the different activities or tasks. Thus, on a single-core computer, a concurrent program creates an illusion that the various activities are happening in parallel.

On a computer with multiple cores, the program gets an opportunity to turn this illusion into a reality by actually scheduling some of the tasks to run on a real hardware core. As more cores are added to the computer, more tasks can be run in parallel and the user will see performance improvements without making any changes to the program itself. The runtime of a concurrent programming language should be able to automatically distribute the tasks across available processors. It should also make it cheap and easy to spawn and manage new concurrent tasks.

Benefits of Concurrency

The most obvious benefit of concurrency is improved performance. If you are only familiar with programming with multiple threads, you may not be too excited, because you know that programming with threads is inherently difficult. You have to manage pools of threads so that you do not run out of this system resource. You have to be careful to avoid data races, deadlocks and so on. Programming for concurrency need not be this difficult. Later in this post, we will see that if you choose the right language, you can easily program with thousands or even millions of concurrent tasks, without losing your sanity.

A language designed for concurrency will also enable you to write fault tolerant systems. You can contain the errors in a process or task within itself. Errors in one task cannot accidentally crash another task. You can build “supervision trees” for restarting tasks that have failed or for gracefully shutting down tasks that are misbehaving.

A language that has explicit language constructs for writing concurrent programs will also enable you to write clean and easily understood code. Since the concurrent constructs are built into the language as primitives, a program written in that language will behave the same way on all operating systems.

Communicating Sequential Processes

The rest of this blog post will talk about one high-level paradigm for writing concurrent programs, namely Communicating Sequential Processes, or CSP.

CSP is a model of concurrency in which computation takes place by passing values between tasks. The Go programming language enables this style of concurrency. In Go, each concurrent task is known as a goroutine, and channels are the connections between them. Channels are typed; that is, a channel acts as a conduit for values of a particular type.

Let’s try to understand programming with goroutines with the help of a simple example. The program below shows a goroutine waiting for a message to arrive on a channel. Once it gets the message, it echoes that message to another channel.

package main

import (
    "fmt"
    "strings"
)

func main() {
    // A string channel for sending messages.
    msgChan := make(chan string)

    // A channel to receive back echoed responses.
    echoChan := make(chan string)

    // The `go` statement will concurrently execute the `echo` function in a new goroutine.
    go echo(msgChan, echoChan)

    // Send a string on the message channel.
    // This will be received and processed by the goroutine running `echo`
    // and sent back via the `echoChan`.
    msgChan <- "hello"

    // Receive back the processed echo and print it.
    fmt.Println(<-echoChan)

    msgChan <- "bye"
    fmt.Println(<-echoChan)

    // Close both channels.
    close(msgChan)
    close(echoChan)
}

// The echo function uses the `in` parameter only for receiving messages.
// This is indicated by the `<-chan` in the parameter type.
// The `out` channel will only be used for sending. This is indicated
// by the `chan<-` signature in its type.
func echo(in <-chan string, out chan<- string) {
    // The `range` expression will iterate over all values in
    // the message channel (`in`), until the channel is closed.
    for msg := range in {
        // Send the echoed message to the `out` channel.
        out <- strings.ToUpper(msg)
    }
}

In this example, both channels used are unbuffered. An unbuffered channel holds no values at all: the sending goroutine will block until another goroutine receives the value from the channel. Conversely, if the receive operation is attempted first, the receiving goroutine is blocked until another goroutine performs a send on the same channel. Because of this behavior, unbuffered channels can be used to synchronize operations between sending and receiving goroutines.

Channels can also be buffered by specifying a capacity at the time they are created. The following statement creates a buffered channel capable of holding three integer values:

ch := make(chan int, 3)

Send operations can insert values into the channel until the buffer is full. After that, the sending goroutine will block until a value is removed from the channel by a receiving goroutine. This blocking behavior of the send and receive operations is a characteristic of CSP. (In the next part of this series, we will see another model of concurrency where the messaging between tasks is non-blocking by nature.)

Example: A Concurrent Data Pipeline

We conclude this post by looking at a larger example of programming with goroutines. In this program, we model a data pipeline. The data pipeline has three components, represented by goroutines. The first component, called Generate, creates a sequence of data points or ticks. The second component filters this sequence based on some test. Only the ticks that pass this test are allowed to move to the last stage of the pipeline, which will do some processing on the values. Data flows between the components strictly via channels. This example sets a pattern for building scalable data processing applications.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// The first parameter `r` simulates an external source of data.
// Here it generates a series of random 64-bit float values.
// The values are generated as long as the `done` channel is open.
// `Generate` returns a channel on which the generated data can be
// received by other parts of the pipeline.
func Generate(r *rand.Rand, done chan bool) <-chan float64 {
    out := make(chan float64)
    go func() {
        // Close `out` when this goroutine returns, so that the
        // downstream stages also terminate.
        defer close(out)
        for {
            // The for loop needs to run until `done` is closed.
            // `done` is not used by the program to send any actual values.
            // The only event defined for it is the close event.
            // The `select` statement can "listen" on multiple channels for
            // events.
            // Here `select` ensures that the goroutine terminates when the `done`
            // channel is closed. If it is open, send a new "tick" on the `out`
            // channel to the next stage of the pipeline.
            select {
            case <-done:
                return
            default:
                out <- r.Float64()
            }
        }
    }()
    return out
}

func Filter(in <-chan float64, predic func(float64) bool) <-chan float64 {
    out := make(chan float64)
    go func() {
        defer close(out)
        for tick := range in {
            // `predic` is a user-defined function that accepts a float64
            // and tests it for some condition. Send the current tick to the next stage
            // of the pipeline only if `predic` returned true for it.
            if predic(tick) {
                out <- tick
            }
        }
    }()
    return out
}

func Process(in <-chan float64, proc func(float64)) {
    go func() {
        for tick := range in {
            // Pass the tick to `proc`, the final stage of the pipeline.
            proc(tick)
        }
    }()
}

func main() {
    s1 := rand.NewSource(time.Now().UnixNano())
    r1 := rand.New(s1)
    done := make(chan bool)

    ticks := Generate(r1, done)
    // Keep only the values greater than 0.1.
    filteredTicks := Filter(ticks, func(f float64) bool { return f > 0.1 })

    // Keep track of the maximum tick encountered so far, and print it.
    var max float64
    Process(filteredTicks, func(tick float64) {
        if tick > max {
            max = tick
            fmt.Printf("%f ", tick)
        }
    })

    // Generate and process data for 3 seconds and then end the pipeline by closing
    // the `done` channel.
    time.Sleep(time.Second * 3)
    close(done)
}

Conclusion

Fully utilizing the multi-core computers of today requires taking advantage of concurrent programming techniques. This goal is easily achieved if the programming language and the runtime support higher-level abstractions for concurrency. In this post we got familiar with one such programming model - CSP. The Go programming language makes it practical to develop real-world applications in this model. As we saw, the communication mechanism in CSP is inherently blocking. In the next part of this series we will encounter a concurrent programming model with non-blocking messaging semantics. We will also meet a language that enables us to build robust systems in this model. Stay tuned!
