wip: Plugin RPC with status reporting to the UI

This commit is contained in:
tutman96 2025-01-05 23:38:54 +00:00
parent 0b3cd59e36
commit e61decfb33
9 changed files with 412 additions and 75 deletions

View File

@ -5,31 +5,152 @@ import (
"errors"
"fmt"
"io"
"log"
"reflect"
"sync"
"sync/atomic"
"time"
)
type JSONRPCServer struct {
writer io.Writer
handlers map[string]*RPCHandler
nextId atomic.Int64
responseChannelsMutex sync.Mutex
responseChannels map[int64]chan JSONRPCResponse
}
func NewJSONRPCServer(writer io.Writer, handlers map[string]*RPCHandler) *JSONRPCServer {
return &JSONRPCServer{
writer: writer,
handlers: handlers,
writer: writer,
handlers: handlers,
responseChannels: make(map[int64]chan JSONRPCResponse),
nextId: atomic.Int64{},
}
}
func (s *JSONRPCServer) Request(method string, params map[string]interface{}, result interface{}) *JSONRPCResponseError {
id := s.nextId.Add(1)
request := JSONRPCRequest{
JSONRPC: "2.0",
Method: method,
Params: params,
ID: id,
}
requestBytes, err := json.Marshal(request)
if err != nil {
return &JSONRPCResponseError{
Code: -32700,
Message: "Parse error",
Data: err,
}
}
// log.Printf("Sending RPC request: Method=%s, Params=%v, ID=%d", method, params, id)
responseChan := make(chan JSONRPCResponse, 1)
s.responseChannelsMutex.Lock()
s.responseChannels[id] = responseChan
s.responseChannelsMutex.Unlock()
defer func() {
s.responseChannelsMutex.Lock()
delete(s.responseChannels, id)
s.responseChannelsMutex.Unlock()
}()
_, err = s.writer.Write(requestBytes)
if err != nil {
return &JSONRPCResponseError{
Code: -32603,
Message: "Internal error",
Data: err,
}
}
timeout := time.After(5 * time.Second)
select {
case response := <-responseChan:
if response.Error != nil {
return response.Error
}
rawResult, err := json.Marshal(response.Result)
if err != nil {
return &JSONRPCResponseError{
Code: -32603,
Message: "Internal error",
Data: err,
}
}
if err := json.Unmarshal(rawResult, result); err != nil {
return &JSONRPCResponseError{
Code: -32603,
Message: "Internal error",
Data: err,
}
}
return nil
case <-timeout:
return &JSONRPCResponseError{
Code: -32603,
Message: "Internal error",
Data: "timeout waiting for response",
}
}
}
type JSONRPCMessage struct {
Method *string `json:"method,omitempty"`
ID *int64 `json:"id,omitempty"`
}
func (s *JSONRPCServer) HandleMessage(data []byte) error {
var request JSONRPCRequest
err := json.Unmarshal(data, &request)
// Data will either be a JSONRPCRequest or JSONRPCResponse object
// We need to determine which one it is
var raw JSONRPCMessage
err := json.Unmarshal(data, &raw)
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32700,
"message": "Parse error",
Error: &JSONRPCResponseError{
Code: -32700,
Message: "Parse error",
},
ID: 0,
}
return s.writeResponse(errorResponse)
}
if raw.Method == nil && raw.ID != nil {
var resp JSONRPCResponse
if err := json.Unmarshal(data, &resp); err != nil {
fmt.Println("error unmarshalling response", err)
return err
}
s.responseChannelsMutex.Lock()
responseChan, ok := s.responseChannels[*raw.ID]
s.responseChannelsMutex.Unlock()
if ok {
responseChan <- resp
} else {
log.Println("No response channel found for ID", resp.ID)
}
return nil
}
var request JSONRPCRequest
err = json.Unmarshal(data, &request)
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: &JSONRPCResponseError{
Code: -32700,
Message: "Parse error",
},
ID: 0,
}
@ -41,9 +162,9 @@ func (s *JSONRPCServer) HandleMessage(data []byte) error {
if !ok {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32601,
"message": "Method not found",
Error: &JSONRPCResponseError{
Code: -32601,
Message: "Method not found",
},
ID: request.ID,
}
@ -54,10 +175,10 @@ func (s *JSONRPCServer) HandleMessage(data []byte) error {
if err != nil {
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32603,
"message": "Internal error",
"data": err.Error(),
Error: &JSONRPCResponseError{
Code: -32603,
Message: "Internal error",
Data: err.Error(),
},
ID: request.ID,
}

View File

@ -8,10 +8,16 @@ type JSONRPCRequest struct {
}
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error interface{} `json:"error,omitempty"`
ID interface{} `json:"id"`
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error *JSONRPCResponseError `json:"error,omitempty"`
ID interface{} `json:"id"`
}
type JSONRPCResponseError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
type JSONRPCEvent struct {

View File

@ -3,7 +3,6 @@ package plugin
import (
"fmt"
"log"
"net"
"os"
"os/exec"
"path"
@ -22,7 +21,7 @@ type PluginInstall struct {
manifest *PluginManifest
runningVersion *string
processManager *ProcessManager
rpcListener net.Listener
rpcServer *PluginRpcServer
}
func (p *PluginInstall) GetManifest() (*PluginManifest, error) {
@ -54,13 +53,24 @@ func (p *PluginInstall) GetStatus() (*PluginStatus, error) {
Enabled: p.Enabled,
}
status.Status = "stopped"
if p.processManager != nil {
status.Status = "running"
if p.processManager.LastError != nil {
status.Status = "errored"
status.Error = p.processManager.LastError.Error()
if p.rpcServer != nil && p.rpcServer.status.Status != "disconnected" {
log.Printf("Status from RPC: %v", p.rpcServer.status)
status.Status = p.rpcServer.status.Status
status.Message = p.rpcServer.status.Message
if status.Status == "error" {
status.Message = p.rpcServer.status.Message
}
} else {
status.Status = "stopped"
if p.processManager != nil {
status.Status = "running"
if p.processManager.LastError != nil {
status.Status = "errored"
status.Message = p.processManager.LastError.Error()
}
}
log.Printf("Status from process manager: %v", status.Status)
}
return &status, nil
@ -94,8 +104,10 @@ func (p *PluginInstall) ReconcileSubprocess() error {
p.processManager.Disable()
p.processManager = nil
p.runningVersion = nil
p.rpcListener.Close()
p.rpcListener = nil
err = p.rpcServer.Stop()
if err != nil {
return fmt.Errorf("failed to stop rpc server: %v", err)
}
}
if versionShouldBeRunning == "" {
@ -103,25 +115,22 @@ func (p *PluginInstall) ReconcileSubprocess() error {
}
workingDir := path.Join(pluginsFolder, "working_dirs", p.manifest.Name)
socketPath := path.Join(workingDir, "plugin.sock")
os.Remove(socketPath)
err = os.MkdirAll(workingDir, 0755)
if err != nil {
return fmt.Errorf("failed to create working directory: %v", err)
}
listener, err := net.Listen("unix", socketPath)
p.rpcServer = NewPluginRpcServer(p, workingDir)
err = p.rpcServer.Start()
if err != nil {
return fmt.Errorf("failed to listen on socket: %v", err)
return fmt.Errorf("failed to start rpc server: %v", err)
}
p.rpcListener = listener
p.processManager = NewProcessManager(func() *exec.Cmd {
cmd := exec.Command(manifest.BinaryPath)
cmd.Dir = p.GetExtractedFolder()
cmd.Env = append(cmd.Env,
"JETKVM_PLUGIN_SOCK="+socketPath,
"JETKVM_PLUGIN_SOCK="+p.rpcServer.SocketPath(),
"JETKVM_PLUGIN_WORKING_DIR="+workingDir,
)
cmd.Stdout = os.Stdout
@ -147,8 +156,7 @@ func (p *PluginInstall) Shutdown() {
p.runningVersion = nil
}
if p.rpcListener != nil {
p.rpcListener.Close()
p.rpcListener = nil
if p.rpcServer != nil {
p.rpcServer.Stop()
}
}

166
internal/plugin/rpc.go Normal file
View File

@ -0,0 +1,166 @@
package plugin
import (
"context"
"errors"
"fmt"
"kvm/internal/jsonrpc"
"log"
"net"
"os"
"path"
"time"
)
type PluginRpcStatus struct {
Status string `json:"status"`
Message string `json:"message,omitempty"`
}
var (
PluginRpcStatusDisconnected = PluginRpcStatus{"disconnected", ""}
PluginRpcStatusLoading = PluginRpcStatus{"loading", ""}
PluginRpcStatusPendingConfiguration = PluginRpcStatus{"pending-configuration", ""}
PluginRpcStatusRunning = PluginRpcStatus{"running", ""}
PluginRpcStatusError = PluginRpcStatus{"error", ""}
)
type PluginRpcServer struct {
install *PluginInstall
workingDir string
listener net.Listener
status PluginRpcStatus
}
func NewPluginRpcServer(install *PluginInstall, workingDir string) *PluginRpcServer {
return &PluginRpcServer{
install: install,
workingDir: workingDir,
status: PluginRpcStatusDisconnected,
}
}
func (s *PluginRpcServer) Start() error {
socketPath := s.SocketPath()
_ = os.Remove(socketPath)
listener, err := net.Listen("unix", socketPath)
if err != nil {
return fmt.Errorf("failed to listen on socket: %v", err)
}
s.listener = listener
s.status = PluginRpcStatusDisconnected
go func() {
for {
conn, err := listener.Accept()
if err != nil {
// If the error indicates the listener is closed, break out
if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" {
log.Println("Listener closed, exiting accept loop.")
return
}
log.Printf("Failed to accept connection: %v", err)
continue
}
log.Printf("Accepted plugin rpc connection from %v", conn.RemoteAddr())
go s.handleConnection(conn)
}
}()
return nil
}
func (s *PluginRpcServer) Stop() error {
if s.listener != nil {
s.status = PluginRpcStatusDisconnected
return s.listener.Close()
}
return nil
}
func (s *PluginRpcServer) Status() PluginRpcStatus {
return s.status
}
func (s *PluginRpcServer) SocketPath() string {
return path.Join(s.workingDir, "plugin.sock")
}
func (s *PluginRpcServer) handleConnection(conn net.Conn) {
rpcserver := jsonrpc.NewJSONRPCServer(conn, map[string]*jsonrpc.RPCHandler{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.handleRpcStatus(ctx, rpcserver)
// Read from the conn and write into rpcserver.HandleMessage
buf := make([]byte, 65*1024)
for {
// TODO: if read 65k bytes, then likey there is more data to read... figure out how to handle this
n, err := conn.Read(buf)
if err != nil {
log.Printf("Failed to read message: %v", err)
if errors.Is(err, net.ErrClosed) {
s.status = PluginRpcStatusDisconnected
} else {
s.status = PluginRpcStatusError
s.status.Message = fmt.Errorf("failed to read message: %v", err).Error()
}
break
}
err = rpcserver.HandleMessage(buf[:n])
if err != nil {
log.Printf("Failed to handle message: %v", err)
s.status = PluginRpcStatusError
s.status.Message = fmt.Errorf("failed to handle message: %v", err).Error()
continue
}
}
}
func (s *PluginRpcServer) handleRpcStatus(ctx context.Context, rpcserver *jsonrpc.JSONRPCServer) {
// log.Printf("Plugin rpc server started. Getting supported methods...")
// supportedMethodsResponse, err := rpcserver.Request("getPluginSupportedMethods", map[string]interface{}{})
// if err != nil {
// log.Printf("Failed to get supported methods: %v", err)
// s.status = PluginRpcStatusError
// s.status.Message = fmt.Errorf("error getting supported methods: %v", err).Error()
// }
// if supportedMethodsResponse.Error != nil {
// log.Printf("Failed to get supported methods: %v", supportedMethodsResponse.Error)
// s.status = PluginRpcStatusError
// s.status.Message = fmt.Errorf("error getting supported methods: %v", supportedMethodsResponse.Error).Error()
// }
// log.Printf("Plugin has supported methods: %v", supportedMethodsResponse.Result)
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
var statusResponse PluginRpcStatus
err := rpcserver.Request("getPluginStatus", map[string]interface{}{}, &statusResponse)
if err != nil {
log.Printf("Failed to get status: %v", err)
if err, ok := err.Data.(error); ok && errors.Is(err, net.ErrClosed) {
s.status = PluginRpcStatusDisconnected
break
}
s.status = PluginRpcStatusError
s.status.Message = fmt.Errorf("error getting status: %v", err).Error()
continue
}
s.status = statusResponse
}
}
}

View File

@ -14,5 +14,5 @@ type PluginStatus struct {
PluginManifest
Enabled bool `json:"enabled"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
}

View File

@ -1,13 +1,15 @@
import { PluginStatus } from "@/hooks/stores";
import Modal from "@components/Modal";
import AutoHeight from "@components/AutoHeight";
import { GridCard } from "@components/Card";
import Card, { GridCard } from "@components/Card";
import LogoBlueIcon from "@/assets/logo-blue.svg";
import LogoWhiteIcon from "@/assets/logo-white.svg";
import { ViewHeader } from "./MountMediaDialog";
import { Button } from "./Button";
import { useJsonRpc } from "@/hooks/useJsonRpc";
import { useCallback, useEffect, useState } from "react";
import { PluginStatusIcon } from "./PluginStatusIcon";
import { cx } from "@/cva.config";
export default function PluginConfigureModal({
plugin,
@ -42,7 +44,7 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
return;
}
}
setLoading(true);
send("pluginUpdateConfig", { name: plugin.name, enabled }, resp => {
if ("error" in resp) {
@ -77,16 +79,28 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
<GridCard cardClassName="relative w-full text-left pointer-events-auto">
<div className="p-4">
<div className="flex flex-col items-start justify-start space-y-4 text-left">
<img
src={LogoBlueIcon}
alt="JetKVM Logo"
className="h-[24px] dark:hidden block"
/>
<img
src={LogoWhiteIcon}
alt="JetKVM Logo"
className="h-[24px] dark:block hidden dark:!mt-0"
/>
<div className="flex justify-between w-full">
<div>
<img
src={LogoBlueIcon}
alt="JetKVM Logo"
className="h-[24px] dark:hidden block"
/>
<img
src={LogoWhiteIcon}
alt="JetKVM Logo"
className="h-[24px] dark:block hidden dark:!mt-0"
/>
</div>
<div className="flex items-center">
{plugin && <>
<p className="text-sm text-gray-500 dark:text-gray-400 inline-block">
{plugin.status}
</p>
<PluginStatusIcon plugin={plugin} />
</>}
</div>
</div>
<div className="w-full space-y-4">
<div className="flex items-center justify-between w-full">
<ViewHeader title="Plugin Configuration" description={`Configure the ${plugin?.name} plugin`} />
@ -104,6 +118,8 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
</div>
</div>
<div className="h-[1px] w-full bg-slate-800/10 dark:bg-slate-300/20" />
<div
className="space-y-2 opacity-0 animate-fadeIn"
style={{
@ -111,10 +127,25 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
}}
>
{error && <p className="text-red-500 dark:text-red-400">{error}</p>}
{plugin?.message && (
<>
<p className="text-sm text-gray-500 dark:text-gray-400">
Plugin message:
</p>
<Card className={cx(
"text-gray-500 dark:text-gray-400 p-4 border",
plugin.status === "errored" && "border-red-200 bg-red-50 text-red-800 dark:text-red-400",
)}>
{plugin.message}
</Card>
</>
)}
<p className="text-sm text-gray-500 dark:text-gray-400 py-10">
TODO: Plugin configuration goes here
</p>
<div className="h-[1px] w-full bg-slate-800/10 dark:bg-slate-300/20" />
<div
className="flex items-end w-full opacity-0 animate-fadeIn"
style={{
@ -148,7 +179,7 @@ function Dialog({ plugin, setOpen }: { plugin: PluginStatus | null, setOpen: (op
</div>
</div>
</GridCard>
</div>
</AutoHeight>
</div >
</AutoHeight >
)
}

View File

@ -2,24 +2,9 @@ import { useJsonRpc } from "@/hooks/useJsonRpc";
import { Button } from "@components/Button";
import { PluginStatus, usePluginStore, useUiStore } from "@/hooks/stores";
import { useCallback, useEffect, useState } from "react";
import { cx } from "@/cva.config";
import UploadPluginModal from "@components/UploadPluginDialog";
import PluginConfigureModal from "@components/PluginConfigureDialog";
function PluginListStatusIcon({ plugin }: { plugin: PluginStatus }) {
let classNames = "bg-slate-500 border-slate-600";
if (plugin.enabled && plugin.status === "running") {
classNames = "bg-green-500 border-green-600";
} else if (plugin.enabled && plugin.status === "errored") {
classNames = "bg-red-500 border-red-600";
}
return (
<div className="flex items-center px-2">
<div className={cx("h-2 w-2 rounded-full border transition", classNames)} />
</div>
)
}
import { PluginStatusIcon } from "./PluginStatusIcon";
export default function PluginList() {
const [send] = useJsonRpc();
@ -45,20 +30,21 @@ export default function PluginList() {
setError(resp.error.message);
return
}
console.log('pluginList', resp.result);
setPlugins(resp.result as PluginStatus[]);
});
}, [send, setPlugins])
useEffect(() => {
// Only update plugins when the sidebar view is the settings view
if (sidebarView !== "system") return;
if (sidebarView !== "system" && !pluginConfigureModalOpen) return;
updatePlugins();
const updateInterval = setInterval(() => {
updatePlugins();
}, 10_000);
return () => clearInterval(updateInterval);
}, [updatePlugins, sidebarView])
}, [updatePlugins, sidebarView, pluginConfigureModalOpen])
return (
<>
@ -68,7 +54,7 @@ export default function PluginList() {
{plugins.length === 0 && <li className="text-sm text-center text-gray-500 dark:text-gray-400 py-5">No plugins installed</li>}
{plugins.map(plugin => (
<li key={plugin.name} className="flex items-center justify-between pa-2 py-2 gap-x-2">
<PluginListStatusIcon plugin={plugin} />
<PluginStatusIcon plugin={plugin} />
<div className="overflow-hidden flex grow flex-col">
<p className="text-base font-semibold text-black dark:text-white">{plugin.name}</p>
<p className="text-xs text-slate-700 dark:text-slate-300 line-clamp-1">
@ -99,7 +85,7 @@ export default function PluginList() {
updatePlugins();
}
}}
plugin={configuringPlugin}
plugin={plugins.find(p => p.name == configuringPlugin?.name) ?? null}
/>
<div className="flex items-center gap-x-2">

View File

@ -0,0 +1,19 @@
import { cx } from "@/cva.config";
import { PluginStatus } from "@/hooks/stores";
export function PluginStatusIcon({ plugin }: { plugin: PluginStatus; }) {
let classNames = "bg-slate-500 border-slate-600";
if (plugin.enabled && plugin.status === "running") {
classNames = "bg-green-500 border-green-600";
} else if (plugin.enabled && plugin.status === "pending-configuration") {
classNames = "bg-yellow-500 border-yellow-600";
} else if (plugin.enabled && plugin.status === "errored") {
classNames = "bg-red-500 border-red-600";
}
return (
<div className="flex items-center px-2" title={plugin.status}>
<div className={cx("h-2 w-2 rounded-full border transition", classNames)} />
</div>
);
}

View File

@ -539,8 +539,8 @@ export interface PluginManifest {
export interface PluginStatus extends PluginManifest {
enabled: boolean;
status: "stopped" | "running" | "errored";
error?: string;
status: "stopped" | "running" | "loading" | "pending-configuration" | "errored";
message?: string;
}
interface PluginState {