Adam Szpilewicz

Summary

The provided content discusses the fan-in, fan-out pattern in Go, which is a concurrency model that efficiently distributes and aggregates tasks across multiple goroutines for parallel processing, leading to improved application performance and resource utilization.

Abstract

The fan-in, fan-out pattern is a concurrency design that allows for the parallel processing of tasks in Go. It involves the distribution of work (fan-out) to multiple goroutines that process tasks concurrently and the subsequent aggregation (fan-in) of the results into a single output. This pattern is facilitated by Go's channels, which are used to communicate between goroutines. The content includes a detailed example of a Go program that downloads files from URLs, counts the number of words in each file, and then combines these counts to calculate a total word count. The example demonstrates the use of goroutines and channels to concurrently download, process, and merge data, showcasing Go's capabilities for managing concurrent tasks efficiently. The article concludes by emphasizing the practical benefits of using Go's concurrency model for real-world applications that require the processing of multiple tasks simultaneously.

Opinions

  • The author believes that the fan-in, fan-out pattern is particularly useful for dividing tasks into smaller, independent units that can be processed concurrently.
  • The pattern is praised for not only improving application performance but also enhancing code maintainability and readability.
  • The use of goroutines and channels is highlighted as a powerful feature of Go that simplifies concurrent programming.
  • The article suggests that the pattern is beneficial for processing large datasets and can lead to better system resource utilization.
  • The author expresses enthusiasm for Go's concurrency model, implying that it is well-suited for real-world applications that need to manage multiple tasks concurrently.
  • The provided example is presented as a practical demonstration of the effectiveness of Go's concurrency features in a real-world scenario.

Fan-in, Fan-out pattern with Go

Fan-out, fan-in is a powerful concurrency pattern that enables efficient parallel processing of tasks and effective utilization of system resources. This pattern leverages the power of goroutines and channels in Go to distribute workload among multiple workers, thus improving the overall performance of an application.

The fan-out part of the pattern involves distributing work among multiple worker goroutines. These goroutines work concurrently, each handling a portion of the tasks. This approach helps to increase throughput and process large datasets more efficiently.

The fan-in aspect of the pattern involves collecting the results from the worker goroutines and combining them into a single output. This is typically done by forwarding goroutines that receive from the workers' individual output channels and send every result to a single output channel, which is closed once all workers have finished.
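
The fan-in step can be sketched on its own as follows. The `fanIn` and `emit` names are hypothetical and used only for illustration. This is one common way to merge channels, using one forwarding goroutine per input, and should be read as a sketch rather than the only possible implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn is a hypothetical helper that forwards every value from the input
// channels to one output channel. The output is closed only after all
// inputs have been closed and drained.
func fanIn(ins ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit is a hypothetical producer: it sends the given values on a new
// channel and then closes it.
func emit(values ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

func main() {
	merged := fanIn(emit("a1", "a2"), emit("b1"), emit("c1", "c2", "c3"))
	count := 0
	for v := range merged {
		fmt.Println("received", v) // arrival order varies between runs
		count++
	}
	fmt.Println("values received:", count)
}
```

Closing the output from a separate goroutine after `wg.Wait()` lets the consumer use a plain `range` loop without knowing how many producers exist.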

The fan-out, fan-in pattern is particularly useful in situations where tasks can be divided into smaller, independent units and processed concurrently. This pattern not only improves application performance but also enhances code maintainability and readability by separating the concerns of distributing tasks and aggregating results.

Example in Go

In this example, we have a concurrent program that downloads a list of files from URLs, processes the contents to count the number of words in each file, and then combines the results to get the total word count.

Here’s a breakdown of the code:

1. Import necessary packages.

2. Define the simulateDownload(url string) function, which simulates downloading a file from the provided URL and returns its content as a string.

3. Define the downloader(urls []string) function, which takes a slice of URLs and returns a channel on which the content of each URL is sent. It launches a goroutine that iterates over the URLs, simulates each download, and sends the content through the channel. The channel is closed after all URLs have been processed.

This sets up the fan-out part: the downloader function returns a single channel (named downloadStream in main) that carries the content of each downloaded file. Later, we create multiple worker goroutines that receive from this shared channel, so each piece of content is handled by exactly one worker and the work is fanned out across them.

4. Define the worker(in <-chan string) function, which takes a receive-only channel of strings and returns a channel of integers. It launches a goroutine that reads content from the input channel, prints a timestamp and the content, counts the words in the content, and sends the count through the output channel. The output channel is closed once the input channel is closed and drained.

5. Define the merger(ins ...<-chan int) function, which takes a variadic list of integer channels and returns a single integer channel. It starts one goroutine per input channel to forward values to the output channel. A sync.WaitGroup tracks these goroutines, and the output channel is closed once all of them have finished.

This is the fan-in part: the merger function combines the results from multiple worker goroutines by receiving from their individual output channels. The sync.WaitGroup ensures that the output channel is closed only after every worker's output has been fully forwarded.

6. In the main function, seed the random generator, define a slice of URLs, and create a download stream by calling the downloader() function.

7. Define the number of workers, create a slice of worker channels, and start the worker goroutines with the download stream as input.

8. Merge the worker channels using the merger() function.

9. Iterate over the merged channel to compute the total word count.

10. Print the total word count.

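This example demonstrates how to use channels and goroutines to download, process, and merge data as a concurrent pipeline. Note that the simulated downloads run one after another in a single goroutine; it is the word counting that is fanned out across the workers. The complete program: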

package main

import (
 "fmt"
 "math/rand"
 "strings"
 "sync"
 "time"
)

// simulateDownload simulates downloading a file and returns its content.
func simulateDownload(url string) string {
 time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 return fmt.Sprintf("Content of %s", url)
}

// downloader simulates downloading each URL in turn and returns a channel of the contents.
func downloader(urls []string) <-chan string {
 out := make(chan string)
 go func() {
  defer close(out)
  for _, url := range urls {
   out <- simulateDownload(url)
  }
 }()
 return out
}

// worker processes the content and returns the number of words.
func worker(in <-chan string) <-chan int {
 out := make(chan int)
 go func() {
  defer close(out)
  for content := range in {
   fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000000 -0700 MST"))
   fmt.Printf("Processing content: %s\n\n", content)
   words := strings.Fields(content)
   out <- len(words)
  }
 }()
 return out
}

// merger merges the results from multiple workers.
func merger(ins ...<-chan int) <-chan int {
 out := make(chan int)
 var wg sync.WaitGroup
 wg.Add(len(ins))

 for _, in := range ins {
  go func(in <-chan int) {
   defer wg.Done()
   for n := range in {
    fmt.Printf("Merging result: %d\n", n)
    out <- n
   }
  }(in)
 }

 go func() {
  wg.Wait()
  close(out)
 }()

 return out
}

func main() {
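 // rand.Seed is deprecated as of Go 1.20, where the global generator is
 // seeded automatically; the call is kept here for older Go versions.
 rand.Seed(time.Now().UnixNano())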

 urls := []string{
  "https://example.com/file1.txt",
  "https://example.com/file2.txt",
  "https://example.com/file3.txt",
  "https://example.com/file4.txt",
  "https://example.com/file5.txt",
 }

 downloadStream := downloader(urls)
 numWorkers := 3

 workerChannels := make([]<-chan int, numWorkers)
 for i := 0; i < numWorkers; i++ {
  workerChannels[i] = worker(downloadStream)
 }

 merged := merger(workerChannels...)

 totalWordCount := 0
 for count := range merged {
  totalWordCount += count
 }

 fmt.Printf("Total word count: %d\n", totalWordCount)
}

Conclusion

We’ve demonstrated how Go’s concurrency model, built on goroutines and channels, can be used to structure a program as a concurrent pipeline. In our example, we simulated downloading multiple files from a list of URLs, counted the words in their contents across several worker goroutines, and combined the results into a single total. The same structure applies to real-world applications where independent tasks can be processed concurrently to improve throughput and resource utilization.

If you enjoy reading Medium articles and are interested in becoming a member, I would be happy to share my referral link with you!

https://medium.com/@adamszpilewicz/membership
