diff --git a/Cargo.toml b/Cargo.toml index 2ef573bf4..756a6889b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,10 @@ base64 = "0.22.0" cookie = "0.18.1" hex = "0.4" openid = { version = "0.18.3", default-features = false, features = ["rustls"] } -rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } +rustls = { version = "0.23", default-features = false, features = [ + "ring", + "std", +] } rustls-pemfile = "2.1.2" sha2 = "0.10.8" @@ -104,10 +107,27 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r "metrics", "trace", ] } -prometheus = { version = "0.13.4", default-features = false, features = ["process"] } +prometheus = { version = "0.13.4", default-features = false, features = [ + "process", +] } prometheus-parse = "0.2.5" -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.23", features = [ + "env-filter", + "time", + "registry", +] } + +# telemetry +tracing-opentelemetry = "0.32.1" +opentelemetry = "0.31.0" +opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.31.1", features = [ + "grpc-tonic", + "http-proto", + "http-json", +] } +tracing-actix-web = "0.7" # Time and Date chrono = "0.4" diff --git a/src/cli.rs b/src/cli.rs index cf94e5800..cce59ba1c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -16,7 +16,7 @@ * */ -use clap::Parser; +use clap::{Parser, value_parser}; use std::{env, fs, path::PathBuf}; use url::Url; @@ -148,6 +148,52 @@ pub struct Options { )] pub address: String, + // Actix request timeout in seconds + #[arg( + long, + env = "P_ACTIX_REQUEST_TIMEOUT", + default_value = "5", + help = "Client request timeout" + )] + pub request_timeout: u64, + + // Actix keep alive in seconds + #[arg( + long, + env = "P_ACTIX_KEEP_ALIVE", + default_value = "5", + help = "Server keep-alive" + )] + pub keep_alive: u64, + + // Actix num workers + #[arg( + long, + env = "P_ACTIX_NUM_WORKERS", + default_value_t = num_cpus::get() as u64, + value_parser = value_parser!(u64).range(1..), + help = "Number of workers for actix-web" + )] + pub num_workers: u64, + + // Actix connections backlog + #[arg( + long, + env = "P_ACTIX_BACKLOG", + default_value = "2048", + help = "Maximum number of pending connections" + )] + pub connection_backlog: u32, + + // Actix max connections + #[arg( + long, + env = "P_ACTIX_MAX_CONNECTIONS", + default_value = "25000", + help = "Per-worker maximum number of concurrent connections" + )] + pub max_connections: usize, + #[arg( long = "origin", env = "P_ORIGIN_URI", diff --git a/src/event/format/json.rs b/src/event/format/json.rs index f07728b9b..6efc15560 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -31,7 +31,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use tracing::error; +use tracing::{error, info_span}; use super::EventFormat; use super::{detect_schema_conflicts, rename_conflicting_fields_in_json}; @@ -86,8 +86,12 @@ impl EventFormat for Event { // IMPORTANT: Detect conflicts BEFORE update_field_type_in_schema, because // update_field_type_in_schema may override types (e.g., force Utf8 to Timestamp // if existing schema has Timestamp), which would hide the actual conflict. 
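A quick note on the `info_span!(...).entered()` pattern this patch introduces in several places below: the span only stays open for as long as the returned guard lives, so it has to be bound to a named variable such as `_span` (binding to `_` drops the guard, and with it the span, immediately). A minimal standalone sketch of the two idioms used throughout this patch:

```rust
use tracing::{Instrument, info_span};

fn synchronous_work(record_count: usize) {
    // Guard form: the span stays entered until `_span` is dropped at end of scope.
    // `let _ = info_span!(..).entered();` would close the span right away.
    let _span = info_span!("infer_json_schema", record_count).entered();
    // ... timed work happens here ...
} // span exits here

async fn async_work() {
    // Future form: attach the span to a future instead of holding a guard
    // across `.await` points (the guard type is not meant to be held there).
    async { /* ... */ }
        .instrument(info_span!("stream_sync"))
        .await;
}
```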
- let raw_inferred_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)) - .map_err(|err| anyhow!("Could not infer schema for this event due to err {:?}", err))?; + let raw_inferred_schema = { + let _span = info_span!("infer_json_schema", record_count = value_arr.len()).entered(); + infer_json_schema_from_iterator(value_arr.iter().map(Ok)).map_err(|err| { + anyhow!("Could not infer schema for this event due to err {:?}", err) + })? + }; // Detect schema conflicts using raw inferred schema vs existing stream schema // Pass the actual values and schema_version to check if values can be coerced to existing types @@ -110,7 +114,11 @@ impl EventFormat for Event { collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); let mut is_first = false; - let (value_arr, schema) = match derive_arrow_schema(stream_schema, fields) { + let res = { + let _span = info_span!("derive_arrow_schema").entered(); + derive_arrow_schema(stream_schema, fields) + }; + let (value_arr, schema) = match res { Ok(schema) => (value_arr, schema), Err(_) => { let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)) @@ -155,6 +163,7 @@ impl EventFormat for Event { // Convert the Data type (defined above) to arrow record batch fn decode(data: Self::Data, schema: Arc) -> Result { + let _span = info_span!("json_to_recordbatch", record_count = data.len()).entered(); let array_capacity = round_upto_multiple_of_64(data.len()); let mut reader = ReaderBuilder::new(schema) .with_batch_size(array_capacity) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 54941cb98..999328fa6 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -29,6 +29,7 @@ use arrow_schema::{DataType, Field, Schema, TimeUnit}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use tracing::info_span; use crate::{ handlers::TelemetryType, @@ -167,6 +168,7 @@ pub trait EventFormat: Sized { schema_version: SchemaVersion, p_custom_fields: &HashMap, ) -> Result<(RecordBatch, bool), AnyError> { + let _span = info_span!("into_recordbatch").entered(); let p_timestamp = self.get_p_timestamp(); let (data, schema, is_first) = self.to_data( storage_schema, diff --git a/src/event/mod.rs b/src/event/mod.rs index 0c66351ab..f9955e986 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -24,6 +24,8 @@ use arrow_schema::{Field, Fields, Schema}; use itertools::Itertools; use std::sync::Arc; +use tracing::{info_span, instrument}; + use self::error::EventError; use crate::{ LOCK_EXPECT, @@ -60,6 +62,16 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { + #[instrument( + name = "event_process", + level = "info", + skip_all, + fields( + stream_name = %self.stream_name, + num_rows = self.rb.num_rows(), + is_first_event = self.is_first_event + ) + )] pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { @@ -144,6 +156,7 @@ pub fn commit_schema( schema: Arc, tenant_id: &Option, ) -> Result<(), StagingError> { + let _span = info_span!("commit_schema", stream_name).entered(); let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned"); let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let map = &mut stream_metadata diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 7a9730bb6..1a009b775 100644 --- a/src/handlers/http/modal/mod.rs +++ 
b/src/handlers/http/modal/mod.rs @@ -16,11 +16,12 @@ * */ -use std::{fmt, path::Path, sync::Arc}; +use std::{fmt, path::Path, sync::Arc, time::Duration}; -use actix_web::{App, HttpServer, middleware::from_fn, web::ServiceConfig}; +use actix_web::{App, HttpServer, http::KeepAlive, middleware::from_fn, web::ServiceConfig}; use actix_web_prometheus::PrometheusMetrics; use anyhow::Context; +use arrow::datatypes::ArrowNativeType; use async_trait::async_trait; use base64::{Engine, prelude::BASE64_STANDARD}; use bytes::Bytes; @@ -121,8 +122,22 @@ pub trait ParseableServer { // Create the HTTP server let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) + .workers(PARSEABLE.options.num_workers.as_usize()) + .keep_alive(KeepAlive::Timeout(Duration::from_secs( + PARSEABLE.options.keep_alive, + ))) + .client_request_timeout(Duration::from_secs(PARSEABLE.options.request_timeout)) + .backlog(PARSEABLE.options.connection_backlog) + .max_connections(PARSEABLE.options.max_connections) .shutdown_timeout(60); + tracing::warn!( + "Starting Query server with-\nNum workers: {}\nKeep Alive: {}\nRequest timeout: {}\nConnection backlog: {}\nMax connections: {}", + PARSEABLE.options.num_workers, + PARSEABLE.options.keep_alive, + PARSEABLE.options.request_timeout, + PARSEABLE.options.connection_backlog, + PARSEABLE.options.max_connections + ); // Start the server with or without TLS let srv = if let Some(config) = ssl { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3b1d4c524..928cb0286 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -22,9 +22,10 @@ use chrono::Utc; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, }; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde_json::Value; use std::collections::HashMap; -use tracing::warn; +use tracing::{instrument, warn}; use crate::{ event::{ @@ -48,6 +49,19 @@ const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRA const MAX_CUSTOM_FIELDS: usize = 10; const MAX_FIELD_VALUE_LENGTH: usize = 100; +#[instrument( + name = "flatten_and_push_logs", + level = "info", + skip( + json, + log_source, + p_custom_fields, + time_partition, + telemetry_type, + tenant_id + ), + fields(stream_name) +)] pub async fn flatten_and_push_logs( json: Value, stream_name: &str, @@ -75,75 +89,74 @@ pub async fn flatten_and_push_logs( time_partition, telemetry_type, tenant_id, - ) - .await?; + )?; } LogSource::OtelLogs => { - //custom flattening required for otel logs let logs: LogsData = serde_json::from_value(json)?; - for record in flatten_otel_logs(&logs, tenant_str) { + let records = flatten_otel_logs(&logs, tenant_str); + if !records.is_empty() { push_logs( stream_name, - record, + Value::Array(records), log_source, p_custom_fields, - time_partition.clone(), + None, telemetry_type, tenant_id, - ) - .await?; + )?; } } LogSource::OtelTraces => { - //custom flattening required for otel traces let traces: TracesData = serde_json::from_value(json)?; - for record in flatten_otel_traces(&traces, tenant_str) { + let records = flatten_otel_traces(&traces, tenant_str); + if !records.is_empty() { push_logs( stream_name, - record, + Value::Array(records), log_source, p_custom_fields, - time_partition.clone(), + None, telemetry_type, tenant_id, - ) - .await?; + )?; } } LogSource::OtelMetrics => { - //custom flattening required for otel metrics let metrics: 
MetricsData = serde_json::from_value(json)?; - for record in flatten_otel_metrics(metrics, tenant_str) { + let records = flatten_otel_metrics(metrics, tenant_str); + if !records.is_empty() { push_logs( stream_name, - record, + Value::Array(records), log_source, p_custom_fields, - time_partition.clone(), + None, telemetry_type, tenant_id, - ) - .await?; + )?; } } - _ => { - push_logs( - stream_name, - json, - log_source, - p_custom_fields, - time_partition, - telemetry_type, - tenant_id, - ) - .await? - } + _ => push_logs( + stream_name, + json, + log_source, + p_custom_fields, + time_partition, + telemetry_type, + tenant_id, + )?, } Ok(()) } -pub async fn push_logs( +#[instrument( + name = "push_logs", + level = "info", + skip(json, log_source, p_custom_fields, time_partition, telemetry_type, tenant_id), + fields(stream_name, record_count = tracing::field::Empty) +)] +pub fn push_logs( stream_name: &str, json: Value, log_source: &LogSource, @@ -153,12 +166,11 @@ pub async fn push_logs( tenant_id: &Option, ) -> Result<(), PostError> { let stream = PARSEABLE.get_stream(stream_name, tenant_id)?; - let time_partition_limit = PARSEABLE - .get_stream(stream_name, tenant_id)? - .get_time_partition_limit(); + let time_partition_limit = stream.get_time_partition_limit(); let static_schema_flag = stream.get_static_schema_flag(); let custom_partition = stream.get_custom_partition(); let schema_version = stream.get_schema_version(); + let schema = stream.get_schema_raw(); let p_timestamp = Utc::now(); let data = convert_array_to_object( @@ -170,30 +182,94 @@ pub async fn push_logs( log_source, )?; - for json in data { - let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length - let schema = PARSEABLE - .get_stream(stream_name, tenant_id)? - .get_schema_raw(); - json::Event { json, p_timestamp } - .into_event( - stream_name.to_owned(), - origin_size, - &schema, - static_schema_flag, - custom_partition.as_ref(), - time_partition.as_ref(), - schema_version, - StreamType::UserDefined, - p_custom_fields, - telemetry_type, - tenant_id, - )? - .process()?; + if data.is_empty() { + return Err(PostError::Invalid(anyhow::Error::msg( + "Empty data object received", + ))); + } + + // Batch path: one schema inference + one decoder for the entire batch. + // When custom partitions are set, different records may need different + // partition keys, so fall back to per-record processing. + if custom_partition.is_none() && time_partition.is_none() { + // process_non_partitioned returns vec![Value::Array([...])], a single + // element wrapping the whole batch. Unwrap it to avoid double-nesting. 
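As a reference for the comment above, a tiny standalone sketch of the shape the batch path relies on (the two-record payload here is hypothetical):

```rust
use serde_json::{Value, json};

fn main() {
    // Hypothetical output of the non-partitioned conversion: a single
    // Value::Array element wrapping every record in the request.
    let data: Vec<Value> = vec![json!([
        {"level": "info", "msg": "a"},
        {"level": "warn", "msg": "b"}
    ])];

    // Unwrapping that single element yields the whole batch, so one schema
    // inference and one decode pass cover all records.
    let json_batch = data.into_iter().next().unwrap();
    if let Value::Array(records) = &json_batch {
        assert_eq!(records.len(), 2);
    }
}
```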
+ let json_batch = data.into_iter().next().unwrap(); + let origin_size = json_byte_size(&json_batch); + + let (rb, is_first_event) = (json::Event { + json: json_batch, + p_timestamp, + }) + .into_recordbatch( + &schema, + static_schema_flag, + time_partition.as_ref(), + schema_version, + p_custom_fields, + )?; + + let event = crate::event::Event { + rb, + stream_name: stream_name.to_owned(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp: p_timestamp.naive_utc(), + time_partition: None, + custom_partition_values: HashMap::new(), + stream_type: StreamType::UserDefined, + telemetry_type, + tenant_id: tenant_id.to_owned(), + }; + event.process()?; + } else { + // Per-record path for custom-partitioned streams + let events: Result, _> = data + .into_par_iter() + .map(|json| { + let origin_size = json_byte_size(&json); + (json::Event { json, p_timestamp }).into_event( + stream_name.to_owned(), + origin_size, + &schema, + static_schema_flag, + custom_partition.as_ref(), + time_partition.as_ref(), + schema_version, + StreamType::UserDefined, + p_custom_fields, + telemetry_type, + tenant_id, + ) + }) + .collect(); + for event in events.map_err(PostError::Invalid)? { + event.process()?; + } } Ok(()) } +/// Zero-allocation JSON byte size counter. +/// Uses serde_json::to_writer with a writer that only counts bytes. +pub fn json_byte_size(value: &Value) -> u64 { + struct CountingWriter(u64); + impl std::io::Write for CountingWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0 += buf.len() as u64; + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + let mut counter = CountingWriter(0); + // serde_json::to_writer won't fail on Value types + let _ = serde_json::to_writer(&mut counter, value); + counter.0 +} + pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap { let user_agent = req .headers() diff --git a/src/lib.rs b/src/lib.rs index 16e97e2ee..de7ee4a5f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ mod static_schema; mod stats; pub mod storage; pub mod sync; +pub mod telemetry; pub mod tenants; pub mod users; pub mod utils; @@ -63,9 +64,10 @@ pub use handlers::http::modal::{ }; use once_cell::sync::Lazy; pub use openid; -pub use opentelemetry_proto; use parseable::PARSEABLE; use reqwest::{Client, ClientBuilder}; +pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk}; +pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; // It is very unlikely that panic will occur when dealing with locks. 
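The `json_byte_size` counting writer added in ingest_utils.rs above is meant to produce exactly the byte count the old `serde_json::to_vec(&json).unwrap().len()` call did, just without the intermediate allocation. A sketch of a unit test for that equivalence (not part of this patch):

```rust
#[cfg(test)]
mod json_byte_size_tests {
    use super::json_byte_size; // the counting-writer helper defined above
    use serde_json::json;

    #[test]
    fn matches_allocating_serialization() {
        for value in [
            json!({"level": "info", "msg": "hello", "nested": {"n": 1.5}}),
            json!([1, 2, 3, null, "non-ascii ✓"]),
        ] {
            let expected = serde_json::to_vec(&value).unwrap().len() as u64;
            assert_eq!(json_byte_size(&value), expected);
        }
    }
}
```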
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock"; diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 0af72ec3a..99e51a62f 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -23,6 +23,8 @@ use opentelemetry_proto::tonic::metrics::v1::{ }; use serde_json::{Map, Value}; +use tracing::info_span; + use crate::metrics::increment_metrics_collected_by_date; use super::otel_utils::{ @@ -174,17 +176,7 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec> { - let mut vec_gauge_json = Vec::new(); - let data_points_json = flatten_number_data_points(&gauge.data_points); - - for data_point_json in data_points_json { - let mut gauge_json = Map::new(); - for (key, value) in &data_point_json { - gauge_json.insert(key.clone(), value.clone()); - } - vec_gauge_json.push(gauge_json); - } - vec_gauge_json + flatten_number_data_points(&gauge.data_points) } /// otel metrics event has json object for sum @@ -192,24 +184,15 @@ fn flatten_gauge(gauge: &Gauge) -> Vec> { /// this function flatten the sum json object /// and returns a `Vec` of `Map` for each data point fn flatten_sum(sum: &Sum) -> Vec> { - let mut vec_sum_json = Vec::new(); - let data_points_json = flatten_number_data_points(&sum.data_points); - for data_point_json in data_points_json { - let mut sum_json = Map::new(); - for (key, value) in &data_point_json { - sum_json.insert(key.clone(), value.clone()); + let mut data_points = flatten_number_data_points(&sum.data_points); + let agg_temp = flatten_aggregation_temporality(sum.aggregation_temporality); + for dp in &mut data_points { + for (k, v) in &agg_temp { + dp.insert(k.clone(), v.clone()); } - vec_sum_json.push(sum_json); + dp.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); } - let mut sum_json = Map::new(); - sum_json.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); - sum_json.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); - for data_point_json in &mut vec_sum_json { - for (key, value) in &sum_json { - data_point_json.insert(key.clone(), value.clone()); - } - } - vec_sum_json + data_points } /// otel metrics event has json object for histogram @@ -451,56 +434,65 @@ fn flatten_summary(summary: &Summary) -> Vec> { /// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each metric record object in the otel metrics event pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { - let mut data_points_json = Vec::new(); - let mut metric_json = Map::new(); - let mut metric_type = String::default(); - match &metrics_record.data { - Some(metric::Data::Gauge(gauge)) => { - metric_type = "gauge".to_string(); - data_points_json.extend(flatten_gauge(gauge)); - } - Some(metric::Data::Sum(sum)) => { - metric_type = "sum".to_string(); - data_points_json.extend(flatten_sum(sum)); - } - Some(metric::Data::Histogram(histogram)) => { - metric_type = "histogram".to_string(); - data_points_json.extend(flatten_histogram(histogram)); - } - Some(metric::Data::ExponentialHistogram(exp_histogram)) => { - metric_type = "exponential_histogram".to_string(); - data_points_json.extend(flatten_exp_histogram(exp_histogram)); - } - Some(metric::Data::Summary(summary)) => { - metric_type = "summary".to_string(); - data_points_json.extend(flatten_summary(summary)); + let (mut data_points, metric_type) = match &metrics_record.data { + Some(metric::Data::Gauge(gauge)) => (flatten_gauge(gauge), "gauge"), + Some(metric::Data::Sum(sum)) => (flatten_sum(sum), 
"sum"), + Some(metric::Data::Histogram(histogram)) => (flatten_histogram(histogram), "histogram"), + Some(metric::Data::ExponentialHistogram(exp_histogram)) => ( + flatten_exp_histogram(exp_histogram), + "exponential_histogram", + ), + Some(metric::Data::Summary(summary)) => (flatten_summary(summary), "summary"), + None => return Vec::new(), + }; + + // Build metric-level fields once + let metric_name = Value::String(metrics_record.name.clone()); + let metric_desc = Value::String(metrics_record.description.clone()); + let metric_unit = Value::String(metrics_record.unit.clone()); + let metric_type_val = Value::String(metric_type.to_string()); + let mut metadata = Map::new(); + insert_attributes(&mut metadata, &metrics_record.metadata); + + if data_points.is_empty() { + let mut single = Map::new(); + single.insert("metric_name".to_string(), metric_name); + single.insert("metric_description".to_string(), metric_desc); + single.insert("metric_unit".to_string(), metric_unit); + single.insert("metric_type".to_string(), metric_type_val); + match &metrics_record.data { + Some(metric::Data::Sum(sum)) => { + single.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); + single.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); + } + Some(metric::Data::Histogram(histogram)) => { + single.extend(flatten_aggregation_temporality( + histogram.aggregation_temporality, + )); + } + Some(metric::Data::ExponentialHistogram(exp_histogram)) => { + single.extend(flatten_aggregation_temporality( + exp_histogram.aggregation_temporality, + )); + } + _ => {} } - None => {} + single.extend(metadata); + return vec![single]; } - metric_json.insert( - "metric_name".to_string(), - Value::String(metrics_record.name.clone()), - ); - metric_json.insert( - "metric_description".to_string(), - Value::String(metrics_record.description.clone()), - ); - metric_json.insert( - "metric_unit".to_string(), - Value::String(metrics_record.unit.clone()), - ); - metric_json.insert("metric_type".to_string(), Value::String(metric_type)); - insert_attributes(&mut metric_json, &metrics_record.metadata); - for data_point_json in &mut data_points_json { - for (key, value) in &metric_json { - data_point_json.insert(key.clone(), value.clone()); + + // Insert metric-level fields directly into each data point + for dp in &mut data_points { + dp.insert("metric_name".to_string(), metric_name.clone()); + dp.insert("metric_description".to_string(), metric_desc.clone()); + dp.insert("metric_unit".to_string(), metric_unit.clone()); + dp.insert("metric_type".to_string(), metric_type_val.clone()); + for (k, v) in &metadata { + dp.insert(k.clone(), v.clone()); } } - if data_points_json.is_empty() { - data_points_json.push(metric_json); - } - data_points_json + data_points } /// Common function to process resource metrics and merge resource-level fields @@ -516,71 +508,63 @@ fn process_resource_metrics( get_metric: fn(&M) -> &Metric, tenant_id: &str, ) -> Vec { + let _span = info_span!( + "process_resource_metrics", + resource_count = resource_metrics.len(), + ) + .entered(); let mut vec_otel_json = Vec::new(); for resource_metric in resource_metrics { - let mut resource_metrics_json = Map::new(); - - // Process resource attributes if present + // Build resource-level fields once per resource + let mut resource_fields = Map::new(); if let Some(resource) = get_resource(resource_metric) { - insert_attributes(&mut resource_metrics_json, &resource.attributes); - resource_metrics_json.insert( + insert_attributes(&mut 
resource_fields, &resource.attributes); + resource_fields.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), ); } + resource_fields.insert( + "resource_schema_url".to_string(), + Value::String(get_schema_url(resource_metric).to_string()), + ); - let mut vec_scope_metrics_json = Vec::new(); - let scope_metrics = get_scope_metrics(resource_metric); - - for scope_metric in scope_metrics { - let mut scope_metrics_json = Map::new(); - - let metrics = get_metrics(scope_metric); - for metric in metrics { - vec_scope_metrics_json.extend(flatten_metrics_record(get_metric(metric))); - } - - let date = chrono::Utc::now().date_naive().to_string(); - increment_metrics_collected_by_date(metrics.len() as u64, &date, tenant_id); + for scope_metric in get_scope_metrics(resource_metric) { + // Build envelope = resource + scope fields (once per scope) + let mut envelope = resource_fields.clone(); if let Some(scope) = get_scope(scope_metric) { - scope_metrics_json - .insert("scope_name".to_string(), Value::String(scope.name.clone())); - scope_metrics_json.insert( + envelope.insert("scope_name".to_string(), Value::String(scope.name.clone())); + envelope.insert( "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes(&mut scope_metrics_json, &scope.attributes); - scope_metrics_json.insert( + insert_attributes(&mut envelope, &scope.attributes); + envelope.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), ); } - - scope_metrics_json.insert( + envelope.insert( "scope_schema_url".to_string(), Value::String(get_scope_schema_url(scope_metric).to_string()), ); - for scope_metric_json in &mut vec_scope_metrics_json { - for (key, value) in &scope_metrics_json { - scope_metric_json.insert(key.clone(), value.clone()); - } - } - } - - resource_metrics_json.insert( - "resource_schema_url".to_string(), - Value::String(get_schema_url(resource_metric).to_string()), - ); + let metrics = get_metrics(scope_metric); + let date = chrono::Utc::now().date_naive().to_string(); + increment_metrics_collected_by_date(metrics.len() as u64, &date, tenant_id); - for resource_metric_json in &mut vec_scope_metrics_json { - for (key, value) in &resource_metrics_json { - resource_metric_json.insert(key.clone(), value.clone()); + // Flatten each metric's data points and merge envelope in one pass + for metric in metrics { + let data_points = flatten_metrics_record(get_metric(metric)); + for mut dp in data_points { + for (k, v) in &envelope { + dp.insert(k.clone(), v.clone()); + } + vec_otel_json.push(Value::Object(dp)); + } } - - vec_otel_json.push(Value::Object(resource_metric_json.clone())); } } @@ -608,7 +592,14 @@ pub fn flatten_otel_metrics_protobuf( message: &ExportMetricsServiceRequest, tenant_id: &str, ) -> Vec { - process_resource_metrics( + let span = info_span!( + "flatten_otel_metrics_protobuf", + resource_metrics_count = message.resource_metrics.len(), + output_count = tracing::field::Empty, + ); + let _guard = span.enter(); + + let result = process_resource_metrics( &message.resource_metrics, |record| record.resource.as_ref(), |record| &record.scope_metrics, @@ -618,7 +609,10 @@ pub fn flatten_otel_metrics_protobuf( |scope_metric| &scope_metric.metrics, |metric| metric, tenant_id, - ) + ); + + span.record("output_count", result.len()); + result } /// otel metrics event has json object for aggregation temporality diff --git a/src/parseable/staging/mod.rs 
b/src/parseable/staging/mod.rs index f5bd90cee..5d1a3a071 100644 --- a/src/parseable/staging/mod.rs +++ b/src/parseable/staging/mod.rs @@ -17,6 +17,8 @@ * */ +use std::sync::PoisonError; + use crate::{parseable::StreamNotFound, tenants::TenantNotFound}; pub mod reader; @@ -36,6 +38,6 @@ pub enum StagingError { StreamNotFound(#[from] StreamNotFound), #[error("{0}")] TenantNotFound(#[from] TenantNotFound), - // #[error("Metadata Error: {0}")] - // Metadata(#[from] MetadataError), + #[error("{0}")] + PoisonError(#[from] PoisonError), } diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index f2cfc6713..a92f89d85 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -18,7 +18,7 @@ */ use std::{ - fs::{File, remove_file}, + fs::File, io::{self, BufReader, Read, Seek, SeekFrom}, path::PathBuf, sync::Arc, @@ -30,60 +30,13 @@ use arrow_ipc::{MessageHeader, reader::StreamReader, root_as_message_unchecked}; use arrow_schema::Schema; use byteorder::{LittleEndian, ReadBytesExt}; use itertools::kmerge_by; -use tracing::error; +use tracing::{error, info_span}; use crate::{ event::DEFAULT_TIMESTAMP_KEY, utils::arrow::{adapt_batch, reverse}, }; -#[derive(Debug)] -pub struct MergedRecordReader { - pub readers: Vec>>, -} - -impl MergedRecordReader { - pub fn try_new(files: &[PathBuf]) -> Result { - let mut readers = Vec::with_capacity(files.len()); - - for file in files { - //remove empty files before reading - match file.metadata() { - Err(err) => { - error!("Error when trying to read file: {file:?}; error = {err}"); - continue; - } - Ok(metadata) if metadata.len() == 0 => { - error!("Empty file detected, removing it: {:?}", file); - remove_file(file).unwrap(); - continue; - } - Ok(_) => { - let Ok(reader) = - StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) - else { - error!("Invalid file detected, ignoring it: {:?}", file); - continue; - }; - - readers.push(reader); - } - } - } - - Ok(Self { readers }) - } - - pub fn merged_schema(&self) -> Schema { - Schema::try_merge( - self.readers - .iter() - .map(|reader| reader.schema().as_ref().clone()), - ) - .unwrap() - } -} - #[derive(Debug)] pub struct MergedReverseRecordReader { pub readers: Vec>>>, @@ -91,6 +44,7 @@ pub struct MergedReverseRecordReader { impl MergedReverseRecordReader { pub fn try_new(file_paths: &[PathBuf]) -> Self { + let _span = info_span!("open_arrow_files", file_count = file_paths.len()).entered(); let mut readers = Vec::with_capacity(file_paths.len()); for path in file_paths { match File::open(path) { @@ -121,9 +75,8 @@ impl MergedReverseRecordReader { ) -> impl Iterator { let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten()); kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| { - // Capture time_partition by value - let a_time = get_timestamp_millis(a, time_partition.clone()); - let b_time = get_timestamp_millis(b, time_partition.clone()); + let a_time = get_timestamp_millis(a, time_partition.as_deref()); + let b_time = get_timestamp_millis(b, time_partition.as_deref()); a_time > b_time }) .map(|batch| reverse(&batch)) @@ -140,19 +93,16 @@ impl MergedReverseRecordReader { } } -fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> i64 { +fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<&str>) -> i64 { match time_partition { - Some(time_partition) => { - let time_partition = time_partition.as_str(); - match batch.column_by_name(time_partition) { - Some(column) => column - 
.as_any() - .downcast_ref::() - .unwrap() - .value(0), - None => get_default_timestamp_millis(batch), - } - } + Some(time_partition) => match batch.column_by_name(time_partition) { + Some(column) => column + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + None => get_default_timestamp_millis(batch), + }, None => get_default_timestamp_millis(batch), } } @@ -288,7 +238,12 @@ pub fn get_reverse_reader( // reset reader reader.rewind()?; - Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap()) + StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid arrow stream: {e}"), + ) + }) } // return limit for @@ -562,7 +517,8 @@ mod tests { write_test_batches(&file_path, &schema, &batches)?; // Now read them back in reverse order - let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None); + let mut reader = + MergedReverseRecordReader::try_new(&[file_path.into()]).merged_iter(schema, None); // We should get batches in reverse order: 3, 2, 1 // But first message should be schema, so we'll still read them in order @@ -648,7 +604,7 @@ mod tests { } #[test] - fn test_get_reverse_reader_single_message() -> io::Result<()> { + fn testget_reverse_reader_single_message() -> io::Result<()> { let dir = TempDir::new().unwrap(); let file_path = dir.path().join("test_single.data.arrows"); @@ -663,7 +619,8 @@ mod tests { // Write batch to file write_test_batches(&file_path, &schema, &[batch])?; - let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None); + let mut reader = + MergedReverseRecordReader::try_new(&[file_path.into()]).merged_iter(schema, None); // Should get the batch let result_batch = reader.next().expect("Failed to read batch"); diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 1230946ab..b7d22ffa3 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -33,7 +33,6 @@ use parquet::{ schema::types::ColumnPath, }; use relative_path::RelativePathBuf; -use std::io::BufReader; use std::{ collections::{HashMap, HashSet}, fs::{self, File, OpenOptions, remove_file, write}, @@ -42,8 +41,9 @@ use std::{ sync::{Arc, Mutex, RwLock}, time::{Instant, SystemTime, UNIX_EPOCH}, }; +use std::{io::BufReader, sync::PoisonError}; use tokio::task::JoinSet; -use tracing::{error, info, trace, warn}; +use tracing::{Instrument, error, info, info_span, instrument, trace, warn}; use ulid::Ulid; use crate::{ @@ -67,7 +67,7 @@ use super::{ ARROW_FILE_EXTENSION, LogStream, PART_FILE_EXTENSION, staging::{ StagingError, - reader::{MergedRecordReader, MergedReverseRecordReader}, + reader::MergedReverseRecordReader, writer::{DiskWriter, Writer}, }, }; @@ -146,14 +146,28 @@ impl Stream { custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { - let mut guard = match self.writer.lock() { - Ok(guard) => guard, - Err(poisoned) => { - error!( - "Writer lock poisoned while ingesting data for stream {}", - self.stream_name - ); - poisoned.into_inner() + let _span = info_span!( + "stream_push", + stream_name = %self.stream_name, + num_rows = record.num_rows(), + ) + .entered(); + + let mut guard = { + let _lock_span = info_span!("acquire_writer_lock").entered(); + match self.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while ingesting data for stream {}", + self.stream_name + ); + + return 
Err(StagingError::PoisonError(PoisonError::new(format!( + "Writer lock poisoned while ingesting data for stream {} - {}", + self.stream_name, poisoned + )))); + } } }; if self.options.mode != Mode::Query || stream_type == StreamType::Internal { @@ -461,6 +475,12 @@ impl Stream { } /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` + #[instrument( + name = "prepare_parquet", + level = "info", + skip(self, tenant_id), + fields(stream_name = %self.stream_name) + )] pub fn prepare_parquet( &self, init_signal: bool, @@ -521,20 +541,32 @@ impl Stream { } pub fn flush(&self, forced: bool) { - let mut writer = match self.writer.lock() { - Ok(guard) => guard, - Err(poisoned) => { - error!( + let _span = info_span!("flush", stream_name = %self.stream_name, forced).entered(); + // Swap out stale writers under the lock, drop them after releasing it. + // DiskWriter::Drop does I/O (IPC finish + file rename) so dropping + // outside the lock avoids blocking concurrent push() calls. + let stale_writers = { + let mut writer = self.writer.lock().unwrap_or_else(|_| { + panic!( "Writer lock poisoned while flushing data for stream {}", self.stream_name - ); - poisoned.into_inner() + ) + }); + writer.mem.clear(); + + let mut old_disk = HashMap::new(); + std::mem::swap(&mut writer.disk, &mut old_disk); + if !forced { + for (k, v) in old_disk.drain() { + if v.is_current() { + writer.disk.insert(k, v); + } + } } + old_disk }; - // Flush memory - writer.mem.clear(); - // Drop schema -> disk writer mapping, triggers flush to disk - writer.disk.retain(|_, w| !forced && w.is_current()); + // DiskWriter::Drop I/O happens here, outside the lock + drop(stale_writers); } fn parquet_writer_props( @@ -632,12 +664,20 @@ impl Stream { shutdown_signal: bool, tenant_id: &Option, ) -> Result, StagingError> { + let span = info_span!( + "convert_disk_files_to_parquet", + stream_name = %self.stream_name, + file_group_count = tracing::field::Empty, + ); + let _guard = span.enter(); + let mut schemas = Vec::new(); let now = SystemTime::now(); let group_minute = minute_from_system_time(now) - 1; let staging_files = self.arrow_files_grouped_exclude_time(now, group_minute, init_signal, shutdown_signal); + span.record("file_group_count", staging_files.len()); if staging_files.is_empty() { self.reset_staging_metrics(tenant_id); return Ok(None); @@ -688,6 +728,11 @@ impl Stream { props: &WriterProperties, time_partition: Option<&String>, ) -> Result { + let _span = info_span!( + "write_parquet_part_file", + stream_name = %self.stream_name, + ) + .entered(); let mut part_file = OpenOptions::new() .create(true) .append(true) @@ -809,7 +854,7 @@ impl Stream { pub fn updated_schema(&self, current_schema: Schema) -> Schema { let staging_files = self.arrow_files(); - let record_reader = MergedRecordReader::try_new(&staging_files).unwrap(); + let record_reader = MergedReverseRecordReader::try_new(&staging_files); if record_reader.readers.is_empty() { return current_schema; } @@ -1130,6 +1175,12 @@ impl Stream { } /// First flushes arrows onto disk and then converts the arrow into parquet + #[instrument( + name = "flush_and_convert", + level = "info", + skip(self, tenant_id), + fields(stream_name = %self.stream_name) + )] pub fn flush_and_convert( &self, init_signal: bool, @@ -1299,9 +1350,13 @@ impl Streams { }; for stream in streams { let tenant = tenant_id.clone(); - joinset.spawn(async move { - stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant)) - }); + let 
span = info_span!("stream_sync", stream_name = %stream.stream_name); + joinset.spawn( + async move { + stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant)) + } + .instrument(span), + ); } } } diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index ccc606b5e..6a057b9c8 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -171,11 +171,11 @@ pub async fn calculate_field_stats( )?; let mut p_custom_fields = HashMap::new(); p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); + let schema = PARSEABLE + .get_stream(DATASET_STATS_STREAM_NAME, tenant_id)? + .get_schema_raw(); for json in vec_json { - let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length - let schema = PARSEABLE - .get_stream(DATASET_STATS_STREAM_NAME, tenant_id)? - .get_schema_raw(); + let origin_size = crate::handlers::http::modal::utils::ingest_utils::json_byte_size(&json); json::Event { json, p_timestamp: parquet_ts, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 67aeac609..8c974b8d9 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -39,8 +39,7 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; use tokio::task::JoinSet; -use tracing::info; -use tracing::{error, warn}; +use tracing::{Instrument, error, info, info_span, warn}; use ulid::Ulid; use crate::catalog::{self, snapshot::Snapshot}; @@ -1188,21 +1187,25 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { for stream_name in PARSEABLE.streams.list(&tenant_id) { let object_store = object_store.clone(); let id = tenant_id.clone(); - joinset.spawn(async move { - let start = Instant::now(); - let result = object_store - .upload_files_from_staging(&stream_name, id) - .await; - if let Err(ref e) = result { - error!("Failed to upload files from staging for stream {stream_name}: {e}"); - } else { - info!( - "Completed object_store_sync for stream- {stream_name} in {} ms", - start.elapsed().as_millis() - ); + let span = info_span!("stream_upload", stream_name = %stream_name); + joinset.spawn( + async move { + let start = Instant::now(); + let result = object_store + .upload_files_from_staging(&stream_name, id) + .await; + if let Err(ref e) = result { + error!("Failed to upload files from staging for stream {stream_name}: {e}"); + } else { + info!( + "Completed object_store_sync for stream- {stream_name} in {} ms", + start.elapsed().as_millis() + ); + } + result } - result - }); + .instrument(span), + ); } } } diff --git a/src/sync.rs b/src/sync.rs index 3dc4f7d19..09f8a3f02 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -21,11 +21,25 @@ use futures::FutureExt; use std::collections::HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinSet; use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; -use tracing::{error, info, trace, warn}; +use tracing::{Instrument, error, info, info_span, trace, warn}; + +static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); +static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); + +/// RAII guard that clears a sync-running flag on drop, so a panic inside the +/// sync body cannot leave the flag stuck at `true` and wedge future ticks. 
+struct SyncRunningGuard(&'static AtomicBool); + +impl Drop for SyncRunningGuard { + fn drop(&mut self) { + self.0.store(false, Ordering::SeqCst); + } +} use crate::alerts::alert_enums::AlertTask; use crate::alerts::alerts_utils; @@ -127,22 +141,29 @@ pub fn object_store_sync() -> ( loop { select! { _ = sync_interval.tick() => { - trace!("Syncing Parquets to Object Store... "); - - // Monitor the duration of sync_all_streams execution - monitor_task_duration( - "object_store_sync_all_streams", - Duration::from_secs(PARSEABLE.options.object_store_sync_threshold), - || async { - let mut joinset = JoinSet::new(); - sync_all_streams(&mut joinset); - - // Wait for all spawned tasks to complete - while let Some(res) = joinset.join_next().await { - log_join_result(res, "object store sync"); + if REMOTE_SYNC_RUNNING.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() { + warn!("Previous object_store_sync cycle still running, skipping this tick"); + continue; + } + let _guard = SyncRunningGuard(&REMOTE_SYNC_RUNNING); + async { + trace!("Syncing Parquets to Object Store... "); + + // Monitor the duration of sync_all_streams execution + monitor_task_duration( + "object_store_sync_all_streams", + Duration::from_secs(PARSEABLE.options.object_store_sync_threshold), + || async { + let mut joinset = JoinSet::new(); + sync_all_streams(&mut joinset); + + // Wait for all spawned tasks to complete + while let Some(res) = joinset.join_next().await { + log_join_result(res, "object store sync"); + } } - } - ).await; + ).await; + }.instrument(info_span!("object_store_sync_cycle")).await; }, res = &mut inbox_rx => { match res { @@ -194,20 +215,27 @@ pub fn local_sync() -> ( loop { select! { _ = sync_interval.tick() => { + if LOCAL_SYNC_RUNNING.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() { + warn!("Previous local_sync cycle still running, skipping this tick"); + continue; + } + let _guard = SyncRunningGuard(&LOCAL_SYNC_RUNNING); // Monitor the duration of flush_and_convert execution - monitor_task_duration( - "local_sync_flush_and_convert", - Duration::from_secs(PARSEABLE.options.local_sync_threshold), - || async { - let mut joinset = JoinSet::new(); - PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); - - // Wait for all spawned tasks to complete - while let Some(res) = joinset.join_next().await { - log_join_result(res, "flush and convert"); + async { + monitor_task_duration( + "local_sync_flush_and_convert", + Duration::from_secs(PARSEABLE.options.local_sync_threshold), + || async { + let mut joinset = JoinSet::new(); + PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); + + // Wait for all spawned tasks to complete + while let Some(res) = joinset.join_next().await { + log_join_result(res, "flush and convert"); + } } - } - ).await; + ).await; + }.instrument(info_span!("local_sync_cycle")).await; }, res = &mut inbox_rx => { match res { diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 000000000..4e4d7dfdb --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,128 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::SpanExporter; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + Resource, + propagation::TraceContextPropagator, + trace::{BatchSpanProcessor, SdkTracerProvider}, +}; +// Consts describing the env vars +const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT"; +const OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"; +const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL"; + +/// Initialise an OTLP tracer provider. +/// +/// **Required env var:** +/// - \`OTEL_EXPORTER_OTLP_ENDPOINT\` — collector address. +/// For HTTP exporters the SDK appends the signal path automatically: +/// e.g. \`http://localhost:4318\` → \`http://localhost:4318/v1/traces\`. +/// Set a signal-specific var \`OTEL_EXPORTER_OTLP_TRACES_ENDPOINT\` to +/// supply a full URL without any path suffix being added. +/// +/// **Optional env vars (all read by the SDK automatically):** +/// - \`OTEL_EXPORTER_OTLP_PROTOCOL\` — transport + serialisation (default: \`http/json\`): +/// - \`grpc\` → gRPC / tonic (Jaeger, Tempo, …) +/// - \`http/json\` → HTTP + JSON (Parseable OSS ingest at \`/v1/traces\`) +/// - \`http/protobuf\` → HTTP + protobuf +/// - \`OTEL_EXPORTER_OTLP_HEADERS\` — comma-separated \`key=value\` pairs forwarded +/// as gRPC metadata or HTTP headers, e.g. +/// \`authorization=Basic ,x-p-stream=my-stream,x-p-log-source=otel-traces\` +/// +/// Returns \`None\` when \`OTEL_EXPORTER_OTLP_ENDPOINT\` or `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` is not set (OTEL disabled). +/// The caller must call \`provider.shutdown()\` before process exit. +pub fn init_tracing() -> Option { + // Only used to decide whether OTEL is enabled; the SDK reads it again + // from env to build the exporter (which also appends /v1/traces for HTTP). + if std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).is_err() + && std::env::var(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT).is_err() + { + return None; + } + + let protocol = + std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL).unwrap_or_else(|_| "http/json".to_string()); + + // Build the exporter using the SDK's env-var-aware builders. + // We intentionally do NOT call .with_endpoint() / .with_headers() / + // .with_metadata() here — the SDK reads OTEL_EXPORTER_OTLP_ENDPOINT and + // OTEL_EXPORTER_OTLP_HEADERS from the environment automatically, which + // preserves correct path-appending behaviour for HTTP exporters. 
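For context on how the pieces added in this patch fit together: the provider returned by `init_tracing()` below still needs to be layered into the global `tracing` subscriber (via `tracing-opentelemetry` and the `registry` feature enabled in Cargo.toml) before any of the new spans are exported. The call site in `main` is not part of this diff, so the sketch below is illustrative only:

```rust
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};

// Illustrative wiring; the function name and call site are assumptions.
fn setup_tracing() -> Option<opentelemetry_sdk::trace::SdkTracerProvider> {
    let provider = parseable::telemetry::init_tracing();

    // None when the OTLP endpoint env vars are unset, which cleanly disables export.
    let otel_layer = provider.as_ref().map(|p| {
        use opentelemetry::trace::TracerProvider as _;
        tracing_opentelemetry::layer().with_tracer(p.tracer("parseable"))
    });

    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(tracing_subscriber::fmt::layer())
        .with(otel_layer) // Option<Layer> is itself a Layer
        .init();

    // Keep the provider alive and call provider.shutdown() before exit.
    provider
}
```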
+ let exporter = match protocol.as_str() { + // ── gRPC ───────────────────────────────────────────────────────────── + "grpc" => opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build(), + // ── HTTP/Protobuf ──────────────────────────────────────────────────── + "http/protobuf" => SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .build(), + // ── HTTP/JSON ───────────────────────────────────────────────────────── + "http/json" => SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpJson) + .build(), + // return none if an invalid value is set + other => { + tracing::warn!( + "Unknown OTEL_EXPORTER_OTLP_PROTOCOL value '{}'; disabling OTEL tracing. \ + Supported values: grpc, http/protobuf, http/json", + other + ); + return None; + } + }; + + let exporter = exporter + .map_err(|e| tracing::warn!("Failed to build OTEL span exporter: {}", e)) + .ok()?; + + // Declare conformance to OTel Semantic Conventions v1.56.0 via schema_url. + // Downstream collectors (e.g., the Schema Translate Processor) can apply + // migration tables to rewrite attribute names across semconv versions — + // so even if upstream semconv drifts, emitted telemetry remains translatable. + let resource = Resource::builder_empty() + .with_service_name("parseable") + .with_schema_url( + std::iter::empty::(), + "https://opentelemetry.io/schemas/1.56.0", + ) + .build(); + + let processor = BatchSpanProcessor::builder(exporter).build(); + + let provider = SdkTracerProvider::builder() + .with_span_processor(processor) + .with_resource(resource) + .build(); + + opentelemetry::global::set_tracer_provider(provider.clone()); + + // Register the W3C TraceContext propagator globally. + // This is REQUIRED for: + // - Incoming HTTP header extraction (traceparent/tracestate) + // - Cross-thread channel propagation via inject/extract + // Without this, propagator.extract() returns an empty context. + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + Some(provider) +} diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 817753f2f..1027baec6 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -23,6 +23,7 @@ use flatten::{convert_to_array, generic_flattening, has_more_than_max_allowed_le use serde::de::Visitor; use serde_json; use serde_json::Value; +use tracing::info_span; use crate::event::format::LogSource; use crate::metadata::SchemaVersion; @@ -224,6 +225,7 @@ pub fn convert_array_to_object( schema_version: SchemaVersion, log_source: &LogSource, ) -> Result, anyhow::Error> { + let _span = info_span!("convert_array_to_object").entered(); if time_partition.is_some() || custom_partition.is_some() { match body { Value::Array(arr) => process_partitioned_array(