Conversation
Greptile SummaryThis PR adds a Rust native module SDK (
Confidence Score: 4/5Safe to merge after fixing the empty-dict-to-null coercion in native_module.py; remaining findings are P2 style/hardening suggestions. One P1 logic bug: the dimos/core/native_module.py — the config serialization logic at line 181 needs the falsy-dict guard removed. Important Files Changed
Sequence DiagramsequenceDiagram
participant Py as NativeModule (Python)
participant Proc as Rust Process
participant BG as Background Task (tokio)
participant LCM as LCM Bus
Py->>Proc: spawn(executable + CLI args)
Py->>Proc: stdin: {"topics": {...}, "config": {...}}
Proc->>Proc: from_stdin() — parse topics & config
Proc->>Proc: module.input() / module.output() — register routes
Proc->>BG: module.spawn() — start select! loop
loop message loop
LCM-->>BG: transport.recv() -> (channel, data)
BG->>BG: match route.topic() -> try_dispatch(data)
BG-->>Proc: mpsc channel delivers typed message
Proc->>BG: Output::publish(msg) -> mpsc send
BG->>LCM: transport.publish(topic, data)
end
Py->>Proc: SIGTERM (on stop())
Proc-->>Py: exit
|
| ) | ||
|
|
There was a problem hiding this comment.
Empty dict coerced to
null, breaking structs with all-optional fields
config_dict if config_dict else None converts an empty dict {} to None, which serialises to JSON null. Any Rust module whose Python-side config fields are all None (e.g. optional_flag: int | None = None) will produce an empty dict and cause the Rust from_stdin::<PongConfig>() call to fail with "failed to deserialize config" — because serde_json::from_value(Value::Null) cannot produce a struct type, even one with all #[serde(default)] fields.
The conditional should be removed so the dict (possibly {}) is always forwarded:
| ) | |
| stdin_blob = json.dumps( | |
| {"topics": topics, "config": config_dict} | |
| ).encode() + b"\n" |
| let mut line = String::new(); | ||
| io::stdin().lock().read_line(&mut line)?; | ||
|
|
There was a problem hiding this comment.
Blocking I/O inside an async function
io::stdin().lock().read_line(&mut line)? is a synchronous, blocking call. Inside a tokio::main context it occupies a worker thread for the duration of the read. Although the pipe already has data when Python spawns the process (so the block is brief in practice), Tokio's documentation recommends wrapping blocking calls with spawn_blocking or using the async-aware tokio::io::AsyncBufReadExt alternative:
use tokio::io::{AsyncBufReadExt, BufReader};
let mut stdin = BufReader::new(tokio::io::stdin());
let mut line = String::new();
stdin.read_line(&mut line).await?;| loop { | ||
| tokio::select! { | ||
| result = transport.recv() => match result { | ||
| Ok((channel, data)) => { | ||
| for route in &routes { | ||
| if route.topic() == channel { | ||
| route.try_dispatch(&data); | ||
| } | ||
| } | ||
| } | ||
| Err(e) => eprintln!("dimos_module: recv error: {e}"), | ||
| }, | ||
| Some((topic, data)) = publish_rx.recv() => { | ||
| if let Err(e) = transport.publish(&topic, &data).await { | ||
| eprintln!("dimos_module: publish error on {topic}: {e}"); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
There was a problem hiding this comment.
Busy-loop on persistent transport errors
When transport.recv() returns an Err the error is logged and the loop immediately retries with no delay. If the underlying LCM socket enters a persistent error state (e.g. network interface gone), this will saturate a CPU core with error prints. Consider adding a short backoff or a consecutive-error counter:
Err(e) => {
eprintln!("dimos_module: recv error: {e}");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}| stderr=subprocess.PIPE, | ||
| ) | ||
| assert self._process.stdin is not None | ||
| self._process.stdin.write(stdin_blob) |
There was a problem hiding this comment.
nice, clean, will just mess with existing native modules, can we have a toggle? maybe a class itself can have an attribute on nativemodule version, new version expects stdin blob, old one, args
There was a problem hiding this comment.
Sure i can add a field to the native module config to tell it if it should read from cli args or stdin for the configs. (default to cli args for now, but we should move any existing modules to stdin asap, should be a pretty minor change)
| @@ -0,0 +1,40 @@ | |||
| // NativeModule ping example. | |||
| // | |||
There was a problem hiding this comment.
Can we move this to dimos/examples/ dir? so we that don't create more example dirs
slight issue - dimos/examples/language-interop has examples of totally external scripts talking to dimos, this is doing actual native modules (I assume proper way going forward), so should reformat the examples/ dir somehow to make sense
direct dimos vs native-module dimos interop examples? idk, you can decide. we'll point new devs / users to whatever you write here - so in your interest to make it nice otherwise they'll talk to you :)
There was a problem hiding this comment.
Makes sense, I'll move it over. My vision is to standardize the native module sdk across all the languages to match the Rust implementation, and then examples/native-modules can have an example module of each one and show them being coordinated by dimos.
Problem
We need support for Rust native modules to offload performance critical modules.
Closes DIM-XXX
Solution
See native/rust/examples for a Rust native ping pong example
Added NativeModule struct to allow easy publishing and subscribing to topics. NativeModules can use any transport that implements the Transport trait (currently only LCM).
Rust NativeModules also support configuration through stdin instead of cli args. Cli args are kept for backwards compatibility.
Breaking Changes
None
How to Test
For unit tests
cd dimos/native/rust
cargo test
For end to end test with two Rust modules talking over LCM
python3 rust_ping_pong.py
Contributor License Agreement