Compare commits

...

5 Commits

Author SHA1 Message Date
Brandon Tuttle 7050c1251d
Merge ccfd63b84f into 951173ba19 2025-02-13 12:54:31 -05:00
Andrew 951173ba19
Restart mDNS every time the connection information changes (#155) 2025-02-13 18:10:47 +01:00
tutman96 ccfd63b84f Rename JSONRPCServer to JSONRPCRouter 2025-01-19 23:12:24 +00:00
tutman96 be319f38d7 Handle messages async to datachannel receive 2025-01-06 22:07:29 +00:00
tutman96 499382afd8 chore: Refactor jsonrpc server into its own package 2025-01-05 20:45:56 +00:00
5 changed files with 247 additions and 188 deletions

179
internal/jsonrpc/router.go Normal file
View File

@ -0,0 +1,179 @@
package jsonrpc
import (
"encoding/json"
"errors"
"fmt"
"io"
"reflect"
)
type JSONRPCRouter struct {
writer io.Writer
handlers map[string]*RPCHandler
}
func NewJSONRPCRouter(writer io.Writer, handlers map[string]*RPCHandler) *JSONRPCRouter {
return &JSONRPCRouter{
writer: writer,
handlers: handlers,
}
}
func (s *JSONRPCRouter) HandleMessage(data []byte) error {
var request JSONRPCRequest
err := json.Unmarshal(data, &request)
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32700,
"message": "Parse error",
},
ID: 0,
}
return s.writeResponse(errorResponse)
}
//log.Printf("Received RPC request: Method=%s, Params=%v, ID=%d", request.Method, request.Params, request.ID)
handler, ok := s.handlers[request.Method]
if !ok {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32601,
"message": "Method not found",
},
ID: request.ID,
}
return s.writeResponse(errorResponse)
}
result, err := callRPCHandler(handler, request.Params)
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32603,
"message": "Internal error",
"data": err.Error(),
},
ID: request.ID,
}
return s.writeResponse(errorResponse)
}
response := JSONRPCResponse{
JSONRPC: "2.0",
Result: result,
ID: request.ID,
}
return s.writeResponse(response)
}
func (s *JSONRPCRouter) writeResponse(response JSONRPCResponse) error {
responseBytes, err := json.Marshal(response)
if err != nil {
return err
}
_, err = s.writer.Write(responseBytes)
return err
}
func callRPCHandler(handler *RPCHandler, params map[string]interface{}) (interface{}, error) {
handlerValue := reflect.ValueOf(handler.Func)
handlerType := handlerValue.Type()
if handlerType.Kind() != reflect.Func {
return nil, errors.New("handler is not a function")
}
numParams := handlerType.NumIn()
args := make([]reflect.Value, numParams)
// Get the parameter names from the RPCHandler
paramNames := handler.Params
if len(paramNames) != numParams {
return nil, errors.New("mismatch between handler parameters and defined parameter names")
}
for i := 0; i < numParams; i++ {
paramType := handlerType.In(i)
paramName := paramNames[i]
paramValue, ok := params[paramName]
if !ok {
return nil, errors.New("missing parameter: " + paramName)
}
convertedValue := reflect.ValueOf(paramValue)
if !convertedValue.Type().ConvertibleTo(paramType) {
if paramType.Kind() == reflect.Slice && (convertedValue.Kind() == reflect.Slice || convertedValue.Kind() == reflect.Array) {
newSlice := reflect.MakeSlice(paramType, convertedValue.Len(), convertedValue.Len())
for j := 0; j < convertedValue.Len(); j++ {
elemValue := convertedValue.Index(j)
if elemValue.Kind() == reflect.Interface {
elemValue = elemValue.Elem()
}
if !elemValue.Type().ConvertibleTo(paramType.Elem()) {
// Handle float64 to uint8 conversion
if elemValue.Kind() == reflect.Float64 && paramType.Elem().Kind() == reflect.Uint8 {
intValue := int(elemValue.Float())
if intValue < 0 || intValue > 255 {
return nil, fmt.Errorf("value out of range for uint8: %v", intValue)
}
newSlice.Index(j).SetUint(uint64(intValue))
} else {
fromType := elemValue.Type()
toType := paramType.Elem()
return nil, fmt.Errorf("invalid element type in slice for parameter %s: from %v to %v", paramName, fromType, toType)
}
} else {
newSlice.Index(j).Set(elemValue.Convert(paramType.Elem()))
}
}
args[i] = newSlice
} else if paramType.Kind() == reflect.Struct && convertedValue.Kind() == reflect.Map {
jsonData, err := json.Marshal(convertedValue.Interface())
if err != nil {
return nil, fmt.Errorf("failed to marshal map to JSON: %v", err)
}
newStruct := reflect.New(paramType).Interface()
if err := json.Unmarshal(jsonData, newStruct); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON into struct: %v", err)
}
args[i] = reflect.ValueOf(newStruct).Elem()
} else {
return nil, fmt.Errorf("invalid parameter type for: %s", paramName)
}
} else {
args[i] = convertedValue.Convert(paramType)
}
}
results := handlerValue.Call(args)
if len(results) == 0 {
return nil, nil
}
if len(results) == 1 {
if results[0].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) {
if !results[0].IsNil() {
return nil, results[0].Interface().(error)
}
return nil, nil
}
return results[0].Interface(), nil
}
if len(results) == 2 && results[1].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) {
if !results[1].IsNil() {
return nil, results[1].Interface().(error)
}
return results[0].Interface(), nil
}
return nil, errors.New("unexpected return values from handler")
}

