Go Concurrency Patterns Explained
Go handles concurrency with goroutines (lightweight functions that run concurrently) and channels (typed conduits for passing values between them). These notes walk through six common concurrency patterns, each with a short explanation and a runnable example.
1. Worker Pool Pattern
What it does
A worker pool starts a fixed number of goroutines (workers) that all receive tasks from one shared channel. Because only that many workers exist, at most that many tasks run at the same time, no matter how many are queued.
Example
package main

import (
	"fmt"
	"time"
)

// worker processes tasks from the jobs channel
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, job)
		time.Sleep(time.Second) // Simulating work
		fmt.Printf("Worker %d finished job %d\n", id, job)
		results <- job * 2 // Send result back
	}
}

func main() {
	jobs := make(chan int, 5)    // Jobs channel
	results := make(chan int, 5) // Results channel

	// Start 3 workers
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// Send 5 jobs
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs) // Close jobs channel when done sending

	// Collect results
	for r := 1; r <= 5; r++ {
		fmt.Println("Result:", <-results)
	}
}
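The example above works because main knows that exactly five results are coming. When the count is not known in advance, one common variation is to track the workers with a sync.WaitGroup and close the results channel once they have all returned, so the consumer can simply range over it. The following is a minimal sketch of that idea; the helper name runPool and the doubling workload are assumptions made for illustration, not part of the original example.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool is a hypothetical helper: it starts numWorkers goroutines that
// double each input value, and returns the results once every worker is done.
func runPool(numWorkers int, inputs []int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- job * 2
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers' range loops end.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	// Close results only after every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := runPool(3, []int{1, 2, 3, 4, 5})
	fmt.Println("Results (order may vary):", out)
}
```

Closing results from a separate goroutine keeps the main goroutine free to drain the channel, which is what lets both channels stay unbuffered here.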
2. Fan-Out, Fan-In Pattern
What it does
- Fan-Out: Distributes tasks to multiple goroutines for parallel processing.
- Fan-In: Combines results from multiple goroutines into one channel.
Example
package main

import (
	"fmt"
	"time"
)

func worker(id int, data int, results chan<- int) {
	time.Sleep(time.Second) // Simulate work
	results <- data * 2     // Send back result
}

func main() {
	data := []int{1, 2, 3, 4, 5}
	results := make(chan int, len(data))

	// Fan-out: Start a goroutine for each data item
	for i, value := range data {
		go worker(i, value, results)
	}

	// Fan-in: Collect results
	for i := 0; i < len(data); i++ {
		fmt.Println("Result:", <-results)
	}
}
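In the example above every goroutine writes to the same results channel, so fan-in happens implicitly. When each producer owns its own output channel, fan-in is often written as an explicit merge step. The sketch below shows one way to do that; the function names produce and merge are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// produce is a hypothetical source: it emits the given values and then
// closes its channel.
func produce(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// merge fans in: it copies every value from the input channels onto one
// output channel, and closes the output when all inputs are drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	for v := range merge(produce(1, 2), produce(3, 4), produce(5)) {
		fmt.Println("Merged:", v) // arrival order is not deterministic
	}
}
```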
3. Pipelines
What it does
Data flows through a series of stages connected by channels. Each stage is a goroutine that receives values from the previous stage, transforms them, and sends them on to the next.
Example
package main

import "fmt"

// Generates numbers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Squares numbers
func square(input <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for num := range input {
			out <- num * num
		}
		close(out)
	}()
	return out
}

func main() {
	nums := generator(1, 2, 3, 4, 5)
	squares := square(nums)

	// Consume the pipeline output
	for result := range squares {
		fmt.Println(result)
	}
}
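Because every stage takes a receive-only channel and returns one, stages can be chained in any order and in any number. The sketch below illustrates this with a hypothetical mapStage helper that turns an ordinary function into a pipeline stage; source and mapStage are names assumed for this illustration.

```go
package main

import "fmt"

// source emits the given values and closes its channel when done.
func source(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// mapStage is a hypothetical reusable stage: it applies f to every value
// received from in and forwards the result downstream.
func mapStage(in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- f(n)
		}
	}()
	return out
}

func main() {
	double := func(n int) int { return n * 2 }
	addOne := func(n int) int { return n + 1 }

	// Three stages chained: source -> double -> addOne.
	for v := range mapStage(mapStage(source(1, 2, 3), double), addOne) {
		fmt.Println(v) // prints 3, 5, 7 (one per line)
	}
}
```

Each stage runs in a single goroutine, so values leave the pipeline in the same order they entered it.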
4. Select Statement
What it does
The select statement blocks until one of its channel operations can proceed, then runs that case. If several are ready at once, it picks one of them at random. This is useful for waiting on several goroutines at the same time.
Example (Without default case)
package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "Channel 1 finished"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Channel 2 finished"
	}()

	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	}
}
Using Default (Non-blocking select)
package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "Channel 1 finished"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Channel 2 finished"
	}()

	// Poll until both messages have arrived. A fixed iteration count
	// could end the loop before channel 2 is ready.
	for received := 0; received < 2; {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
			received++
		case msg2 := <-ch2:
			fmt.Println(msg2)
			received++
		default:
			// Runs immediately when no channel is ready
			fmt.Println("No channel is ready yet")
			time.Sleep(500 * time.Millisecond) // Simulate doing other work
		}
	}
}
5. Mutex for Shared State
What it does
A sync.Mutex ensures that only one goroutine can access shared data at a time, preventing race conditions.
Example
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup
	counter := 0

	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func(id int) {
			mu.Lock() // Lock before modifying shared state
			counter++
			fmt.Printf("Goroutine %d incremented counter to %d\n", id, counter)
			mu.Unlock() // Unlock after modification
			wg.Done()
		}(i)
	}

	wg.Wait()
	fmt.Println("Final Counter:", counter)
}
6. Context for Cancellation and Deadlines
What it does
The context package is used to manage cancellations, timeouts, and deadlines for goroutines.
Example
package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Worker stopped")
			return
		default:
			fmt.Println("Worker running")
			time.Sleep(500 * time.Millisecond)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	go worker(ctx)

	time.Sleep(3 * time.Second)
	fmt.Println("Main finished")
}
Wrap up
- Worker Pool: Process tasks with a fixed number of goroutines.
- Fan-Out/Fan-In: Spread tasks across goroutines and collect results.
- Pipelines: Pass data through stages of processing.
- Select: Wait on several channel operations and proceed with whichever is ready first.
- Mutex: Protect shared resources from concurrent access.
- Context: Manage cancellations and timeouts.