From ce00c5f91bad09c6a64e9ccb2b56723a51265794 Mon Sep 17 00:00:00 2001 From: partisan Date: Fri, 9 Aug 2024 09:55:41 +0200 Subject: [PATCH] added fetch file results from other nodes --- files.go | 93 ++++++++++++++++++++++++++++++++++++++----- node-handle-search.go | 22 +++++++--- 2 files changed, 99 insertions(+), 16 deletions(-) diff --git a/files.go b/files.go index 1ef1276..35c227c 100644 --- a/files.go +++ b/files.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "fmt" "html/template" "log" @@ -30,6 +31,8 @@ var ( rutor TorrentSite ) +var fileResultsChan = make(chan []TorrentResult) + func initializeTorrentSites() { torrentGalaxy = NewTorrentGalaxy() // nyaa = NewNyaa() @@ -108,20 +111,26 @@ func getFileResultsFromCacheOrFetch(cacheKey CacheKey, query, safe, lang string, select { case results := <-cacheChan: if results == nil { - combinedResults = fetchAndCacheFileResults(query, safe, lang, page) + combinedResults = fetchFileResults(query, safe, lang, page) + if len(combinedResults) > 0 { + resultsCache.Set(cacheKey, convertToSearchResults(combinedResults)) + } } else { _, torrentResults, _ := convertToSpecificResults(results) combinedResults = torrentResults } case <-time.After(2 * time.Second): log.Println("Cache check timeout") - combinedResults = fetchAndCacheFileResults(query, safe, lang, page) + combinedResults = fetchFileResults(query, safe, lang, page) + if len(combinedResults) > 0 { + resultsCache.Set(cacheKey, convertToSearchResults(combinedResults)) + } } return combinedResults } -func fetchAndCacheFileResults(query, safe, lang string, page int) []TorrentResult { +func fetchFileResults(query, safe, lang string, page int) []TorrentResult { sites := []TorrentSite{torrentGalaxy, nyaa, thePirateBay, rutor} results := []TorrentResult{} @@ -139,17 +148,81 @@ func fetchAndCacheFileResults(query, safe, lang string, page int) []TorrentResul } } - // Cache the valid results - cacheKey := CacheKey{Query: query, Page: page, Safe: safe == "true", Lang: lang, Type: "file"} - resultsCache.Set(cacheKey, convertToSearchResults(results)) + if len(results) == 0 { + log.Printf("No file results found for query: %s, trying other nodes", query) + results = tryOtherNodesForFileSearch(query, safe, lang, page) + } return results } -func fetchFileResults(query, safe, lang string, page int) []TorrentResult { - cacheKey := CacheKey{Query: query, Page: page, Safe: safe == "true", Lang: lang, Type: "file"} - results := getFileResultsFromCacheOrFetch(cacheKey, query, safe, lang, page) - return results +func tryOtherNodesForFileSearch(query, safe, lang string, page int) []TorrentResult { + for _, nodeAddr := range peers { + results, err := sendFileSearchRequestToNode(nodeAddr, query, safe, lang, page) + if err != nil { + log.Printf("Error contacting node %s: %v", nodeAddr, err) + continue + } + if len(results) > 0 { + return results + } + } + return nil +} + +func sendFileSearchRequestToNode(nodeAddr, query, safe, lang string, page int) ([]TorrentResult, error) { + searchParams := struct { + Query string `json:"query"` + Safe string `json:"safe"` + Lang string `json:"lang"` + Page int `json:"page"` + ResponseAddr string `json:"responseAddr"` + }{ + Query: query, + Safe: safe, + Lang: lang, + Page: page, + ResponseAddr: fmt.Sprintf("http://localhost:%d/node", config.Port), + } + + msgBytes, err := json.Marshal(searchParams) + if err != nil { + return nil, fmt.Errorf("failed to marshal search parameters: %v", err) + } + + msg := Message{ + ID: hostID, + Type: "search-file", + Content: string(msgBytes), + } + + err = sendMessage(nodeAddr, msg) + if err != nil { + return nil, fmt.Errorf("failed to send search request to node %s: %v", nodeAddr, err) + } + + // Wait for results + select { + case res := <-fileResultsChan: + return res, nil + case <-time.After(20 * time.Second): + return nil, fmt.Errorf("timeout waiting for results from node %s", nodeAddr) + } +} + +func handleFileResultsMessage(msg Message) { + var results []TorrentResult + err := json.Unmarshal([]byte(msg.Content), &results) + if err != nil { + log.Printf("Error unmarshalling file results: %v", err) + return + } + + log.Printf("Received file results: %+v", results) + // Send results to fileResultsChan + go func() { + fileResultsChan <- results + }() } func removeMagnetLink(magnet string) string { diff --git a/node-handle-search.go b/node-handle-search.go index d83f545..3992de0 100644 --- a/node-handle-search.go +++ b/node-handle-search.go @@ -131,10 +131,11 @@ func handleSearchVideoMessage(msg Message) { func handleSearchFileMessage(msg Message) { var searchParams struct { - Query string `json:"query"` - Safe string `json:"safe"` - Lang string `json:"lang"` - Page int `json:"page"` + Query string `json:"query"` + Safe string `json:"safe"` + Lang string `json:"lang"` + Page int `json:"page"` + ResponseAddr string `json:"responseAddr"` } err := json.Unmarshal([]byte(msg.Content), &searchParams) if err != nil { @@ -142,6 +143,8 @@ func handleSearchFileMessage(msg Message) { return } + log.Printf("Received search-file request. ResponseAddr: %s", searchParams.ResponseAddr) + results := fetchFileResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page) resultsJSON, err := json.Marshal(results) if err != nil { @@ -155,9 +158,16 @@ func handleSearchFileMessage(msg Message) { Content: string(resultsJSON), } - err = sendMessage(msg.ID, responseMsg) + log.Printf("Sending file search results to %s", searchParams.ResponseAddr) + + if searchParams.ResponseAddr == "" { + log.Printf("Error: Response address is empty") + return + } + + err = sendMessage(searchParams.ResponseAddr, responseMsg) if err != nil { - log.Printf("Error sending file search results to %s: %v", msg.ID, err) + log.Printf("Error sending file search results to %s: %v", searchParams.ResponseAddr, err) } }