Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
4b34c9a
add: test_autoware
nokosaaan Mar 17, 2026
dda7c64
fix: perf features and test_dag for sending period info
nokosaaan Mar 17, 2026
29e5173
fix: feature devision for test_autoware 1
nokosaaan Mar 23, 2026
9cabca9
fix: comment alignment 1
nokosaaan Mar 23, 2026
9832297
fix: module separation 1
nokosaaan Mar 24, 2026
9bac530
fix: revert schedtype and toml
nokosaaan Mar 24, 2026
47de021
add: saveing implementation msg with period
nokosaaan Mar 26, 2026
efb1f02
fix: delete kernel dependics
nokosaaan Mar 26, 2026
b1b921b
fix: no period with test_autoware
nokosaaan Mar 26, 2026
1b18749
fix: reduce impl for get_period with macro
nokosaaan Mar 27, 2026
3c112f9
fix: reset test_dag
nokosaaan Mar 27, 2026
1cfa9ed
fix: delete unnecessary comments
nokosaaan Apr 3, 2026
5b5bcaf
Merge branch 'main' of https://github.com/tier4/awkernel into mrm_dev2
nokosaaan Apr 3, 2026
4e8a8c3
fix: delete warnings 1
nokosaaan Apr 3, 2026
60d50db
fix: delete warnings 2
nokosaaan Apr 15, 2026
028ca6a
Merge branch 'main' of https://github.com/tier4/awkernel into mrm_dev2
nokosaaan Apr 15, 2026
f6e2fcf
Merge branch 'main' of https://github.com/tier4/awkernel into mrm_dev2
nokosaaan Apr 15, 2026
b75b75c
Merge branch 'mrm_dev2' of https://github.com/tier4/awkernel into mrm…
nokosaaan Apr 15, 2026
48f29f1
fix: delete warnings 3
nokosaaan Apr 15, 2026
aed1c5c
fix: sync bootloader-fix
nokosaaan Apr 15, 2026
f024138
sync bootloader-fix 2
nokosaaan Apr 15, 2026
3aca1f9
fix: add need-get-period feature
nokosaaan Apr 15, 2026
87742e5
fix: delete the portion of impl about macro
nokosaaan Apr 15, 2026
5d30fc2
fix: delete unnecessary imports
nokosaaan Apr 15, 2026
a1f0619
Merge branch 'main' of https://github.com/tier4/awkernel into eva-fea…
nokosaaan Apr 17, 2026
59b5354
fix: attach cargo fmt 1
nokosaaan Apr 17, 2026
b77385e
fix: delete if-else diversion and focus feature diversion
nokosaaan Apr 17, 2026
85d9042
fix: apply cargo fmt 2
nokosaaan Apr 17, 2026
c88633a
fix: experimental apply for perf
nokosaaan Apr 17, 2026
8495759
fix: experimental apply for perf 2
nokosaaan Apr 17, 2026
8e03ef4
fix: delete .clone() which has Copy trait
nokosaaan Apr 17, 2026
a728ff8
fix: unused imports and unnecessary casting
nokosaaan Apr 17, 2026
6585af5
fix: apply cargo fmt 3
nokosaaan Apr 17, 2026
bc19c69
fix: delete unnecessary import
nokosaaan Apr 17, 2026
adfa4fd
fix: delete unnecessary functions and modify kernel/src/main.rs
nokosaaan Apr 17, 2026
377d1b1
fix: add get_period_count for getting periodic period
nokosaaan Apr 17, 2026
ddd69c2
fix: async_lib/Cargo.toml
nokosaaan Apr 17, 2026
37cae77
fix: apply cargo fmt 4
nokosaaan Apr 17, 2026
3ff0804
fix: delete NodeRecord
nokosaaan Apr 17, 2026
d8e28cd
fix: unify start publish_timestamp_at to pubsub.rs
nokosaaan Apr 17, 2026
2de13a9
fix: delete unused imports
nokosaaan Apr 17, 2026
9a9d2da
fix: add dependics in kernel/Cargo.toml for print_timestamp_table
nokosaaan Apr 17, 2026
14f92ca
fix: clarify the responsibility between send_all and send_all_with_me…
nokosaaan Apr 17, 2026
968c058
fix: delete get_period and
nokosaaan Apr 17, 2026
efc54b6
fix: apply cargo fmt 5
nokosaaan Apr 17, 2026
64e87b6
fix: apply copilot suggestion in kernel/main.rs
nokosaaan Apr 17, 2026
12fb54a
fix: delete unnecessary declare and add dependics to need
nokosaaan Apr 17, 2026
0e3156a
fix(perf): make period-based recording resilient to dag/index overflow
Copilot Apr 17, 2026
bf7ba7b
fix(perf): warn when period metadata saturates
Copilot Apr 17, 2026
5ff88eb
fix: apply copilot suggestion 1
nokosaaan Apr 17, 2026
465f75e
fix: apply copilot suggestion 2
nokosaaan Apr 17, 2026
74b6d27
Update awkernel_async_lib/src/pubsub.rs
nokosaaan Apr 17, 2026
f8589f5
Update awkernel_async_lib/src/pubsub.rs
nokosaaan Apr 17, 2026
160c9eb
fix(perf): snapshot and cap timestamp summary output
Copilot Apr 17, 2026
2c08c1c
Update awkernel_async_lib/src/task.rs
nokosaaan Apr 17, 2026
696a272
fix: apply cargo fmt 6
nokosaaan Apr 19, 2026
6545deb
fix(time_interval): pass dag_id as u32 to perf period API
Copilot Apr 19, 2026
1e85519
Update awkernel_async_lib/src/pubsub.rs
nokosaaan Apr 20, 2026
ef6f24e
docs(time_interval): update interval example with dag_id argument
Copilot Apr 20, 2026
dd39f3e
fix: change pubsub test case
nokosaaan Apr 20, 2026
74fef12
Merge branch 'eva-feature' of https://github.com/tier4/awkernel into …
nokosaaan Apr 20, 2026
9b76322
Update awkernel_async_lib/src/task.rs
nokosaaan Apr 20, 2026
786398a
Update awkernel_async_lib/src/pubsub.rs
nokosaaan Apr 20, 2026
b54e541
Merge branch 'main' into eva-feature
nokosaaan Apr 20, 2026
50794a2
Update awkernel_async_lib/src/dag.rs
nokosaaan Apr 20, 2026
95c8fef
Update awkernel_async_lib/src/task.rs
nokosaaan Apr 20, 2026
c5ed577
fix: apply cargo fmt 7
nokosaaan Apr 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions awkernel_async_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ no_preempt = []
spinlock = ["awkernel_lib/spinlock"]
clippy = []
perf = []
need-get-period = ["perf"]
85 changes: 76 additions & 9 deletions awkernel_async_lib/src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ use crate::{
time_interval::interval,
Attribute, MultipleReceiver, MultipleSender, VectorToPublishers, VectorToSubscribers,
};

