339 lines
7.7 KiB
Go
339 lines
7.7 KiB
Go
//go:build experimental
|
||
// +build experimental
|
||
|
||
package main
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/binary"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net"
|
||
"net/http"
|
||
"os"
|
||
"time"
|
||
)
|
||
|
||
// Node-level state shared by the UNIX-socket client and server code in this file.
var (
	// sockets receives the node list from config.Nodes in loadNodeConfig.
	sockets []string

	// hostID is neither assigned nor read in this part of the file.
	// NOTE(review): presumably this node's own identifier — confirm against its users.
	hostID string

	// socketDir is prepended verbatim to "<name>.sock" by sendMessage and
	// startUnixSocketServer, so it has to end with a path separator.
	socketDir string
)
|
||
|
||
// Message is the unit exchanged between nodes, either in the binary framing
// produced by serializeMessage or as JSON via handleNodeRequest.
type Message struct {
	// ID is written as 4 big-endian bytes by serializeMessage.
	ID uint32
	// Type is one of the MsgType* constants; interpretMessage dispatches on it.
	Type uint8
	// Content is the opaque payload; serializeMessage rejects more than 65535 bytes.
	Content []byte
	// Target names the destination socket (socketDir + Target + ".sock" in
	// sendMessage); serializeMessage rejects more than 255 bytes.
	Target string
}
|
||
|
||
// Control message types.
const (
	MsgTypeNone uint8 = 0
	MsgTypeTest uint8 = 1
)

// Request types (10–99).
const (
	MsgTypeSearchTextRequest       uint8 = 10
	MsgTypeSearchImageRawRequest   uint8 = 11
	MsgTypeSearchImageThumbRequest uint8 = 12
	MsgTypeSearchImageFullRequest  uint8 = 13
	MsgTypeSearchImageAllRequest   uint8 = 14
	MsgTypeSearchVideoRequest      uint8 = 15
	MsgTypeSearchFileRequest       uint8 = 16
	MsgTypeSearchForumRequest      uint8 = 17
	MsgTypeSearchMusicRequest      uint8 = 18
)

// Response types (110–199).
const (
	MsgTypeSearchTextResponse  uint8 = 110
	MsgTypeSearchImageResponse uint8 = 111
	MsgTypeSearchVideoResponse uint8 = 112
	MsgTypeSearchFileResponse  uint8 = 113
	MsgTypeSearchForumResponse uint8 = 114
	MsgTypeSearchMusicResponse uint8 = 115
)
|
||
|
||
func loadNodeConfig() {
|
||
sockets = config.Nodes
|
||
socketDir = "/tmp/" // Directory where sockets are stored, for now fixed tmp dir, can be changed later
|
||
}
|
||
|
||
// messageIDCounter holds the most recently issued message ID (0 = none yet).
var messageIDCounter uint32

