mirror of https://github.com/jetkvm/kvm.git
Add process_manager and subprocess spawning support
This commit is contained in:
parent
88f3e97011
commit
5de7bc7afa
|
@ -11,23 +11,17 @@ const databaseFile = pluginsFolder + "/plugins.json"
|
||||||
|
|
||||||
type PluginDatabase struct {
|
type PluginDatabase struct {
|
||||||
// Map with the plugin name as the key
|
// Map with the plugin name as the key
|
||||||
Plugins map[string]PluginInstall `json:"plugins"`
|
Plugins map[string]*PluginInstall `json:"plugins"`
|
||||||
|
|
||||||
saveMutex sync.Mutex
|
saveMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var pluginDatabase = PluginDatabase{}
|
var pluginDatabase = PluginDatabase{}
|
||||||
|
|
||||||
func init() {
|
|
||||||
if err := pluginDatabase.Load(); err != nil {
|
|
||||||
fmt.Printf("failed to load plugin database: %v\n", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *PluginDatabase) Load() error {
|
func (d *PluginDatabase) Load() error {
|
||||||
file, err := os.Open(databaseFile)
|
file, err := os.Open(databaseFile)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
d.Plugins = make(map[string]PluginInstall)
|
d.Plugins = make(map[string]*PluginInstall)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,6 +1,14 @@
|
||||||
package plugin
|
package plugin
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
type PluginInstall struct {
|
type PluginInstall struct {
|
||||||
Enabled bool `json:"enabled"`
|
Enabled bool `json:"enabled"`
|
||||||
|
@ -11,7 +19,10 @@ type PluginInstall struct {
|
||||||
// Map of a plugin version to the extracted directory
|
// Map of a plugin version to the extracted directory
|
||||||
ExtractedVersions map[string]string `json:"extracted_versions"`
|
ExtractedVersions map[string]string `json:"extracted_versions"`
|
||||||
|
|
||||||
manifest *PluginManifest
|
manifest *PluginManifest
|
||||||
|
runningVersion *string
|
||||||
|
processManager *ProcessManager
|
||||||
|
rpcListener *net.Listener
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PluginInstall) GetManifest() (*PluginManifest, error) {
|
func (p *PluginInstall) GetManifest() (*PluginManifest, error) {
|
||||||
|
@ -38,14 +49,106 @@ func (p *PluginInstall) GetStatus() (*PluginStatus, error) {
|
||||||
return nil, fmt.Errorf("failed to get plugin manifest: %v", err)
|
return nil, fmt.Errorf("failed to get plugin manifest: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
status := "stopped"
|
status := PluginStatus{
|
||||||
if p.Enabled {
|
|
||||||
status = "running"
|
|
||||||
}
|
|
||||||
|
|
||||||
return &PluginStatus{
|
|
||||||
PluginManifest: *manifest,
|
PluginManifest: *manifest,
|
||||||
Enabled: p.Enabled,
|
Enabled: p.Enabled,
|
||||||
Status: status,
|
}
|
||||||
}, nil
|
|
||||||
|
status.Status = "stopped"
|
||||||
|
if p.processManager != nil {
|
||||||
|
status.Status = "running"
|
||||||
|
if p.processManager.LastError != nil {
|
||||||
|
status.Status = "errored"
|
||||||
|
status.Error = p.processManager.LastError.Error()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PluginInstall) ReconcileSubprocess() error {
|
||||||
|
manifest, err := p.GetManifest()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get plugin manifest: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
versionRunning := ""
|
||||||
|
if p.runningVersion != nil {
|
||||||
|
versionRunning = *p.runningVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
versionShouldBeRunning := p.Version
|
||||||
|
if !p.Enabled {
|
||||||
|
versionShouldBeRunning = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Reconciling plugin %s running %v, should be running %v", manifest.Name, versionRunning, versionShouldBeRunning)
|
||||||
|
|
||||||
|
if versionRunning == versionShouldBeRunning {
|
||||||
|
log.Printf("Plugin %s is already running version %s", manifest.Name, versionRunning)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.processManager != nil {
|
||||||
|
log.Printf("Stopping plugin %s running version %s", manifest.Name, versionRunning)
|
||||||
|
p.processManager.Disable()
|
||||||
|
p.processManager = nil
|
||||||
|
p.runningVersion = nil
|
||||||
|
(*p.rpcListener).Close()
|
||||||
|
p.rpcListener = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if versionShouldBeRunning == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
workingDir := path.Join(pluginsFolder, "working_dirs", p.manifest.Name)
|
||||||
|
socketPath := path.Join(workingDir, "plugin.sock")
|
||||||
|
|
||||||
|
os.Remove(socketPath)
|
||||||
|
err = os.MkdirAll(workingDir, 0755)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create working directory: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
listener, err := net.Listen("unix", socketPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to listen on socket: %v", err)
|
||||||
|
}
|
||||||
|
p.rpcListener = &listener
|
||||||
|
|
||||||
|
p.processManager = NewProcessManager(func() *exec.Cmd {
|
||||||
|
cmd := exec.Command(manifest.BinaryPath)
|
||||||
|
cmd.Dir = p.GetExtractedFolder()
|
||||||
|
cmd.Env = append(cmd.Env,
|
||||||
|
"JETKVM_PLUGIN_SOCK="+socketPath,
|
||||||
|
"JETKVM_PLUGIN_WORKING_DIR="+workingDir,
|
||||||
|
)
|
||||||
|
cmd.Stdout = os.Stdout
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
// Ensure that the process is killed when the parent dies
|
||||||
|
cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||||
|
Setpgid: true,
|
||||||
|
Pdeathsig: syscall.SIGKILL,
|
||||||
|
}
|
||||||
|
return cmd
|
||||||
|
})
|
||||||
|
p.processManager.StartMonitor()
|
||||||
|
p.processManager.Enable()
|
||||||
|
p.runningVersion = &p.Version
|
||||||
|
log.Printf("Started plugin %s version %s", manifest.Name, p.Version)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PluginInstall) Shutdown() {
|
||||||
|
if p.processManager != nil {
|
||||||
|
p.processManager.Disable()
|
||||||
|
p.processManager = nil
|
||||||
|
p.runningVersion = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.rpcListener != nil {
|
||||||
|
(*p.rpcListener).Close()
|
||||||
|
p.rpcListener = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,26 @@ const pluginsUploadFolder = pluginsFolder + "/uploads"
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
_ = os.MkdirAll(pluginsUploadFolder, 0755)
|
_ = os.MkdirAll(pluginsUploadFolder, 0755)
|
||||||
|
|
||||||
|
if err := pluginDatabase.Load(); err != nil {
|
||||||
|
fmt.Printf("failed to load plugin database: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Starts all plugins that need to be started
|
||||||
|
func ReconcilePlugins() {
|
||||||
|
for _, install := range pluginDatabase.Plugins {
|
||||||
|
err := install.ReconcileSubprocess()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("failed to reconcile subprocess for plugin: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GracefullyShutdownPlugins() {
|
||||||
|
for _, install := range pluginDatabase.Plugins {
|
||||||
|
install.Shutdown()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RpcPluginStartUpload(filename string, size int64) (*storage.StorageFileUpload, error) {
|
func RpcPluginStartUpload(filename string, size int64) (*storage.StorageFileUpload, error) {
|
||||||
|
@ -77,7 +97,7 @@ func RpcPluginExtract(filename string) (*PluginManifest, error) {
|
||||||
// Get existing PluginInstall
|
// Get existing PluginInstall
|
||||||
install, ok := pluginDatabase.Plugins[manifest.Name]
|
install, ok := pluginDatabase.Plugins[manifest.Name]
|
||||||
if !ok {
|
if !ok {
|
||||||
install = PluginInstall{
|
install = &PluginInstall{
|
||||||
Enabled: false,
|
Enabled: false,
|
||||||
Version: manifest.Version,
|
Version: manifest.Version,
|
||||||
ExtractedVersions: make(map[string]string),
|
ExtractedVersions: make(map[string]string),
|
||||||
|
@ -125,7 +145,11 @@ func RpcPluginInstall(name string, version string) error {
|
||||||
if err := pluginDatabase.Save(); err != nil {
|
if err := pluginDatabase.Save(); err != nil {
|
||||||
return fmt.Errorf("failed to save plugin database: %v", err)
|
return fmt.Errorf("failed to save plugin database: %v", err)
|
||||||
}
|
}
|
||||||
// TODO: start the plugin
|
|
||||||
|
err := pluginInstall.ReconcileSubprocess()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to start plugin %s: %v", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Determine if the old version should be removed
|
// TODO: Determine if the old version should be removed
|
||||||
|
|
||||||
|
@ -157,6 +181,11 @@ func RpcUpdateConfig(name string, enabled bool) (*PluginStatus, error) {
|
||||||
return nil, fmt.Errorf("failed to save plugin database: %v", err)
|
return nil, fmt.Errorf("failed to save plugin database: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := pluginInstall.ReconcileSubprocess()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to stop plugin %s: %v", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
status, err := pluginInstall.GetStatus()
|
status, err := pluginInstall.GetStatus()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get plugin status for %s: %v", name, err)
|
return nil, fmt.Errorf("failed to get plugin status for %s: %v", name, err)
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
package plugin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os/exec"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: this can probably be defaulted to this, but overwritten on a per-plugin basis
|
||||||
|
const GRACEFUL_SHUTDOWN_DELAY = 30 * time.Second
|
||||||
|
|
||||||
|
type ProcessManager struct {
|
||||||
|
cmdGen func() *exec.Cmd
|
||||||
|
cmd *exec.Cmd
|
||||||
|
enabled bool
|
||||||
|
backoff time.Duration
|
||||||
|
shutdown chan struct{}
|
||||||
|
restartCh chan struct{}
|
||||||
|
LastError error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProcessManager(commandGenerator func() *exec.Cmd) *ProcessManager {
|
||||||
|
return &ProcessManager{
|
||||||
|
cmdGen: commandGenerator,
|
||||||
|
enabled: true,
|
||||||
|
backoff: time.Second,
|
||||||
|
shutdown: make(chan struct{}),
|
||||||
|
restartCh: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) StartMonitor() {
|
||||||
|
go pm.monitor()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) monitor() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-pm.shutdown:
|
||||||
|
pm.terminate()
|
||||||
|
return
|
||||||
|
case <-pm.restartCh:
|
||||||
|
if pm.enabled {
|
||||||
|
go pm.runProcess()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) runProcess() {
|
||||||
|
pm.LastError = nil
|
||||||
|
pm.cmd = pm.cmdGen()
|
||||||
|
log.Printf("Starting process: %v", pm.cmd)
|
||||||
|
err := pm.cmd.Start()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to start process: %v", err)
|
||||||
|
pm.LastError = fmt.Errorf("failed to start process: %w", err)
|
||||||
|
pm.scheduleRestart()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pm.cmd.Wait()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Process exited: %v", err)
|
||||||
|
pm.LastError = fmt.Errorf("process exited with error: %w", err)
|
||||||
|
pm.scheduleRestart()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) scheduleRestart() {
|
||||||
|
if pm.enabled {
|
||||||
|
log.Printf("Restarting process in %v...", pm.backoff)
|
||||||
|
time.Sleep(pm.backoff)
|
||||||
|
pm.backoff *= 2 // Exponential backoff
|
||||||
|
pm.restartCh <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) terminate() {
|
||||||
|
if pm.cmd.Process != nil {
|
||||||
|
log.Printf("Sending SIGTERM...")
|
||||||
|
pm.cmd.Process.Signal(syscall.SIGTERM)
|
||||||
|
select {
|
||||||
|
case <-time.After(GRACEFUL_SHUTDOWN_DELAY):
|
||||||
|
log.Printf("Forcing process termination...")
|
||||||
|
pm.cmd.Process.Kill()
|
||||||
|
case <-pm.waitForExit():
|
||||||
|
log.Printf("Process exited gracefully.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) waitForExit() <-chan struct{} {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
pm.cmd.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
return done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) Enable() {
|
||||||
|
pm.enabled = true
|
||||||
|
pm.restartCh <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pm *ProcessManager) Disable() {
|
||||||
|
pm.enabled = false
|
||||||
|
close(pm.shutdown)
|
||||||
|
pm.cmd.Wait()
|
||||||
|
}
|
|
@ -14,4 +14,5 @@ type PluginStatus struct {
|
||||||
PluginManifest
|
PluginManifest
|
||||||
Enabled bool `json:"enabled"`
|
Enabled bool `json:"enabled"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
4
main.go
4
main.go
|
@ -2,6 +2,7 @@ package kvm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"kvm/internal/plugin"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -67,10 +68,13 @@ func Main() {
|
||||||
//go RunFuseServer()
|
//go RunFuseServer()
|
||||||
go RunWebServer()
|
go RunWebServer()
|
||||||
go RunWebsocketClient()
|
go RunWebsocketClient()
|
||||||
|
go plugin.ReconcilePlugins()
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||||
<-sigs
|
<-sigs
|
||||||
log.Println("JetKVM Shutting Down")
|
log.Println("JetKVM Shutting Down")
|
||||||
|
|
||||||
|
plugin.GracefullyShutdownPlugins()
|
||||||
//if fuseServer != nil {
|
//if fuseServer != nil {
|
||||||
// err := setMassStorageImage(" ")
|
// err := setMassStorageImage(" ")
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
|
|
|
@ -39,6 +39,7 @@ export default function PluginList() {
|
||||||
const sidebarView = useUiStore(state => state.sidebarView);
|
const sidebarView = useUiStore(state => state.sidebarView);
|
||||||
|
|
||||||
const updatePlugins = useCallback(() => {
|
const updatePlugins = useCallback(() => {
|
||||||
|
setError(null);
|
||||||
send("pluginList", {}, resp => {
|
send("pluginList", {}, resp => {
|
||||||
if ("error" in resp) {
|
if ("error" in resp) {
|
||||||
setError(resp.error.message);
|
setError(resp.error.message);
|
||||||
|
|
Loading…
Reference in New Issue