diff --git a/config.go b/config.go index bdd9ccc..18d83cf 100644 --- a/config.go +++ b/config.go @@ -39,7 +39,7 @@ type Config struct { ConcurrentChromeCrawlers int CrawlingInterval time.Duration // Refres crawled results in... MaxPagesPerDomain int // Max pages to crawl per domain - IndexRefreshInterval time.Duration // Interval for periodic index refresh (e.g., "10m") + IndexBatchSize int DriveCache CacheConfig RamCache CacheConfig @@ -60,7 +60,7 @@ var defaultConfig = Config{ ConcurrentChromeCrawlers: 4, CrawlingInterval: 24 * time.Hour, MaxPagesPerDomain: 10, - IndexRefreshInterval: 2 * time.Minute, + IndexBatchSize: 50, LogLevel: 1, DriveCache: CacheConfig{ Duration: 48 * time.Hour, // Added @@ -255,7 +255,7 @@ func saveConfig(config Config) { indexerSec.Key("ConcurrentChromeCrawlers").SetValue(strconv.Itoa(config.ConcurrentStandardCrawlers)) indexerSec.Key("CrawlingInterval").SetValue(config.CrawlingInterval.String()) indexerSec.Key("MaxPagesPerDomain").SetValue(strconv.Itoa(config.MaxPagesPerDomain)) - indexerSec.Key("IndexRefreshInterval").SetValue(config.IndexRefreshInterval.String()) + indexerSec.Key("IndexBatchSize").SetValue(strconv.Itoa(config.IndexBatchSize)) // DriveCache section driveSec := cfg.Section("DriveCache") @@ -303,7 +303,7 @@ func loadConfig() Config { concurrentChromeCrawlers := getConfigValue(cfg.Section("Indexer").Key("ConcurrentChromeCrawlers"), defaultConfig.ConcurrentChromeCrawlers, strconv.Atoi) crawlingInterval := getConfigValue(cfg.Section("Indexer").Key("CrawlingInterval"), defaultConfig.CrawlingInterval, time.ParseDuration) maxPagesPerDomain := getConfigValue(cfg.Section("Indexer").Key("MaxPagesPerDomain"), defaultConfig.MaxPagesPerDomain, strconv.Atoi) - indexRefreshInterval := getConfigValue(cfg.Section("Indexer").Key("IndexRefreshInterval"), defaultConfig.IndexRefreshInterval, time.ParseDuration) + indexBatchSize := getConfigValue(cfg.Section("Indexer").Key("IndexBatchSize"), defaultConfig.IndexBatchSize, strconv.Atoi) // DriveCache driveDuration := getConfigValue(cfg.Section("DriveCache").Key("Duration"), defaultConfig.DriveCache.Duration, time.ParseDuration) @@ -334,7 +334,7 @@ func loadConfig() Config { ConcurrentChromeCrawlers: concurrentChromeCrawlers, CrawlingInterval: crawlingInterval, MaxPagesPerDomain: maxPagesPerDomain, - IndexRefreshInterval: indexRefreshInterval, + IndexBatchSize: indexBatchSize, DriveCache: CacheConfig{ Duration: driveDuration, MaxUsageBytes: driveMaxUsage, diff --git a/crawler.go b/crawler.go index 3ddc36b..afa7f9e 100644 --- a/crawler.go +++ b/crawler.go @@ -2,7 +2,6 @@ package main import ( "bufio" - "fmt" "os" "path/filepath" "strings" @@ -45,14 +44,20 @@ func runCrawlerAndIndexer() { } // 2. Crawl each domain and write results to data_to_index.txt - outFile := filepath.Join(config.DriveCache.Path, "data_to_index.txt") - if err := crawlDomainsToFile(domains, outFile, config.MaxPagesPerDomain); err != nil { + if err := crawlDomainsToFile(domains, config.MaxPagesPerDomain); err != nil { printErr("Error crawling domains: %v", err) return } - // 3. Re-index data_to_index.txt periodically based on IndexRefreshInterval - startPeriodicIndexing(outFile, config.IndexRefreshInterval) + // After finishing crawling, flush any pending visited-urls + if visitedStore != nil { + if err := visitedStore.Flush(); err != nil { + printErr("Failed to flush visitedStore: %v", err) + } + } + + // 3. Re-index data_to_index.txt based on IndexRefreshInterval + //startPeriodicIndexing(outFile, config.IndexRefreshInterval) printDebug("Crawl + index refresh completed.") } @@ -89,16 +94,10 @@ func readDomainsCSV(csvPath string) ([][2]string, error) { // crawlDomainsToFile does an async pipeline: // 1. "standard" goroutines read from standardCh -> attempt standard extraction -> if fails, push to chromeCh // 2. "chrome" goroutines read from chromeCh -> attempt chromedp extraction -> if fails, skip -func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error { - - var mu sync.Mutex - - // Open file for writing (truncate if existing) - file, err := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) - if err != nil { - return fmt.Errorf("unable to open %s for writing: %v", outFile, err) - } - defer file.Close() +// +// Now, instead of writing to a file, we directly index each result into Bleve via indexDocImmediately(...). +func crawlDomainsToFile(domains [][2]string, maxPages int) error { + var mu sync.Mutex // Used if needed to protect shared data. (Here mainly for visitedStore.) // Prepare channels standardCh := make(chan [2]string, 1000) // buffered channels help avoid blocking @@ -110,6 +109,7 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error wgStandard.Add(1) go func() { defer wgStandard.Done() + for dom := range standardCh { rank := dom[0] domainName := dom[1] @@ -118,14 +118,17 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error } fullURL := "https://" + domainName - // 1. Check if we've already visited this URL + // Mark visited so we don't re-crawl duplicates + mu.Lock() added, err := visitedStore.MarkVisited(fullURL) + mu.Unlock() + if err != nil { printErr("MarkVisited error for %s: %v", fullURL, err) continue } if !added { - // Already visited + // Already visited, skip continue } @@ -139,13 +142,11 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error continue } - // 3. Write to file - line := fmt.Sprintf("%s|%s|%s|%s|%s\n", - fullURL, title, keywords, desc, rank) - - mu.Lock() - _, _ = file.WriteString(line) - mu.Unlock() + // 3. Directly index + err = indexDocImmediately(fullURL, title, keywords, desc, rank) + if err != nil { + printErr("Index error for %s: %v", fullURL, err) + } } }() } @@ -156,6 +157,7 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error wgChrome.Add(1) go func() { defer wgChrome.Done() + for dom := range chromeCh { rank := dom[0] domainName := dom[1] @@ -164,28 +166,19 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error } fullURL := "https://" + domainName - // We already marked it visited in the standard pass - // but you may re-check if you prefer: - // - // added, err := visitedStore.MarkVisited(fullURL) - // if err != nil { ... } - // if !added { continue } - // 3. Chromedp fallback extraction userAgent, _ := GetUserAgent("crawler-chrome") title, desc, keywords := fetchPageMetadataChrome(fullURL, userAgent) if title == "" || desc == "" { - printWarn("Skipping (Chrome) %s: missing title/desc", fullURL) + printWarn("Skipping %s: unable to get title/desc data", fullURL) continue } - // 4. Write to file - line := fmt.Sprintf("%s|%s|%s|%s|%s\n", - fullURL, title, keywords, desc, rank) - - mu.Lock() - _, _ = file.WriteString(line) - mu.Unlock() + // 4. Directly index the doc + err := indexDocImmediately(fullURL, title, keywords, desc, rank) + if err != nil { + printErr("Index error for %s: %v", fullURL, err) + } } }() } @@ -195,7 +188,6 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error for _, dom := range domains { standardCh <- dom } - // close the standardCh once all are queued close(standardCh) }() @@ -208,7 +200,7 @@ func crawlDomainsToFile(domains [][2]string, outFile string, maxPages int) error // Wait for chrome workers to finish wgChrome.Wait() - // Optionally flush the visited store once more + // Flush visitedStore if visitedStore != nil { if err := visitedStore.Flush(); err != nil { printErr("visitedStore flush error: %v", err) diff --git a/indexer.go b/indexer.go index 306c28d..73ca9e3 100644 --- a/indexer.go +++ b/indexer.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/blevesearch/bleve/v2" @@ -26,22 +27,123 @@ type Document struct { var ( // Global Bleve index handle - bleveIndex bleve.Index + bleveIndex bleve.Index + docBuffer []Document + docBufferMu sync.Mutex ) -// startPeriodicIndexing refreshes the index from a file periodically -func startPeriodicIndexing(filePath string, interval time.Duration) { +// // startPeriodicIndexing refreshes the index from a file periodically +// func startPeriodicIndexing(filePath string, interval time.Duration) { +// go func() { +// for { +// printDebug("Refreshing index from %s", filePath) +// if err := IndexFile(filePath); err != nil { +// printErr("Failed to refresh index: %v", err) +// } +// time.Sleep(interval) +// } +// }() +// } + +// indexDocImmediately indexes a single document into the Bleve index. +func indexDocImmediately(link, title, tags, desc, rank string) error { + pop, _ := strconv.ParseInt(rank, 10, 64) + normalized := normalizeDomain(link) + + doc := Document{ + ID: normalized, + Link: link, + Title: title, + Tags: tags, + Description: desc, + Popularity: pop, + } + + // Insert directly into the Bleve index + err := bleveIndex.Index(doc.ID, map[string]interface{}{ + "title": doc.Title, + "description": doc.Description, + "link": doc.Link, + "tags": doc.Tags, + "popularity": doc.Popularity, + }) + if err != nil { + return fmt.Errorf("failed to index doc %s: %v", link, err) + } + return nil +} + +// StartBatchIndexing spawns a goroutine that flushes the buffer every interval. +func StartBatchIndexing() { go func() { - for { - printDebug("Refreshing index from %s", filePath) - if err := IndexFile(filePath); err != nil { - printErr("Failed to refresh index: %v", err) - } - time.Sleep(interval) + ticker := time.NewTicker(config.IndexRefreshInterval) + defer ticker.Stop() + + for range ticker.C { + flushDocBuffer() } }() } +func flushDocBuffer() { + docBufferMu.Lock() + defer docBufferMu.Unlock() + + if len(docBuffer) == 0 { + return + } + + batch := bleveIndex.NewBatch() + for _, doc := range docBuffer { + err := batch.Index(doc.ID, map[string]interface{}{ + "title": doc.Title, + "description": doc.Description, + "link": doc.Link, + "tags": doc.Tags, + "popularity": doc.Popularity, + }) + if err != nil { + printErr("batch index error for %s: %v", doc.Link, err) + } + } + // Attempt to commit the batch + if err := bleveIndex.Batch(batch); err != nil { + printErr("error committing batch: %v", err) + } + + // Clear the buffer + docBuffer = docBuffer[:0] +} + +// indexDocBatch queues a single document into memory, which gets flushed by the ticker. +func indexDocBatch(link, title, tags, desc, rank string) error { + pop, _ := strconv.ParseInt(rank, 10, 64) + normalized := normalizeDomain(link) + + doc := Document{ + ID: normalized, + Link: link, + Title: title, + Tags: tags, + Description: desc, + Popularity: pop, + } + + docBufferMu.Lock() + docBuffer = append(docBuffer, doc) + + // Optional: if we exceed config.IndexBatchSize, flush immediately + if len(docBuffer) >= config.IndexBatchSize { + go func() { + // flush in a separate goroutine to avoid blocking + flushDocBuffer() + }() + } + docBufferMu.Unlock() + + return nil +} + // InitIndex ensures that the Bleve index is created or opened. func InitIndex() error { idx, err := bleve.Open(filepath.Join(config.DriveCache.Path, "index.bleve")) diff --git a/init.go b/init.go index 7a6dba2..666d93a 100644 --- a/init.go +++ b/init.go @@ -3,8 +3,6 @@ package main import ( "flag" "os" - "path/filepath" - "time" ) var config Config @@ -109,16 +107,17 @@ func main() { return } - webCrawlerInit() - err := InitIndex() if err != nil { printErr("Failed to initialize index:", err) } - // Start periodic indexing (every 2 minutes) - dataFilePath := filepath.Join(config.DriveCache.Path, "data_to_index.txt") - startPeriodicIndexing(dataFilePath, 2*time.Minute) + webCrawlerInit() + + // No longer needed as crawled data are indexed imidietly + // // Start periodic indexing (every 2 minutes) + // dataFilePath := filepath.Join(config.DriveCache.Path, "data_to_index.txt") + // startPeriodicIndexing(dataFilePath, 2*time.Minute) printInfo("Indexer is enabled.") } else {