package main

import (
	"bufio"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// Global visited-URL store used to avoid re-crawling the same domains
var visitedStore *VisitedStore

// webCrawlerInit is called during init on program start
func webCrawlerInit() {
	store, err := NewVisitedStore(filepath.Join(config.DriveCache.Path, "visited-urls.txt"), config.IndexBatchSize)
	if err != nil {
		printErr("Failed to initialize visited store: %v", err)
	}
	visitedStore = store

	// Start the periodic crawler
	go func() {
		// First run immediately
		runCrawlerAndIndexer()

		// Then run periodically
		ticker := time.NewTicker(config.CrawlingInterval)
		for range ticker.C {
			runCrawlerAndIndexer()
		}
	}()
}

// runCrawlerAndIndexer reads domains.csv -> crawls each domain -> indexes the results directly into Bleve
func runCrawlerAndIndexer() {
	// 1. Read domains.csv
	domains, err := readDomainsCSV(filepath.Join(config.DriveCache.Path, "domains.csv"))
	if err != nil {
		printErr("Error reading domains.csv: %v", err)
		return
	}

	// 2. Crawl each domain and index the results
	if err := crawlDomainsToFile(domains, config.MaxPagesPerDomain); err != nil {
		printErr("Error crawling domains: %v", err)
		return
	}

	// After finishing crawling, flush any pending visited-urls
	if visitedStore != nil {
		if err := visitedStore.Flush(); err != nil {
			printErr("Failed to flush visitedStore: %v", err)
		}
	}

	// 3. Re-index data_to_index.txt based on IndexRefreshInterval
	//startPeriodicIndexing(outFile, config.IndexRefreshInterval)

	printDebug("Crawl + index refresh completed.")
}

// readDomainsCSV returns a slice of (rank, domain) pairs from a local CSV file
func readDomainsCSV(csvPath string) ([][2]string, error) {
	f, err := os.Open(csvPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var result [][2]string
	scanner := bufio.NewScanner(f)

	// Skip header line
	scanner.Scan()

	for scanner.Scan() {
		line := scanner.Text()
		// Split by commas, not tabs
		fields := strings.SplitN(line, ",", 3) // splits into up to 3 parts (rank, domain, popularity)
		if len(fields) < 2 {
			printDebug("Skipping malformed line: %s", line)
			continue
		}
		// Remove quotes around fields, if present
		rank := strings.Trim(fields[0], `"`)
		domain := strings.Trim(fields[1], `"`)
		result = append(result, [2]string{rank, domain})
	}
	return result, scanner.Err()
}

// crawlDomainsToFile runs an async pipeline:
//  1. "standard" goroutines read from standardCh -> attempt standard extraction -> if it fails, push to chromeCh
//  2. "chrome" goroutines read from chromeCh -> attempt chromedp extraction -> if it fails, skip
//
// Instead of writing to a file, each result is indexed directly into Bleve via indexDocImmediately(...).
func crawlDomainsToFile(domains [][2]string, maxPages int) error {
	var mu sync.Mutex // used to protect shared data (here mainly visitedStore)

	// Prepare channels
	standardCh := make(chan [2]string, 1000) // buffered channels help avoid blocking
	chromeCh := make(chan [2]string, 1000)

	// 1) Spawn standard workers
	var wgStandard sync.WaitGroup
	for i := 0; i < config.ConcurrentStandardCrawlers; i++ {
		wgStandard.Add(1)
		go func() {
			defer wgStandard.Done()
			for dom := range standardCh {
				rank := dom[0]
				domainName := dom[1]
				if domainName == "" {
					continue
				}
				fullURL := "https://" + domainName

				// Mark visited so we don't re-crawl duplicates
				mu.Lock()
				added, err := visitedStore.MarkVisited(fullURL)
				mu.Unlock()
				if err != nil {
					printErr("MarkVisited error for %s: %v", fullURL, err)
					continue
				}
				if !added {
					// Already visited, skip
					continue
				}
				// 2. Standard extraction
				userAgent, _ := GetUserAgent("crawler-std")
				title, desc, keywords := fetchPageMetadataStandard(fullURL, userAgent)

				// If title or description is missing, push to the Chrome queue
				if title == "" || desc == "" {
					chromeCh <- dom
					continue
				}

				// 3. Directly index
				err = indexDocImmediately(fullURL, title, keywords, desc, rank)
				if err != nil {
					printErr("Index error for %s: %v", fullURL, err)
				}
			}
		}()
	}

	// 2) Spawn chrome workers
	var wgChrome sync.WaitGroup
	for i := 0; i < config.ConcurrentChromeCrawlers; i++ {
		wgChrome.Add(1)
		go func() {
			defer wgChrome.Done()
			for dom := range chromeCh {
				rank := dom[0]
				domainName := dom[1]
				if domainName == "" {
					continue
				}
				fullURL := "https://" + domainName

				// 3. Chromedp fallback extraction
				userAgent, _ := GetUserAgent("crawler-chrome")
				title, desc, keywords := fetchPageMetadataChrome(fullURL, userAgent)
				if title == "" || desc == "" {
					// Log every domain that could not be crawled at all
					printDebug("Skipping %s: unable to get title/desc data", fullURL)
					continue
				}

				// 4. Directly index the doc
				err := indexDocImmediately(fullURL, title, keywords, desc, rank)
				if err != nil {
					printErr("Index error for %s: %v", fullURL, err)
				}
			}
		}()
	}

	// Feed domains into standardCh
	go func() {
		for _, dom := range domains {
			standardCh <- dom
		}
		close(standardCh)
	}()

	// Wait for standard workers to finish, then close chromeCh
	go func() {
		wgStandard.Wait()
		close(chromeCh)
	}()

	// Wait for chrome workers to finish
	wgChrome.Wait()

	// Flush visitedStore
	if visitedStore != nil {
		if err := visitedStore.Flush(); err != nil {
			printErr("visitedStore flush error: %v", err)
		}
	}

	return nil
}
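
// The helpers used above (VisitedStore, GetUserAgent, the fetchPageMetadata*
// functions, indexDocImmediately, printErr/printDebug, and the global config)
// are defined elsewhere in this package. As a rough sketch, the call sites in
// this file assume shapes along these lines (inferred from usage here, not
// authoritative definitions):
//
//	func NewVisitedStore(path string, batchSize int) (*VisitedStore, error)
//	func (s *VisitedStore) MarkVisited(url string) (added bool, err error)
//	func (s *VisitedStore) Flush() error
//
//	func GetUserAgent(purpose string) (string, error)
//	func fetchPageMetadataStandard(pageURL, userAgent string) (title, desc, keywords string)
//	func fetchPageMetadataChrome(pageURL, userAgent string) (title, desc, keywords string)
//	func indexDocImmediately(link, title, keywords, desc, rank string) error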