diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml index df0b7ecf..8fb76206 100644 --- a/crates/shim/Cargo.toml +++ b/crates/shim/Cargo.toml @@ -72,12 +72,8 @@ tokio = { workspace = true, features = ["full"], optional = true } [target.'cfg(target_os = "linux")'.dependencies] cgroups-rs.workspace = true -[target.'cfg(unix)'.dependencies] -command-fds = "0.3.0" - [target.'cfg(windows)'.dependencies] mio = { version = "1.0", features = ["os-ext", "os-poll"] } -os_pipe.workspace = true windows-sys = { version = "0.52.0", features = [ "Win32_Foundation", "Win32_System_WindowsProgramming", diff --git a/crates/shim/src/asynchronous/mod.rs b/crates/shim/src/asynchronous/mod.rs index b1529af2..53335e27 100644 --- a/crates/shim/src/asynchronous/mod.rs +++ b/crates/shim/src/asynchronous/mod.rs @@ -20,7 +20,7 @@ use std::{ io::Read, os::unix::{fs::FileTypeExt, net::UnixListener}, path::Path, - process::{self, Command, Stdio}, + process::{self, Command as StdCommand, Stdio}, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -28,7 +28,6 @@ use std::{ }; use async_trait::async_trait; -use command_fds::{CommandFdExt, FdMapping}; use containerd_shim_protos::{ api::DeleteResponse, protobuf::{well_known_types::any::Any, Message, MessageField}, @@ -50,7 +49,7 @@ use nix::{ }; use oci_spec::runtime::Features; use signal_hook_tokio::Signals; -use tokio::{io::AsyncWriteExt, sync::Notify}; +use tokio::{io::AsyncWriteExt, process::Command, sync::Notify}; use which::which; const DEFAULT_BINARY_NAME: &str = "runc"; @@ -61,7 +60,7 @@ use crate::{ error::{Error, Result}, logger, parse_sockaddr, reap, socket_address, util::{asyncify, read_file_to_str, write_str_to_file}, - Config, Flags, StartOpts, SOCKET_FD, TTRPC_ADDRESS, + Config, Flags, StartOpts, TTRPC_ADDRESS, }; pub mod monitor; @@ -142,7 +141,10 @@ pub fn run_info() -> Result { let binary_path = which(binary_name).unwrap(); // get features - let output = Command::new(binary_path).arg("features").output().unwrap(); + let output = 
StdCommand::new(binary_path) + .arg("features") + .output() + .unwrap(); let features: Features = serde_json::from_str(&String::from_utf8_lossy(&output.stdout))?; @@ -215,6 +217,12 @@ where Ok(()) } _ => { + if flags.socket.is_empty() { + return Err(Error::InvalidArgument(String::from( + "Shim socket cannot be empty", + ))); + } + if !config.no_setup_logger { logger::init( flags.debug, @@ -228,11 +236,15 @@ where let task = Box::new(shim.create_task_service(publisher).await) as Box; let task_service = create_task(Arc::from(task)); - let mut server = Server::new().register_service(task_service); - server = server.add_listener(SOCKET_FD)?; - server = server.set_domain_unix(); + let Some(mut server) = create_server_with_retry(&flags).await? else { + signal_server_started(); + return Ok(()); + }; + server = server.register_service(task_service); server.start().await?; + signal_server_started(); + info!("Shim successfully started, waiting for exit signal..."); tokio::spawn(async move { handle_signals(signals).await; @@ -296,38 +308,18 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> let cwd = env::current_dir().map_err(io_error!(e, ""))?; let address = socket_address(&opts.address, &opts.namespace, grouping); - // Create socket and prepare listener. - // We'll use `add_listener` when creating TTRPC server. - let listener = match start_listener(&address).await { - Ok(l) => l, - Err(e) => { - if let Error::IoError { - err: ref io_err, .. - } = e - { - if io_err.kind() != std::io::ErrorKind::AddrInUse { - return Err(e); - }; - } - if let Ok(()) = wait_socket_working(&address, 5, 200).await { - write_str_to_file("address", &address).await?; - return Ok(address); - } - remove_socket(&address).await?; - start_listener(&address).await? 
- } - }; + // Activation pattern comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70 + // Another way to do it would be to create a named pipe and pass it to the child process through handle inheritance, but that would require duplicating + // the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simpler to specify handle inheritance, and this could + // be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented. - // tokio::process::Command do not have method `fd_mappings`, - // and the `spawn()` is also not an async method, - // so we use the std::process::Command here let mut command = Command::new(cmd); - command .current_dir(cwd) - .stdout(Stdio::null()) + .stdout(Stdio::piped()) .stdin(Stdio::null()) .stderr(Stdio::null()) + .envs(vars) .args([ "-namespace", &opts.namespace, @@ -335,22 +327,67 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> &opts.id, "-address", &opts.address, - ]) - .fd_mappings(vec![FdMapping { - parent_fd: listener.into(), - child_fd: SOCKET_FD, - }])?; + "-socket", + &address, + ]); + if opts.debug { command.arg("-debug"); } - command.envs(vars); - let _child = command.spawn().map_err(io_error!(e, "spawn shim"))?; + let mut child = command.spawn().map_err(io_error!(e, "spawn shim"))?; + #[cfg(target_os = "linux")] - crate::cgroup::set_cgroup_and_oom_score(_child.id())?; + crate::cgroup::set_cgroup_and_oom_score(child.id().unwrap())?; + + let mut reader = child.stdout.take().unwrap(); + tokio::io::copy(&mut reader, &mut tokio::io::stderr()) + .await + .unwrap(); + Ok(address) } +#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))] +async fn create_server(flags: &args::Flags) -> Result { + use std::os::fd::IntoRawFd; + let listener = 
start_listener(&flags.socket).await?; + let mut server = Server::new(); + server = server.add_listener(listener.into_raw_fd())?; + server = server.set_domain_unix(); + Ok(server) +} + +async fn create_server_with_retry(flags: &args::Flags) -> Result> { + // Really try to create a server. + let server = match create_server(flags).await { + Ok(server) => server, + Err(Error::IoError { err, .. }) if err.kind() == std::io::ErrorKind::AddrInUse => { + // If the address is already in use then make sure it is up and running and return the address + // This allows for running a single shim per container scenarios + if let Ok(()) = wait_socket_working(&flags.socket, 5, 200).await { + write_str_to_file("address", &flags.socket).await?; + return Ok(None); + } + remove_socket(&flags.socket).await?; + create_server(flags).await? + } + Err(e) => return Err(e), + }; + + Ok(Some(server)) +} + +fn signal_server_started() { + use libc::{dup2, STDERR_FILENO, STDOUT_FILENO}; + + unsafe { + if dup2(STDERR_FILENO, STDOUT_FILENO) < 0 { + panic!("Error closing pipe: {}", std::io::Error::last_os_error()) + } + } +} + #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))] fn setup_signals_tokio(config: &Config) -> Signals { if config.no_reaper { diff --git a/crates/shim/src/error.rs b/crates/shim/src/error.rs index 2d741f06..55117dd7 100644 --- a/crates/shim/src/error.rs +++ b/crates/shim/src/error.rs @@ -49,11 +49,6 @@ pub enum Error { #[error("Failed to setup logger: {0}")] Setup(#[from] log::SetLoggerError), - /// Unable to pass fd to child process (we rely on `command_fds` crate for this). 
- #[cfg(unix)] - #[error("Failed to pass socket fd to child: {0}")] - FdMap(#[from] command_fds::FdMappingCollision), - #[cfg(unix)] #[error("Nix error: {0}")] Nix(#[from] nix::Error), diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 2edc290f..6459fcbf 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -18,10 +18,7 @@ use std::{fs::File, path::PathBuf}; #[cfg(unix)] -use std::{ - os::unix::{io::RawFd, net::UnixListener}, - path::Path, -}; +use std::{os::unix::net::UnixListener, path::Path}; pub use containerd_shim_protos as protos; #[cfg(unix)] @@ -151,12 +148,6 @@ pub struct StartOpts { pub debug: bool, } -/// The shim process communicates with the containerd server through a communication channel -/// created by containerd. One endpoint of the communication channel is passed to shim process -/// through a file descriptor during forking, which is the fourth(3) file descriptor. -#[cfg(unix)] -const SOCKET_FD: RawFd = 3; - #[cfg(target_os = "linux")] pub const SOCKET_ROOT: &str = "/run/containerd"; diff --git a/crates/shim/src/synchronous/mod.rs b/crates/shim/src/synchronous/mod.rs index 4db42571..2629c65a 100644 --- a/crates/shim/src/synchronous/mod.rs +++ b/crates/shim/src/synchronous/mod.rs @@ -75,8 +75,7 @@ use crate::{ }; cfg_unix! { - use crate::{SOCKET_FD, parse_sockaddr}; - use command_fds::{CommandFdExt, FdMapping}; + use crate::parse_sockaddr; use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM}; use nix::{ errno::Errno, @@ -252,6 +251,12 @@ where Ok(()) } _ => { + if flags.socket.is_empty() { + return Err(Error::InvalidArgument(String::from( + "Shim socket cannot be empty", + ))); + } + #[cfg(windows)] util::setup_debugger_event(); @@ -268,11 +273,13 @@ where let task = Box::new(shim.create_task_service(publisher)) as Box; let task_service = create_task(Arc::from(task)); - let mut server = create_server(flags)?; + let Some(mut server) = create_server_with_retry(&flags)? 
else { + signal_server_started(); + return Ok(()); + }; server = server.register_service(task_service); server.start()?; - #[cfg(windows)] signal_server_started(); info!("Shim successfully started, waiting for exit signal..."); @@ -292,22 +299,44 @@ where } } +#[cfg(windows)] #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))] -fn create_server(_flags: args::Flags) -> Result { +fn create_server(flags: &args::Flags) -> Result { + start_listener(&flags.socket).map_err(io_error!(e, "starting listener"))?; let mut server = Server::new(); + server = server.bind(&flags.socket)?; + Ok(server) +} - #[cfg(unix)] - { - server = server.add_listener(SOCKET_FD)?; - } +#[cfg(unix)] +#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))] +fn create_server(flags: &args::Flags) -> Result { + use std::os::fd::IntoRawFd; + let listener = start_listener(&flags.socket).map_err(io_error!(e, "starting listener"))?; + let mut server = Server::new(); + server = server.add_listener(listener.into_raw_fd())?; + Ok(server) +} - #[cfg(windows)] - { - let address = socket_address(&_flags.address, &_flags.namespace, &_flags.id); - server = server.bind(address.as_str())?; - } +#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))] +fn create_server_with_retry(flags: &args::Flags) -> Result> { + // Really try to create a server. + let server = match create_server(flags) { + Ok(server) => server, + Err(Error::IoError { err, .. }) if err.kind() == std::io::ErrorKind::AddrInUse => { + // If the address is already in use then make sure it is up and running and return the address + // This allows for running a single shim per container scenarios + if let Ok(()) = wait_socket_working(&flags.socket, 5, 200) { + write_address(&flags.socket)?; + return Ok(None); + } + remove_socket(&flags.socket)?; + create_server(flags)? 
+ } + Err(e) => return Err(e), + }; - Ok(server) + Ok(Some(server)) } #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))] @@ -463,96 +492,47 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result let cwd = env::current_dir().map_err(io_error!(e, ""))?; let address = socket_address(&opts.address, &opts.namespace, grouping); - // Create socket and prepare listener. - // On Linux, We'll use `add_listener` when creating TTRPC server, on Windows the value isn't used hence the clippy allow - // (see note below about activation process for windows) - #[allow(clippy::let_unit_value)] - let _listener = match start_listener(&address) { - Ok(l) => l, - Err(e) => { - if e.kind() != std::io::ErrorKind::AddrInUse { - return Err(Error::IoError { - context: "".to_string(), - err: e, - }); - }; - // If the address is already in use then make sure it is up and running and return the address - // This allows for running a single shim per container scenarios - if let Ok(()) = wait_socket_working(&address, 5, 200) { - write_address(&address)?; - return Ok((0, address)); - } - remove_socket(&address)?; - start_listener(&address).map_err(io_error!(e, ""))? - } - }; + // Activation pattern comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70 + // Another way to do it would be to create a named pipe and pass it to the child process through handle inheritance, but that would require duplicating + // the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simpler to specify handle inheritance, and this could + // be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented. 
let mut command = Command::new(cmd); - command.current_dir(cwd).envs(vars).args([ - "-namespace", - &opts.namespace, - "-id", - &opts.id, - "-address", - &opts.address, - ]); + command + .current_dir(cwd) + .stdout(Stdio::piped()) + .stdin(Stdio::null()) + .stderr(Stdio::null()) + .envs(vars) + .args([ + "-namespace", + &opts.namespace, + "-id", + &opts.id, + "-address", + &opts.address, + "-socket", + &address, + ]); if opts.debug { command.arg("-debug"); } - #[cfg(unix)] - { - command - .stdout(Stdio::null()) - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .fd_mappings(vec![FdMapping { - parent_fd: _listener.into(), - child_fd: SOCKET_FD, - }])?; - - command - .spawn() - .map_err(io_error!(e, "spawn shim")) - .map(|child| { - // Ownership of `listener` has been passed to child. - (child.id(), address) - }) - } - + // On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn. + // When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles. + // Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hang until the child process closes the handles. + // As a workaround we can disable inheritance on the io pipe handles. + // This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560 #[cfg(windows)] - { - // Activation pattern for Windows comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70 - // another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating - // the logic in Rust's 'command' for process creation. 
There is an issue in Rust to make it simplier to specify handle inheritence and this could - // be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented. - let (mut reader, writer) = os_pipe::pipe().map_err(io_error!(e, "create pipe"))?; - let stdio_writer = writer.try_clone().unwrap(); - - command - .stdout(stdio_writer) - .stdin(Stdio::null()) - .stderr(Stdio::null()); - - // On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn. - // When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles. - // Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles. - // As a workaround we can Disables inheritance on the io pipe handles. - // This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560 - disable_handle_inheritance(); - command - .spawn() - .map_err(io_error!(e, "spawn shim")) - .map(|child| { - // IMPORTANT: we must drop the writer and command to close up handles before we copy the reader to stderr - // AND the shim Start method must NOT write to stdout/stderr - drop(writer); - drop(command); - io::copy(&mut reader, &mut io::stderr()).unwrap(); - (child.id(), address) - }) - } + disable_handle_inheritance(); + + let mut child = command.spawn().map_err(io_error!(e, "spawn shim"))?; + + let mut reader = child.stdout.take().unwrap(); + std::io::copy(&mut reader, &mut std::io::stderr()).unwrap(); + + Ok((child.id(), address)) } #[cfg(windows)] @@ -591,6 +571,19 @@ fn signal_server_started() { } } +// This closes the stdout handle which was mapped to the stderr on the first invocation of the shim. +// This releases the first process, which will give containerd the address of the socket. 
+#[cfg(unix)] +fn signal_server_started() { + use libc::{dup2, STDERR_FILENO, STDOUT_FILENO}; + + unsafe { + if dup2(STDERR_FILENO, STDOUT_FILENO) < 0 { + panic!("Error closing pipe: {}", std::io::Error::last_os_error()) + } + } +} + #[cfg(test)] mod tests { use std::thread;