320 lines
7.9 KiB
Go
320 lines
7.9 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"fmt"
|
||
"net/url"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
|
||
"github.com/blevesearch/bleve/v2"
|
||
"golang.org/x/net/publicsuffix"
|
||
)
|
||
|
||
// Document represents a single document to be indexed.
//
// The same shape is also what SearchIndex returns for each hit.
type Document struct {
	ID          string `json:"id"`          // index key; the indexers in this file set it to normalizeDomain(Link)
	Link        string `json:"link"`        // original, un-normalized link
	Title       string `json:"title"`       // indexed as the "title" field
	Tags        string `json:"tags"`        // indexed as the "tags" field
	Description string `json:"description"` // indexed as the "description" field
	Popularity  int64  `json:"popularity"`  // parsed from a base-10 string by the indexers; used as a secondary sort key in SearchIndex
}
|
||
|
||
var (
	// Global Bleve index handle; InitIndex assigns it.
	bleveIndex bleve.Index

	// docBuffer queues documents appended by indexDocBatch until
	// flushDocBuffer commits them; docBufferMu is held around every access
	// to it in this file.
	docBuffer   []Document
	docBufferMu sync.Mutex
)
|
||
|
||
// // startPeriodicIndexing refreshes the index from a file periodically
// func startPeriodicIndexing(filePath string, interval time.Duration) {
// 	go func() {
// 		for {
// 			printDebug("Refreshing index from %s", filePath)
// 			if err := IndexFile(filePath); err != nil {
// 				printErr("Failed to refresh index: %v", err)
// 			}
// 			time.Sleep(interval)
// 		}
// 	}()
// }
|
||
|
||
// indexDocImmediately indexes a single document into the Bleve index.
|
||
func indexDocImmediately(link, title, tags, desc, rank string) error {
|
||
pop, _ := strconv.ParseInt(rank, 10, 64)
|
||
normalized := normalizeDomain(link)
|
||
|
||
doc := Document{
|
||
ID: normalized,
|
||
Link: link,
|
||
Title: title,
|
||
Tags: tags,
|
||
Description: desc,
|
||
Popularity: pop,
|
||
}
|
||
|
||
// Insert directly into the Bleve index
|
||
err := bleveIndex.Index(doc.ID, map[string]interface{}{
|
||
"title": doc.Title,
|
||
"description": doc.Description,
|
||
"link": doc.Link,
|
||
"tags": doc.Tags,
|
||
"popularity": doc.Popularity,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("failed to index doc %s: %v", link, err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// // StartBatchIndexing spawns a goroutine that flushes the buffer every interval.
// func StartBatchIndexing() {
// 	go func() {
// 		ticker := time.NewTicker(config.IndexRefreshInterval)
// 		defer ticker.Stop()
//
// 		for range ticker.C {
// 			flushDocBuffer()
// 		}
// 	}()
// }
|
||
|
||
func flushDocBuffer() {
|
||
docBufferMu.Lock()
|
||
defer docBufferMu.Unlock()
|
||
|
||
if len(docBuffer) == 0 {
|
||
return
|
||
}
|
||
|
||
batch := bleveIndex.NewBatch()
|
||
for _, doc := range docBuffer {
|
||
err := batch.Index(doc.ID, map[string]interface{}{
|
||
"title": doc.Title,
|
||
"description": doc.Description,
|
||
"link": doc.Link,
|
||
"tags": doc.Tags,
|
||
"popularity": doc.Popularity,
|
||
})
|
||
if err != nil {
|
||
printErr("batch index error for %s: %v", doc.Link, err)
|
||
}
|
||
}
|
||
// Attempt to commit the batch
|
||
if err := bleveIndex.Batch(batch); err != nil {
|
||
printErr("error committing batch: %v", err)
|
||
}
|
||
|
||
// Clear the buffer
|
||
docBuffer = docBuffer[:0]
|
||
}
|
||
|
||
// indexDocBatch queues a single document into memory, which gets flushed by the ticker.
|
||
func indexDocBatch(link, title, tags, desc, rank string) error {
|
||
pop, _ := strconv.ParseInt(rank, 10, 64)
|
||
normalized := normalizeDomain(link)
|
||
|
||
doc := Document{
|
||
ID: normalized,
|
||
Link: link,
|
||
Title: title,
|
||
Tags: tags,
|
||
Description: desc,
|
||
Popularity: pop,
|
||
}
|
||
|
||
docBufferMu.Lock()
|
||
docBuffer = append(docBuffer, doc)
|
||
|
||
// Optional: if we exceed config.IndexBatchSize, flush immediately
|
||
if len(docBuffer) >= config.IndexBatchSize {
|
||
go func() {
|
||
// flush in a separate goroutine to avoid blocking
|
||
flushDocBuffer()
|
||
}()
|
||
}
|
||
docBufferMu.Unlock()
|
||
|
||
return nil
|
||
}
|
||
|
||
// InitIndex ensures that the Bleve index is created or opened.
|
||
func InitIndex() error {
|
||
idx, err := bleve.Open(filepath.Join(config.DriveCache.Path, "index.bleve"))
|
||
if err == bleve.ErrorIndexPathDoesNotExist {
|
||
// Index doesn't exist, create a new one
|
||
mapping := bleve.NewIndexMapping()
|
||
|
||
docMapping := bleve.NewDocumentMapping()
|
||
|
||
// Text fields
|
||
titleFieldMapping := bleve.NewTextFieldMapping()
|
||
titleFieldMapping.Analyzer = "standard"
|
||
docMapping.AddFieldMappingsAt("title", titleFieldMapping)
|
||
|
||
descFieldMapping := bleve.NewTextFieldMapping()
|
||
descFieldMapping.Analyzer = "standard"
|
||
docMapping.AddFieldMappingsAt("description", descFieldMapping)
|
||
|
||
tagFieldMapping := bleve.NewTextFieldMapping()
|
||
tagFieldMapping.Analyzer = "standard"
|
||
docMapping.AddFieldMappingsAt("tags", tagFieldMapping)
|
||
|
||
// Numeric field for popularity
|
||
popularityMapping := bleve.NewNumericFieldMapping()
|
||
docMapping.AddFieldMappingsAt("popularity", popularityMapping)
|
||
|
||
mapping.AddDocumentMapping("Document", docMapping)
|
||
|
||
idx, err = bleve.New(filepath.Join(config.DriveCache.Path, "index.bleve"), mapping)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create index: %v", err)
|
||
}
|
||
} else if err != nil {
|
||
return fmt.Errorf("failed to open index: %v", err)
|
||
}
|
||
|
||
bleveIndex = idx
|
||
return nil
|
||
}
|
||
|
||
func normalizeDomain(rawURL string) string {
|
||
parsed, err := url.Parse(rawURL)
|
||
if err != nil {
|
||
return rawURL
|
||
}
|
||
domain, err := publicsuffix.EffectiveTLDPlusOne(parsed.Hostname())
|
||
if err != nil {
|
||
return parsed.Hostname() // fallback
|
||
}
|
||
return domain
|
||
}
|
||
|
||
// IndexFile reads a file line-by-line and indexes each line as a document.
|
||
func IndexFile(filePath string) error {
|
||
file, err := os.Open(filePath)
|
||
if err != nil {
|
||
return fmt.Errorf("unable to open file for indexing: %v", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
scanner := bufio.NewScanner(file)
|
||
batch := bleveIndex.NewBatch()
|
||
|
||
// Map to track normalized domains we’ve already indexed
|
||
indexedDomains := make(map[string]bool)
|
||
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
|
||
// link|title|tags|description|popularity
|
||
parts := strings.SplitN(line, "|", 5)
|
||
if len(parts) < 5 {
|
||
continue
|
||
}
|
||
|
||
// Normalize domain part so duplicates share the same “key”
|
||
normalized := normalizeDomain(parts[0])
|
||
popularity, _ := strconv.ParseInt(parts[4], 10, 64)
|
||
|
||
if indexedDomains[normalized] {
|
||
continue
|
||
}
|
||
|
||
doc := Document{
|
||
ID: normalized,
|
||
Link: parts[0],
|
||
Title: parts[1],
|
||
Tags: parts[2],
|
||
Description: parts[3],
|
||
Popularity: popularity,
|
||
}
|
||
|
||
err := batch.Index(doc.ID, map[string]interface{}{
|
||
"title": doc.Title,
|
||
"description": doc.Description,
|
||
"link": doc.Link,
|
||
"tags": doc.Tags,
|
||
"popularity": doc.Popularity,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("failed to index document: %v", err)
|
||
}
|
||
|
||
indexedDomains[normalized] = true
|
||
}
|
||
|
||
if err := bleveIndex.Batch(batch); err != nil {
|
||
return fmt.Errorf("error committing batch: %v", err)
|
||
}
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
return fmt.Errorf("error reading file: %v", err)
|
||
}
|
||
|
||
printDebug("Indexed %d unique normalized domains from %s", len(indexedDomains), filePath)
|
||
return nil
|
||
}
|
||
|
||
// SearchIndex performs a full-text search on the indexed data.
|
||
func SearchIndex(queryStr string, page, pageSize int) ([]Document, error) {
|
||
// Check if the indexer is enabled
|
||
if !config.IndexerEnabled {
|
||
return nil, fmt.Errorf("indexer is disabled")
|
||
}
|
||
|
||
exactMatch := bleve.NewMatchQuery(queryStr) // Exact match
|
||
fuzzyMatch := bleve.NewFuzzyQuery(queryStr) // Fuzzy match
|
||
fuzzyMatch.Fuzziness = 2
|
||
prefixMatch := bleve.NewPrefixQuery(queryStr) // Prefix match
|
||
|
||
query := bleve.NewDisjunctionQuery(exactMatch, fuzzyMatch, prefixMatch)
|
||
|
||
req := bleve.NewSearchRequest(query)
|
||
req.Fields = []string{"title", "description", "link", "tags", "popularity"}
|
||
|
||
// Pagination
|
||
req.Size = pageSize
|
||
req.From = (page - 1) * pageSize
|
||
|
||
// Sort primarily by relevance (score), then by popularity descending
|
||
req.SortBy([]string{"-_score", "-popularity"})
|
||
|
||
res, err := bleveIndex.Search(req)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("search error: %v", err)
|
||
}
|
||
|
||
var docs []Document
|
||
for _, hit := range res.Hits {
|
||
title := fmt.Sprintf("%v", hit.Fields["title"])
|
||
description := fmt.Sprintf("%v", hit.Fields["description"])
|
||
link := fmt.Sprintf("%v", hit.Fields["link"])
|
||
tags := fmt.Sprintf("%v", hit.Fields["tags"])
|
||
popularity := int64(0)
|
||
|
||
if pop, ok := hit.Fields["popularity"].(float64); ok {
|
||
popularity = int64(pop)
|
||
}
|
||
|
||
if link == "<nil>" || title == "<nil>" {
|
||
continue
|
||
}
|
||
|
||
docs = append(docs, Document{
|
||
ID: hit.ID,
|
||
Title: title,
|
||
Description: description,
|
||
Link: link,
|
||
Tags: tags,
|
||
Popularity: popularity,
|
||
})
|
||
}
|
||
|
||
return docs, nil
|
||
}
|