Observer Pattern in Go with Channels
Leveraging Channels for Asynchronous Communication
Introduction
Hello, fellow Gophers! Today, I’m excited to dive deep into a powerful behavioral design pattern that forms a vital part of our developer toolkit: the Observer Pattern. In my previous post, I showed how to implement the Observer Pattern the classic way, without using Go channels. Since there were some requests to show an implementation that uses channels, I am writing this post. We’ll explore how we can harness the power of channels in Go to implement this pattern elegantly and effectively.
The Observer Pattern is a design pattern where an object (known as the subject) maintains a list of dependents (observers), automatically notifying them of any state changes. This pattern promotes loose coupling between objects and is particularly useful for event-handling systems.
In Go, channels are a natural fit for implementing this pattern. Channels provide a robust means for goroutines (lightweight threads managed by the Go runtime) to communicate and synchronize their execution. This fits nicely into the framework of the Observer Pattern: each channel acts as a conduit that carries state changes from the subject to one of its observers.
Alright, now let’s take the plunge and swim through some code.
Example 1: Simple Event Notification
Imagine a blog system where we want to notify subscribers whenever a new post is published. Here’s how we can implement the observer pattern using Go channels:
package main

import (
	"fmt"
	"sync"
)

// Subject struct
type Subject struct {
	observers []chan string
}

// NewSubject creates a new Subject
func NewSubject() *Subject {
	return &Subject{
		observers: make([]chan string, 0),
	}
}

// Subscribe method for adding new observer
func (s *Subject) Subscribe(observer chan string) {
	s.observers = append(s.observers, observer)
}

// NotifyObservers method for notifying all observers
func (s *Subject) NotifyObservers(message string) {
	for _, observer := range s.observers {
		observer <- message // sending the message to the observer
	}
}

func main() {
	subject := NewSubject()

	// creating channels for observers
	observer1 := make(chan string)
	observer2 := make(chan string)
	subject.Subscribe(observer1)
	subject.Subscribe(observer2)

	var wg sync.WaitGroup // using WaitGroup
	wg.Add(2)             // setting counter to the number of observers

	go func() {
		for {
			msg := <-observer1
			fmt.Println("Observer 1 received:", msg)
			wg.Done() // decrement counter when observer 1 is done
		}
	}()

	go func() {
		for {
			msg := <-observer2
			fmt.Println("Observer 2 received:", msg)
			wg.Done() // decrement counter when observer 2 is done
		}
	}()

	subject.NotifyObservers("New blog post published!")
	wg.Wait() // wait until all observers are done
}

In this setup, the Subject keeps a list of observers, each represented by a channel. When the NotifyObservers method is invoked, it dispatches the message to all observers by pushing it into their respective channels.
Example 2: Stock Market Ticker
In this second scenario, we’ll model a stock market ticker. Whenever there’s a change in a stock’s price, all registered brokers should be notified. Let’s see how we can do this:
package main

import (
	"fmt"
	"sync"
)

// Stock is our Subject
type Stock struct {
	price   float64
	brokers []chan float64
}

// NewStock creates a new Stock
func NewStock() *Stock {
	return &Stock{
		brokers: make([]chan float64, 0),
	}
}

// Register method for adding new broker
func (s *Stock) Register(broker chan float64) {
	s.brokers = append(s.brokers, broker)
}

// UpdatePrice updates the stock price and notifies all brokers
func (s *Stock) UpdatePrice(price float64) {
	s.price = price
	for _, broker := range s.brokers {
		broker <- s.price // send the updated price to the broker
	}
}

func main() {
	stock := NewStock()

	// creating channels for brokers
	broker1 := make(chan float64)
	broker2 := make(chan float64)
	stock.Register(broker1)
	stock.Register(broker2)

	var wg sync.WaitGroup // using WaitGroup

	go func() {
		for price := range broker1 {
			fmt.Printf("Broker 1 updated the stock price: %.2f\n", price)
			wg.Done() // decrement counter when broker 1 has handled the price
		}
	}()

	go func() {
		for price := range broker2 {
			fmt.Printf("Broker 2 updated the stock price: %.2f\n", price)
			wg.Done() // decrement counter when broker 2 has handled the price
		}
	}()

	wg.Add(2) // each broker receives exactly one message per update
	stock.UpdatePrice(150.25)
	wg.Add(2) // each broker receives exactly one message per update
	stock.UpdatePrice(152.85)
	wg.Wait() // wait until all brokers have handled every update
}

The reason for calling wg.Add(2) before each UpdatePrice call is that we have two brokers registered with the stock. Each call to UpdatePrice sends a message to both brokers, so we need to increment the WaitGroup counter by two (one for each broker) before each notification.
The wg.Add(2) statement signifies that there are two operations to wait for. When wg.Done() is called from each goroutine, it decrements the WaitGroup counter. So, we call wg.Done() once for each operation - once from each goroutine. When the counter gets back to zero, wg.Wait() stops blocking and allows the main function to continue. This ensures that the main function waits for all observer goroutines to finish processing their messages.
Remember, wg.Add(2) assumes that there are exactly two observers and that each one receives exactly one message per notification. If your program registers a different number of observers, or sends a different number of messages, you'll need to adjust the argument to wg.Add() accordingly.
Example 3: Weather Station
In the final example, we’ll build a weather station. Sensors send data, and multiple display devices need to be updated with this data.
package main

import (
	"fmt"
	"sync"
)

// WeatherData is our Subject
type WeatherData struct {
	temperature float64
	humidity    float64
	displays    []chan string
}

// NewWeatherData creates a new WeatherData
func NewWeatherData() *WeatherData {
	return &WeatherData{
		displays: make([]chan string, 0),
	}
}

// RegisterDisplay method for adding a new display
func (w *WeatherData) RegisterDisplay(display chan string) {
	w.displays = append(w.displays, display)
}

// UpdateWeather updates weather data and notifies all displays
func (w *WeatherData) UpdateWeather(temperature float64, humidity float64, wg *sync.WaitGroup) {
	w.temperature = temperature
	w.humidity = humidity
	for _, display := range w.displays {
		wg.Add(1) // add to the counter before sending a message
		display <- fmt.Sprintf("Temperature: %.2f, Humidity: %.2f", w.temperature, w.humidity)
	}
}

func main() {
	weatherData := NewWeatherData()

	// creating channels for displays
	display1 := make(chan string)
	display2 := make(chan string)
	weatherData.RegisterDisplay(display1)
	weatherData.RegisterDisplay(display2)

	var wg sync.WaitGroup // using WaitGroup

	go func() {
		for msg := range display1 {
			fmt.Println("Display 1 updated:", msg)
			wg.Done() // decrement counter when display 1 has handled the message
		}
	}()

	go func() {
		for msg := range display2 {
			fmt.Println("Display 2 updated:", msg)
			wg.Done() // decrement counter when display 2 has handled the message
		}
	}()

	weatherData.UpdateWeather(25.0, 60.0, &wg)
	weatherData.UpdateWeather(26.5, 55.0, &wg)
	wg.Wait() // wait until all displays have handled every update
}

As you can see, when combined with Go channels, the Observer Pattern simplifies and enhances synchronizing multiple observers with a subject’s state. This pattern provides an elegant solution to keep observers in sync, promotes the design principle of loose coupling, and helps our programs be more understandable, adaptable, and testable.
That’s it for today, fellow Gophers! Remember, understanding the fundamental design patterns makes you a better developer and helps you design more robust and easier-to-maintain solutions. So keep learning, keep coding, and see you next time!
Happy coding!





