Changed self-crawling as experimental, cleanup unused features
Some checks failed
Run Integration Tests / test (push) Failing after 1m15s

This commit is contained in:
partisan 2025-06-08 22:12:15 +02:00
parent ca87df5df1
commit 49cb7bb94a
27 changed files with 1731 additions and 832 deletions

345
node.go
View file

@ -1,78 +1,152 @@
//go:build experimental
// +build experimental
package main
import (
"bytes"
"crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"time"
)
var (
authCode string
peers []string
hostID string
sockets []string
hostID string
socketDir string
)
type Message struct {
ID string `json:"id"`
Type string `json:"type"`
Content string `json:"content"`
VisitedNodes []string `json:"visitedNodes"`
ID uint32
Type uint8
Content []byte
Target string
}
const (
MsgTypeNone uint8 = 0
MsgTypeTest uint8 = 1
// Request types (1099)
MsgTypeSearchTextRequest uint8 = 10
MsgTypeSearchImageRawRequest uint8 = 11
MsgTypeSearchImageThumbRequest uint8 = 12
MsgTypeSearchImageFullRequest uint8 = 13
MsgTypeSearchImageAllRequest uint8 = 14
MsgTypeSearchVideoRequest uint8 = 15
MsgTypeSearchFileRequest uint8 = 16
MsgTypeSearchForumRequest uint8 = 17
MsgTypeSearchMusicRequest uint8 = 18
// Response types (110199)
MsgTypeSearchTextResponse uint8 = 110
MsgTypeSearchImageResponse uint8 = 111
MsgTypeSearchVideoResponse uint8 = 112
MsgTypeSearchFileResponse uint8 = 113
MsgTypeSearchForumResponse uint8 = 114
MsgTypeSearchMusicResponse uint8 = 115
)
func loadNodeConfig() {
authCode = config.AuthCode
peers = config.Peers
sockets = config.Nodes
socketDir = "/tmp/" // Directory where sockets are stored, for now fixed tmp dir, can be changed later
}
func generateHostID() (string, error) {
bytes := make([]byte, 16)
_, err := rand.Read(bytes)
if err != nil {
return "", fmt.Errorf("failed to generate host ID: %v", err)
var messageIDCounter uint32 = 0
func generateMessageID() uint32 {
if messageIDCounter == ^uint32(0) { // 0xFFFFFFFF
messageIDCounter = 1
} else {
messageIDCounter++
}
return fmt.Sprintf("%x", bytes), nil
return messageIDCounter
}
func sendMessage(serverAddr string, msg Message) error {
if serverAddr == "" {
return fmt.Errorf("server address is empty")
func encodeSearchTextParams(query, safe, lang string, page int) ([]byte, error) {
buf := new(bytes.Buffer)
if err := writeString(buf, query); err != nil {
return nil, err
}
if err := writeString(buf, safe); err != nil {
return nil, err
}
if err := writeString(buf, lang); err != nil {
return nil, err
}
if err := binary.Write(buf, binary.BigEndian, uint16(page)); err != nil {
return nil, err
}
msgBytes, err := json.Marshal(msg)
return buf.Bytes(), nil
}
func sendMessage(msg Message) error {
socketPath := socketDir + msg.Target + ".sock"
conn, err := net.Dial("unix", socketPath)
if err != nil {
return fmt.Errorf("failed to marshal message: %v", err)
return fmt.Errorf("failed to connect to socket %s: %v", socketPath, err)
}
defer conn.Close()
req, err := http.NewRequest("POST", serverAddr, bytes.NewBuffer(msgBytes))
msgBytes, err := serializeMessage(msg)
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", authCode)
client := &http.Client{
Timeout: time.Second * 10,
return fmt.Errorf("serialization error: %v", err)
}
resp, err := client.Do(req)
_, err = conn.Write(msgBytes)
return err
}
func startUnixSocketServer(socketName string) {
socketPath := socketDir + socketName + ".sock"
if _, err := os.Stat(socketPath); err == nil {
os.Remove(socketPath)
}
listener, err := net.Listen("unix", socketPath)
if err != nil {
return fmt.Errorf("failed to send request: %v", err)
panic(fmt.Sprintf("Failed to listen on %s: %v", socketPath, err))
}
defer resp.Body.Close()
defer listener.Close()
os.Chmod(socketPath, 0666)
if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
printInfo("Listening on UNIX socket: %s", socketPath)
for {
conn, err := listener.Accept()
if err != nil {
return fmt.Errorf("failed to read response body: %v", err)
printWarn("Accept error: %v", err)
continue
}
return fmt.Errorf("server error: %s", body)
}
return nil
go func(c net.Conn) {
defer c.Close()
buf, err := io.ReadAll(c)
if err != nil {
printWarn("Read error: %v", err)
return
}
msg, err := deserializeMessage(buf)
if err != nil {
printWarn("Deserialization error: %v", err)
return
}
printDebug("Received binary message: %+v", msg)
interpretMessage(msg)
}(conn)
}
}
func handleNodeRequest(w http.ResponseWriter, r *http.Request) {
@ -81,12 +155,6 @@ func handleNodeRequest(w http.ResponseWriter, r *http.Request) {
return
}
auth := r.Header.Get("Authorization")
if auth != authCode {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
var msg Message
err := json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
@ -95,64 +163,177 @@ func handleNodeRequest(w http.ResponseWriter, r *http.Request) {
}
defer r.Body.Close()
printDebug("Received message: %+v\n", msg)
w.Write([]byte("Message received"))
printDebug("Received HTTP message: %+v", msg)
interpretMessage(msg)
}
func startNodeClient() {
for {
for _, peerAddr := range peers {
msg := Message{
ID: hostID,
Type: "test",
Content: "This is a test message from the client node",
}
func startNodeClientUnix() {
var idCounter uint32 = 0
err := sendMessage(peerAddr, msg)
if err != nil {
printWarn("Error sending message to %s: %v", peerAddr, err)
} else {
printInfo("Message sent successfully to: %s", peerAddr)
}
for {
msg := Message{
ID: idCounter,
Type: MsgTypeTest,
Content: []byte("This is a test message via UNIX socket"),
Target: "node2", ///!!!
}
idCounter++
if err := sendMessage(msg); err != nil {
printWarn("Send error: %v", err)
}
time.Sleep(10 * time.Second)
}
}
func interpretMessage(msg Message) {
printDebug("Received message: %s", msg.Content)
switch msg.Type {
case "test":
printDebug("Received test message: %v", msg.Content)
case "update":
printDebug("Received update message: %v", msg.Content)
go update()
case "heartbeat":
handleHeartbeat(msg.Content)
case "election":
handleElection(msg.Content)
case "search-text":
case MsgTypeTest:
handleTestMessage(msg)
case MsgTypeSearchTextRequest:
handleSearchTextMessage(msg)
case "search-image":
case MsgTypeSearchImageRawRequest, MsgTypeSearchImageThumbRequest, MsgTypeSearchImageFullRequest, MsgTypeSearchImageAllRequest:
handleSearchImageMessage(msg)
case "search-video":
case MsgTypeSearchVideoRequest:
handleSearchVideoMessage(msg)
case "search-file":
case MsgTypeSearchFileRequest:
handleSearchFileMessage(msg)
case "search-forum":
case MsgTypeSearchForumRequest:
handleSearchForumMessage(msg)
case "forum-results":
handleForumResultsMessage(msg)
case "text-results":
case MsgTypeSearchMusicRequest:
handleSearchMusicMessage(msg)
case MsgTypeSearchTextResponse:
handleTextResultsMessage(msg)
case "image-results":
case MsgTypeSearchImageResponse:
handleImageResultsMessage(msg)
case "video-results":
case MsgTypeSearchVideoResponse:
handleVideoResultsMessage(msg)
case "file-results":
case MsgTypeSearchFileResponse:
handleFileResultsMessage(msg)
case MsgTypeSearchForumResponse:
handleForumResultsMessage(msg)
case MsgTypeSearchMusicResponse:
handleMusicResultsMessage(msg)
default:
printWarn("Received unknown message type: %v", msg.Type)
printWarn("Unknown message type: %d", msg.Type)
}
}
// Serialize Message to binary
func serializeMessage(msg Message) ([]byte, error) {
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.BigEndian, msg.ID); err != nil {
return nil, err
}
if err := binary.Write(buf, binary.BigEndian, msg.Type); err != nil {
return nil, err
}
// Content
contentBytes := []byte(msg.Content)
if len(contentBytes) > 65535 {
return nil, errors.New("content too long")
}
if err := binary.Write(buf, binary.BigEndian, uint16(len(contentBytes))); err != nil {
return nil, err
}
buf.Write(contentBytes)
// Target
targetBytes := []byte(msg.Target)
if len(targetBytes) > 255 {
return nil, errors.New("target name too long")
}
if err := buf.WriteByte(uint8(len(targetBytes))); err != nil {
return nil, err
}
buf.Write(targetBytes)
return buf.Bytes(), nil
}
// Deserialize binary to Message
func deserializeMessage(data []byte) (Message, error) {
buf := bytes.NewReader(data)
var msg Message
if err := binary.Read(buf, binary.BigEndian, &msg.ID); err != nil {
return msg, err
}
if err := binary.Read(buf, binary.BigEndian, &msg.Type); err != nil {
return msg, err
}
var contentLen uint16
if err := binary.Read(buf, binary.BigEndian, &contentLen); err != nil {
return msg, err
}
content := make([]byte, contentLen)
if _, err := io.ReadFull(buf, content); err != nil {
return msg, err
}
msg.Content = content
var targetLen uint8
if err := binary.Read(buf, binary.BigEndian, &targetLen); err != nil {
return msg, err
}
target := make([]byte, targetLen)
if _, err := io.ReadFull(buf, target); err != nil {
return msg, err
}
msg.Target = string(target)
return msg, nil
}
func writeString(buf *bytes.Buffer, s string) error {
if err := binary.Write(buf, binary.BigEndian, uint16(len(s))); err != nil {
return err
}
_, err := buf.Write([]byte(s))
return err
}
func readString(buf *bytes.Reader) (string, error) {
var length uint16
if err := binary.Read(buf, binary.BigEndian, &length); err != nil {
return "", err
}
strBytes := make([]byte, length)
if _, err := io.ReadFull(buf, strBytes); err != nil {
return "", err
}
return string(strBytes), nil
}
type testPayload struct {
Message string `json:"message"`
ResponseAddr string `json:"ResponseAddr"`
}
func handleTestMessage(msg Message) {
var payload testPayload
if err := json.Unmarshal([]byte(msg.Content), &payload); err != nil {
printWarn("Failed to parse test payload: %v", err)
return
}
printDebug("Received message: %s", payload.Message)
printInfo("Received TEST message: %s", payload.Message)
reply := Message{
ID: msg.ID,
Type: MsgTypeTest,
Content: []byte("hello test"),
Target: payload.ResponseAddr,
}
if err := sendMessage(reply); err != nil {
printWarn("Failed to send test response: %v", err)
}
}