1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/* -------------------------------------------------------------------------- *\
 *        Apache 2.0 License Copyright © 2022-2023 The Aurae Authors          *
 *                                                                            *
 *                +--------------------------------------------+              *
 *                |   █████╗ ██╗   ██╗██████╗  █████╗ ███████╗ |              *
 *                |  ██╔══██╗██║   ██║██╔══██╗██╔══██╗██╔════╝ |              *
 *                |  ███████║██║   ██║██████╔╝███████║█████╗   |              *
 *                |  ██╔══██║██║   ██║██╔══██╗██╔══██║██╔══╝   |              *
 *                |  ██║  ██║╚██████╔╝██║  ██║██║  ██║███████╗ |              *
 *                |  ╚═╝  ╚═╝ ╚═════╝ ╚═╝  ╚═╝╚═╝  ╚═╝╚══════╝ |              *
 *                +--------------------------------------------+              *
 *                                                                            *
 *                         Distributed Systems Runtime                        *
 *                                                                            *
 * -------------------------------------------------------------------------- *
 *                                                                            *
 *   Licensed under the Apache License, Version 2.0 (the "License");          *
 *   you may not use this file except in compliance with the License.         *
 *   You may obtain a copy of the License at                                  *
 *                                                                            *
 *       http://www.apache.org/licenses/LICENSE-2.0                           *
 *                                                                            *
 *   Unless required by applicable law or agreed to in writing, software      *
 *   distributed under the License is distributed on an "AS IS" BASIS,        *
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
 *   See the License for the specific language governing permissions and      *
 *   limitations under the License.                                           *
 *                                                                            *
\* -------------------------------------------------------------------------- */
use super::{cgroup_cache::CgroupCache, proc_cache::ProcCache};
use crate::ebpf::tracepoint::PerfEventBroadcast;
use aurae_ebpf_shared::{HasCgroup, HasHostPid};
use proto::observe::WorkloadType;
use std::{ffi::OsString, sync::Arc};
use tokio::sync::{
    mpsc::{self, Receiver},
    Mutex,
};
use tonic::Status;

/// Mount point of the cgroup filesystem; used as the root for resolving
/// cgroup ids to paths in [`CgroupCache`].
const CGROUPFS_ROOT: &str = "/sys/fs/cgroup";

/// Wrapper around [`PerfEventBroadcast<T>`] that allows for filtering by
/// Aurae workloads and optionally maps host PIDs to namespace PIDs.
pub struct ObservedEventStream<'a, T> {
    // Upstream broadcast of perf events this stream subscribes to.
    source: &'a PerfEventBroadcast<T>,
    // When `Some`, only events whose cgroup path matches this workload
    // are forwarded (see `subscribe`).
    workload_filter: Option<(WorkloadType, String)>,
    // When `Some`, host PIDs are translated to namespace PIDs via this
    // cache (see `map_pids`).
    proc_cache: Option<Arc<Mutex<ProcCache>>>,
    // Resolves cgroup ids to filesystem paths under CGROUPFS_ROOT;
    // shared with the spawned forwarding task.
    cgroup_cache: Arc<Mutex<CgroupCache>>,
}

impl<'a, T: HasCgroup + HasHostPid + Clone + Send + Sync + 'static>
    ObservedEventStream<'a, T>
{
    /// Creates a new stream over `source` with no workload filter and no
    /// host-PID-to-namespace-PID mapping configured.
    pub fn new(source: &'a PerfEventBroadcast<T>) -> Self {
        Self {
            source,
            workload_filter: None,
            proc_cache: None,
            cgroup_cache: Arc::new(Mutex::new(CgroupCache::new(
                OsString::from(CGROUPFS_ROOT),
            ))),
        }
    }

    /// Restricts the stream to events from the given workload; `None`
    /// clears any previously set filter. Returns `self` for chaining.
    pub fn filter_by_workload(
        &mut self,
        workload: Option<(WorkloadType, String)>,
    ) -> &mut Self {
        self.workload_filter = workload;
        self
    }

    /// Enables translation of host PIDs to namespace PIDs using the given
    /// process cache. Returns `self` for chaining.
    pub fn map_pids(&mut self, proc_cache: Arc<Mutex<ProcCache>>) -> &mut Self {
        self.proc_cache = Some(proc_cache);
        self
    }

    /// Subscribes to the event source and returns a channel of responses.
    ///
    /// Spawns a background task that receives each event, drops it if a
    /// workload filter is set and the event's cgroup path does not match,
    /// optionally maps the host PID to a namespace PID, and forwards the
    /// result of `map_response(event, pid)` to the returned receiver.
    /// The task exits when the source closes or the receiver is dropped.
    pub fn subscribe<E: Send + Sync + 'static>(
        &self,
        map_response: fn(T, i32) -> E,
    ) -> Receiver<Result<E, Status>> {
        let (tx, rx) = mpsc::channel(4);

        // Precompute the expected cgroup path for a Cell workload filter.
        // Built from CGROUPFS_ROOT (was a hard-coded "/sys/fs/cgroup",
        // which could silently diverge from the cache root used in `new`).
        // NOTE(review): the "/_" suffix appears to target a leaf cgroup
        // convention — confirm against the cell cgroup layout.
        let (match_cgroup_path, cgroup_path) = match &self.workload_filter {
            Some((WorkloadType::Cell, id)) => {
                (true, format!("{CGROUPFS_ROOT}/{id}/_"))
            }
            _ => (false, String::new()),
        };
        let mut events = self.source.subscribe();

        // Clones moved into the spawned task; the caller keeps its handles.
        let cgroup_thread_cache = self.cgroup_cache.clone();
        let proc_thread_cache = self.proc_cache.as_ref().cloned();
        let _ignored = tokio::spawn(async move {
            while let Ok(event) = events.recv().await {
                // Accept unconditionally when no filter is set; otherwise
                // resolve the event's cgroup id and compare paths
                // (case-insensitively) against the expected workload path.
                let accept = !match_cgroup_path || {
                    let mut cache = cgroup_thread_cache.lock().await;
                    cache
                        .get(event.cgroup_id())
                        .map(|path| path.eq_ignore_ascii_case(&cgroup_path))
                        .unwrap_or(false)
                };
                if accept {
                    // Fall back to the host PID when no mapping is
                    // configured or the cache has no entry for it.
                    let pid = if let Some(ref proc_cache) = proc_thread_cache {
                        let guard = proc_cache.lock().await;
                        guard
                            .get(event.host_pid())
                            .await
                            .unwrap_or_else(|| event.host_pid())
                    } else {
                        event.host_pid()
                    };

                    if tx.send(Ok(map_response(event, pid))).await.is_err() {
                        // receiver is gone
                        break;
                    }
                }
            }
        });

        rx
    }
}