fix: race condition in native library

This commit is contained in:
Siyuan Miao 2025-09-25 08:33:53 +00:00 committed by Siyuan
parent d0df8eaa44
commit efaab7369b
5 changed files with 65 additions and 24 deletions

View File

@ -3,5 +3,6 @@
"cva",
"cx"
],
"cmake.sourceDirectory": "/Users/aveline/Projects/JetKVM/ymjk/internal/native/cgo"
"cmake.sourceDirectory": "/Users/aveline/Projects/JetKVM/ymjk/internal/native/cgo",
"git.ignoreLimitWarning": true
}

View File

@ -3,7 +3,7 @@ BUILDDATE := $(shell date -u +%FT%T%z)
BUILDTS := $(shell date -u +%s)
REVISION := $(shell git rev-parse HEAD)
VERSION_DEV := 0.4.8-dev$(shell date +%Y%m%d%H%M)
VERSION := 0.4.7
VERSION := 0.4.9
PROMETHEUS_TAG := github.com/prometheus/common/version
KVM_PKG_NAME := github.com/jetkvm/kvm

View File

@ -304,6 +304,7 @@ uint32_t detected_width, detected_height;
bool detected_signal = false, streaming_flag = false;
pthread_t *streaming_thread = NULL;
pthread_mutex_t streaming_mutex = PTHREAD_MUTEX_INITIALIZER;
void write_buffer_to_file(const uint8_t *buffer, size_t length, const char *filename)
{
@ -388,28 +389,28 @@ void *run_video_stream(void *arg)
req.count = i;
return errno;
}
printf("VIDIOC_QUERYBUF successful for buffer %d\n", i);
log_info("VIDIOC_QUERYBUF successful for buffer %d\n", i);
printf("plane: length = %d\n", planes_buffer->length);
printf("plane: offset = %d\n", planes_buffer->m.mem_offset);
log_info("plane: length = %d\n", planes_buffer->length);
log_info("plane: offset = %d\n", planes_buffer->m.mem_offset);
MB_BLK blk = RK_MPI_MB_GetMB(memPool, (planes_buffer)->length, RK_TRUE);
if (blk == NULL)
{
RK_LOGE("get mb blk failed!");
log_error("get mb blk failed!");
return -1;
}
printf("Got memory block for buffer %d\n", i);
log_info("Got memory block for buffer %d\n", i);
buffers[i].mb_blk = blk;
RK_S32 buf_fd = (RK_MPI_MB_Handle2Fd(blk));
if (buf_fd < 0)
{
RK_LOGE("RK_MPI_MB_Handle2Fd failed!");
log_error("RK_MPI_MB_Handle2Fd failed!");
return -1;
}
printf("Converted memory block to file descriptor for buffer %d\n", i);
log_info("Converted memory block to file descriptor for buffer %d\n", i);
planes_buffer->m.fd = buf_fd;
}
@ -427,7 +428,7 @@ void *run_video_stream(void *arg)
perror("VIDIOC_QBUF failed");
return errno;
}
printf("VIDIOC_QBUF successful for buffer %d\n", i);
log_info("VIDIOC_QBUF successful for buffer %d\n", i);
}
if (ioctl(video_dev_fd, VIDIOC_STREAMON, &type) < 0)
@ -472,7 +473,7 @@ void *run_video_stream(void *arg)
{
continue;
}
perror("select in video streaming");
log_error("select in video streaming");
break;
}
memset(&buf, 0, sizeof(buf));
@ -482,7 +483,7 @@ void *run_video_stream(void *arg)
buf.length = 1;
if (ioctl(video_dev_fd, VIDIOC_DQBUF, &buf) < 0)
{
perror("VIDIOC_DQBUF failed");
log_error("VIDIOC_DQBUF failed");
break;
}
// printf("got frame, bytesused = %d\n", tmp_plane.bytesused);
@ -518,11 +519,11 @@ void *run_video_stream(void *arg)
{
if (retried == true)
{
RK_LOGE("RK_MPI_VENC_SendFrame retry failed");
log_error("RK_MPI_VENC_SendFrame retry failed");
}
else
{
RK_LOGE("RK_MPI_VENC_SendFrame failed,retrying");
log_error("RK_MPI_VENC_SendFrame failed,retrying");
retried = true;
usleep(1000llu);
goto retry_send_frame;
@ -532,12 +533,12 @@ void *run_video_stream(void *arg)
num++;
if (ioctl(video_dev_fd, VIDIOC_QBUF, &buf) < 0)
printf("failture VIDIOC_QBUF\n");
log_error("failture VIDIOC_QBUF\n");
}
cleanup:
if (ioctl(video_dev_fd, VIDIOC_STREAMOFF, &type) < 0)
{
perror("VIDIOC_STREAMOFF failed");
log_error("VIDIOC_STREAMOFF failed");
}
venc_stop();
@ -560,7 +561,7 @@ void video_shutdown()
{
if (should_exit == true)
{
printf("shutting down in progress already\n");
log_info("shutting down in progress already\n");
return;
}
video_stop_streaming();
@ -578,14 +579,17 @@ void video_shutdown()
{
shutdown(sub_dev_fd, SHUT_RDWR);
// close(sub_dev_fd);
printf("Closed sub_dev_fd\n");
log_info("Closed sub_dev_fd\n");
}
if (memPool != MB_INVALID_POOLID)
{
RK_MPI_MB_DestroyPool(memPool);
}
printf("Destroyed memory pool\n");
log_info("Destroyed memory pool\n");
pthread_mutex_destroy(&streaming_mutex);
log_info("Destroyed streaming mutex\n");
// if (format_thread != NULL) {
// pthread_join(*format_thread, NULL);
// free(format_thread);
@ -594,23 +598,26 @@ void video_shutdown()
// printf("Joined format detection thread\n");
}
// TODO: mutex?
void video_start_streaming()
{
pthread_mutex_lock(&streaming_mutex);
if (streaming_thread != NULL)
{
log_info("video streaming already started");
pthread_mutex_unlock(&streaming_mutex);
return;
}
streaming_thread = malloc(sizeof(pthread_t));
assert(streaming_thread != NULL);
streaming_flag = true;
pthread_create(streaming_thread, NULL, run_video_stream, NULL);
pthread_mutex_unlock(&streaming_mutex);
}
void video_stop_streaming()
{
pthread_mutex_lock(&streaming_mutex);
if (streaming_thread != NULL)
{
streaming_flag = false;
@ -619,6 +626,7 @@ void video_stop_streaming()
streaming_thread = NULL;
log_info("video streaming stopped");
}
pthread_mutex_unlock(&streaming_mutex);
}
void *run_detect_format(void *arg)
@ -632,7 +640,7 @@ void *run_detect_format(void *arg)
if (ioctl(sub_dev_fd, VIDIOC_SUBSCRIBE_EVENT, &sub) == -1)
{
log_error("cannot subscribe to event");
perror("Cannot subscribe to event");
log_error("Cannot subscribe to event");
goto exit;
}
@ -657,12 +665,12 @@ void *run_detect_format(void *arg)
else if (errno == ERANGE)
{
// Timings were found, but they are out of range of the hardware capabilities.
printf("HDMI status: out of range\n");
log_warn("HDMI status: out of range\n");
video_report_format(false, "out_of_range", 0, 0, 0);
}
else
{
perror("error VIDIOC_QUERY_DV_TIMINGS");
log_error("error VIDIOC_QUERY_DV_TIMINGS");
sleep(1);
continue;
}
@ -681,19 +689,24 @@ void *run_detect_format(void *arg)
detected_height = dv_timings.bt.height;
detected_signal = true;
video_report_format(true, NULL, detected_width, detected_height, frames_per_second);
pthread_mutex_lock(&streaming_mutex);
if (streaming_flag == true)
{
pthread_mutex_unlock(&streaming_mutex);
log_info("restarting on going video streaming");
video_stop_streaming();
video_start_streaming();
}
else
{
pthread_mutex_unlock(&streaming_mutex);
}
}
memset(&ev, 0, sizeof(ev));
if (ioctl(sub_dev_fd, VIDIOC_DQEVENT, &ev) != 0)
{
log_error("failed to VIDIOC_DQEVENT");
perror("failed to VIDIOC_DQEVENT");
break;
}
log_info("New event of type %u", ev.type);
@ -715,12 +728,18 @@ void video_set_quality_factor(float factor)
// TODO: update venc bitrate without stopping streaming
pthread_mutex_lock(&streaming_mutex);
if (streaming_flag == true)
{
pthread_mutex_unlock(&streaming_mutex);
log_info("restarting on going video streaming due to quality factor change");
video_stop_streaming();
video_start_streaming();
}
else
{
pthread_mutex_unlock(&streaming_mutex);
}
}
float video_get_quality_factor() {

View File

@ -1,6 +1,7 @@
package native
import (
"sync"
"time"
"github.com/Masterminds/semver/v3"
@ -17,6 +18,7 @@ type Native struct {
onVideoStateChange func(state VideoState)
onVideoFrameReceived func(frame []byte, duration time.Duration)
onIndevEvent func(event string)
videoLock sync.Mutex
}
type NativeOptions struct {
@ -60,6 +62,7 @@ func NewNative(opts NativeOptions) *Native {
onVideoStateChange: opts.OnVideoStateChange,
onVideoFrameReceived: opts.OnVideoFrameReceived,
onIndevEvent: opts.OnIndevEvent,
videoLock: sync.Mutex{},
}
}

View File

@ -9,27 +9,45 @@ type VideoState struct {
}
func (n *Native) VideoSetQualityFactor(factor float64) error {
n.videoLock.Lock()
defer n.videoLock.Unlock()
return videoSetStreamQualityFactor(factor)
}
func (n *Native) VideoGetQualityFactor() (float64, error) {
n.videoLock.Lock()
defer n.videoLock.Unlock()
return videoGetStreamQualityFactor()
}
func (n *Native) VideoSetEDID(edid string) error {
n.videoLock.Lock()
defer n.videoLock.Unlock()
return videoSetEDID(edid)
}
func (n *Native) VideoGetEDID() (string, error) {
n.videoLock.Lock()
defer n.videoLock.Unlock()
return videoGetEDID()
}
func (n *Native) VideoStop() error {
n.videoLock.Lock()
defer n.videoLock.Unlock()
videoStop()
return nil
}
func (n *Native) VideoStart() error {
n.videoLock.Lock()
defer n.videoLock.Unlock()
videoStart()
return nil
}