#[cfg(feature = "need-get-period")]
use crate::task::perf::{
get_period_count, increment_period_count, subscribe_timestamp_at,
update_fin_recv_outer_timestamp_at, update_pre_send_outer_timestamp_at,
};

use alloc::{
borrow::Cow,
boxed::Box,
Expand Down Expand Up @@ -926,10 +933,31 @@ where
Args::create_subscribers(subscribe_topic_names, Attribute::default());

loop {
let args: <<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item =
subscribers.recv_all().await;
let results = f(args);
publishers.send_all(results).await;
#[cfg(feature = "need-get-period")]
{
let (args, count_st): (
<<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item,
u32,
) = subscribers.recv_all_with_period().await;

// [end] pubsub communication latency
let end = awkernel_lib::time::Time::now().uptime().as_nanos() as u64;
subscribe_timestamp_at(count_st as usize, end, 1, dag_info.node_id.clone());

let results = f(args);
publishers
.send_all_with_meta(results, 1, count_st as usize, dag_info.node_id)
Comment thread
nokosaaan marked this conversation as resolved.
.await;
}

#[cfg(not(feature = "need-get-period"))]
{
let args: <<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item =
subscribers.recv_all().await;

let results = f(args);
publishers.send_all(results).await;
}
}
};

Expand Down Expand Up @@ -966,13 +994,31 @@ where
Attribute::default(),
);

let mut interval = interval(period);
let mut interval = interval(period, dag_info.dag_id);
// Consume the first tick here to start the loop's main body without an initial delay.
interval.tick().await;

