Search/node.go

340 lines
7.7 KiB
Go
Raw Normal View History

//go:build experimental
// +build experimental
2024-06-29 21:27:48 +02:00
package main
import (
2024-06-30 23:20:52 +02:00
"bytes"
"encoding/binary"
2024-06-30 23:20:52 +02:00
"encoding/json"
"errors"
2024-06-29 21:27:48 +02:00
"fmt"
"io"
"net"
2024-06-29 21:27:48 +02:00
"net/http"
"os"
2024-06-29 21:27:48 +02:00
"time"
)
2024-06-30 23:20:52 +02:00
var (
sockets []string
hostID string
socketDir string
2024-06-29 21:27:48 +02:00
)
2024-06-30 23:20:52 +02:00
type Message struct {
ID uint32
Type uint8
Content []byte
Target string
2024-06-30 23:20:52 +02:00
}
const (
MsgTypeNone uint8 = 0
MsgTypeTest uint8 = 1
// Request types (1099)
MsgTypeSearchTextRequest uint8 = 10
MsgTypeSearchImageRawRequest uint8 = 11
MsgTypeSearchImageThumbRequest uint8 = 12
MsgTypeSearchImageFullRequest uint8 = 13
MsgTypeSearchImageAllRequest uint8 = 14
MsgTypeSearchVideoRequest uint8 = 15
MsgTypeSearchFileRequest uint8 = 16
MsgTypeSearchForumRequest uint8 = 17
MsgTypeSearchMusicRequest uint8 = 18
// Response types (110199)
MsgTypeSearchTextResponse uint8 = 110
MsgTypeSearchImageResponse uint8 = 111
MsgTypeSearchVideoResponse uint8 = 112
MsgTypeSearchFileResponse uint8 = 113
MsgTypeSearchForumResponse uint8 = 114
MsgTypeSearchMusicResponse uint8 = 115
)
2024-06-30 23:20:52 +02:00
func loadNodeConfig() {
sockets = config.Nodes
socketDir = "/tmp/" // Directory where sockets are stored, for now fixed tmp dir, can be changed later
2024-06-30 23:20:52 +02:00
}
2024-06-29 21:27:48 +02:00
var messageIDCounter uint32 = 0
func generateMessageID() uint32 {
if messageIDCounter == ^uint32(0) { // 0xFFFFFFFF
messageIDCounter = 1
} else {
messageIDCounter++
2024-06-29 21:27:48 +02:00
}
return messageIDCounter
2024-08-08 13:35:50 +02:00
}
func encodeSearchTextParams(query, safe, lang string, page int) ([]byte, error) {
buf := new(bytes.Buffer)
if err := writeString(buf, query); err != nil {
return nil, err
}
if err := writeString(buf, safe); err != nil {
return nil, err
}
if err := writeString(buf, lang); err != nil {
return nil, err
}
if err := binary.Write(buf, binary.BigEndian, uint16(page)); err != nil {
return nil, err
2024-08-08 21:59:10 +02:00
}
return buf.Bytes(), nil
}
func sendMessage(msg Message) error {
socketPath := socketDir + msg.Target + ".sock"
conn, err := net.Dial("unix", socketPath)
2024-08-08 13:35:50 +02:00
if err != nil {
return fmt.Errorf("failed to connect to socket %s: %v", socketPath, err)
2024-06-30 23:20:52 +02:00
}
defer conn.Close()
2024-06-29 21:27:48 +02:00
msgBytes, err := serializeMessage(msg)
2024-08-08 13:35:50 +02:00
if err != nil {
return fmt.Errorf("serialization error: %v", err)
2024-06-29 21:27:48 +02:00
}
_, err = conn.Write(msgBytes)
return err
}
func startUnixSocketServer(socketName string) {
socketPath := socketDir + socketName + ".sock"
if _, err := os.Stat(socketPath); err == nil {
os.Remove(socketPath)
2024-08-08 13:35:50 +02:00
}
listener, err := net.Listen("unix", socketPath)
2024-08-08 13:35:50 +02:00
if err != nil {
panic(fmt.Sprintf("Failed to listen on %s: %v", socketPath, err))
2024-08-08 13:35:50 +02:00
}
defer listener.Close()
os.Chmod(socketPath, 0666)
printInfo("Listening on UNIX socket: %s", socketPath)
2024-08-08 13:35:50 +02:00
for {
conn, err := listener.Accept()
if err != nil {
printWarn("Accept error: %v", err)
continue
}
2024-08-08 13:35:50 +02:00
go func(c net.Conn) {
defer c.Close()
buf, err := io.ReadAll(c)
if err != nil {
printWarn("Read error: %v", err)
return
}
msg, err := deserializeMessage(buf)
if err != nil {
printWarn("Deserialization error: %v", err)
return
}
printDebug("Received binary message: %+v", msg)
interpretMessage(msg)
}(conn)
}
2024-06-30 23:20:52 +02:00
}
2024-08-08 13:35:50 +02:00
func handleNodeRequest(w http.ResponseWriter, r *http.Request) {
2024-07-05 03:08:35 +02:00
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
2024-08-08 13:35:50 +02:00
var msg Message
err := json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
2024-07-05 03:08:35 +02:00
http.Error(w, "Error parsing JSON", http.StatusBadRequest)
return
}
2024-08-08 13:35:50 +02:00
defer r.Body.Close()
2024-07-05 03:08:35 +02:00
printDebug("Received HTTP message: %+v", msg)
2024-08-08 13:35:50 +02:00
interpretMessage(msg)
2024-07-05 03:08:35 +02:00
}
func startNodeClientUnix() {
var idCounter uint32 = 0
2024-08-08 13:35:50 +02:00
for {
msg := Message{
ID: idCounter,
Type: MsgTypeTest,
Content: []byte("This is a test message via UNIX socket"),
Target: "node2", ///!!!
}
idCounter++
2024-08-08 13:35:50 +02:00
if err := sendMessage(msg); err != nil {
printWarn("Send error: %v", err)
2024-08-08 13:35:50 +02:00
}
time.Sleep(10 * time.Second)
}
2024-07-05 03:08:35 +02:00
}
2024-06-30 23:20:52 +02:00
func interpretMessage(msg Message) {
printDebug("Received message: %s", msg.Content)
2024-06-30 23:20:52 +02:00
switch msg.Type {
case MsgTypeTest:
handleTestMessage(msg)
case MsgTypeSearchTextRequest:
2024-08-08 21:59:10 +02:00
handleSearchTextMessage(msg)
case MsgTypeSearchImageRawRequest, MsgTypeSearchImageThumbRequest, MsgTypeSearchImageFullRequest, MsgTypeSearchImageAllRequest:
2024-08-08 21:59:10 +02:00
handleSearchImageMessage(msg)
case MsgTypeSearchVideoRequest:
2024-08-08 21:59:10 +02:00
handleSearchVideoMessage(msg)
case MsgTypeSearchFileRequest:
2024-08-08 21:59:10 +02:00
handleSearchFileMessage(msg)
case MsgTypeSearchForumRequest:
2024-08-08 21:59:10 +02:00
handleSearchForumMessage(msg)
case MsgTypeSearchMusicRequest:
handleSearchMusicMessage(msg)
case MsgTypeSearchTextResponse:
handleTextResultsMessage(msg)
case MsgTypeSearchImageResponse:
handleImageResultsMessage(msg)
case MsgTypeSearchVideoResponse:
handleVideoResultsMessage(msg)
case MsgTypeSearchFileResponse:
handleFileResultsMessage(msg)
case MsgTypeSearchForumResponse:
handleForumResultsMessage(msg)
case MsgTypeSearchMusicResponse:
handleMusicResultsMessage(msg)
2024-06-30 23:20:52 +02:00
default:
printWarn("Unknown message type: %d", msg.Type)
}
}
// Serialize Message to binary
func serializeMessage(msg Message) ([]byte, error) {
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.BigEndian, msg.ID); err != nil {
return nil, err
}
if err := binary.Write(buf, binary.BigEndian, msg.Type); err != nil {
return nil, err
}
// Content
contentBytes := []byte(msg.Content)
if len(contentBytes) > 65535 {
return nil, errors.New("content too long")
}
if err := binary.Write(buf, binary.BigEndian, uint16(len(contentBytes))); err != nil {
return nil, err
}
buf.Write(contentBytes)
// Target
targetBytes := []byte(msg.Target)
if len(targetBytes) > 255 {
return nil, errors.New("target name too long")
}
if err := buf.WriteByte(uint8(len(targetBytes))); err != nil {
return nil, err
}
buf.Write(targetBytes)
return buf.Bytes(), nil
}
// Deserialize binary to Message
func deserializeMessage(data []byte) (Message, error) {
buf := bytes.NewReader(data)
var msg Message
if err := binary.Read(buf, binary.BigEndian, &msg.ID); err != nil {
return msg, err
}
if err := binary.Read(buf, binary.BigEndian, &msg.Type); err != nil {
return msg, err
}
var contentLen uint16
if err := binary.Read(buf, binary.BigEndian, &contentLen); err != nil {
return msg, err
}
content := make([]byte, contentLen)
if _, err := io.ReadFull(buf, content); err != nil {
return msg, err
}
msg.Content = content
var targetLen uint8
if err := binary.Read(buf, binary.BigEndian, &targetLen); err != nil {
return msg, err
}
target := make([]byte, targetLen)
if _, err := io.ReadFull(buf, target); err != nil {
return msg, err
}
msg.Target = string(target)
return msg, nil
}
func writeString(buf *bytes.Buffer, s string) error {
if err := binary.Write(buf, binary.BigEndian, uint16(len(s))); err != nil {
return err
}
_, err := buf.Write([]byte(s))
return err
}
func readString(buf *bytes.Reader) (string, error) {
var length uint16
if err := binary.Read(buf, binary.BigEndian, &length); err != nil {
return "", err
}
strBytes := make([]byte, length)
if _, err := io.ReadFull(buf, strBytes); err != nil {
return "", err
}
return string(strBytes), nil
}
type testPayload struct {
Message string `json:"message"`
ResponseAddr string `json:"ResponseAddr"`
}
func handleTestMessage(msg Message) {
var payload testPayload
if err := json.Unmarshal([]byte(msg.Content), &payload); err != nil {
printWarn("Failed to parse test payload: %v", err)
return
}
printDebug("Received message: %s", payload.Message)
printInfo("Received TEST message: %s", payload.Message)
reply := Message{
ID: msg.ID,
Type: MsgTypeTest,
Content: []byte("hello test"),
Target: payload.ResponseAddr,
}
if err := sendMessage(reply); err != nil {
printWarn("Failed to send test response: %v", err)
2024-06-30 23:20:52 +02:00
}
}