Search/crawler-visited.go

106 lines
2.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"fmt"
"os"
"sync"
)
// VisitedStore is a mutex-guarded set of visited URLs. Newly seen URLs are
// kept in memory and appended to a file in batches of batchSize (or on an
// explicit Flush), so URLs recorded in a previous run can be reloaded.
type VisitedStore struct {
	mu sync.Mutex // guards visited and toFlush in the exported methods

	visited   map[string]bool // every URL seen so far, loaded or newly marked
	toFlush   []string        // newly marked URLs not yet written to filePath
	filePath  string          // file URLs are loaded from and appended to, one per line
	batchSize int             // number of queued URLs that triggers an automatic flush
}
// NewVisitedStore creates a VisitedStore backed by filePath and loads any URLs
// already recorded there (one per line). A missing file is not an error: the
// store starts empty and the file is created by the first flush.
//
// batchSize is the number of newly marked URLs that MarkVisited buffers before
// appending them to filePath; a value <= 1 flushes on every new URL.
//
// Any other problem reading filePath (e.g. permission denied) is returned as
// an error rather than silently starting from an empty store.
func NewVisitedStore(filePath string, batchSize int) (*VisitedStore, error) {
	store := &VisitedStore{
		visited:   make(map[string]bool),
		filePath:  filePath,
		batchSize: batchSize,
	}
	// Open directly instead of Stat-then-open: this avoids a check/use race,
	// and only "does not exist" is tolerated. loadFromFile returns the os.Open
	// error unwrapped, so os.IsNotExist can recognize it.
	if err := store.loadFromFile(); err != nil && !os.IsNotExist(err) {
		return nil, fmt.Errorf("loadFromFile error: %w", err)
	}
	return store, nil
}
// loadFromFile adds every line of the store file to s.visited, treating each
// line as one URL (a trailing "\r" is dropped, as bufio.ScanLines does).
//
// It does not lock s.mu; in this file it is only called from NewVisitedStore,
// before the store is handed out. The os.Open error is returned unwrapped so
// callers can detect a missing file with os.IsNotExist.
func (s *VisitedStore) loadFromFile() error {
	f, err := os.Open(s.filePath)
	if err != nil {
		return err
	}
	defer f.Close() // read-only handle; a Close error cannot lose data

	// bufio.Scanner rejects lines longer than 64 KiB by default
	// (bufio.ErrTooLong). Flushes write URLs of any length, so a single
	// oversized URL would make the file unloadable; allow much longer lines.
	const maxLineLen = 16 << 20 // 16 MiB
	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineLen)
	for scanner.Scan() {
		s.visited[scanner.Text()] = true
	}
	return scanner.Err()
}
// AlreadyVisited reports whether url has been recorded, either loaded from the
// store file or added through MarkVisited. It takes s.mu, so it may be called
// concurrently with the other exported methods.
func (s *VisitedStore) AlreadyVisited(url string) bool {
	s.mu.Lock()
	seen := s.visited[url]
	s.mu.Unlock()
	return seen
}
// MarkVisited records url as visited.
//
// If url was already present it returns (false, nil). Otherwise url is added
// to the in-memory set and queued for persistence, and the result is
// added=true. Once batchSize URLs are queued, the queue is appended to the
// store file.
//
// If that flush fails, MarkVisited still returns added=true together with the
// error: url is already recorded in memory (AlreadyVisited reports it) and
// remains queued, so a later flush retries writing it.
func (s *VisitedStore) MarkVisited(url string) (added bool, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.visited[url] {
		return false, nil
	}
	s.visited[url] = true
	s.toFlush = append(s.toFlush, url)

	if len(s.toFlush) >= s.batchSize {
		// Previously a flush failure was reported as added=false, which made
		// callers that branch on `added` skip a URL that was in fact just
		// recorded, so it would never be processed.
		return true, s.flushToFileUnlocked()
	}
	return true, nil
}
// Flush writes all queued URLs to the store file immediately, regardless of
// batchSize. Anything still queued lives only in memory, so call Flush before
// shutting down to persist a partially filled batch.
func (s *VisitedStore) Flush() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if err := s.flushToFileUnlocked(); err != nil {
		return err
	}
	return nil
}
// flushToFileUnlocked appends every queued URL in s.toFlush to the store file,
// one per line, creating the file (mode 0644) if it does not exist.
//
// The queue is emptied only after the data has been flushed and the file
// closed successfully. On any error it is left intact so a later call retries
// the whole batch. Lines that reached the file before a failure may then be
// written twice; this is harmless because loading collapses duplicates into a
// single map entry.
//
// The caller must hold s.mu (MarkVisited and Flush do).
func (s *VisitedStore) flushToFileUnlocked() error {
	if len(s.toFlush) == 0 {
		return nil
	}
	f, err := os.OpenFile(s.filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}

	// Buffer the batch so it goes out in a few large writes instead of one
	// write syscall per URL.
	w := bufio.NewWriter(f)
	for _, url := range s.toFlush {
		if _, err := w.WriteString(url); err != nil {
			_ = f.Close() // the write error is the one worth reporting
			return err
		}
		if err := w.WriteByte('\n'); err != nil {
			_ = f.Close()
			return err
		}
	}
	if err := w.Flush(); err != nil {
		_ = f.Close()
		return err
	}
	// Close can surface write errors the OS deferred (e.g. on network
	// filesystems), so the batch only counts as persisted once it succeeds.
	if err := f.Close(); err != nil {
		return err
	}

	// Reuse the backing array for the next batch. The strings it still
	// references are also held by s.visited, so nothing extra is retained.
	s.toFlush = s.toFlush[:0]
	return nil
}