fix: runner can run processes properly and get stdout and stderr without blocking

This commit is contained in:
Gabriele Musco 2023-06-07 13:25:18 +02:00
parent 9b4dc282c4
commit 3d98a3e5e2
No known key found for this signature in database
GPG key ID: 1068D795C80E51DE
4 changed files with 150 additions and 124 deletions

28
Cargo.lock generated
View file

@ -1358,9 +1358,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.144"
version = "0.2.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
checksum = "fc86cde3ff845662b8f4ef6cb50ea0e20c524eb3d29ae048287e06a1b3fa6a81"
[[package]]
name = "libloading"
@ -1433,6 +1433,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]]
name = "memoffset"
version = "0.8.0"
@ -1609,6 +1618,20 @@ dependencies = [
"memoffset 0.6.5",
]
[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
"libc",
"memoffset 0.7.1",
"pin-utils",
"static_assertions",
]
[[package]]
name = "nom"
version = "7.1.3"
@ -2048,6 +2071,7 @@ version = "0.1.0"
dependencies = [
"iced",
"iced_aw",
"nix 0.26.2",
"serde",
"serde_json",
]

View file

@ -12,6 +12,7 @@ iced = { version = "0.9.0", features = [
iced_aw = { version = "0.5.2", default-features = false, features = [
"tab_bar", "tabs", "quad"
] }
nix = "0.26.2"
serde = { version = "1.0.163", features = [
"derive"
] }

View file

@ -2,17 +2,30 @@ use std::{
collections::HashMap,
io::{BufRead, BufReader, Write},
process::{Child, Command, Stdio},
sync::{
mpsc::{sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
thread::{self, JoinHandle},
vec,
};
use crate::{profile::Profile, file_utils::get_writer};
use nix::{
sys::signal::{kill, Signal::SIGTERM},
unistd::Pid,
};
use crate::{file_utils::get_writer, profile::Profile};
pub struct Runner {
pub environment: HashMap<String, String>,
pub command: String,
pub args: Vec<String>,
pub stdout: Vec<String>,
pub stderr: Vec<String>,
pub process: Option<Child>,
pub output: Vec<String>,
sender: Arc<Mutex<SyncSender<String>>>,
receiver: Receiver<String>,
threads: Vec<JoinHandle<()>>,
process: Option<Child>,
}
#[derive(PartialEq, Eq, Debug)]
@ -21,103 +34,110 @@ pub enum RunnerStatus {
Stopped,
}
macro_rules! logger_thread {
($buf_fd: expr, $sender: expr) => {
thread::spawn(move || {
let mut reader = BufReader::new($buf_fd);
loop {
let mut buf = String::new();
match reader.read_line(&mut buf) {
Err(_) => return,
Ok(bytes_read) => {
println!("{}", bytes_read);
if bytes_read == 0 {
return;
}
if buf.is_empty() {
continue;
}
match $sender.clone().lock().expect("Could not lock sender").send(buf) {
Ok(_) => {}
Err(_) => return,
};
}
};
}
})
}
}
impl Runner {
pub fn new(environment: HashMap<String, String>, command: String, args: Vec<String>) -> Self {
let (sender, receiver) = sync_channel(64000);
Self {
environment,
command,
args,
stdout: Vec::new(),
stderr: Vec::new(),
output: Vec::new(),
process: None,
sender: Arc::new(Mutex::new(sender)),
threads: Vec::new(),
receiver,
}
}
pub fn monado_runner_from_profile(profile: Profile) -> Self {
Self {
environment: profile.environment,
command: profile.monado_path,
args: vec![],
stdout: Vec::new(),
stderr: Vec::new(),
process: None,
}
Self::new(profile.environment, profile.monado_path, vec![])
}
pub fn start(&mut self) {
match self.process {
Some(_) => panic!("Cannot reuse existing Runner"),
None => {
self.process = Some(
Command::new(&self.command)
.args(&self.args)
.envs(self.environment.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Failed to execute runner"),
);
}
};
self.threads = Vec::new();
self.process = Some(
Command::new(self.command.clone())
.args(self.args.clone())
.envs(self.environment.clone())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("Failed to execute runner"),
);
let stdout = self.process.as_mut().unwrap().stdout.take().unwrap();
let stderr = self.process.as_mut().unwrap().stderr.take().unwrap();
let stdout_sender = self.sender.clone();
let stderr_sender = self.sender.clone();
self.threads.push(logger_thread!(stdout, stdout_sender));
self.threads.push(logger_thread!(stderr, stderr_sender));
}
pub fn terminate(&mut self) {
match self.process.as_mut() {
None => {
println!("Runner already stopped")
}
Some(proc) => {
proc.kill().expect("Failed to kill process");
proc.wait().expect("Failed to wait for process");
if self.status() != RunnerStatus::Running {
return;
}
let process = self.process.take();
if process.is_none() {
return;
}
let mut proc = process.unwrap();
let child_pid = Pid::from_raw(proc.id().try_into().expect("Could not convert pid to u32"));
kill(child_pid, SIGTERM).expect("Could not send sigterm to process");
loop {
match self.threads.pop() {
None => break,
Some(thread) => thread.join().expect("Failed to join reader thread"),
}
}
proc.wait().expect("Failed to wait for process");
}
pub fn status(&mut self) -> RunnerStatus {
match self.process.as_mut() {
match &mut self.process {
None => RunnerStatus::Stopped,
Some(proc) => match proc.try_wait() {
Ok(_) => RunnerStatus::Stopped,
Err(_) => RunnerStatus::Running,
Ok(Some(_)) => RunnerStatus::Stopped,
Ok(None) => RunnerStatus::Running,
},
}
}
pub fn flush_out(&mut self) {
match self.process.as_mut() {
None => (),
Some(proc) => {
let out = proc.stdout.take().expect("Unable to read stout");
let mut reader = BufReader::new(out);
let mut bytes_read = 1;
let mut buf = "".to_string();
while bytes_read != 0 {
bytes_read = match reader.read_line(&mut buf) {
Ok(bytes) => bytes,
Err(_) => 0,
};
self.stdout.push(buf.clone());
buf.clear();
}
let err = proc.stderr.take().expect("Unable to read stderr");
let mut err_reader = BufReader::new(err);
bytes_read = 1;
while bytes_read != 0 {
bytes_read = match err_reader.read_line(&mut buf) {
Ok(bytes) => bytes,
Err(_) => 0,
};
self.stderr.push(buf.clone());
buf.clear();
}
}
pub fn get_output(&mut self) -> String {
loop {
match self.receiver.try_recv() {
Ok(data) => self.output.push(data),
Err(_) => break,
};
}
}
pub fn get_stdout(&mut self) -> String {
self.flush_out();
self.stdout.concat()
self.output.concat()
}
fn save_log(path_s: String, log: &Vec<String>) -> Result<(), std::io::Error> {
@ -126,12 +146,8 @@ impl Runner {
writer.write_all(log_s.as_ref())
}
pub fn save_stdout(&mut self, base_path: String) -> Result<(), std::io::Error> {
Runner::save_log(base_path + ".stdout", &self.stdout)
}
pub fn save_stderr(&mut self, base_path: String) -> Result<(), std::io::Error> {
Runner::save_log(base_path + ".stderr", &self.stderr)
pub fn save_output(&mut self, path: String) -> Result<(), std::io::Error> {
Runner::save_log(path, &self.output)
}
}
@ -153,14 +169,12 @@ mod tests {
vec!["-c".into(), "echo \"REX2TEST: $REX2TEST\"".into()],
);
runner.start();
while runner.status() == RunnerStatus::Running {
sleep(time::Duration::from_millis(10));
}
runner.flush_out();
sleep(time::Duration::from_millis(10));
runner.terminate();
assert_eq!(runner.status(), RunnerStatus::Stopped);
assert_eq!(runner.stdout.len(), 2);
let out = runner.get_output();
assert_eq!(
runner.stdout.get(0).unwrap(),
out,
"REX2TEST: Lorem ipsum dolor\n"
);
}
@ -176,10 +190,8 @@ mod tests {
while runner.status() == RunnerStatus::Running {
sleep(time::Duration::from_millis(10));
}
runner.flush_out();
runner.save_stdout("./target/testout/testlog".into());
runner.save_stderr("./target/testout/testlog".into());
runner.save_output("./target/testout/testlog".into());
}
#[test]

View file

@ -1,7 +1,4 @@
use super::{
styles::bordered_container::{BorderedContainer, RoundedBorderedContainer},
widgets::widgets::hspacer,
};
use super::{styles::bordered_container::BorderedContainer, widgets::widgets::hspacer};
use crate::{
config::{get_config, save_config, Config},
constants::APP_NAME,
@ -11,13 +8,13 @@ use crate::{
};
use iced::{
executor,
theme::{Container, Button},
theme::{Button, Container},
time,
widget::{button, checkbox, column, container, pick_list, row, scrollable, text, text_input},
Alignment, Application, Command, Element, Length, Padding, Subscription, Theme,
};
use std::{
borrow::BorrowMut,
cell::Cell,
time::{Duration, Instant},
};
@ -25,7 +22,7 @@ pub struct MainWin {
profiles: Vec<Profile>,
config: Config,
debug_search_term: String,
monado_runner: Option<Runner>,
monado_runner: Cell<Runner>,
monado_active: bool,
monado_log: String,
}
@ -58,31 +55,20 @@ impl MainWin {
}
fn start_monado(&mut self) {
match self.monado_runner.borrow_mut() {
None => {}
Some(existing_runner) => {
existing_runner.terminate();
}
};
self.monado_runner.get_mut().terminate();
let profile = self.get_selected_profile().unwrap();
self.monado_runner = Some(Runner::new(
profile.environment.clone(),
profile.monado_path.clone(),
vec![],
));
self.monado_runner
.set(Runner::monado_runner_from_profile(profile.clone()));
self.monado_runner.as_mut().unwrap().start()
self.monado_runner.get_mut().start();
}
fn get_monado_log(&mut self) {
self.monado_log = match self.monado_runner.as_mut() {
Some(runner) => match runner.status() {
RunnerStatus::Running => runner.get_stdout(),
RunnerStatus::Stopped => "Monado service inactive".into(),
},
None => "Monado service inactive".into(),
self.monado_log = match self.monado_runner.get_mut().status() {
RunnerStatus::Running => self.monado_runner.get_mut().get_output(),
RunnerStatus::Stopped => "Monado service inactive".into(),
}
}
}
@ -94,13 +80,17 @@ impl Application for MainWin {
type Theme = Theme;
fn new(_flags: Self::Flags) -> (Self, Command<Message>) {
let profiles = vec![load_profile(&"./test/files/profile.json".to_string()).unwrap()];
let monado_runner = Cell::new(Runner::monado_runner_from_profile(
profiles.get(0).cloned().unwrap(),
));
(
Self {
// TODO: load profiles from disk somehow
profiles: vec![load_profile(&"./test/files/profile.json".to_string()).unwrap()],
profiles,
config: get_config(),
debug_search_term: "".into(),
monado_runner: None,
monado_runner,
monado_active: false,
monado_log: "".into(),
},
@ -118,7 +108,7 @@ impl Application for MainWin {
self.start_monado();
}
Message::Stop => {
self.monado_runner.as_mut().unwrap().terminate();
self.monado_runner.get_mut().terminate();
}
Message::ProfileChanged(profile) => {
if self.config.selected_profile_name != profile.name {
@ -135,13 +125,10 @@ impl Application for MainWin {
}
Message::DebugSearchChanged(term) => self.debug_search_term = term,
Message::LogUpdate(_) => {
self.monado_active = match self.monado_runner.as_mut() {
None => false,
Some(runner) => match runner.status() {
RunnerStatus::Running => true,
RunnerStatus::Stopped => false,
},
};
self.monado_active = match self.monado_runner.get_mut().status() {
RunnerStatus::Running => true,
RunnerStatus::Stopped => false,
};
self.get_monado_log();
}
_ => println!("unhandled"),
@ -213,7 +200,9 @@ impl Application for MainWin {
let monado_view = column![
match self.monado_active {
true => button("Stop").on_press(Message::Stop).style(Button::Destructive),
true => button("Stop")
.on_press(Message::Stop)
.style(Button::Destructive),
false => button("Start").on_press(Message::Start),
}
.padding(Padding::from([6, 24])),