Search/indexer.go

321 lines
7.9 KiB
Go
Raw Permalink Normal View History

package main
import (
"bufio"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/blevesearch/bleve/v2"
"golang.org/x/net/publicsuffix"
)
// Document represents a single document to be indexed.
type Document struct {
ID string `json:"id"`
Link string `json:"link"`
Title string `json:"title"`
Tags string `json:"tags"`
Description string `json:"description"`
Popularity int64 `json:"popularity"`
}
var (
// Global Bleve index handle
bleveIndex bleve.Index
docBuffer []Document
docBufferMu sync.Mutex
)
// // startPeriodicIndexing refreshes the index from a file periodically
// func startPeriodicIndexing(filePath string, interval time.Duration) {
// go func() {
// for {
// printDebug("Refreshing index from %s", filePath)
// if err := IndexFile(filePath); err != nil {
// printErr("Failed to refresh index: %v", err)
// }
// time.Sleep(interval)
// }
// }()
// }
// indexDocImmediately indexes a single document into the Bleve index.
func indexDocImmediately(link, title, tags, desc, rank string) error {
pop, _ := strconv.ParseInt(rank, 10, 64)
normalized := normalizeDomain(link)
doc := Document{
ID: normalized,
Link: link,
Title: title,
Tags: tags,
Description: desc,
Popularity: pop,
}
// Insert directly into the Bleve index
err := bleveIndex.Index(doc.ID, map[string]interface{}{
"title": doc.Title,
"description": doc.Description,
"link": doc.Link,
"tags": doc.Tags,
"popularity": doc.Popularity,
})
if err != nil {
return fmt.Errorf("failed to index doc %s: %v", link, err)
}
return nil
}
// // StartBatchIndexing spawns a goroutine that flushes the buffer every interval.
// func StartBatchIndexing() {
// go func() {
// ticker := time.NewTicker(config.IndexRefreshInterval)
// defer ticker.Stop()
// for range ticker.C {
// flushDocBuffer()
// }
// }()
// }
func flushDocBuffer() {
docBufferMu.Lock()
defer docBufferMu.Unlock()
if len(docBuffer) == 0 {
return
}
batch := bleveIndex.NewBatch()
for _, doc := range docBuffer {
err := batch.Index(doc.ID, map[string]interface{}{
"title": doc.Title,
"description": doc.Description,
"link": doc.Link,
"tags": doc.Tags,
"popularity": doc.Popularity,
})
if err != nil {
printErr("batch index error for %s: %v", doc.Link, err)
}
}
// Attempt to commit the batch
if err := bleveIndex.Batch(batch); err != nil {
printErr("error committing batch: %v", err)
}
// Clear the buffer
docBuffer = docBuffer[:0]
}
// indexDocBatch queues a single document into memory, which gets flushed by the ticker.
func indexDocBatch(link, title, tags, desc, rank string) error {
pop, _ := strconv.ParseInt(rank, 10, 64)
normalized := normalizeDomain(link)
doc := Document{
ID: normalized,
Link: link,
Title: title,
Tags: tags,
Description: desc,
Popularity: pop,
}
docBufferMu.Lock()
docBuffer = append(docBuffer, doc)
// Optional: if we exceed config.IndexBatchSize, flush immediately
if len(docBuffer) >= config.IndexBatchSize {
go func() {
// flush in a separate goroutine to avoid blocking
flushDocBuffer()
}()
}
docBufferMu.Unlock()
return nil
}
// InitIndex ensures that the Bleve index is created or opened.
func InitIndex() error {
idx, err := bleve.Open(filepath.Join(config.DriveCache.Path, "index.bleve"))
if err == bleve.ErrorIndexPathDoesNotExist {
// Index doesn't exist, create a new one
mapping := bleve.NewIndexMapping()
docMapping := bleve.NewDocumentMapping()
// Text fields
titleFieldMapping := bleve.NewTextFieldMapping()
titleFieldMapping.Analyzer = "standard"
docMapping.AddFieldMappingsAt("title", titleFieldMapping)
descFieldMapping := bleve.NewTextFieldMapping()
descFieldMapping.Analyzer = "standard"
docMapping.AddFieldMappingsAt("description", descFieldMapping)
tagFieldMapping := bleve.NewTextFieldMapping()
tagFieldMapping.Analyzer = "standard"
docMapping.AddFieldMappingsAt("tags", tagFieldMapping)
// Numeric field for popularity
popularityMapping := bleve.NewNumericFieldMapping()
docMapping.AddFieldMappingsAt("popularity", popularityMapping)
mapping.AddDocumentMapping("Document", docMapping)
idx, err = bleve.New(filepath.Join(config.DriveCache.Path, "index.bleve"), mapping)
if err != nil {
return fmt.Errorf("failed to create index: %v", err)
}
} else if err != nil {
return fmt.Errorf("failed to open index: %v", err)
}
bleveIndex = idx
return nil
}
func normalizeDomain(rawURL string) string {
parsed, err := url.Parse(rawURL)
if err != nil {
return rawURL
}
domain, err := publicsuffix.EffectiveTLDPlusOne(parsed.Hostname())
if err != nil {
return parsed.Hostname() // fallback
}
return domain
}
// IndexFile reads a file line-by-line and indexes each line as a document.
func IndexFile(filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("unable to open file for indexing: %v", err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
batch := bleveIndex.NewBatch()
// Map to track normalized domains weve already indexed
indexedDomains := make(map[string]bool)
for scanner.Scan() {
line := scanner.Text()
// link|title|tags|description|popularity
parts := strings.SplitN(line, "|", 5)
if len(parts) < 5 {
continue
}
// Normalize domain part so duplicates share the same “key”
normalized := normalizeDomain(parts[0])
popularity, _ := strconv.ParseInt(parts[4], 10, 64)
if indexedDomains[normalized] {
continue
}
doc := Document{
ID: normalized,
Link: parts[0],
Title: parts[1],
Tags: parts[2],
Description: parts[3],
Popularity: popularity,
}
err := batch.Index(doc.ID, map[string]interface{}{
"title": doc.Title,
"description": doc.Description,
"link": doc.Link,
"tags": doc.Tags,
"popularity": doc.Popularity,
})
if err != nil {
return fmt.Errorf("failed to index document: %v", err)
}
indexedDomains[normalized] = true
}
if err := bleveIndex.Batch(batch); err != nil {
return fmt.Errorf("error committing batch: %v", err)
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("error reading file: %v", err)
}
printDebug("Indexed %d unique normalized domains from %s", len(indexedDomains), filePath)
return nil
}
// SearchIndex performs a full-text search on the indexed data.
func SearchIndex(queryStr string, page, pageSize int) ([]Document, error) {
// Check if the indexer is enabled
if !config.IndexerEnabled {
return nil, fmt.Errorf("indexer is disabled")
}
exactMatch := bleve.NewMatchQuery(queryStr) // Exact match
fuzzyMatch := bleve.NewFuzzyQuery(queryStr) // Fuzzy match
fuzzyMatch.Fuzziness = 2
prefixMatch := bleve.NewPrefixQuery(queryStr) // Prefix match
query := bleve.NewDisjunctionQuery(exactMatch, fuzzyMatch, prefixMatch)
req := bleve.NewSearchRequest(query)
req.Fields = []string{"title", "description", "link", "tags", "popularity"}
// Pagination
req.Size = pageSize
req.From = (page - 1) * pageSize
// Sort primarily by relevance (score), then by popularity descending
req.SortBy([]string{"-_score", "-popularity"})
res, err := bleveIndex.Search(req)
if err != nil {
return nil, fmt.Errorf("search error: %v", err)
}
var docs []Document
for _, hit := range res.Hits {
title := fmt.Sprintf("%v", hit.Fields["title"])
description := fmt.Sprintf("%v", hit.Fields["description"])
link := fmt.Sprintf("%v", hit.Fields["link"])
tags := fmt.Sprintf("%v", hit.Fields["tags"])
popularity := int64(0)
if pop, ok := hit.Fields["popularity"].(float64); ok {
popularity = int64(pop)
}
if link == "<nil>" || title == "<nil>" {
continue
}
docs = append(docs, Document{
ID: hit.ID,
Title: title,
Description: description,
Link: link,
Tags: tags,
Popularity: popularity,
})
}
return docs, nil
}