// generateMessageID returns the next message ID. IDs count up from 1 and,
// after 0xFFFFFFFF has been issued, restart at 1, so 0 is never handed out.
//
// The counter is a plain package variable with no synchronisation.
func generateMessageID() uint32 {
	const lastID = ^uint32(0) // 0xFFFFFFFF

	next := uint32(1)
	if messageIDCounter != lastID {
		next = messageIDCounter + 1
	}
	messageIDCounter = next
	return next
}
|
||
|
||
func encodeSearchTextParams(query, safe, lang string, page int) ([]byte, error) {
|
||
buf := new(bytes.Buffer)
|
||
|
||
if err := writeString(buf, query); err != nil {
|
||
return nil, err
|
||
}
|
||
if err := writeString(buf, safe); err != nil {
|
||
return nil, err
|
||
}
|
||
if err := writeString(buf, lang); err != nil {
|
||
return nil, err
|
||
}
|
||
if err := binary.Write(buf, binary.BigEndian, uint16(page)); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return buf.Bytes(), nil
|
||
}
|
||
|
||
func sendMessage(msg Message) error {
|
||
socketPath := socketDir + msg.Target + ".sock"
|
||
|
||
conn, err := net.Dial("unix", socketPath)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to connect to socket %s: %v", socketPath, err)
|
||
}
|
||
defer conn.Close()
|
||
|
||
msgBytes, err := serializeMessage(msg)
|
||
if err != nil {
|
||
return fmt.Errorf("serialization error: %v", err)
|
||
}
|
||
|
||
_, err = conn.Write(msgBytes)
|
||
return err
|
||
}
|
||
|
||
func startUnixSocketServer(socketName string) {
|
||
socketPath := socketDir + socketName + ".sock"
|
||
|
||
if _, err := os.Stat(socketPath); err == nil {
|
||
os.Remove(socketPath)
|
||
}
|
||
|
||
listener, err := net.Listen("unix", socketPath)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("Failed to listen on %s: %v", socketPath, err))
|
||
}
|
||
defer listener.Close()
|
||
os.Chmod(socketPath, 0666)
|
||
|
||
printInfo("Listening on UNIX socket: %s", socketPath)
|
||
|
||
for {
|
||
conn, err := listener.Accept()
|
||
if err != nil {
|
||
printWarn("Accept error: %v", err)
|
||
continue
|
||
}
|
||
|
||
go func(c net.Conn) {
|
||
defer c.Close()
|
||
buf, err := io.ReadAll(c)
|
||
if err != nil {
|
||
printWarn("Read error: %v", err)
|
||
return
|
||
}
|
||
|
||
msg, err := deserializeMessage(buf)
|
||
if err != nil {
|
||
printWarn("Deserialization error: %v", err)
|
||
return
|
||
}
|
||
|
||
printDebug("Received binary message: %+v", msg)
|
||
interpretMessage(msg)
|
||
}(conn)
|
||
}
|
||
}
|
||
|
||
func handleNodeRequest(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
|
||
var msg Message
|
||
err := json.NewDecoder(r.Body).Decode(&msg)
|
||
if err != nil {
|
||
http.Error(w, "Error parsing JSON", http.StatusBadRequest)
|
||
return
|
||
}
|
||
defer r.Body.Close()
|
||
|
||
printDebug("Received HTTP message: %+v", msg)
|
||
interpretMessage(msg)
|
||
}
|
||
|
||
func startNodeClientUnix() {
|
||
var idCounter uint32 = 0
|
||
|
||
for {
|
||
msg := Message{
|
||
ID: idCounter,
|
||
Type: MsgTypeTest,
|
||
Content: []byte("This is a test message via UNIX socket"),
|
||
Target: "node2", ///!!!
|
||
}
|
||
idCounter++
|
||
|
||
if err := sendMessage(msg); err != nil {
|
||
printWarn("Send error: %v", err)
|
||
}
|
||
time.Sleep(10 * time.Second)
|
||
}
|
||
}
|
||
|
||
func interpretMessage(msg Message) {
|
||
printDebug("Received message: %s", msg.Content)
|
||
|
||
switch msg.Type {
|
||
case MsgTypeTest:
|
||
handleTestMessage(msg)
|
||
case MsgTypeSearchTextRequest:
|
||
handleSearchTextMessage(msg)
|
||
case MsgTypeSearchImageRawRequest, MsgTypeSearchImageThumbRequest, MsgTypeSearchImageFullRequest, MsgTypeSearchImageAllRequest:
|
||
handleSearchImageMessage(msg)
|
||
case MsgTypeSearchVideoRequest:
|
||
handleSearchVideoMessage(msg)
|
||
case MsgTypeSearchFileRequest:
|
||
handleSearchFileMessage(msg)
|
||
case MsgTypeSearchForumRequest:
|
||
handleSearchForumMessage(msg)
|
||
case MsgTypeSearchMusicRequest:
|
||
handleSearchMusicMessage(msg)
|
||
|
||
case MsgTypeSearchTextResponse:
|
||
handleTextResultsMessage(msg)
|
||
case MsgTypeSearchImageResponse:
|
||
handleImageResultsMessage(msg)
|
||
case MsgTypeSearchVideoResponse:
|
||
handleVideoResultsMessage(msg)
|
||
case MsgTypeSearchFileResponse:
|
||
handleFileResultsMessage(msg)
|
||
case MsgTypeSearchForumResponse:
|
||
handleForumResultsMessage(msg)
|
||
case MsgTypeSearchMusicResponse:
|
||
handleMusicResultsMessage(msg)
|
||
|
||
default:
|
||
printWarn("Unknown message type: %d", msg.Type)
|
||
}
|
||
}
|
||
|
||
// Serialize Message to binary
|
||
func serializeMessage(msg Message) ([]byte, error) {
|
||
buf := new(bytes.Buffer)
|
||
|
||
if err := binary.Write(buf, binary.BigEndian, msg.ID); err != nil {
|
||
return nil, err
|
||
}
|
||
if err := binary.Write(buf, binary.BigEndian, msg.Type); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Content
|
||
contentBytes := []byte(msg.Content)
|
||
if len(contentBytes) > 65535 {
|
||
return nil, errors.New("content too long")
|
||
}
|
||
if err := binary.Write(buf, binary.BigEndian, uint16(len(contentBytes))); err != nil {
|
||
return nil, err
|
||
}
|
||
buf.Write(contentBytes)
|
||
|
||
// Target
|
||
targetBytes := []byte(msg.Target)
|
||
if len(targetBytes) > 255 {
|
||
return nil, errors.New("target name too long")
|
||
}
|
||
if err := buf.WriteByte(uint8(len(targetBytes))); err != nil {
|
||
return nil, err
|
||
}
|
||
buf.Write(targetBytes)
|
||
|
||
return buf.Bytes(), nil
|
||
}
|
||
|
||
// Deserialize binary to Message
|
||
func deserializeMessage(data []byte) (Message, error) {
|
||
buf := bytes.NewReader(data)
|
||
var msg Message
|
||
|
||
if err := binary.Read(buf, binary.BigEndian, &msg.ID); err != nil {
|
||
return msg, err
|
||
}
|
||
if err := binary.Read(buf, binary.BigEndian, &msg.Type); err != nil {
|
||
return msg, err
|
||
}
|
||
|
||
var contentLen uint16
|
||
if err := binary.Read(buf, binary.BigEndian, &contentLen); err != nil {
|
||
return msg, err
|
||
}
|
||
content := make([]byte, contentLen)
|
||
if _, err := io.ReadFull(buf, content); err != nil {
|
||
return msg, err
|
||
}
|
||
msg.Content = content
|
||
|
||
var targetLen uint8
|
||
if err := binary.Read(buf, binary.BigEndian, &targetLen); err != nil {
|
||
return msg, err
|
||
}
|
||
target := make([]byte, targetLen)
|
||
if _, err := io.ReadFull(buf, target); err != nil {
|
||
return msg, err
|
||
}
|
||
msg.Target = string(target)
|
||
|
||
return msg, nil
|
||
}
|
||
|
||
// writeString appends s to buf as a 2-byte big-endian length followed by the
// raw bytes of s (the format readString consumes).
//
// Strings longer than 65535 bytes cannot be described by the 2-byte prefix and
// are rejected before anything is written; previously the prefix wrapped
// around while the whole string was still emitted, corrupting the stream.
func writeString(buf *bytes.Buffer, s string) error {
	const maxLen = 1<<16 - 1
	if len(s) > maxLen {
		return fmt.Errorf("string too long: %d bytes (max %d)", len(s), maxLen)
	}

	if err := binary.Write(buf, binary.BigEndian, uint16(len(s))); err != nil {
		return err
	}
	_, err := buf.WriteString(s)
	return err
}
|
||
|
||
// readString consumes one length-prefixed string from buf: a 2-byte big-endian
// length followed by that many bytes. If buf runs out, the error from the
// failing read (io.EOF or io.ErrUnexpectedEOF) is returned unchanged together
// with an empty string.
func readString(buf *bytes.Reader) (string, error) {
	var prefix [2]byte
	if _, err := io.ReadFull(buf, prefix[:]); err != nil {
		return "", err
	}

	payload := make([]byte, binary.BigEndian.Uint16(prefix[:]))
	if _, err := io.ReadFull(buf, payload); err != nil {
		return "", err
	}
	return string(payload), nil
}
|
||
|
||
// testPayload is the JSON document handleTestMessage decodes from the Content
// of a MsgTypeTest message.
type testPayload struct {
	// Message is the text that gets logged on receipt.
	Message string `json:"message"`
	// ResponseAddr becomes the Target of the reply that is sent back.
	ResponseAddr string `json:"ResponseAddr"`
}
|
||
|
||
func handleTestMessage(msg Message) {
|
||
var payload testPayload
|
||
if err := json.Unmarshal([]byte(msg.Content), &payload); err != nil {
|
||
printWarn("Failed to parse test payload: %v", err)
|
||
return
|
||
}
|
||
printDebug("Received message: %s", payload.Message)
|
||
printInfo("Received TEST message: %s", payload.Message)
|
||
|
||
reply := Message{
|
||
ID: msg.ID,
|
||
Type: MsgTypeTest,
|
||
Content: []byte("hello test"),
|
||
Target: payload.ResponseAddr,
|
||
}
|
||
|
||
if err := sendMessage(reply); err != nil {
|
||
printWarn("Failed to send test response: %v", err)
|
||
}
|
||
}
|