/* -------------------------------------------------------------------------- *\
* Apache 2.0 License Copyright © 2022-2023 The Aurae Authors *
* *
* +--------------------------------------------+ *
* | █████╗ ██╗ ██╗██████╗ █████╗ ███████╗ | *
* | ██╔══██╗██║ ██║██╔══██╗██╔══██╗██╔════╝ | *
* | ███████║██║ ██║██████╔╝███████║█████╗ | *
* | ██╔══██║██║ ██║██╔══██╗██╔══██║██╔══╝ | *
* | ██║ ██║╚██████╔╝██║ ██║██║ ██║███████╗ | *
* | ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ | *
* +--------------------------------------------+ *
* *
* Distributed Systems Runtime *
* *
* -------------------------------------------------------------------------- *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
* You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
* See the License for the specific language governing permissions and *
* limitations under the License. *
* *
\* -------------------------------------------------------------------------- */
use super::{cgroup_cache::CgroupCache, proc_cache::ProcCache};
use crate::ebpf::tracepoint::PerfEventBroadcast;
use aurae_ebpf_shared::{HasCgroup, HasHostPid};
use proto::observe::WorkloadType;
use std::{ffi::OsString, sync::Arc};
use tokio::sync::{
mpsc::{self, Receiver},
Mutex,
};
use tonic::Status;
const CGROUPFS_ROOT: &str = "/sys/fs/cgroup";
/// Wrapper around `PerfEventBroadcast<T>` that allows for filtering by
/// Aurae workloads and optionally maps host PIDs to namespace PIDs.
pub struct ObservedEventStream<'a, T> {
source: &'a PerfEventBroadcast<T>,
workload_filter: Option<(WorkloadType, String)>,
proc_cache: Option<Arc<Mutex<ProcCache>>>,
cgroup_cache: Arc<Mutex<CgroupCache>>,
}
impl<'a, T: HasCgroup + HasHostPid + Clone + Send + Sync + 'static>
ObservedEventStream<'a, T>
{
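/// Creates a stream over `source` with no workload filter and no PID
/// mapping. Cgroup IDs are resolved through a cache rooted at
/// `CGROUPFS_ROOT`.
pub fn new(source: &'a PerfEventBroadcast<T>) -> Self {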
Self {
source,
workload_filter: None,
proc_cache: None,
cgroup_cache: Arc::new(Mutex::new(CgroupCache::new(
OsString::from(CGROUPFS_ROOT),
))),
}
}
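/// Restricts the stream to events from the given workload. Only
/// `WorkloadType::Cell` filters are applied; `None` or any other workload
/// type leaves the stream unfiltered.
pub fn filter_by_workload(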
&mut self,
workload: Option<(WorkloadType, String)>,
) -> &mut Self {
self.workload_filter = workload;
self
}
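/// Enables mapping of host PIDs to namespace PIDs through `proc_cache`.
/// Events whose host PID is not in the cache keep their host PID.
pub fn map_pids(&mut self, proc_cache: Arc<Mutex<ProcCache>>) -> &mut Self {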
self.proc_cache = Some(proc_cache);
self
}
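/// Spawns a task that forwards every accepted event to the returned
/// receiver, converted with `map_response(event, pid)`. The task stops when
/// receiving from the source fails or the returned receiver is dropped.
pub fn subscribe<E: Send + Sync + 'static>(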
&self,
map_response: fn(T, i32) -> E,
) -> Receiver<Result<E, Status>> {
let (tx, rx) = mpsc::channel(4);
let (match_cgroup_path, cgroup_path) = match &self.workload_filter {
Some((WorkloadType::Cell, id)) => {
(true, format!("{CGROUPFS_ROOT}/{id}/_"))
}
_ => (false, String::new()),
};
let mut events = self.source.subscribe();
let cgroup_thread_cache = self.cgroup_cache.clone();
let proc_thread_cache = self.proc_cache.as_ref().cloned();
let _ignored = tokio::spawn(async move {
while let Ok(event) = events.recv().await {
let accept = !match_cgroup_path || {
let mut cache = cgroup_thread_cache.lock().await;
cache
.get(event.cgroup_id())
.map(|path| path.eq_ignore_ascii_case(&cgroup_path))
.unwrap_or(false)
};
if accept {
let pid = if let Some(ref proc_cache) = proc_thread_cache {
let guard = proc_cache.lock().await;
guard
.get(event.host_pid())
.await
.unwrap_or_else(|| event.host_pid())
} else {
event.host_pid()
};
if tx.send(Ok(map_response(event, pid))).await.is_err() {
// The receiver was dropped, so stop forwarding events.
break;
}
}
}
});
rx
}
}