/* -------------------------------------------------------------------------- *\
 *        Apache 2.0 License Copyright © 2022-2023 The Aurae Authors          *
 *                                                                            *
 *                +--------------------------------------------+              *
 *                |   █████╗ ██╗   ██╗██████╗  █████╗ ███████╗ |              *
 *                |  ██╔══██╗██║   ██║██╔══██╗██╔══██╗██╔════╝ |              *
 *                |  ███████║██║   ██║██████╔╝███████║█████╗   |              *
 *                |  ██╔══██║██║   ██║██╔══██╗██╔══██║██╔══╝   |              *
 *                |  ██║  ██║╚██████╔╝██║  ██║██║  ██║███████╗ |              *
 *                |  ╚═╝  ╚═╝ ╚═════╝ ╚═╝  ╚═╝╚═╝  ╚═╝╚══════╝ |              *
 *                +--------------------------------------------+              *
 *                                                                            *
 *                         Distributed Systems Runtime                        *
 *                                                                            *
 * -------------------------------------------------------------------------- *
 *                                                                            *
 *   Licensed under the Apache License, Version 2.0 (the "License");          *
 *   you may not use this file except in compliance with the License.         *
 *   You may obtain a copy of the License at                                  *
 *                                                                            *
 *       http://www.apache.org/licenses/LICENSE-2.0                           *
 *                                                                            *
 *   Unless required by applicable law or agreed to in writing, software      *
 *   distributed under the License is distributed on an "AS IS" BASIS,        *
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
 *   See the License for the specific language governing permissions and      *
 *   limitations under the License.                                           *
 *                                                                            *
\* -------------------------------------------------------------------------- */

use anyhow::Context;
use aya::{
    maps::perf::AsyncPerfEventArray,
    util::{nr_cpus, online_cpus},
    Bpf,
};
use bytes::BytesMut;
use procfs::page_size;
use std::mem::size_of;
use tokio::sync::broadcast;
use tracing::{error, trace};

use super::perf_event_broadcast::PerfEventBroadcast;

/// Size (in pages) for the circular per-CPU buffers that BPF perfbuf creates.
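/// (Perf buffers generally expect a power-of-two page count per CPU.)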
const PER_CPU_BUFFER_SIZE_IN_PAGES: usize = 2;

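/// Reads typed events out of a BPF `PERF_EVENT_ARRAY` map and fans them out
/// to subscribers through a broadcast channel.
///
/// A minimal usage sketch; `SignalEvent`, `ProbeReader`, and the `"SIGNALS"`
/// map name below are illustrative placeholders, not part of this crate:
///
/// ```ignore
/// #[derive(Clone)]
/// struct SignalEvent {
///     pid: u32,
///     signal: u32,
/// }
///
/// // The trait only has a provided method, so an empty impl is enough.
/// struct ProbeReader;
/// impl PerfBufferReader<SignalEvent> for ProbeReader {}
///
/// fn start(bpf: &mut Bpf) -> anyhow::Result<()> {
///     // Assumes the loaded BPF object exposes a perf event array named
///     // "SIGNALS".
///     let _events = ProbeReader::read_from_perf_buffer(bpf, "SIGNALS")?;
///     Ok(())
/// }
/// ```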
pub trait PerfBufferReader<T: Clone + Send + 'static> {
    fn read_from_perf_buffer(
        bpf: &mut Bpf,
        perf_buffer: &'static str,
    ) -> anyhow::Result<PerfEventBroadcast<T>> {
        // Query the number of CPUs on the host
        let num_cpus = nr_cpus()?;

        // Query the page size on the host
        let page_size = page_size();

        // Get the size of the event payload
        let event_struct_size: usize = size_of::<T>();

        // Calculate the capacity of the per-CPU buffers based on the size of
        // the event
        let per_cpu_buffer_capacity = (PER_CPU_BUFFER_SIZE_IN_PAGES
            * page_size as usize)
            / event_struct_size;

        // Set the capacity of the channel to the combined capacity of all the
        // per-CPU buffers
        let channel_capacity = per_cpu_buffer_capacity * num_cpus;

        // Create the channel for broadcasting the events
        let (tx, _) = broadcast::channel(channel_capacity);
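        // Note: when the broadcast channel is full, `tokio::sync::broadcast`
        // drops the oldest queued value; receivers that fall behind see a
        // `Lagged` error instead of blocking the sender.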

        // Open the BPF_PERF_EVENT_ARRAY BPF map that is used to send data from
        // kernel to userspace. This array contains the per-CPU buffers and is
        // indexed by CPU id.
        // https://libbpf.readthedocs.io/en/latest/api.html
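        // Note: `take_map` moves the map out of the `Bpf` object, so a given
        // map can only be taken once.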
        let mut perf_array = AsyncPerfEventArray::try_from(
            bpf.take_map(perf_buffer).with_context(|| {
                format!("Failed to find '{perf_buffer}' perf event array")
            })?,
        )?;

        // Spawn a task per CPU to listen for events from the kernel.
        for cpu_id in online_cpus()? {
            trace!("spawning task for cpu {cpu_id}");
            // Open the per-CPU buffer for the current CPU id
            let mut per_cpu_buffer =
                perf_array.open(cpu_id, Some(PER_CPU_BUFFER_SIZE_IN_PAGES))?;

            // Clone the sender of the event broadcast channel
            let per_cpu_tx = tx.clone();

            // Spawn the task that listens on the per-CPU buffer
            let _ignored = tokio::spawn(async move {
                trace!("task for cpu {cpu_id} awaiting for events");

                // Allocate enough buffers to drain the entire per-CPU buffer.
                // Note: the `vec![elem; n]` macro clones its element, and a
                // cloned `BytesMut` is not guaranteed to keep its reserved
                // capacity, so the buffers are built explicitly instead.
                let mut buffers = (0..per_cpu_buffer_capacity)
                    .map(|_| BytesMut::with_capacity(event_struct_size))
                    .collect::<Vec<_>>();

                // Start polling the per-CPU buffer for events
                loop {
                    let events = match per_cpu_buffer
                        .read_events(&mut buffers)
                        .await
                    {
                        Ok(events) => events,
                        Err(error) => {
                            error!("fail to read events from per-cpu perf buffer, bailing out: {error}");
                            return;
                        }
                    };

                    if events.lost > 0 {
                        error!(
                            "buffer full, dropped {} perf events - this should never happen!",
                            events.lost
                        );
                    }

                    // If there are no receivers, there is no reason to send
                    // the events into the channel. A receiver could subscribe
                    // while we are inside this loop, but we trade that edge
                    // case for performance.
                    if per_cpu_tx.receiver_count() > 0 {
                        for buf in buffers.iter_mut().take(events.read) {
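                            // SAFETY: this assumes the BPF program filled the
                            // buffer with a value that has exactly the memory
                            // layout of `T`; `read_unaligned` is used because
                            // the perf buffer gives no alignment guarantee.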
                            let ptr = buf.as_ptr() as *const T;
                            let signal = unsafe { ptr.read_unaligned() };
                            // `send` only errors when there are no receivers,
                            // so the result can be safely ignored; future
                            // sends may still succeed.
                            let _ = per_cpu_tx.send(signal);
                            // We don't clear `buf` for performance reasons
                            // (though it would be cheap). Since we only read
                            // the first `events.read` buffers above, stale
                            // data is never re-read.
                            // buf.clear();
                        }
                    }
                }
            });
        }

        Ok(PerfEventBroadcast::new(tx))
    }
}