feat(flowtable): Add FlowTable Capacity #1431

Open
sergeymatov wants to merge 1 commit into main from pr/smatov/flowtable-capacity

Conversation

@sergeymatov
Contributor

If the FlowTable size reaches the configured capacity, new flows are not created and the corresponding packet is dropped.
The changes include NAT/PortFW error handling and the corresponding packet drop.
The default capacity is 1M flows.

Copilot AI review requested due to automatic review settings April 2, 2026 15:35
@sergeymatov sergeymatov requested a review from a team as a code owner April 2, 2026 15:35
@sergeymatov sergeymatov requested review from daniel-noland and removed request for a team April 2, 2026 15:35
Contributor

Copilot AI left a comment

Pull request overview

Adds a configurable hard cap to the shared FlowTable so that when the table reaches capacity, new flows are not created and packets are dropped accordingly (default: 1,000,000 flows). This ties capacity into external config (via k8s conversion) and applies it from mgmt onto the dataplane’s shared flow table.

Changes:

  • Introduce FlowTable capacity tracking + CapacityExceeded error and propagate it through insert APIs.
  • Update NAT (stateful + port-forward) to drop packets when flow creation fails due to capacity.
  • Plumb flow_table_capacity from external config (k8s) into mgmt, and apply it to the running shared flow table.
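
The changes listed above can be sketched roughly as follows. This is a minimal, single-threaded illustration under stated assumptions: `Table`, `DropReason`, and `process` are hypothetical stand-ins, and only `FlowTableError::CapacityExceeded`, the `Result<Option<Arc<_>>, _>` return shape, and the 1,000,000 default come from this PR.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Default capacity stated in the PR description (1M flows).
const DEFAULT_CAPACITY: usize = 1_000_000;

#[derive(Debug, PartialEq, Eq)]
enum FlowTableError {
    CapacityExceeded,
}

/// Hypothetical drop reason; the real NAT code has its own type.
#[derive(Debug, PartialEq, Eq)]
enum DropReason {
    FlowTableFull,
}

/// Hypothetical single-threaded stand-in for the shared flow table.
struct Table {
    capacity: usize,
    flows: HashMap<u32, Arc<String>>,
}

impl Table {
    fn new(capacity: usize) -> Self {
        Self { capacity, flows: HashMap::new() }
    }

    /// Returns the previous entry for `key`, if any, or an error when the table is full.
    fn insert(&mut self, key: u32, info: String) -> Result<Option<Arc<String>>, FlowTableError> {
        if self.flows.len() >= self.capacity {
            return Err(FlowTableError::CapacityExceeded);
        }
        Ok(self.flows.insert(key, Arc::new(info)))
    }
}

/// Caller side: a failed flow creation becomes a packet drop.
fn process(table: &mut Table, key: u32) -> Result<(), DropReason> {
    table
        .insert(key, format!("flow-{key}"))
        .map(|_old| ())
        .map_err(|err| match err {
            FlowTableError::CapacityExceeded => DropReason::FlowTableFull,
        })
}
```

With a capacity of 2, the third distinct key is rejected and the caller sees `DropReason::FlowTableFull`; a table built with `Table::new(DEFAULT_CAPACITY)` would admit up to one million entries.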

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
flow-entry/src/flow_table/table.rs Adds capacity state + new error; changes insert APIs to return Result.
nat/src/stateful/mod.rs Maps flow-table capacity failures into NAT error handling / drop reasons.
nat/src/portfw/nf.rs Handles flow-table insertion failures by dropping port-forwarded packets.
config/src/external/mod.rs Adds flow_table_capacity to external config model.
config/src/converters/k8s/config/gateway_config.rs Converts k8s CRD field into ExternalConfig.flow_table_capacity.
mgmt/src/processor/proc.rs Applies configured capacity to the shared FlowTable on config apply.
mgmt/src/tests/mgmt.rs Wires a FlowTable into mgmt processor params for tests.
mgmt/Cargo.toml Adds flow-entry dependency so mgmt can access FlowTable.
dataplane/src/packet_processor/mod.rs Exposes the shared FlowTable from setup so mgmt can update capacity.
dataplane/src/main.rs Passes the shared FlowTable into mgmt startup params.
k8s-intf/src/bolero/gateway.rs Extends bolero generator/normalizer to include flow_table_capacity.
Comments suppressed due to low confidence (1)

flow-entry/src/flow_table/table.rs:157

confidence: 10
tags: ["logic"]

`insert`/`insert_from_arc` now return `Result<_, FlowTableError>`, but there are still in-repo call sites using the old `Option` return type (e.g. `flow-entry/src/flow_table/nf_lookup.rs` calls `self.flow_table.insert(...)` and `insert_from_arc(...)` without handling a `Result`). This will fail to compile; please update remaining callers to handle/propagate `FlowTableError` (at least `CapacityExceeded`).
/// Add a flow to the table.
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
///  - this thread already holds the read lock on the table or if the table lock is poisoned.
///  - if the `flow_info` to insert has a key different from `flow_key`
///
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });

    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }

    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

Comment thread flow-entry/src/flow_table/table.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs Outdated
Comment thread nat/src/stateful/mod.rs Outdated
Comment thread nat/src/portfw/nf.rs Outdated
Comment thread config/src/external/mod.rs
Comment thread mgmt/src/processor/proc.rs Outdated
@sergeymatov sergeymatov marked this pull request as draft April 2, 2026 15:42
@sergeymatov
Contributor Author

Converting to draft because Fabric side is not yet merged

@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch 3 times, most recently from 1067b06 to e2a76ee Compare April 2, 2026 16:07
@sergeymatov sergeymatov requested a review from Copilot April 2, 2026 16:41
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

flow-entry/src/flow_table/table.rs:186

confidence: 9
tags: [logic]

`FlowTable::insert`, `insert_from_arc`, and `reinsert` now return `Result<_, FlowTableError>`, but there are still in-repo call sites that ignore the result (and will not compile / will drop capacity errors), e.g. `flow-entry/src/flow_table/nf_lookup.rs` uses `flow_table.insert(...)` and `insert_from_arc(...)` without handling `Result`. Please update all call sites to handle/propagate `FlowTableError` appropriately.
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });

    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }

    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

/// Add a flow entry to the table from a `&Arc<FlowInfo>`
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
///   - this thread already holds the read lock on the table or if the table lock is poisoned.
///   - if the `flow_info` to insert has a key different from `flow_key`
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert_from_arc(
    &self,
    flow_key: FlowKey,
    flow_info: &Arc<FlowInfo>,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    debug!("insert: Inserting flow key {:?}", flow_key);
    self.insert_common(flow_key, flow_info)
}

Comment thread nat/src/portfw/nf.rs Outdated
Comment thread nat/src/stateful/mod.rs
Comment thread flow-entry/src/flow_table/table.rs Outdated
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

flow-entry/src/flow_table/table.rs:186

confidence: 9
tags: [logic]

`FlowTable::insert`, `insert_from_arc`, and `reinsert` now return `Result<...>`, but there are still call sites using the old `Option` return type (e.g. `flow-entry/src/flow_table/nf_lookup.rs` uses `flow_table.insert(...)` and `flow_table.insert_from_arc(...)` without handling `Result`). Those tests will no longer compile until updated to handle/unwrap the `Result`.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });

    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }

    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

/// Add a flow entry to the table from a `&Arc<FlowInfo>`
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
///   - this thread already holds the read lock on the table or if the table lock is poisoned.
///   - if the `flow_info` to insert has a key different from `flow_key`
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert_from_arc(
    &self,
    flow_key: FlowKey,
    flow_info: &Arc<FlowInfo>,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    debug!("insert: Inserting flow key {:?}", flow_key);
    self.insert_common(flow_key, flow_info)
}

Comment thread nat/src/portfw/nf.rs Outdated
Comment thread nat/src/portfw/nf.rs Outdated
Comment thread config/src/converters/k8s/config/gateway_config.rs Outdated
@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch from debf2fb to 414263e Compare April 6, 2026 10:21
@sergeymatov sergeymatov requested a review from Copilot April 6, 2026 10:22
@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch from 414263e to 2527470 Compare April 6, 2026 10:26
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

flow-entry/src/flow_table/table.rs:186

confidence: 9
tags: [logic]

`FlowTable::insert` / `insert_from_arc` now return `Result<_, FlowTableError>`, but there are still in-repo call sites using the old `Option` return type (e.g. `flow-entry/src/flow_table/nf_lookup.rs:106,135,202-203`). This will fail to compile and/or ignore capacity failures. Update those call sites to handle the `Result` (even if just `.unwrap()` in tests, and proper error handling in the NF if inserts can occur at runtime).
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert(
    &self,
    flow_key: FlowKey,
    mut flow_info: FlowInfo,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    // if the flow_info embeds its key already, it must match `flow_key`
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });

    // embed the key in the flow if it did not provide one
    if flow_info.flowkey().is_none() {
        flow_info.set_flowkey(flow_key);
    }

    debug!("insert: Inserting flow key {:?}", flow_key);
    let val = Arc::new(flow_info);
    self.insert_common(flow_key, &val)
}

/// Add a flow entry to the table from a `&Arc<FlowInfo>`
///
/// # Returns
///
/// Returns the old `Arc<FlowInfo>` associated with the flow key, if any.
///
/// # Panics
///
/// Panics if:
///   - this thread already holds the read lock on the table or if the table lock is poisoned.
///   - if the `flow_info` to insert has a key different from `flow_key`
/// # Errors
///
/// Returns [`FlowTableError::CapacityExceeded`] when the table has reached its hard limit.
pub fn insert_from_arc(
    &self,
    flow_key: FlowKey,
    flow_info: &Arc<FlowInfo>,
) -> Result<Option<Arc<FlowInfo>>, FlowTableError> {
    flow_info.flowkey().inspect(|key| {
        assert_eq!(
            *key, &flow_key,
            "Attempted to insert a flow with key: {key} with a distinct key: {flow_key}"
        );
    });
    debug!("insert: Inserting flow key {:?}", flow_key);
    self.insert_common(flow_key, flow_info)
}

Comment thread flow-entry/src/flow_table/table.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs Outdated
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 1 comment.

Comment thread flow-entry/src/flow_table/table.rs
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 2 comments.

Comment thread config/src/external/mod.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs
@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch from df3fc8c to a8cfaa8 Compare April 8, 2026 12:16
@sergeymatov sergeymatov marked this pull request as ready for review April 8, 2026 12:28
Comment thread config/src/external/mod.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs Outdated
Comment thread nat/src/portfw/nf.rs Outdated
Comment thread nat/src/stateful/mod.rs Outdated
Collaborator

@daniel-noland daniel-noland left a comment

I'm not done reviewing yet, but I think I spotted an issue. Need to check. I'm going to briefly mark this as don't-merge

Comment thread config/src/converters/k8s/config/gateway_config.rs Outdated
Comment thread config/src/external/mod.rs Outdated
Comment thread flow-entry/src/flow_table/nf_lookup.rs Outdated
}

pub fn set_capacity(&self, capacity: usize) {
    self.capacity.store(capacity, Ordering::Relaxed);
Collaborator

we will need fuzz tests which run against the thread sanitizer to make sure we don't have race conditions. What was the logic for using Relaxed ordering?

Contributor

This is probably fine in this case as we don't need this to be a super hard limit with no races at all. We just want to make sure that we enforce the policy correctly. Does Relaxed make any guarantees about when this will be seen? E.g., can it take 30s for a remote CPU to see the update without a synchronization instruction intervening?

Contributor

Also, the fuzzer might be overkill for this use case. Again, the real question is what are the semantics of relaxed when the reader isn't using a synchronizing instruction.
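
On the visibility question above: as far as I know, `Ordering::Relaxed` still gives atomicity and a single modification order for that one variable, but it orders no surrounding memory accesses, and the language specifies no latency bound for when another thread observes the store (the C++ model that Rust follows only says stores should become visible within a reasonable amount of time). A minimal sketch, assuming a hypothetical `Limits` struct in place of the real `FlowTable`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in holding only the capacity knob.
struct Limits {
    capacity: AtomicUsize,
}

impl Limits {
    fn set_capacity(&self, capacity: usize) {
        // Relaxed: the store is atomic but orders no other memory accesses.
        self.capacity.store(capacity, Ordering::Relaxed);
    }

    fn capacity(&self) -> usize {
        self.capacity.load(Ordering::Relaxed)
    }
}

/// Updates the capacity from a separate ("mgmt") thread, then reads it back.
fn update_and_read(initial: usize, updated: usize) -> usize {
    let limits = Arc::new(Limits { capacity: AtomicUsize::new(initial) });
    let writer = Arc::clone(&limits);
    thread::spawn(move || writer.set_capacity(updated))
        .join()
        .expect("writer thread panicked");
    // join() synchronizes with the finished thread, so the store is visible here.
    limits.capacity()
}
```

Here the read happens after `join()`, so it is guaranteed to see the new value. A dataplane worker polling `capacity()` without any such synchronization point would see the update eventually, which matches the "not a super hard limit" reading in this thread.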

Comment thread nat/src/stateful/mod.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs Outdated
continue;
}

drop(table);
Collaborator

why the manual drop here?

This feels like a serious problem but I need to look closer into it

Comment thread flow-entry/src/flow_table/table.rs Outdated
Comment thread flow-entry/src/flow_table/table.rs Outdated
@daniel-noland daniel-noland added the dont-merge Do not merge this Pull Request label Apr 8, 2026
@Fredi-raspall
Contributor

> I'm not done reviewing yet, but I think I spotted an issue. Need to check. I'm going to briefly mark this as don't-merge

@daniel-noland , bear in mind that this PR needs to be rebased on main, which contains significant changes.

Comment thread flow-entry/src/flow_table/table.rs Outdated
@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch 5 times, most recently from bf4ccec to 901a4dd Compare April 13, 2026 11:05
@sergeymatov
Contributor Author

@daniel-noland Thank you for the review. I will mark several of your comments as unrelated: the branch was rebased on top of main, and I've applied new logic to flow insertion for the case where the corresponding flow is already present.

@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch from 901a4dd to d5f9972 Compare April 13, 2026 16:10
Comment thread flow-entry/src/flow_table/table.rs Outdated
// Leaving one side of a pair without its reverse is worse than briefly exceeding
// the limit by one entry. Concurrent inserts from other workers may cause the
// count to drift slightly above the limit; that is acceptable.
if !table.contains_key(&flow_key) && table.len() >= capacity {
Contributor

I have two comments on this logic.

  1. The code comment seems to indicate that we refresh flows by re-inserting them. That could be done, and a re-insert actually exists. However, we don't do that, and I wonder if we ever should, since flow infos get extra info attached (e.g. NAT) that would be lost this way unless re-created too. In general, I'd say that we refresh only on activity (from packet references) and just by sliding the timeout forward. If we're sure we'll always go that way, the contains() call (which is a lookup) may be avoided.

  2. Later, you check whether a flow has a related flow, and whether that related flow can be upgraded and is in the table. That's another lookup. I wonder if we could avoid that second lookup by inspecting the status of the flow. Right now, IIRC, a flow is active if it lives in the table. The flow info constructor sets it to Active. If we changed that behavior so that a flow would be active iff it was in the flow table, then the table.get(k) below could be replaced by a check of the status. Of course, checking the atomic may not be as reliable as the direct lookup, but it may be right 99.999% of the time? Food for thought....

Contributor Author

Answering inline:
1 - I think you are right. contains_key is there to distinguish a "new entry" from a "replaced entry". We could replace the contains_key + table.len() check with just table.len(), treating replacements as capacity-consuming too, which is conservative but correct and eliminates the lookup. As a reminder, though, table.len() counts every flow present in the table, not only the Active ones.

2 - I think we first mark flow Active (insert_common) and only then add it into the DashMap. I guess we can swap those things in the code, and rel.status() == Active can be a good check in this case

Contributor

Answers for answer ;-)
1- I think our goal is to protect us from memory exhaustion with many flows. So, if there are "many" flows, we should stop admitting more. Therefore, whether they are active or not is irrelevant. All consume the same.
2- Yeah. I think you could even use rel.is_active() IIRC.
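
The two admission checks discussed in this thread can be compared side by side. This is a sketch only, assuming a plain `HashMap` in place of the real concurrent map; the function names are hypothetical.

```rust
use std::collections::HashMap;

/// Check as quoted in the diff: replacing an existing key is always admitted,
/// at the cost of one extra lookup.
fn admits_with_lookup(table: &HashMap<u64, ()>, key: u64, capacity: usize) -> bool {
    table.contains_key(&key) || table.len() < capacity
}

/// Conservative variant: no lookup, so replacements count against capacity too.
fn admits_len_only(table: &HashMap<u64, ()>, capacity: usize) -> bool {
    table.len() < capacity
}
```

The two differ only when the table is full and the key already exists: the first admits the replacement, the second rejects it. If flows are refreshed by sliding the timeout rather than by re-inserting, that case should be rare, which is the argument for dropping the lookup.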

Comment thread nat/src/stateful/mod.rs
If the `FlowTable` size reaches the configured capacity, new flows won't
be created and the corresponding packet is dropped.

Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
@sergeymatov sergeymatov force-pushed the pr/smatov/flowtable-capacity branch from d5f9972 to 4b115c8 Compare April 15, 2026 16:09
Contributor

@Fredi-raspall Fredi-raspall left a comment

LGTM. Just one minor thing that should be corrected IMO.

.related
.as_ref()
.and_then(Weak::upgrade)
.is_some_and(|rel| rel.status() == FlowStatus::Active);
Contributor

Here you could use FlowInfo::is_valid() (which should actually be renamed to is_active()).
Most importantly, I believe you should change FlowInfo::new() so that when FlowInfos are constructed, their state is NOT Active. Else, this check is not helpful since, by default, all flowInfos are created as Active. You could define a new state or actually use Detached (not an ideal name), but which means a flow that is not present in the table.
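
A rough sketch of that suggestion, under loud assumptions: the `FlowInfo` below is a stripped-down hypothetical with only a status and a related-flow link, `mark_inserted` stands in for whatever the table does on insertion, and `Detached` is used as the initial state as proposed.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Weak};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum FlowStatus {
    Detached = 0, // constructed, but not present in the table
    Active = 1,   // present in the table
}

/// Hypothetical, stripped-down flow info.
struct FlowInfo {
    status: AtomicU8,
    related: Option<Weak<FlowInfo>>,
}

impl FlowInfo {
    /// New flows start as Detached rather than Active.
    fn new(related: Option<Weak<FlowInfo>>) -> Self {
        Self { status: AtomicU8::new(FlowStatus::Detached as u8), related }
    }

    /// Hypothetical hook: called by the table once the flow is inserted.
    fn mark_inserted(&self) {
        self.status.store(FlowStatus::Active as u8, Ordering::Relaxed);
    }

    fn is_active(&self) -> bool {
        self.status.load(Ordering::Relaxed) == FlowStatus::Active as u8
    }

    /// Status check in place of a second table lookup for the related flow.
    fn related_is_active(&self) -> bool {
        self.related
            .as_ref()
            .and_then(Weak::upgrade)
            .is_some_and(|rel| rel.is_active())
    }
}
```

With this initial state, `related_is_active()` is false until the related flow has actually been inserted, and false again once the related flow is dropped and the `Weak` can no longer be upgraded.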

Contributor

Can we stack a commit before this one for this PR that does the rename?

@daniel-noland daniel-noland removed the dont-merge Do not merge this Pull Request label Apr 15, 2026