Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ base64 = "0.22.0"
cookie = "0.18.1"
hex = "0.4"
openid = { version = "0.18.3", default-features = false, features = ["rustls"] }
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
rustls = { version = "0.23", default-features = false, features = [
"ring",
"std",
] }
rustls-pemfile = "2.1.2"
sha2 = "0.10.8"

Expand Down Expand Up @@ -104,10 +107,27 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r
"metrics",
"trace",
] }
prometheus = { version = "0.13.4", default-features = false, features = ["process"] }
prometheus = { version = "0.13.4", default-features = false, features = [
"process",
] }
prometheus-parse = "0.2.5"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] }
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.23", features = [
"env-filter",
"time",
"registry",
] }

# telemetry
tracing-opentelemetry = "0.32.1"
opentelemetry = "0.31.0"
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.31.1", features = [
"grpc-tonic",
"http-proto",
"http-json",
] }
tracing-actix-web = "0.7"

# Time and Date
chrono = "0.4"
Expand Down
48 changes: 47 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

use clap::Parser;
use clap::{Parser, value_parser};
use std::{env, fs, path::PathBuf};

use url::Url;
Expand Down Expand Up @@ -148,6 +148,52 @@ pub struct Options {
)]
pub address: String,

// Actix client request timeout, in whole seconds.
// Read from `--request-timeout` or the `P_ACTIX_REQUEST_TIMEOUT` environment
// variable; defaults to 5. The HTTP server setup wraps this value in
// `Duration::from_secs` before handing it to `client_request_timeout`.
#[arg(
long,
env = "P_ACTIX_REQUEST_TIMEOUT",
default_value = "5",
help = "Client request timeout"
)]
pub request_timeout: u64,

// Actix keep-alive timeout, in whole seconds.
// Read from `--keep-alive` or the `P_ACTIX_KEEP_ALIVE` environment variable;
// defaults to 5. The HTTP server setup converts this value into
// `KeepAlive::Timeout(Duration::from_secs(..))`.
#[arg(
long,
env = "P_ACTIX_KEEP_ALIVE",
default_value = "5",
help = "Server keep-alive"
)]
pub keep_alive: u64,

// Number of actix-web worker threads.
// Read from `--num-workers` or the `P_ACTIX_NUM_WORKERS` environment variable;
// when neither is given, defaults to the value returned by `num_cpus::get()`.
// The value parser only accepts values in the range `1..`, so 0 is rejected
// at argument-parsing time.
#[arg(
long,
env = "P_ACTIX_NUM_WORKERS",
default_value_t = num_cpus::get() as u64,
value_parser = value_parser!(u64).range(1..),
help = "Number of workers for actix-web"
)]
pub num_workers: u64,

// Actix connection backlog: maximum number of pending connections.
// Read from `--connection-backlog` or the `P_ACTIX_BACKLOG` environment
// variable; defaults to 2048. Passed unchanged to `HttpServer::backlog`.
#[arg(
long,
env = "P_ACTIX_BACKLOG",
default_value = "2048",
help = "Maximum number of pending connections"
)]
pub connection_backlog: u32,

// Actix maximum number of concurrent connections (per worker, as stated in
// the help text).
// Read from `--max-connections` or the `P_ACTIX_MAX_CONNECTIONS` environment
// variable; defaults to 25000. Passed unchanged to
// `HttpServer::max_connections`.
#[arg(
long,
env = "P_ACTIX_MAX_CONNECTIONS",
default_value = "25000",
help = "Per-worker maximum number of concurrent connections"
)]
pub max_connections: usize,

#[arg(
long = "origin",
env = "P_ORIGIN_URI",
Expand Down
17 changes: 13 additions & 4 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tracing::error;
use tracing::{error, info_span};

