diff --git a/internal/plugin/database.go b/internal/plugin/database.go index f97e748..fd4ef42 100644 --- a/internal/plugin/database.go +++ b/internal/plugin/database.go @@ -11,23 +11,17 @@ const databaseFile = pluginsFolder + "/plugins.json" type PluginDatabase struct { // Map with the plugin name as the key - Plugins map[string]PluginInstall `json:"plugins"` + Plugins map[string]*PluginInstall `json:"plugins"` saveMutex sync.Mutex } var pluginDatabase = PluginDatabase{} -func init() { - if err := pluginDatabase.Load(); err != nil { - fmt.Printf("failed to load plugin database: %v\n", err) - } -} - func (d *PluginDatabase) Load() error { file, err := os.Open(databaseFile) if os.IsNotExist(err) { - d.Plugins = make(map[string]PluginInstall) + d.Plugins = make(map[string]*PluginInstall) return nil } if err != nil { diff --git a/internal/plugin/install.go b/internal/plugin/install.go index 01d8f25..3e61aae 100644 --- a/internal/plugin/install.go +++ b/internal/plugin/install.go @@ -1,6 +1,14 @@ package plugin -import "fmt" +import ( + "fmt" + "log" + "net" + "os" + "os/exec" + "path" + "syscall" +) type PluginInstall struct { Enabled bool `json:"enabled"` @@ -11,7 +19,10 @@ type PluginInstall struct { // Map of a plugin version to the extracted directory ExtractedVersions map[string]string `json:"extracted_versions"` - manifest *PluginManifest + manifest *PluginManifest + runningVersion *string + processManager *ProcessManager + rpcListener *net.Listener } func (p *PluginInstall) GetManifest() (*PluginManifest, error) { @@ -38,14 +49,106 @@ func (p *PluginInstall) GetStatus() (*PluginStatus, error) { return nil, fmt.Errorf("failed to get plugin manifest: %v", err) } - status := "stopped" - if p.Enabled { - status = "running" - } - - return &PluginStatus{ + status := PluginStatus{ PluginManifest: *manifest, Enabled: p.Enabled, - Status: status, - }, nil + } + + status.Status = "stopped" + if p.processManager != nil { + status.Status = "running" + if p.processManager.LastError != nil { + status.Status = "errored" + status.Error = p.processManager.LastError.Error() + } + } + + return &status, nil +} + +func (p *PluginInstall) ReconcileSubprocess() error { + manifest, err := p.GetManifest() + if err != nil { + return fmt.Errorf("failed to get plugin manifest: %v", err) + } + + versionRunning := "" + if p.runningVersion != nil { + versionRunning = *p.runningVersion + } + + versionShouldBeRunning := p.Version + if !p.Enabled { + versionShouldBeRunning = "" + } + + log.Printf("Reconciling plugin %s running %v, should be running %v", manifest.Name, versionRunning, versionShouldBeRunning) + + if versionRunning == versionShouldBeRunning { + log.Printf("Plugin %s is already running version %s", manifest.Name, versionRunning) + return nil + } + + if p.processManager != nil { + log.Printf("Stopping plugin %s running version %s", manifest.Name, versionRunning) + p.processManager.Disable() + p.processManager = nil + p.runningVersion = nil + (*p.rpcListener).Close() + p.rpcListener = nil + } + + if versionShouldBeRunning == "" { + return nil + } + + workingDir := path.Join(pluginsFolder, "working_dirs", p.manifest.Name) + socketPath := path.Join(workingDir, "plugin.sock") + + os.Remove(socketPath) + err = os.MkdirAll(workingDir, 0755) + if err != nil { + return fmt.Errorf("failed to create working directory: %v", err) + } + + listener, err := net.Listen("unix", socketPath) + if err != nil { + return fmt.Errorf("failed to listen on socket: %v", err) + } + p.rpcListener = &listener + + p.processManager = NewProcessManager(func() *exec.Cmd { + cmd := exec.Command(manifest.BinaryPath) + cmd.Dir = p.GetExtractedFolder() + cmd.Env = append(cmd.Env, + "JETKVM_PLUGIN_SOCK="+socketPath, + "JETKVM_PLUGIN_WORKING_DIR="+workingDir, + ) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + // Ensure that the process is killed when the parent dies + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pdeathsig: syscall.SIGKILL, + } + return cmd + }) + p.processManager.StartMonitor() + p.processManager.Enable() + p.runningVersion = &p.Version + log.Printf("Started plugin %s version %s", manifest.Name, p.Version) + return nil +} + +func (p *PluginInstall) Shutdown() { + if p.processManager != nil { + p.processManager.Disable() + p.processManager = nil + p.runningVersion = nil + } + + if p.rpcListener != nil { + (*p.rpcListener).Close() + p.rpcListener = nil + } } diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go index a161e68..a2cccc8 100644 --- a/internal/plugin/plugin.go +++ b/internal/plugin/plugin.go @@ -15,6 +15,26 @@ const pluginsUploadFolder = pluginsFolder + "/uploads" func init() { _ = os.MkdirAll(pluginsUploadFolder, 0755) + + if err := pluginDatabase.Load(); err != nil { + fmt.Printf("failed to load plugin database: %v\n", err) + } +} + +// Starts all plugins that need to be started +func ReconcilePlugins() { + for _, install := range pluginDatabase.Plugins { + err := install.ReconcileSubprocess() + if err != nil { + fmt.Printf("failed to reconcile subprocess for plugin: %v\n", err) + } + } +} + +func GracefullyShutdownPlugins() { + for _, install := range pluginDatabase.Plugins { + install.Shutdown() + } } func RpcPluginStartUpload(filename string, size int64) (*storage.StorageFileUpload, error) { @@ -77,7 +97,7 @@ func RpcPluginExtract(filename string) (*PluginManifest, error) { // Get existing PluginInstall install, ok := pluginDatabase.Plugins[manifest.Name] if !ok { - install = PluginInstall{ + install = &PluginInstall{ Enabled: false, Version: manifest.Version, ExtractedVersions: make(map[string]string), @@ -125,7 +145,11 @@ func RpcPluginInstall(name string, version string) error { if err := pluginDatabase.Save(); err != nil { return fmt.Errorf("failed to save plugin database: %v", err) } - // TODO: start the plugin + + err := pluginInstall.ReconcileSubprocess() + if err != nil { + return fmt.Errorf("failed to start plugin %s: %v", name, err) + } // TODO: Determine if the old version should be removed @@ -157,6 +181,11 @@ func RpcUpdateConfig(name string, enabled bool) (*PluginStatus, error) { return nil, fmt.Errorf("failed to save plugin database: %v", err) } + err := pluginInstall.ReconcileSubprocess() + if err != nil { + return nil, fmt.Errorf("failed to stop plugin %s: %v", name, err) + } + status, err := pluginInstall.GetStatus() if err != nil { return nil, fmt.Errorf("failed to get plugin status for %s: %v", name, err) diff --git a/internal/plugin/process_manager.go b/internal/plugin/process_manager.go new file mode 100644 index 0000000..36e4a26 --- /dev/null +++ b/internal/plugin/process_manager.go @@ -0,0 +1,113 @@ +package plugin + +import ( + "fmt" + "log" + "os/exec" + "syscall" + "time" +) + +// TODO: this can probably be defaulted to this, but overwritten on a per-plugin basis +const GRACEFUL_SHUTDOWN_DELAY = 30 * time.Second + +type ProcessManager struct { + cmdGen func() *exec.Cmd + cmd *exec.Cmd + enabled bool + backoff time.Duration + shutdown chan struct{} + restartCh chan struct{} + LastError error +} + +func NewProcessManager(commandGenerator func() *exec.Cmd) *ProcessManager { + return &ProcessManager{ + cmdGen: commandGenerator, + enabled: true, + backoff: time.Second, + shutdown: make(chan struct{}), + restartCh: make(chan struct{}, 1), + } +} + +func (pm *ProcessManager) StartMonitor() { + go pm.monitor() +} + +func (pm *ProcessManager) monitor() { + for { + select { + case <-pm.shutdown: + pm.terminate() + return + case <-pm.restartCh: + if pm.enabled { + go pm.runProcess() + } + } + } +} + +func (pm *ProcessManager) runProcess() { + pm.LastError = nil + pm.cmd = pm.cmdGen() + log.Printf("Starting process: %v", pm.cmd) + err := pm.cmd.Start() + if err != nil { + log.Printf("Failed to start process: %v", err) + pm.LastError = fmt.Errorf("failed to start process: %w", err) + pm.scheduleRestart() + return + } + + err = pm.cmd.Wait() + if err != nil { + log.Printf("Process exited: %v", err) + pm.LastError = fmt.Errorf("process exited with error: %w", err) + pm.scheduleRestart() + } +} + +func (pm *ProcessManager) scheduleRestart() { + if pm.enabled { + log.Printf("Restarting process in %v...", pm.backoff) + time.Sleep(pm.backoff) + pm.backoff *= 2 // Exponential backoff + pm.restartCh <- struct{}{} + } +} + +func (pm *ProcessManager) terminate() { + if pm.cmd.Process != nil { + log.Printf("Sending SIGTERM...") + pm.cmd.Process.Signal(syscall.SIGTERM) + select { + case <-time.After(GRACEFUL_SHUTDOWN_DELAY): + log.Printf("Forcing process termination...") + pm.cmd.Process.Kill() + case <-pm.waitForExit(): + log.Printf("Process exited gracefully.") + } + } +} + +func (pm *ProcessManager) waitForExit() <-chan struct{} { + done := make(chan struct{}) + go func() { + pm.cmd.Wait() + close(done) + }() + return done +} + +func (pm *ProcessManager) Enable() { + pm.enabled = true + pm.restartCh <- struct{}{} +} + +func (pm *ProcessManager) Disable() { + pm.enabled = false + close(pm.shutdown) + pm.cmd.Wait() +} diff --git a/internal/plugin/type.go b/internal/plugin/type.go index 01d85a5..0e6988c 100644 --- a/internal/plugin/type.go +++ b/internal/plugin/type.go @@ -14,4 +14,5 @@ type PluginStatus struct { PluginManifest Enabled bool `json:"enabled"` Status string `json:"status"` + Error string `json:"error,omitempty"` } diff --git a/main.go b/main.go index f94b24e..f4021c9 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package kvm import ( "context" + "kvm/internal/plugin" "log" "net/http" "os" @@ -67,10 +68,13 @@ func Main() { //go RunFuseServer() go RunWebServer() go RunWebsocketClient() + go plugin.ReconcilePlugins() sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) <-sigs log.Println("JetKVM Shutting Down") + + plugin.GracefullyShutdownPlugins() //if fuseServer != nil { // err := setMassStorageImage(" ") // if err != nil { diff --git a/ui/src/components/PluginList.tsx b/ui/src/components/PluginList.tsx index d2397f1..49c54b2 100644 --- a/ui/src/components/PluginList.tsx +++ b/ui/src/components/PluginList.tsx @@ -39,6 +39,7 @@ export default function PluginList() { const sidebarView = useUiStore(state => state.sidebarView); const updatePlugins = useCallback(() => { + setError(null); send("pluginList", {}, resp => { if ("error" in resp) { setError(resp.error.message);