Changed the indexing buffer to store documents in RAM instead of a file

This commit is contained in:
partisan 2025-01-02 12:55:44 +01:00
parent 918e1823df
commit 61266c461a
4 changed files with 155 additions and 62 deletions

View file

@ -8,6 +8,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/blevesearch/bleve/v2"
@ -26,22 +27,123 @@ type Document struct {
var (
// Global Bleve index handle
bleveIndex bleve.Index
bleveIndex bleve.Index
docBuffer []Document
docBufferMu sync.Mutex
)
// startPeriodicIndexing refreshed the index from a file periodically.
// It is disabled and kept here, commented out, for reference only.
//
// func startPeriodicIndexing(filePath string, interval time.Duration) {
// 	go func() {
// 		for {
// 			printDebug("Refreshing index from %s", filePath)
// 			if err := IndexFile(filePath); err != nil {
// 				printErr("Failed to refresh index: %v", err)
// 			}
// 			time.Sleep(interval)
// 		}
// 	}()
// }

// indexDocImmediately indexes a single document into the Bleve index.
func indexDocImmediately(link, title, tags, desc, rank string) error {
pop, _ := strconv.ParseInt(rank, 10, 64)
normalized := normalizeDomain(link)
doc := Document{
ID: normalized,
Link: link,
Title: title,
Tags: tags,
Description: desc,
Popularity: pop,
}
// Insert directly into the Bleve index
err := bleveIndex.Index(doc.ID, map[string]interface{}{
"title": doc.Title,
"description": doc.Description,
"link": doc.Link,
"tags": doc.Tags,
"popularity": doc.Popularity,
})
if err != nil {
return fmt.Errorf("failed to index doc %s: %v", link, err)
}
return nil
}
// StartBatchIndexing spawns a goroutine that flushes the buffer every interval.
func StartBatchIndexing() {
go func() {
for {
printDebug("Refreshing index from %s", filePath)
if err := IndexFile(filePath); err != nil {
printErr("Failed to refresh index: %v", err)
}
time.Sleep(interval)
ticker := time.NewTicker(config.IndexRefreshInterval)
defer ticker.Stop()
for range ticker.C {
flushDocBuffer()
}
}()
}
// docFlushMu serializes flushes so that batches are committed one at a time
// without having to hold docBufferMu during the commit.
var docFlushMu sync.Mutex

// flushDocBuffer commits every document currently queued in docBuffer to the
// Bleve index as a single batch and empties the buffer.
//
// The queued documents are detached from docBuffer while holding docBufferMu,
// and the lock is released before the batch is built and committed, so
// callers appending to the buffer are not blocked for the duration of the
// index write.
//
// Errors are logged with printErr and not returned. Documents are removed
// from the buffer even when the commit fails, i.e. a failed batch is dropped
// rather than retried.
func flushDocBuffer() {
	docFlushMu.Lock()
	defer docFlushMu.Unlock()

	// Take ownership of the queued documents. Replacing the slice (instead of
	// re-slicing to [:0]) also lets the flushed documents be garbage collected.
	docBufferMu.Lock()
	pending := docBuffer
	if len(pending) > 0 {
		docBuffer = nil
	}
	docBufferMu.Unlock()

	if len(pending) == 0 {
		return
	}

	batch := bleveIndex.NewBatch()
	for i := range pending {
		doc := &pending[i]
		err := batch.Index(doc.ID, map[string]interface{}{
			"title":       doc.Title,
			"description": doc.Description,
			"link":        doc.Link,
			"tags":        doc.Tags,
			"popularity":  doc.Popularity,
		})
		if err != nil {
			// Skip the offending document but keep building the batch.
			printErr("batch index error for %s: %v", doc.Link, err)
		}
	}

	// Attempt to commit the batch.
	if err := bleveIndex.Batch(batch); err != nil {
		printErr("error committing batch: %v", err)
	}
}
// indexDocBatch queues a single document into memory, which gets flushed by the ticker.
func indexDocBatch(link, title, tags, desc, rank string) error {
pop, _ := strconv.ParseInt(rank, 10, 64)
normalized := normalizeDomain(link)
doc := Document{
ID: normalized,
Link: link,
Title: title,
Tags: tags,
Description: desc,
Popularity: pop,
}
docBufferMu.Lock()
docBuffer = append(docBuffer, doc)
// Optional: if we exceed config.IndexBatchSize, flush immediately
if len(docBuffer) >= config.IndexBatchSize {
go func() {
// flush in a separate goroutine to avoid blocking
flushDocBuffer()
}()
}
docBufferMu.Unlock()
return nil
}
// InitIndex ensures that the Bleve index is created or opened.
func InitIndex() error {
idx, err := bleve.Open(filepath.Join(config.DriveCache.Path, "index.bleve"))