Golangbyte
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Concurrency Patterns

Concurrency Patterns

Concurrency patterns are essential in writing efficient and scalable concurrent programs. In this document, we explore various concurrency patterns in Go, demonstrating their usage through code examples.

Table of Contents

  1. Worker Pool
  2. Semaphore
  3. Pipeline
  4. Fan In
  5. Fan Out
  6. Project
  7. Conclusion

1. Worker Pool

A worker pool is a concurrency pattern that reuses a fixed set of goroutines. The workers are initialized once up front and are then assigned tasks; once a worker finishes a task, it is given the next one. This avoids repeatedly starting new goroutines for every task. The worker pool pattern is also used as a building block in many more complex patterns.

package main

import (
	"fmt"
	"sync"
	"time"
)

// main runs a fixed pool of workers over numArr and prints every result.
//
// Each worker goroutine is registered with the WaitGroup and marks itself
// done when worker returns, so main only exits after all workers have
// drained the jobs channel and printed their final log lines.
func main() {
	var wg sync.WaitGroup
	numWorkers := 3
	// Values to process.
	numArr := []int{5, 7, 4, 77, 2}
	// Both channels are buffered to len(numArr), so queuing every job and
	// publishing every result never blocks on a full buffer.
	jobs := make(chan int, len(numArr))
	results := make(chan int, len(numArr))

	// Start the workers; every Add is paired with a Done in the wrapper.
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(i)
	}

	// Queue the jobs, then close the channel so workers leave their range loop.
	for _, n := range numArr {
		jobs <- n
	}
	close(jobs)

	// Exactly one result is produced per job.
	for i := 0; i < len(numArr); i++ {
		fmt.Println(<-results)
	}

	// Without this wait main could return while a worker is still between
	// sending its result and printing its "finished job" line.
	wg.Wait()
}

// worker receives jobs until the jobs channel is closed and drained. For each
// job it logs the start, sleeps one second to simulate work, sends job*10 on
// results and logs completion. id is only used in the log lines.
func worker(id int, jobs chan int, results chan int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Println("worker ", id, "started job ", job)
		time.Sleep(time.Second)
		results <- job * 10
		fmt.Println("worker ", id, "finished job ", job)
	}
}

2. Semaphore

The semaphore pattern limits how many workers can access a resource at any given instant. It caps concurrency by blocking additional workers once the limit has been reached. Here the semaphore is a struct wrapping a channel with a user-defined capacity: a token is acquired from the channel before work starts and released after the work is done, so that another worker can proceed. Library implementations of semaphores are also available, such as golang.org/x/sync/semaphore.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore is a counting semaphore built on a buffered channel: every held
// token occupies one slot of the buffer.
type Semaphore struct {
	ch chan struct{} // buffer capacity is the maximum number of held tokens
}

// NewSemaphore returns a Semaphore that allows up to size tokens to be held
// at the same time.
func NewSemaphore(size int) *Semaphore {
	s := new(Semaphore)
	s.ch = make(chan struct{}, size)
	return s
}

// Acquire takes one token, blocking while the channel buffer is full.
func (s *Semaphore) Acquire() {
	var token struct{}
	s.ch <- token
}

// Release returns one token by receiving from the channel; it blocks if no
// token is currently held.
func (s *Semaphore) Release() {
	<-s.ch
}

// main pushes a list of numbers through ten workers while a semaphore of
// size 3 throttles the producer: a token is acquired before each job is
// queued and a worker releases one after finishing a job. It prints every
// result and finally the elapsed time.
func main() {
	begin := time.Now()
	const poolSize = 10
	inputs := []int{5, 7, 4, 77, 2, 88, 66, 97, 90, 45, 34, 12, 78}

	limiter := NewSemaphore(3)
	// Both channels can hold every job/result, so sends never block on capacity.
	queue := make(chan int, len(inputs))
	results := make(chan int, len(inputs))

	// Launch the worker goroutines; each one calls wg.Done when it exits.
	var wg sync.WaitGroup
	wg.Add(poolSize)
	worker(poolSize, &wg, queue, results, limiter)

	// Queue the jobs, taking a token for each one first.
	for i := range inputs {
		limiter.Acquire()
		queue <- inputs[i]
	}
	close(queue)

	// All workers have exited once Wait returns, so results can be closed
	// and drained safely.
	wg.Wait()
	close(results)
	for {
		res, open := <-results
		if !open {
			break
		}
		fmt.Println("Result:", res)
	}
	fmt.Println(time.Since(begin))
}