use super::EventFormat;
use super::{detect_schema_conflicts, rename_conflicting_fields_in_json};
Expand Down Expand Up @@ -86,8 +86,12 @@ impl EventFormat for Event {
// IMPORTANT: Detect conflicts BEFORE update_field_type_in_schema, because
// update_field_type_in_schema may override types (e.g., force Utf8 to Timestamp
// if existing schema has Timestamp), which would hide the actual conflict.
let raw_inferred_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
.map_err(|err| anyhow!("Could not infer schema for this event due to err {:?}", err))?;
let raw_inferred_schema = {
let _span = info_span!("infer_json_schema", record_count = value_arr.len()).entered();
infer_json_schema_from_iterator(value_arr.iter().map(Ok)).map_err(|err| {
anyhow!("Could not infer schema for this event due to err {:?}", err)
})?
};

// Detect schema conflicts using raw inferred schema vs existing stream schema
// Pass the actual values and schema_version to check if values can be coerced to existing types
Expand All @@ -110,7 +114,11 @@ impl EventFormat for Event {
collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");

let mut is_first = false;
let (value_arr, schema) = match derive_arrow_schema(stream_schema, fields) {
let res = {
let _span = info_span!("derive_arrow_schema").entered();
derive_arrow_schema(stream_schema, fields)
};
let (value_arr, schema) = match res {
Ok(schema) => (value_arr, schema),
Err(_) => {
let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
Expand Down Expand Up @@ -155,6 +163,7 @@ impl EventFormat for Event {

// Convert the Data type (defined above) to arrow record batch
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
let _span = info_span!("json_to_recordbatch", record_count = data.len()).entered();
let array_capacity = round_upto_multiple_of_64(data.len());
let mut reader = ReaderBuilder::new(schema)
.with_batch_size(array_capacity)
Expand Down
2 changes: 2 additions & 0 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::info_span;

use crate::{
handlers::TelemetryType,
Expand Down Expand Up @@ -167,6 +168,7 @@ pub trait EventFormat: Sized {
schema_version: SchemaVersion,
p_custom_fields: &HashMap<String, String>,
) -> Result<(RecordBatch, bool), AnyError> {
let _span = info_span!("into_recordbatch").entered();
let p_timestamp = self.get_p_timestamp();
let (data, schema, is_first) = self.to_data(
storage_schema,
Expand Down
13 changes: 13 additions & 0 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use arrow_schema::{Field, Fields, Schema};
use itertools::Itertools;
use std::sync::Arc;

use tracing::{info_span, instrument};

use self::error::EventError;
use crate::{
LOCK_EXPECT,
Expand Down Expand Up @@ -60,6 +62,16 @@ pub struct Event {

// Events holds the schema related to each event for a single log stream
impl Event {
#[instrument(
name = "event_process",
level = "info",
skip_all,
fields(
stream_name = %self.stream_name,
num_rows = self.rb.num_rows(),
is_first_event = self.is_first_event
)
)]
pub fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
Expand Down Expand Up @@ -144,6 +156,7 @@ pub fn commit_schema(
schema: Arc<Schema>,
tenant_id: &Option<String>,
) -> Result<(), StagingError> {
let _span = info_span!("commit_schema", stream_name).entered();
let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
let map = &mut stream_metadata
Expand Down
21 changes: 18 additions & 3 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*
*/

use std::{fmt, path::Path, sync::Arc};
use std::{fmt, path::Path, sync::Arc, time::Duration};

use actix_web::{App, HttpServer, middleware::from_fn, web::ServiceConfig};
use actix_web::{App, HttpServer, http::KeepAlive, middleware::from_fn, web::ServiceConfig};
use actix_web_prometheus::PrometheusMetrics;
use anyhow::Context;
use arrow::datatypes::ArrowNativeType;
use async_trait::async_trait;
Comment thread
parmesant marked this conversation as resolved.
use base64::{Engine, prelude::BASE64_STANDARD};
use bytes::Bytes;
Expand Down Expand Up @@ -121,8 +122,22 @@ pub trait ParseableServer {

// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.workers(PARSEABLE.options.num_workers.as_usize())
.keep_alive(KeepAlive::Timeout(Duration::from_secs(
PARSEABLE.options.keep_alive,
)))
.client_request_timeout(Duration::from_secs(PARSEABLE.options.request_timeout))
.backlog(PARSEABLE.options.connection_backlog)
.max_connections(PARSEABLE.options.max_connections)
.shutdown_timeout(60);
tracing::warn!(
"Starting Query server with-\nNum workers: {}\nKeep Alive: {}\nRequest timeout: {}\nConnection backlog: {}\nMax connections: {}",
PARSEABLE.options.num_workers,
PARSEABLE.options.keep_alive,
PARSEABLE.options.request_timeout,
PARSEABLE.options.connection_backlog,
PARSEABLE.options.max_connections
);
Comment thread
parmesant marked this conversation as resolved.

// Start the server with or without TLS
let srv = if let Some(config) = ssl {
Expand Down
Loading
Loading