wip search requests to other nodes

This commit is contained in:
partisan 2024-08-08 21:59:10 +02:00
parent c594c93559
commit 1baa40b620
7 changed files with 411 additions and 9 deletions

View file

@ -131,13 +131,12 @@ func startFileWatcher() {
configLock.Lock()
config = loadConfig()
configLock.Unlock()
// Perform your logic here to handle the changes in the config file
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("Error:", err)
log.Println("Error watching configuration file:", err)
}
}
}()

View file

@ -146,6 +146,12 @@ func fetchAndCacheFileResults(query, safe, lang string, page int) []TorrentResul
return results
}
func fetchFileResults(query, safe, lang string, page int) []TorrentResult {
cacheKey := CacheKey{Query: query, Page: page, Safe: safe == "true", Lang: lang, Type: "file"}
results := getFileResultsFromCacheOrFetch(cacheKey, query, safe, lang, page)
return results
}
func removeMagnetLink(magnet string) string {
// Remove the magnet: prefix unconditionally
return strings.TrimPrefix(magnet, "magnet:")

View file

@ -4,12 +4,15 @@ import (
"encoding/json"
"fmt"
"html/template"
"log"
"math"
"net/http"
"net/url"
"time"
)
var resultsChan = make(chan []ForumSearchResult)
func PerformRedditSearch(query string, safe string, page int) ([]ForumSearchResult, error) {
const (
pageSize = 25
@ -99,9 +102,9 @@ func PerformRedditSearch(query string, safe string, page int) ([]ForumSearchResu
func handleForumsSearch(w http.ResponseWriter, query, safe, lang string, page int) {
results, err := PerformRedditSearch(query, safe, page)
if err != nil {
http.Error(w, fmt.Sprintf("Error performing search: %v", err), http.StatusInternalServerError)
return
if err != nil || len(results) == 0 || 0 == 0 { // 0 == 0 to force search by other node
log.Printf("No results from primary search, trying other nodes")
results = tryOtherNodesForForumSearch(query, safe, lang, page)
}
data := struct {
@ -137,3 +140,81 @@ func handleForumsSearch(w http.ResponseWriter, query, safe, lang string, page in
http.Error(w, fmt.Sprintf("Error rendering template: %v", err), http.StatusInternalServerError)
}
}
func tryOtherNodesForForumSearch(query, safe, lang string, page int) []ForumSearchResult {
for _, nodeAddr := range peers {
results, err := sendSearchRequestToNode(nodeAddr, query, safe, lang, page)
if err != nil {
log.Printf("Error contacting node %s: %v", nodeAddr, err)
continue
}
if len(results) > 0 {
return results
}
}
return nil
}
func sendSearchRequestToNode(nodeAddr, query, safe, lang string, page int) ([]ForumSearchResult, error) {
searchParams := struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
ResponseAddr string `json:"responseAddr"`
}{
Query: query,
Safe: safe,
Lang: lang,
Page: page,
ResponseAddr: fmt.Sprintf("http://localhost:%d/node", config.Port),
}
msgBytes, err := json.Marshal(searchParams)
if err != nil {
return nil, fmt.Errorf("failed to marshal search parameters: %v", err)
}
msg := Message{
ID: hostID,
Type: "search-forum",
Content: string(msgBytes),
}
err = sendMessage(nodeAddr, msg)
if err != nil {
return nil, fmt.Errorf("failed to send search request to node %s: %v", nodeAddr, err)
}
// Wait for results
select {
case res := <-resultsChan:
return res, nil
case <-time.After(20 * time.Second): // Increased timeout duration
return nil, fmt.Errorf("timeout waiting for results from node %s", nodeAddr)
}
}
func handleForumResultsMessage(msg Message) {
var results []ForumSearchResult
err := json.Unmarshal([]byte(msg.Content), &results)
if err != nil {
log.Printf("Error unmarshalling forum results: %v", err)
return
}
log.Printf("Received forum results: %+v", results)
// Send results to resultsChan
go func() {
resultsChan <- results
}()
}
func fetchForumResults(query, safe, lang string, page int) []ForumSearchResult {
results, err := PerformRedditSearch(query, safe, page)
if err != nil {
log.Printf("Error fetching forum results: %v", err)
return nil
}
return results
}

183
node-handle-search.go Normal file
View file

@ -0,0 +1,183 @@
package main
import (
"encoding/json"
"log"
"sync"
)
var (
forumResults = make(map[string][]ForumSearchResult)
forumResultsMutex sync.Mutex
)
func handleSearchTextMessage(msg Message) {
var searchParams struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
}
err := json.Unmarshal([]byte(msg.Content), &searchParams)
if err != nil {
log.Printf("Error parsing search parameters: %v", err)
return
}
results := fetchTextResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page)
resultsJSON, err := json.Marshal(results)
if err != nil {
log.Printf("Error marshalling search results: %v", err)
return
}
responseMsg := Message{
ID: hostID,
Type: "search-results",
Content: string(resultsJSON),
}
err = sendMessage(msg.ID, responseMsg)
if err != nil {
log.Printf("Error sending search results to %s: %v", msg.ID, err)
}
}
func handleSearchImageMessage(msg Message) {
var searchParams struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
}
err := json.Unmarshal([]byte(msg.Content), &searchParams)
if err != nil {
log.Printf("Error parsing search parameters: %v", err)
return
}
results := fetchImageResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page)
resultsJSON, err := json.Marshal(results)
if err != nil {
log.Printf("Error marshalling search results: %v", err)
return
}
responseMsg := Message{
ID: hostID,
Type: "image-results",
Content: string(resultsJSON),
}
err = sendMessage(msg.ID, responseMsg)
if err != nil {
log.Printf("Error sending image search results to %s: %v", msg.ID, err)
}
}
func handleSearchVideoMessage(msg Message) {
var searchParams struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
}
err := json.Unmarshal([]byte(msg.Content), &searchParams)
if err != nil {
log.Printf("Error parsing search parameters: %v", err)
return
}
results := fetchVideoResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page)
resultsJSON, err := json.Marshal(results)
if err != nil {
log.Printf("Error marshalling search results: %v", err)
return
}
responseMsg := Message{
ID: hostID,
Type: "video-results",
Content: string(resultsJSON),
}
err = sendMessage(msg.ID, responseMsg)
if err != nil {
log.Printf("Error sending video search results to %s: %v", msg.ID, err)
}
}
func handleSearchFileMessage(msg Message) {
var searchParams struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
}
err := json.Unmarshal([]byte(msg.Content), &searchParams)
if err != nil {
log.Printf("Error parsing search parameters: %v", err)
return
}
results := fetchFileResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page)
resultsJSON, err := json.Marshal(results)
if err != nil {
log.Printf("Error marshalling search results: %v", err)
return
}
responseMsg := Message{
ID: hostID,
Type: "file-results",
Content: string(resultsJSON),
}
err = sendMessage(msg.ID, responseMsg)
if err != nil {
log.Printf("Error sending file search results to %s: %v", msg.ID, err)
}
}
func handleSearchForumMessage(msg Message) {
var searchParams struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
ResponseAddr string `json:"responseAddr"`
}
err := json.Unmarshal([]byte(msg.Content), &searchParams)
if err != nil {
log.Printf("Error parsing search parameters: %v", err)
return
}
log.Printf("Received search-forum request. ResponseAddr: %s", searchParams.ResponseAddr)
results := fetchForumResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page)
resultsJSON, err := json.Marshal(results)
if err != nil {
log.Printf("Error marshalling search results: %v", err)
return
}
responseMsg := Message{
ID: hostID,
Type: "forum-results",
Content: string(resultsJSON),
}
// Log the address to be used for sending the response
log.Printf("Sending forum search results to %s", searchParams.ResponseAddr)
if searchParams.ResponseAddr == "" {
log.Printf("Error: Response address is empty")
return
}
err = sendMessage(searchParams.ResponseAddr, responseMsg)
if err != nil {
log.Printf("Error sending forum search results to %s: %v", searchParams.ResponseAddr, err)
}
}

