diff --git a/internal/jsonrpc/rpc_server.go b/internal/jsonrpc/rpc_server.go index bdaef1b..5f8e870 100644 --- a/internal/jsonrpc/rpc_server.go +++ b/internal/jsonrpc/rpc_server.go @@ -5,31 +5,152 @@ import ( "errors" "fmt" "io" + "log" "reflect" + "sync" + "sync/atomic" + "time" ) type JSONRPCServer struct { writer io.Writer handlers map[string]*RPCHandler + nextId atomic.Int64 + + responseChannelsMutex sync.Mutex + responseChannels map[int64]chan JSONRPCResponse } func NewJSONRPCServer(writer io.Writer, handlers map[string]*RPCHandler) *JSONRPCServer { return &JSONRPCServer{ - writer: writer, - handlers: handlers, + writer: writer, + handlers: handlers, + responseChannels: make(map[int64]chan JSONRPCResponse), + nextId: atomic.Int64{}, } } +func (s *JSONRPCServer) Request(method string, params map[string]interface{}, result interface{}) *JSONRPCResponseError { + id := s.nextId.Add(1) + request := JSONRPCRequest{ + JSONRPC: "2.0", + Method: method, + Params: params, + ID: id, + } + requestBytes, err := json.Marshal(request) + if err != nil { + return &JSONRPCResponseError{ + Code: -32700, + Message: "Parse error", + Data: err, + } + } + + // log.Printf("Sending RPC request: Method=%s, Params=%v, ID=%d", method, params, id) + + responseChan := make(chan JSONRPCResponse, 1) + s.responseChannelsMutex.Lock() + s.responseChannels[id] = responseChan + s.responseChannelsMutex.Unlock() + defer func() { + s.responseChannelsMutex.Lock() + delete(s.responseChannels, id) + s.responseChannelsMutex.Unlock() + }() + + _, err = s.writer.Write(requestBytes) + if err != nil { + return &JSONRPCResponseError{ + Code: -32603, + Message: "Internal error", + Data: err, + } + } + + timeout := time.After(5 * time.Second) + select { + case response := <-responseChan: + if response.Error != nil { + return response.Error + } + + rawResult, err := json.Marshal(response.Result) + if err != nil { + return &JSONRPCResponseError{ + Code: -32603, + Message: "Internal error", + Data: err, + } + } + + if err := json.Unmarshal(rawResult, result); err != nil { + return &JSONRPCResponseError{ + Code: -32603, + Message: "Internal error", + Data: err, + } + } + + return nil + case <-timeout: + return &JSONRPCResponseError{ + Code: -32603, + Message: "Internal error", + Data: "timeout waiting for response", + } + } +} + +type JSONRPCMessage struct { + Method *string `json:"method,omitempty"` + ID *int64 `json:"id,omitempty"` +} + func (s *JSONRPCServer) HandleMessage(data []byte) error { - var request JSONRPCRequest - err := json.Unmarshal(data, &request) + // Data will either be a JSONRPCRequest or JSONRPCResponse object + // We need to determine which one it is + var raw JSONRPCMessage + err := json.Unmarshal(data, &raw) if err != nil { errorResponse := JSONRPCResponse{ JSONRPC: "2.0", - Error: map[string]interface{}{ - "code": -32700, - "message": "Parse error", + Error: &JSONRPCResponseError{ + Code: -32700, + Message: "Parse error", + }, + ID: 0, + } + return s.writeResponse(errorResponse) + } + + if raw.Method == nil && raw.ID != nil { + var resp JSONRPCResponse + if err := json.Unmarshal(data, &resp); err != nil { + fmt.Println("error unmarshalling response", err) + return err + } + + s.responseChannelsMutex.Lock() + responseChan, ok := s.responseChannels[*raw.ID] + s.responseChannelsMutex.Unlock() + if ok { + responseChan <- resp + } else { + log.Println("No response channel found for ID", resp.ID) + } + return nil + } + + var request JSONRPCRequest + err = json.Unmarshal(data, &request) + if err != nil { + errorResponse := JSONRPCResponse{ + JSONRPC: "2.0", + Error: &JSONRPCResponseError{ + Code: -32700, + Message: "Parse error", }, ID: 0, } @@ -41,9 +162,9 @@ func (s *JSONRPCServer) HandleMessage(data []byte) error { if !ok { errorResponse := JSONRPCResponse{ JSONRPC: "2.0", - Error: map[string]interface{}{ - "code": -32601, - "message": "Method not found", + Error: &JSONRPCResponseError{ + Code: -32601, + Message: "Method not found", }, ID: request.ID, } @@ -54,10 +175,10 @@ func (s *JSONRPCServer) HandleMessage(data []byte) error { if err != nil { errorResponse := JSONRPCResponse{ JSONRPC: "2.0", - Error: map[string]interface{}{ - "code": -32603, - "message": "Internal error", - "data": err.Error(), + Error: &JSONRPCResponseError{ + Code: -32603, + Message: "Internal error", + Data: err.Error(), }, ID: request.ID, } diff --git a/internal/jsonrpc/types.go b/internal/jsonrpc/types.go index 30f8a2c..ac4f956 100644 --- a/internal/jsonrpc/types.go +++ b/internal/jsonrpc/types.go @@ -8,10 +8,16 @@ type JSONRPCRequest struct { } type JSONRPCResponse struct { - JSONRPC string `json:"jsonrpc"` - Result interface{} `json:"result,omitempty"` - Error interface{} `json:"error,omitempty"` - ID interface{} `json:"id"` + JSONRPC string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + Error *JSONRPCResponseError `json:"error,omitempty"` + ID interface{} `json:"id"` +} + +type JSONRPCResponseError struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` } type JSONRPCEvent struct { diff --git a/internal/plugin/install.go b/internal/plugin/install.go index e5bf395..89319a8 100644 --- a/internal/plugin/install.go +++ b/internal/plugin/install.go @@ -3,7 +3,6 @@ package plugin import ( "fmt" "log" - "net" "os" "os/exec" "path" @@ -22,7 +21,7 @@ type PluginInstall struct { manifest *PluginManifest runningVersion *string processManager *ProcessManager - rpcListener net.Listener + rpcServer *PluginRpcServer } func (p *PluginInstall) GetManifest() (*PluginManifest, error) { @@ -54,13 +53,24 @@ func (p *PluginInstall) GetStatus() (*PluginStatus, error) { Enabled: p.Enabled, } - status.Status = "stopped" - if p.processManager != nil { - status.Status = "running" - if p.processManager.LastError != nil { - status.Status = "errored" - status.Error = p.processManager.LastError.Error() + if p.rpcServer != nil && p.rpcServer.status.Status != "disconnected" { + log.Printf("Status from RPC: %v", p.rpcServer.status) + status.Status = p.rpcServer.status.Status + status.Message = p.rpcServer.status.Message + + if status.Status == "error" { + status.Message = p.rpcServer.status.Message } + } else { + status.Status = "stopped" + if p.processManager != nil { + status.Status = "running" + if p.processManager.LastError != nil { + status.Status = "errored" + status.Message = p.processManager.LastError.Error() + } + } + log.Printf("Status from process manager: %v", status.Status) } return &status, nil @@ -94,8 +104,10 @@ func (p *PluginInstall) ReconcileSubprocess() error { p.processManager.Disable() p.processManager = nil p.runningVersion = nil - p.rpcListener.Close() - p.rpcListener = nil + err = p.rpcServer.Stop() + if err != nil { + return fmt.Errorf("failed to stop rpc server: %v", err) + } } if versionShouldBeRunning == "" { @@ -103,25 +115,22 @@ func (p *PluginInstall) ReconcileSubprocess() error { } workingDir := path.Join(pluginsFolder, "working_dirs", p.manifest.Name) - socketPath := path.Join(workingDir, "plugin.sock") - - os.Remove(socketPath) err = os.MkdirAll(workingDir, 0755) if err != nil { return fmt.Errorf("failed to create working directory: %v", err) } - listener, err := net.Listen("unix", socketPath) + p.rpcServer = NewPluginRpcServer(p, workingDir) + err = p.rpcServer.Start() if err != nil { - return fmt.Errorf("failed to listen on socket: %v", err) + return fmt.Errorf("failed to start rpc server: %v", err) } - p.rpcListener = listener p.processManager = NewProcessManager(func() *exec.Cmd { cmd := exec.Command(manifest.BinaryPath) cmd.Dir = p.GetExtractedFolder() cmd.Env = append(cmd.Env, - "JETKVM_PLUGIN_SOCK="+socketPath, + "JETKVM_PLUGIN_SOCK="+p.rpcServer.SocketPath(), "JETKVM_PLUGIN_WORKING_DIR="+workingDir, ) cmd.Stdout = os.Stdout @@ -147,8 +156,7 @@ func (p *PluginInstall) Shutdown() { p.runningVersion = nil } - if p.rpcListener != nil { - p.rpcListener.Close() - p.rpcListener = nil + if p.rpcServer != nil { + p.rpcServer.Stop() } } diff --git a/internal/plugin/rpc.go b/internal/plugin/rpc.go new file mode 100644 index 0000000..9606dd6 --- /dev/null +++ b/internal/plugin/rpc.go @@ -0,0 +1,166 @@ +package plugin + +import ( + "context" + "errors" + "fmt" + "kvm/internal/jsonrpc" + "log" + "net" + "os" + "path" + "time" +) + +type PluginRpcStatus struct { + Status string `json:"status"` + Message string `json:"message,omitempty"` +} + +var ( + PluginRpcStatusDisconnected = PluginRpcStatus{"disconnected", ""} + PluginRpcStatusLoading = PluginRpcStatus{"loading", ""} + PluginRpcStatusPendingConfiguration = PluginRpcStatus{"pending-configuration", ""} + PluginRpcStatusRunning = PluginRpcStatus{"running", ""} + PluginRpcStatusError = PluginRpcStatus{"error", ""} +) + +type PluginRpcServer struct { + install *PluginInstall + workingDir string + + listener net.Listener + status PluginRpcStatus +} + +func NewPluginRpcServer(install *PluginInstall, workingDir string) *PluginRpcServer { + return &PluginRpcServer{ + install: install, + workingDir: workingDir, + status: PluginRpcStatusDisconnected, + } +} + +func (s *PluginRpcServer) Start() error { + socketPath := s.SocketPath() + _ = os.Remove(socketPath) + listener, err := net.Listen("unix", socketPath) + if err != nil { + return fmt.Errorf("failed to listen on socket: %v", err) + } + s.listener = listener + + s.status = PluginRpcStatusDisconnected + go func() { + for { + conn, err := listener.Accept() + if err != nil { + // If the error indicates the listener is closed, break out + if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" { + log.Println("Listener closed, exiting accept loop.") + return + } + + log.Printf("Failed to accept connection: %v", err) + continue + } + log.Printf("Accepted plugin rpc connection from %v", conn.RemoteAddr()) + + go s.handleConnection(conn) + } + }() + + return nil +} + +func (s *PluginRpcServer) Stop() error { + if s.listener != nil { + s.status = PluginRpcStatusDisconnected + return s.listener.Close() + } + return nil +} + +func (s *PluginRpcServer) Status() PluginRpcStatus { + return s.status +} + +func (s *PluginRpcServer) SocketPath() string { + return path.Join(s.workingDir, "plugin.sock") +} + +func (s *PluginRpcServer) handleConnection(conn net.Conn) { + rpcserver := jsonrpc.NewJSONRPCServer(conn, map[string]*jsonrpc.RPCHandler{}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go s.handleRpcStatus(ctx, rpcserver) + + // Read from the conn and write into rpcserver.HandleMessage + buf := make([]byte, 65*1024) + for { + // TODO: if read 65k bytes, then likey there is more data to read... figure out how to handle this + n, err := conn.Read(buf) + if err != nil { + log.Printf("Failed to read message: %v", err) + if errors.Is(err, net.ErrClosed) { + s.status = PluginRpcStatusDisconnected + } else { + s.status = PluginRpcStatusError + s.status.Message = fmt.Errorf("failed to read message: %v", err).Error() + } + break + } + + err = rpcserver.HandleMessage(buf[:n]) + if err != nil { + log.Printf("Failed to handle message: %v", err) + s.status = PluginRpcStatusError + s.status.Message = fmt.Errorf("failed to handle message: %v", err).Error() + continue + } + } +} + +func (s *PluginRpcServer) handleRpcStatus(ctx context.Context, rpcserver *jsonrpc.JSONRPCServer) { + // log.Printf("Plugin rpc server started. Getting supported methods...") + // supportedMethodsResponse, err := rpcserver.Request("getPluginSupportedMethods", map[string]interface{}{}) + // if err != nil { + // log.Printf("Failed to get supported methods: %v", err) + // s.status = PluginRpcStatusError + // s.status.Message = fmt.Errorf("error getting supported methods: %v", err).Error() + // } + + // if supportedMethodsResponse.Error != nil { + // log.Printf("Failed to get supported methods: %v", supportedMethodsResponse.Error) + // s.status = PluginRpcStatusError + // s.status.Message = fmt.Errorf("error getting supported methods: %v", supportedMethodsResponse.Error).Error() + // } + + // log.Printf("Plugin has supported methods: %v", supportedMethodsResponse.Result) + + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var statusResponse PluginRpcStatus + err := rpcserver.Request("getPluginStatus", map[string]interface{}{}, &statusResponse) + if err != nil { + log.Printf("Failed to get status: %v", err) + if err, ok := err.Data.(error); ok && errors.Is(err, net.ErrClosed) { + s.status = PluginRpcStatusDisconnected + break + } + + s.status = PluginRpcStatusError + s.status.Message = fmt.Errorf("error getting status: %v", err).Error() + continue + } + + s.status = statusResponse + } + } +} diff --git a/internal/plugin/type.go b/internal/plugin/type.go index 0e6988c..de1001a 100644 --- a/internal/plugin/type.go +++ b/internal/plugin/type.go @@ -14,5 +14,5 @@ type PluginStatus struct { PluginManifest Enabled bool `json:"enabled"` Status string `json:"status"` - Error string `json:"error,omitempty"` + Message string `json:"message,omitempty"` } diff --git a/ui/src/components/PluginConfigureDialog.tsx b/ui/src/components/PluginConfigureDialog.tsx index c5808f1..c5ac794 100644 --- a/ui/src/components/PluginConfigureDialog.tsx +++ b/ui/src/components/PluginConfigureDialog.tsx @@ -1,13 +1,15 @@ import { PluginStatus } from "@/hooks/stores"; import Modal from "@components/Modal"; import AutoHeight from "@components/AutoHeight"; -import { GridCard } from "@components/Card"; +import Card, { GridCard } from "@components/Card"; import LogoBlueIcon from "@/assets/logo-blue.svg"; import LogoWhiteIcon from "@/assets/logo-white.svg"; import { ViewHeader } from "./MountMediaDialog"; import { Button } from "./Button"; import { useJsonRpc } from "@/hooks/useJsonRpc"; import { useCallback, useEffect, useState } from "react"; +import { PluginStatusIcon } from "./PluginStatusIcon"; +import { cx } from "@/cva.config"; export default function PluginConfigureModal({ plugin, @@ -42,7 +44,7 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op return; } } - + setLoading(true); send("pluginUpdateConfig", { name: plugin.name, enabled }, resp => { if ("error" in resp) { @@ -77,16 +79,28 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
- JetKVM Logo - JetKVM Logo +
+
+ JetKVM Logo + JetKVM Logo +
+
+ {plugin && <> +

+ {plugin.status} +

+ + } +
+
@@ -104,6 +118,8 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
+
+
{error &&

{error}

} + {plugin?.message && ( + <> +

+ Plugin message: +

+ + {plugin.message} + + + )}

TODO: Plugin configuration goes here

+
+
-
- +
+ ) } diff --git a/ui/src/components/PluginList.tsx b/ui/src/components/PluginList.tsx index 96af34a..9be368c 100644 --- a/ui/src/components/PluginList.tsx +++ b/ui/src/components/PluginList.tsx @@ -2,24 +2,9 @@ import { useJsonRpc } from "@/hooks/useJsonRpc"; import { Button } from "@components/Button"; import { PluginStatus, usePluginStore, useUiStore } from "@/hooks/stores"; import { useCallback, useEffect, useState } from "react"; -import { cx } from "@/cva.config"; import UploadPluginModal from "@components/UploadPluginDialog"; import PluginConfigureModal from "@components/PluginConfigureDialog"; - -function PluginListStatusIcon({ plugin }: { plugin: PluginStatus }) { - let classNames = "bg-slate-500 border-slate-600"; - if (plugin.enabled && plugin.status === "running") { - classNames = "bg-green-500 border-green-600"; - } else if (plugin.enabled && plugin.status === "errored") { - classNames = "bg-red-500 border-red-600"; - } - - return ( -
-
-
- ) -} +import { PluginStatusIcon } from "./PluginStatusIcon"; export default function PluginList() { const [send] = useJsonRpc(); @@ -45,20 +30,21 @@ export default function PluginList() { setError(resp.error.message); return } + console.log('pluginList', resp.result); setPlugins(resp.result as PluginStatus[]); }); }, [send, setPlugins]) useEffect(() => { // Only update plugins when the sidebar view is the settings view - if (sidebarView !== "system") return; + if (sidebarView !== "system" && !pluginConfigureModalOpen) return; updatePlugins(); const updateInterval = setInterval(() => { updatePlugins(); }, 10_000); return () => clearInterval(updateInterval); - }, [updatePlugins, sidebarView]) + }, [updatePlugins, sidebarView, pluginConfigureModalOpen]) return ( <> @@ -68,7 +54,7 @@ export default function PluginList() { {plugins.length === 0 &&
  • No plugins installed
  • } {plugins.map(plugin => (
  • - +

    {plugin.name}

    @@ -99,7 +85,7 @@ export default function PluginList() { updatePlugins(); } }} - plugin={configuringPlugin} + plugin={plugins.find(p => p.name == configuringPlugin?.name) ?? null} />

    diff --git a/ui/src/components/PluginStatusIcon.tsx b/ui/src/components/PluginStatusIcon.tsx new file mode 100644 index 0000000..721ec43 --- /dev/null +++ b/ui/src/components/PluginStatusIcon.tsx @@ -0,0 +1,19 @@ +import { cx } from "@/cva.config"; +import { PluginStatus } from "@/hooks/stores"; + +export function PluginStatusIcon({ plugin }: { plugin: PluginStatus; }) { + let classNames = "bg-slate-500 border-slate-600"; + if (plugin.enabled && plugin.status === "running") { + classNames = "bg-green-500 border-green-600"; + } else if (plugin.enabled && plugin.status === "pending-configuration") { + classNames = "bg-yellow-500 border-yellow-600"; + } else if (plugin.enabled && plugin.status === "errored") { + classNames = "bg-red-500 border-red-600"; + } + + return ( +
    +
    +
    + ); +} diff --git a/ui/src/hooks/stores.ts b/ui/src/hooks/stores.ts index cdb132a..62fc3a8 100644 --- a/ui/src/hooks/stores.ts +++ b/ui/src/hooks/stores.ts @@ -539,8 +539,8 @@ export interface PluginManifest { export interface PluginStatus extends PluginManifest { enabled: boolean; - status: "stopped" | "running" | "errored"; - error?: string; + status: "stopped" | "running" | "loading" | "pending-configuration" | "errored"; + message?: string; } interface PluginState {