mirror of https://github.com/jetkvm/kvm.git
186 lines
5.0 KiB
Go
186 lines
5.0 KiB
Go
package audio
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jetkvm/kvm/internal/logging"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// Buffer pool for zero-allocation writes
|
|
var writeBufferPool = sync.Pool{
|
|
New: func() interface{} {
|
|
buf := make([]byte, ipcHeaderSize+ipcMaxFrameSize)
|
|
return &buf
|
|
},
|
|
}
|
|
|
|
// IPC Protocol constants (matches C implementation in ipc_protocol.h)
|
|
const (
|
|
ipcMagicOutput = 0x4A4B4F55 // "JKOU" - Output (device → browser)
|
|
ipcMagicInput = 0x4A4B4D49 // "JKMI" - Input (browser → device)
|
|
ipcHeaderSize = 9 // Reduced from 17 (removed 8-byte timestamp)
|
|
ipcMaxFrameSize = 1024 // 128kbps @ 20ms = ~600 bytes worst case with VBR+FEC
|
|
ipcMsgTypeOpus = 0
|
|
ipcMsgTypeConfig = 1
|
|
ipcMsgTypeStop = 3
|
|
connectTimeout = 5 * time.Second
|
|
readTimeout = 2 * time.Second
|
|
)
|
|
|
|
// IPCSource implements AudioSource via Unix socket communication with audio subprocess
|
|
type IPCSource struct {
|
|
socketPath string
|
|
magicNumber uint32
|
|
conn net.Conn
|
|
mu sync.Mutex
|
|
logger zerolog.Logger
|
|
readBuf []byte // Reusable buffer for reads (single reader per client)
|
|
}
|
|
|
|
// NewIPCSource creates a new IPC audio source
|
|
// For output: socketPath="/var/run/audio_output.sock", magic=ipcMagicOutput
|
|
// For input: socketPath="/var/run/audio_input.sock", magic=ipcMagicInput
|
|
func NewIPCSource(name, socketPath string, magicNumber uint32) *IPCSource {
|
|
logger := logging.GetDefaultLogger().With().Str("component", name+"-ipc").Logger()
|
|
|
|
return &IPCSource{
|
|
socketPath: socketPath,
|
|
magicNumber: magicNumber,
|
|
logger: logger,
|
|
readBuf: make([]byte, ipcMaxFrameSize),
|
|
}
|
|
}
|
|
|
|
// Connect establishes connection to the subprocess
|
|
func (c *IPCSource) Connect() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn != nil {
|
|
c.conn.Close()
|
|
c.conn = nil
|
|
}
|
|
|
|
conn, err := net.DialTimeout("unix", c.socketPath, connectTimeout)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to %s: %w", c.socketPath, err)
|
|
}
|
|
|
|
c.conn = conn
|
|
c.logger.Debug().Str("socket", c.socketPath).Msg("connected to subprocess")
|
|
return nil
|
|
}
|
|
|
|
// Disconnect closes the connection
|
|
func (c *IPCSource) Disconnect() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn != nil {
|
|
c.conn.Close()
|
|
c.conn = nil
|
|
c.logger.Debug().Msg("disconnected from subprocess")
|
|
}
|
|
}
|
|
|
|
// IsConnected returns true if currently connected
|
|
func (c *IPCSource) IsConnected() bool {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.conn != nil
|
|
}
|
|
|
|
// ReadMessage reads a complete IPC message (header + payload)
|
|
// Returns message type, payload data, and error
|
|
// IMPORTANT: The returned payload slice is only valid until the next ReadMessage call.
|
|
// Callers must use the data immediately or copy if retention is needed.
|
|
func (c *IPCSource) ReadMessage() (uint8, []byte, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn == nil {
|
|
return 0, nil, fmt.Errorf("not connected")
|
|
}
|
|
|
|
// Set read deadline
|
|
if err := c.conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
|
|
return 0, nil, fmt.Errorf("failed to set read deadline: %w", err)
|
|
}
|
|
|
|
// Read 9-byte header
|
|
var header [ipcHeaderSize]byte
|
|
if _, err := io.ReadFull(c.conn, header[:]); err != nil {
|
|
return 0, nil, fmt.Errorf("failed to read header: %w", err)
|
|
}
|
|
|
|
// Parse header (little-endian)
|
|
magic := binary.LittleEndian.Uint32(header[0:4])
|
|
msgType := header[4]
|
|
length := binary.LittleEndian.Uint32(header[5:9])
|
|
|
|
// Validate magic number
|
|
if magic != c.magicNumber {
|
|
return 0, nil, fmt.Errorf("invalid magic: got 0x%X, expected 0x%X", magic, c.magicNumber)
|
|
}
|
|
|
|
// Validate length
|
|
if length > ipcMaxFrameSize {
|
|
return 0, nil, fmt.Errorf("message too large: %d bytes", length)
|
|
}
|
|
|
|
// Read payload if present
|
|
if length == 0 {
|
|
return msgType, nil, nil
|
|
}
|
|
|
|
// Read directly into reusable buffer (zero-allocation)
|
|
if _, err := io.ReadFull(c.conn, c.readBuf[:length]); err != nil {
|
|
return 0, nil, fmt.Errorf("failed to read payload: %w", err)
|
|
}
|
|
|
|
// Return slice of readBuf - caller must use immediately, data is only valid until next ReadMessage
|
|
// This avoids allocation in hot path (50 frames/sec)
|
|
return msgType, c.readBuf[:length], nil
|
|
}
|
|
|
|
// WriteMessage writes a complete IPC message
|
|
func (c *IPCSource) WriteMessage(msgType uint8, payload []byte) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn == nil {
|
|
return fmt.Errorf("not connected")
|
|
}
|
|
|
|
length := uint32(len(payload))
|
|
if length > ipcMaxFrameSize {
|
|
return fmt.Errorf("payload too large: %d bytes", length)
|
|
}
|
|
|
|
// Get buffer from pool for zero-allocation write
|
|
bufPtr := writeBufferPool.Get().(*[]byte)
|
|
defer writeBufferPool.Put(bufPtr)
|
|
buf := *bufPtr
|
|
|
|
// Build header in pooled buffer (9 bytes, little-endian)
|
|
binary.LittleEndian.PutUint32(buf[0:4], c.magicNumber)
|
|
buf[4] = msgType
|
|
binary.LittleEndian.PutUint32(buf[5:9], length)
|
|
|
|
// Copy payload after header
|
|
copy(buf[ipcHeaderSize:], payload)
|
|
|
|
// Write header + payload atomically
|
|
if _, err := c.conn.Write(buf[:ipcHeaderSize+length]); err != nil {
|
|
return fmt.Errorf("failed to write message: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|