Tags: go, concurrency, goroutine

How to close a channel when there is an unknown number of inputs to it?


API : https://jsonmock.hackerrank.com/api/articles?page=1

package main

import (
    "fmt"
    "net/http"
    "encoding/json"
    "strconv"
    "sync"
)

type ArticleResponse struct {
    Page       int `json:"page"`
    PerPage    int `json:"per_page"`
    Total      int `json:"total"`
    TotalPages int `json:"total_pages"`
    Data []Article `json:"data"`
}

type Article struct {
    Title       string      `json:"title"`
    URL         string      `json:"url"`
    Author      string      `json:"author"`
    NumComments int         `json:"num_comments"`
    StoryID     int32 `json:"story_id"`
    StoryTitle  string `json:"story_title"`
    StoryURL    string `json:"story_url"`
    ParentID    int32 `json:"parent_id"`
    CreatedAt   int         `json:"created_at"`
}

type CommentTitle struct{
    NumberOfComments int `json:"number_of_comments"`
    Title string `json:"title"`
    Page int `json:"from_page"`
}

const (
    GET_HOST_URL = "https://jsonmock.hackerrank.com/"
    PATH = "api/articles"
)

var wg sync.WaitGroup

func main(){
    comment_title_chan := make(chan CommentTitle)
    var commentTitleSlice []CommentTitle
    
    // pilot call to get total number of pages
    totalPage := makePaginatedRequest(1, comment_title_chan, true)

    // making concurrent requests to multiple pages at once
    for j:=1;j<=totalPage;j++{
        go makePaginatedRequest(j, comment_title_chan, false)
    }
    for j:=0; j<20;j++ {
        commentTitleSlice = append(commentTitleSlice, <-comment_title_chan)
    }
    
    for _,article := range commentTitleSlice{
        fmt.Println(article.NumberOfComments, "\t\t", article.Title)
    }
}

func makePaginatedRequest(pageNo int, chunk chan CommentTitle, pilotMode bool) int{
    uri := GET_HOST_URL + PATH
    req, err := http.NewRequest("GET", uri, nil)
    if err != nil {
        fmt.Println(err)
    }
    q := req.URL.Query()
    q.Add("page", strconv.Itoa(pageNo))
    req.URL.RawQuery = q.Encode()
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("Error on response.\n[ERROR] -", err)
    }
    defer resp.Body.Close()
    var articleResponse ArticleResponse
    if err = json.NewDecoder(resp.Body).Decode(&articleResponse) ; err != nil{
        fmt.Println(err)
    }
    if !pilotMode{
        for _, article := range articleResponse.Data{
            if(article.Title != "" && article.NumComments != 0){
                ct := CommentTitle{article.NumComments, article.Title, pageNo}
                wg.Add(1)
                chunk <- ct
                wg.Done()
            }
        }
        wg.Wait()
    }
    return articleResponse.TotalPages
}

Problem statement:

There is an API that returns data for the page number passed as a query parameter. I am expected to call all pages and fetch all articles that have a valid title and number-of-comments field.

Solution:

step-1: I first make a pilot call to the API to find out the number of pages, which is part of the response JSON.

step-2: I spin up multiple goroutines (number of goroutines = total number of pages).

step-3: Each goroutine calls its corresponding page, fetches the data, and sends it to the data channel.

step-4: Data received from the channel is appended to a slice, which is used for further computation (sorting by an article's number of comments).

Problem: I do not know how many of the total records are valid, hence I don't know when to close the channel from the sender side (multiple senders and a single receiver in my scenario).

I tried using an additional signal channel, but then how would I know that all goroutines have completed their job so that I can send a signal for further computation?

I have even used a WaitGroup, but only at the individual goroutine level, so I still cannot tell when all goroutines have completed their job.

A similar question on SO didn't help much: Closing channel of unknown length

Update: In the code I have hardcoded the j loop bound to 20, and that is exactly where I am facing the issue. I don't know how far to loop; if I increase it to more than 50, the receive blocks.


Solution

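  • You are using the WaitGroup incorrectly.

    Add to the WaitGroup when you create a goroutine, wait for everything to complete in a separate goroutine, and then close the channel. Once the channel is closed, main can receive with for ct := range comment_title_chan instead of looping a fixed 20 times. The sender side looks like this: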

    for j := 1; j <= totalPage; j++ {
        wg.Add(1)
        go makePaginatedRequest(j, comment_title_chan, false)
    }
    go func() {
        wg.Wait()
        close(comment_title_chan)
    }()
    

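    Mark it completed when the goroutine returns, and remove the wg.Add(1), wg.Done() and wg.Wait() calls from inside the function body. Note that the pilot call in main runs the same function without a matching wg.Add(1), so call wg.Add(1) before it as well (or skip wg.Done() in pilot mode); otherwise the counter goes negative and the program panics: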

    func makePaginatedRequest(pageNo int, chunk chan CommentTitle, pilotMode bool) int{
      defer wg.Done()
      ...
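
To put the pieces together, here is a minimal, self-contained sketch of the pattern described above: one WaitGroup entry per sender goroutine, a separate goroutine that closes the channel after wg.Wait(), and a receiver that ranges over the channel. It is not the original program. fetchPage and collectAll are hypothetical names, and fetchPage is an assumed stand-in for the HTTP request that simply returns a different number of valid records per page, so the receiver cannot know the total in advance.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// CommentTitle mirrors the struct from the question.
type CommentTitle struct {
	NumberOfComments int
	Title            string
	Page             int
}

// fetchPage is a hypothetical stand-in for the HTTP call in the question.
// It returns page%4 records, so the count differs from page to page.
func fetchPage(page int) []CommentTitle {
	var out []CommentTitle
	for i := 0; i < page%4; i++ {
		out = append(out, CommentTitle{
			NumberOfComments: page*10 + i,
			Title:            fmt.Sprintf("page %d, article %d", page, i),
			Page:             page,
		})
	}
	return out
}

// collectAll starts one sender per page, closes the channel once every
// sender has returned, and drains the channel with range.
func collectAll(totalPages int) []CommentTitle {
	results := make(chan CommentTitle)
	var wg sync.WaitGroup

	for page := 1; page <= totalPages; page++ {
		wg.Add(1) // register the goroutine before starting it
		go func(p int) {
			defer wg.Done() // one Done per goroutine, not per record
			for _, ct := range fetchPage(p) {
				results <- ct
			}
		}(page)
	}

	// Closer goroutine: the only place where the channel is closed.
	go func() {
		wg.Wait()
		close(results)
	}()

	var all []CommentTitle
	for ct := range results { // loop ends when results is closed
		all = append(all, ct)
	}
	return all
}

func main() {
	all := collectAll(5)
	// Sort by number of comments, highest first.
	sort.Slice(all, func(i, j int) bool {
		return all[i].NumberOfComments > all[j].NumberOfComments
	})
	for _, ct := range all {
		fmt.Println(ct.NumberOfComments, "\t\t", ct.Title)
	}
}
```

The wg.Wait() call runs in its own goroutine because the channel is unbuffered: if main waited directly, the senders would block on results <- ct with nobody receiving, and the program would deadlock. Keeping the WaitGroup local to collectAll, rather than a package-level variable, is a design choice of this sketch and not something the original answer requires.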