Search/node.go
partisan 49cb7bb94a
Some checks failed
Run Integration Tests / test (push) Failing after 1m15s
Changed self-crawling as experimental, cleanup unused features
2025-06-08 22:12:15 +02:00

339 lines
7.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//go:build experimental
// +build experimental
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"time"
)
var (
sockets []string
hostID string
socketDir string
)
type Message struct {
ID uint32
Type uint8
Content []byte
Target string
}
const (
MsgTypeNone uint8 = 0
MsgTypeTest uint8 = 1
// Request types (1099)
MsgTypeSearchTextRequest uint8 = 10
MsgTypeSearchImageRawRequest uint8 = 11
MsgTypeSearchImageThumbRequest uint8 = 12
MsgTypeSearchImageFullRequest uint8 = 13
MsgTypeSearchImageAllRequest uint8 = 14
MsgTypeSearchVideoRequest uint8 = 15
MsgTypeSearchFileRequest uint8 = 16
MsgTypeSearchForumRequest uint8 = 17
MsgTypeSearchMusicRequest uint8 = 18
// Response types (110199)
MsgTypeSearchTextResponse uint8 = 110
MsgTypeSearchImageResponse uint8 = 111
MsgTypeSearchVideoResponse uint8 = 112
MsgTypeSearchFileResponse uint8 = 113
MsgTypeSearchForumResponse uint8 = 114
MsgTypeSearchMusicResponse uint8 = 115
)
func loadNodeConfig() {
sockets = config.Nodes
socketDir = "/tmp/" // Directory where sockets are stored, for now fixed tmp dir, can be changed later
}
var messageIDCounter uint32 = 0
func generateMessageID() uint32 {
if messageIDCounter == ^uint32(0) { // 0xFFFFFFFF
messageIDCounter = 1
} else {
messageIDCounter++
}
return messageIDCounter
}
func encodeSearchTextParams(query, safe, lang string, page int) ([]byte, error) {
buf := new(bytes.Buffer)
if err := writeString(buf, query); err != nil {
return nil, err
}
if err := writeString(buf, safe); err != nil {
return nil, err
}
if err := writeString(buf, lang); err != nil {
return nil, err
}
if err := binary.Write(buf, binary.BigEndian, uint16(page)); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func sendMessage(msg Message) error {
socketPath := socketDir + msg.Target + ".sock"
conn, err := net.Dial("unix", socketPath)
if err != nil {
return fmt.Errorf("failed to connect to socket %s: %v", socketPath, err)
}
defer conn.Close()
msgBytes, err := serializeMessage(msg)
if err != nil {
return fmt.Errorf("serialization error: %v", err)
}
_, err = conn.Write(msgBytes)
return err
}
func startUnixSocketServer(socketName string) {
socketPath := socketDir + socketName + ".sock"
if _, err := os.Stat(socketPath); err == nil {
os.Remove(socketPath)
}
listener, err := net.Listen("unix", socketPath)
if err != nil {
panic(fmt.Sprintf("Failed to listen on %s: %v", socketPath, err))
}
defer listener.Close()
os.Chmod(socketPath, 0666)
printInfo("Listening on UNIX socket: %s", socketPath)
for {
conn, err := listener.Accept()
if err != nil {
printWarn("Accept error: %v", err)
continue
}
go func(c net.Conn) {
defer c.Close()
buf, err := io.ReadAll(c)
if err != nil {
printWarn("Read error: %v", err)
return
}
msg, err := deserializeMessage(buf)
if err != nil {
printWarn("Deserialization error: %v", err)
return
}
printDebug("Received binary message: %+v", msg)
interpretMessage(msg)
}(conn)
}
}
func handleNodeRequest(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
var msg Message
err := json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
http.Error(w, "Error parsing JSON", http.StatusBadRequest)
return
}
defer r.Body.Close()
printDebug("Received HTTP message: %+v", msg)
interpretMessage(msg)
}
func startNodeClientUnix() {
var idCounter uint32 = 0
for {
msg := Message{
ID: idCounter,
Type: MsgTypeTest,
Content: []byte("This is a test message via UNIX socket"),
Target: "node2", ///!!!
}
idCounter++
if err := sendMessage(msg); err != nil {
printWarn("Send error: %v", err)
}
time.Sleep(10 * time.Second)
}
}
func interpretMessage(msg Message) {
printDebug("Received message: %s", msg.Content)
switch msg.Type {
case MsgTypeTest:
handleTestMessage(msg)
case MsgTypeSearchTextRequest:
handleSearchTextMessage(msg)
case MsgTypeSearchImageRawRequest, MsgTypeSearchImageThumbRequest, MsgTypeSearchImageFullRequest, MsgTypeSearchImageAllRequest:
handleSearchImageMessage(msg)
case MsgTypeSearchVideoRequest:
handleSearchVideoMessage(msg)
case MsgTypeSearchFileRequest:
handleSearchFileMessage(msg)
case MsgTypeSearchForumRequest:
handleSearchForumMessage(msg)
case MsgTypeSearchMusicRequest:
handleSearchMusicMessage(msg)
case MsgTypeSearchTextResponse:
handleTextResultsMessage(msg)
case MsgTypeSearchImageResponse:
handleImageResultsMessage(msg)
case MsgTypeSearchVideoResponse:
handleVideoResultsMessage(msg)
case MsgTypeSearchFileResponse:
handleFileResultsMessage(msg)
case MsgTypeSearchForumResponse:
handleForumResultsMessage(msg)
case MsgTypeSearchMusicResponse:
handleMusicResultsMessage(msg)
default:
printWarn("Unknown message type: %d", msg.Type)
}
}
// Serialize Message to binary
func serializeMessage(msg Message) ([]byte, error) {
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.BigEndian, msg.ID); err != nil {
return nil, err
}
if err := binary.Write(buf, binary.BigEndian, msg.Type); err != nil {
return nil, err
}
// Content
contentBytes := []byte(msg.Content)
if len(contentBytes) > 65535 {
return nil, errors.New("content too long")
}
if err := binary.Write(buf, binary.BigEndian, uint16(len(contentBytes))); err != nil {
return nil, err
}
buf.Write(contentBytes)
// Target
targetBytes := []byte(msg.Target)
if len(targetBytes) > 255 {
return nil, errors.New("target name too long")
}
if err := buf.WriteByte(uint8(len(targetBytes))); err != nil {
return nil, err
}
buf.Write(targetBytes)
return buf.Bytes(), nil
}
// Deserialize binary to Message
func deserializeMessage(data []byte) (Message, error) {
buf := bytes.NewReader(data)
var msg Message
if err := binary.Read(buf, binary.BigEndian, &msg.ID); err != nil {
return msg, err
}
if err := binary.Read(buf, binary.BigEndian, &msg.Type); err != nil {
return msg, err
}
var contentLen uint16
if err := binary.Read(buf, binary.BigEndian, &contentLen); err != nil {
return msg, err
}
content := make([]byte, contentLen)
if _, err := io.ReadFull(buf, content); err != nil {
return msg, err
}
msg.Content = content
var targetLen uint8
if err := binary.Read(buf, binary.BigEndian, &targetLen); err != nil {
return msg, err
}
target := make([]byte, targetLen)
if _, err := io.ReadFull(buf, target); err != nil {
return msg, err
}
msg.Target = string(target)
return msg, nil
}
func writeString(buf *bytes.Buffer, s string) error {
if err := binary.Write(buf, binary.BigEndian, uint16(len(s))); err != nil {
return err
}
_, err := buf.Write([]byte(s))
return err
}
func readString(buf *bytes.Reader) (string, error) {
var length uint16
if err := binary.Read(buf, binary.BigEndian, &length); err != nil {
return "", err
}
strBytes := make([]byte, length)
if _, err := io.ReadFull(buf, strBytes); err != nil {
return "", err
}
return string(strBytes), nil
}
type testPayload struct {
Message string `json:"message"`
ResponseAddr string `json:"ResponseAddr"`
}
func handleTestMessage(msg Message) {
var payload testPayload
if err := json.Unmarshal([]byte(msg.Content), &payload); err != nil {
printWarn("Failed to parse test payload: %v", err)
return
}
printDebug("Received message: %s", payload.Message)
printInfo("Received TEST message: %s", payload.Message)
reply := Message{
ID: msg.ID,
Type: MsgTypeTest,
Content: []byte("hello test"),
Target: payload.ResponseAddr,
}
if err := sendMessage(reply); err != nil {
printWarn("Failed to send test response: %v", err)
}
}