use crate::ebpf::tracepoint::PerfEventBroadcast;
use aurae_ebpf_shared::{ForkedProcess, ProcessExit};
use std::time::SystemTime;
use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    time::Duration,
};
use tokio::sync::Mutex;
#[cfg(not(test))]
pub fn now() -> SystemTime {
    SystemTime::now()
}
#[cfg(test)]
pub fn now() -> SystemTime {
    use test_helpers::mock_time;
    mock_time::now()
}
const PID_MAX: usize = 4194304;
pub trait ProcessInfo {
    fn get_nspid(&self, pid: i32) -> Option<i32>;
}
pub(crate) struct ProcfsProcessInfo {}
impl ProcessInfo for ProcfsProcessInfo {
    fn get_nspid(&self, pid: i32) -> Option<i32> {
        procfs::process::Process::new(pid)
            .and_then(|p| p.status())
            .ok()
            .and_then(|s| s.nspid)
            .and_then(|nspid| nspid.last().copied())
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct Eviction {
    pid: i32,
    evict_at: SystemTime,
}
#[derive(Debug)]
pub struct ProcCache {
    cache: Arc<Mutex<HashMap<i32, i32>>>,
    evict_every: Duration,
    eviction_queue: Arc<Mutex<VecDeque<Eviction>>>,
    last_eviction: SystemTime,
}
impl ProcCache {
    pub fn new(
        evict_after: Duration,
        evict_every: Duration,
        process_fork_events: PerfEventBroadcast<ForkedProcess>,
        process_exit_events: PerfEventBroadcast<ProcessExit>,
        proc_info: impl ProcessInfo + Send + 'static + Sync,
    ) -> Self {
        let res = Self {
            cache: Arc::new(Mutex::new(HashMap::with_capacity(PID_MAX))),
            evict_every,
            eviction_queue: Arc::new(Mutex::new(VecDeque::with_capacity(
                PID_MAX,
            ))),
            last_eviction: SystemTime::UNIX_EPOCH,
        };
        let mut process_fork_rx = process_fork_events.subscribe();
        let cache_for_fork_event_processing = res.cache.clone();
        let _ignored = tokio::spawn(async move {
            while let Ok(e) = process_fork_rx.recv().await {
                if let Some(nspid) = proc_info.get_nspid(e.child_pid) {
                    let mut guard =
                        cache_for_fork_event_processing.lock().await;
                    let _ = guard.insert(e.child_pid, nspid);
                }
            }
        });
        let mut process_exit_rx = process_exit_events.subscribe();
        let eviction_queue_for_exit_event_processing =
            res.eviction_queue.clone();
        let _ignored = tokio::spawn(async move {
            while let Ok(e) = process_exit_rx.recv().await {
                let mut guard =
                    eviction_queue_for_exit_event_processing.lock().await;
                guard.push_back(Eviction {
                    pid: e.pid,
                    evict_at: now()
                        .checked_add(evict_after)
                        .expect("SystemTime overflow"),
                })
            }
        });
        res
    }
    pub async fn get(&self, pid: i32) -> Option<i32> {
        if self
            .last_eviction
            .checked_add(self.evict_every)
            .expect("SystemTime overflow")
            <= now()
        {
            self.evict_expired().await;
        }
        let guard = self.cache.lock().await;
        guard.get(&pid).copied()
    }
    async fn evict_expired(&self) {
        let now = now();
        let mut queue_guard = self.eviction_queue.lock().await;
        let mut evict = Vec::with_capacity(64);
        while let Some(_v) = queue_guard.front().filter(|v| v.evict_at <= now) {
            evict.push(queue_guard.pop_front().expect(
                "the let Some(v) binding guarantees that thsi option is set",
            ))
        }
        drop(queue_guard);
        let mut cache_guard = self.cache.lock().await;
        for e in evict {
            _ = cache_guard.remove(&e.pid);
        }
    }
    #[cfg(test)]
    async fn eviction_queue(&self) -> VecDeque<Eviction> {
        let guard = self.eviction_queue.lock().await;
        guard.clone()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::ebpf::tracepoint::PerfEventBroadcast;
    use crate::observe::proc_cache::ForkedProcess;
    use serial_test::serial;
    use test_helpers::assert_eventually_eq;
    use test_helpers::mock_time;
    use tokio::sync::broadcast::{channel, Sender};
    struct TestProcessInfo {
        nspid_lookup: HashMap<i32, i32>,
    }
    impl TestProcessInfo {
        fn new(test_data: Vec<(i32, i32)>) -> Self {
            let mut nspid_lookup = HashMap::new();
            for (pid, nspid) in test_data {
                _ = nspid_lookup.insert(pid, nspid);
            }
            Self { nspid_lookup }
        }
    }
    impl ProcessInfo for TestProcessInfo {
        fn get_nspid(&self, pid: i32) -> Option<i32> {
            self.nspid_lookup.get(&pid).copied()
        }
    }
    #[tokio::test]
    async fn must_returm_none_for_non_existing_process() {
        let (cache, _, _) = cache_for_testing(
            Duration::from_secs(5),
            Duration::from_secs(5),
            vec![],
        );
        assert_eq!(cache.get(123).await, None);
    }
    #[tokio::test]
    #[serial] async fn must_create_cache_entry_for_a_new_process() {
        let (cache, fork_tx, _) = cache_for_testing(
            Duration::from_secs(5),
            Duration::from_secs(5),
            vec![(42, 2)],
        );
        let _ = fork_tx
            .send(ForkedProcess { parent_pid: 1, child_pid: 42 })
            .expect("error sending msg");
        assert_eventually_eq!(cache.get(42).await, Some(2));
    }
    #[tokio::test]
    #[serial] async fn must_mark_entry_for_eviction_when_a_process_exits() {
        mock_time::reset();
        let (cache, fork_tx, exit_tx) = cache_for_testing(
            Duration::from_secs(5),
            Duration::from_secs(5),
            vec![(42, 2), (43, 3), (44, 4)],
        );
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 42 });
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 43 });
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 44 });
        let _ = exit_tx.send(ProcessExit { pid: 42 });
        assert_eventually_eq!(cache.get(42).await, Some(2));
        mock_time::advance_time(Duration::from_secs(5));
        let _ = exit_tx.send(ProcessExit { pid: 44 });
        assert_eventually_eq!(
            cache.eviction_queue().await,
            vec![
                Eviction { pid: 42, evict_at: seconds_after_unix_epoch(5) },
                Eviction { pid: 44, evict_at: seconds_after_unix_epoch(10) }
            ],
        );
    }
    #[tokio::test]
    #[serial] async fn must_evict_expired_entries_from_cache_on_get() {
        mock_time::reset();
        let (cache, fork_tx, exit_tx) = cache_for_testing(
            Duration::from_secs(5),
            Duration::from_secs(5),
            vec![(42, 2), (43, 3), (44, 4), (45, 5)],
        );
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 42 });
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 43 });
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 44 });
        let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 45 });
        let _ = exit_tx.send(ProcessExit { pid: 42 });
        assert_eventually_eq!(
            cache.eviction_queue().await,
            vec![Eviction { pid: 42, evict_at: seconds_after_unix_epoch(5) }],
        );
        mock_time::advance_time(Duration::from_secs(2));
        let _ = exit_tx.send(ProcessExit { pid: 44 });
        assert_eventually_eq!(
            cache.eviction_queue().await,
            vec![
                Eviction { pid: 42, evict_at: seconds_after_unix_epoch(5) }, Eviction { pid: 44, evict_at: seconds_after_unix_epoch(7) }, ],
        );
        mock_time::advance_time(Duration::from_secs(5));
        let _ = exit_tx.send(ProcessExit { pid: 45 });
        assert_eventually_eq!(
            cache.eviction_queue().await,
            vec![
                Eviction { pid: 42, evict_at: seconds_after_unix_epoch(5) }, Eviction { pid: 44, evict_at: seconds_after_unix_epoch(7) }, Eviction { pid: 45, evict_at: seconds_after_unix_epoch(12) } ],
        );
        assert_eq!(cache.get(42).await, None); assert_eq!(cache.get(43).await, Some(3)); assert_eq!(cache.get(44).await, None); assert_eq!(cache.get(45).await, Some(5)); }
    #[tokio::test]
    #[serial] async fn must_honor_eviction_interval() {
        mock_time::reset();
        let (cache, fork_tx, exit_tx) = cache_for_testing(
            Duration::from_secs(5),
            Duration::from_secs(60), vec![(42, 2), (43, 3), (44, 4), (45, 5)],
        );
        let _ = cache.get(1).await; let _ = fork_tx.send(ForkedProcess { parent_pid: 1, child_pid: 42 }); let _ = exit_tx.send(ProcessExit { pid: 42 }); assert_eventually_eq!(
            cache.eviction_queue().await,
            vec![Eviction { pid: 42, evict_at: seconds_after_unix_epoch(5) }],
        );
        mock_time::advance_time(Duration::from_secs(6)); let _ = cache.get(1).await; assert_eventually_eq!(
            cache.eviction_queue().await,
            vec![Eviction { pid: 42, evict_at: seconds_after_unix_epoch(5) }]
        ); }
    fn seconds_after_unix_epoch(seconds: u64) -> SystemTime {
        SystemTime::UNIX_EPOCH
            .checked_add(Duration::from_secs(seconds))
            .unwrap()
    }
    fn cache_for_testing(
        expire_after: Duration,
        evict_every: Duration,
        test_data: Vec<(i32, i32)>,
    ) -> (ProcCache, Sender<ForkedProcess>, Sender<ProcessExit>) {
        let (fork_tx, _fork_rx) = channel(4);
        let fork_broadcaster = PerfEventBroadcast::new(fork_tx.clone());
        let (exit_tx, _exit_rx) = channel::<ProcessExit>(4);
        let exit_broadcaster = PerfEventBroadcast::new(exit_tx.clone());
        let test_proc_info = TestProcessInfo::new(test_data);
        let cache = ProcCache::new(
            expire_after,
            evict_every,
            fork_broadcaster,
            exit_broadcaster,
            test_proc_info,
        );
        (cache, fork_tx, exit_tx)
    }
}