From 2aa314b4f4893769eb5547348eea83829153805b Mon Sep 17 00:00:00 2001
From: Anant Vindal
Date: Wed, 15 Apr 2026 15:53:32 +0530
Subject: [PATCH] feat: Added a new trait to expose SchemaProvider

---
 Cargo.toml                          |  1 +
 src/lib.rs                          |  6 ++
 src/query/mod.rs                    | 86 ++++++++++++++++++-----------
 src/query/stream_schema_provider.rs |  2 +-
 4 files changed, 62 insertions(+), 33 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 13ffb0070..471b7fd3a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ arrow-json = "57.1.0"
 arrow-schema = { version = "57.1.0", features = ["serde"] }
 arrow-select = "57.1.0"
 datafusion = "51.0.0"
+datafusion-proto = "51.0.0"
 object_store = { version = "0.12.4", features = [
     "cloud",
     "aws",
diff --git a/src/lib.rs b/src/lib.rs
index 5c3704d5c..2d68ff907 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -56,7 +56,12 @@ pub mod validator;
 use std::time::Duration;
 
 // Public re-exports of crates being used in enterprise
+pub use arrow_array;
+pub use arrow_flight;
+pub use arrow_ipc;
+pub use catalog as parseable_catalog;
 pub use datafusion;
+pub use datafusion_proto;
 pub use handlers::http::modal::{
     ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server,
 };
@@ -65,6 +70,7 @@ pub use openid;
 pub use opentelemetry_proto;
 use parseable::PARSEABLE;
 use reqwest::{Client, ClientBuilder};
+pub use utils as parseable_utils;
 
 // It is very unlikely that panic will occur when dealing with locks.
 pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
diff --git a/src/query/mod.rs b/src/query/mod.rs
index e88ef802f..541345cee 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -25,6 +25,7 @@ use arrow_schema::SchemaRef;
 use chrono::NaiveDateTime;
 use chrono::{DateTime, Duration, Utc};
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::catalog::SchemaProvider;
 use datafusion::common::tree_node::Transformed;
 use datafusion::execution::disk_manager::DiskManager;
 use datafusion::execution::{
@@ -45,7 +46,7 @@ use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
 use futures::Stream;
 use futures::stream::select_all;
 use itertools::Itertools;
-use once_cell::sync::Lazy;
+use once_cell::sync::{Lazy, OnceCell};
 use serde::{Deserialize, Serialize};
 use serde_json::{Value, json};
 use std::ops::Bound;
@@ -57,7 +58,6 @@ use sysinfo::System;
 use tokio::runtime::Runtime;
 
 use self::error::ExecuteError;
-use self::stream_schema_provider::GlobalSchemaProvider;
 pub use self::stream_schema_provider::PartialTimeFilter;
 use crate::alerts::alert_structs::Conditions;
 use crate::alerts::alerts_utils::get_filter_string;
@@ -70,11 +70,11 @@ use crate::handlers::http::query::QueryError;
 use crate::metrics::increment_bytes_scanned_in_query_by_date;
 use crate::option::Mode;
 use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
-use crate::storage::{ObjectStorageProvider, ObjectStoreFormat};
+use crate::query::stream_schema_provider::GlobalSchemaProvider;
+use crate::storage::{ObjectStorage, ObjectStorageProvider, ObjectStoreFormat};
 use crate::utils::time::TimeRange;
 
-// pub static QUERY_SESSION: Lazy<SessionContext> =
-//     Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
+pub static SCHEMA_PROVIDER: OnceCell<Box<dyn ParseableSchemaProvider>> = OnceCell::new();
 
 pub static QUERY_SESSION_STATE: Lazy<SessionState> =
     Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));
@@ -90,6 +90,15 @@ pub static QUERY_SESSION: Lazy<InMemorySessionContext> = Lazy::new(|| {
     }
 });
 
+/// Trait to enable implementation of SchemaProvider
+pub trait ParseableSchemaProvider: Send + Sync {
+    fn new_provider(
+        &self,
+        storage: Option<Arc<dyn ObjectStorage>>,
+        tenant_id: &Option<String>,
+    ) -> Box<dyn SchemaProvider>;
+}
+
 pub struct InMemorySessionContext {
     session_context: Arc<RwLock<SessionContext>>,
 }
@@ -104,18 +113,23 @@ impl InMemorySessionContext {
     }
 
     pub fn add_schema(&self, tenant_id: &str) {
+        let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
+            provider.new_provider(
+                Some(PARSEABLE.storage().get_object_store()),
+                &Some(tenant_id.to_owned()),
+            )
+        } else {
+            Box::new(GlobalSchemaProvider {
+                storage: PARSEABLE.storage().get_object_store(),
+                tenant_id: Some(tenant_id.to_owned()),
+            })
+        };
         self.session_context
             .write()
             .expect("SessionContext should be writeable")
             .catalog("datafusion")
             .expect("Default catalog should be available")
-            .register_schema(
-                tenant_id,
-                Arc::new(GlobalSchemaProvider {
-                    storage: PARSEABLE.storage().get_object_store(),
-                    tenant_id: Some(tenant_id.to_owned()),
-                }),
-            )
+            .register_schema(tenant_id, schema_provider.into())
             .expect("Should be able to register new schema");
     }
 
@@ -194,29 +208,41 @@ impl Query {
             // register multiple schemas
             if let Some(tenants) = PARSEABLE.list_tenants() {
                 for t in tenants.iter() {
-                    let schema_provider = Arc::new(GlobalSchemaProvider {
-                        storage: storage.get_object_store(),
-                        tenant_id: Some(t.clone()),
-                    });
-                    let _ = catalog.register_schema(t, schema_provider);
+                    let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
+                        provider.new_provider(
+                            Some(PARSEABLE.storage().get_object_store()),
+                            &Some(t.to_owned()),
+                        )
+                    } else {
+                        Box::new(GlobalSchemaProvider {
+                            storage: PARSEABLE.storage().get_object_store(),
+                            tenant_id: Some(t.to_owned()),
+                        })
+                    };
+                    let _ = catalog.register_schema(t, schema_provider.into());
                 }
             }
         } else {
             // register just one schema
-            let schema_provider = Arc::new(GlobalSchemaProvider {
-                storage: storage.get_object_store(),
-                tenant_id: None,
-            });
+            let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
+                provider.new_provider(Some(PARSEABLE.storage().get_object_store()), &None)
+            } else {
+                Box::new(GlobalSchemaProvider {
+                    storage: PARSEABLE.storage().get_object_store(),
+                    tenant_id: None,
+                })
+            };
+
             let _ = catalog.register_schema(
                 &state.config_options().catalog.default_schema,
-                schema_provider,
+                schema_provider.into(),
             );
         }
 
         SessionContext::new_with_state(state)
     }
 
-    fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
+    pub fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
         let runtime_config = storage
             .get_datafusion_runtime()
             .with_disk_manager_builder(DiskManager::builder());
@@ -303,8 +329,8 @@ impl Query {
         ),
         ExecuteError,
     > {
-        let df = QUERY_SESSION
-            .get_ctx()
+        let ctx = QUERY_SESSION.get_ctx();
+        let df = ctx
             .execute_logical_plan(self.final_logical_plan(tenant_id))
             .await?;
         let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
@@ -320,14 +346,10 @@ impl Query {
 
             return Ok((Either::Left(vec![]), fields));
         }
 
-        let plan = QUERY_SESSION
-            .get_ctx()
-            .state()
-            .create_physical_plan(df.logical_plan())
-            .await?;
+        let plan = ctx.state().create_physical_plan(df.logical_plan()).await?;
 
         let results = if !is_streaming {
-            let task_ctx = QUERY_SESSION.get_ctx().task_ctx();
+            let task_ctx = ctx.task_ctx();
             let batches = collect_partitioned(plan.clone(), task_ctx.clone())
                 .await?
@@ -343,7 +365,7 @@
 
             Either::Left(batches)
         } else {
-            let task_ctx = QUERY_SESSION.get_ctx().task_ctx();
+            let task_ctx = ctx.task_ctx();
 
             let output_partitions = plan.output_partitioning().partition_count();
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 387a6e3d5..1b5ef0db3 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -698,7 +698,7 @@ pub enum PartialTimeFilter {
 }
 
 impl PartialTimeFilter {
-    fn try_from_expr(expr: &Expr, time_partition: &Option<String>) -> Option<Self> {
+    pub fn try_from_expr(expr: &Expr, time_partition: &Option<String>) -> Option<Self> {
         let Expr::BinaryExpr(binexpr) = expr else {
             return None;
         };
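
With SCHEMA_PROVIDER in place, an enterprise build can inject its own DataFusion SchemaProvider: every registration site in this patch consults the OnceCell first and falls back to GlobalSchemaProvider only when nothing was installed. Below is a minimal consumer sketch, not part of the patch. It assumes the parseable crate publicly exposes the query and storage modules, that the downstream crate also depends on async-trait, and the names EmptySchemaProvider and EnterpriseProviderFactory are hypothetical.

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DFResult;
use parseable::query::{ParseableSchemaProvider, SCHEMA_PROVIDER};
use parseable::storage::ObjectStorage;

// Stand-in for a real enterprise catalog; it resolves no tables, but any
// type implementing datafusion::catalog::SchemaProvider would do.
#[derive(Debug)]
struct EmptySchemaProvider {
    _storage: Option<Arc<dyn ObjectStorage>>,
    _tenant_id: Option<String>,
}

#[async_trait]
impl SchemaProvider for EmptySchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        vec![]
    }

    async fn table(&self, _name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
        Ok(None)
    }

    fn table_exist(&self, _name: &str) -> bool {
        false
    }
}

struct EnterpriseProviderFactory;

impl ParseableSchemaProvider for EnterpriseProviderFactory {
    fn new_provider(
        &self,
        storage: Option<Arc<dyn ObjectStorage>>,
        tenant_id: &Option<String>,
    ) -> Box<dyn SchemaProvider> {
        Box::new(EmptySchemaProvider {
            _storage: storage,
            _tenant_id: tenant_id.clone(),
        })
    }
}

fn install_provider() {
    // Must run before QUERY_SESSION / QUERY_SESSION_STATE are first
    // dereferenced; schemas registered earlier keep GlobalSchemaProvider.
    if SCHEMA_PROVIDER.set(Box::new(EnterpriseProviderFactory)).is_err() {
        eprintln!("SCHEMA_PROVIDER was already set");
    }
}

The factory is consulted per schema registration, so a single installed provider can hand out tenant-specific SchemaProvider instances from the tenant_id argument.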
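
PartialTimeFilter::try_from_expr is made public so that callers outside stream_schema_provider can derive time bounds from DataFusion filter expressions. A rough usage sketch under the same module-path assumption; the column name and literal form are illustrative, and which expression shapes the parser accepts is internal to the function, so the result may well be None:

use datafusion::prelude::{Expr, col, lit};
use parseable::query::PartialTimeFilter;

fn main() {
    // Illustrative comparison against the default time column.
    let expr: Expr = col("p_timestamp").gt_eq(lit("2026-04-15T00:00:00"));

    // No time partition configured for the stream in this sketch.
    match PartialTimeFilter::try_from_expr(&expr, &None) {
        Some(_bound) => { /* push the bound down into manifest/file pruning */ }
        None => { /* expression carries no recognizable time constraint */ }
    }
}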