Fan-out, Fan-in Pattern with Go
Fan-out, fan-in is a powerful concurrency pattern that enables efficient parallel processing of tasks and effective utilization of system resources. This pattern leverages the power of goroutines and channels in Go to distribute workload among multiple workers, thus improving the overall performance of an application.
The fan-out part of the pattern involves distributing work among multiple worker goroutines. These goroutines work concurrently, each handling a portion of the tasks. This approach helps to increase throughput and process large datasets more efficiently.
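To make the fan-out half concrete before the full example, here is a minimal standalone sketch (the jobs/results names and the squaring work are illustrative, not part of the example below): several goroutines receive from one shared channel, and the runtime delivers each item to exactly one of them.

package main

import "fmt"

func main() {
    jobs := make(chan int)
    results := make(chan int)

    // Fan-out: three goroutines compete for items on the same channel;
    // each job is delivered to exactly one receiver.
    for w := 0; w < 3; w++ {
        go func() {
            for n := range jobs {
                results <- n * n
            }
        }()
    }

    // Produce the jobs, then close the channel so the workers exit.
    go func() {
        for n := 1; n <= 5; n++ {
            jobs <- n
        }
        close(jobs)
    }()

    // Collect exactly as many results as jobs were sent.
    for i := 0; i < 5; i++ {
        fmt.Println(<-results)
    }
}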
The fan-in aspect of the pattern involves collecting the results from the worker goroutines and combining them into a single output. This process is typically done using a dedicated goroutine that listens to the individual output channels of the workers, merges the results, and sends them to a single output channel.
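As a minimal fan-in sketch (mergeTwo is a hypothetical helper, not part of the example below): a single goroutine can merge a fixed pair of channels with select, setting each input to nil as it closes so the loop stops selecting on it. The full example later uses a more general sync.WaitGroup-based merger that handles any number of inputs.

package main

import "fmt"

// mergeTwo fans two input channels into one output channel, using
// select and the nil-channel idiom to drop inputs as they close.
func mergeTwo(a, b <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for a != nil || b != nil {
            select {
            case v, ok := <-a:
                if !ok {
                    a = nil // a closed: a nil channel blocks, so select ignores it
                    continue
                }
                out <- v
            case v, ok := <-b:
                if !ok {
                    b = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

func main() {
    a, b := make(chan int), make(chan int)
    go func() { a <- 1; a <- 2; close(a) }()
    go func() { b <- 3; close(b) }()
    for v := range mergeTwo(a, b) {
        fmt.Println(v)
    }
}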
The fan-out, fan-in pattern is particularly useful in situations where tasks can be divided into smaller, independent units and processed concurrently. This pattern not only improves application performance but also enhances code maintainability and readability by separating the concerns of distributing tasks and aggregating results.
Example in Go
In this example, we have a concurrent program that simulates downloading a list of files from URLs, processes the contents to count the number of words in each file, and then combines the results to get the total word count.
Here’s a breakdown of the code:
1. Import the necessary packages.
2. Define the simulateDownload(url string) function, which simulates downloading a file from the provided URL and returns its content as a string.
3. Define the downloader(urls []string) function, which takes a slice of URLs and returns a channel that sends the content of each URL. It launches a goroutine that iterates over the URLs, simulates each download, and sends the content through the channel; the channel is closed once all URLs have been processed. This is the fan-out half of the pattern: downloader creates a single downloadStream channel carrying the content of each downloaded file, and later we start multiple worker goroutines that all receive from this shared channel, fanning the work out so it is done concurrently.
4. Define the worker(in <-chan string) function, which takes a channel of strings as input and returns a channel of integers. It launches a goroutine that reads each piece of content from the input channel, prints the processing timestamp and the content, counts the number of words, and sends the count through the output channel; the output channel is closed after all content has been processed.
5. Define the merger(ins ...<-chan int) function, which takes a variadic parameter of integer channels and returns a single integer channel. This is the fan-in half of the pattern: merger combines the results from the multiple worker goroutines by listening to their individual output channels and forwarding every value to one output channel, using a sync.WaitGroup to wait until all worker channels have been drained before closing the output channel.
6. In the main function, seed the random generator, define a slice of URLs, and create a download stream by calling downloader().
7. Define the number of workers, create a slice of worker channels, and start the worker goroutines with the download stream as their shared input.
8. Merge the worker channels using merger().
9. Iterate over the merged channel to compute the total word count.
10. Print the total word count.
This example demonstrates how to use channels and goroutines to concurrently download, process, and merge data in an efficient and organized manner.
package main

import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)

// simulateDownload simulates downloading a file and returns its content.
func simulateDownload(url string) string {
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    return fmt.Sprintf("Content of %s", url)
}

// downloader downloads a list of URLs and returns a channel of their contents.
func downloader(urls []string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for _, url := range urls {
            out <- simulateDownload(url)
        }
    }()
    return out
}

// worker processes downloaded content and emits the number of words.
func worker(in <-chan string) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for content := range in {
            fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000000 -0700 MST"))
            fmt.Printf("Processing content: %s\n\n", content)
            words := strings.Fields(content)
            out <- len(words)
        }
    }()
    return out
}

// merger merges the results from multiple workers into one channel.
func merger(ins ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(ins))
    for _, in := range ins {
        go func(in <-chan int) {
            defer wg.Done()
            for n := range in {
                fmt.Printf("Merging result: %d\n", n)
                out <- n
            }
        }(in)
    }
    // Close the output channel only after every worker channel is drained.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    // Seed the global random source. (Since Go 1.20 this call is
    // deprecated and unnecessary, as math/rand seeds itself.)
    rand.Seed(time.Now().UnixNano())

    urls := []string{
        "https://example.com/file1.txt",
        "https://example.com/file2.txt",
        "https://example.com/file3.txt",
        "https://example.com/file4.txt",
        "https://example.com/file5.txt",
    }

    downloadStream := downloader(urls)

    // Fan-out: start three workers that all receive from the same stream.
    numWorkers := 3
    workerChannels := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workerChannels[i] = worker(downloadStream)
    }

    // Fan-in: merge the worker results into a single channel.
    merged := merger(workerChannels...)

    totalWordCount := 0
    for count := range merged {
        totalWordCount += count
    }
    fmt.Printf("Total word count: %d\n", totalWordCount)
}
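Because all the workers receive from the same downloadStream channel, the runtime hands each download to whichever worker is free, so the timestamps and the order of the "Merging result" lines vary from run to run; only the final total word count is the same on every run.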
Conclusion
We’ve demonstrated how Go’s concurrency model, built on goroutines and channels, can be used to write efficient concurrent programs. In our example, we simulated downloading multiple files from a list of URLs, processed their contents to count the words, and combined the results using concurrent programming techniques. This shows how Go can effectively manage many tasks at once, leading to improved performance and resource utilization in real-world applications.