Ballista support for DataFusion Python.
This project is versioned and released independently from the main Ballista project and is intentionally not part of the default Cargo workspace so that it doesn't cause overhead for maintainers of the main Ballista codebase.
Important
The current approach is to support the DataFusion Python API. This approach has known limitations, and some cases produce errors.
We are still working out the best way to support a Ballista Python interface.
More details can be found in #1142.
Creates a new context which connects to a Ballista scheduler process.
from datafusion import col, lit
from datafusion import DataFrame
# we do not need datafusion context
# it will be replaced by BallistaSessionContext
# from datafusion import SessionContext
from ballista import BallistaSessionContext
# Change from:
#
# ctx = SessionContext()
#
# to:
ctx = BallistaSessionContext("df://localhost:50050")
# all other functions and classes come from
# the datafusion module
ctx.sql("create external table t stored as parquet location './testdata/test.parquet'")
df : DataFrame = ctx.sql("select * from t limit 5")
df.show()

Known limitations and inefficiencies of the current approach:
- The client's SessionConfig is not propagated to Ballista.
- Ballista-specific configuration cannot be set.
- Anything requiring a custom datafusion_proto::logical_plan::LogicalExtensionCodec.
- No support for UDF as DataFusion Python does not serialise them.
- A Ballista connection will be created for each request.

ctx = BallistaSessionContext("df://localhost:50050")
df = ctx.read_parquet('./testdata/test.parquet').filter(col("id") > lit(4)).limit(5)
pyarrow_batches = df.collect()

The DataFusion Python documentation provides more examples and manuals.
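
The examples above assume a scheduler is already listening at df://localhost:50050. As a rough sketch, the snippet below checks that the address accepts TCP connections before a context is created. The helpers `scheduler_reachable` and `connect_if_reachable` are hypothetical names introduced here for illustration and are not part of the ballista package; only `BallistaSessionContext` and the `df://host:port` form come from the examples above.

```python
import socket
from urllib.parse import urlsplit


def scheduler_reachable(url: str, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the address in `url` succeeds.

    Hypothetical helper, not part of ballista. It only checks that
    something accepts connections on the host and port; it does not
    verify that the listener is actually a Ballista scheduler.
    """
    parts = urlsplit(url)
    if parts.scheme != "df" or parts.hostname is None or parts.port is None:
        raise ValueError(f"expected df://host:port, got {url!r}")
    try:
        with socket.create_connection((parts.hostname, parts.port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


def connect_if_reachable(url: str = "df://localhost:50050"):
    """Return a BallistaSessionContext, or None if nothing is listening."""
    if not scheduler_reachable(url):
        return None
    # Imported lazily so the reachability check has no Ballista dependency.
    from ballista import BallistaSessionContext

    return BallistaSessionContext(url)
```

Calling `connect_if_reachable()` returns a context when the port is open and `None` otherwise, which lets a script report a missing scheduler up front. A successful TCP connection is only a coarse signal, so errors from later queries still need to be handled.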
Scheduler and executors can be configured and started from python code.
To start scheduler:
from ballista import BallistaScheduler
scheduler = BallistaScheduler()
scheduler.start()
scheduler.wait_for_termination()

To start an executor:
from ballista import BallistaExecutor
executor = BallistaExecutor()
executor.start()
executor.wait_for_termination()

A detailed explanation of the development process can be found in the DataFusion Python documentation. Its section on improving build speed may be relevant.
To create a virtual environment with pip:
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt

Or with uv:
uv sync --dev --no-install-package ballista

To build the project with pip:
maturin develop

Note that you can also run maturin develop --release to get a release build locally.

Or with uv:
uv run --no-project maturin develop --uv

Or uv run --no-project maturin build --release --strip to get a release build.

To run the tests with pip:
python3 -m pytest

Or with uv:
uv run --no-project pytest