avatarPhuong Le (@func25)

Summary

The website content provides an in-depth explanation of the errgroup package in Go, detailing its structure, functionality, and the concurrency control mechanisms it employs, such as semaphores, WaitGroups, and channels.

Abstract

The article on the undefined website delves into the errgroup package in Go, emphasizing its importance for managing concurrent tasks effectively. It breaks down the internal structure of the errgroup's Group struct, highlighting the use of synchronization primitives like sync.WaitGroup and sync.Once. The article also discusses the role of the semaphore channel (sem) in controlling the number of goroutines running concurrently and the functions Go, Wait, SetLimit, and TryGo that facilitate the coordination and execution of tasks within the errgroup. The author aims to demystify errgroup by providing clear examples and references to other related articles, making the topic more accessible to readers with varying levels of expertise in Go concurrency.

Opinions

  • The author believes that a solid understanding of errgroup and channels is crucial for effectively using goroutines in Go.
  • They suggest that readers familiarize themselves with key concepts from previous articles to better grasp the advanced topics discussed.
  • The author values the efficiency of concurrent programming and sees errgroup as a robust tool for managing independent tasks.
  • They highlight the importance of error handling in concurrent programming, with errgroup capturing only the first error and ignoring subsequent ones.
  • The article conveys that proper management of goroutine execution is critical to maintaining program efficiency and preventing issues like blocking.
  • The author encourages readers to engage with them on Twitter, indicating a willingness to continue the conversation on the topic and provide further assistance or insights.

Go Deep Dive

Errgroup Explained: Understanding Its Inner Workings

We’ve delved into using errgroup to boost the performance of independent tasks, thanks to the robust features it offers, these include:

Photo by Kevin Ku on Unsplash

To get the most out of our discussion, it’s important to have a good grasp of two key concepts in Go programming: errgroup and channels.

Once you’re comfortable with these concepts, you’ll find it much easier to dive into the more advanced stuff we’re going to talk about.

I aim to make the topic more accessible and valuable for you, drawing from my personal experiences and understanding.

1. What is inside the errgroup?

At the heart of the errgroup package lies the Group struct. Let's take a closer look at its structure:

type Group struct {
  cancel func(error)
  
  wg sync.WaitGroup
  
  sem chan token
  
  errOnce sync.Once
  err     error
}

This struct employs two synchronization techniques: sync.WaitGroup and sync.Once.

I’ve covered these in detail in my article ‘6 Key Sync Package Concepts’, but here’s a brief recap:

  • sync.WaitGroup: It’s used to wait for a group of goroutines to complete their execution.
  • sync.Once: This ensures that certain initialization code is executed only once, no matter how many times it’s called. It’s safe to use concurrently.

We also have an error field, we only capture the first error and ignore any subsequent ones.

The cancel field is a function and it's used to cancel the context either when the task is completed or when an error occurs.

What really makes this struct special is the sem channel, which acts as a semaphore which is crucial for controlling the number of goroutines that run at the same time.

2. How it works?

Let’s start with what a semaphore is.

In Go, a semaphore is used to control the number of goroutines that can run at the same time, it’s a tool for managing how busy our program is.

a. Semaphore

The ‘sem’ channel in an errgroup begins with no limits, it’s set as nil, meaning any number of goroutines can run together.

But in many cases, we need to manage this activity to keep our program efficient, that’s where SetLimit(n) comes in.

This function sets up the 'sem' channel so it only allows 'n' goroutines to run at the same time:

g.sem = make(chan token, n)

This code line transforms the ‘sem’ channel into a buffered channel with a capacity for ’n’ elements (or tokens).

A buffered channel in Go can hold a limited number of values without blocking and the ’n’ here represents the number of goroutines that can be executed simultaneously.

When this buffered channel is full, any attempt to add another goroutine will be blocked. This blockage continues until there is available space in the channel, which happens when one of the running goroutines finishes its task and effectively ‘returns’ its place in the channel.

Let’s now turn our attention to the functions within the Group struct and their role in coordinating these tasks.

b. Go

The Go function in errgroup is essential yet more complex than it may seem, here’s a breakdown of its functionality:

