50. Go - Channel Fan In / Fan Out

Fan In is taking all values from different channels and funnel them into one channel for easier processing.


Example:

// Note: these channels are taking and pulling one value at a time.

package main

import (
    "fmt"
    "sync"
)

func main() {

    // Two Channels
    channel1 := make(chan int)
    channel2 := make(chan int)

    // Funnel Channel
    channel3 := make(chan int)

    // Add values into Channel 1  then close
    go func() {
        for i := 0; i < 100; i += 2 {
            channel1 <- i
        }
        close(channel1)
    }()

    // Add values into Channel 2 then close
    go func() {
        for i := 0; i < 100; i += 3 {
            channel2 <- i
        }
        close(channel2)
    }()

    // Pull values from both channels and funnel into channel 3 until both channels are closed.
    go func(chan1, chan2 <-chan int, chan3 chan<- int) {
        var wg sync.WaitGroup
        wg.Add(2)

        // Pull from channel 1 until closed.
        go func() {
            for v := range chan1 {
                chan3 <- v
            }
            wg.Done()
        }()

        // Pull form channel 2 until closed.
        go func() {
            for v := range chan2 {
                chan3 <- v
            }
            wg.Done()
        }()

        // Wait for funneling to finish.
        wg.Wait()

        // Close channel 3 once funneling is done.
        close(channel3)
    }(channel1, channel2, channel3)

    // Pull and range over channel 3 until channel 3 is closed.
    for v := range channel3 {
        fmt.Println(v)
    }
}


Fan Out is taking all values from a single channel and spread the values to a lot of goroutines.

Example:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Populate channel with values then close. Done 1 main waitgroup.
func createWork(c chan<- int, p *sync.WaitGroup) {
    for i := 0; i < 100; i++ {
        c <- i
    }
    close(c)
    p.Done()
}

func main() {
    // Create 2 main waitgroups
    var wg sync.WaitGroup
    wg.Add(2)

    todo := make(chan int)

    // Populate channel with values then close. Done 1 main waitgroup.
    go createWork(todo, &wg)

    // Create multiple goroutines with values from channel.
    go func() {

        // Create a 2nd waitgroup for all the goroutines created.
        var wg2 sync.WaitGroup

        for v := range todo {
            wg2.Add(1)

            // Create multiple goroutines for every value in channel.
            go func(vs int) {
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
                fmt.Println(vs)
                wg2.Done()
            }(v)
        }

        wg2.Wait()

        wg.Done()
    }()

    wg.Wait()

}


Comments

Popular posts from this blog

2. FreeCodeCamp - Dynamic Programming - Learn to Solve Algorithmic Problems & Coding Challenges

20. Data Analytics - Analyze Data to Answer Questions - Week 1

3. Algorithms - Selection Sort