Complete rework of the systemd logic.

This commit is contained in:
r-vdp 2023-03-21 16:14:12 +01:00
parent 806b1f23fd
commit 58353436c2
No known key found for this signature in database
9 changed files with 316 additions and 164 deletions

View file

@ -18,7 +18,9 @@ in
nixosConfig = (lib.nixosSystem { nixosConfig = (lib.nixosSystem {
inherit system; inherit system;
modules = [ ./modules/system-manager.nix ] ++ modules; modules = [
./modules/system-manager.nix
] ++ modules;
specialArgs = extraSpecialArgs; specialArgs = extraSpecialArgs;
}).config; }).config;
@ -36,10 +38,8 @@ in
(name: (name:
let let
serviceName = "${name}.service"; serviceName = "${name}.service";
service = nixosConfig.systemd.services.${name};
in in
lib.nameValuePair serviceName { lib.nameValuePair serviceName {
inherit (service) wantedBy requiredBy;
storePath = storePath =
''${nixosConfig.systemd.units."${serviceName}".unit}/${serviceName}''; ''${nixosConfig.systemd.units."${serviceName}".unit}/${serviceName}'';
}) })

View file

@ -1,6 +1,5 @@
{ lib { lib
, pkgs , pkgs
, config
, ... , ...
}: }:
let let
@ -23,11 +22,12 @@ let
serviceConfig = { serviceConfig = {
Type = "oneshot"; Type = "oneshot";
RemainAfterExit = true; RemainAfterExit = true;
ExecReload = "true"; ExecReload = "${lib.getBin pkgs.coreutils}/bin/true";
}; };
wantedBy = [ "multi-user.target" ]; wantedBy = [ "multi-user.target" ];
requiredBy = lib.mkIf (ix > 5) [ "service-0.service" ];
script = '' script = ''
sleep ${if ix > 5 then "3" else "1"} sleep ${if ix > 5 then "2" else "1"}
''; '';
}) })
); );

View file

