Extending Apache DataFusion: Custom Table Providers and Physical Optimizers
A practical guide to extending DataFusion with custom data source integrations and optimizer rules. Lessons learned building distributed analytics on DataFusion and Ballista, including the patterns that work and the gotchas that will trip you up.
Apache DataFusion is a fast, extensible query engine written in Rust. It powers analytics systems at companies like InfluxData, Databend, and Coralogix, and forms the foundation of projects like Apache Ballista for distributed execution.
At Catalyzed, we’re building a distributed analytics platform that unifies large-scale public datasets with customer-managed data. We chose DataFusion because of its extensibility—we needed to integrate custom data sources, implement format-specific optimizations, and distribute execution across a cluster.
This tutorial shares what we’ve learned. We’ll cover:
- Custom Table Providers - connecting DataFusion to your data sources
- Custom Execution Plans - implementing efficient data access patterns
- Physical Optimizer Rules - transforming execution plans for better performance
- Distributed Execution - serializing custom plans for Ballista
- Context Propagation - carrying metadata through distributed execution
The examples are simplified for clarity but reflect real patterns from production systems. We’re targeting DataFusion 50+ (the patterns apply broadly across recent versions).
Understanding DataFusion’s Architecture
Before diving into extensions, let’s understand how DataFusion processes queries:
SQL Query
↓
Logical Plan (what to compute)
↓ Logical Optimizers
Optimized Logical Plan
↓ Physical Planner
Physical Plan (how to compute)
↓ Physical Optimizers
Optimized Physical Plan
↓ Execution
Results
Table Providers participate during logical planning—they describe what data is available and how to access it. Physical Optimizer Rules run after the physical plan is created, transforming it for better performance.
1. Custom Table Providers
A TableProvider is DataFusion’s abstraction for a table. It defines the schema, provides statistics for optimizer decisions, and creates execution plans for reading data.
The TableProvider Trait
use std::sync::Arc;
use async_trait::async_trait;
use arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;
use datafusion_expr::Expr;
pub struct MyTableProvider {
uri: String,
schema: SchemaRef,
}
#[async_trait]
impl TableProvider for MyTableProvider {
/// Required: return self as Any for downcasting
fn as_any(&self) -> &dyn std::any::Any {
self
}
/// Required: return the table schema
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
/// Required: return the table type
fn table_type(&self) -> TableType {
TableType::Base
}
/// Required: create an ExecutionPlan for reading data
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Create an execution plan that reads from your data source
Ok(Arc::new(MyTableScanExec::new(
self.uri.clone(),
self.schema.clone(),
projection.cloned(),
filters.to_vec(),
limit,
)))
}
}
The key method is scan(). DataFusion calls this with:
- projection: which columns to read (indices into schema)
- filters: predicates that could be pushed down
- limit: optional row limit from the query
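To see these parameters in action, here is a minimal usage sketch. MyTableProvider::new is a hypothetical constructor for the struct above, and the table name, schema, and URI are invented for illustration:
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;

async fn query_custom_table() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    // Hypothetical constructor that just fills in the `uri` and `schema` fields.
    let provider = MyTableProvider::new("myfs://datasets/users".to_string(), schema);

    let ctx = SessionContext::new();
    ctx.register_table("users", Arc::new(provider))?;

    // DataFusion derives the projection (only `id` here), offers `id > 10` as a
    // pushdown candidate, and passes the LIMIT when it is safe to apply at the scan.
    let df = ctx.sql("SELECT id FROM users WHERE id > 10 LIMIT 100").await?;
    df.show().await?;
    Ok(())
}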
Providing Statistics
Statistics help DataFusion’s optimizer make better decisions about join ordering, aggregation strategies, and more:
use datafusion_common::{Statistics, stats::Precision};
impl TableProvider for MyTableProvider {
fn statistics(&self) -> Option<Statistics> {
// Return what you know - even estimates help!
Some(Statistics {
num_rows: Precision::Inexact(1_000_000), // ~1M rows
total_byte_size: Precision::Inexact(500_000_000), // ~500MB
column_statistics: vec![], // Per-column stats if available
})
}
}
Key insight: Use the right precision level:
- Precision::Exact(n) - you're certain of this value
- Precision::Inexact(n) - this is an estimate (the optimizer still benefits)
- Precision::Absent - you have no idea (different from zero!)
Wrong exact statistics lead to bad query plans. When in doubt, use Inexact.
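If your source can derive per-column information cheaply (from file footers, catalog metadata, and so on), populate column_statistics too. Here is a sketch using DataFusion's ColumnStatistics; the numbers are made up, and struct-update syntax keeps any fields we don't fill at their unknown defaults:
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics, stats::Precision};

fn example_statistics() -> Statistics {
    Statistics {
        num_rows: Precision::Inexact(1_000_000),
        total_byte_size: Precision::Inexact(500_000_000),
        // One entry per field of the table schema, in schema order.
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Exact(0),
            min_value: Precision::Inexact(ScalarValue::Int64(Some(1))),
            max_value: Precision::Inexact(ScalarValue::Int64(Some(1_000_000))),
            ..ColumnStatistics::new_unknown()
        }],
    }
}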
Filter Pushdown
Filter pushdown tells DataFusion whether your data source can apply filters natively:
impl TableProvider for MyTableProvider {
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
filters.iter().map(|filter| {
if self.can_push_down(filter) {
// We handle this filter - don't apply it again
Ok(TableProviderFilterPushDown::Exact)
} else if self.can_partially_push_down(filter) {
// We'll try, but DataFusion should verify
Ok(TableProviderFilterPushDown::Inexact)
} else {
// We can't handle this - DataFusion must apply it
Ok(TableProviderFilterPushDown::Unsupported)
}
}).collect()
}
}
Practical tip: Start with Unsupported for all filters. Add pushdown support incrementally as you identify which filters your data source handles efficiently.
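For reference, a sketch of what the can_push_down helper used above might look like for a source that only understands simple column-versus-literal comparisons; the helper is our own, not a DataFusion API:
use datafusion_expr::{BinaryExpr, Expr, Operator};

impl MyTableProvider {
    /// Hypothetical helper: accept only `column <op> literal` predicates.
    fn can_push_down(&self, filter: &Expr) -> bool {
        match filter {
            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
                let column_vs_literal = matches!(
                    (left.as_ref(), right.as_ref()),
                    (Expr::Column(_), Expr::Literal(..)) | (Expr::Literal(..), Expr::Column(_))
                );
                let comparison = matches!(
                    op,
                    Operator::Eq | Operator::NotEq | Operator::Lt
                        | Operator::LtEq | Operator::Gt | Operator::GtEq
                );
                column_vs_literal && comparison
            }
            _ => false,
        }
    }
}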
2. Custom Execution Plans
The ExecutionPlan trait defines how data is actually read. This is where you implement your data access logic.
The ExecutionPlan Trait
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::{
ExecutionPlan, PlanProperties, Partitioning,
DisplayAs, DisplayFormatType, SendableRecordBatchStream,
};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion_common::Result;
use datafusion_expr::Expr;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
#[derive(Debug)]
pub struct MyTableScanExec {
uri: String,
schema: SchemaRef,
projection: Option<Vec<usize>>,
properties: PlanProperties,
}
impl MyTableScanExec {
pub fn new(
uri: String,
schema: SchemaRef,
projection: Option<Vec<usize>>,
_filters: Vec<Expr>,
_limit: Option<usize>,
) -> Self {
// Define execution properties
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(1), // Single partition
// Recent DataFusion describes output via emission type + boundedness
EmissionType::Incremental, // Batches are produced incrementally
Boundedness::Bounded, // Finite data
);
Self { uri, schema, projection, properties }
}
}
impl ExecutionPlan for MyTableScanExec {
fn name(&self) -> &str {
"MyTableScanExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![] // Leaf node - no children
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(self)
} else {
Err(datafusion_common::DataFusionError::Internal(
"MyTableScanExec is a leaf node".to_string()
))
}
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// This is where data reading happens
// We'll cover the lazy initialization pattern next
todo!()
}
}
impl DisplayAs for MyTableScanExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "MyTableScanExec: uri={}", self.uri)
}
// Newer DataFusion releases add further display formats; use a compact fallback
_ => write!(f, "MyTableScanExec"),
}
}
}
Lazy Initialization Pattern
Critical insight: Don’t open connections or load data in the constructor. The execution plan might be serialized and sent to remote executors (in distributed systems), or it might be inspected for explain plans without ever executing.
Instead, use a state machine stream that initializes on first poll:
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, Future, ready};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
enum StreamState {
/// Initial state - haven't opened data source yet
Uninitialized {
uri: String,
schema: SchemaRef,
},
/// Opening data source (async operation in progress)
Initializing {
future: Pin<Box<dyn Future<Output = Result<DataSourceHandle>> + Send>>,
schema: SchemaRef,
},
/// Actively streaming data
Streaming {
inner: Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>,
},
/// Terminal state
Done,
}
pub struct LazyTableStream {
state: StreamState,
}
impl Stream for LazyTableStream {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
StreamState::Uninitialized { uri, schema } => {
// Start async initialization
let uri = uri.clone();
let future = Box::pin(async move {
open_data_source(&uri).await
});
self.state = StreamState::Initializing {
future,
schema: schema.clone(),
};
}
StreamState::Initializing { future, schema } => {
// Poll the initialization future
let handle = ready!(future.as_mut().poll(cx))?;
let stream = handle.into_stream(schema.clone());
self.state = StreamState::Streaming {
inner: Box::pin(stream),
};
}
StreamState::Streaming { inner } => {
// Forward to inner stream
match ready!(inner.as_mut().poll_next(cx)) {
Some(batch) => return Poll::Ready(Some(batch)),
None => {
self.state = StreamState::Done;
return Poll::Ready(None);
}
}
}
StreamState::Done => return Poll::Ready(None),
}
}
}
}
This pattern ensures:
- No I/O during plan construction
- Resources are allocated only when execution actually happens
- Proper async handling without blocking
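To close the loop on the todo!() in execute() above, here is a sketch of handing the lazy stream back to DataFusion. LazyTableStream::new is a hypothetical constructor that starts in StreamState::Uninitialized; RecordBatchStreamAdapter (imported earlier) pairs the schema with any stream of RecordBatch results:
use std::sync::Arc;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::Result;

// Inside `impl ExecutionPlan for MyTableScanExec`:
fn execute(
    &self,
    _partition: usize,
    _context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    // Hypothetical constructor; no I/O happens here, only on first poll.
    let stream = LazyTableStream::new(self.uri.clone(), self.schema.clone());
    Ok(Box::pin(RecordBatchStreamAdapter::new(
        self.schema.clone(),
        stream,
    )))
}
A multi-partition source would use the partition index here to pick a partition-specific URI or byte range instead of ignoring it.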
3. Physical Optimizer Rules
Physical optimizer rules transform execution plans after they’re created. This is where you can inject format-specific optimizations.
The PhysicalOptimizerRule Trait
use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
#[derive(Debug, Default)]
pub struct MyCacheOptimizer;
impl PhysicalOptimizerRule for MyCacheOptimizer {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// transform_down visits nodes from root to leaves
plan.transform_down(|node| self.optimize_node(node)).data()
}
fn name(&self) -> &str {
"MyCacheOptimizer"
}
fn schema_check(&self) -> bool {
true // Enable schema validation after optimization
}
}
Tree Traversal and Pattern Matching
The key is using transform_down (or transform_up) with pattern matching:
impl MyCacheOptimizer {
fn optimize_node(
&self,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
// Try to downcast to our specific node type
let Some(scan) = plan.as_any().downcast_ref::<MyTableScanExec>() else {
// Not our node type - return unchanged
return Ok(Transformed::no(plan));
};
// Check if optimization applies
if !self.should_use_cache(scan) {
return Ok(Transformed::no(plan));
}
// Create optimized replacement
let cached_scan = Arc::new(CachedTableScanExec::new(
scan.uri().to_string(),
scan.schema(),
));
// Return the replacement, marking as transformed
Ok(Transformed::yes(cached_scan))
}
fn should_use_cache(&self, scan: &MyTableScanExec) -> bool {
// Example: use cache for small tables
if let Some(stats) = scan.statistics() {
if let Precision::Exact(rows) | Precision::Inexact(rows) = stats.num_rows {
return rows < 10_000; // Cache tables under 10K rows
}
}
false
}
}
Registering Your Optimizer
Optimizers are registered when building the SessionState:
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::prelude::SessionConfig;
fn create_session_with_optimizer() -> datafusion::execution::SessionState {
let config = SessionConfig::new();
// Get default optimizer rules
let mut rules = PhysicalOptimizer::new().rules;
// Insert before SanityCheckPlan (the last rule)
// This ensures all other optimizations have run first
let insert_pos = rules.len().saturating_sub(1);
rules.insert(insert_pos, Arc::new(MyCacheOptimizer::default()));
SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.with_physical_optimizer_rules(rules)
.build()
}
Important: Optimizer ordering matters. Insert custom rules before SanityCheckPlan so your transformations are validated.
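Once the state is built, create a SessionContext from it and the custom rule runs during normal query planning. A brief usage sketch, assuming a table has already been registered:
use datafusion::prelude::SessionContext;

async fn explain_with_custom_rule() -> datafusion_common::Result<()> {
    let state = create_session_with_optimizer();
    let ctx = SessionContext::new_with_state(state);
    // Assuming a small table named `users` was registered, EXPLAIN shows the
    // physical plan after MyCacheOptimizer has run, so CachedTableScanExec appears.
    ctx.sql("EXPLAIN SELECT * FROM users").await?.show().await?;
    Ok(())
}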
4. Distributed Execution with Ballista
When using Ballista for distributed execution, execution plans must be serialized and sent to worker nodes. Custom plans require custom codecs.
The Serialization Challenge
Ballista serializes plans using Protocol Buffers. The challenge: your custom ExecutionPlan contains Rust types that can’t be directly serialized—database connections, file handles, Arc references to shared state.
The solution is metadata-only serialization: serialize only the information needed to reconstruct the plan on the executor.
The Metadata-Only Pattern
Never serialize runtime objects. Instead, serialize URIs, schemas, and configuration:
Scheduler side:
MyTableScanExec {
connection: Arc<DbConnection>, // NOT serializable
uri: "postgres://...", // Serializable
schema: SchemaRef, // Serializable
}
↓ Encode
Protobuf { uri: "postgres://...", schema: {...} }
↓ Network
Executor side:
Protobuf { uri: "postgres://...", schema: {...} }
↓ Decode (with injected factory)
MyTableScanExec {
connection: factory.connect(&uri), // Reconstructed!
uri: "postgres://...",
schema: SchemaRef,
}
The Factory Injection Pattern
Codecs hold factories that create runtime resources during deserialization:
use std::sync::Arc;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use prost::Message;
pub struct MyPhysicalCodec {
/// Factory for creating connections on executors
connection_factory: Arc<dyn ConnectionFactory>,
}
impl MyPhysicalCodec {
pub fn new(factory: Arc<dyn ConnectionFactory>) -> Self {
Self { connection_factory: factory }
}
}
impl PhysicalExtensionCodec for MyPhysicalCodec {
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_registry: &dyn FunctionRegistry,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
// Decode protobuf
let proto = MyProto::decode(buf)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Reconstruct with injected factory
Ok(Arc::new(MyTableScanExec::new(
proto.uri,
decode_schema(&proto.schema)?,
self.connection_factory.clone(), // Factory injected here!
)))
}
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
let scan = node.as_any().downcast_ref::<MyTableScanExec>()
.ok_or_else(|| DataFusionError::Internal("Not MyTableScanExec".into()))?;
// Encode only metadata - NOT the connection
let proto = MyProto {
uri: scan.uri().to_string(),
schema: encode_schema(scan.schema())?,
};
proto.encode(buf)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(())
}
}
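The ConnectionFactory trait used above is our own abstraction, not part of DataFusion or Ballista. A minimal sketch of what it might look like, with a hypothetical DbConnection client type:
use async_trait::async_trait;
use datafusion_common::Result;

/// Hypothetical factory that executors use to (re)create connections from a URI.
#[async_trait]
pub trait ConnectionFactory: Send + Sync + std::fmt::Debug {
    async fn connect(&self, uri: &str) -> Result<DbConnection>;
}

/// Example implementation that reads credentials from the executor's environment,
/// so secrets never travel inside serialized plans.
#[derive(Debug)]
pub struct EnvCredentialFactory;

#[async_trait]
impl ConnectionFactory for EnvCredentialFactory {
    async fn connect(&self, uri: &str) -> Result<DbConnection> {
        let credentials = std::env::var("MY_DB_TOKEN").unwrap_or_default();
        DbConnection::open(uri, &credentials).await // hypothetical client call
    }
}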
Chain-of-Responsibility Codec Pattern
When you have multiple custom plan types, use a composite codec that tries each format:
pub trait FormatPhysicalCodec: Send + Sync + std::fmt::Debug {
/// Try to decode - returns Ok(Some(plan)) if handled, Ok(None) to pass
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
registry: &dyn FunctionRegistry,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
/// Try to encode - returns Ok(true) if handled, Ok(false) to pass
fn try_encode(
&self,
plan: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
) -> Result<bool>;
fn name(&self) -> &str;
}
pub struct CompositePhysicalCodec {
format_codecs: Vec<Arc<dyn FormatPhysicalCodec>>,
ballista_codec: BallistaPhysicalExtensionCodec,
}
impl PhysicalExtensionCodec for CompositePhysicalCodec {
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
registry: &dyn FunctionRegistry,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
// Try each format codec in order
for codec in &self.format_codecs {
match codec.try_decode(buf, inputs, registry) {
Ok(Some(plan)) => return Ok(plan), // Handled!
Ok(None) => continue, // Not this format
Err(e) => return Err(e.into()), // Recognized but failed
}
}
// Fall back to Ballista's default codec
self.ballista_codec.try_decode(buf, inputs, registry)
}
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
for codec in &self.format_codecs {
match codec.try_encode(node.clone(), buf) {
Ok(true) => return Ok(()), // Handled!
Ok(false) => continue, // Not this type
Err(e) => return Err(e.into()),
}
}
self.ballista_codec.try_encode(node, buf)
}
}
This pattern enables clean separation: each format owns its serialization logic, and new formats can be added without modifying existing code.
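One practical detail the composite relies on: each format codec needs a cheap way to tell "not mine" (pass to the next codec) from "mine but corrupt" (a real error). A common convention, ours rather than a Ballista requirement, is a short magic prefix on the encoded bytes. A sketch with hypothetical encode_my_plan and decode_my_plan helpers:
use std::sync::Arc;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;

const MY_FORMAT_MAGIC: &[u8] = b"MYF1";

#[derive(Debug, Default)]
pub struct MyFormatCodec;

impl FormatPhysicalCodec for MyFormatCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Not our prefix: signal "pass" so the next codec in the chain can try.
        let Some(body) = buf.strip_prefix(MY_FORMAT_MAGIC) else {
            return Ok(None);
        };
        // Our prefix but an invalid payload is a real error, not a pass.
        let plan = decode_my_plan(body)?; // hypothetical helper
        Ok(Some(plan))
    }

    fn try_encode(&self, plan: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<bool> {
        let Some(scan) = plan.as_any().downcast_ref::<MyTableScanExec>() else {
            return Ok(false);
        };
        buf.extend_from_slice(MY_FORMAT_MAGIC);
        encode_my_plan(scan, buf)?; // hypothetical helper
        Ok(true)
    }

    fn name(&self) -> &str {
        "MyFormatCodec"
    }
}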
5. Context Propagation (The Gotcha That Will Bite You)
When building distributed systems, you often need to carry metadata through execution—query IDs for logging, cache settings, authentication tokens. DataFusion’s ConfigExtension system enables this, but there’s a critical gotcha.
The Problem
DataFusion’s config extensions are serialized as key-value pairs and sent to executors. But here’s what we discovered the hard way:
If the executor doesn’t pre-register the extension type, the values silently disappear.
Here’s the flow:
- Gateway sets QueryContext { query_id: "abc123" }
- Ballista serializes it as "query_context.query_id" = "abc123"
- Executor receives the key-value pairs
- Executor calls ConfigOptions::set("query_context.query_id", "abc123")
- set() looks for a registered extension with the prefix "query_context"
- If no matching extension is found, the value is silently dropped
The Solution
Both scheduler AND executor must pre-register extensions with default values:
// In your executor/scheduler configuration:
override_config_producer: Some(Arc::new(|| {
SessionConfig::new_with_ballista()
.with_option_extension(QueryContext::default()) // REQUIRED!
})),
Implementing ConfigExtension
There’s another subtle gotcha in the implementation:
use datafusion::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
use datafusion_common::DataFusionError;
#[derive(Debug, Clone)]
pub struct QueryContext {
pub query_id: String,
pub cache_enabled: bool,
}
impl ConfigExtension for QueryContext {
const PREFIX: &'static str = "query_context";
}
impl ExtensionOptions for QueryContext {
fn as_any(&self) -> &dyn std::any::Any { self }
fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self }
fn cloned(&self) -> Box<dyn ExtensionOptions> { Box::new(self.clone()) }
fn set(&mut self, key: &str, value: &str) -> datafusion_common::Result<()> {
// IMPORTANT: key does NOT include prefix here!
// ConfigOptions::set() strips the prefix before calling this
match key {
"query_id" => {
self.query_id = value.to_string();
Ok(())
}
"cache_enabled" => {
self.cache_enabled = value.parse().map_err(|e| {
DataFusionError::Configuration(format!("Invalid bool: {}", e))
})?;
Ok(())
}
_ => Err(DataFusionError::Configuration(
format!("Unknown key: {}", key)
)),
}
}
fn entries(&self) -> Vec<ConfigEntry> {
// IMPORTANT: keys MUST include prefix here!
// ConfigOptions::entries() does NOT add prefix when collecting
vec![
ConfigEntry {
key: format!("{}.query_id", Self::PREFIX),
value: Some(self.query_id.clone()),
description: "Query identifier for distributed tracing",
},
ConfigEntry {
key: format!("{}.cache_enabled", Self::PREFIX),
value: Some(self.cache_enabled.to_string()),
description: "Whether caching is enabled for this query",
},
]
}
}
The asymmetry is confusing but intentional:
- entries() returns fully-qualified keys (query_context.query_id)
- set() receives stripped keys (query_id)
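On the gateway side, the context is attached when building the session config, mirroring the executor-side registration shown earlier. SessionConfigExt is Ballista's extension trait that provides new_with_ballista; the exact import path may vary between Ballista versions:
use ballista::extension::SessionConfigExt; // assumption: re-export path may differ by version
use datafusion::prelude::SessionConfig;

fn gateway_session_config(query_id: &str) -> SessionConfig {
    SessionConfig::new_with_ballista().with_option_extension(QueryContext {
        query_id: query_id.to_string(),
        cache_enabled: true,
    })
}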
Extracting Context in Execution
In your execute() method, extract context from TaskContext:
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// Extract query context for logging/metrics
let query_id = context
.session_config()
.options()
.extensions
.get::<QueryContext>()
.map(|ctx| ctx.query_id.clone())
.unwrap_or_else(|| "unknown".to_string());
tracing::info!(query_id = %query_id, "Starting execution");
// ... rest of execution
}
6. Lessons Learned
When NOT to Write Custom Extensions
Before writing a custom extension, check if DataFusion’s built-in functionality can solve your problem:
- Parquet, CSV, JSON, Avro: Excellent built-in support
- Object storage: Use the object_store crate with DataFusion's registration
- Simple transforms: Use UDFs/UDAFs instead of custom operators
- Standard SQL operations: DataFusion’s implementations are highly optimized
Write custom extensions when:
- Your data source has unique access patterns (native indexes, proprietary protocols)
- You need to push operations down to external systems
- You have domain-specific optimizations that DataFusion can’t infer
Common Pitfalls
1. Forgetting schema_check()
Always return true from schema_check() in optimizer rules. It catches bugs where your transformation produces plans with mismatched schemas.
2. Blocking in async contexts
TableProvider::scan() is async. Don’t call blocking I/O—use async libraries or spawn_blocking.
3. Wrong statistics precision
Use Precision::Inexact for estimates. Wrong Exact statistics cause worse plans than no statistics.
4. Serializing runtime objects
Never serialize connections, file handles, or Arc references. Serialize URIs and reconstruct on the executor.
5. Silent config propagation failures
Pre-register all ConfigExtension types on both scheduler and executor. Test round-trip serialization explicitly.
6. Ignoring child plans in optimizers
When rewriting a node, handle its inputs correctly. Use transform_down or transform_up to visit the entire tree.
Testing Strategies
Unit test optimizers with mock plans:
#[test]
fn optimizer_ignores_unrelated_plans() {
let optimizer = MyCacheOptimizer::default();
let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
let result = optimizer
.optimize(empty.clone(), &ConfigOptions::default())
.unwrap();
// Should return unchanged
assert!(result.as_any().is::<EmptyExec>());
}
Test config round-trips explicitly:
#[test]
fn test_context_round_trip() {
let original = SessionConfig::new_with_ballista()
.with_option_extension(QueryContext::new("test-123"));
let pairs = original.to_key_value_pairs();
// Target MUST have extension registered
let target = SessionConfig::new_with_ballista()
.with_option_extension(QueryContext::default());
let restored = target.update_from_key_value_pair(&pairs);
let ctx = restored.options().extensions.get::<QueryContext>().unwrap();
assert_eq!(ctx.query_id, "test-123");
}
Integration test with temporary data:
#[tokio::test]
async fn test_end_to_end() {
let temp_dir = tempfile::tempdir().unwrap();
create_test_data(&temp_dir).await;
let ctx = create_session_with_extensions();
ctx.register_table("test", create_provider(&temp_dir)).unwrap();
let result = ctx.sql("SELECT * FROM test WHERE id = 42")
.await.unwrap()
.collect().await.unwrap();
assert_eq!(result.len(), 1);
}
Conclusion
DataFusion provides powerful extension points for building custom query systems:
- TableProvider connects your data sources with proper statistics and filter pushdown
- ExecutionPlan with lazy initialization keeps plans serializable and efficient
- PhysicalOptimizerRule enables format-specific optimizations
- PhysicalExtensionCodec with factory injection enables distributed execution
- ConfigExtension propagates context—if you register it correctly
Start simple. Build a basic TableProvider without filter pushdown or statistics. Add complexity as you understand what your queries need.
The patterns in this tutorial reflect lessons from building production systems. Some we learned from documentation, others from debugging obscure failures at 2 AM. We hope they save you some late nights.
Resources
- DataFusion Documentation
- DataFusion Examples
- Ballista Documentation
- DataFusion Discord - active community for questions
Have questions about building on DataFusion? Find us in the DataFusion Discord or get in touch.