Allow a job monitor to be reused.

This commit is contained in:
r-vdp 2023-03-23 14:23:43 +01:00
parent 4460250457
commit 9278cc8be8
No known key found for this signature in database
2 changed files with 23 additions and 16 deletions

View file

@ -56,7 +56,7 @@ pub fn activate(store_path: &StorePath, ephemeral: bool) -> Result<()> {
// still knows about these units. // still knows about these units.
wait_for_jobs( wait_for_jobs(
&service_manager, &service_manager,
job_monitor, &job_monitor,
stop_services(&service_manager, &services_to_stop)?, stop_services(&service_manager, &services_to_stop)?,
&timeout, &timeout,
)?; )?;
@ -69,10 +69,9 @@ pub fn activate(store_path: &StorePath, ephemeral: bool) -> Result<()> {
let active_targets = get_active_targets(&service_manager); let active_targets = get_active_targets(&service_manager);
let services_to_reload = get_services_to_reload(services, old_services); let services_to_reload = get_services_to_reload(services, old_services);
let job_monitor = service_manager.monitor_jobs_init()?;
wait_for_jobs( wait_for_jobs(
&service_manager, &service_manager,
job_monitor, &job_monitor,
reload_services(&service_manager, &services_to_reload)? reload_services(&service_manager, &services_to_reload)?
+ start_units(&service_manager, &active_targets?)?, + start_units(&service_manager, &active_targets?)?,
&timeout, &timeout,
@ -192,7 +191,7 @@ pub fn deactivate() -> Result<()> {
// still knows about these units. // still knows about these units.
wait_for_jobs( wait_for_jobs(
&service_manager, &service_manager,
job_monitor, &job_monitor,
stop_services(&service_manager, &old_services)?, stop_services(&service_manager, &old_services)?,
&timeout, &timeout,
)?; )?;
@ -337,7 +336,7 @@ where
fn wait_for_jobs( fn wait_for_jobs(
service_manager: &systemd::ServiceManager, service_manager: &systemd::ServiceManager,
job_monitor: systemd::JobMonitor, job_monitor: &systemd::JobMonitor,
jobs: HashSet<JobId>, jobs: HashSet<JobId>,
timeout: &Option<Duration>, timeout: &Option<Duration>,
) -> Result<()> { ) -> Result<()> {

View file

@ -1,4 +1,4 @@
// FIXME: Remove this // TODO: Remove this
#![allow(dead_code)] #![allow(dead_code)]
mod manager; mod manager;
@ -75,9 +75,23 @@ pub struct Job<'a> {
path: Path<'a>, path: Path<'a>,
} }
pub struct JobMonitor { pub struct JobMonitor<'a> {
job_names: Arc<Mutex<im::HashSet<String>>>, job_names: Arc<Mutex<im::HashSet<String>>>,
tokens: im::HashSet<Token>, tokens: im::HashSet<Token>,
service_manager: &'a ServiceManager,
}
impl Drop for JobMonitor<'_> {
fn drop(&mut self) {
self.tokens.iter().for_each(|t| {
self.service_manager
.proxy
.match_stop(*t, true)
.unwrap_or_else(|e|
log::error!("Error while stopping match listener, memory might leak...\n Caused by: {e}")
)
});
}
} }
impl Drop for ServiceManager { impl Drop for ServiceManager {
@ -147,7 +161,7 @@ impl ServiceManager {
} }
pub fn monitor_jobs_init(&self) -> Result<JobMonitor, Error> { pub fn monitor_jobs_init(&self) -> Result<JobMonitor, Error> {
let job_names: Arc<Mutex<im::HashSet<String>>> = Arc::new(Mutex::from(im::HashSet::new())); let job_names = Arc::new(Mutex::from(im::HashSet::<String>::new()));
let job_names_clone = Arc::clone(&job_names); let job_names_clone = Arc::clone(&job_names);
let token = self.proxy.match_signal( let token = self.proxy.match_signal(
@ -165,16 +179,15 @@ impl ServiceManager {
Ok(JobMonitor { Ok(JobMonitor {
job_names: Arc::clone(&job_names), job_names: Arc::clone(&job_names),
tokens: im::HashSet::unit(token), tokens: im::HashSet::unit(token),
service_manager: self,
}) })
} }
/// Waits for the monitored jobs to finish. Returns `true` if all jobs /// Waits for the monitored jobs to finish. Returns `true` if all jobs
/// finished before the timeout, `false` otherwise. /// finished before the timeout, `false` otherwise.
/// It is important that we consume the job monitor, since we remove the signal handler at the
/// end of this call, and so the job monitor cannot be re-used.
pub fn monitor_jobs_finish<I>( pub fn monitor_jobs_finish<I>(
&self, &self,
job_monitor: JobMonitor, job_monitor: &JobMonitor,
timeout: &Option<Duration>, timeout: &Option<Duration>,
services: I, services: I,
) -> Result<bool, Error> ) -> Result<bool, Error>
@ -217,11 +230,6 @@ impl ServiceManager {
if total_jobs > 0 { if total_jobs > 0 {
log::info!("All jobs finished."); log::info!("All jobs finished.");
} }
// Remove the signal handling callback
job_monitor
.tokens
.into_iter()
.try_for_each(|t| self.proxy.match_stop(t, true))?;
Ok(true) Ok(true)
} }