/* -------------------------------------------------------------------------- *\
* Apache 2.0 License Copyright © 2022-2023 The Aurae Authors *
* *
* +--------------------------------------------+ *
* | █████╗ ██╗ ██╗██████╗ █████╗ ███████╗ | *
* | ██╔══██╗██║ ██║██╔══██╗██╔══██╗██╔════╝ | *
* | ███████║██║ ██║██████╔╝███████║█████╗ | *
* | ██╔══██║██║ ██║██╔══██╗██╔══██║██╔══╝ | *
* | ██║ ██║╚██████╔╝██║ ██║██║ ██║███████╗ | *
* | ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ | *
* +--------------------------------------------+ *
* *
* Distributed Systems Runtime *
* *
* -------------------------------------------------------------------------- *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
* You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
* See the License for the specific language governing permissions and *
* limitations under the License. *
* *
\* -------------------------------------------------------------------------- */
use anyhow::Context;
use aya::{
maps::perf::AsyncPerfEventArray,
util::{nr_cpus, online_cpus},
Bpf,
};
use bytes::BytesMut;
use procfs::page_size;
use std::mem::size_of;
use tokio::sync::broadcast;
use tracing::{error, trace};
use super::perf_event_broadcast::PerfEventBroadcast;
/// Size (in pages) for the circular per-CPU buffers that BPF perfbuf creates.
/// NOTE(review): the kernel's perf ring buffer typically requires a
/// power-of-two page count — confirm against aya's `AsyncPerfEventArray::open`
/// before changing this value.
const PER_CPU_BUFFER_SIZE_IN_PAGES: usize = 2;
pub trait PerfBufferReader<T: Clone + Send + 'static> {
fn read_from_perf_buffer(
bpf: &mut Bpf,
perf_buffer: &'static str,
) -> anyhow::Result<PerfEventBroadcast<T>> {
// Query the number of CPUs on the host
let num_cpus = nr_cpus()?;
// Query the page size on the host
let page_size = page_size();
// Get the size of the event payload
let event_struct_size: usize = size_of::<T>();
// Calculate the capacity of the per-CPU buffers based on the size of
// the event
let per_cpu_buffer_capacity = (PER_CPU_BUFFER_SIZE_IN_PAGES
* page_size as usize)
/ event_struct_size;
// Set the capacity of the channel to the combined capacity of all the
// per-CPU buffers
let channel_capacity = per_cpu_buffer_capacity * num_cpus;
// Create the channel for broadcasting the events
let (tx, _) = broadcast::channel(channel_capacity);
// Open the BPF_PERF_EVENT_ARRAY BPF map that is used to send data from
// kernel to userspace. This array contains the per-CPU buffers and is
// indexed by CPU id.
// https://libbpf.readthedocs.io/en/latest/api.html
let mut perf_array = AsyncPerfEventArray::try_from(
bpf.take_map(perf_buffer)
.context("Failed to find '{perf_buffer}' perf event array")?,
)?;
// Spawn a thread per CPU to listen for events from the kernel.
for cpu_id in online_cpus()? {
trace!("spawning task for cpu {cpu_id}");
// Open the per-CPU buffer for the current CPU id
let mut per_cpu_buffer =
perf_array.open(cpu_id, Some(PER_CPU_BUFFER_SIZE_IN_PAGES))?;
// Clone the sender of the event broadcast channel
let per_cpu_tx = tx.clone();
// Spawn the thread to listen on the per-CPU buffer
let _ignored = tokio::spawn(async move {
trace!("task for cpu {cpu_id} awaiting for events");
// Allocate enough memory to drain the entire buffer
// Note: using `vec!` macro will not result in a correct `Vec`
let mut buffers = (0..per_cpu_buffer_capacity)
.map(|_| BytesMut::with_capacity(event_struct_size))
.collect::<Vec<_>>();
// Start polling the per-CPU buffer for events
loop {
let events = match per_cpu_buffer
.read_events(&mut buffers)
.await
{
Ok(events) => events,
Err(error) => {
error!("fail to read events from per-cpu perf buffer, bailing out: {error}");
return;
}
};
if events.lost > 0 {
error!(
"buffer full, dropped {} perf events - this should never happen!",
events.lost
);
}
// If we don't have any receivers, there is no reason to send the signals to the channels.
// There is the possibility that a receiver subscribes while we are in the loop,
// but this chooses performance over that possibility.
if per_cpu_tx.receiver_count() > 0 {
for buf in buffers.iter_mut().take(events.read) {
let ptr = buf.as_ptr() as *const T;
let signal = unsafe { ptr.read_unaligned() };
// send only errors if there are no receivers,
// so the return can be safely ignored;
// future sends may succeed
let _ = per_cpu_tx.send(signal);
// We don't clear buf for performance reasons (though it should be fast).
// Since we call `.take(events.read)` above, we shouldn't be re-reading old data
// buf.clear();
}
}
}
});
}
Ok(PerfEventBroadcast::new(tx))
}
}