// worker launches numWorkers goroutines that share the jobs channel. Each
// goroutine handles jobs until the channel is closed: it logs the start,
// sleeps one second to simulate work, sends job*10 on results, logs
// completion and releases one semaphore token. wg.Done is called once per
// goroutine when it exits, so the caller must have added numWorkers to wg.
func worker(numWorkers int, wg *sync.WaitGroup, jobs chan int, results chan int, sem *Semaphore) {
	run := func(workerID int) {
		defer wg.Done()
		for {
			job, open := <-jobs
			if !open {
				return
			}
			fmt.Println("Worker", workerID, "started job", job)
			time.Sleep(time.Second)
			results <- job * 10
			fmt.Println("Worker", workerID, "finished job", job)
			sem.Release()
		}
	}

	for id := 0; id < numWorkers; id++ {
		go run(id)
	}
}

3. Pipeline

The pipeline pattern is a multi-stage pattern for processing a task. It breaks the task down into smaller subtasks that run in parallel and pass data to each other via channels, so the whole task is processed concurrently. The stage functions communicate through channels, and each stage has a single specific responsibility, so the operation is completed in multiple steps. This can significantly reduce processing time and make better use of resources.

package main

import (
	"fmt"
	"time"
)
// sliceToChan is the first pipeline stage: it emits every element of numbers,
// in order, on a new unbuffered channel and closes the channel afterwards.
func sliceToChan(numbers []int) <-chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for i := range numbers {
			stream <- numbers[i]
		}
	}()
	return stream
}

// squareFunc is the processing stage: it reads values from in until in is
// closed, sends the square of each value on the returned channel, and then
// closes that channel.
func squareFunc(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			n, open := <-in
			if !open {
				return
			}
			squares <- n * n
		}
	}()
	return squares
}

func main() {
//	for measurement
	start := time.Now()
	//input
	nums := []int{1, 4, 5, 6, 2, 8}
	// stage1  
	dataChan := sliceToChan(nums)
	// stage 2
	finalChannel := squareFunc(dataChan)
	//stage 3
	for n := range finalChannel {
		fmt.Println("value is ", n)
	}
	fmt.Println(time.Since(start))
}

4. Fan In

Fan-in is a pattern that takes data from many input channels and combines it into a single output channel. It is typically used together with pipelines and fan-out to process large tasks more effectively.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fanIn merges input1 and input2 into a single channel. One goroutine per
// input forwards its values to the merged channel; a third goroutine closes
// the merged channel once both inputs have been closed and drained.
func fanIn(input1, input2 <-chan int) <-chan int {
	merged := make(chan int)
	var pending sync.WaitGroup

	for _, src := range []<-chan int{input1, input2} {
		pending.Add(1)
		go func(src <-chan int) {
			defer pending.Done()
			for v := range src {
				merged <- v
			}
		}(src)
	}

	// Close only after both forwarders have finished, so no send can hit a
	// closed channel.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

// main starts two producers, merges their channels with fanIn and prints
// every value received until the merged channel is closed.
func main() {
	rand.Seed(time.Now().UnixNano())

	// One unbuffered channel per producer.
	first, second := make(chan int), make(chan int)

	// Producers run concurrently and close their channel when done.
	go sendData(first, "Channel 1")
	go sendData(second, "Channel 2")

	// Drain the merged stream.
	merged := fanIn(first, second)
	for {
		result, open := <-merged
		if !open {
			break
		}
		fmt.Println("Received:", result)
	}

	fmt.Println("All data received. Exiting.")
}

// sendData sends the integers 0 through 4 on ch, sleeping a random duration
// below 100ms before each send and logging each value after it is sent
// under channelName. It closes ch when finished.
func sendData(ch chan<- int, channelName string) {
	defer close(ch)
	for i := 0; i < 5; i++ {
		pause := time.Duration(rand.Intn(100)) * time.Millisecond
		time.Sleep(pause)
		ch <- i
		fmt.Printf("%s sent: %d\n", channelName, i)
	}
}

5. Fan Out

Fan-out is a pattern that takes a task and spreads it across multiple channels, each served by its own worker goroutine, so the work can be processed in parallel on independent channels by independent workers. Fan-in is used together with fan-out to process large tasks efficiently.

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker consumes values from input until it is closed. For every value it
// sleeps 500ms to simulate processing and then prints the value together
// with id. wg.Done is called when the function returns.
func worker(id int, input <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		value, open := <-input
		if !open {
			return
		}
		// Simulated processing time.
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Worker %d processed: %d\n", id, value)
	}
}

