Circular buffers
A circular buffer is a common data structure for real-time data processing. It holds a fixed number of items, and when it is full and a new item arrives, it drops the oldest item to make room for the new one.
In this post, I'm going to build a small example application to illustrate when this data structure could be useful!
Stock exchange
The application we'll be creating reads from a data stream that emits stock price changes. The prices update at random intervals of 0-3 milliseconds. However, our application has to perform some advanced trend line
calculation with each price change, and this calculation has a fixed processing time of 2 milliseconds.
That means that sometimes, the prices are going to change faster than we're able to process them. This is known as
backpressure, and could potentially blow up our system if it were to happen over an extended period of time.
To begin, we'll create one function to generate the stock prices, and another to simulate the trend line calculation:
package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// producePrices writes 100 random stock prices to ch, and then closes it.
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		// To simulate bursts of price changes, we add a random delay of 0-3ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}

// calculateTrendLine sleeps for 2ms to simulate a calculation.
func calculateTrendLine() {
	time.Sleep(2 * time.Millisecond)
}
Next, we'll set up the main function to consume these price changes:
func main() {
	stockPriceStream := make(chan int)
	go producePrices(stockPriceStream)

	for v := range stockPriceStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}
When we run this code using go run ., we'll see some output like this:
The stock exchange has opened for the day.
Updated the trend line with value: 6622
Updated the trend line with value: 4554
Updated the trend line with value: 739
...
The stock exchange is closing for the day.
Updated the trend line with value: 3665
Even though the output looks fine, our application has some serious flaws. To expose them, we can increase the
processing time to 2 seconds and then run the application again:
func calculateTrendLine() {
	// time.Sleep(2 * time.Millisecond)
	time.Sleep(2000 * time.Millisecond)
}
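As you've probably noticed, the prices now only trickle in about once every 2 seconds. That's because a write to an unbuffered channel blocks until a receiver is ready, so producePrices has to wait for every trend line calculation to finish. That isn't good: we want our trend line calculation to be as close to real-time as possible. Otherwise, we might be buying when we should be selling, and vice versa!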
One way to address this issue would be to make the producePrices function perform non-blocking writes:
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		// ch <- rand.IntN(9999)
		// Non-blocking write: if nobody is ready to receive, the price is dropped.
		select {
		case ch <- rand.IntN(9999):
		default:
		}
		// To simulate bursts of price changes, we add a random delay of 0-3ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}
Running the application again (with the processing time set back to 2 milliseconds), I was only able to process 54 out of 100 price changes. That means we dropped 46% of all price changes. If you recall, the stock prices change at intervals of 0-3 milliseconds, and our calculation takes 2 milliseconds.
Our processing is simply too slow to consume all of the data in real-time. However, we should be able to make our trend line less fragmented if we could "catch up" whenever more than 2 milliseconds pass between two price changes.
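A rough back-of-the-envelope estimate shows why. Assuming rand.IntN(4) picks 0, 1, 2 or 3 milliseconds with equal probability, and ignoring scheduling overhead and the fact that time.Sleep may sleep slightly longer than requested:

```latex
\mathbb{E}[\Delta t] = \frac{0 + 1 + 2 + 3}{4}\,\text{ms} = 1.5\,\text{ms},
\qquad
\text{processed} \lesssim \frac{100 \times 1.5\,\text{ms}}{2\,\text{ms}} = 75
```

So under these assumptions, even a consumer that never missed a send could only handle roughly three quarters of the prices while the exchange is open. The non-blocking writes lose even more than that, because a send on an unbuffered channel only succeeds when main is already waiting to receive.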
To achieve that, we'll start by creating a second, buffered channel for main to read from, so that the producePrices function can go back to performing blocking writes:
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		// To simulate bursts of price changes, we add a random delay of 0-3ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}
func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)

	// Simulate a stream of data that produces stock prices at a high pace.
	go producePrices(originalStream)

	// Nothing writes to bufferedStream yet; the two streams still need to be connected.
	for v := range bufferedStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}
Now we need to connect these two streams in a way where the original stream never gets blocked, while ensuring that the
buffered stream emits the most recent price changes.
To achieve that, we'll create a new CircularBuffer type:
type CircularBuffer[T any] struct {
	inputStream  <-chan T
	outputStream chan T
}

func NewCircularBuffer[T any](inputStream <-chan T, outputStream chan T) *CircularBuffer[T] {
	return &CircularBuffer[T]{
		inputStream:  inputStream,
		outputStream: outputStream,
	}
}

// Run forwards every value from the input stream to the output stream,
// dropping the oldest buffered value whenever the output stream is full.
func (cb *CircularBuffer[T]) Run() {
	for v := range cb.inputStream {
		select {
		case cb.outputStream <- v:
		default:
			fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
			cb.outputStream <- v
		}
	}
	fmt.Println("The input stream was closed. Closing the output stream.")
	close(cb.outputStream)
}
The interesting part to focus on here is the select statement:
select {
case cb.outputStream <- v:
default:
	fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
	cb.outputStream <- v
}
If the buffer is full, we use the default case to pop and discard the oldest value, and then write the new value into the slot that just freed up.
With that, we can head back to the main function and use the CircularBuffer to connect our two streams:
func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)

	cb := NewCircularBuffer(originalStream, bufferedStream)
	go cb.Run()

	// The code here stays the same...
}
Then we run the program again:
go run .
The stock exchange has opened for the day.
Updated the trend line with value: 5690
Updated the trend line with value: 4011
Updated the trend line with value: 2018
The buffer is full. Dropping the oldest value: 1294
Updated the trend line with value: 9617
...
The stock exchange is closing for the day.
The input stream was closed. Closing the output stream.
This time, we only dropped 21 prices. That is a huge improvement compared to the 46 we dropped before. The cost, of course, is that our trend line calculation can lag up to 3 price changes behind (our buffer size).
By using our CircularBuffer, we are able to tune the trade-off between real-time data (small buffer size) and less data fragmentation (large buffer size), without having to worry about backpressure.
That concludes this post. I hope you enjoyed it!
The end
I usually tweet something when I've finished writing a new post. You can find me on Twitter.