Search/indexer.go

320 lines
7.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/blevesearch/bleve/v2"
"golang.org/x/net/publicsuffix"
)
// Document represents a single document to be indexed.
// The json tags give each field a lower-case key for JSON encoding.
type Document struct {
// ID is the key the document is stored under in the index; the indexing
// functions in this file set it to normalizeDomain(Link).
ID string `json:"id"`
// Link is the URL string the document was indexed from.
Link string `json:"link"`
// Title is indexed as a text field.
Title string `json:"title"`
// Tags is a single string indexed as a text field.
Tags string `json:"tags"`
// Description is indexed as a text field.
Description string `json:"description"`
// Popularity is a numeric score; SearchIndex uses it as the secondary sort
// key (descending) after the relevance score.
Popularity int64 `json:"popularity"`
}
// Package-level state shared by the indexing and search functions in this file.
var (
// Global Bleve index handle; assigned by InitIndex.
bleveIndex bleve.Index
// docBuffer holds documents queued by indexDocBatch until flushDocBuffer
// commits them to the index as one batch.
docBuffer []Document
// docBufferMu guards docBuffer.
docBufferMu sync.Mutex
)
// // startPeriodicIndexing refreshes the index from a file periodically
// func startPeriodicIndexing(filePath string, interval time.Duration) {
// go func() {
// for {
// printDebug("Refreshing index from %s", filePath)
// if err := IndexFile(filePath); err != nil {
// printErr("Failed to refresh index: %v", err)
// }
// time.Sleep(interval)
// }
// }()
// }
// indexDocImmediately writes a single document straight into the Bleve index,
// without going through the in-memory batch buffer.
//
// The document ID is normalizeDomain(link), so every link that normalizes to
// the same value shares one index entry. rank is parsed as a base-10 int64
// popularity score.
//
// It returns an error wrapping the underlying Bleve error when the write
// fails; callers can inspect the cause with errors.Is / errors.As.
func indexDocImmediately(link, title, tags, desc, rank string) error {
	// The parse error is intentionally not propagated: a non-numeric rank
	// yields popularity 0 and the document is still indexed.
	popularity, _ := strconv.ParseInt(rank, 10, 64)

	id := normalizeDomain(link)
	fields := map[string]interface{}{
		"title":       title,
		"description": desc,
		"link":        link,
		"tags":        tags,
		"popularity":  popularity,
	}

	if err := bleveIndex.Index(id, fields); err != nil {
		return fmt.Errorf("failed to index doc %s: %w", link, err)
	}
	return nil
}
// // StartBatchIndexing spawns a goroutine that flushes the buffer every interval.
// func StartBatchIndexing() {
// go func() {
// ticker := time.NewTicker(config.IndexRefreshInterval)
// defer ticker.Stop()
// for range ticker.C {
// flushDocBuffer()
// }
// }()
// }
// flushDocBuffer commits every document currently queued in docBuffer to the
// Bleve index as a single batch and then empties the buffer.
//
// docBufferMu is held for the whole operation, so producers appending to the
// buffer wait until the commit has finished. Failures are logged through
// printErr and not returned; the buffer is emptied even when the commit fails.
func flushDocBuffer() {
	docBufferMu.Lock()
	defer docBufferMu.Unlock()

	pending := len(docBuffer)
	if pending == 0 {
		return
	}

	batch := bleveIndex.NewBatch()
	for i := 0; i < pending; i++ {
		queued := &docBuffer[i]
		fields := map[string]interface{}{
			"title":       queued.Title,
			"description": queued.Description,
			"link":        queued.Link,
			"tags":        queued.Tags,
			"popularity":  queued.Popularity,
		}
		if err := batch.Index(queued.ID, fields); err != nil {
			printErr("batch index error for %s: %v", queued.Link, err)
		}
	}

	// Commit everything that was added to the batch in one call.
	if err := bleveIndex.Batch(batch); err != nil {
		printErr("error committing batch: %v", err)
	}

	// Truncate in place so the backing array is reused by later appends.
	docBuffer = docBuffer[:0]
}
// indexDocBatch queues a single document into memory, which gets flushed by the ticker.
func indexDocBatch(link, title, tags, desc, rank string) error {
pop, _ := strconv.ParseInt(rank, 10, 64)
normalized := normalizeDomain(link)
doc := Document{
ID: normalized,
Link: link,
Title: title,
Tags: tags,
Description: desc,
Popularity: pop,
}
docBufferMu.Lock()
docBuffer = append(docBuffer, doc)
// Optional: if we exceed config.IndexBatchSize, flush immediately
if len(docBuffer) >= config.IndexBatchSize {
go func() {
// flush in a separate goroutine to avoid blocking
flushDocBuffer()
}()
}
docBufferMu.Unlock()
return nil
}
// InitIndex ensures that the Bleve index is created or opened.
func InitIndex() error {
idx, err := bleve.Open(filepath.Join(config.DriveCache.Path, "index.bleve"))
if err == bleve.ErrorIndexPathDoesNotExist {
// Index doesn't exist, create a new one
mapping := bleve.NewIndexMapping()
docMapping := bleve.NewDocumentMapping()
// Text fields
titleFieldMapping := bleve.NewTextFieldMapping()
titleFieldMapping.Analyzer = "standard"
docMapping.AddFieldMappingsAt("title", titleFieldMapping)
descFieldMapping := bleve.NewTextFieldMapping()
descFieldMapping.Analyzer = "standard"
docMapping.AddFieldMappingsAt("description", descFieldMapping)
tagFieldMapping := bleve.NewTextFieldMapping()
tagFieldMapping.Analyzer = "standard"
docMapping.AddFieldMappingsAt("tags", tagFieldMapping)
// Numeric field for popularity
popularityMapping := bleve.NewNumericFieldMapping()
docMapping.AddFieldMappingsAt("popularity", popularityMapping)
mapping.AddDocumentMapping("Document", docMapping)
idx, err = bleve.New(filepath.Join(config.DriveCache.Path, "index.bleve"), mapping)
if err != nil {
return fmt.Errorf("failed to create index: %v", err)
}
} else if err != nil {
return fmt.Errorf("failed to open index: %v", err)
}
bleveIndex = idx
return nil
}
// normalizeDomain reduces rawURL to the key used as a document ID: the
// registrable domain (effective TLD plus one label) of its host, in lower
// case.
//
// Fallbacks, in order:
//   - rawURL is returned unchanged when it cannot be parsed or when parsing
//     yields no host (for example a scheme-less "example.com/page", which
//     net/url treats as a path). This avoids producing an empty key.
//   - the lower-cased host is returned when no registrable domain can be
//     derived from it.
func normalizeDomain(rawURL string) string {
	parsed, err := url.Parse(rawURL)
	if err != nil {
		return rawURL
	}

	// DNS names are case-insensitive; fold case so that spelling variants of
	// the same host share one key.
	host := strings.ToLower(parsed.Hostname())
	if host == "" {
		return rawURL
	}

	domain, err := publicsuffix.EffectiveTLDPlusOne(host)
	if err != nil {
		return host // fallback
	}
	return domain
}
// IndexFile reads filePath line by line and indexes at most one document per
// normalized domain, committing all of them to the Bleve index as one batch.
//
// Each line is expected to hold five pipe-separated fields:
//
//	link|title|tags|description|popularity
//
// Lines with fewer than five fields, lines whose link yields an empty
// normalized domain, and lines whose domain was already seen earlier in the
// file are skipped. A popularity value that is not a valid integer is not
// reported; a non-numeric value is indexed as 0.
//
// It returns an error when the file cannot be opened or read, or when Bleve
// rejects a document or the batch. The batch is committed before the scanner
// error is checked, so documents read before a read failure are still indexed.
func IndexFile(filePath string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("unable to open file for indexing: %w", err)
	}
	defer file.Close()

	batch := bleveIndex.NewBatch()
	// Normalized domains we've already queued during this run.
	indexedDomains := make(map[string]bool)

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		// With a limit of 5, any additional '|' stays inside the last field.
		parts := strings.SplitN(scanner.Text(), "|", 5)
		if len(parts) < 5 {
			continue
		}

		// Normalize the domain part so duplicates share the same key.
		normalized := normalizeDomain(parts[0])
		if normalized == "" {
			// An empty ID cannot be indexed; skip the line like any other
			// malformed line instead of failing the whole file.
			printDebug("Skipping line without a usable domain: %s", parts[0])
			continue
		}
		if indexedDomains[normalized] {
			continue
		}

		// Trim so that a trailing carriage return or stray spaces do not turn
		// a valid number into 0.
		popularity, _ := strconv.ParseInt(strings.TrimSpace(parts[4]), 10, 64)

		fields := map[string]interface{}{
			"title":       parts[1],
			"description": parts[3],
			"link":        parts[0],
			"tags":        parts[2],
			"popularity":  popularity,
		}
		if err := batch.Index(normalized, fields); err != nil {
			return fmt.Errorf("failed to index document: %w", err)
		}
		indexedDomains[normalized] = true
	}

	if err := bleveIndex.Batch(batch); err != nil {
		return fmt.Errorf("error committing batch: %w", err)
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading file: %w", err)
	}

	printDebug("Indexed %d unique normalized domains from %s", len(indexedDomains), filePath)
	return nil
}
// SearchIndex runs a full-text search over the Bleve index and returns one
// page of matching documents.
//
// queryStr is searched with three queries combined as a disjunction: a match
// query, a fuzzy query with fuzziness 2, and a prefix query. Results are
// ordered by relevance score and then by popularity, both descending.
//
// page is 1-based; values below 1 are treated as 1. pageSize is the maximum
// number of hits to return; negative values are treated as 0.
//
// Hits that carry no stored "title" or "link" field are dropped. A missing
// "description" or "tags" field becomes the empty string.
//
// It returns an error when the indexer is disabled in config or when the
// search itself fails (the Bleve error is wrapped).
func SearchIndex(queryStr string, page, pageSize int) ([]Document, error) {
	// Check if the indexer is enabled.
	if !config.IndexerEnabled {
		return nil, fmt.Errorf("indexer is disabled")
	}

	// Keep the offset and size handed to Bleve non-negative.
	if page < 1 {
		page = 1
	}
	if pageSize < 0 {
		pageSize = 0
	}

	matchQuery := bleve.NewMatchQuery(queryStr) // analyzed match
	fuzzyQuery := bleve.NewFuzzyQuery(queryStr) // fuzzy match
	fuzzyQuery.Fuzziness = 2
	prefixQuery := bleve.NewPrefixQuery(queryStr) // prefix match
	// NOTE(review): fuzzy and prefix queries are presumably term-level and not
	// analyzed, so mixed-case input may fail to match lower-cased terms.
	// Confirm whether queryStr should be lower-cased for them.
	query := bleve.NewDisjunctionQuery(matchQuery, fuzzyQuery, prefixQuery)

	req := bleve.NewSearchRequest(query)
	req.Fields = []string{"title", "description", "link", "tags", "popularity"}

	// Pagination.
	req.Size = pageSize
	req.From = (page - 1) * pageSize

	// Sort primarily by relevance (score), then by popularity descending.
	req.SortBy([]string{"-_score", "-popularity"})

	res, err := bleveIndex.Search(req)
	if err != nil {
		return nil, fmt.Errorf("search error: %w", err)
	}

	// fieldText converts a stored field value to text and reports whether the
	// field was present at all, so absence is not confused with a field whose
	// text happens to be "<nil>".
	fieldText := func(v interface{}) (string, bool) {
		switch s := v.(type) {
		case nil:
			return "", false
		case string:
			return s, true
		default:
			return fmt.Sprintf("%v", s), true
		}
	}

	var docs []Document
	for _, hit := range res.Hits {
		title, hasTitle := fieldText(hit.Fields["title"])
		link, hasLink := fieldText(hit.Fields["link"])
		if !hasTitle || !hasLink {
			continue
		}
		description, _ := fieldText(hit.Fields["description"])
		tags, _ := fieldText(hit.Fields["tags"])

		// The stored numeric value is read as float64; anything else leaves
		// popularity at 0.
		var popularity int64
		if pop, ok := hit.Fields["popularity"].(float64); ok {
			popularity = int64(pop)
		}

		docs = append(docs, Document{
			ID:          hit.ID,
			Title:       title,
			Description: description,
			Link:        link,
			Tags:        tags,
			Popularity:  popularity,
		})
	}
	return docs, nil
}