
feat: Ingestion traces #1629

Merged
nikhilsinhaparseable merged 10 commits into parseablehq:main from parmesant:ingestion-traces
Apr 28, 2026

Conversation

@parmesant (Contributor) commented Apr 23, 2026

Fixes #XXXX.

  • Add traces to ingestion
  • Performance improvements
  • Add new CLI args for actix server

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • OpenTelemetry tracing/export with a telemetry init entrypoint
    • New server CLI options for request timeout, keep-alive, worker count, backlog, and max connections
  • Performance Improvements

    • Batched ingestion and optimized schema/record processing
    • Reduced lock contention during flush/convert and more efficient size computation
  • Observability

    • Broad tracing spans/instrumentation across ingestion, storage, sync, and conversion
  • Reliability

    • Guards to prevent overlapping sync cycles
  • Breaking Changes

    • Removal of a legacy merged-reader public API

coderabbitai Bot commented Apr 23, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds OpenTelemetry/tracing crates and initializer; instruments ingestion, parsing, storage, sync, and utility hot paths with tracing spans; converts ingest push to synchronous batched flow; extends CLI/Actix server connection settings; refactors staging reader and flush/convert locking/flow.

Changes

  • Dependencies (Cargo.toml): Add OpenTelemetry/tracing crates, pin tracing = "0.1.44" and tracing-subscriber = "0.3.23" (enabling "registry"), and reformat feature lists.
  • Telemetry entry & re-exports (src/lib.rs, src/telemetry.rs): Add pub mod telemetry, a new pub fn init_tracing() -> Option<SdkTracerProvider>, and re-export OTEL/tracing crates.
  • CLI & Actix server config (src/cli.rs, src/handlers/http/modal/mod.rs): Add CLI flags for request timeout, keep-alive, workers, backlog, and max connections; apply the options to HttpServer and log startup settings.
  • Ingest batching & push (src/handlers/http/modal/utils/ingest_utils.rs): Convert push_logs from async to sync, batch records into a single Value::Array, add json_byte_size(), cache the schema, and split the custom-partition path (parallel mapping via Rayon) from the single-batch path.
  • Event instrumentation (src/event/mod.rs, src/event/format/mod.rs, src/event/format/json.rs): Add #[instrument] and info_span! spans around event processing, schema inference/derivation, and JSON→Arrow decoding (span fields include record counts and stream identifiers).
  • Staging reader refactor (src/parseable/staging/reader.rs): Remove the MergedRecordReader public API, instrument MergedReverseRecordReader::try_new, avoid per-compare cloning, and tighten error mapping for invalid streams; update tests.
  • Streams, flush & parquet flow (src/parseable/streams.rs): Add tracing spans for push/prepare/flush/convert, reduce lock contention by swapping writer state under the mutex and dropping I/O-heavy writers after unlock, use MergedReverseRecordReader, and instrument spawned tasks.
  • OTel metrics flattening (src/otel/metrics.rs): Simplify flattening by injecting metric-level fields into datapoints in one pass, refactor resource/scope aggregation, and add tracing spans plus an output_count field.
  • Sync guards & upload instrumentation (src/sync.rs, src/storage/object_storage.rs): Add atomic guards with an RAII SyncRunningGuard to prevent overlapping sync cycles, wrap cycles and per-stream uploads in spans, and return upload results from spawned tasks.
  • Utils & stats (src/storage/field_stats.rs, src/utils/json/mod.rs): Compute the target schema once per batch, replace serde_json::to_vec(...).len() with json_byte_size(&json), and add an info_span for JSON conversion (see the json_byte_size sketch below).
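
json_byte_size() avoids allocating a full serialization just to measure the payload. A minimal sketch of the idea, assuming a recursive walk over serde_json::Value (the helper in this PR may count string escapes and number formatting more precisely):

    use serde_json::Value;

    fn json_byte_size(value: &Value) -> usize {
        match value {
            Value::Null => 4,                         // "null"
            Value::Bool(b) => if *b { 4 } else { 5 }, // "true" / "false"
            Value::Number(n) => n.to_string().len(),
            Value::String(s) => s.len() + 2,          // quotes; escapes ignored here
            Value::Array(items) => {
                2 + items.iter().map(json_byte_size).sum::<usize>() // brackets + elements
                    + items.len().saturating_sub(1)                 // commas
            }
            Value::Object(map) => {
                2 + map
                    .iter()
                    .map(|(k, v)| k.len() + 3 + json_byte_size(v)) // "key": + value
                    .sum::<usize>()
                    + map.len().saturating_sub(1)                  // commas
            }
        }
    }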

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant ActixServer as Actix Server
    participant IngestUtils as Ingest Utils
    participant StreamMgr as Stream / Storage
    participant Telemetry as OTLP Exporter

    Client->>ActixServer: HTTP ingest request
    ActixServer->>IngestUtils: parse & flatten batch (span)
    IngestUtils->>IngestUtils: infer schema & compute json_byte_size (span)
    IngestUtils->>StreamMgr: push_logs (synchronous batched Value::Array)
    StreamMgr->>StreamMgr: write mem/disk, schedule flush/convert (spans)
    StreamMgr->>Telemetry: emit spans/metrics via tracing -> OTLP exporter
    Telemetry-->>Telemetry: export spans to configured OTLP endpoint

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • de-sh
  • nikhilsinhaparseable

Poem

🐇 I hopped through spans and bytes so bright,

Batched the logs and held them tight,
Schemas snug, exporters sing,
Traces travel on their wing—
A rabbit cheers for telemetry tonight.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

  • Description check: ❓ Inconclusive. The PR description identifies the key changes (traces in ingestion, performance improvements, new CLI args) but lacks detail, and all checklist items remain unchecked, suggesting incomplete preparation before submission. Resolution: provide a concrete issue ID in the 'Fixes' section, expand the description with rationale and key changes, and verify the checklist items before merging.

✅ Passed checks (4 passed)

  • Title check: ✅ Passed. The title 'feat: Ingestion traces' directly relates to the main changes: adding tracing/instrumentation throughout the ingestion pipeline and related subsystems.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.
  • Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (2)
src/parseable/staging/reader.rs (1)

86-101: Add direct tests for the new forward merge path.

MergedRecordReader::merged_iter is now the path used during parquet conversion, but the tests in this file still only exercise _MergedReverseRecordReader / _get_reverse_reader. Please add coverage for multiple files, equal timestamps, and custom time_partition so ordering regressions show up here instead of during conversion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/staging/reader.rs` around lines 86 - 101, Tests currently only
exercise the reverse merge path (_MergedReverseRecordReader /
_get_reverse_reader) but not the new forward merge path
MergedRecordReader::merged_iter used during parquet conversion; add unit tests
that call merged_iter (via MergedRecordReader) to assert global ordering across
multiple IPC files, include cases with equal timestamps to ensure stable
ordering, and a case using a non-default time_partition string so
get_timestamp_millis(time_partition) is exercised; create small in-memory
RecordBatches with controlled timestamps and multiple readers, feed them into
merged_iter, and assert the output sequence matches the expected ascending order
and respects your time_partition extraction.
src/handlers/http/modal/mod.rs (1)

132-139: Use info! for the startup config dump.

This path runs on every healthy boot, so logging it at warning level will create noisy alerts for normal behavior.

Suggested change
-        tracing::warn!(
+        info!(
             "Starting Query server with-\nNum workers: {}\nKeep Alive: {}\nRequest timeout: {}\nConnection backlog: {}\nMax connections: {}",
             PARSEABLE.options.num_workers,
             PARSEABLE.options.keep_alive,
             PARSEABLE.options.request_timeout,
             PARSEABLE.options.connection_backlog,
             PARSEABLE.options.max_connections
         );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/mod.rs` around lines 132 - 139, The startup
configuration log currently uses tracing::warn! which is too noisy for normal
healthy boots; change the call to tracing::info! so the configuration dump (the
message using PARSEABLE.options.num_workers, .keep_alive, .request_timeout,
.connection_backlog, .max_connections) is emitted at info level instead of warn,
leaving the message text and interpolated fields unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cli.rs`:
- Around line 169-176: The num_workers CLI field currently allows 0 which later
causes a panic when passed to HttpServer::workers(...); add a custom parser
function (e.g., parse_num_workers) that accepts a &str, parses to usize, and
returns an error if the value is 0, then reference it on the num_workers arg via
#[arg(value_parser = parse_num_workers)] so invalid values are rejected at
argument parsing time instead of panicking at HttpServer startup.
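
A minimal sketch of such a parser, assuming clap's derive API (the attribute placement is illustrative):

    fn parse_num_workers(s: &str) -> Result<usize, String> {
        let n: usize = s
            .parse()
            .map_err(|e| format!("invalid worker count '{s}': {e}"))?;
        if n == 0 {
            return Err("number of workers must be greater than 0".to_string());
        }
        Ok(n)
    }

    // Wired onto the clap field (shown for illustration):
    // #[arg(long, value_parser = parse_num_workers)]
    // pub num_workers: usize,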

In `@src/handlers/http/modal/utils/ingest_utils.rs`:
- Around line 245-247: The code is converting the broken ControlFlow error to a
string and wrapping it in PostError::CustomError which loses the original error
type; instead preserve the error by returning it as a PostError using From/Into
or a dedicated variant: change the ControlFlow::Break(e) branch to return
Err(e.into()) (or Err(PostError::from(e))) if PostError implements From for the
error type, or add a new PostError variant like PostError::ParallelError(Box<dyn
std::error::Error + Send + Sync>) and return
Err(PostError::ParallelError(Box::new(e))) so the original error type/context is
retained (refer to ControlFlow::Break, r, and PostError::CustomError in the
code).
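
A sketch of the suggested dedicated variant, assuming the project's thiserror-based error enum (the variant name comes from the review text, not the codebase):

    use thiserror::Error;

    #[derive(Debug, Error)]
    pub enum PostError {
        // ...existing variants elided...
        /// Carries the boxed original error instead of flattening it to a
        /// string via CustomError, so type and context survive for callers.
        #[error("parallel ingestion failed: {0}")]
        ParallelError(Box<dyn std::error::Error + Send + Sync>),
    }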
- Around line 188-191: The code unwraps data.into_iter().next() and can panic if
convert_array_to_object returns an empty Vec; update the block in
ingest_utils.rs around custom_partition/process_non_partitioned to handle an
empty iterator safely (e.g., check data.is_empty() or use .next().ok_or / match)
and return an appropriate error or empty result instead of unwrapping; ensure
the replacement sets json_batch only when an element exists and preserves the
existing behavior when data has one element.
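
A hedged sketch of the non-panicking replacement, written as a standalone helper (names and the error message are illustrative):

    /// Returns the first element, or a descriptive error when the flattening
    /// step yields nothing, instead of panicking on `.next().unwrap()`.
    fn single_batch<T>(data: Vec<T>) -> Result<T, String> {
        data.into_iter()
            .next()
            .ok_or_else(|| "convert_array_to_object returned no batches".to_string())
    }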
- Around line 153-158: The tracing span for push_logs declares a field
record_count but never sets it; update the push_logs function to call
Span::current().record("record_count", &tracing::field::display(n)) (or use
tracing::Span::current().record with the actual numeric count) at the point
where you know how many records were processed so the field is populated; locate
the push_logs function and add the record call after computing the record count
(use the same "record_count" name and a Display-able numeric value) so the
instrumentation reflects the real value.
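
A sketch of recording the field once the count is known, assuming tracing 0.1.44 (where Span::record takes the value directly):

    use tracing::Span;

    fn note_record_count(count: usize) {
        // Fills in the `record_count` field declared on the enclosing
        // `#[instrument(fields(record_count))]` span.
        Span::current().record("record_count", count);
    }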

In `@src/otel/metrics.rs`:
- Around line 457-464: The empty-data_points fallback currently builds a
synthetic Map with only generic fields (metric_name, metric_description,
metric_unit, metric_type_val, metadata) and thus drops type-specific fields like
is_monotonic and aggregation_temporality; update the fallback in the branch that
constructs single (the Map created with Map::new()) to also insert the same
type-specific entries the non-empty path uses (e.g., insert "is_monotonic" when
the metric is a Sum and insert "aggregation_temporality" for sums/histograms) by
reusing the existing variables/values (is_monotonic, aggregation_temporality or
whatever names are in scope) so the synthetic record matches the non-empty
record shape.

In `@src/parseable/streams.rs`:
- Around line 688-691: The match on MergedRecordReader::try_new(&arrow_files)
can return a reader with reader.readers.is_empty(), and the current `_ =>
continue` leaves the moved files under the processing_* directory with no
cleanup; change this branch to call the same cleanup/rollback logic you use for
successful processing (e.g., remove or move out the processing_* directories for
the given arrow_files) before continuing. Specifically, in the branch where
MergedRecordReader::try_new returns Err or Ok(reader) with
reader.readers.is_empty(), invoke the cleanup routine that cleans up the
arrow_files / processing_* group (the same code path used after successful
conversion) instead of simply continuing, so processing_* directories don’t
accumulate; reference MergedRecordReader::try_new, the record_reader variable,
and the arrow_files/processing_* group when locating where to insert the cleanup
call.

In `@src/sync.rs`:
- Around line 134-156: The REMOTE_SYNC_RUNNING flag is only cleared on the happy
path, so if anything inside the async block panics or returns early the flag
remains true; wrap the compare_exchange and subsequent work in a RAII drop guard
that sets REMOTE_SYNC_RUNNING.store(false, Ordering::SeqCst) in its Drop impl
(or use scopeguard) so the flag is always cleared on exit, and apply the same
pattern to LOCAL_SYNC_RUNNING and the other similar sync blocks (see the
identical logic around lines referenced in the review) that call
sync_all_streams/monitor_task_duration to ensure all exit paths clear their
guard.
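
A minimal sketch of the RAII guard pattern described above; the static and type names mirror the review text but are illustrative:

    use std::sync::atomic::{AtomicBool, Ordering};

    static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false);

    /// Clears the running flag on every exit path, including panics
    /// and early returns.
    struct SyncRunningGuard(&'static AtomicBool);

    impl Drop for SyncRunningGuard {
        fn drop(&mut self) {
            self.0.store(false, Ordering::SeqCst);
        }
    }

    /// Returns a guard when no sync cycle is in flight; `None` means
    /// another cycle is running and this one should be skipped.
    fn try_begin_remote_sync() -> Option<SyncRunningGuard> {
        REMOTE_SYNC_RUNNING
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .ok()
            .map(|_| SyncRunningGuard(&REMOTE_SYNC_RUNNING))
    }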

In `@src/telemetry.rs`:
- Around line 48-56: The early return in init_tracing() only checks
OTEL_EXPORTER_OTLP_ENDPOINT, incorrectly disabling tracing when only
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set; change the existence check to succeed
if either std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok() or
std::env::var(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT).ok() is present so
signal-specific traces configuration enables tracing; keep the rest of
init_tracing() (including reading OTEL_EXPORTER_OTLP_PROTOCOL) unchanged.
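
A sketch of the broadened existence check (constant names follow the review text):

    const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
    const OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";

    /// Tracing should be enabled when either the generic endpoint or the
    /// traces-specific endpoint variable is present.
    fn otlp_endpoint_configured() -> bool {
        std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).is_ok()
            || std::env::var(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT).is_ok()
    }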

---

Nitpick comments:
In `@src/handlers/http/modal/mod.rs`:
- Around line 132-139: The startup configuration log currently uses
tracing::warn! which is too noisy for normal healthy boots; change the call to
tracing::info! so the configuration dump (the message using
PARSEABLE.options.num_workers, .keep_alive, .request_timeout,
.connection_backlog, .max_connections) is emitted at info level instead of warn,
leaving the message text and interpolated fields unchanged.

In `@src/parseable/staging/reader.rs`:
- Around line 86-101: Tests currently only exercise the reverse merge path
(_MergedReverseRecordReader / _get_reverse_reader) but not the new forward merge
path MergedRecordReader::merged_iter used during parquet conversion; add unit
tests that call merged_iter (via MergedRecordReader) to assert global ordering
across multiple IPC files, include cases with equal timestamps to ensure stable
ordering, and a case using a non-default time_partition string so
get_timestamp_millis(time_partition) is exercised; create small in-memory
RecordBatches with controlled timestamps and multiple readers, feed them into
merged_iter, and assert the output sequence matches the expected ascending order
and respects your time_partition extraction.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 255d8234-a2d6-4ff7-a902-148688550cd3

📥 Commits

Reviewing files that changed from the base of the PR and between 76ff210 and 665e160.

📒 Files selected for processing (16)
  • Cargo.toml
  • src/cli.rs
  • src/event/format/json.rs
  • src/event/format/mod.rs
  • src/event/mod.rs
  • src/handlers/http/modal/mod.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/lib.rs
  • src/otel/metrics.rs
  • src/parseable/staging/reader.rs
  • src/parseable/streams.rs
  • src/storage/field_stats.rs
  • src/storage/object_storage.rs
  • src/sync.rs
  • src/telemetry.rs
  • src/utils/json/mod.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/parseable/staging/reader.rs (1)

147-147: Consider using #[allow(dead_code)] instead of underscore prefix for unused functions.

The underscore-prefix naming convention (_new, _find_limit_and_type) is typically used for unused variables in Rust, not functions. If these are internal-only and the underscore is to suppress unused warnings, consider using #[allow(dead_code)] or #[cfg(test)] attributes instead, which convey intent more clearly.

Also applies to: 246-246

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/staging/reader.rs` at line 147, The underscore-prefixed
function names like _new and _find_limit_and_type should not be used to silence
unused warnings; instead remove the leading underscore and mark the
internal-only functions with #[allow(dead_code)] (or #[cfg(test)] if they are
only used in tests). Locate the functions named _new and _find_limit_and_type
and replace the underscore naming with normal names (new, find_limit_and_type)
and add #[allow(dead_code)] above each function (or switch to #[cfg(test)] when
appropriate) so intent is explicit and warnings are suppressed correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/parseable/staging/reader.rs`:
- Around line 516-517: Tests call non-existent underscored constructors/methods
for MergedReverseRecordReader; change calls from
MergedReverseRecordReader::_try_new(...) and ._merged_iter(...) to the actual
public names MergedReverseRecordReader::try_new(...) and .merged_iter(...);
apply this same replacement for the second occurrence of the same calls later in
the file (the other MergedReverseRecordReader::_try_new/_merged_iter usage).
- Line 603: Rename the misnamed test function
testget_reverse_reader_single_message to follow snake_case:
test_get_reverse_reader_single_message; update any references or attributes
(e.g., the #[test] function declaration) that call or annotate this function to
use the new name so the test runs correctly and to match Rust naming
conventions.

---

Nitpick comments:
In `@src/parseable/staging/reader.rs`:
- Line 147: The underscore-prefixed function names like _new and
_find_limit_and_type should not be used to silence unused warnings; instead
remove the leading underscore and mark the internal-only functions with
#[allow(dead_code)] (or #[cfg(test)] if they are only used in tests). Locate the
functions named _new and _find_limit_and_type and replace the underscore naming
with normal names (new, find_limit_and_type) and add #[allow(dead_code)] above
each function (or switch to #[cfg(test)] when appropriate) so intent is explicit
and warnings are suppressed correctly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 454be6bf-abb5-47c7-873a-8cfe93d88955

📥 Commits

Reviewing files that changed from the base of the PR and between 665e160 and 516dd35.

📒 Files selected for processing (2)
  • src/parseable/staging/reader.rs
  • src/parseable/streams.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/parseable/staging/reader.rs (1)

602-602: ⚠️ Potential issue | 🟡 Minor

Rename test to snake_case (test_get_reverse_reader_single_message).

Line 602 still has the previously reported typo testget_reverse_reader_single_message.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/staging/reader.rs` at line 602, Rename the test function symbol
testget_reverse_reader_single_message to use snake_case
(test_get_reverse_reader_single_message) so it follows Rust naming conventions;
update the function declaration fn testget_reverse_reader_single_message() ->
io::Result<()> to fn test_get_reverse_reader_single_message() -> io::Result<()>
and update any references to that function within the file (e.g., test
registration or calls) to the new name.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/parseable/staging/reader.rs`:
- Line 241: The code calls StreamReader::try_new(...).unwrap(), which panics on
Arrow IPC/stream parsing errors and breaks the Result-based error flow; replace
the unwrap by propagating the error through the function's Result (e.g., use ?
or map_err to convert the Arrow error into the crate's error type) so
StreamReader::try_new(BufReader::new(OffsetReader::_new(reader, messages)),
None) returns an Err instead of panicking and the caller (line 56) can
handle/log/skip corrupted staging files.

---

Duplicate comments:
In `@src/parseable/staging/reader.rs`:
- Line 602: Rename the test function symbol
testget_reverse_reader_single_message to use snake_case
(test_get_reverse_reader_single_message) so it follows Rust naming conventions;
update the function declaration fn testget_reverse_reader_single_message() ->
io::Result<()> to fn test_get_reverse_reader_single_message() -> io::Result<()>
and update any references to that function within the file (e.g., test
registration or calls) to the new name.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 28749a8e-693e-4b15-8cee-518dc3fafbda

📥 Commits

Reviewing files that changed from the base of the PR and between 516dd35 and 31012f6.

📒 Files selected for processing (2)
  • src/event/format/json.rs
  • src/parseable/staging/reader.rs
✅ Files skipped from review due to trivial changes (1)
  • src/event/format/json.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/modal/utils/ingest_utils.rs (1)

227-253: ⚠️ Potential issue | 🟠 Major

Avoid side-effectful event.process() inside rayon::try_for_each.

Parallel execution with early return on error causes non-deterministic partial ingestion. When a worker fails, other workers may have already persisted their records. This creates inconsistent state where some records in the batch are ingested while others are not, making client retries unpredictable and risky.

Suggested direction
-        let r = data.into_par_iter().try_for_each(|json| {
+        let events: Result<Vec<_>, _> = data
+            .into_par_iter()
+            .map(|json| {
                 let origin_size = json_byte_size(&json);
-
-            match (json::Event { json, p_timestamp }).into_event(
-                stream_name.to_owned(),
-                origin_size,
-                &schema,
-                static_schema_flag,
-                custom_partition.as_ref(),
-                time_partition.as_ref(),
-                schema_version,
-                StreamType::UserDefined,
-                p_custom_fields,
-                telemetry_type,
-                tenant_id,
-            ) {
-                Ok(event) => match event.process() {
-                    Ok(_) => ControlFlow::Continue(()),
-                    Err(e) => ControlFlow::Break(e.into()),
-                },
-                Err(e) => ControlFlow::Break(e),
-            }
-        });
-        if let ControlFlow::Break(e) = r {
-            return Err(PostError::Invalid(e));
+                (json::Event { json, p_timestamp }).into_event(
+                    stream_name.to_owned(),
+                    origin_size,
+                    &schema,
+                    static_schema_flag,
+                    custom_partition.as_ref(),
+                    time_partition.as_ref(),
+                    schema_version,
+                    StreamType::UserDefined,
+                    p_custom_fields,
+                    telemetry_type,
+                    tenant_id,
+                )
+            })
+            .collect();
+        for event in events.map_err(PostError::Invalid)? {
+            event.process()?;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/utils/ingest_utils.rs` around lines 227 - 253, The
current use of data.into_par_iter().try_for_each calls side-effectful
event.process() inside parallel workers, causing non-deterministic partial
ingestion; change the logic to first transform/validate all inputs in parallel
using json::Event::into_event (collecting Results into a Vec or short-circuiting
on error), and only after successful collection iterate serially over the
resulting events to call event.process(); on any transformation error return
Err(PostError::Invalid(...)) as before, and on any process() error return
Err(PostError::Invalid(...)), ensuring no persistence side-effects happen during
the parallel phase (look for data.into_par_iter().try_for_each,
json::Event::into_event, event.process(), ControlFlow::Break, and
PostError::Invalid).
♻️ Duplicate comments (1)
src/parseable/streams.rs (1)

688-691: ⚠️ Potential issue | 🟠 Major

Clean up processing_* files when all readers are rejected.

If MergedReverseRecordReader::try_new(&arrow_files) yields no readers, this branch skips cleanup and leaves the moved group stranded in processing_*.

🧹 Proposed fix
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
+                self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id);
                 continue;
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 688 - 691, When
MergedReverseRecordReader::try_new(&arrow_files) returns a reader struct with an
empty readers list, we must delete the moved group from the processing_*
location before continuing; update the branch that checks if
record_reader.readers.is_empty() to invoke the existing cleanup/removal routine
(or add one) that deletes the processing_* files associated with the current
arrow_files/group and any moved marker, then continue. Locate the logic around
MergedReverseRecordReader::try_new and the variable arrow_files in the loop and
call the same cleanup helper used elsewhere (or implement a
remove_processing_group function) to ensure no stranded processing_* files
remain.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/mod.rs`:
- Around line 133-140: The startup log hardcodes "Query server" in the
tracing::warn! within ParseableServer::start; change the message to include the
actual node role instead—use the role/name property from the ParseableServer
instance (e.g., PARSEABLE.role or self.role / ParseableServer::role()) in the
formatted string so ingestors/indexers/prism nodes emit their correct role at
startup, and update the tracing::warn! call to interpolate that dynamic role
value along with the existing options fields.

In `@src/handlers/http/modal/utils/ingest_utils.rs`:
- Around line 94-137: The OTEL branches (LogSource::OtelLogs,
LogSource::OtelTraces, LogSource::OtelMetrics) currently forward the incoming
time_partition into push_logs; change each push_logs call so it always passes
None for the time_partition argument (i.e., enforce time_partition = None) while
keeping the rest of the arguments and use of
flatten_otel_logs/flatten_otel_traces/flatten_otel_metrics unchanged so OTEL
ingestion cannot carry a time partition.
- Around line 194-224: The fast-path for custom_partition.is_none() is dropping
caller-provided time_partition by setting Event.time_partition = None (and in
other places hard-coding parsed_timestamp to now); update the Event construction
in this branch to preserve the caller's p_timestamp and time_partition: set
parsed_timestamp = p_timestamp.naive_utc() (not Utc::now()) and set
time_partition = time_partition.clone() or time_partition.map(|t| t.clone()) so
if a time_partition was provided it is kept; leave other fields (rb,
origin_size, is_first_event, custom_partition_values) unchanged.

In `@src/parseable/streams.rs`:
- Around line 156-167: When acquiring self.writer.lock() in the ingest/flush
paths you must not call poisoned.into_inner(); instead treat a poisoned mutex as
a fatal error and fail-fast: log the poisoning (including self.stream_name) and
then propagate the failure by returning an Err from the current function (or
panic if the surrounding API expects process-level failure) rather than
recovering the inner guard. Update the lock handling at the shown
self.writer.lock() site and the similar occurrence around lines 545-553 to
remove into_inner() usage and return/propagate a poison error so corrupted
writer state cannot be reused.

In `@src/telemetry.rs`:
- Around line 61-63: The code sets the traces exporter protocol using only
OTEL_EXPORTER_OTLP_PROTOCOL and defaults to "http/json"; change the lookup to
first check OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, fall back to
OTEL_EXPORTER_OTLP_PROTOCOL if not set, and use "http/protobuf" as the final
default; update the variable named protocol (where std::env::var is called) to
implement this precedence and default so traces-specific env var overrides the
generic one and the default matches the OpenTelemetry spec.

---

Outside diff comments:
In `@src/handlers/http/modal/utils/ingest_utils.rs`:
- Around line 227-253: The current use of data.into_par_iter().try_for_each
calls side-effectful event.process() inside parallel workers, causing
non-deterministic partial ingestion; change the logic to first
transform/validate all inputs in parallel using json::Event::into_event
(collecting Results into a Vec or short-circuiting on error), and only after
successful collection iterate serially over the resulting events to call
event.process(); on any transformation error return Err(PostError::Invalid(...))
as before, and on any process() error return Err(PostError::Invalid(...)),
ensuring no persistence side-effects happen during the parallel phase (look for
data.into_par_iter().try_for_each, json::Event::into_event, event.process(),
ControlFlow::Break, and PostError::Invalid).

---

Duplicate comments:
In `@src/parseable/streams.rs`:
- Around line 688-691: When MergedReverseRecordReader::try_new(&arrow_files)
returns a reader struct with an empty readers list, we must delete the moved
group from the processing_* location before continuing; update the branch that
checks if record_reader.readers.is_empty() to invoke the existing
cleanup/removal routine (or add one) that deletes the processing_* files
associated with the current arrow_files/group and any moved marker, then
continue. Locate the logic around MergedReverseRecordReader::try_new and the
variable arrow_files in the loop and call the same cleanup helper used elsewhere
(or implement a remove_processing_group function) to ensure no stranded
processing_* files remain.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 0c5a0fe7-ccf1-41b3-9239-6a518f60a518

📥 Commits

Reviewing files that changed from the base of the PR and between 31012f6 and a3ea90e.

📒 Files selected for processing (16)
  • Cargo.toml
  • src/cli.rs
  • src/event/format/json.rs
  • src/event/format/mod.rs
  • src/event/mod.rs
  • src/handlers/http/modal/mod.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/lib.rs
  • src/otel/metrics.rs
  • src/parseable/staging/reader.rs
  • src/parseable/streams.rs
  • src/storage/field_stats.rs
  • src/storage/object_storage.rs
  • src/sync.rs
  • src/telemetry.rs
  • src/utils/json/mod.rs
✅ Files skipped from review due to trivial changes (3)
  • src/utils/json/mod.rs
  • src/lib.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/storage/field_stats.rs
  • src/sync.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
src/parseable/streams.rs (2)

549-557: ⚠️ Potential issue | 🟠 Major

Don’t recover a poisoned writer during flush.

push() now aborts on poison, but flush() still calls into_inner() and keeps converting with potentially corrupted writer state. This path should fail fast as well.

🔒 Safer handling
-            let mut writer = match self.writer.lock() {
-                Ok(guard) => guard,
-                Err(poisoned) => {
-                    error!(
-                        "Writer lock poisoned while flushing data for stream {}",
-                        self.stream_name
-                    );
-                    poisoned.into_inner()
-                }
-            };
+            let mut writer = self.writer.lock().expect(LOCK_EXPECT);

Based on learnings, Parseable treats staging/invariant failures as critical and prefers fail-fast behavior over continuing in a potentially inconsistent state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 549 - 557, The flush() path should
fail fast instead of calling poisoned.into_inner(); in the match on
self.writer.lock() inside flush(), replace the Err(poisoned) branch so it does
not call into_inner() but instead logs the poison and returns/propagates an
error (or panics) consistent with push()’s behavior; specifically, update the
match in the flush() method (the block currently logging "Writer lock poisoned
while flushing data for stream {}") to immediately abort/return an Err
indicating a poisoned lock (do not call into_inner()) so the code treats a
poisoned writer as a fatal invariant violation like push().

684-695: ⚠️ Potential issue | 🟠 Major

Don’t strand moved Arrow groups under processing_*.

If MergedReverseRecordReader::try_new() filters every file out, this branch just continues. Those files were already moved into processing_*, and normal cycles only regroup the current in-process directory, so they fall off the retry path.

Based on learnings, failed staged work is supposed to stay on the normal sync retry path; leaving it under an orphaned processing_* directory bypasses that path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 684 - 695, The loop calls
MergedReverseRecordReader::try_new(...) and if record_reader.readers.is_empty()
it currently continues, but those files were already moved into a processing_*
dir and will be orphaned; instead, when record_reader.readers.is_empty() detect
that case and move the corresponding arrow_files (or the parquet_path group)
back from the processing_* location into the staged location (or call the
existing undo/reenqueue helper) so they stay on the normal sync retry path, and
update/reset metrics via update_staging_metrics/reset_staging_metrics as
appropriate before continuing; specifically modify the branch around
record_reader.readers.is_empty() to perform the requeue/rollback of the moved
files rather than silently continue.
src/telemetry.rs (1)

61-63: ⚠️ Potential issue | 🟠 Major

Honor the traces-specific protocol override here.

This still ignores OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, so a traces-only config cannot override the generic protocol. The OTLP default is also expected to be http/protobuf, not http/json.

🔧 Suggested change
 const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
 const OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";
 const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
+const OTEL_EXPORTER_OTLP_TRACES_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL";
 ...
-    let protocol =
-        std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL).unwrap_or_else(|_| "http/json".to_string());
+    let protocol = std::env::var(OTEL_EXPORTER_OTLP_TRACES_PROTOCOL)
+        .or_else(|_| std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL))
+        .unwrap_or_else(|_| "http/protobuf".to_string());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/telemetry.rs` around lines 61 - 63, The code sets protocol by only
reading OTEL_EXPORTER_OTLP_PROTOCOL and defaulting to "http/json"; change the
lookup in telemetry.rs so the traces-specific env var
OTEL_EXPORTER_OTLP_TRACES_PROTOCOL is consulted first, falling back to
OTEL_EXPORTER_OTLP_PROTOCOL if unset, and use the OTLP default "http/protobuf"
as the final fallback; update the logic that assigns the variable protocol to
reflect this precedence and default.
src/handlers/http/modal/mod.rs (1)

133-140: ⚠️ Potential issue | 🟡 Minor

Startup log still hard-codes the node role as "Query server".

This is the shared startup path; please log the actual runtime role instead of a fixed "Query".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/mod.rs` around lines 133 - 140, The startup log
hardcodes "Query server"; change the tracing::warn format string to include the
actual runtime role and pass the role value from the parsed config (use the
runtime role field on PARSEABLE, e.g., PARSEABLE.role or PARSEABLE.options.role
as appropriate). Update the message to something like "{} server with-\nNum
workers: ..." and add the role value as the first argument so the logged role
reflects the real runtime role instead of always "Query".
🧹 Nitpick comments (1)
src/parseable/staging/mod.rs (1)

41-42: Simplify PoisonError variant to plain string.

Mutex::lock() and RwLock::write() return PoisonError<MutexGuard<_>> and PoisonError<RwLockWriteGuard<_>> respectively. The #[from] PoisonError<String> attribute will not auto-convert these—callers must manually construct PoisonError::new(String) anyway. Use a plain PoisonedLock(String) variant instead to make this synthesized error clearer and reduce unnecessary fabrication.

♻️ Suggested change
-    #[error("{0}")]
-    PoisonError(#[from] PoisonError<String>),
+    #[error("{0}")]
+    PoisonedLock(String),
-                    return Err(StagingError::PoisonError(PoisonError::new(format!(
+                    return Err(StagingError::PoisonedLock(format!(
                         "Writer lock poisoned while ingesting data for stream {} - {}",
                         self.stream_name, poisoned
-                    ))));
+                    )));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/staging/mod.rs` around lines 41 - 42, Replace the
PoisonError(#[from] PoisonError<String>) enum variant with a simple
string-carrying variant (e.g., PoisonedLock(String)) and update its Display
attribute (the existing #[error("{0}")]) accordingly; remove the #[from] on
PoisonError since PoisonError<T> from parking_lot/std won't auto-convert, and
update any call sites that constructed PoisonError::new(...) to instead produce
the new PoisonedLock(String) variant (or convert the incoming PoisonError<T>
into a meaningful String and wrap it) so callers no longer need to fabricate
PoisonError<String>.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/mod.rs`:
- Around line 24-25: The code currently casts the u64 worker count using
as_usize() (e.g., where num_workers.as_usize() is passed to
HttpServer::workers()), which can truncate on 32-bit targets; change this to use
usize::try_from(num_workers) and handle the Result explicitly: on Ok(n) pass n
to HttpServer::workers(), on Err(_) return or propagate a descriptive error (or
clamp/validate) so you never call HttpServer::workers() with a truncated or zero
value; update any function (e.g., the modal handler initialization) to propagate
the conversion error instead of relying on as_usize().
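
A sketch of the explicit, non-truncating conversion (the error type is illustrative):

    /// Converts the configured u64 worker count to usize, failing loudly
    /// rather than silently truncating on 32-bit targets.
    fn workers_from_config(num_workers: u64) -> Result<usize, String> {
        usize::try_from(num_workers).map_err(|_| {
            format!("num_workers {num_workers} does not fit in usize on this platform")
        })
    }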

---

Duplicate comments:
In `@src/handlers/http/modal/mod.rs`:
- Around line 133-140: The startup log hardcodes "Query server"; change the
tracing::warn format string to include the actual runtime role and pass the role
value from the parsed config (use the runtime role field on PARSEABLE, e.g.,
PARSEABLE.role or PARSEABLE.options.role as appropriate). Update the message to
something like "{} server with-\nNum workers: ..." and add the role value as the
first argument so the logged role reflects the real runtime role instead of
always "Query".

In `@src/parseable/streams.rs`:
- Around line 549-557: The flush() path should fail fast instead of calling
poisoned.into_inner(); in the match on self.writer.lock() inside flush(),
replace the Err(poisoned) branch so it does not call into_inner() but instead
logs the poison and returns/propagates an error (or panics) consistent with
push()’s behavior; specifically, update the match in the flush() method (the
block currently logging "Writer lock poisoned while flushing data for stream
{}") to immediately abort/return an Err indicating a poisoned lock (do not call
into_inner()) so the code treats a poisoned writer as a fatal invariant
violation like push().
- Around line 684-695: The loop calls MergedReverseRecordReader::try_new(...)
and if record_reader.readers.is_empty() it currently continues, but those files
were already moved into a processing_* dir and will be orphaned; instead, when
record_reader.readers.is_empty() detect that case and move the corresponding
arrow_files (or the parquet_path group) back from the processing_* location into
the staged location (or call the existing undo/reenqueue helper) so they stay on
the normal sync retry path, and update/reset metrics via
update_staging_metrics/reset_staging_metrics as appropriate before continuing;
specifically modify the branch around record_reader.readers.is_empty() to
perform the requeue/rollback of the moved files rather than silently continue.

In `@src/telemetry.rs`:
- Around line 61-63: The code sets protocol by only reading
OTEL_EXPORTER_OTLP_PROTOCOL and defaulting to "http/json"; change the lookup in
telemetry.rs so the traces-specific env var OTEL_EXPORTER_OTLP_TRACES_PROTOCOL
is consulted first, falling back to OTEL_EXPORTER_OTLP_PROTOCOL if unset, and
use the OTLP default "http/protobuf" as the final fallback; update the logic
that assigns the variable protocol to reflect this precedence and default.

---

Nitpick comments:
In `@src/parseable/staging/mod.rs`:
- Around line 41-42: Replace the PoisonError(#[from] PoisonError<String>) enum
variant with a simple string-carrying variant (e.g., PoisonedLock(String)) and
update its Display attribute (the existing #[error("{0}")]) accordingly; remove
the #[from] on PoisonError since PoisonError<T> from parking_lot/std won't
auto-convert, and update any call sites that constructed PoisonError::new(...)
to instead produce the new PoisonedLock(String) variant (or convert the incoming
PoisonError<T> into a meaningful String and wrap it) so callers no longer need
to fabricate PoisonError<String>.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 56ee729e-6b4d-4573-b069-eb2c126b4640

📥 Commits

Reviewing files that changed from the base of the PR and between a3ea90e and b4d42ac.

📒 Files selected for processing (9)
  • src/cli.rs
  • src/handlers/http/modal/mod.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/otel/metrics.rs
  • src/parseable/staging/mod.rs
  • src/parseable/staging/reader.rs
  • src/parseable/streams.rs
  • src/sync.rs
  • src/telemetry.rs
✅ Files skipped from review due to trivial changes (1)
  • src/otel/metrics.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/sync.rs
  • src/handlers/http/modal/utils/ingest_utils.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/parseable/streams.rs (1)

691-695: ⚠️ Potential issue | 🟠 Major

Empty reader groups leave orphaned processing_* directories.

When MergedReverseRecordReader::try_new returns a reader with no valid readers (all files filtered out), the code just continues without cleaning up the arrow files that were already moved to the processing_* directory. These orphaned directories accumulate on disk and are never revisited.

🐛 Proposed fix
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
+                // Clean up arrow files that couldn't be read to avoid orphaned processing_* dirs
+                self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id);
                 continue;
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 691 - 695, When
MergedReverseRecordReader::try_new(...) returns an object whose
record_reader.readers.is_empty(), the code currently continues without cleaning
up the arrow_files that were already moved into their processing_* directories;
fix this by, before the continue, iterating the arrow_files collection and
removing each associated processing directory (e.g., compute the processing_*
path from each arrow_file entry and call std::fs::remove_dir_all or the existing
cleanup helper) and log non-fatal failures; keep using the same symbols
(MergedReverseRecordReader::try_new, record_reader.readers.is_empty(),
arrow_files, staging_files) and ensure removal errors are caught/ignored so the
loop proceeds.
♻️ Duplicate comments (2)
src/parseable/staging/reader.rs (1)

607-607: ⚠️ Potential issue | 🟡 Minor

Typo in test function name: missing underscore.

The function testget_reverse_reader_single_message is missing an underscore between "test" and "get". Standard Rust test naming uses snake_case: test_get_reverse_reader_single_message.

✏️ Proposed fix
-    fn testget_reverse_reader_single_message() -> io::Result<()> {
+    fn test_get_reverse_reader_single_message() -> io::Result<()> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/staging/reader.rs` at line 607, Rename the test function
testget_reverse_reader_single_message to use snake_case by inserting the missing
underscore, i.e., rename the symbol testget_reverse_reader_single_message to
test_get_reverse_reader_single_message wherever it is declared and referenced
(the test function in reader.rs), and update any test invocations or references
to match the new name.
src/parseable/streams.rs (1)

543-574: ⚠️ Potential issue | 🟠 Major

Inconsistent poison handling: flush() still uses into_inner().

The push() method was updated to return an error on poisoned mutex (lines 160-170), but flush() still recovers via poisoned.into_inner() (lines 551-557). This is inconsistent and can persist corrupted writer state. Based on learnings, Parseable treats staging/invariant failures as critical and prefers fail-fast behavior.

Additionally, since flush() returns (), consider either:

  1. Changing the return type to Result<(), StagingError> and propagating the error, or
  2. Using expect(LOCK_EXPECT) to panic on poison (consistent with other lock usages in this file like line 536).
🐛 Proposed fix (option 2 - panic on poison)
     pub fn flush(&self, forced: bool) {
         let _span = info_span!("flush", stream_name = %self.stream_name, forced).entered();
         // Swap out stale writers under the lock, drop them after releasing it.
         // DiskWriter::Drop does I/O (IPC finish + file rename) so dropping
         // outside the lock avoids blocking concurrent push() calls.
         let stale_writers = {
-            let mut writer = match self.writer.lock() {
-                Ok(guard) => guard,
-                Err(poisoned) => {
-                    error!(
-                        "Writer lock poisoned while flushing data for stream {}",
-                        self.stream_name
-                    );
-                    poisoned.into_inner()
-                }
-            };
+            let mut writer = self.writer.lock().expect(LOCK_EXPECT);
             writer.mem.clear();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 543 - 574, flush() currently recovers
from a poisoned mutex via poisoned.into_inner(), which is inconsistent with
push() and risks continuing with corrupted state; change the poison handling in
flush() to be fail-fast like other usages by replacing the match on
self.writer.lock() with self.writer.lock().expect(LOCK_EXPECT) (or alternatively
change flush() signature to return Result<(), StagingError> and propagate the
lock error), ensuring you reference the flush() function and the same
LOCK_EXPECT constant used elsewhere so poison leads to a panic rather than
silent recovery.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/utils/ingest_utils.rs`:
- Around line 52-64: The tracing spans are currently using fields(stream_name)
which creates an empty field instead of capturing the parameter; update the
instrument attributes to explicitly pass the value (e.g., change
fields(stream_name) to fields(stream_name = %stream_name)) or remove stream_name
from fields(...) to rely on automatic recording; apply this fix to the
flatten_and_push_logs instrument attribute and the other instrument attribute
later in the file where fields(stream_name) is used so the actual stream_name
value is recorded.
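
A sketch of the corrected attribute, assuming a signature along these lines (illustrative):

    use tracing::instrument;

    // `fields(stream_name)` alone declares an empty field; binding the value
    // with `%` records its Display form on the span.
    #[instrument(skip_all, fields(stream_name = %stream_name))]
    fn flatten_and_push_logs(stream_name: &str) {
        // ...ingestion logic elided...
    }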

---

Outside diff comments:
In `@src/parseable/streams.rs`:
- Around line 691-695: When MergedReverseRecordReader::try_new(...) returns an
object whose record_reader.readers.is_empty(), the code currently continues
without cleaning up the arrow_files that were already moved into their
processing_* directories; fix this by, before the continue, iterating the
arrow_files collection and removing each associated processing directory (e.g.,
compute the processing_* path from each arrow_file entry and call
std::fs::remove_dir_all or the existing cleanup helper) and log non-fatal
failures; keep using the same symbols (MergedReverseRecordReader::try_new,
record_reader.readers.is_empty(), arrow_files, staging_files) and ensure removal
errors are caught/ignored so the loop proceeds.

---

Duplicate comments:
In `@src/parseable/staging/reader.rs`:
- Line 607: Rename the test function testget_reverse_reader_single_message to
use snake_case by inserting the missing underscore, i.e., rename the symbol
testget_reverse_reader_single_message to test_get_reverse_reader_single_message
wherever it is declared and referenced (the test function in reader.rs), and
update any test invocations or references to match the new name.

In `@src/parseable/streams.rs`:
- Around line 543-574: flush() currently recovers from a poisoned mutex via
poisoned.into_inner(), which is inconsistent with push() and risks continuing
with corrupted state; change the poison handling in flush() to be fail-fast like
other usages by replacing the match on self.writer.lock() with
self.writer.lock().expect(LOCK_EXPECT) (or alternatively change flush()
signature to return Result<(), StagingError> and propagate the lock error),
ensuring you reference the flush() function and the same LOCK_EXPECT constant
used elsewhere so poison leads to a panic rather than silent recovery.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 75447c63-646e-48de-8b5b-44218a2075ae

📥 Commits

Reviewing files that changed from the base of the PR and between a3ea90e and b4d42ac.

📒 Files selected for processing (9)
  • src/cli.rs
  • src/handlers/http/modal/mod.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/otel/metrics.rs
  • src/parseable/staging/mod.rs
  • src/parseable/staging/reader.rs
  • src/parseable/streams.rs
  • src/sync.rs
  • src/telemetry.rs
✅ Files skipped from review due to trivial changes (2)
  • src/parseable/staging/mod.rs
  • src/otel/metrics.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/handlers/http/modal/mod.rs
  • src/telemetry.rs

coderabbitai Bot previously approved these changes Apr 28, 2026
@nikhilsinhaparseable nikhilsinhaparseable merged commit 39c856a into parseablehq:main Apr 28, 2026
12 checks passed