func (g *Group) Go(f func() error) {
    // if g.sem is nil, no limit applied

    // otherwise, we push a token to the sem channel, 
    // which will block if the channel is full

    // g.wg.Add(1) to add one more goroutine to the wait group

    // create a goroutine to run the function f
        // defer g.wg.Done() to remove one goroutine from the wait group

        // if f return an error, we call g.cancel to cancel the context 
        // and save the error to g.err
}

When you use g.Go(f), several things happen:

  1. Semaphore Check: If g.sem is not nil, the function checks the semaphore. If the semaphore's limit has been reached, g.Go(f) will block until a slot frees up.
  2. WaitGroup Management: The function then adds a new goroutine to the WaitGroup (g.wg.Add(1)) before creating the goroutine to run f.
  3. Running the Function: The provided function f runs in its goroutine. When f completes, whether successfully or with an error, the function marks one less goroutine in the WaitGroup (g.wg.Done()).
  4. Error Handling: If f returns an error, g.cancel is called to cancel the context, and the error is stored in g.err.

A key point to remember is that g.Go() doesn’t just queue tasks.

It immediately starts the goroutine for function f, given there’s space in the semaphore.

Also, it's important to know that g.Go() can block the initiating goroutine if the semaphore is full, affecting not just g.Wait() but the g.Go() function itself.

This aspect is critical in understanding how errgroup manages goroutine execution.

b. Wait

The Wait function is designed to ensure that all goroutines launched within the errgroup have completed before the program proceeds:

func (g *Group) Wait() error {
  g.wg.Wait()  // Await all goroutines.
  if g.cancel != nil {
    g.cancel(g.err)
  }
  return g.err  // Error from goroutines, or nil.
}

This function blocks until g.wg.Wait() confirms all goroutines have finished.

Then, it invokes g.cancel, a function that handles the cancellation context, which is called irrespective of whether an error occurred.

Finally, Wait returns g.err, which encapsulates any errors from the goroutines but if no errors occurred, g.err will be nil, indicating successful completion of all goroutines.

c. SetLimit

The SetLimit function in the Group type is utilized to initialize the semaphore's sem channel with n tokens, this initialization is critical and should be handled with care.

Adjusting the limit while goroutines are active (indicated by existing tokens in the semaphore) will cause the application to panic.

Therefore, it’s imperative to call SetLimit only before any g.Go() invocations or after ensuring all goroutines have finished using g.Wait().

func (g *Group) SetLimit(n int) {
  if n < 0 {
    g.sem = nil
    return
  }
  if len(g.sem) != 0 {
    panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
  }
  g.sem = make(chan token, n)
}

In cases where n is set to a negative value, SetLimit deactivates the semaphore limit by setting g.sem to nil, allowing an unlimited number of concurrent goroutines.

d. TryGo

The TryGo function is a solution to prevent the potential blocking issue associated with g.Go().

It's useful when adding a goroutine to the group is contingent on the availability of space in the semaphore.

“If the channel is full, won’t it block? You just said sending to a full channel will be blocked”

The implementation of TryGo cleverly addresses this, it utilizes a select statement to non-blockingly check the semaphore's capacity:

select {
case g.sem <- token{}:
    // Proceed if the semaphore has space.
default:
    return false
}

In standard scenarios, sending to a full channel would block the sender.

However, TryGo leverages the select statement with a default case to circumvent this issue. If the semaphore is full, TryGo doesn't block but returns false instead, signaling the inability to add another goroutine at that moment.

With this understanding, why not take a shot at creating your own errgroup? Here are some features you could consider including:

  • Gather all errors that occur, not just the first one.
  • Initiate goroutines only upon calling errgroup.Wait(), and execute them in batches (like 10 or 20 at a time).
  • Modify the SetLimit function to return an error instead of causing a panic.
  • When the maximum number of active goroutines is reached, have group.Go store additional goroutines in a queue to be executed later, rather than blocking us.

By the way, I’m active on Twitter, so feel free to connect with me there.

Golang
Programming
Coding
Software Development
Recommended from ReadMedium