@ -1,6 +1,6 @@
{ lib { lib
, pkgs
, config , config
, pkgs
, ... , ...
}: }:
{ {
@ -33,5 +33,66 @@
]; ];
} }
); );
# Add the system directory for systemd
system-manager.etcFiles = [ "systemd/system" ];
environment.etc =
let
allowCollisions = false;
enabledUnits =
lib.filterAttrs
(name: _: lib.elem
name
(map (name: "${name}.service") config.system-manager.services))
config.systemd.units;
in
{
"systemd/system".source = lib.mkForce (pkgs.runCommand "system-manager-units"
{
preferLocalBuild = true;
allowSubstitutes = false;
}
''
mkdir -p $out
for i in ${toString (lib.mapAttrsToList (n: v: v.unit) enabledUnits)}; do
fn=$(basename $i/*)
if [ -e $out/$fn ]; then
if [ "$(readlink -f $i/$fn)" = /dev/null ]; then
ln -sfn /dev/null $out/$fn
else
${if allowCollisions then ''
mkdir -p $out/$fn.d
ln -s $i/$fn $out/$fn.d/overrides.conf
'' else ''
echo "Found multiple derivations configuring $fn!"
exit 1
''}
fi
else
ln -fs $i/$fn $out/
fi
done
${lib.concatStrings (
lib.mapAttrsToList (name: unit:
lib.concatMapStrings (name2: ''
mkdir -p $out/'${name2}.wants'
ln -sfn '../${name}' $out/'${name2}.wants'/
'') (unit.wantedBy or [])
) enabledUnits)}
${lib.concatStrings (
lib.mapAttrsToList (name: unit:
lib.concatMapStrings (name2: ''
mkdir -p $out/'${name2}.requires'
ln -sfn '../${name}' $out/'${name2}.requires'/
'') (unit.requiredBy or [])
) enabledUnits)}
''
);
};
}; };
} }

View file

@ -13,7 +13,9 @@ pub fn activate(store_path: &StorePath, ephemeral: bool) -> Result<()> {
// TODO we probably need to first deactivate left-over files and services // TODO we probably need to first deactivate left-over files and services
// before we start putting in place the new ones. // before we start putting in place the new ones.
log::info!("Activating etc files...");
etc_files::activate(store_path, ephemeral)?; etc_files::activate(store_path, ephemeral)?;
log::info!("Activating systemd services...");
services::activate(store_path, ephemeral)?; services::activate(store_path, ephemeral)?;
Ok(()) Ok(())
} }

View file

@ -12,7 +12,7 @@ use std::path::{Path, PathBuf};
use std::{fs, io}; use std::{fs, io};
use crate::{ use crate::{
create_link, create_store_link, remove_dir, remove_file, remove_link, StorePath, create_link, create_store_link, etc_dir, remove_dir, remove_file, remove_link, StorePath,
ETC_STATE_FILE_NAME, SYSTEM_MANAGER_STATE_DIR, SYSTEM_MANAGER_STATIC_NAME, ETC_STATE_FILE_NAME, SYSTEM_MANAGER_STATE_DIR, SYSTEM_MANAGER_STATIC_NAME,
}; };
use etc_tree::EtcTree; use etc_tree::EtcTree;
@ -42,16 +42,19 @@ struct EtcFilesConfig {
impl std::fmt::Display for EtcFilesConfig { impl std::fmt::Display for EtcFilesConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Files in config:")?; let out: String = itertools::intersperse(
self.entries.values().try_for_each(|entry| { self.entries.values().map(|entry| {
writeln!( format!(
f,
"target: {}, source:{}, mode:{}", "target: {}, source:{}, mode:{}",
entry.target.display(), entry.target.display(),
entry.source, entry.source,
entry.mode entry.mode
) )
}) }),
"\n".to_owned(),
)
.collect();
write!(f, "{out}")
} }
} }
@ -284,14 +287,6 @@ fn copy_file(source: &Path, target: &Path, mode: &str, old_state: &EtcTree) -> R
} }
} }
fn etc_dir(ephemeral: bool) -> PathBuf {
if ephemeral {
Path::new("/run").join("etc")
} else {
PathBuf::from("/etc")
}
}
fn serialise_state<E>(created_files: Option<E>) -> Result<()> fn serialise_state<E>(created_files: Option<E>) -> Result<()>
where where
E: AsRef<EtcTree>, E: AsRef<EtcTree>,

View file

@ -1,14 +1,13 @@
use anyhow::Result; use anyhow::{Context, Result};
use im::{HashMap, HashSet}; use im::{HashMap, HashSet};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::DirBuilder; use std::fs::DirBuilder;
use std::path::{Path, PathBuf}; use std::path::{self, Path};
use std::time::Duration; use std::time::Duration;
use std::{fs, io, str}; use std::{fs, io, str};
use crate::{ use crate::{
create_store_link, remove_link, systemd, StorePath, SERVICES_STATE_FILE_NAME, create_link, etc_dir, systemd, StorePath, SERVICES_STATE_FILE_NAME, SYSTEM_MANAGER_STATE_DIR,
SYSTEM_MANAGER_STATE_DIR,
}; };
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -20,44 +19,22 @@ struct ServiceConfig {
type Services = HashMap<String, ServiceConfig>; type Services = HashMap<String, ServiceConfig>;
fn print_services(services: &Services) -> Result<String> { fn print_services(services: &Services) -> Result<String> {
use std::fmt::Write as _; let out = itertools::intersperse(
let mut out = String::new();
writeln!(out, "Services in config:")?;
services services
.iter() .iter()
.try_for_each(|(name, entry)| writeln!(out, "name: {name}, source:{}", entry.store_path))?; .map(|(name, entry)| format!("name: {name}, source:{}", entry.store_path)),
"\n".to_owned(),
)
.collect();
Ok(out) Ok(out)
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct LinkedServiceConfig {
#[serde(flatten)]
service_config: ServiceConfig,
#[serde(rename = "linkedPath")]
path: PathBuf,
}
impl LinkedServiceConfig {
fn linked_path(&self) -> PathBuf {
PathBuf::from(&self.path)
}
fn new(service_config: ServiceConfig, path: PathBuf) -> Self {
LinkedServiceConfig {
service_config,
path,
}
}
}
type LinkedServices = HashMap<String, LinkedServiceConfig>;
pub fn activate(store_path: &StorePath, ephemeral: bool) -> Result<()> { pub fn activate(store_path: &StorePath, ephemeral: bool) -> Result<()> {
let old_linked_services = read_linked_services()?; verify_systemd_dir(ephemeral)?;
log::info!("Reading service definitions..."); let old_services = read_saved_services()?;
log::info!("Reading new service definitions...");
let file = fs::File::open( let file = fs::File::open(
Path::new(&store_path.store_path) Path::new(&store_path.store_path)
.join("services") .join("services")
@ -67,94 +44,158 @@ pub fn activate(store_path: &StorePath, ephemeral: bool) -> Result<()> {
let services: Services = serde_json::from_reader(reader)?; let services: Services = serde_json::from_reader(reader)?;
log::debug!("{}", print_services(&services)?); log::debug!("{}", print_services(&services)?);
let linked_services = link_services(services, ephemeral)?; serialise_saved_services(&services)?;
serialise_linked_services(&linked_services)?;
let services_to_stop = old_linked_services.relative_complement(linked_services.clone()); let services_to_stop = old_services.clone().relative_complement(services.clone());
let service_manager = systemd::ServiceManager::new_session()?; let service_manager = systemd::ServiceManager::new_session()?;
let job_monitor = service_manager.monitor_jobs_init()?;
let timeout = Some(Duration::from_secs(30)); let timeout = Some(Duration::from_secs(30));
// We need to do this before we reload the systemd daemon, so that the daemon // We need to do this before we reload the systemd daemon, so that the daemon
// still knows about these units. // still knows about these units.
stop_services(&service_manager, &services_to_stop, &timeout)?; wait_for_jobs(
unlink_services(&services_to_stop)?; &service_manager,
job_monitor,
stop_services(&service_manager, &services_to_stop)?,
&timeout,
)?;
// We added all new services and removed old ones, so let's reload the units // We added all new services and removed old ones, so let's reload the units
// to tell systemd about them. // to tell systemd about them.
log::info!("Reloading the systemd daemon..."); log::info!("Reloading the systemd daemon...");
service_manager.daemon_reload()?; service_manager.daemon_reload()?;
start_services(&service_manager, &linked_services, &timeout)?; let active_targets = get_active_targets(&service_manager);
let services_to_reload = get_services_to_reload(services, old_services);
let job_monitor = service_manager.monitor_jobs_init()?;
wait_for_jobs(
&service_manager,
job_monitor,
reload_services(&service_manager, &services_to_reload)?
+ start_units(&service_manager, active_targets?)?,
&timeout,
)?;
log::info!("Done"); log::info!("Done");
Ok(()) Ok(())
} }
fn get_active_targets(
service_manager: &systemd::ServiceManager,
) -> Result<Vec<systemd::UnitStatus>> {
// We exclude some targets that we do not want to start
let excluded_targets: HashSet<String> =
["suspend.target", "hibernate.target", "hybrid-sleep.target"]
.iter()
.map(ToOwned::to_owned)
.collect();
Ok(service_manager
.list_units_by_patterns(&["active", "activating"], &[])?
.into_iter()
.filter(|unit| {
unit.name.ends_with(".target")
&& !excluded_targets.contains(&unit.name)
&& !service_manager
.unit_manager(unit)
.refuse_manual_start()
.unwrap_or_else(|e| {
log::error!("Error communicating with DBus: {}", e);
true
})
})
.collect())
}
fn get_services_to_reload(services: Services, old_services: Services) -> Services {
let mut services_to_reload = services.intersection(old_services.clone());
services_to_reload.retain(|name, service| {
if let Some(old_service) = old_services.get(name) {
service.store_path != old_service.store_path
} else {
// Since we run this on the intersection, this should never happen
panic!("Something went terribly wrong!");
}
});
services_to_reload
}
fn verify_systemd_dir(ephemeral: bool) -> Result<()> {
if ephemeral {
let system_dir = Path::new(path::MAIN_SEPARATOR_STR)
.join("run")
.join("systemd")
.join("system");
if system_dir.exists()
&& !system_dir.is_symlink()
&& system_dir.is_dir()
&& system_dir.read_dir()?.next().is_some()
{
anyhow::bail!(
"The directory {} exists and is not empty, we cannot symlink it.",
system_dir.display()
);
} else if system_dir.exists() {
if !system_dir.is_symlink() && system_dir.is_dir() {
fs::remove_dir(&system_dir).with_context(|| {
format!(
"Error while removing the empty dir at {}",
system_dir.display()
)
})?;
} else {
fs::remove_file(&system_dir).with_context(|| {
format!(
"Error while removing the symlink at {}",
system_dir.display()
)
})?;
}
}
let target = etc_dir(ephemeral).join("systemd").join("system");
create_link(&target, &system_dir).with_context(|| {
format!(
"Error while creating symlink: {} -> {}",
system_dir.display(),
target.display(),
)
})?;
}
Ok(())
}
pub fn deactivate() -> Result<()> { pub fn deactivate() -> Result<()> {
let old_linked_services = read_linked_services()?; let old_services = read_saved_services()?;
log::debug!("{:?}", old_linked_services); log::debug!("{:?}", old_services);
let service_manager = systemd::ServiceManager::new_session()?; let service_manager = systemd::ServiceManager::new_session()?;
let job_monitor = service_manager.monitor_jobs_init()?;
let timeout = Some(Duration::from_secs(30)); let timeout = Some(Duration::from_secs(30));
// We need to do this before we reload the systemd daemon, so that the daemon // We need to do this before we reload the systemd daemon, so that the daemon
// still knows about these units. // still knows about these units.
stop_services(&service_manager, &old_linked_services, &timeout)?; wait_for_jobs(
unlink_services(&old_linked_services)?; &service_manager,
job_monitor,
stop_services(&service_manager, &old_services)?,
&timeout,
)?;
// We added all new services and removed old ones, so let's reload the units // We removed all old services, so let's reload the units so that
// to tell systemd about them. // the systemd daemon is up-to-date
log::info!("Reloading the systemd daemon..."); log::info!("Reloading the systemd daemon...");
service_manager.daemon_reload()?; service_manager.daemon_reload()?;
serialise_linked_services(&HashMap::new())?; serialise_saved_services(&HashMap::new())?;
log::info!("Done"); log::info!("Done");
Ok(()) Ok(())
} }
fn unlink_services(services: &LinkedServices) -> Result<()> {
services
.values()
.try_for_each(|linked_service| remove_link(&linked_service.linked_path()))
}
fn link_services(services: Services, ephemeral: bool) -> Result<LinkedServices> {
let systemd_system_dir = systemd_system_dir(ephemeral);
services.iter().try_fold(
HashMap::new(),
|mut linked_services, (name, service_config)| {
let linked_path = systemd_system_dir.join(name);
match create_store_link(&service_config.store_path, &linked_path) {
Ok(_) => {
linked_services.insert(
name.to_owned(),
LinkedServiceConfig::new(service_config.to_owned(), linked_path),
);
}
Err(e) => {
log::error!("Error linking service {name}, skipping.");
log::error!("{:?}", e);
}
};
Ok(linked_services)
},
)
}
fn systemd_system_dir(ephemeral: bool) -> PathBuf {
(if ephemeral {
Path::new("/run")
} else {
Path::new("/etc")
})
.join("systemd")
.join("system")
}
// FIXME: we should probably lock this file to avoid concurrent writes // FIXME: we should probably lock this file to avoid concurrent writes
fn serialise_linked_services(linked_services: &LinkedServices) -> Result<()> { fn serialise_saved_services(services: &Services) -> Result<()> {
let state_file = Path::new(SYSTEM_MANAGER_STATE_DIR).join(SERVICES_STATE_FILE_NAME); let state_file = Path::new(SYSTEM_MANAGER_STATE_DIR).join(SERVICES_STATE_FILE_NAME);
DirBuilder::new() DirBuilder::new()
.recursive(true) .recursive(true)
@ -162,11 +203,11 @@ fn serialise_linked_services(linked_services: &LinkedServices) -> Result<()> {
log::info!("Writing state info into file: {}", state_file.display()); log::info!("Writing state info into file: {}", state_file.display());
let writer = io::BufWriter::new(fs::File::create(state_file)?); let writer = io::BufWriter::new(fs::File::create(state_file)?);
serde_json::to_writer(writer, linked_services)?; serde_json::to_writer(writer, services)?;
Ok(()) Ok(())
} }
fn read_linked_services() -> Result<LinkedServices> { fn read_saved_services() -> Result<Services> {
let state_file = Path::new(SYSTEM_MANAGER_STATE_DIR).join(SERVICES_STATE_FILE_NAME); let state_file = Path::new(SYSTEM_MANAGER_STATE_DIR).join(SERVICES_STATE_FILE_NAME);
DirBuilder::new() DirBuilder::new()
.recursive(true) .recursive(true)
@ -186,47 +227,29 @@ fn read_linked_services() -> Result<LinkedServices> {
Ok(HashMap::default()) Ok(HashMap::default())
} }
fn start_services(
service_manager: &systemd::ServiceManager,
services: &LinkedServices,
timeout: &Option<Duration>,
) -> Result<()> {
for_each_service(
|s| service_manager.start_unit(s),
service_manager,
services,
timeout,
"restarting",
)
}
fn stop_services( fn stop_services(
service_manager: &systemd::ServiceManager, service_manager: &systemd::ServiceManager,
services: &LinkedServices, services: &Services,
timeout: &Option<Duration>, ) -> Result<HashSet<JobId>> {
) -> Result<()> { for_each_service(|s| service_manager.stop_unit(s), services, "stopping")
for_each_service( }
|s| service_manager.stop_unit(s),
service_manager, fn reload_services(
services, service_manager: &systemd::ServiceManager,
timeout, services: &Services,
"stopping", ) -> Result<HashSet<JobId>> {
) for_each_service(|s| service_manager.reload_unit(s), services, "reloading")
} }
fn for_each_service<F, R>( fn for_each_service<F, R>(
action: F, action: F,
service_manager: &systemd::ServiceManager, services: &Services,
services: &LinkedServices,
timeout: &Option<Duration>,
log_action: &str, log_action: &str,
) -> Result<()> ) -> Result<HashSet<JobId>>
where where
F: Fn(&str) -> Result<R>, F: Fn(&str) -> Result<R>,
{ {
let job_monitor = service_manager.monitor_jobs_init()?; let successful_services: HashSet<JobId> =
let successful_services: HashSet<String> =
services services
.clone() .clone()
.into_iter() .into_iter()
@ -234,7 +257,7 @@ where
match action(&service) { match action(&service) {
Ok(_) => { Ok(_) => {
log::info!("Service {}: {}...", service, log_action); log::info!("Service {}: {}...", service, log_action);
set.insert(service); set.insert(JobId { id: service });
set set
} }
Err(e) => { Err(e) => {
@ -246,11 +269,67 @@ where
} }
} }
}); });
// TODO: do we want to propagate unit failures here in some way?
Ok(successful_services)
}
if !service_manager.monitor_jobs_finish(job_monitor, timeout, successful_services)? { fn start_units(
anyhow::bail!("Timeout waiting for systemd jobs"); service_manager: &systemd::ServiceManager,
units: Vec<systemd::UnitStatus>,
) -> Result<HashSet<JobId>> {
for_each_unit(|s| service_manager.start_unit(&s.name), units, "restarting")
}
fn for_each_unit<F, R>(
action: F,
units: Vec<systemd::UnitStatus>,
log_action: &str,
) -> Result<HashSet<JobId>>
where
F: Fn(&systemd::UnitStatus) -> Result<R>,
{
let successful_services: HashSet<JobId> =
units
.into_iter()
.fold(HashSet::new(), |mut set, unit| match action(&unit) {
Ok(_) => {
log::info!("Unit {}: {}...", unit.name, log_action);
set.insert(JobId { id: unit.name });
set
} }
Err(e) => {
log::error!(
"Service {}: error {log_action}, please consult the logs",
unit.name
);
log::error!("{e}");
set
}
});
// TODO: do we want to propagate unit failures here in some way? // TODO: do we want to propagate unit failures here in some way?
Ok(successful_services)
}
fn wait_for_jobs(
service_manager: &systemd::ServiceManager,
job_monitor: systemd::JobMonitor,
jobs: HashSet<JobId>,
timeout: &Option<Duration>,
) -> Result<()> {
if !service_manager.monitor_jobs_finish(job_monitor, timeout, jobs)? {
anyhow::bail!("Timeout waiting for systemd jobs");
}
Ok(()) Ok(())
} }
#[derive(PartialEq, Eq, Hash, Clone)]
struct JobId {
id: String,
}
impl From<JobId> for String {
fn from(value: JobId) -> Self {
value.id
}
}

View file

@ -17,7 +17,7 @@ const SERVICES_STATE_FILE_NAME: &str = "services.json";
const ETC_STATE_FILE_NAME: &str = "etc-files.json"; const ETC_STATE_FILE_NAME: &str = "etc-files.json";
const SYSTEM_MANAGER_STATIC_NAME: &str = ".system-manager-static"; const SYSTEM_MANAGER_STATIC_NAME: &str = ".system-manager-static";
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(from = "String", into = "String", rename_all = "camelCase")] #[serde(from = "String", into = "String", rename_all = "camelCase")]
pub struct StorePath { pub struct StorePath {
pub store_path: PathBuf, pub store_path: PathBuf,
@ -92,6 +92,14 @@ fn remove_dir(from: &Path) -> Result<()> {
Ok(()) Ok(())
} }
pub fn etc_dir(ephemeral: bool) -> PathBuf {
if ephemeral {
Path::new("/run").join("etc")
} else {
PathBuf::from("/etc")
}
}
pub fn compose<A, B, C, G, F>(f: F, g: G) -> impl Fn(A) -> C pub fn compose<A, B, C, G, F>(f: F, g: G) -> impl Fn(A) -> C
where where
F: Fn(B) -> C, F: Fn(B) -> C,

View file

@ -243,7 +243,12 @@ fn check_root() -> Result<()> {
fn handle_toplevel_error<T>(r: Result<T>) -> ExitCode { fn handle_toplevel_error<T>(r: Result<T>) -> ExitCode {
if let Err(e) = r { if let Err(e) = r {
log::error!("{e}"); let out: String = itertools::intersperse(
e.chain().map(ToString::to_string),
"\nCaused by:\n ".to_owned(),
)
.collect();
log::error!("{out}");
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
ExitCode::SUCCESS ExitCode::SUCCESS

View file

@ -152,7 +152,7 @@ impl ServiceManager {
let job_names_clone = Arc::clone(&job_names); let job_names_clone = Arc::clone(&job_names);
let token = self.proxy.match_signal( let token = self.proxy.match_signal(
move |h: OrgFreedesktopSystemd1ManagerJobRemoved, _: &Connection, _: &Message| { move |h: OrgFreedesktopSystemd1ManagerJobRemoved, _: &Connection, _: &Message| {
log::info!("Job for {} done", h.unit); log::debug!("Job for {} done", h.unit);
{ {
// Insert a new name, and let the lock go out of scope immediately // Insert a new name, and let the lock go out of scope immediately
job_names_clone.lock().unwrap().insert(h.unit); job_names_clone.lock().unwrap().insert(h.unit);
@ -170,6 +170,8 @@ impl ServiceManager {
/// Waits for the monitored jobs to finish. Returns `true` if all jobs /// Waits for the monitored jobs to finish. Returns `true` if all jobs
/// finished before the timeout, `false` otherwise. /// finished before the timeout, `false` otherwise.
/// It is important that we consume the job monitor, since we remove the signal handler at the
/// end of this call, and so the job monitor cannot be re-used.
pub fn monitor_jobs_finish<I>( pub fn monitor_jobs_finish<I>(
&self, &self,
job_monitor: JobMonitor, job_monitor: JobMonitor,
@ -186,7 +188,7 @@ impl ServiceManager {
let total_jobs = waiting_for.len(); let total_jobs = waiting_for.len();
if total_jobs > 0 { if total_jobs > 0 {
log::info!("Waiting for jobs to finish..."); log::info!("Waiting for jobs to finish... (0/{})", total_jobs);
} }
while !waiting_for.is_empty() { while !waiting_for.is_empty() {
@ -198,13 +200,13 @@ impl ServiceManager {
{ {
return Ok(false); return Ok(false);
} }
{
let mut job_names = job_monitor.job_names.lock().unwrap(); let mut job_names = job_monitor.job_names.lock().unwrap();
if !job_names.is_empty() { if !job_names.is_empty() {
waiting_for = waiting_for.difference(job_names.clone()); waiting_for = waiting_for.relative_complement(job_names.clone());
*job_names = im::HashSet::new(); *job_names = im::HashSet::new();
log::debug!( if !waiting_for.is_empty() {
"Waiting for jobs to finish... ({:?}/{:?})", log::info!(
"Waiting for jobs to finish... ({}/{})",
total_jobs - waiting_for.len(), total_jobs - waiting_for.len(),
total_jobs total_jobs
); );