diff --git a/awkernel_async_lib/Cargo.toml b/awkernel_async_lib/Cargo.toml index 1abcf8e9c..ac3688d55 100644 --- a/awkernel_async_lib/Cargo.toml +++ b/awkernel_async_lib/Cargo.toml @@ -45,3 +45,4 @@ no_preempt = [] spinlock = ["awkernel_lib/spinlock"] clippy = [] perf = [] +need-get-period = ["perf"] diff --git a/awkernel_async_lib/src/dag.rs b/awkernel_async_lib/src/dag.rs index fa150a48f..956933651 100644 --- a/awkernel_async_lib/src/dag.rs +++ b/awkernel_async_lib/src/dag.rs @@ -67,6 +67,13 @@ use crate::{ time_interval::interval, Attribute, MultipleReceiver, MultipleSender, VectorToPublishers, VectorToSubscribers, }; + +#[cfg(feature = "need-get-period")] +use crate::task::perf::{ + get_period_count, increment_period_count, subscribe_timestamp_at, + update_fin_recv_outer_timestamp_at, update_pre_send_outer_timestamp_at, +}; + use alloc::{ borrow::Cow, boxed::Box, @@ -926,10 +933,31 @@ where Args::create_subscribers(subscribe_topic_names, Attribute::default()); loop { - let args: <::Subscribers as MultipleReceiver>::Item = - subscribers.recv_all().await; - let results = f(args); - publishers.send_all(results).await; + #[cfg(feature = "need-get-period")] + { + let (args, count_st): ( + <::Subscribers as MultipleReceiver>::Item, + u32, + ) = subscribers.recv_all_with_period().await; + + // [end] pubsub communication latency + let end = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; + subscribe_timestamp_at(count_st as usize, end, 1, dag_info.node_id.clone()); + + let results = f(args); + publishers + .send_all_with_meta(results, 1, count_st as usize, dag_info.node_id) + .await; + } + + #[cfg(not(feature = "need-get-period"))] + { + let args: <::Subscribers as MultipleReceiver>::Item = + subscribers.recv_all().await; + + let results = f(args); + publishers.send_all(results).await; + } } }; @@ -966,13 +994,31 @@ where Attribute::default(), ); - let mut interval = interval(period); + let mut interval = interval(period, dag_info.dag_id); // Consume the first tick here to start the loop's main body without an initial delay. interval.tick().await; loop { - let results = f(); - publishers.send_all(results).await; + #[cfg(feature = "need-get-period")] + { + let index = get_period_count(dag_info.dag_id) as usize; + if index != 0 { + // [start] cycle deviation index >= 1 + let release_time = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; + update_pre_send_outer_timestamp_at(index, release_time, dag_info.dag_id); + } + let results = f(); + publishers + .send_all_with_meta(results, 0, index, dag_info.node_id) + .await; + increment_period_count(dag_info.dag_id); + } + + #[cfg(not(feature = "need-get-period"))] + { + let results = f(); + publishers.send_all(results).await; + } #[cfg(feature = "perf")] periodic_measure(); @@ -1006,8 +1052,29 @@ where Args::create_subscribers(subscribe_topic_names, Attribute::default()); loop { - let args: ::Item = subscribers.recv_all().await; - f(args); + #[cfg(feature = "need-get-period")] + { + let (args, count_st): (::Item, u32) = + subscribers.recv_all_with_period().await; + + // [end] pubsub communication latency + let end = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; + subscribe_timestamp_at(count_st as usize, end, 2, dag_info.node_id.clone()); + + let timenow = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; + if count_st != 0 { + update_fin_recv_outer_timestamp_at(count_st as usize, timenow, dag_info.dag_id); + } + + f(args); + } + + #[cfg(not(feature = "need-get-period"))] + { + let args: ::Item = + subscribers.recv_all().await; + f(args); + } } }; diff --git a/awkernel_async_lib/src/pubsub.rs b/awkernel_async_lib/src/pubsub.rs index c216bae0b..618015650 100644 --- a/awkernel_async_lib/src/pubsub.rs +++ b/awkernel_async_lib/src/pubsub.rs @@ -52,11 +52,16 @@ use core::{ use futures::Future; use pin_project::pin_project; +#[cfg(feature = "need-get-period")] +use crate::task::perf::publish_timestamp_at; + /// Data and timestamp. #[derive(Clone)] pub struct Data { pub timestamp: awkernel_lib::time::Time, pub data: T, + #[cfg(feature = "need-get-period")] + pub index: u32, } /// Publisher. @@ -260,6 +265,8 @@ struct Sender<'a, T: 'static + Send> { subscribers: VecDeque>, state: SenderState, timestamp: awkernel_lib::time::Time, + #[cfg(feature = "need-get-period")] + index: u32, } enum SenderState { @@ -276,8 +283,16 @@ impl<'a, T: Send> Sender<'a, T> { subscribers: Default::default(), state: SenderState::Start, timestamp: awkernel_lib::time::Time::now(), + #[cfg(feature = "need-get-period")] + index: 0, } } + + #[cfg(feature = "need-get-period")] + pub(super) fn with_period(mut self, index: u32) -> Self { + self.index = index; + self + } } impl Future for Sender<'_, T> @@ -309,6 +324,8 @@ where if let Err(data) = guard.push(Data { timestamp: awkernel_lib::time::Time::now(), data: data.clone(), + #[cfg(feature = "need-get-period")] + index: *this.index, }) { // If the send buffer is full, then remove the oldest one and store again. guard.pop(); @@ -342,6 +359,8 @@ where match inner.queue.push(Data { timestamp: *this.timestamp, data: data.clone(), + #[cfg(feature = "need-get-period")] + index: *this.index, }) { Ok(_) => { // Wake the subscriber up. @@ -386,8 +405,94 @@ where sender.await; r#yield().await; } + + #[cfg(feature = "need-get-period")] + pub async fn send_with_meta(&self, data: T, pub_id: u32, index: usize, node_id: u32) { + // [start] pubsub communication latency + let start = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; + publish_timestamp_at(index, start, pub_id, node_id); + let period = match u32::try_from(index) { + Ok(period) => period, + Err(_) => { + log::warn!( + "Period index {} exceeds u32::MAX; saturating period metadata", + index + ); + u32::MAX + } + }; + let sender = Sender::new(self, data).with_period(period); + sender.await; + r#yield().await; + } } +#[cfg(all(test, feature = "need-get-period"))] +mod need_get_period_tests { + use super::*; + use core::{ + future::Future, + pin::Pin, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + }; + + fn block_on(future: F) -> F::Output { + fn raw_waker() -> RawWaker { + fn clone(_: *const ()) -> RawWaker { + raw_waker() + } + fn wake(_: *const ()) {} + fn wake_by_ref(_: *const ()) {} + fn drop(_: *const ()) {} + + RawWaker::new( + core::ptr::null(), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop), + ) + } + + let waker = unsafe { Waker::from_raw(raw_waker()) }; + let mut future = Box::pin(future); + + loop { + let mut context = Context::from_waker(&waker); + match Pin::as_mut(&mut future).poll(&mut context) { + Poll::Ready(output) => return output, + Poll::Pending => {} + } + } + } + + #[test] + fn send_with_meta_propagates_period_to_receiver() { + block_on(async { + let (publisher, subscriber) = create_pubsub::(Attribute::default()); + publisher.send_with_meta(42, 1, 7, 99).await; + + let received = subscriber.recv().await; + assert_eq!(received.data, 42); + assert_eq!(received.index, 7); + }); + } + + #[test] + fn tuple_recv_all_with_period_returns_shared_period() { + block_on(async { + let (publisher1, subscriber1) = create_pubsub::(Attribute::default()); + let (publisher2, subscriber2) = create_pubsub::(Attribute::default()); + + publisher1.send_with_meta(10, 11, 3, 21).await; + publisher2.send_with_meta(20, 12, 3, 22).await; + + let ((value1, value2), period) = + (subscriber1, subscriber2).recv_all_with_period().await; + + assert_eq!(value1, 10); + assert_eq!(value2, 20); + assert_eq!(period, 3); + }); + } +} /// Create an anonymous publisher and an anonymous subscriber. /// This channel works as a channel of multiple producers and multiple consumers. /// @@ -756,12 +861,24 @@ pub trait MultipleReceiver { type Item; fn recv_all(&self) -> Pin + Send + '_>>; + + #[cfg(feature = "need-get-period")] + fn recv_all_with_period(&self) -> Pin + Send + '_>>; } pub trait MultipleSender { type Item; fn send_all(&self, item: Self::Item) -> Pin + Send + '_>>; + + #[cfg(feature = "need-get-period")] + fn send_all_with_meta( + &self, + item: Self::Item, + pub_id: u32, + index: usize, + node_id: u32, + ) -> Pin + Send + '_>>; } pub trait VectorToPublishers { type Publishers: MultipleSender; @@ -834,6 +951,13 @@ macro_rules! impl_async_receiver_for_tuple { fn recv_all(&self) -> Pin + Send + '_>> { Box::pin(async move{}) } + + #[cfg(feature = "need-get-period")] + fn recv_all_with_period( + &self, + ) -> Pin + Send + '_>> { + Box::pin(async move { ((), 0) }) + } } impl MultipleSender for () { @@ -842,6 +966,17 @@ macro_rules! impl_async_receiver_for_tuple { fn send_all(&self, _item: Self::Item) -> Pin + Send + '_>> { Box::pin(async move{}) } + + #[cfg(feature = "need-get-period")] + fn send_all_with_meta( + &self, + _item: Self::Item, + _pub_id: u32, + _index: usize, + _node_id: u32, + ) -> Pin + Send + '_>> { + Box::pin(async move{}) + } } }; ($(($T:ident, $idx:tt, $idx2:tt)),+) => { @@ -854,6 +989,34 @@ macro_rules! impl_async_receiver_for_tuple { ($($idx.recv().await.data,)+) }) } + + #[cfg(feature = "need-get-period")] + fn recv_all_with_period( + &self, + ) -> Pin + Send + '_>> { + let ($($idx,)+) = self; + Box::pin(async move { + let mut period: Option = None; + $( + let item = $idx.recv().await; + match period { + Some(expected) => { + assert!( + expected == item.index, + "recv_all_with_period received mismatched periods: expected {}, got {}", + expected, + item.index + ); + } + None => { + period = Some(item.index); + } + } + let $idx2 = item.data; + )+ + (($($idx2,)+), period.expect("recv_all_with_period requires at least one subscriber")) + }) + } } impl<$($T: Clone + Sync + Send + 'static),+> MultipleSender for ($(Publisher<$T>,)+) { @@ -868,6 +1031,23 @@ macro_rules! impl_async_receiver_for_tuple { )+ }) } + + #[cfg(feature = "need-get-period")] + fn send_all_with_meta( + &self, + item: Self::Item, + pub_id: u32, + index: usize, + node_id: u32, + ) -> Pin + Send + '_>> { + let ($($idx,)+) = self; + let ($($idx2,)+) = item; + Box::pin(async move { + $( + $idx.send_with_meta($idx2, pub_id, index, node_id).await; + )+ + }) + } } }; } diff --git a/awkernel_async_lib/src/task.rs b/awkernel_async_lib/src/task.rs index bdb73c447..0cbbfcd0b 100644 --- a/awkernel_async_lib/src/task.rs +++ b/awkernel_async_lib/src/task.rs @@ -246,7 +246,7 @@ struct Tasks { id_to_task: BTreeMap>, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct DagInfo { pub dag_id: u32, pub node_id: u32, @@ -454,8 +454,12 @@ fn get_next_task(execution_ensured: bool) -> Option> { #[cfg(feature = "perf")] pub mod perf { + use crate::task::{self}; + use alloc::boxed::Box; + use alloc::string::{String, ToString}; use awkernel_lib::cpu::NUM_MAX_CPU; use core::ptr::{read_volatile, write_volatile}; + use core::sync::atomic::AtomicU32; #[derive(Debug, Clone, PartialEq, Eq)] #[repr(u8)] @@ -464,6 +468,7 @@ pub mod perf { Kernel, Task, ContextSwitch, + ContextSwitchMain, Interrupt, Idle, } @@ -475,8 +480,9 @@ pub mod perf { 1 => Self::Kernel, 2 => Self::Task, 3 => Self::ContextSwitch, - 4 => Self::Interrupt, - 5 => Self::Idle, + 4 => Self::ContextSwitchMain, + 5 => Self::Interrupt, + 6 => Self::Idle, _ => panic!("From for PerfState::from: invalid value"), } } @@ -490,6 +496,7 @@ pub mod perf { static mut TASK_TIME: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut INTERRUPT_TIME: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut CONTEXT_SWITCH_TIME: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; + static mut CONTEXT_SWITCH_MAIN_TIME: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut IDLE_TIME: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut PERF_TIME: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; @@ -497,6 +504,7 @@ pub mod perf { static mut TASK_WCET: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut INTERRUPT_WCET: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut CONTEXT_SWITCH_WCET: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; + static mut CONTEXT_SWITCH_MAIN_WCET: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut IDLE_WCET: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut PERF_WCET: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; @@ -504,9 +512,389 @@ pub mod perf { static mut TASK_COUNT: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut INTERRUPT_COUNT: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut CONTEXT_SWITCH_COUNT: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; + static mut CONTEXT_SWITCH_MAIN_COUNT: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut IDLE_COUNT: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; static mut PERF_COUNT: [u64; NUM_MAX_CPU] = [0; NUM_MAX_CPU]; + use alloc::{collections::BTreeMap, vec::Vec}; + use awkernel_lib::sync::{mcs::MCSNode, mutex::Mutex}; + const MAX_LOGS: usize = 8192; + + type DagTimestampMap = BTreeMap; + type DagTimestampTable = [DagTimestampMap; MAX_LOGS]; + + static SEND_OUTER_TIMESTAMP: Mutex>> = Mutex::new(None); + static RECV_OUTER_TIMESTAMP: Mutex>> = Mutex::new(None); + static ABSOLUTE_DEADLINE: Mutex>> = Mutex::new(None); + static RELATIVE_DEADLINE: Mutex>> = Mutex::new(None); + + pub static PERIOD_COUNT: Mutex> = Mutex::new(BTreeMap::new()); + + pub fn get_period_count(dag_id: u32) -> u32 { + let mut node = MCSNode::new(); + let period_count = PERIOD_COUNT.lock(&mut node); + period_count + .get(&dag_id) + .map(|count| count.load(core::sync::atomic::Ordering::Relaxed)) + .unwrap_or(0) + } + + pub fn increment_period_count(dag_id: u32) -> u32 { + let mut node = MCSNode::new(); + let mut period_count = PERIOD_COUNT.lock(&mut node); + let count = period_count + .entry(dag_id) + .or_insert_with(|| AtomicU32::new(0)); + count.fetch_add(1, core::sync::atomic::Ordering::Relaxed) + 1 + } + //pubsub + const MAX_PUBSUB: usize = 3; + const MAX_NODES: usize = 20; + #[derive(Clone)] + struct PubSubTable { + timestamps: [[[u64; MAX_NODES]; MAX_PUBSUB]; MAX_LOGS], + used_indices: Vec, + } + + impl Default for PubSubTable { + fn default() -> Self { + Self { + timestamps: [[[0; MAX_NODES]; MAX_PUBSUB]; MAX_LOGS], + used_indices: Vec::new(), + } + } + } + + impl PubSubTable { + #[inline(always)] + fn flat_index(index: usize, pub_id: usize, node_id: usize) -> usize { + (index * MAX_PUBSUB * MAX_NODES) + (pub_id * MAX_NODES) + node_id + } + + #[inline(always)] + fn decode_flat_index(flat_index: usize) -> (usize, usize, usize) { + let per_log = MAX_PUBSUB * MAX_NODES; + let index = flat_index / per_log; + let rem = flat_index % per_log; + let pub_id = rem / MAX_NODES; + let node_id = rem % MAX_NODES; + (index, pub_id, node_id) + } + + #[inline(always)] + fn mark_used(&mut self, index: usize, pub_id: usize, node_id: usize) { + self.used_indices + .push(Self::flat_index(index, pub_id, node_id)); + } + } + + static PUBLISH: Mutex>> = Mutex::new(None); + static SUBSCRIBE: Mutex>> = Mutex::new(None); + + #[inline(always)] + fn normalize_log_index(index: usize) -> usize { + index % MAX_LOGS + } + + pub fn update_pre_send_outer_timestamp_at(index: usize, new_timestamp: u64, dag_id: u32) { + let index = normalize_log_index(index); + + let mut node = MCSNode::new(); + let mut recorder_opt = SEND_OUTER_TIMESTAMP.lock(&mut node); + + let recorder = + recorder_opt.get_or_insert_with(|| Box::new(core::array::from_fn(|_| BTreeMap::new()))); + + if recorder[index].get(&dag_id).copied().unwrap_or(0) == 0 { + recorder[index].insert(dag_id, new_timestamp); + } + } + + pub fn update_fin_recv_outer_timestamp_at(index: usize, new_timestamp: u64, dag_id: u32) { + let index = normalize_log_index(index); + + let mut node = MCSNode::new(); + let mut recorder_opt = RECV_OUTER_TIMESTAMP.lock(&mut node); + + let recorder = + recorder_opt.get_or_insert_with(|| Box::new(core::array::from_fn(|_| BTreeMap::new()))); + + recorder[index].insert(dag_id, new_timestamp); + } + + pub fn update_absolute_deadline_timestamp_at(index: usize, deadline: u64, dag_id: u32) { + let index = normalize_log_index(index); + + let mut node = MCSNode::new(); + let mut recorder_opt = ABSOLUTE_DEADLINE.lock(&mut node); + + let recorder = + recorder_opt.get_or_insert_with(|| Box::new(core::array::from_fn(|_| BTreeMap::new()))); + + if recorder[index].get(&dag_id).copied().unwrap_or(0) == 0 { + recorder[index].insert(dag_id, deadline); + } + } + + pub fn update_relative_deadline_timestamp_at(index: usize, deadline: u64, dag_id: u32) { + let index = normalize_log_index(index); + + let mut node = MCSNode::new(); + let mut recorder_opt = RELATIVE_DEADLINE.lock(&mut node); + + let recorder = + recorder_opt.get_or_insert_with(|| Box::new(core::array::from_fn(|_| BTreeMap::new()))); + + if recorder[index].get(&dag_id).copied().unwrap_or(0) == 0 { + recorder[index].insert(dag_id, deadline); + } + } + + pub fn publish_timestamp_at(index: usize, new_timestamp: u64, pub_id: u32, node_id: u32) { + let index = normalize_log_index(index); + assert!((pub_id as usize) < MAX_PUBSUB, "Publish ID out of bounds"); + + let node_id_usize = node_id as usize; + if node_id_usize >= MAX_NODES { + log::warn!( + "Publish node ID out of bounds: {} (max {})", + node_id_usize, + MAX_NODES + ); + return; + } + + let mut node = MCSNode::new(); + let mut recorder_opt = PUBLISH.lock(&mut node); + + let recorder = recorder_opt.get_or_insert_with(|| Box::new(PubSubTable::default())); + let pub_id = pub_id as usize; + + if recorder.timestamps[index][pub_id][node_id_usize] == 0 { + recorder.timestamps[index][pub_id][node_id_usize] = new_timestamp; + recorder.mark_used(index, pub_id, node_id_usize); + } + } + + pub fn subscribe_timestamp_at(index: usize, new_timestamp: u64, sub_id: u32, node_id: u32) { + let index = normalize_log_index(index); + assert!((sub_id as usize) < MAX_PUBSUB, "Subscribe ID out of bounds"); + + let node_id_usize = node_id as usize; + if node_id_usize >= MAX_NODES { + log::warn!( + "Subscribe node ID out of bounds: {} (max {})", + node_id_usize, + MAX_NODES + ); + return; + } + + let mut node = MCSNode::new(); + let mut recorder_opt = SUBSCRIBE.lock(&mut node); + + let recorder = recorder_opt.get_or_insert_with(|| Box::new(PubSubTable::default())); + let sub_id = sub_id as usize; + + if recorder.timestamps[index][sub_id][node_id_usize] == 0 { + recorder.timestamps[index][sub_id][node_id_usize] = new_timestamp; + recorder.mark_used(index, sub_id, node_id_usize); + } + } + + // For precision of the cycle + pub fn print_timestamp_table() { + let mut node1 = MCSNode::new(); + let mut node2 = MCSNode::new(); + let mut node3 = MCSNode::new(); + let mut node4 = MCSNode::new(); + const MAX_ROWS_TO_PRINT: usize = 256; + + let send_outer_opt = SEND_OUTER_TIMESTAMP.lock(&mut node1); + let recv_outer_opt = RECV_OUTER_TIMESTAMP.lock(&mut node2); + let absolute_deadline_opt = ABSOLUTE_DEADLINE.lock(&mut node3); + let relative_deadline_opt = RELATIVE_DEADLINE.lock(&mut node4); + + let mut rows = Vec::new(); + let mut truncated = false; + 'collect_rows: for i in 0..MAX_LOGS { + let mut dag_ids = Vec::new(); + if let Some(arr) = &*send_outer_opt { + dag_ids.extend(arr[i].keys().copied()); + } + if let Some(arr) = &*recv_outer_opt { + dag_ids.extend(arr[i].keys().copied()); + } + if let Some(arr) = &*absolute_deadline_opt { + dag_ids.extend(arr[i].keys().copied()); + } + if let Some(arr) = &*relative_deadline_opt { + dag_ids.extend(arr[i].keys().copied()); + } + + dag_ids.sort_unstable(); + dag_ids.dedup(); + + for dag_id in dag_ids { + let pre_send_outer = match &*send_outer_opt { + Some(arr) => arr[i].get(&dag_id).copied().unwrap_or(0), + None => 0, + }; + let fin_recv_outer = match &*recv_outer_opt { + Some(arr) => arr[i].get(&dag_id).copied().unwrap_or(0), + None => 0, + }; + let absolute_deadline = match &*absolute_deadline_opt { + Some(arr) => arr[i].get(&dag_id).copied().unwrap_or(0), + None => 0, + }; + let relative_deadline = match &*relative_deadline_opt { + Some(arr) => arr[i].get(&dag_id).copied().unwrap_or(0), + None => 0, + }; + + if pre_send_outer != 0 + || fin_recv_outer != 0 + || absolute_deadline != 0 + || relative_deadline != 0 + { + if rows.len() >= MAX_ROWS_TO_PRINT { + truncated = true; + break 'collect_rows; + } + rows.push(( + i, + dag_id, + pre_send_outer, + fin_recv_outer, + absolute_deadline, + relative_deadline, + )); + } + } + } + drop(relative_deadline_opt); + drop(absolute_deadline_opt); + drop(recv_outer_opt); + drop(send_outer_opt); + + log::info!("--- Timestamp Summary (in nanoseconds) ---"); + log::info!( + "{: ^5} | {: ^5} | {: ^14} | {: ^14} | {: ^14} | {: ^14} | {: ^14}", + "Index", + "DAG-ID", + "Send-Outer", + "Recv-Outer", + "Latency", + "Absolute Deadline", + "Relative Deadline" + ); + + log::info!("-----|--------|----------------|----------------|----------------|--------------------|--------------------|--------------------|--------------------"); + + for (i, dag_id, pre_send_outer, fin_recv_outer, absolute_deadline, relative_deadline) in + rows + { + let format_ts = |ts: u64| -> String { + if ts == 0 { + "-".to_string() + } else { + ts.to_string() + } + }; + + let latency_str = if pre_send_outer != 0 && fin_recv_outer != 0 { + fin_recv_outer.saturating_sub(pre_send_outer).to_string() + } else { + "-".to_string() + }; + + log::info!( + "{: >5} | {: >5} | {: >14} | {: >14} | {: >14} | {: >20} | {: >20}", + i, + format_ts(dag_id as u64), + format_ts(pre_send_outer), + format_ts(fin_recv_outer), + latency_str, + format_ts(absolute_deadline), + format_ts(relative_deadline), + ); + } + if truncated { + log::warn!( + "Timestamp Summary truncated to {} rows; call print_timestamp_table() again to continue inspection", + MAX_ROWS_TO_PRINT + ); + } + log::info!("----------------------------------------------------------"); + } + + // For pubsub communication latency + pub fn print_pubsub_table() { + let mut node1 = MCSNode::new(); + let mut node2 = MCSNode::new(); + + let publish_opt = PUBLISH.lock(&mut node1); + let subscribe_opt = SUBSCRIBE.lock(&mut node2); + + let mut indices = Vec::new(); + if let Some(publish) = publish_opt.as_ref() { + indices.extend_from_slice(&publish.used_indices); + } + if let Some(subscribe) = subscribe_opt.as_ref() { + indices.extend_from_slice(&subscribe.used_indices); + } + indices.sort_unstable(); + indices.dedup(); + + log::info!("--- Pub/Sub Timestamp Summary (in nanoseconds) ---"); + log::info!( + "{: ^5} | {: ^10} | {: ^7} | {: ^14} | {: ^14}", + "Index", + "Pub/Sub ID", + "Node ID", + "Publish Time", + "Subscribe Time" + ); + log::info!("-----|------------|---------|----------------|----------------"); + for flat_index in indices { + let (i, j, k) = PubSubTable::decode_flat_index(flat_index); + // Skip if decoded indices are out of bounds + if i >= MAX_LOGS || j >= MAX_PUBSUB || k >= MAX_NODES { + log::warn!("Decoded index out of bounds: ({}, {}, {})", i, j, k); + continue; + } + let publish_time = match &*publish_opt { + Some(table) => table.timestamps[i][j][k], + None => 0, + }; + let subscribe_time = match &*subscribe_opt { + Some(table) => table.timestamps[i][j][k], + None => 0, + }; + + if publish_time != 0 || subscribe_time != 0 { + let format_ts = |ts: u64| -> String { + if ts == 0 { + "-".to_string() + } else { + ts.to_string() + } + }; + + log::info!( + "{: >5} | {: >10} | {: >7} | {: >14} | {: >14}", + i, + j, + k, + format_ts(publish_time), + format_ts(subscribe_time), + ); + } + } + log::info!("--------------------------------------------------------------"); + } + fn update_time_and_state(next_state: PerfState) { let end = awkernel_lib::delay::cpu_counter(); let cpu_id = awkernel_lib::cpu::cpu_id(); @@ -554,6 +942,101 @@ pub mod perf { let wcet = read_volatile(&CONTEXT_SWITCH_WCET[cpu_id]); write_volatile(&mut CONTEXT_SWITCH_WCET[cpu_id], wcet.max(diff)); }, + PerfState::ContextSwitchMain => unsafe { + let t = read_volatile(&CONTEXT_SWITCH_MAIN_TIME[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_MAIN_TIME[cpu_id], t + diff); + let c = read_volatile(&CONTEXT_SWITCH_MAIN_COUNT[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_MAIN_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&CONTEXT_SWITCH_MAIN_WCET[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_MAIN_WCET[cpu_id], wcet.max(diff)); + }, + PerfState::Idle => unsafe { + let t = read_volatile(&IDLE_TIME[cpu_id]); + write_volatile(&mut IDLE_TIME[cpu_id], t + diff); + let c = read_volatile(&IDLE_COUNT[cpu_id]); + write_volatile(&mut IDLE_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&IDLE_WCET[cpu_id]); + write_volatile(&mut IDLE_WCET[cpu_id], wcet.max(diff)); + }, + PerfState::Boot => (), + } + } + + let cnt = awkernel_lib::delay::cpu_counter(); + + unsafe { + // Overhead of this. + let t = read_volatile(&PERF_TIME[cpu_id]); + write_volatile(&mut PERF_TIME[cpu_id], t + (cnt - end)); + let c = read_volatile(&PERF_COUNT[cpu_id]); + write_volatile(&mut PERF_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&PERF_WCET[cpu_id]); + write_volatile(&mut PERF_WCET[cpu_id], wcet.max(cnt - end)); + + // State transition. + write_volatile(&mut START_TIME[cpu_id], cnt); + write_volatile(&mut PERF_STATES[cpu_id], next_state as u8); + } + } + + fn update_time_and_state_for_dag(next_state: PerfState) { + let end = awkernel_lib::delay::cpu_counter(); + let cpu_id = awkernel_lib::cpu::cpu_id(); + + let state: PerfState = unsafe { read_volatile(&PERF_STATES[cpu_id]) }.into(); + if state == next_state { + return; + } + + let start = unsafe { read_volatile(&START_TIME[cpu_id]) }; + + if start > 0 && start <= end { + let diff = end - start; + + match state { + PerfState::Kernel => unsafe { + let t = read_volatile(&KERNEL_TIME[cpu_id]); + write_volatile(&mut KERNEL_TIME[cpu_id], t + diff); + let c = read_volatile(&KERNEL_COUNT[cpu_id]); + write_volatile(&mut KERNEL_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&KERNEL_WCET[cpu_id]); + write_volatile(&mut KERNEL_WCET[cpu_id], wcet.max(diff)); + }, + PerfState::Task => unsafe { + let t = read_volatile(&TASK_TIME[cpu_id]); + write_volatile(&mut TASK_TIME[cpu_id], t + diff); + let c = read_volatile(&TASK_COUNT[cpu_id]); + write_volatile(&mut TASK_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&TASK_WCET[cpu_id]); + write_volatile(&mut TASK_WCET[cpu_id], wcet.max(diff)); + }, + PerfState::Interrupt => unsafe { + // log::info!("PreemptionTime CPU:{} Diff:{}", cpu_id, diff); + let t = read_volatile(&INTERRUPT_TIME[cpu_id]); + write_volatile(&mut INTERRUPT_TIME[cpu_id], t + diff); + let c = read_volatile(&INTERRUPT_COUNT[cpu_id]); + write_volatile(&mut INTERRUPT_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&INTERRUPT_WCET[cpu_id]); + write_volatile(&mut INTERRUPT_WCET[cpu_id], wcet.max(diff)); + }, + PerfState::ContextSwitch => unsafe { + // log::info!("ContextSwitchTime CPU:{} Diff:{}", cpu_id, diff); + let t = read_volatile(&CONTEXT_SWITCH_TIME[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_TIME[cpu_id], t + diff); + let c = read_volatile(&CONTEXT_SWITCH_COUNT[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&CONTEXT_SWITCH_WCET[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_WCET[cpu_id], wcet.max(diff)); + }, + PerfState::ContextSwitchMain => unsafe { + // log::info!("ContextSwitchMainTime CPU:{} Diff:{}", cpu_id, diff); + let t = read_volatile(&CONTEXT_SWITCH_MAIN_TIME[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_MAIN_TIME[cpu_id], t + diff); + let c = read_volatile(&CONTEXT_SWITCH_MAIN_COUNT[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_MAIN_COUNT[cpu_id], c + 1); + let wcet = read_volatile(&CONTEXT_SWITCH_MAIN_WCET[cpu_id]); + write_volatile(&mut CONTEXT_SWITCH_MAIN_WCET[cpu_id], wcet.max(diff)); + }, PerfState::Idle => unsafe { let t = read_volatile(&IDLE_TIME[cpu_id]); write_volatile(&mut IDLE_TIME[cpu_id], t + diff); @@ -598,7 +1081,27 @@ pub mod perf { pub fn start_interrupt() -> PerfState { let cpu_id = awkernel_lib::cpu::cpu_id(); let previous: PerfState = unsafe { read_volatile(&PERF_STATES[cpu_id]) }.into(); - update_time_and_state(PerfState::Interrupt); + + // Check the current task + if let Some(task_id) = task::get_current_task(cpu_id) { + if let Some(task) = task::get_task(task_id) { + let dag_info = task.info.lock(&mut task::MCSNode::new()).get_dag_info(); + if dag_info.is_some() { + // DAG task-specific handling + update_time_and_state_for_dag(PerfState::Interrupt); + } else { + // Normal task-specific handling + update_time_and_state(PerfState::Interrupt); + } + } else { + // Default handling if the current task cannot be resolved + update_time_and_state(PerfState::Interrupt); + } + } else { + // Default handling if no task is running + update_time_and_state(PerfState::Interrupt); + } + previous } @@ -609,6 +1112,7 @@ pub mod perf { PerfState::Kernel => start_kernel(), PerfState::Task => start_task(), PerfState::ContextSwitch => start_context_switch(), + PerfState::ContextSwitchMain => start_context_switch_main(), PerfState::Interrupt => { start_interrupt(); } @@ -618,7 +1122,50 @@ pub mod perf { #[inline(always)] pub(crate) fn start_context_switch() { - update_time_and_state(PerfState::ContextSwitch); + let cpu_id = awkernel_lib::cpu::cpu_id(); + + if let Some(task_id) = task::get_current_task(cpu_id) { + if let Some(task) = task::get_task(task_id) { + let dag_info = task.info.lock(&mut task::MCSNode::new()).get_dag_info(); + if dag_info.is_some() { + // DAG task-specific handling + update_time_and_state_for_dag(PerfState::ContextSwitch); + } else { + // Normal task-specific handling + update_time_and_state(PerfState::ContextSwitch); + } + } else { + // Default handling if the current task cannot be resolved + update_time_and_state(PerfState::ContextSwitch); + } + } else { + // Default handling if no task is running + update_time_and_state(PerfState::ContextSwitch); + } + } + + #[inline(always)] + pub(crate) fn start_context_switch_main() { + let cpu_id = awkernel_lib::cpu::cpu_id(); + + if let Some(task_id) = task::get_current_task(cpu_id) { + if let Some(task) = task::get_task(task_id) { + let dag_info = task.info.lock(&mut task::MCSNode::new()).get_dag_info(); + if dag_info.is_some() { + // DAG task-specific handling + update_time_and_state_for_dag(PerfState::ContextSwitchMain); + } else { + // Normal task-specific handling + update_time_and_state(PerfState::ContextSwitchMain); + } + } else { + // Fallback handling if the current task ID cannot be resolved + update_time_and_state(PerfState::ContextSwitchMain); + } + } else { + // Default handling if no task is running + update_time_and_state(PerfState::ContextSwitchMain); + } } #[inline(always)] diff --git a/awkernel_async_lib/src/time_interval.rs b/awkernel_async_lib/src/time_interval.rs index 96ce16219..d3eb054af 100644 --- a/awkernel_async_lib/src/time_interval.rs +++ b/awkernel_async_lib/src/time_interval.rs @@ -29,6 +29,8 @@ //! SOFTWARE. use crate::sleep_task::Sleep; +#[cfg(feature = "need-get-period")] +use crate::task::perf::{get_period_count, update_pre_send_outer_timestamp_at}; use alloc::boxed::Box; use awkernel_lib::time::Time; use core::{ @@ -86,7 +88,7 @@ pub enum MissedTickBehavior { /// use crate::time_interval::interval; /// use core::time::Duration; /// -/// let mut interval = interval(Duration::from_secs(1)); +/// let mut interval = interval(Duration::from_secs(1), 0); /// let mut ticks = 0; /// while ticks < 5 { /// let tick_time = interval.tick().await; @@ -95,13 +97,19 @@ pub enum MissedTickBehavior { /// } /// ``` /// -pub fn interval(period: Duration) -> Interval { +pub fn interval(period: Duration, dag_id: u32) -> Interval { assert!(!period.is_zero(), "`period` must be non-zero."); - interval_at(Time::now(), period) + interval_at(Time::now(), period, dag_id) } -pub fn interval_at(start: Time, period: Duration) -> Interval { +pub fn interval_at(start: Time, period: Duration, _dag_id: u32) -> Interval { assert!(!period.is_zero(), "`period` must be non-zero."); + #[cfg(feature = "need-get-period")] + { + let index = get_period_count(_dag_id) as usize; + // [start] cycle deviation index == 0 (basis of cycle deviation) + update_pre_send_outer_timestamp_at(index, start.uptime().as_nanos() as u64, _dag_id); + } Interval { delay: None, next_tick_target: start, diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 9368f5904..efcf539e0 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -78,6 +78,7 @@ optional = true [features] default = ["x86"] perf = ["awkernel_async_lib/perf", "userland/perf"] +need-get-period = ["perf", "awkernel_async_lib/need-get-period"] no_std_unwinding = ["dep:unwinding"] debug = ["dep:gimli", "dep:addr2line"] x86 = [ @@ -92,12 +93,14 @@ x86 = [ "awkernel_drivers/uart_16550", ] rv32 = [ + "perf", "no_std_unwinding", "awkernel_lib/rv32", "dep:ns16550a", "awkernel_drivers/rv32", ] rv64 = [ + "perf", "no_std_unwinding", "awkernel_lib/rv64", "dep:ns16550a", diff --git a/kernel/src/main.rs b/kernel/src/main.rs index 5d02b2aa0..ee7dc19dc 100644 --- a/kernel/src/main.rs +++ b/kernel/src/main.rs @@ -74,7 +74,17 @@ fn main(kernel_info: KernelInfo) { // Enable awkernel_lib::cpu::sleep_cpu() and awkernel_lib::cpu::wakeup_cpu(). unsafe { awkernel_lib::cpu::init_sleep() }; + #[cfg(feature = "need-get-period")] + let mut last_print = awkernel_lib::time::Time::now(); + loop { + #[cfg(feature = "need-get-period")] + if last_print.elapsed().as_secs() >= 30 { + awkernel_async_lib::task::perf::print_timestamp_table(); + // awkernel_async_lib::task::perf::print_pubsub_table(); + + last_print = awkernel_lib::time::Time::now(); + } // handle IRQs { let _irq_enable = awkernel_lib::interrupt::InterruptEnable::new();