Engineering 15 min read

Extending Apache DataFusion: Custom Table Providers and Physical Optimizers

A practical guide to extending DataFusion with custom data source integrations and optimizer rules. Lessons learned building distributed analytics on DataFusion and Ballista, including the patterns that work and the gotchas that will bite you.

By Thomas Santerre
datafusion rust query-engines distributed-systems ballista
Figure: the DataFusion query pipeline. A SQL query becomes a logical plan (what to compute), then a physical plan (how to compute), which is executed to produce RecordBatches. The TableProvider for your data source and the physical optimizer are your extension points.

Apache DataFusion is a fast, extensible query engine written in Rust. It powers analytics systems at companies like InfluxData, Databend, and Coralogix, and forms the foundation of projects like Apache Ballista for distributed execution.

At Catalyzed, we’re building a distributed analytics platform that unifies large-scale public datasets with customer-managed data. We chose DataFusion because of its extensibility—we needed to integrate custom data sources, implement format-specific optimizations, and distribute execution across a cluster.

This tutorial shares what we’ve learned. We’ll cover:

  1. Custom Table Providers - connecting DataFusion to your data sources
  2. Custom Execution Plans - implementing efficient data access patterns
  3. Physical Optimizer Rules - transforming execution plans for better performance
  4. Distributed Execution - serializing custom plans for Ballista
  5. Context Propagation - carrying metadata through distributed execution

The examples are simplified for clarity but reflect real patterns from production systems. We’re targeting DataFusion 50+ (the patterns apply broadly across recent versions).

Understanding DataFusion’s Architecture

Before diving into extensions, let’s understand how DataFusion processes queries:

SQL Query
    ↓ SQL Planner
Logical Plan (what to compute)
    ↓ Logical Optimizers
Optimized Logical Plan
    ↓ Physical Planner
Physical Plan (how to compute)
    ↓ Physical Optimizers
Optimized Physical Plan
    ↓ Execution
Results

Table Providers participate during logical planning—they describe what data is available and how to access it. Physical Optimizer Rules run after the physical plan is created, transforming it for better performance.
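
If you want to see these stages concretely before writing any extensions, the DataFrame API can print both plans for a query. A minimal sketch, assuming a tokio runtime (the table and query are illustrative):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register a tiny in-memory table via SQL (the column is auto-named "column1")
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)").await?;

    // EXPLAIN shows the optimized logical plan and the physical plan
    let df = ctx.sql("SELECT column1 FROM t WHERE column1 > 1").await?;
    df.explain(false, false)?.show().await?;
    Ok(())
}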


1. Custom Table Providers

A TableProvider is DataFusion’s abstraction for a table. It defines the schema, provides statistics for optimizer decisions, and creates execution plans for reading data.

The TableProvider Trait

use std::sync::Arc;
use async_trait::async_trait;
use arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;
use datafusion_expr::Expr;

pub struct MyTableProvider {
    uri: String,
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for MyTableProvider {
    /// Required: return self as Any for downcasting
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Required: return the table schema
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Required: return the table type
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Required: create an ExecutionPlan for reading data
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Create an execution plan that reads from your data source
        Ok(Arc::new(MyTableScanExec::new(
            self.uri.clone(),
            self.schema.clone(),
            projection.cloned(),
            filters.to_vec(),
            limit,
        )))
    }
}

The key method is scan(). DataFusion calls this with:

  • projection: which columns to read (indices into schema)
  • filters: predicates that could be pushed down
  • limit: optional row limit from the query
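
A provider will usually honor the projection by computing a projected schema and reading only those columns. A minimal sketch (the helper name is ours, not a DataFusion API):

use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;

/// Hypothetical helper: derive the output schema from the projection indices.
fn projected_schema(schema: &SchemaRef, projection: Option<&Vec<usize>>) -> Result<SchemaRef> {
    match projection {
        // Arrow's Schema::project selects just the requested fields
        Some(indices) => Ok(Arc::new(schema.project(indices)?)),
        // No projection means all columns
        None => Ok(Arc::clone(schema)),
    }
}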

Providing Statistics

Statistics help DataFusion’s optimizer make better decisions about join ordering, aggregation strategies, and more:

use datafusion_common::{Statistics, stats::Precision};

impl TableProvider for MyTableProvider {
    fn statistics(&self) -> Option<Statistics> {
        // Return what you know - even estimates help!
        Some(Statistics {
            num_rows: Precision::Inexact(1_000_000), // ~1M rows
            total_byte_size: Precision::Inexact(500_000_000), // ~500MB
            column_statistics: vec![], // Per-column stats if available
        })
    }
}

Key insight: Use the right precision level:

  • Precision::Exact(n) - you’re certain of this value
  • Precision::Inexact(n) - this is an estimate (optimizer still benefits)
  • Precision::Absent - you have no idea (different from zero!)

Wrong exact statistics lead to bad query plans. When in doubt, use Inexact.
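
For example, if all you know is the total byte size of your files, you can still report a hedged row-count estimate. A sketch, where the average row width is an assumed input rather than anything DataFusion provides:

use datafusion_common::stats::Precision;

/// Hypothetical estimator: total bytes divided by an assumed average row width.
fn estimated_rows(total_bytes: usize, assumed_avg_row_bytes: usize) -> Precision<usize> {
    if assumed_avg_row_bytes == 0 {
        Precision::Absent // we genuinely don't know
    } else {
        Precision::Inexact(total_bytes / assumed_avg_row_bytes)
    }
}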

Filter Pushdown

Filter pushdown tells DataFusion whether your data source can apply filters natively:

impl TableProvider for MyTableProvider {
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        filters.iter().map(|filter| {
            if self.can_push_down(filter) {
                // We handle this filter - don't apply it again
                Ok(TableProviderFilterPushDown::Exact)
            } else if self.can_partially_push_down(filter) {
                // We'll try, but DataFusion should verify
                Ok(TableProviderFilterPushDown::Inexact)
            } else {
                // We can't handle this - DataFusion must apply it
                Ok(TableProviderFilterPushDown::Unsupported)
            }
        }).collect()
    }
}

Practical tip: Start with Unsupported for all filters. Add pushdown support incrementally as you identify which filters your data source handles efficiently.
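
As an illustration of what can_push_down might look like (it is our helper, not part of the TableProvider trait), a source that only understands simple column-versus-literal comparisons could check the expression shape:

use datafusion_expr::{BinaryExpr, Expr, Operator};

impl MyTableProvider {
    /// Hypothetical: push down only `column <op> literal` comparisons.
    fn can_push_down(&self, filter: &Expr) -> bool {
        match filter {
            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
                let comparison = matches!(
                    op,
                    Operator::Eq | Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq
                );
                let column_vs_literal = matches!(
                    (left.as_ref(), right.as_ref()),
                    (Expr::Column(..), Expr::Literal(..)) | (Expr::Literal(..), Expr::Column(..))
                );
                comparison && column_vs_literal
            }
            _ => false, // anything else stays in DataFusion
        }
    }
}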


2. Custom Execution Plans

The ExecutionPlan trait defines how data is actually read. This is where you implement your data access logic.

The ExecutionPlan Trait

use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::{
    ExecutionPlan, PlanProperties, ExecutionMode, Partitioning,
    DisplayAs, DisplayFormatType, SendableRecordBatchStream,
};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion_common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_expr::Expr;

#[derive(Debug)]
pub struct MyTableScanExec {
    uri: String,
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
    properties: PlanProperties,
}

impl MyTableScanExec {
    pub fn new(
        uri: String,
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
        _filters: Vec<Expr>,
        _limit: Option<usize>,
    ) -> Self {
        // Define execution properties
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1), // Single partition
            ExecutionMode::Bounded, // Finite data
        );

        Self { uri, schema, projection, properties }
    }
}

impl ExecutionPlan for MyTableScanExec {
    fn name(&self) -> &str {
        "MyTableScanExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![] // Leaf node - no children
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            Err(datafusion_common::DataFusionError::Internal(
                "MyTableScanExec is a leaf node".to_string()
            ))
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // This is where data reading happens
        // We'll cover the lazy initialization pattern next
        todo!()
    }
}

impl DisplayAs for MyTableScanExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MyTableScanExec: uri={}", self.uri)
            }
        }
    }
}

Lazy Initialization Pattern

Critical insight: Don’t open connections or load data in the constructor. The execution plan might be serialized and sent to remote executors (in distributed systems), or it might be inspected for explain plans without ever executing.

Instead, use a state machine stream that initializes on first poll:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, Future, ready};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;

enum StreamState {
    /// Initial state - haven't opened data source yet
    Uninitialized {
        uri: String,
        schema: SchemaRef,
    },
    /// Opening data source (async operation in progress)
    Initializing {
        future: Pin<Box<dyn Future<Output = Result<DataSourceHandle>> + Send>>,
        schema: SchemaRef,
    },
    /// Actively streaming data
    Streaming {
        inner: Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>,
    },
    /// Terminal state
    Done,
}

pub struct LazyTableStream {
    state: StreamState,
}

impl Stream for LazyTableStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                StreamState::Uninitialized { uri, schema } => {
                    // Start async initialization
                    let uri = uri.clone();
                    let future = Box::pin(async move {
                        open_data_source(&uri).await
                    });
                    self.state = StreamState::Initializing {
                        future,
                        schema: schema.clone(),
                    };
                }
                StreamState::Initializing { future, schema } => {
                    // Poll the initialization future
                    let handle = ready!(future.as_mut().poll(cx))?;
                    let stream = handle.into_stream(schema.clone());
                    self.state = StreamState::Streaming {
                        inner: Box::pin(stream),
                    };
                }
                StreamState::Streaming { inner } => {
                    // Forward to inner stream
                    match ready!(inner.as_mut().poll_next(cx)) {
                        Some(batch) => return Poll::Ready(Some(batch)),
                        None => {
                            self.state = StreamState::Done;
                            return Poll::Ready(None);
                        }
                    }
                }
                StreamState::Done => return Poll::Ready(None),
            }
        }
    }
}

This pattern ensures:

  1. No I/O during plan construction
  2. Resources are allocated only when execution actually happens
  3. Proper async handling without blocking

Lazy initialization state machine: Uninitialized → Initializing → Streaming → Done
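
With that stream in place, execute() stays trivial; wrapping it in RecordBatchStreamAdapter supplies the schema() that SendableRecordBatchStream requires. A sketch, assuming the types above:

// Replaces the todo!() in MyTableScanExec::execute shown earlier.
fn execute(
    &self,
    _partition: usize,
    _context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    // No I/O here - the stream initializes itself on first poll
    let stream = LazyTableStream {
        state: StreamState::Uninitialized {
            uri: self.uri.clone(),
            schema: self.schema.clone(),
        },
    };
    // The adapter supplies schema() so the result satisfies SendableRecordBatchStream
    Ok(Box::pin(RecordBatchStreamAdapter::new(
        self.schema.clone(),
        stream,
    )))
}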

3. Physical Optimizer Rules

Physical optimizer rules transform execution plans after they’re created. This is where you can inject format-specific optimizations.

The PhysicalOptimizerRule Trait

use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

#[derive(Debug, Default)]
pub struct MyCacheOptimizer;

impl PhysicalOptimizerRule for MyCacheOptimizer {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // transform_down visits nodes from root to leaves
        plan.transform_down(|node| self.optimize_node(node)).data()
    }

    fn name(&self) -> &str {
        "MyCacheOptimizer"
    }

    fn schema_check(&self) -> bool {
        true // Enable schema validation after optimization
    }
}

Tree Traversal and Pattern Matching

The key is using transform_down (or transform_up) with pattern matching:

impl MyCacheOptimizer {
    fn optimize_node(
        &self,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
        // Try to downcast to our specific node type
        let Some(scan) = plan.as_any().downcast_ref::<MyTableScanExec>() else {
            // Not our node type - return unchanged
            return Ok(Transformed::no(plan));
        };

        // Check if optimization applies
        if !self.should_use_cache(scan) {
            return Ok(Transformed::no(plan));
        }

        // Create optimized replacement
        let cached_scan = Arc::new(CachedTableScanExec::new(
            scan.uri().to_string(),
            scan.schema(),
        ));

        // Return the replacement, marking as transformed
        Ok(Transformed::yes(cached_scan))
    }

    fn should_use_cache(&self, scan: &MyTableScanExec) -> bool {
        // Example: use cache for small tables
        // (ExecutionPlan::statistics() returns a Result, not an Option)
        if let Ok(stats) = scan.statistics() {
            if let Precision::Exact(rows) | Precision::Inexact(rows) = stats.num_rows {
                return rows < 10_000; // Cache tables under 10K rows
            }
        }
        false
    }
}

Registering Your Optimizer

Optimizers are registered when building the SessionState:

use std::sync::Arc;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::prelude::SessionConfig;

fn create_session_with_optimizer() -> datafusion::execution::SessionState {
    let config = SessionConfig::new();

    // Get default optimizer rules
    let mut rules = PhysicalOptimizer::new().rules;

    // Insert before SanityCheckPlan (the last rule)
    // This ensures all other optimizations have run first
    let insert_pos = rules.len().saturating_sub(1);
    rules.insert(insert_pos, Arc::new(MyCacheOptimizer::default()));

    SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .with_physical_optimizer_rules(rules)
        .build()
}

Important: Optimizer ordering matters. Insert custom rules before SanityCheckPlan so your transformations are validated.
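
To sanity-check that the rule actually fires, build a context from this state and look for your replacement node in an EXPLAIN output. A sketch (the table name and provider are illustrative):

use datafusion::prelude::SessionContext;

/// Hypothetical check: if the rule fired, CachedTableScanExec shows up
/// in the physical plan printed by EXPLAIN.
async fn explain_with_custom_rule(provider: Arc<MyTableProvider>) -> datafusion_common::Result<()> {
    let ctx = SessionContext::new_with_state(create_session_with_optimizer());
    let _ = ctx.register_table("events", provider)?;
    ctx.sql("EXPLAIN SELECT * FROM events").await?.show().await?;
    Ok(())
}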


4. Distributed Execution with Ballista

When using Ballista for distributed execution, execution plans must be serialized and sent to worker nodes. Custom plans require custom codecs.

The Serialization Challenge

Ballista serializes plans using Protocol Buffers. The challenge: your custom ExecutionPlan contains Rust types that can’t be directly serialized—database connections, file handles, Arc references to shared state.

The solution is metadata-only serialization: serialize only the information needed to reconstruct the plan on the executor.

Metadata-only serialization: Scheduler encodes metadata, executor reconstructs with factory injection

The Metadata-Only Pattern

Never serialize runtime objects. Instead, serialize URIs, schemas, and configuration:

Scheduler side:
  MyTableScanExec {
    connection: Arc<DbConnection>,  // NOT serializable
    uri: "postgres://...",          // Serializable
    schema: SchemaRef,              // Serializable
  }
      ↓ Encode
  Protobuf { uri: "postgres://...", schema: {...} }
      ↓ Network
Executor side:
  Protobuf { uri: "postgres://...", schema: {...} }
      ↓ Decode (with injected factory)
  MyTableScanExec {
    connection: factory.connect(&uri),  // Reconstructed!
    uri: "postgres://...",
    schema: SchemaRef,
  }
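
The protobuf side is small by design. A sketch of what the metadata-only message might look like when hand-written with prost (the message and field names are ours; in practice the schema bytes could come from datafusion-proto's schema encoding or Arrow IPC):

// Hypothetical message definition: only metadata crosses the wire.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MyProto {
    /// Where the executor should reconnect
    #[prost(string, tag = "1")]
    pub uri: String,
    /// Serialized Arrow schema bytes
    #[prost(bytes = "vec", tag = "2")]
    pub schema: Vec<u8>,
}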

The Factory Injection Pattern

Codecs hold factories that create runtime resources during deserialization:

use std::sync::Arc;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use prost::Message; // needed for MyProto::decode / encode

pub struct MyPhysicalCodec {
    /// Factory for creating connections on executors
    connection_factory: Arc<dyn ConnectionFactory>,
}

impl MyPhysicalCodec {
    pub fn new(factory: Arc<dyn ConnectionFactory>) -> Self {
        Self { connection_factory: factory }
    }
}

impl PhysicalExtensionCodec for MyPhysicalCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // Decode protobuf (map prost's error into a DataFusionError)
        let proto = MyProto::decode(buf)
            .map_err(|e| DataFusionError::Internal(format!("failed to decode plan: {e}")))?;

        // Reconstruct with injected factory (this variant of MyTableScanExec
        // takes a connection factory instead of filters/limit)
        Ok(Arc::new(MyTableScanExec::new(
            proto.uri,
            decode_schema(&proto.schema)?,
            self.connection_factory.clone(), // Factory injected here!
        )))
    }

    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        let scan = node.as_any().downcast_ref::<MyTableScanExec>()
            .ok_or_else(|| DataFusionError::Internal("Not MyTableScanExec".into()))?;

        // Encode only metadata - NOT the connection
        let proto = MyProto {
            uri: scan.uri().to_string(),
            schema: encode_schema(scan.schema())?,
        };
        proto.encode(buf)
            .map_err(|e| DataFusionError::Internal(format!("failed to encode plan: {e}")))?;
        Ok(())
    }
}

Chain-of-Responsibility Codec Pattern

When you have multiple custom plan types, use a composite codec that tries each format:

pub trait FormatPhysicalCodec: Send + Sync + std::fmt::Debug {
    /// Try to decode - returns Ok(Some(plan)) if handled, Ok(None) to pass
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;

    /// Try to encode - returns Ok(true) if handled, Ok(false) to pass
    fn try_encode(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
    ) -> Result<bool>;

    fn name(&self) -> &str;
}

pub struct CompositePhysicalCodec {
    format_codecs: Vec<Arc<dyn FormatPhysicalCodec>>,
    ballista_codec: BallistaPhysicalExtensionCodec,
}

impl PhysicalExtensionCodec for CompositePhysicalCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // Try each format codec in order
        for codec in &self.format_codecs {
            match codec.try_decode(buf, inputs, registry) {
                Ok(Some(plan)) => return Ok(plan),  // Handled!
                Ok(None) => continue,                // Not this format
                Err(e) => return Err(e.into()),     // Recognized but failed
            }
        }

        // Fall back to Ballista's default codec
        self.ballista_codec.try_decode(buf, inputs, registry)
    }

    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        for codec in &self.format_codecs {
            match codec.try_encode(node.clone(), buf) {
                Ok(true) => return Ok(()),   // Handled!
                Ok(false) => continue,       // Not this type
                Err(e) => return Err(e.into()),
            }
        }

        self.ballista_codec.try_encode(node, buf)
    }
}

This pattern enables clean separation: each format owns its serialization logic, and new formats can be added without modifying existing code.
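
For completeness, here is a sketch of what one such format codec could look like. The marker prefix is our own convention for cheaply answering "not mine"; MyProto, decode_schema, encode_schema, and ConnectionFactory are the hypothetical helpers from the previous section:

use std::sync::Arc;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{DataFusionError, Result};
use prost::Message;

/// Marker prefix: a cheap way for other codecs to say "not mine".
const MARKER: &[u8] = b"MYFMT1:";

pub struct MyFormatCodec {
    connection_factory: Arc<dyn ConnectionFactory>,
}

impl std::fmt::Debug for MyFormatCodec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("MyFormatCodec")
    }
}

impl FormatPhysicalCodec for MyFormatCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Not our marker: pass to the next codec in the chain
        let Some(payload) = buf.strip_prefix(MARKER) else {
            return Ok(None);
        };
        let proto = MyProto::decode(payload)
            .map_err(|e| DataFusionError::Internal(format!("decode failed: {e}")))?;
        Ok(Some(Arc::new(MyTableScanExec::new(
            proto.uri,
            decode_schema(&proto.schema)?,
            self.connection_factory.clone(),
        ))))
    }

    fn try_encode(&self, plan: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<bool> {
        // Not our plan type: let another codec handle it
        let Some(scan) = plan.as_any().downcast_ref::<MyTableScanExec>() else {
            return Ok(false);
        };
        buf.extend_from_slice(MARKER);
        let proto = MyProto {
            uri: scan.uri().to_string(),
            schema: encode_schema(scan.schema())?,
        };
        proto.encode(buf)
            .map_err(|e| DataFusionError::Internal(format!("encode failed: {e}")))?;
        Ok(true)
    }

    fn name(&self) -> &str {
        "MyFormatCodec"
    }
}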


5. Context Propagation (The Gotcha That Will Bite You)

When building distributed systems, you often need to carry metadata through execution—query IDs for logging, cache settings, authentication tokens. DataFusion’s ConfigExtension system enables this, but there’s a critical gotcha.

The Problem

DataFusion’s config extensions are serialized as key-value pairs and sent to executors. But here’s what we discovered the hard way:

If the executor doesn’t pre-register the extension type, the values silently disappear.

Context propagation: Without pre-registration values are silently dropped; with pre-registration they're preserved

Here’s the flow:

  1. Gateway sets QueryContext { query_id: "abc123" }
  2. Ballista serializes as "query_context.query_id" = "abc123"
  3. Executor receives key-value pairs
  4. Executor calls ConfigOptions::set("query_context.query_id", "abc123")
  5. set() looks for an extension with prefix "query_context"
  6. If not found, the value is silently dropped

The Solution

Both scheduler AND executor must pre-register extensions with default values:

// In your executor/scheduler configuration:
override_config_producer: Some(Arc::new(|| {
    SessionConfig::new_with_ballista()
        .with_option_extension(QueryContext::default()) // REQUIRED!
})),
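
On the gateway or client side, the same extension is attached with concrete values before the query is submitted (a sketch; the values are illustrative):

// Gateway side: these values travel as "query_context.*" key-value pairs
let config = SessionConfig::new_with_ballista()
    .with_option_extension(QueryContext {
        query_id: "abc123".to_string(),
        cache_enabled: true,
    });
let ctx = SessionContext::new_with_config(config);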

Implementing ConfigExtension

There’s another subtle gotcha in the implementation:

use datafusion::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
use datafusion_common::DataFusionError;

// Default is required so the extension can be pre-registered with default values
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    pub query_id: String,
    pub cache_enabled: bool,
}

impl ConfigExtension for QueryContext {
    const PREFIX: &'static str = "query_context";
}

impl ExtensionOptions for QueryContext {
    fn as_any(&self) -> &dyn std::any::Any { self }
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self }
    fn cloned(&self) -> Box<dyn ExtensionOptions> { Box::new(self.clone()) }

    fn set(&mut self, key: &str, value: &str) -> datafusion_common::Result<()> {
        // IMPORTANT: key does NOT include prefix here!
        // ConfigOptions::set() strips the prefix before calling this
        match key {
            "query_id" => {
                self.query_id = value.to_string();
                Ok(())
            }
            "cache_enabled" => {
                self.cache_enabled = value.parse().map_err(|e| {
                    DataFusionError::Configuration(format!("Invalid bool: {}", e))
                })?;
                Ok(())
            }
            _ => Err(DataFusionError::Configuration(
                format!("Unknown key: {}", key)
            )),
        }
    }

    fn entries(&self) -> Vec<ConfigEntry> {
        // IMPORTANT: keys MUST include prefix here!
        // ConfigOptions::entries() does NOT add prefix when collecting
        vec![
            ConfigEntry {
                key: format!("{}.query_id", Self::PREFIX),
                value: Some(self.query_id.clone()),
                description: "Query identifier for distributed tracing",
            },
            ConfigEntry {
                key: format!("{}.cache_enabled", Self::PREFIX),
                value: Some(self.cache_enabled.to_string()),
                description: "Whether caching is enabled for this query",
            },
        ]
    }
}

The asymmetry is confusing but intentional:

  • entries() returns fully-qualified keys (query_context.query_id)
  • set() receives stripped keys (query_id)

Extracting Context in Execution

In your execute() method, extract context from TaskContext:

fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    // Extract query context for logging/metrics
    let query_id = context
        .session_config()
        .options()
        .extensions
        .get::<QueryContext>()
        .map(|ctx| ctx.query_id.clone())
        .unwrap_or_else(|| "unknown".to_string());

    tracing::info!(query_id = %query_id, "Starting execution");

    // ... rest of execution
}

6. Lessons Learned

When NOT to Write Custom Extensions

Before writing a custom extension, check if DataFusion’s built-in functionality can solve your problem:

  • Parquet, CSV, JSON, Avro: Excellent built-in support
  • Object storage: Use object_store crate with DataFusion’s registration
  • Simple transforms: Use UDFs/UDAFs instead of custom operators
  • Standard SQL operations: DataFusion’s implementations are highly optimized

Write custom extensions when:

  • Your data source has unique access patterns (native indexes, proprietary protocols)
  • You need to push operations down to external systems
  • You have domain-specific optimizations that DataFusion can’t infer

Common Pitfalls

1. Forgetting schema_check(). Always return true from schema_check() in optimizer rules. It catches bugs where your transformation produces plans with mismatched schemas.

2. Blocking in async contexts. TableProvider::scan() is async. Don't call blocking I/O; use async libraries or spawn_blocking.

3. Wrong statistics precision. Use Precision::Inexact for estimates. Wrong Exact statistics cause worse plans than no statistics.

4. Serializing runtime objects. Never serialize connections, file handles, or Arc references. Serialize URIs and reconstruct on the executor.

5. Silent config propagation failures. Pre-register all ConfigExtension types on both scheduler and executor. Test round-trip serialization explicitly.

6. Ignoring child plans in optimizers. When rewriting a node, handle its inputs correctly. Use transform_down or transform_up to visit the entire tree.

Testing Strategies

Unit test optimizers with mock plans:

#[test]
fn optimizer_ignores_unrelated_plans() {
    let optimizer = MyCacheOptimizer::default();
    let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));

    let result = optimizer
        .optimize(empty.clone(), &ConfigOptions::default())
        .unwrap();

    // Should return unchanged
    assert!(result.as_any().is::<EmptyExec>());
}

Test config round-trips explicitly:

#[test]
fn test_context_round_trip() {
    let original = SessionConfig::new_with_ballista()
        .with_option_extension(QueryContext::new("test-123"));

    let pairs = original.to_key_value_pairs();

    // Target MUST have extension registered
    let target = SessionConfig::new_with_ballista()
        .with_option_extension(QueryContext::default());

    let restored = target.update_from_key_value_pair(&pairs);

    let ctx = restored.options().extensions.get::<QueryContext>().unwrap();
    assert_eq!(ctx.query_id, "test-123");
}

Integration test with temporary data:

#[tokio::test]
async fn test_end_to_end() {
    let temp_dir = tempfile::tempdir().unwrap();
    create_test_data(&temp_dir).await;

    let ctx = create_session_with_extensions();
    ctx.register_table("test", create_provider(&temp_dir)).unwrap();

    let result = ctx.sql("SELECT * FROM test WHERE id = 42")
        .await.unwrap()
        .collect().await.unwrap();

    assert_eq!(result.len(), 1);
}

Conclusion

DataFusion provides powerful extension points for building custom query systems:

  • TableProvider connects your data sources with proper statistics and filter pushdown
  • ExecutionPlan with lazy initialization keeps plans serializable and efficient
  • PhysicalOptimizerRule enables format-specific optimizations
  • PhysicalExtensionCodec with factory injection enables distributed execution
  • ConfigExtension propagates context—if you register it correctly

Start simple. Build a basic TableProvider without filter pushdown or statistics. Add complexity as you understand what your queries need.

The patterns in this tutorial reflect lessons from building production systems. Some we learned from documentation, others from debugging obscure failures at 2 AM. We hope they save you some late nights.


Resources

Have questions about building on DataFusion? Find us in the DataFusion Discord or get in touch.