26
internal/jsonrpc/types.go Normal file
View File

@ -0,0 +1,26 @@
package jsonrpc
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params map[string]interface{} `json:"params,omitempty"`
ID interface{} `json:"id,omitempty"`
}
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error interface{} `json:"error,omitempty"`
ID interface{} `json:"id"`
}
type JSONRPCEvent struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
}
type RPCHandler struct {
Func interface{}
Params []string
}

View File

@ -5,50 +5,44 @@ import (
"encoding/json"
"errors"
"fmt"
"kvm/internal/jsonrpc"
"log"
"os"
"os/exec"
"path/filepath"
"reflect"
"github.com/pion/webrtc/v4"
)
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params map[string]interface{} `json:"params,omitempty"`
ID interface{} `json:"id,omitempty"`
type DataChannelWriter struct {
dataChannel *webrtc.DataChannel
}
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error interface{} `json:"error,omitempty"`
ID interface{} `json:"id"`
func NewDataChannelWriter(dataChannel *webrtc.DataChannel) *DataChannelWriter {
return &DataChannelWriter{
dataChannel: dataChannel,
}
}
type JSONRPCEvent struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
}
func writeJSONRPCResponse(response JSONRPCResponse, session *Session) {
responseBytes, err := json.Marshal(response)
if err != nil {
log.Println("Error marshalling JSONRPC response:", err)
return
}
err = session.RPCChannel.SendText(string(responseBytes))
func (w *DataChannelWriter) Write(data []byte) (int, error) {
err := w.dataChannel.SendText(string(data))
if err != nil {
log.Println("Error sending JSONRPC response:", err)
return
return 0, err
}
return len(data), nil
}
func NewDataChannelJsonRpcRouter(dataChannel *webrtc.DataChannel) *jsonrpc.JSONRPCRouter {
return jsonrpc.NewJSONRPCRouter(
NewDataChannelWriter(dataChannel),
rpcHandlers,
)
}
// TODO: embed this into the session's rpc server
func writeJSONRPCEvent(event string, params interface{}, session *Session) {
request := JSONRPCEvent{
request := jsonrpc.JSONRPCEvent{
JSONRPC: "2.0",
Method: event,
Params: params,
@ -69,60 +63,6 @@ func writeJSONRPCEvent(event string, params interface{}, session *Session) {
}
}
func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
var request JSONRPCRequest
err := json.Unmarshal(message.Data, &request)
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32700,
"message": "Parse error",
},
ID: 0,
}
writeJSONRPCResponse(errorResponse, session)
return
}
//log.Printf("Received RPC request: Method=%s, Params=%v, ID=%d", request.Method, request.Params, request.ID)
handler, ok := rpcHandlers[request.Method]
if !ok {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32601,
"message": "Method not found",
},
ID: request.ID,
}
writeJSONRPCResponse(errorResponse, session)
return
}
result, err := callRPCHandler(handler, request.Params)
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32603,
"message": "Internal error",
"data": err.Error(),
},
ID: request.ID,
}
writeJSONRPCResponse(errorResponse, session)
return
}
response := JSONRPCResponse{
JSONRPC: "2.0",
Result: result,
ID: request.ID,
}
writeJSONRPCResponse(response, session)
}
func rpcPing() (string, error) {
return "pong", nil
}
@ -321,108 +261,6 @@ func rpcSetSSHKeyState(sshKey string) error {
return nil
}
func callRPCHandler(handler RPCHandler, params map[string]interface{}) (interface{}, error) {
handlerValue := reflect.ValueOf(handler.Func)
handlerType := handlerValue.Type()
if handlerType.Kind() != reflect.Func {
return nil, errors.New("handler is not a function")
}
numParams := handlerType.NumIn()
args := make([]reflect.Value, numParams)
// Get the parameter names from the RPCHandler
paramNames := handler.Params
if len(paramNames) != numParams {
return nil, errors.New("mismatch between handler parameters and defined parameter names")
}
for i := 0; i < numParams; i++ {
paramType := handlerType.In(i)
paramName := paramNames[i]
paramValue, ok := params[paramName]
if !ok {
return nil, errors.New("missing parameter: " + paramName)
}
convertedValue := reflect.ValueOf(paramValue)
if !convertedValue.Type().ConvertibleTo(paramType) {
if paramType.Kind() == reflect.Slice && (convertedValue.Kind() == reflect.Slice || convertedValue.Kind() == reflect.Array) {
newSlice := reflect.MakeSlice(paramType, convertedValue.Len(), convertedValue.Len())
for j := 0; j < convertedValue.Len(); j++ {
elemValue := convertedValue.Index(j)
if elemValue.Kind() == reflect.Interface {
elemValue = elemValue.Elem()
}
if !elemValue.Type().ConvertibleTo(paramType.Elem()) {
// Handle float64 to uint8 conversion
if elemValue.Kind() == reflect.Float64 && paramType.Elem().Kind() == reflect.Uint8 {
intValue := int(elemValue.Float())
if intValue < 0 || intValue > 255 {
return nil, fmt.Errorf("value out of range for uint8: %v", intValue)
}
newSlice.Index(j).SetUint(uint64(intValue))
} else {
fromType := elemValue.Type()
toType := paramType.Elem()
return nil, fmt.Errorf("invalid element type in slice for parameter %s: from %v to %v", paramName, fromType, toType)
}
} else {
newSlice.Index(j).Set(elemValue.Convert(paramType.Elem()))
}
}
args[i] = newSlice
} else if paramType.Kind() == reflect.Struct && convertedValue.Kind() == reflect.Map {
jsonData, err := json.Marshal(convertedValue.Interface())
if err != nil {
return nil, fmt.Errorf("failed to marshal map to JSON: %v", err)
}
newStruct := reflect.New(paramType).Interface()
if err := json.Unmarshal(jsonData, newStruct); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON into struct: %v", err)
}
args[i] = reflect.ValueOf(newStruct).Elem()
} else {
return nil, fmt.Errorf("invalid parameter type for: %s", paramName)
}
} else {
args[i] = convertedValue.Convert(paramType)
}
}
results := handlerValue.Call(args)
if len(results) == 0 {
return nil, nil
}
if len(results) == 1 {
if results[0].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) {
if !results[0].IsNil() {
return nil, results[0].Interface().(error)
}
return nil, nil
}
return results[0].Interface(), nil
}
if len(results) == 2 && results[1].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) {
if !results[1].IsNil() {
return nil, results[1].Interface().(error)
}
return results[0].Interface(), nil
}
return nil, errors.New("unexpected return values from handler")
}
type RPCHandler struct {
Func interface{}
Params []string
}
func rpcSetMassStorageMode(mode string) (string, error) {
log.Printf("[jsonrpc.go:rpcSetMassStorageMode] Setting mass storage mode to: %s", mode)
var cdrom bool
@ -514,7 +352,7 @@ func rpcResetConfig() error {
}
// TODO: replace this crap with code generator
var rpcHandlers = map[string]RPCHandler{
var rpcHandlers = map[string]*jsonrpc.RPCHandler{
"ping": {Func: rpcPing},
"getDeviceID": {Func: rpcGetDeviceID},
"deregisterDevice": {Func: rpcDeregisterDevice},

View File

@ -13,6 +13,8 @@ import (
"github.com/vishvananda/netlink/nl"
)
var mDNSConn *mdns.Conn
var networkState struct {
Up bool
IPv4 string
@ -91,13 +93,26 @@ func checkNetworkState() {
}
if newState != networkState {
networkState = newState
fmt.Println("network state changed")
//restart MDNS
startMDNS()
networkState = newState
requestDisplayUpdate()
}
}
func startMDNS() error {
//If server was previously running, stop it
if mDNSConn != nil {
fmt.Printf("Stopping mDNS server\n")
err := mDNSConn.Close()
if err != nil {
fmt.Printf("failed to stop mDNS server: %v\n", err)
}
}
//Start a new server
fmt.Printf("Starting mDNS server on jetkvm.local\n")
addr4, err := net.ResolveUDPAddr("udp4", mdns.DefaultAddressIPv4)
if err != nil {
return err
@ -118,10 +133,11 @@ func startMDNS() error {
return err
}
_, err = mdns.Server(ipv4.NewPacketConn(l4), ipv6.NewPacketConn(l6), &mdns.Config{
mDNSConn, err = mdns.Server(ipv4.NewPacketConn(l4), ipv6.NewPacketConn(l6), &mdns.Config{
LocalNames: []string{"jetkvm.local"}, //TODO: make it configurable
})
if err != nil {
mDNSConn = nil
return err
}
//defer server.Close()
@ -157,7 +173,6 @@ func init() {
}
}
}()
fmt.Println("Starting mDNS server")
err := startMDNS()
if err != nil {
fmt.Println("failed to run mDNS: %v", err)

View File

@ -102,8 +102,9 @@ func newSession(config SessionConfig) (*Session, error) {
switch d.Label() {
case "rpc":
session.RPCChannel = d
rpcServer := NewDataChannelJsonRpcRouter(d)
d.OnMessage(func(msg webrtc.DataChannelMessage) {
go onRPCMessage(msg, session)
go rpcServer.HandleMessage(msg.Data)
})
triggerOTAStateUpdate()
triggerVideoStateUpdate()