Work on properly implementing the activation of systemd services.

This commit is contained in:
R-VdP 2023-02-09 15:04:27 +00:00
parent c3be9ceb19
commit 9aa059887b
No known key found for this signature in database
10 changed files with 205 additions and 86 deletions

View file

@ -2,11 +2,14 @@ use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs::DirBuilder;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::{fs, io, str};
use super::{create_store_link, systemd, StorePath, SERVICE_MANAGER_STATE_DIR, SYSTEMD_UNIT_DIR};
use super::{
create_store_link, remove_store_link, systemd, StorePath, STATE_FILE_NAME, SYSTEMD_UNIT_DIR,
SYSTEM_MANAGER_STATE_DIR,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -22,53 +25,82 @@ type Services = HashMap<String, ServiceConfig>;
struct LinkedServiceConfig {
#[serde(flatten)]
service_config: ServiceConfig,
linked_path: String,
#[serde(rename = "linkedPath")]
path: String,
}
impl LinkedServiceConfig {
fn linked_path(&self) -> PathBuf {
PathBuf::from(self.path.to_owned())
}
fn new(service_config: ServiceConfig, path: PathBuf) -> Result<Self> {
if let Some(path) = path.to_str() {
return Ok(LinkedServiceConfig {
service_config,
path: String::from(path),
});
}
anyhow::bail!("Could not decode path")
}
}
type LinkedServices = HashMap<String, LinkedServiceConfig>;
pub fn activate(store_path: StorePath) -> Result<()> {
log::info!("Activating service-manager profile: {}", store_path);
log::info!("Activating system-manager profile: {}", store_path);
log::debug!("{:?}", read_linked_services()?);
let old_linked_services = read_linked_services()?;
log::debug!("{:?}", old_linked_services);
log::info!("Reading service definitions...");
let file = fs::File::open(store_path.store_path + "/services/services.json")?;
let reader = io::BufReader::new(file);
let services: Services = serde_json::from_reader(reader)?;
let linked_services = link_services(services);
let linked_services = link_services(services)?;
serialise_linked_services(&linked_services)?;
let services_to_stop = old_linked_services
.into_iter()
.filter(|(name, _)| !linked_services.contains_key(name))
.collect();
let service_manager = systemd::ServiceManager::new_session()?;
start_services(
&service_manager,
linked_services,
&Some(Duration::from_secs(30)),
)?;
let timeout = Some(Duration::from_secs(30));
service_manager.daemon_reload()?;
stop_services(&service_manager, &services_to_stop, &timeout)?;
unlink_services(&services_to_stop)?;
start_services(&service_manager, &linked_services, &timeout)?;
log::info!("Done");
Ok(())
}
fn link_services(services: Services) -> LinkedServices {
services.iter().fold(
fn unlink_services(services: &LinkedServices) -> Result<()> {
services
.values()
.try_for_each(|linked_service| remove_store_link(linked_service.linked_path().as_path()))
}
fn link_services(services: Services) -> Result<LinkedServices> {
services.iter().try_fold(
HashMap::with_capacity(services.len()),
|mut linked_services, (name, service_config)| {
let linked_path = format!("{}/{}", SYSTEMD_UNIT_DIR, name);
match create_store_link(&service_config.store_path, Path::new(&linked_path)) {
let linked_path = PathBuf::from(format!("{}/{}", SYSTEMD_UNIT_DIR, name));
match create_store_link(&service_config.store_path, linked_path.as_path()) {
Ok(_) => {
linked_services.insert(
name.to_owned(),
LinkedServiceConfig {
service_config: service_config.to_owned(),
linked_path,
},
LinkedServiceConfig::new(service_config.to_owned(), linked_path)?,
);
linked_services
Ok(linked_services)
}
e @ Err(_) => {
log::error!("Error linking service {}, skipping.", name);
log::error!("{:?}", e);
linked_services
Ok(linked_services)
}
}
},
@ -77,10 +109,10 @@ fn link_services(services: Services) -> LinkedServices {
// FIXME: we should probably lock this file to avoid concurrent writes
fn serialise_linked_services(linked_services: &LinkedServices) -> Result<()> {
let state_file = format!("{}/services.json", SERVICE_MANAGER_STATE_DIR);
let state_file = format!("{}/{}", SYSTEM_MANAGER_STATE_DIR, STATE_FILE_NAME);
DirBuilder::new()
.recursive(true)
.create(SERVICE_MANAGER_STATE_DIR)?;
.create(SYSTEM_MANAGER_STATE_DIR)?;
log::info!("Writing state info into file: {}", state_file);
let writer = io::BufWriter::new(fs::File::create(state_file)?);
@ -89,41 +121,78 @@ fn serialise_linked_services(linked_services: &LinkedServices) -> Result<()> {
}
fn read_linked_services() -> Result<LinkedServices> {
let state_file = format!("{}/services.json", SERVICE_MANAGER_STATE_DIR);
let state_file = format!("{}/{}", SYSTEM_MANAGER_STATE_DIR, STATE_FILE_NAME);
DirBuilder::new()
.recursive(true)
.create(SERVICE_MANAGER_STATE_DIR)?;
.create(SYSTEM_MANAGER_STATE_DIR)?;
if Path::new(&state_file).is_file() {
log::info!("Reading state info from {}", state_file);
let reader = io::BufReader::new(fs::File::open(state_file)?);
let linked_services = serde_json::from_reader(reader)?;
return Ok(linked_services);
match serde_json::from_reader(reader) {
Ok(linked_services) => return Ok(linked_services),
Err(e) => {
log::error!("Error reading the state file, ignoring.");
log::error!("{:?}", e);
}
}
}
Ok(HashMap::default())
}
fn start_services(
service_manager: &systemd::ServiceManager,
services: LinkedServices,
services: &LinkedServices,
timeout: &Option<Duration>,
) -> Result<()> {
service_manager.daemon_reload()?;
for_each_service(
|s| service_manager.start_unit(s),
service_manager,
services,
timeout,
"restarting",
)
}
fn stop_services(
service_manager: &systemd::ServiceManager,
services: &LinkedServices,
timeout: &Option<Duration>,
) -> Result<()> {
for_each_service(
|s| service_manager.stop_unit(s),
service_manager,
services,
timeout,
"stopping",
)
}
fn for_each_service<F, R>(
f: F,
service_manager: &systemd::ServiceManager,
services: &LinkedServices,
timeout: &Option<Duration>,
log_action: &str,
) -> Result<()>
where
F: Fn(&str) -> Result<R>,
{
let job_monitor = service_manager.monitor_jobs_init()?;
let successful_services = services.keys().fold(
HashSet::with_capacity(services.len()),
|mut set, service| match service_manager.restart_unit(service) {
|mut set, service| match f(service) {
Ok(_) => {
log::info!("Restarting service {}...", service);
log::info!("Service {}: {}...", service, log_action);
set.insert(Box::new(service.to_owned()));
set
}
Err(e) => {
log::error!(
"Error restarting unit, please consult the logs: {}",
service
"Service {}: Error {}, please consult the logs",
service,
log_action
);
log::error!("{}", e);
set

View file

@ -17,14 +17,17 @@ pub fn generate(flake_uri: &str) -> Result<()> {
// FIXME: we should not hard-code the system here
let flake_attr = format!("{}.x86_64-linux", FLAKE_ATTR);
log::info!("Building new system-manager generation...");
log::info!("Running nix build...");
let store_path = run_nix_build(flake_uri, &flake_attr).and_then(get_store_path)?;
log::info!("Generating new generation from {}", store_path);
log::info!("Creating new generation from {}", store_path);
install_nix_profile(&store_path, PROFILE_PATH).map(print_out_and_err)?;
log::info!("Registering GC root...");
create_gcroot(GCROOT_PATH, PROFILE_PATH)?;
log::info!("Done");
Ok(())
}

View file

@ -9,10 +9,11 @@ use std::path::Path;
use std::{fs, str};
const FLAKE_ATTR: &str = "serviceConfig";
const PROFILE_PATH: &str = "/nix/var/nix/profiles/service-manager";
const GCROOT_PATH: &str = "/nix/var/nix/gcroots/service-manager-current";
const PROFILE_PATH: &str = "/nix/var/nix/profiles/system-manager";
const GCROOT_PATH: &str = "/nix/var/nix/gcroots/system-manager-current";
const SYSTEMD_UNIT_DIR: &str = "/run/systemd/system";
const SERVICE_MANAGER_STATE_DIR: &str = "/var/lib/service-manager/state";
const SYSTEM_MANAGER_STATE_DIR: &str = "/var/lib/system-manager/state";
const STATE_FILE_NAME: &str = "services.json";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -42,6 +43,15 @@ fn create_store_link(store_path: &StorePath, from: &Path) -> Result<()> {
unix::fs::symlink(&store_path.store_path, from).map_err(anyhow::Error::from)
}
fn remove_store_link(from: &Path) -> Result<()> {
log::info!("Removing symlink: {}", from.display());
if from.is_symlink() {
fs::remove_file(from)?;
return Ok(());
}
anyhow::bail!("Not a symlink!")
}
pub fn compose<A, B, C, G, F>(f: F, g: G) -> impl Fn(A) -> C
where
F: Fn(B) -> C,

View file

@ -3,7 +3,7 @@ use std::process::ExitCode;
use anyhow::{anyhow, Result};
use clap::Parser;
use service_manager::StorePath;
use system_manager::StorePath;
#[derive(clap::Parser, Debug)]
#[command(author, version, about, long_about=None)]
@ -35,8 +35,8 @@ fn main() -> ExitCode {
fn go(action: Action) -> Result<()> {
check_root()?;
match action {
Action::Activate { store_path } => service_manager::activate::activate(store_path),
Action::Generate { flake_uri } => service_manager::generate::generate(&flake_uri),
Action::Activate { store_path } => system_manager::activate::activate(store_path),
Action::Generate { flake_uri } => system_manager::generate::generate(&flake_uri),
}
}

View file

@ -24,7 +24,6 @@ use std::{
atomic::{AtomicBool, Ordering},
Mutex,
},
thread,
time::{Duration, Instant},
};
@ -155,7 +154,6 @@ impl ServiceManager {
let job_names_clone = Arc::clone(&job_names);
let token = self.proxy.match_signal(
move |h: OrgFreedesktopSystemd1ManagerJobRemoved, _: &Connection, _: &Message| {
log_thread("Signal handling");
log::info!("Job for {} done", h.unit);
{
// Insert a new name, and let the lock go out of scope immediately
@ -184,17 +182,19 @@ impl ServiceManager {
I: IntoIterator,
I::Item: AsRef<String> + Eq + Hash,
{
log::info!("Waiting for jobs to finish...");
let start_time = Instant::now();
let mut waiting_for = services
let mut waiting_for: HashSet<String> = services
.into_iter()
.map(|n| String::from(n.as_ref()))
.collect::<HashSet<String>>();
.collect();
let total_jobs = waiting_for.len();
if total_jobs > 0 {
log::info!("Waiting for jobs to finish...");
}
while !waiting_for.is_empty() {
log_thread("Job handling");
self.proxy.connection.process(Duration::from_millis(50))?;
if timeout
@ -205,21 +205,25 @@ impl ServiceManager {
}
{
let mut job_names = job_monitor.job_names.lock().unwrap();
waiting_for = waiting_for
.iter()
.filter_map(|n| {
if job_names.contains(n) {
None
} else {
Some(n.to_owned())
}
})
.collect();
*job_names = HashSet::new();
log::debug!("{:?}/{:?}", waiting_for.len(), total_jobs);
if !job_names.is_empty() {
waiting_for = waiting_for
.difference(&job_names)
// FIXME can we avoid copying here?
.map(ToOwned::to_owned)
.collect();
*job_names = HashSet::new();
log::debug!(
"Waiting for jobs to finish... ({:?}/{:?})",
total_jobs - waiting_for.len(),
total_jobs
);
}
}
}
log::info!("All jobs finished.");
if total_jobs > 0 {
log::info!("All jobs finished.");
}
// Remove the signal handling callback
job_monitor
.tokens
@ -293,8 +297,3 @@ impl UnitManager<'_> {
Ok(OrgFreedesktopSystemd1Unit::refuse_manual_stop(&self.proxy)?)
}
}
fn log_thread(name: &str) {
let thread = thread::current();
log::debug!("{} thread: {:?} ({:?})", name, thread.name(), thread.id());
}