loop {
let results = f();
publishers.send_all(results).await;
#[cfg(feature = "need-get-period")]
{
let index = get_period_count(dag_info.dag_id) as usize;
if index != 0 {
// [start] cycle deviation index >= 1
let release_time = awkernel_lib::time::Time::now().uptime().as_nanos() as u64;
update_pre_send_outer_timestamp_at(index, release_time, dag_info.dag_id);
}
let results = f();
publishers
.send_all_with_meta(results, 0, index, dag_info.node_id)
.await;
increment_period_count(dag_info.dag_id);
}
Comment thread
nokosaaan marked this conversation as resolved.

#[cfg(not(feature = "need-get-period"))]
{
let results = f();
publishers.send_all(results).await;
}

#[cfg(feature = "perf")]
periodic_measure();
Expand Down Expand Up @@ -1006,8 +1052,29 @@ where
Args::create_subscribers(subscribe_topic_names, Attribute::default());

loop {
let args: <Args::Subscribers as MultipleReceiver>::Item = subscribers.recv_all().await;
f(args);
#[cfg(feature = "need-get-period")]
{
let (args, count_st): (<Args::Subscribers as MultipleReceiver>::Item, u32) =
subscribers.recv_all_with_period().await;

// [end] pubsub communication latency
let end = awkernel_lib::time::Time::now().uptime().as_nanos() as u64;
subscribe_timestamp_at(count_st as usize, end, 2, dag_info.node_id.clone());

let timenow = awkernel_lib::time::Time::now().uptime().as_nanos() as u64;
if count_st != 0 {
update_fin_recv_outer_timestamp_at(count_st as usize, timenow, dag_info.dag_id);
}

f(args);
}

#[cfg(not(feature = "need-get-period"))]
{
let args: <Args::Subscribers as MultipleReceiver>::Item =
subscribers.recv_all().await;
f(args);
}
}
};

Expand Down
180 changes: 180 additions & 0 deletions awkernel_async_lib/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ use core::{
use futures::Future;
use pin_project::pin_project;

#[cfg(feature = "need-get-period")]
use crate::task::perf::publish_timestamp_at;

/// Data and timestamp.
#[derive(Clone)]
pub struct Data<T> {
pub timestamp: awkernel_lib::time::Time,
pub data: T,
#[cfg(feature = "need-get-period")]
pub index: u32,
}

/// Publisher.
Expand Down Expand Up @@ -260,6 +265,8 @@ struct Sender<'a, T: 'static + Send> {
subscribers: VecDeque<ArcInner<T>>,
state: SenderState,
timestamp: awkernel_lib::time::Time,
#[cfg(feature = "need-get-period")]
index: u32,
}

enum SenderState {
Expand All @@ -276,8 +283,16 @@ impl<'a, T: Send> Sender<'a, T> {
subscribers: Default::default(),
state: SenderState::Start,
timestamp: awkernel_lib::time::Time::now(),
#[cfg(feature = "need-get-period")]
index: 0,
}
}

#[cfg(feature = "need-get-period")]
pub(super) fn with_period(mut self, index: u32) -> Self {
self.index = index;
self
}
}

impl<T> Future for Sender<'_, T>
Expand Down Expand Up @@ -309,6 +324,8 @@ where
if let Err(data) = guard.push(Data {
timestamp: awkernel_lib::time::Time::now(),
data: data.clone(),
#[cfg(feature = "need-get-period")]
index: *this.index,
}) {
// If the send buffer is full, then remove the oldest one and store again.
guard.pop();
Expand Down Expand Up @@ -342,6 +359,8 @@ where
match inner.queue.push(Data {
timestamp: *this.timestamp,
data: data.clone(),
#[cfg(feature = "need-get-period")]
index: *this.index,
}) {
Ok(_) => {
// Wake the subscriber up.
Expand Down Expand Up @@ -386,8 +405,94 @@ where
sender.await;
r#yield().await;
}

#[cfg(feature = "need-get-period")]
pub async fn send_with_meta(&self, data: T, pub_id: u32, index: usize, node_id: u32) {
// [start] pubsub communication latency
let start = awkernel_lib::time::Time::now().uptime().as_nanos() as u64;
publish_timestamp_at(index, start, pub_id, node_id);
let period = match u32::try_from(index) {
Ok(period) => period,
Err(_) => {
log::warn!(
"Period index {} exceeds u32::MAX; saturating period metadata",
index
);
u32::MAX
}
};
let sender = Sender::new(self, data).with_period(period);
sender.await;
r#yield().await;
}
}

Comment thread
nokosaaan marked this conversation as resolved.
#[cfg(all(test, feature = "need-get-period"))]
mod need_get_period_tests {
use super::*;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};

fn block_on<F: Future>(future: F) -> F::Output {
fn raw_waker() -> RawWaker {
fn clone(_: *const ()) -> RawWaker {
raw_waker()
}
fn wake(_: *const ()) {}
fn wake_by_ref(_: *const ()) {}
fn drop(_: *const ()) {}

RawWaker::new(
core::ptr::null(),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop),
)
}

let waker = unsafe { Waker::from_raw(raw_waker()) };
let mut future = Box::pin(future);

loop {
let mut context = Context::from_waker(&waker);
match Pin::as_mut(&mut future).poll(&mut context) {
Poll::Ready(output) => return output,
Poll::Pending => {}
}
}
}

