diff --git a/init.go b/init.go index b3df270..91ffd05 100644 --- a/init.go +++ b/init.go @@ -11,15 +11,16 @@ import ( "strconv" "strings" "sync" + "time" "github.com/fsnotify/fsnotify" ) -// Configuration structure type Config struct { Port int AuthCode string Peers []string + PeerID string OpenSearch OpenSearchConfig } @@ -27,7 +28,6 @@ type OpenSearchConfig struct { Domain string } -// Default configuration values var defaultConfig = Config{ Port: 5000, OpenSearch: OpenSearchConfig{ @@ -57,11 +57,21 @@ func main() { saveConfig(config) } + // Initialize P2P + var nodeErr error + hostID, nodeErr = initP2P() + if nodeErr != nil { + log.Fatalf("Failed to initialize P2P: %v", nodeErr) + } + config.PeerID = hostID.String() + if len(config.Peers) > 0 { - go startNodeClient(config.Peers) + time.Sleep(2 * time.Second) // Give some time for connections to establish startElection() } + go startNodeClient() + runServer() } @@ -103,7 +113,7 @@ func createConfig() error { fmt.Print("Do you want to connect to other nodes? (yes/no): ") connectNodes, _ := reader.ReadString('\n') if strings.TrimSpace(connectNodes) == "yes" { - fmt.Println("Enter peer addresses (comma separated, e.g., http://localhost:5000,http://localhost:5001): ") + fmt.Println("Enter peer addresses (comma separated, e.g., /ip4/127.0.0.1/tcp/5000,/ip4/127.0.0.1/tcp/5001): ") peersStr, _ := reader.ReadString('\n') if peersStr != "\n" { config.Peers = strings.Split(strings.TrimSpace(peersStr), ",") diff --git a/main.go b/main.go index 0d9ac33..f79eef2 100644 --- a/main.go +++ b/main.go @@ -122,20 +122,19 @@ func parsePageParameter(pageStr string) int { } func runServer() { - // http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static")))) - // http.HandleFunc("/", handleSearch) - // http.HandleFunc("/search", handleSearch) - // http.HandleFunc("/img_proxy", handleImageProxy) - // http.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) { - // http.ServeFile(w, r, "templates/settings.html") - // }) - // http.HandleFunc("/opensearch.xml", func(w http.ResponseWriter, r *http.Request) { - // w.Header().Set("Content-Type", "application/opensearchdescription+xml") - // http.ServeFile(w, r, "static/opensearch.xml") - // }) - // initializeTorrentSites() - - http.HandleFunc("/node", handleNodeRequest) // Handle node requests + http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static")))) + http.HandleFunc("/", handleSearch) + http.HandleFunc("/search", handleSearch) + http.HandleFunc("/img_proxy", handleImageProxy) + http.HandleFunc("/node", handleNodeRequest) + http.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, "templates/settings.html") + }) + http.HandleFunc("/opensearch.xml", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/opensearchdescription+xml") + http.ServeFile(w, r, "static/opensearch.xml") + }) + initializeTorrentSites() config := loadConfig() generateOpenSearchXML(config) @@ -143,9 +142,6 @@ func runServer() { fmt.Printf("Server is listening on http://localhost:%d\n", config.Port) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", config.Port), nil)) - // Start node communication client - go startNodeClient(peers) - // Start automatic update checker go checkForUpdates() } diff --git a/node-master.go b/node-master.go index 4f5155b..0cfa7ae 100644 --- a/node-master.go +++ b/node-master.go @@ -24,7 +24,12 @@ func sendHeartbeats() { return } for _, node := range peers { - err := sendMessage(node, authCode, "heartbeat", authCode) + msg := Message{ + ID: hostID.Pretty(), + Type: "heartbeat", + Content: authCode, + } + err := sendMessage(node, msg) if err != nil { log.Printf("Error sending heartbeat to %s: %v", node, err) } @@ -55,7 +60,12 @@ func startElection() { defer masterNodeMux.Unlock() for _, node := range peers { - err := sendMessage(node, authCode, "election", authCode) + msg := Message{ + ID: hostID.Pretty(), + Type: "election", + Content: authCode, + } + err := sendMessage(node, msg) if err != nil { log.Printf("Error sending election message to %s: %v", node, err) } diff --git a/node-update.go b/node-update.go index 0ef63f6..555b16a 100644 --- a/node-update.go +++ b/node-update.go @@ -9,14 +9,19 @@ import ( // Function to sync updates across all nodes func nodeUpdateSync() { fmt.Println("Syncing updates across all nodes...") - for _, peer := range peers { - fmt.Printf("Notifying node %s about update...\n", peer) - err := sendMessage(peer, authCode, "update", "Start update process") + for _, peerAddr := range peers { + fmt.Printf("Notifying node %s about update...\n", peerAddr) + msg := Message{ + ID: hostID.Pretty(), + Type: "update", + Content: "Start update process", + } + err := sendMessage(peerAddr, msg) if err != nil { - log.Printf("Failed to notify node %s: %v\n", peer, err) + log.Printf("Failed to notify node %s: %v\n", peerAddr, err) continue } - fmt.Printf("Node %s notified. Waiting for it to update...\n", peer) + fmt.Printf("Node %s notified. Waiting for it to update...\n", peerAddr) time.Sleep(30 * time.Second) // Adjust sleep time as needed to allow for updates } fmt.Println("All nodes have been updated.") diff --git a/node.go b/node.go index 97862c7..a397c78 100644 --- a/node.go +++ b/node.go @@ -2,8 +2,7 @@ package main import ( "bytes" - "crypto/sha256" - "encoding/hex" + "crypto/rand" "encoding/json" "fmt" "io/ioutil" @@ -11,6 +10,10 @@ import ( "net/http" "sync" "time" + + libp2p "github.com/libp2p/go-libp2p" + crypto "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" ) var ( @@ -18,6 +21,7 @@ var ( peers []string authMutex sync.Mutex authenticated = make(map[string]bool) + hostID peer.ID ) type Message struct { @@ -35,74 +39,99 @@ type CrawlerConfig struct { func loadNodeConfig() { config := loadConfig() - authCode = config.AuthCode // nuh uh + authCode = config.AuthCode peers = config.Peers } +func initP2P() (peer.ID, error) { + priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, rand.Reader) + if err != nil { + return "", fmt.Errorf("failed to generate key pair: %v", err) + } + + h, err := libp2p.New(libp2p.Identity(priv)) + if err != nil { + return "", fmt.Errorf("failed to create libp2p host: %v", err) + } + + return h.ID(), nil +} + +func sendMessage(serverAddr string, msg Message) error { + msgBytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %v", err) + } + + req, err := http.NewRequest("POST", serverAddr, bytes.NewBuffer(msgBytes)) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", authCode) + + client := &http.Client{ + Timeout: time.Second * 10, + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("server error: %s", body) + } + + return nil +} + func handleNodeRequest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) return } - body, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, "Error reading request body", http.StatusInternalServerError) + auth := r.Header.Get("Authorization") + if auth != authCode { + http.Error(w, "Unauthorized", http.StatusUnauthorized) return } - defer r.Body.Close() var msg Message - if err := json.Unmarshal(body, &msg); err != nil { - http.Error(w, "Error parsing JSON", http.StatusBadRequest) - return - } - - if !isAuthenticated(msg.ID) { - http.Error(w, "Authentication required", http.StatusUnauthorized) - return - } - - interpretMessage(msg) - fmt.Fprintln(w, "Message received") -} - -func handleAuth(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) - return - } - - body, err := ioutil.ReadAll(r.Body) + err := json.NewDecoder(r.Body).Decode(&msg) if err != nil { - http.Error(w, "Error reading request body", http.StatusInternalServerError) + http.Error(w, "Error parsing JSON", http.StatusBadRequest) return } defer r.Body.Close() - var authRequest CrawlerConfig - if err := json.Unmarshal(body, &authRequest); err != nil { - http.Error(w, "Error parsing JSON", http.StatusBadRequest) - return - } + log.Printf("Received message: %+v\n", msg) + w.Write([]byte("Message received")) - expectedCode := GenerateRegistrationCode(authRequest.Host, authRequest.Port, authCode) - if authRequest.AuthCode != expectedCode { - http.Error(w, "Invalid auth code", http.StatusUnauthorized) - return - } - - authMutex.Lock() - authenticated[authRequest.ID] = true - authMutex.Unlock() - - fmt.Fprintln(w, "Authenticated successfully") + interpretMessage(msg) } -func isAuthenticated(id string) bool { - authMutex.Lock() - defer authMutex.Unlock() - return authenticated[id] +func startNodeClient() { + for { + for _, peerAddr := range peers { + msg := Message{ + ID: hostID.Pretty(), + Type: "test", + Content: "This is a test message from the client node", + } + + err := sendMessage(peerAddr, msg) + if err != nil { + log.Printf("Error sending message to %s: %v", peerAddr, err) + } else { + log.Println("Message sent successfully to", peerAddr) + } + } + time.Sleep(10 * time.Second) + } } func interpretMessage(msg Message) { @@ -120,71 +149,3 @@ func interpretMessage(msg Message) { fmt.Println("Received unknown message type:", msg.Type) } } - -func sendMessage(address, id, msgType, content string) error { - msg := Message{ - ID: id, - Type: msgType, - Content: content, - } - msgBytes, err := json.Marshal(msg) - if err != nil { - return err - } - - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/node", address), bytes.NewBuffer(msgBytes)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - return fmt.Errorf("failed to send message: %s", body) - } - - return nil -} - -func startNodeClient(addresses []string) { - for _, address := range addresses { - go func(addr string) { - for { - err := sendMessage(addr, authCode, "test", "This is a test message") - if err != nil { - fmt.Println("Error sending test message to", addr, ":", err) - continue - } - time.Sleep(10 * time.Second) - } - }(address) - } -} - -func GenerateRegistrationCode(host string, port int, authCode string) string { - data := fmt.Sprintf("%s:%d:%s", host, port, authCode) - hash := sha256.Sum256([]byte(data)) - return hex.EncodeToString(hash[:]) -} - -func ParseRegistrationCode(code string, host string, port int, authCode string) (string, int, string, error) { - data := fmt.Sprintf("%s:%d:%s", host, port, authCode) - hash := sha256.Sum256([]byte(data)) - expectedCode := hex.EncodeToString(hash[:]) - - log.Printf("Parsing registration code: %s", code) - log.Printf("Expected registration code: %s", expectedCode) - - if expectedCode != code { - return "", 0, "", fmt.Errorf("invalid registration code") - } - - return host, port, authCode, nil -}