Conversation
Pull request overview
Adds a configurable hard cap to the shared FlowTable so that when the table reaches capacity, new flows are not created and packets are dropped accordingly (default: 1,000,000 flows). This ties capacity into external config (via k8s conversion) and applies it from mgmt onto the dataplane’s shared flow table.
Changes:
- Introduce `FlowTable` capacity tracking + a `CapacityExceeded` error, and propagate it through the insert APIs.
- Update NAT (stateful + port-forward) to drop packets when flow creation fails due to capacity.
- Plumb `flow_table_capacity` from external config (k8s) into mgmt, and apply it to the running shared flow table.
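The capacity check described above can be sketched roughly as follows. This is a simplified, hypothetical stand-in (plain `HashMap`, `u64` keys, `String` values) rather than the crate's real `FlowKey`/`FlowInfo` types; only the shape of the check mirrors the PR:

```rust
use std::collections::HashMap;

// Illustrative error type; the PR's real enum may carry more variants.
#[derive(Debug, PartialEq)]
pub enum FlowTableError {
    CapacityExceeded,
}

pub struct FlowTable {
    entries: HashMap<u64, String>, // real code keys by FlowKey and stores Arc<FlowInfo>
    capacity: usize,               // hard cap; the PR's default is 1_000_000
}

impl FlowTable {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { entries: HashMap::new(), capacity }
    }

    /// Insert a flow, refusing brand-new entries once the cap is reached.
    /// Replacing an existing key is still allowed: it does not grow the table.
    pub fn insert(&mut self, key: u64, info: String) -> Result<Option<String>, FlowTableError> {
        if !self.entries.contains_key(&key) && self.entries.len() >= self.capacity {
            return Err(FlowTableError::CapacityExceeded);
        }
        Ok(self.entries.insert(key, info))
    }
}
```

Callers that hit `CapacityExceeded` drop the packet instead of creating the flow.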
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| `flow-entry/src/flow_table/table.rs` | Adds capacity state + a new error; changes insert APIs to return `Result`. |
| `nat/src/stateful/mod.rs` | Maps flow-table capacity failures into NAT error handling / drop reasons. |
| `nat/src/portfw/nf.rs` | Handles flow-table insertion failures by dropping port-forwarded packets. |
| `config/src/external/mod.rs` | Adds `flow_table_capacity` to the external config model. |
| `config/src/converters/k8s/config/gateway_config.rs` | Converts the k8s CRD field into `ExternalConfig.flow_table_capacity`. |
| `mgmt/src/processor/proc.rs` | Applies the configured capacity to the shared `FlowTable` on config apply. |
| `mgmt/src/tests/mgmt.rs` | Wires a `FlowTable` into mgmt processor params for tests. |
| `mgmt/Cargo.toml` | Adds a `flow-entry` dependency so mgmt can access `FlowTable`. |
| `dataplane/src/packet_processor/mod.rs` | Exposes the shared `FlowTable` from setup so mgmt can update capacity. |
| `dataplane/src/main.rs` | Passes the shared `FlowTable` into mgmt startup params. |
| `k8s-intf/src/bolero/gateway.rs` | Extends the bolero generator/normalizer to include `flow_table_capacity`. |
Comments suppressed due to low confidence (1)
flow-entry/src/flow_table/table.rs:157
confidence: 10
tags: ["logic"]
`insert`/`insert_from_arc` now return `Result<_, FlowTableError>`, but there are still in-repo call sites using the old `Option` return type (e.g. `flow-entry/src/flow_table/nf_lookup.rs` calls `self.flow_table.insert(...)` and `insert_from_arc(...)` without handling a `Result`). This will fail to compile; please update remaining callers to handle/propagate `FlowTableError` (at least `CapacityExceeded`).
```rust
/// Add a flow to the table.
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
/// - this thread already holds the read lock on the table, or the table lock is poisoned
/// - the `flow_info` to insert has a key different from `flow_key`
///
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }
    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}
```
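Updating the remaining call sites the Copilot comment mentions amounts to handling the new `Result`. A hedged sketch of one way a caller might do that, mapping the capacity error to a drop decision; `DropReason` and `handle_insert` are illustrative names, not from the PR:

```rust
// Illustrative stand-ins for the crate's real error and drop-reason types.
#[derive(Debug, PartialEq)]
enum FlowTableError {
    CapacityExceeded,
}

#[derive(Debug, PartialEq)]
enum DropReason {
    FlowTableFull,
}

// A call site adapting to the Result-returning insert API: on capacity
// exhaustion, translate the table error into a packet-drop reason; on
// success, pass through the previous Arc<FlowInfo> (here a plain u32).
fn handle_insert(result: Result<Option<u32>, FlowTableError>) -> Result<Option<u32>, DropReason> {
    result.map_err(|e| match e {
        FlowTableError::CapacityExceeded => DropReason::FlowTableFull,
    })
}
```

Tests can simply `.unwrap()` the `Result` instead.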
Converting to draft because the Fabric side is not yet merged.
Force-pushed 1067b06 to e2a76ee (Compare)
Pull request overview
Copilot reviewed 11 out of 12 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
flow-entry/src/flow_table/table.rs:186
confidence: 9
tags: [logic]
`FlowTable::insert`, `insert_from_arc`, and `reinsert` now return `Result<_, FlowTableError>`, but there are still in-repo call sites that ignore the result (and will not compile / will drop capacity errors), e.g. `flow-entry/src/flow_table/nf_lookup.rs` uses `flow_table.insert(...)` and `insert_from_arc(...)` without handling `Result`. Please update all call sites to handle/propagate `FlowTableError` appropriately.
```rust
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }
    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

/// Add a flow entry to the table from a `&Arc<FlowInfo>`
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
/// - this thread already holds the read lock on the table, or the table lock is poisoned
/// - the `flow_info` to insert has a key different from `flow_key`
///
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert_from_arc(
    &self,
    flow_key: FlowKey,
    flow_info: &Arc<FlowInfo>,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    debug!("insert: Inserting flow key {:?}", flow_key);
    self.insert_common(flow_key, flow_info)
}
```
Force-pushed e2a76ee to debf2fb (Compare)
Pull request overview
Copilot reviewed 11 out of 12 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
flow-entry/src/flow_table/table.rs:186
confidence: 9
tags: [logic]
`FlowTable::insert`, `insert_from_arc`, and `reinsert` now return `Result<...>`, but there are still call sites using the old `Option` return type (e.g. `flow-entry/src/flow_table/nf_lookup.rs` uses `flow_table.insert(...)` and `flow_table.insert_from_arc(...)` without handling `Result`). Those tests will no longer compile until updated to handle/unwrap the `Result`.
```rust
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }
    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

/// Add a flow entry to the table from a `&Arc<FlowInfo>`
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
/// - this thread already holds the read lock on the table, or the table lock is poisoned
/// - the `flow_info` to insert has a key different from `flow_key`
///
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert_from_arc(
    &self,
    flow_key: FlowKey,
    flow_info: &Arc<FlowInfo>,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    debug!("insert: Inserting flow key {:?}", flow_key);
    self.insert_common(flow_key, flow_info)
}
```
Force-pushed debf2fb to 414263e (Compare)
Force-pushed 414263e to 2527470 (Compare)
Pull request overview
Copilot reviewed 11 out of 12 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
flow-entry/src/flow_table/table.rs:186
confidence: 9
tags: [logic]
`FlowTable::insert` / `insert_from_arc` now return `Result<_, FlowTableError>`, but there are still in-repo call sites using the old `Option` return type (e.g. `flow-entry/src/flow_table/nf_lookup.rs:106,135,202-203`). This will fail to compile and/or ignore capacity failures. Update those call sites to handle the `Result` (even if just `.unwrap()` in tests, and proper error handling in the NF if inserts can occur at runtime).
```rust
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }
    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

/// Add a flow entry to the table from a `&Arc<FlowInfo>`
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
/// - this thread already holds the read lock on the table, or the table lock is poisoned
/// - the `flow_info` to insert has a key different from `flow_key`
///
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert_from_arc(
    &self,
    flow_key: FlowKey,
    flow_info: &Arc<FlowInfo>,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    debug!("insert: Inserting flow key {:?}", flow_key);
    self.insert_common(flow_key, flow_info)
}
```
Force-pushed 2527470 to 9e8026e (Compare)
Force-pushed 9e8026e to df3fc8c (Compare)
Force-pushed df3fc8c to a8cfaa8 (Compare)
daniel-noland left a comment
I'm not done reviewing yet, but I think I spotted an issue. Need to check. I'm going to briefly mark this as don't-merge
```rust
pub fn set_capacity(&self, capacity: usize) {
    self.capacity.store(capacity, Ordering::Relaxed);
}
```
We will need fuzz tests which run against the thread sanitizer to make sure we don't have race conditions. What was the logic for using Relaxed ordering?
This is probably fine in this case, as we don't need this to be a super hard limit with no races at all. We just want to make sure that we enforce the policy correctly. Does Relaxed make any guarantees about when this will be seen? E.g., can it take 30s for a remote CPU to see the update without a synchronizing instruction intervening?
Also, the fuzzer might be overkill for this use case. Again, the real question is what are the semantics of relaxed when the reader isn't using a synchronizing instruction.
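A minimal sketch of the pattern under discussion (the `Capacity` wrapper is a stand-in, not the crate's real type). `Relaxed` here guarantees the load/store themselves are atomic and that the value is eventually visible to other threads, but it imposes no ordering relative to surrounding memory operations and no deadline on visibility; the C++/Rust memory model only promises that stores become visible "in a finite period of time", while hardware cache coherence propagates them promptly in practice:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// The capacity is a standalone policy knob: nothing else is published
// "alongside" the store, so no acquire/release pairing is needed and
// Relaxed is sufficient for a soft limit that tolerates brief races.
struct Capacity(AtomicUsize);

impl Capacity {
    fn set(&self, capacity: usize) {
        self.0.store(capacity, Ordering::Relaxed);
    }
    fn get(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}
```

Readers doing `table.len() >= capacity.get()` may briefly see a stale cap, which matches the "enforce the policy, not a hard invariant" intent above.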
```rust
        continue;
    }

    drop(table);
```
Why the manual drop here?
This feels like a serious problem, but I need to look closer into it.
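For context, an explicit `drop(table)` on a lock guard is usually about releasing the lock before the end of scope. A hedged, self-contained sketch (the names here are illustrative, not the PR's code):

```rust
use std::sync::Mutex;

// Without the explicit drop, the first guard would live until the end of
// the function, and the second lock() below would deadlock on the same
// (non-reentrant) mutex. drop(guard) releases the lock early.
fn bump_then_measure(table: &Mutex<Vec<u32>>) -> usize {
    let mut guard = table.lock().expect("poisoned");
    guard.push(1);
    drop(guard); // release the table lock early
    let guard = table.lock().expect("poisoned"); // safe to re-lock now
    guard.len()
}
```

Whether that is the intent at this call site is exactly what the reviewer is asking.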
@daniel-noland, bear in mind that this PR needs to be rebased on main, which contains significant changes.
Force-pushed bf4ccec to 901a4dd (Compare)
@daniel-noland Thank you for the review. I will mark several of your comments as unrelated, since I've applied new logic to the flow insertion when a corresponding flow is already present, and there was a rebase on top of main.
Force-pushed 901a4dd to d5f9972 (Compare)
```rust
// Leaving one side of a pair without its reverse is worse than briefly exceeding
// the limit by one entry. Concurrent inserts from other workers may cause the
// count to drift slightly above the limit; that is acceptable.
if !table.contains_key(&flow_key) && table.len() >= capacity {
```
I have two comments on this logic.

1. The code comment seems to indicate that we refresh flows by re-inserting them. That could be done, and a re-insert actually exists. However, we don't do that, and I wonder if we ever should, since flow infos get extra info attached (e.g. NAT) that would be lost this way unless re-created too. In general, I'd say that we refresh only on activity (from packet references), just by sliding the timeout forward. If we're sure we'll always go that way, the `contains()` call (which is a lookup) may be avoided.
2. Later, you check whether a flow has a related flow, and whether that related flow can be upgraded and is in the table. That's another lookup. I wonder if we could avoid that second lookup by inspecting the status of the flow. Right now, IIRC, a flow is active if it lives in the table. The flow-info constructor sets it to Active. If we changed that behavior so that a flow is active iff it is in the flow table, then the `table.get(k)` below could be replaced by a check of the status. Of course, checking the atomic may not be as reliable as the direct lookup, but it may be 99.999% of the time? Food for thought...
Answering inline:

1. I think you are right. `contains_key` is here to distinguish a "new entry" from a "replaced entry". We could replace the `contains_key` + `table.len()` check with just `table.len()`, treating replacements as capacity-consuming too, which is conservative but correct and eliminates the lookup. But as a reminder, `table.len()` counts not only the Active flows present in the table.
2. I think we first mark the flow Active (`insert_common`) and only then add it into the `DashMap`. I guess we can swap those steps in the code, and `rel.status() == Active` can be a good check in this case.
Answers for the answer ;-)

1. I think our goal is to protect ourselves from memory exhaustion with many flows. So, if there are "many" flows, we should stop admitting more. Therefore, whether they are active or not is irrelevant: all consume the same.
2. Yeah. I think you could even use `rel.is_active()`, IIRC.
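The status-instead-of-lookup idea discussed above can be sketched as follows. This is illustrative only: `FlowInfo` is a stand-in type with a single `AtomicBool`, not the crate's real struct, and the real status type is an enum rather than a bool:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};

// Stand-in for the real FlowInfo: the "active" flag would be set on table
// insertion and cleared on removal, so readers can check it without a
// second table lookup.
struct FlowInfo {
    active: AtomicBool,
}

impl FlowInfo {
    fn is_active(&self) -> bool {
        self.active.load(Ordering::Relaxed)
    }
}

// Replaces `table.get(k)` on the related flow with an upgrade + flag check:
// if the Weak still upgrades and the flag says Active, treat it as in-table.
fn related_is_active(related: Option<&Weak<FlowInfo>>) -> bool {
    related
        .and_then(Weak::upgrade)
        .is_some_and(|rel| rel.is_active())
}
```

As noted above, the flag can momentarily disagree with the table during insertion/removal, which is the accuracy trade-off being weighed.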
If the `FlowTable` size has reached the configured capacity, new flows won't be created and the corresponding packet should be dropped. Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
Force-pushed d5f9972 to 4b115c8 (Compare)
Fredi-raspall left a comment
LGTM. Just one minor thing that should be corrected IMO.
```rust
.related
    .as_ref()
    .and_then(Weak::upgrade)
    .is_some_and(|rel| rel.status() == FlowStatus::Active);
```
Here you could use `FlowInfo::is_valid()` (which should actually be renamed to `is_active()`).
Most importantly, I believe you should change `FlowInfo::new()` so that when `FlowInfo`s are constructed, their state is NOT Active. Otherwise, this check is not helpful, since by default all `FlowInfo`s are created as Active. You could define a new state, or actually use Detached (not an ideal name), which means a flow that is not present in the table.
Can we stack a commit before this one for this PR that does the rename?
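The suggested constructor change could look roughly like this. A hedged sketch only: the variant names (`Detached`) and method names here are illustrative, per the reviewer's own caveat that the naming is not ideal:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum FlowStatus {
    Detached, // created but not (yet) admitted to the flow table
    Active,   // present in the flow table
}

struct FlowInfo {
    status: FlowStatus,
}

impl FlowInfo {
    fn new() -> Self {
        // NOT Active by default, so is_active() is meaningful before insertion;
        // the table flips the status to Active only when it admits the flow.
        Self { status: FlowStatus::Detached }
    }
    fn mark_active(&mut self) {
        self.status = FlowStatus::Active;
    }
    fn is_active(&self) -> bool {
        self.status == FlowStatus::Active
    }
}
```

With this, the related-flow check above stops returning false positives for freshly constructed, never-inserted flows.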
If the `FlowTable` size has reached the configured capacity, new flows won't be created and the corresponding packets are dropped. Changes include NAT/PortFW error handling and the appropriate packet drops.
The default value is set to 1M flows.