Youssef Ameachaq's Blog

0. Who am I?

Intro

1. Worker Pool Pattern

package main

import (
	"fmt"
	"time"
)

// worker receives jobs until the jobs channel is closed and drained. For every
// job it logs the start, sleeps for one second to simulate work, logs the
// completion, and then sends twice the job value on results.
//
// id is only used to label the log lines.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			// The sender closed jobs and nothing is left to process.
			return
		}

		fmt.Printf("Worker %d started job %d\n", id, job)
		time.Sleep(time.Second) // stand-in for real work
		fmt.Printf("Worker %d finished job %d\n", id, job)

		doubled := job * 2
		results <- doubled
	}
}

// main runs a pool of three workers over five jobs and prints each result as
// it arrives on the results channel.
func main() {
	const (
		numWorkers = 3
		numJobs    = 5
	)

	// Both channels are buffered to hold every job/result.
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Launch the pool.
	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results)
	}

	// Queue all jobs, then close the channel so the workers' range loops end.
	for job := 1; job <= numJobs; job++ {
		jobs <- job
	}
	close(jobs)

	// Receive exactly one result per job.
	for remaining := numJobs; remaining > 0; remaining-- {
		fmt.Println("Result:", <-results)
	}
}

2. Fan-Out, Fan-In Pattern

Example

package main

import (
	"fmt"
	"time"
)

// worker sleeps for one second to simulate work and then sends twice the
// value of data on results. The id parameter is not used by the body.
func worker(id int, data int, results chan<- int) {
	const workDuration = time.Second

	time.Sleep(workDuration)

	doubled := data * 2
	results <- doubled
}

// main fans each input value out to its own goroutine and then fans the
// results back in by receiving one value per input from a shared channel.
func main() {
	inputs := []int{1, 2, 3, 4, 5}

	// Buffered to len(inputs) so no worker blocks on its send.
	results := make(chan int, len(inputs))

	// Fan-out: one goroutine per input item.
	for idx := 0; idx < len(inputs); idx++ {
		go worker(idx, inputs[idx], results)
	}

	// Fan-in: one receive per input item; arrival order is not fixed.
	for range inputs {
		fmt.Println("Result:", <-results)
	}
}

3. Pipelines

Example

package main

import "fmt"

// generator returns a receive-only channel that yields each value of nums in
// order. The channel is unbuffered and is closed once every value has been
// sent, so callers can range over it.
func generator(nums ...int) <-chan int {
	stream := make(chan int)

	go func() {
		// Closing signals the downstream range loop that the input is exhausted.
		defer close(stream)

		for i := 0; i < len(nums); i++ {
			stream <- nums[i]
		}
	}()

	return stream
}

// square reads integers from input and returns a receive-only channel that
// yields each value multiplied by itself, in the order received. The returned
// channel is unbuffered and is closed after input has been closed and drained.
func square(input <-chan int) <-chan int {
	squared := make(chan int)

	go func() {
		defer close(squared)

		for {
			n, open := <-input
			if !open {
				// Upstream stage finished; stop and let the deferred close run.
				return
			}
			squared <- n * n
		}
	}()

	return squared
}

func main() {
	nums := generator(1, 2, 3, 4, 5)
	squares := square(nums)

	// Consume the pipeline output
	for result := range squares {
		fmt.Println(result)
	}
}

4. Select Statement

Example (Without default case)

package main

import (
	"fmt"
	"time"
)

// main starts two goroutines that each send one message after a delay (1s and
// 2s) and uses a blocking select to print the two messages as they arrive.
func main() {
	first := make(chan string)
	second := make(chan string)

	// sendAfter waits for delay and then delivers msg on out.
	sendAfter := func(out chan<- string, delay time.Duration, msg string) {
		time.Sleep(delay)
		out <- msg
	}

	go sendAfter(first, 1*time.Second, "Channel 1 finished")
	go sendAfter(second, 2*time.Second, "Channel 2 finished")

	// Two messages are expected in total; without a default case each select
	// blocks until one of the channels has a value.
	for received := 0; received < 2; received++ {
		var msg string
		select {
		case msg = <-first:
		case msg = <-second:
		}
		fmt.Println(msg)
	}
}

Using Default (Non-blocking select)

package main

import (
	"fmt"
	"time"
)

// main demonstrates a non-blocking select. Two goroutines each send one
// message after a delay (1s and 2s). The loop polls both channels; when
// neither is ready the default case runs, reports that, and sleeps 500ms to
// simulate doing other work.
//
// The loop runs until both messages have been received. Counting poll
// iterations instead would let the loop end before the 2s sender delivers,
// so its message would never be printed.
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "Channel 1 finished"
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Channel 2 finished"
	}()

	// One message is expected from each of the two senders above.
	const expected = 2

	for received := 0; received < expected; {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
			received++
		case msg2 := <-ch2:
			fmt.Println(msg2)
			received++
		default:
			// Neither channel is ready: do not block, do something else.
			fmt.Println("No channel is ready yet")
			time.Sleep(500 * time.Millisecond) // Simulate doing other work
		}
	}
}

5. Mutex for Shared State

Example

package main

import (
	"fmt"
	"sync"
)

// main starts five goroutines that each increment a shared counter once. A
// mutex serializes the increments (and the accompanying log line), and a
// WaitGroup makes main wait for all goroutines before printing the total.
func main() {
	const goroutines = 5

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		counter int
	)

	// increment bumps the shared counter under the lock and reports the value
	// it observed. Deferred calls run in reverse order, so the mutex is
	// released before the WaitGroup is signalled.
	increment := func(id int) {
		defer wg.Done()

		mu.Lock()
		defer mu.Unlock()

		counter++
		fmt.Printf("Goroutine %d incremented counter to %d\n", id, counter)
	}

	wg.Add(goroutines)
	for id := 0; id < goroutines; id++ {
		go increment(id)
	}

	wg.Wait()
	fmt.Println("Final Counter:", counter)
}

6. Context for Cancellation and Deadlines

Example

package main

import (
	"context"
	"fmt"
	"time"
)

// worker prints "Worker running" immediately and then every 500ms until ctx
// is cancelled or its deadline passes, at which point it prints
// "Worker stopped" and returns.
//
// The wait between iterations selects on ctx.Done() as well as the ticker, so
// cancellation is observed right away rather than only after a full 500ms
// pause has elapsed.
func worker(ctx context.Context) {
	const interval = 500 * time.Millisecond

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Check first so an already-cancelled context never reports "running",
		// and so a tick that races with cancellation still stops the worker.
		if ctx.Err() != nil {
			fmt.Println("Worker stopped")
			return
		}

		fmt.Println("Worker running")

		select {
		case <-ctx.Done():
			fmt.Println("Worker stopped")
			return
		case <-ticker.C:
			// Interval elapsed; go around for the next unit of work.
		}
	}
}

// main starts worker with a context that times out after two seconds, waits
// three seconds so the timeout has passed, and then prints a final message.
func main() {
	const (
		workerTimeout = 2 * time.Second
		mainWait      = 3 * time.Second
	)

	ctx, cancel := context.WithTimeout(context.Background(), workerTimeout)
	// Release the context's resources when main returns.
	defer cancel()

	go worker(ctx)

	// Block for longer than the timeout before exiting.
	<-time.After(mainWait)
	fmt.Println("Main finished")
}

Wrap up

Thank you