// fanOut distributes the values read from input across numWorkers goroutines.
// Every goroutine competes for values on the shared input channel and
// forwards each value it receives to its own unbuffered output channel,
// closing that channel once input is closed and drained.
//
// The returned slice holds one output channel per goroutine, in start order.
// Every returned channel must be drained by the caller, otherwise the
// goroutine that owns it stays blocked on its send.
func fanOut(input <-chan int, numWorkers int) []chan int {
	outputChannels := make([]chan int, numWorkers)

	for i := range outputChannels {
		out := make(chan int)
		outputChannels[i] = out

		// The worker(id, input, wg) helper only prints and has no output
		// channel parameter, so it cannot feed the fan-in stage; the
		// forwarding loop therefore lives here.
		go func(out chan<- int) {
			defer close(out)
			for value := range input {
				out <- value
			}
		}(out)
	}

	return outputChannels
}

// fanIn merges every channel in inputs into a single output channel. One
// goroutine per input forwards its values; the output channel is closed
// after all inputs have been closed and drained.
func fanIn(inputs []chan int) <-chan int {
	merged := make(chan int)
	var pending sync.WaitGroup

	// Start one forwarder per input channel.
	for _, in := range inputs {
		pending.Add(1)
		go func(src <-chan int) {
			defer pending.Done()
			for v := range src {
				merged <- v
			}
		}(in)
	}

	// Close the merged channel once every forwarder has finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

// main fans five integers out to three worker channels, merges the worker
// channels back together with fanIn and prints every value received.
func main() {
	source := make(chan int)

	// Fan out: three goroutines share the source channel.
	workerChannels := fanOut(source, 3)

	// Producer: send 0 through 4, then close so downstream stages can finish.
	go func() {
		defer close(source)
		for i := 0; i < 5; i++ {
			source <- i
		}
	}()

	// Fan in and print until the merged channel is closed.
	for result := range fanIn(workerChannels) {
		fmt.Printf("Main received: %d\n", result)
	}
}

6. Project

This project combines fan-in and fan-out with the pipeline pattern to process the worker task more efficiently. The worker function and the input job channel can be adapted to perform other operations if required.

package main

import (
	"fmt"
	"sync"
	"time"
)

// sliceToChan is the source stage: it sends every element of numbers, in
// order, on a new unbuffered channel and closes the channel afterwards.
func sliceToChan(numbers []int) chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for i := range numbers {
			stream <- numbers[i]
		}
	}()
	return stream
}

// squareFunc starts one squaring worker: a goroutine that reads values from
// in until it is closed, sends each value squared on the returned channel,
// closes that channel and only then calls wg.Done.
//
// The caller must have called wg.Add(1) for this worker. Done is signalled
// from inside the goroutine so that wg reflects when the worker has actually
// finished, not merely when it was started.
func squareFunc(wg *sync.WaitGroup, in chan int) chan int {
	out := make(chan int)

	go func() {
		// Deferred calls run in reverse order: out is closed first, then
		// the worker is reported as done.
		defer wg.Done()
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// fanout starts n squaring workers that all read from inchan and returns
// their output channels. Each worker is registered on wg before it starts.
func fanout(wg *sync.WaitGroup, inchan chan int, n int) []chan int {
	outchans := make([]chan int, n)
	for i := range outchans {
		wg.Add(1)
		outchans[i] = squareFunc(wg, inchan)
	}
	return outchans
}

// fanin merges all channels in inchans into one output channel.
//
// Every input is forwarded by its own goroutine, so a slow or idle input
// does not hold back values from the others. The output channel is closed
// after every input has been closed and drained and wg has reached zero.
func fanin(wg *sync.WaitGroup, inchans []chan int) chan int {
	outchan := make(chan int)

	var forwarders sync.WaitGroup
	forwarders.Add(len(inchans))
	for _, ch := range inchans {
		go func(src <-chan int) {
			defer forwarders.Done()
			for n := range src {
				outchan <- n
			}
		}(ch)
	}

	go func() {
		// Inputs are drained before waiting on wg: the workers send on
		// unbuffered channels, so waiting for them first would deadlock.
		forwarders.Wait()
		wg.Wait()
		close(outchan)
	}()
	return outchan
}

// main streams the input slice through ten squaring workers (fan-out),
// merges their outputs (fan-in), prints every value and finally the
// elapsed time.
func main() {
	began := time.Now()
	var wg sync.WaitGroup

	// Stage 1: turn the input slice into a channel.
	source := sliceToChan([]int{1, 4, 5, 6, 2, 8})

	// Stage 2: fan out to ten workers, then merge their outputs.
	merged := fanin(&wg, fanout(&wg, source, 10))

	// Stage 3: consume the merged stream until it is closed.
	for {
		n, open := <-merged
		if !open {
			break
		}
		fmt.Println("value is ", n)
	}
	fmt.Println(time.Since(began))
}

Conclusion

We’ve explored various concurrency patterns in Go, each serving a specific purpose in writing efficient concurrent programs. Understanding and applying these patterns can significantly improve the performance and scalability of your Go applications.

Download and Github

Download the PDF

Find the complete source code and examples on GitHub.