Activate services using DBus.

This commit is contained in:
R-VdP 2023-02-07 23:19:02 +00:00
parent efe60a960e
commit 50b4212a06
No known key found for this signature in database
2 changed files with 118 additions and 60 deletions

View file

@ -3,10 +3,11 @@ mod systemd;
use anyhow::{anyhow, Result};
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::os::unix;
use std::path::Path;
use std::{env, fs, io, process, str};
use std::time::Duration;
use std::{env, fs, io, iter, process, str};
const FLAKE_ATTR: &str = "serviceConfig";
const PROFILE_NAME: &str = "service-manager";
@ -49,14 +50,21 @@ enum Action {
},
}
fn main() -> Result<()> {
fn main() {
let args = Args::parse();
// FIXME: set default level to info
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
match args.action {
Action::Activate { store_path } => activate(store_path),
Action::Generate { flake_uri } => generate(&flake_uri),
Action::Activate { store_path } => handle_toplevel_error(activate(store_path)),
Action::Generate { flake_uri } => handle_toplevel_error(generate(&flake_uri)),
}
}
fn handle_toplevel_error<T>(r: Result<T>) {
if let Err(e) = r {
log::error!("{}", e)
}
}
@ -75,7 +83,6 @@ impl ServiceConfig {
fn activate(store_path: StorePath) -> Result<()> {
if !nix::unistd::Uid::is_root(nix::unistd::getuid()) {
log::error!("We need root permissions");
return Err(anyhow!("We need root permissions."));
}
log::info!("Activating service-manager profile: {}", store_path);
@ -88,39 +95,50 @@ fn activate(store_path: StorePath) -> Result<()> {
services.iter().try_for_each(|service| {
create_store_link(
&service.store_path(),
Path::new(&format!("/run/systemd/system/{}.service", service.name)),
Path::new(&format!("/run/systemd/system/{}", service.name)),
)
})?;
start_services(&services);
let service_manager = systemd::ServiceManager::new_session()?;
start_services(&service_manager, &services, &Some(Duration::from_secs(30)))?;
Ok(())
}
fn start_services(services: &[ServiceConfig]) {
if process::Command::new("systemctl")
.arg("daemon-reload")
.output()
.expect("Unable to run systemctl.")
.status
.success()
{
services.iter().for_each(|service| {
log::info!("Starting service {} ...", service.name);
let output = print_out_and_err(
process::Command::new("systemctl")
.arg("start")
.arg(&service.name)
.output()
.expect("Unable to run systemctl"),
);
if output.status.success() {
log::info!("Started service {}", service.name);
} else {
log::error!("Error starting service {}", service.name);
fn start_services(
service_manager: &systemd::ServiceManager,
services: &[ServiceConfig],
timeout: &Option<Duration>,
) -> Result<()> {
service_manager.daemon_reload()?;
let job_monitor = service_manager.monitor_jobs_init()?;
let successful_services = services.iter().fold(HashSet::new(), |set, service| {
match service_manager.restart_unit(&service.name) {
Ok(_) => {
log::info!("Restarting service {}...", service.name);
set.into_iter()
.chain(iter::once(Box::new(service.name.to_owned())))
.collect()
}
});
Err(e) => {
log::error!(
"Error restarting unit, please consult the logs: {}",
service.name
);
log::error!("{}", e);
set
}
}
});
if !service_manager.monitor_jobs_finish(job_monitor, timeout, successful_services)? {
anyhow::bail!("Timeout waiting for systemd jobs");
}
// TODO: do we want to propagate unit failures here in some way?
Ok(())
}
fn generate(flake_uri: &str) -> Result<()> {

View file

@ -4,22 +4,27 @@
mod manager;
mod unit;
use anyhow::Error;
use crate::{
systemd::manager::{OrgFreedesktopSystemd1Manager, OrgFreedesktopSystemd1ManagerJobRemoved},
systemd::unit::OrgFreedesktopSystemd1Unit,
};
use anyhow::Error;
use dbus::{
blocking::{Connection, Proxy},
channel::Token,
Message, Path,
};
use std::{
collections::HashSet,
hash::Hash,
rc::Rc,
result::Result,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Mutex,
},
thread,
time::{Duration, Instant},
};
@ -74,7 +79,8 @@ pub struct Job<'a> {
}
pub struct JobMonitor {
ready: Arc<AtomicBool>,
job_names: Arc<Mutex<HashSet<String>>>,
tokens: HashSet<Token>,
}
impl Drop for ServiceManager {
@ -85,7 +91,7 @@ impl Drop for ServiceManager {
impl ServiceManager {
pub fn new_session() -> Result<ServiceManager, Error> {
let conn = Connection::new_session()?;
let conn = Connection::new_system()?;
let proxy = Proxy::new(
SD_DESTINATION,
SD_PATH,
@ -143,43 +149,52 @@ impl ServiceManager {
}
}
pub fn monitor_jobs_init<F, I>(&self, names: I, handler: F) -> Result<JobMonitor, Error>
where
F: Fn(&str, &str) + Send + 'static,
I: IntoIterator,
I::Item: AsRef<String>,
{
let mut names_remaining = names
.into_iter()
.map(|n| String::from(n.as_ref()))
.collect::<HashSet<_>>();
let ready = Arc::new(AtomicBool::from(false));
let ready_jobs_removed = Arc::clone(&ready);
pub fn monitor_jobs_init(&self) -> Result<JobMonitor, Error> {
let job_names: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::from(HashSet::new()));
self.proxy.match_signal(
let job_names_clone = Arc::clone(&job_names);
let token = self.proxy.match_signal(
move |h: OrgFreedesktopSystemd1ManagerJobRemoved, _: &Connection, _: &Message| {
names_remaining.remove(&h.unit);
handler(&h.unit, &h.result);
let res = names_remaining.is_empty();
ready_jobs_removed.store(res, Ordering::Relaxed);
!res
log::debug!("{} added", h.unit);
log_thread("Signal handling");
{
// Insert a new name, and let the lock go out of scope immediately
job_names_clone.lock().unwrap().insert(h.unit);
}
// The callback gets removed at the end of monitor_jobs_finish
true
},
)?;
Ok(JobMonitor { ready })
Ok(JobMonitor {
job_names: Arc::clone(&job_names),
tokens: HashSet::from([token]),
})
}
/// Waits for the monitored jobs to finish. Returns `true` if all jobs
/// finished before the timeout, `false` otherwise.
pub fn monitor_jobs_finish(
pub fn monitor_jobs_finish<I>(
&self,
job_monitor: &JobMonitor,
job_monitor: JobMonitor,
timeout: &Option<Duration>,
) -> Result<bool, Error> {
services: I,
) -> Result<bool, Error>
where
I: IntoIterator,
I::Item: AsRef<String> + Eq + Hash,
{
log::info!("Waiting for jobs to finish...");
let start_time = Instant::now();
while !job_monitor.ready.load(Ordering::Relaxed) {
let mut waiting_for = services
.into_iter()
.map(|n| String::from(n.as_ref()))
.collect::<HashSet<String>>();
let total_jobs = waiting_for.len();
while !waiting_for.is_empty() {
log_thread("Job handling");
self.proxy.connection.process(Duration::from_millis(50))?;
if timeout
@ -188,8 +203,28 @@ impl ServiceManager {
{
return Ok(false);
}
{
let mut job_names = job_monitor.job_names.lock().unwrap();
waiting_for = waiting_for
.iter()
.filter_map(|n| {
if job_names.contains(n) {
None
} else {
Some(n.to_owned())
}
})
.collect();
*job_names = HashSet::new();
log::debug!("{:?}/{:?}", waiting_for.len(), total_jobs);
}
}
log::info!("All jobs finished.");
// Remove the signal handling callback
job_monitor
.tokens
.into_iter()
.try_for_each(|t| self.proxy.match_stop(t, true))?;
Ok(true)
}
@ -258,3 +293,8 @@ impl UnitManager<'_> {
Ok(OrgFreedesktopSystemd1Unit::refuse_manual_stop(&self.proxy)?)
}
}
/// Emit a debug line identifying the thread we are currently running on,
/// tagged with `name` to show which code path produced it.
fn log_thread(name: &str) {
    let current = thread::current();
    let (thread_name, thread_id) = (current.name(), current.id());
    log::debug!("{} thread: {:?} ({:?})", name, thread_name, thread_id);
}