80
node-request-search.go Normal file
View file

@ -0,0 +1,80 @@
package main
import (
"encoding/json"
)
// func sendSearchRequestToNode(nodeAddr, query, safe, lang string, page int, requestID string) ([]ForumSearchResult, error) {
// searchParams := struct {
// Query string `json:"query"`
// Safe string `json:"safe"`
// Lang string `json:"lang"`
// Page int `json:"page"`
// ResponseAddr string `json:"responseAddr"`
// }{
// Query: query,
// Safe: safe,
// Lang: lang,
// Page: page,
// ResponseAddr: "http://localhost:5000/node", // Node 1's address
// }
// msg := Message{
// ID: requestID,
// Type: "search-forum",
// Content: toJSON(searchParams),
// }
// msgBytes, err := json.Marshal(msg)
// if err != nil {
// return nil, fmt.Errorf("failed to marshal search request: %v", err)
// }
// req, err := http.NewRequest("POST", nodeAddr, bytes.NewBuffer(msgBytes))
// if err != nil {
// return nil, fmt.Errorf("failed to create search request: %v", err)
// }
// req.Header.Set("Content-Type", "application/json")
// req.Header.Set("Authorization", authCode)
// client := &http.Client{
// Timeout: time.Second * 10,
// }
// resp, err := client.Do(req)
// if err != nil {
// return nil, fmt.Errorf("failed to send search request: %v", err)
// }
// defer resp.Body.Close()
// if resp.StatusCode != http.StatusOK {
// body, _ := ioutil.ReadAll(resp.Body)
// return nil, fmt.Errorf("server error: %s", body)
// }
// var responseMsg Message
// err = json.NewDecoder(resp.Body).Decode(&responseMsg)
// if err != nil {
// return nil, fmt.Errorf("failed to decode search response: %v", err)
// }
// if responseMsg.Type != "forum-results" {
// return nil, fmt.Errorf("unexpected message type: %s", responseMsg.Type)
// }
// var results []ForumSearchResult
// err = json.Unmarshal([]byte(responseMsg.Content), &results)
// if err != nil {
// return nil, fmt.Errorf("failed to unmarshal search results: %v", err)
// }
// return results, nil
// }
func toJSON(v interface{}) string {
data, err := json.Marshal(v)
if err != nil {
return ""
}
return string(data)
}