#[test]
fn send_with_meta_propagates_period_to_receiver() {
block_on(async {
let (publisher, subscriber) = create_pubsub::<u32>(Attribute::default());
publisher.send_with_meta(42, 1, 7, 99).await;

let received = subscriber.recv().await;
assert_eq!(received.data, 42);
assert_eq!(received.index, 7);
});
}

#[test]
fn tuple_recv_all_with_period_returns_shared_period() {
block_on(async {
let (publisher1, subscriber1) = create_pubsub::<u32>(Attribute::default());
let (publisher2, subscriber2) = create_pubsub::<u32>(Attribute::default());

publisher1.send_with_meta(10, 11, 3, 21).await;
publisher2.send_with_meta(20, 12, 3, 22).await;

let ((value1, value2), period) =
(subscriber1, subscriber2).recv_all_with_period().await;

assert_eq!(value1, 10);
assert_eq!(value2, 20);
assert_eq!(period, 3);
});
}
}
/// Create an anonymous publisher and an anonymous subscriber.
/// This channel works as a channel of multiple producers and multiple consumers.
///
Expand Down Expand Up @@ -756,12 +861,24 @@ pub trait MultipleReceiver {
type Item;

fn recv_all(&self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + '_>>;

#[cfg(feature = "need-get-period")]
fn recv_all_with_period(&self) -> Pin<Box<dyn Future<Output = (Self::Item, u32)> + Send + '_>>;
}

pub trait MultipleSender {
type Item;

fn send_all(&self, item: Self::Item) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

#[cfg(feature = "need-get-period")]
fn send_all_with_meta(
&self,
item: Self::Item,
pub_id: u32,
index: usize,
node_id: u32,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
Comment thread
nokosaaan marked this conversation as resolved.
}
Comment thread
nokosaaan marked this conversation as resolved.
pub trait VectorToPublishers {
type Publishers: MultipleSender;
Expand Down Expand Up @@ -834,6 +951,13 @@ macro_rules! impl_async_receiver_for_tuple {
fn recv_all(&self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + '_>> {
Box::pin(async move{})
}

#[cfg(feature = "need-get-period")]
fn recv_all_with_period(
&self,
) -> Pin<Box<dyn Future<Output = (Self::Item, u32)> + Send + '_>> {
Box::pin(async move { ((), 0) })
}
}

impl MultipleSender for () {
Expand All @@ -842,6 +966,17 @@ macro_rules! impl_async_receiver_for_tuple {
fn send_all(&self, _item: Self::Item) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async move{})
}

#[cfg(feature = "need-get-period")]
fn send_all_with_meta(
&self,
_item: Self::Item,
_pub_id: u32,
_index: usize,
_node_id: u32,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async move{})
}
}
};
($(($T:ident, $idx:tt, $idx2:tt)),+) => {
Expand All @@ -854,6 +989,34 @@ macro_rules! impl_async_receiver_for_tuple {
($($idx.recv().await.data,)+)
})
}

#[cfg(feature = "need-get-period")]
fn recv_all_with_period(
&self,
) -> Pin<Box<dyn Future<Output = (Self::Item, u32)> + Send + '_>> {
let ($($idx,)+) = self;
Box::pin(async move {
let mut period: Option<u32> = None;
$(
let item = $idx.recv().await;
match period {
Some(expected) => {
assert!(
expected == item.index,
"recv_all_with_period received mismatched periods: expected {}, got {}",
expected,
item.index
);
}
None => {
period = Some(item.index);
}
}
let $idx2 = item.data;
)+
(($($idx2,)+), period.expect("recv_all_with_period requires at least one subscriber"))
})
}
}

impl<$($T: Clone + Sync + Send + 'static),+> MultipleSender for ($(Publisher<$T>,)+) {
Expand All @@ -868,6 +1031,23 @@ macro_rules! impl_async_receiver_for_tuple {
)+
})
}

#[cfg(feature = "need-get-period")]
fn send_all_with_meta(
&self,
item: Self::Item,
pub_id: u32,
index: usize,
node_id: u32,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let ($($idx,)+) = self;
let ($($idx2,)+) = item;
Box::pin(async move {
$(
$idx.send_with_meta($idx2, pub_id, index, node_id).await;
)+
})
}
}
};
}
Expand Down
Loading
Loading