5 Concurrency
Goroutines are functions that run concurrently in their own runtime. They are extremely lightweight (just 2 KB compared to 8 MB of threads). Any function can be run as a goroutine by adding the go
keyword in front of it.
package main
import "time"
func main() {
// function running concurrently
go myFunc()
// wait for all goroutines to finish
time.Sleep(1*time.Second)
}
func myFunc(){
for i := 0 ; i<10;i++ {
fmt.Println("hello index is : ", i)
}
}
Goroutines get executed in a fork-join pattern. The main routine has no knowledge of other goroutines, and it can get closed while goroutines are still running (goroutine leak). To wait for all goroutines to execute successfully before exiting, we use waitgroups.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
// add number of goroutines to track for
wg.Add(1)
// run goroutine with wg as parameter
go MyFunc(&wg)
// wait for all goroutines to finish
wg.Wait()
fmt.Println("all done now exiting")
}
func MyFunc(wg *sync.WaitGroup) {
// call wg.done after function is executed
defer wg.Done()
for i := 0; i < 10; i++ {
fmt.Println("hello index is : ", i)
}
}
Channels are used to communicate between goroutines. Unbuffered channels have zero capacity, so after pushing a value into them, pulling should be done immediately to keep goroutine communication in synchronization.
package main
import (
"fmt"
"time"
)
func sendData(ch chan int) {
fmt.Println("Sending data...")
ch <- 42
fmt.Println("Sent data.")
}
func main() {
// Creating an unbuffered channel
dataChannel := make(chan int)
// Start a goroutine to send data
go sendData(dataChannel)
// Receive data from the channel
fmt.Println("Receiving data...")
data := <-dataChannel
fmt.Println("Received data:", data)
fmt.Println("Done!")
}
Buffered channels have a capacity as assigned to them. They can hold value in them before pulling till their set capacity. They provide asynchronous communication between goroutines. They may hold some value in them if all values were not consumed. So they should be checked for any residual value in them after execution.
package main
import (
"fmt"
)
func main() {
// buffered channel
myChan := make(chan int, 10)
myChan <- 1
myChan <- 88
myChan <- 99
// values are present in channel
//check for values if present
val, ok := <-myChan
fmt.Println(val, " ", ok)
fmt.Println(<-myChan)
fmt.Println(<-myChan)
fmt.Println("all done now exiting")
}
Buffered channels works as an asynchronous communication pipeline between two goroutines. This makes them continue their operations after value is pushed into the channel so they donot block waiting for value to be pulled from channel.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
// buffered channel
myChan := make(chan int,10)
go Generator(&wg,myChan)
go func(wg *sync.WaitGroup) {
defer wg.Done()
for val := range myChan {
fmt.Println("value is ", val)
}
}(&wg)
wg.Wait()
fmt.Println("all done now exiting")
}
func Generator(wg *sync.WaitGroup,myChan chan int) {
defer wg.Done()
for i := 0; i < 15; i++ {
myChan <- i
}
close(myChan)
}
Channels can be directed as
- read only
- write only
- read and write channel
As required by the program to prevent any unwanted write or read to a channel.
package main
import (
"fmt"
"time"
)
func main() {
// example sendonly channel
sendOnly := make(chan<- int, 5)
// example recieveonly channel
receiveOnly := make(<-chan int)
biDirectional := make(chan int)
go func(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
}(biDirectional)
go func(ch <-chan int) {
for val := range ch {
fmt.Println("Received value:", val)
}
}(biDirectional)
fmt.Println("done execution")
}
If two goroutines access same share resource and update it simultaneously, it can cause race conditions or unstable value assignment to the resource. to protect from this , we use mutual exclusion lock or mutex. it locks the resource before updating and releases after it is done for other goroutines to read or update its value. Mutex lock are of two types :-
- Mutex lock which does not allow read or writeto other goroutines.
- RWLock mutex that allows read but not write toother goroutines during operation.
package main
import (
"fmt"
"sync"
)
var Counter int
func main() {
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(2)
go Increment(&wg, &mu)
go Decrement(&wg, &mu)
wg.Wait()
fmt.Println("counter is :", Counter)
fmt.Println("all done now exiting")
}
func Increment(wg *sync.WaitGroup, mu *sync.Mutex) {
defer wg.Done()
for i := 0; i < 10; i++ {
mu.Lock()
Counter ++
mu.Unlock()
}
}
func Decrement(wg *sync.WaitGroup, mu *sync.Mutex) {
defer wg.Done()
for i := 0; i < 10; i++ {
mu.Lock()
Counter--
mu.Unlock()
}
}
Select is used to wait on multiple channel operations without blocking execution when working with multiple channels and goroutines. It helps in coordinating communication between goroutines and is a essential block for many concurrency patterns. It is like switch statement for concurrency.
package main
import (
"fmt"
)
func main() {
myChan := make(chan int)
done := make(chan bool)
go Increment(myChan,done)
for {
select {
case msg1 := <-myChan:
fmt.Println("value is ", msg1)
case <-done:
fmt.Println("exiting ")
return
default:
fmt.Println("no activity")
}
}
}
func Increment(myChan chan int,done chan bool) {
for i := 0; i < 10; i++ {
myChan <- 99
}
done <- true
}
Context is used to manage cancellation, deadlines and request scoped values across multiple goroutines. It helps in propagating signals through concurrent operations to multiple goroutines to change their behavior or return as defined.
package main
import (
"context"
"fmt"
"time"
)
var Counter int
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
done := make(chan bool)
go Worker(ctx, done)
// Wait for the worker to complete or timeout
if <-done {
fmt.Println("Main: Worker has completed its task.")
} else {
fmt.Println("Main: Worker did not complete. timeout or was canceled.")
}
fmt.Println("all done now exiting")
}
func Worker(ctx context.Context, done chan bool) {
for {
select {
case <-time.After(2 * time.Second):
fmt.Println("woring good")
done <- true
case <-ctx.Done():
fmt.Println("timeout")
done <- false
}
}
}
In Go, the sync/atomic package provides low-level atomic operations on memory, ensuring that certain operations are executed atomically without interruption from other goroutines. These operations are often used to safely update shared variables in concurrent programs.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
const goroutines = 100
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
atomic.AddInt64(&counter,1)
wg.Done()
}()
}
wg.Wait()
fmt.Println("Counter:", atomic.LoadInt64(&counter)) // Read the final counter value atomically
}
Using atomic functions in this way helps avoid race conditions and ensures that the shared variable (counter in this case) is safely updated in a concurrent environment. The atomic package provides a variety of atomic operations for different types, such as integers and pointers.
Understanding concurrency in Go involves mastering the use of goroutines, channels, mutex, select, context, and atomic operations. These concepts empower developers to create efficient, concurrent programs that make the most of the Go programming language. Always consider the specific needs of your application when choosing the appropriate concurrency mechanisms.
Find the complete source code and examples on GitHub.