23
node.go
View file

@ -49,6 +49,10 @@ func generateHostID() (string, error) {
}
func sendMessage(serverAddr string, msg Message) error {
if serverAddr == "" {
return fmt.Errorf("server address is empty")
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %v", err)
@ -136,7 +140,26 @@ func interpretMessage(msg Message) {
handleHeartbeat(msg.Content)
case "election":
handleElection(msg.Content)
case "search-text":
handleSearchTextMessage(msg)
case "search-image":
handleSearchImageMessage(msg)
case "search-video":
handleSearchVideoMessage(msg)
case "search-file":
handleSearchFileMessage(msg)
case "search-forum":
log.Println("Received search-forum message:", msg.Content)
handleSearchForumMessage(msg)
case "forum-results":
handleForumResultsMessage(msg)
default:
fmt.Println("Received unknown message type:", msg.Type)
}
}
func generateRequestID() string {
bytes := make([]byte, 16)
rand.Read(bytes)
return fmt.Sprintf("%x", bytes)
}

View file

@ -188,10 +188,10 @@ func handleVideoSearch(w http.ResponseWriter, query, safe, lang string, page int
}
err = tmpl.Execute(w, map[string]interface{}{
"Results": results,
"Query": query,
"Fetched": fmt.Sprintf("%.2f seconds", elapsed.Seconds()),
"Page": page,
"Results": results,
"Query": query,
"Fetched": fmt.Sprintf("%.2f seconds", elapsed.Seconds()),
"Page": page,
"HasPrevPage": page > 1,
"HasNextPage": len(results) > 0, // assuming you have a way to determine if there are more pages
})
@ -200,3 +200,33 @@ func handleVideoSearch(w http.ResponseWriter, query, safe, lang string, page int
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
func fetchVideoResults(query, safe, lang string, page int) []VideoResult {
apiResp, err := makeHTMLRequest(query, safe, lang, page)
if err != nil {
log.Printf("Error fetching video results: %v", err)
return nil
}
var results []VideoResult
for _, item := range apiResp.Items {
if item.Type == "channel" || item.Type == "playlist" {
continue
}
if item.UploadedDate == "" {
item.UploadedDate = "Now"
}
results = append(results, VideoResult{
Href: fmt.Sprintf("https://youtube.com%s", item.URL),
Title: item.Title,
Date: item.UploadedDate,
Views: formatViews(item.Views),
Creator: item.UploaderName,
Publisher: "Piped",
Image: fmt.Sprintf("/img_proxy?url=%s", url.QueryEscape(item.Thumbnail)),
Duration: formatDuration(item.Duration),
})
}
return results
}