Browse Source

Implemented unidirectional channels

master
Martins Eglitis 1 year ago
parent
commit
1b71f9f497
1 changed files with 26 additions and 11 deletions
  1. +26
    -11
      crawler.go

+ 26
- 11
crawler.go View File

@@ -6,11 +6,12 @@ import (
"io/ioutil"
"net/http"
"sync"
"time"
)

func request(channel chan int, worker int, wg *sync.WaitGroup, url *string) {
func request(tasks <-chan int, results chan<- int, worker int, wg *sync.WaitGroup, url *string) {
//The for loop is important as the worker will continue to work.
for task := range channel {
for task := range tasks {
res, err := http.DefaultClient.Get(fmt.Sprintf("%s/%v", *url, task))

if err != nil {
@@ -25,7 +26,8 @@ func request(channel chan int, worker int, wg *sync.WaitGroup, url *string) {
fmt.Printf("unable to parse %v : %v \n", url, res)
}

fmt.Printf("number of bytes in body : %v \n", len(body))
//Write the result to the results channel.
results <- len(body)
}

//Decrement the semaphore value.
@@ -36,11 +38,14 @@ func main() {
url := flag.String("url", "http://localhost/tests", "The URL that should be crawled")
minID := flag.Int("min-id", 0, "The minimum ID value")
maxID := flag.Int("max-id", 20, "The maximum ID value")
concurrency := flag.Int("concurrency", 5, "The number of concurrent tasks")
concurrency := flag.Int("concurrency", 10, "The number of concurrent tasks")
flag.Parse()

//Create a bi-directional channel.
channel := make(chan int)
startTime := time.Now()

//Create the channels.
tasks := make(chan int)
results := make(chan int)

//Create a waiting group.
var wg sync.WaitGroup
@@ -51,18 +56,28 @@ func main() {
//Create new workers. This must be done before writing tasks to the channel.
//Otherwise, the channel will block the current main goroutine and deadlock.
for worker := 0; worker < *concurrency; worker++ {
go request(channel, worker, &wg, url)
go request(tasks, results, worker, &wg, url)
}

//Create new tasks and write them to the channel.
for task := *minID; task < *maxID; task++ {
channel <- task
tasks <- task

fmt.Printf("The number of bytes fetched is %v \n", <-results)
}

//Close the channel. Close should be called after each task is assigned to the channel.
//Otherwise it will throw an error for writing to closed channel.
close(channel)
//Otherwise it will block.
close(tasks)

//It does not block when the results channel is not closed, but, for safety, I close
//the read-only channel as well.
close(results)

//Wait until the semaphore will become zero. This should be called after the channel is closed.
wg.Wait()
}

endTime := time.Now()

fmt.Printf("time difference is %v \n", endTime.Sub(startTime))
}

Loading…
Cancel
Save