Youssef Ameachaq's Blog

Youssef Ameachaq

Go Concurrency Patterns Explained


Go provides a powerful and simple way to handle concurrency using goroutines and channels. Here are structured notes explaining common concurrency patterns in Go with examples and easy-to-understand explanations.


1. Worker Pool Pattern

What it does

A worker pool uses multiple goroutines (workers) to process tasks from a shared channel. This pattern limits the number of concurrent tasks.

Example

package main

import (
	"fmt"
	"time"
)

// worker processes tasks from the jobs channel
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, job)
		time.Sleep(time.Second) // Simulating work
		fmt.Printf("Worker %d finished job %d\n", id, job)
		results <- job * 2 // Send result back
	}
}

func main() {
	jobs := make(chan int, 5)    // Jobs channel
	results := make(chan int, 5) // Results channel

	// Start 3 workers
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// Send 5 jobs
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs) // Close jobs channel when done sending

	// Collect results
	for r := 1; r <= 5; r++ {
		fmt.Println("Result:", <-results)
	}
}

2. Fan-Out, Fan-In Pattern

What it does

Example

package main

import (
	"fmt"
	"time"
)

func worker(id int, data int, results chan<- int) {
	time.Sleep(time.Second) // Simulate work
	results <- data * 2 // Send back result
}

func main() {
	data := []int{1, 2, 3, 4, 5}
	results := make(chan int, len(data))

	// Fan-out: Start a goroutine for each data item
	for i, value := range data {
		go worker(i, value, results)
	}

	// Fan-in: Collect results
	for i := 0; i < len(data); i++ {
		fmt.Println("Result:", <-results)
	}
}

3. Pipelines

What it does

Data flows through a series of stages, with each stage implemented by a goroutine.

Example

package main

import "fmt"

// Generates numbers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Squares numbers
func square(input <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for num := range input {
			out <- num * num
		}
		close(out)
	}()
	return out
}

func main() {
	nums := generator(1, 2, 3, 4, 5)
	squares := square(nums)

	// Consume the pipeline output
	for result := range squares {
		fmt.Println(result)
	}
}

4. Select Statement

What it does

The select statement waits for one of multiple channel operations to proceed. This is useful for handling multiple concurrent tasks.

Example (Without default case)

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "Channel 1 finished"
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Channel 2 finished"
	}()

	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	}
}

Using Default (Non-blocking select)

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "Channel 1 finished"
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Channel 2 finished"
	}()

	for i := 0; i < 5; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		default:
			fmt.Println("No channel is ready yet")
			time.Sleep(500 * time.Millisecond) // Simulate doing other work
		}
	}
}

5. Mutex for Shared State

What it does

A sync.Mutex ensures that only one goroutine can access shared data at a time, preventing race conditions.

Example

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	counter := 0

	wg := sync.WaitGroup{}
	wg.Add(5)

	for i := 0; i < 5; i++ {
		go func(id int) {
			mu.Lock() // Lock before modifying shared state
			counter++
			fmt.Printf("Goroutine %d incremented counter to %d\n", id, counter)
			mu.Unlock() // Unlock after modification
			wg.Done()
		}(i)
	}

	wg.Wait()
	fmt.Println("Final Counter:", counter)
}

6. Context for Cancellation and Deadlines

What it does

The context package is used to manage cancellations, timeouts, and deadlines for goroutines.

Example

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Worker stopped")
			return
		default:
			fmt.Println("Worker running")
			time.Sleep(500 * time.Millisecond)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	go worker(ctx)

	time.Sleep(3 * time.Second)
	fmt.Println("Main finished")
}

Wrap up