Qbeast Spark - Getting Started
Quickstart for Qbeast-Spark
Welcome! This page will guide you through setting up and running Qbeast-Spark. You’ll learn how to install dependencies, launch a Spark session, configure projects in Java, Scala, or Python, and work with Qbeast tables.
Before You Begin
You can use Qbeast-Spark interactively (via Spark shell) or as a library in a project.
- Interactive: Start the Spark shell (Scala or Python) and run code snippets directly.
- As a Project: Add Qbeast-Spark to a Maven or SBT project and run your own code.
Make sure you have the following:
- Java 8+ (Java 8, 11, or 17) installed and configured.
- SBT, Gradle, or Maven if you’re managing dependencies.
- Apache Spark 3.5+ (with Scala 2.12 support).
Install Java
Refer to the official Apache Spark installation guide to install Java, and set your PATH or JAVA_HOME environment variable.
If you use Windows, see this blog for setup instructions. Make sure your Spark version matches the supported Delta Lake (3.2.0) and Qbeast-Spark (0.7.0) versions.
Install Apache Spark
Download Spark and set the SPARK_HOME environment variable:
wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
tar -xzvf spark-3.5.2-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.5.2-bin-hadoop3
ℹ️ You can use Hadoop 2.7, but it may cause issues with some cloud storage providers. See here for more info.
Launch Spark
You can use PySpark, Scala, or SQL shells. Qbeast works with both Delta Lake and Hudi.
Warning: Some cloud providers need specific Spark/Hadoop versions or extra libraries. See Cloud Storages for details.
PySpark
Install PySpark for your Spark version:
pip install pyspark==<compatible-spark-version>
Launch PySpark with Qbeast and Delta Lake:
export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
pyspark --packages io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.DeltaQbeastCatalog
Launch PySpark with Hudi:
export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
pyspark --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:1.0.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.sql.extensions=io.qbeast.sql.HudiQbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.HudiQbeastCatalog
Scala
Use the Spark shell from your Spark installation.
For Delta Lake:
export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-shell --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.DeltaQbeastCatalog
For Hudi:
export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-shell --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:1.0.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.sql.extensions=io.qbeast.sql.HudiQbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.HudiQbeastCatalog
SQL Shell
You can also use Qbeast from the Spark SQL shell.
For Delta Lake:
export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-sql --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.DeltaQbeastCatalog
For Hudi:
export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-sql --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:1.0.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.sql.extensions=io.qbeast.sql.HudiQbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.HudiQbeastCatalog
Advanced Configuration
Fine-tune performance with Qbeast Spark configuration options:
Configuration | Definition | Default |
---|---|---|
spark.qbeast.index.defaultCubeSize | Default cube size for new datasets | 5000000 |
spark.qbeast.index.cubeDomainsBufferCapacity | Buffer capacity for intermediate results | 100000 |
spark.qbeast.index.columnsToIndex.auto | Automatically select columns to index | false |
spark.qbeast.index.columnsToIndex.auto.max | Max columns to index automatically | 10 |
spark.qbeast.index.numberOfRetries | Retries for writing data | 2 |
See the Qbeast-Spark advanced configuration for more.
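These keys can also be set programmatically when building the SparkSession instead of passing them as --conf flags. Below is a minimal Scala sketch; the values are illustrative placeholders, and the extension/catalog settings from the launch commands above are still required:
import org.apache.spark.sql.SparkSession

// Sketch: tune Qbeast index behaviour at session creation time.
// The values below are illustrative, not recommendations.
val spark = SparkSession.builder()
  .appName("qbeast-tuning")
  .config("spark.qbeast.index.defaultCubeSize", "1000000")   // lower the default cube size from 5,000,000
  .config("spark.qbeast.index.columnsToIndex.auto", "true")  // let Qbeast pick columns to index
  .config("spark.qbeast.index.columnsToIndex.auto.max", "5") // cap on auto-selected columns
  .getOrCreate()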
Set Up a Project
Add Qbeast-Spark to your Java, Scala, or Python project.
Maven
Add this to your pom.xml:
<dependency>
<groupId>io.qbeast</groupId>
<artifactId>qbeast-spark_2.12</artifactId>
<version>0.7.0</version>
</dependency>
SBT
Add to your build.sbt:
libraryDependencies += "io.qbeast" %% "qbeast-spark" % "0.8.0-SNAPSHOT"
To use a snapshot, add:
ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"
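For reference, a minimal build.sbt might look like the following. This is a sketch: the Scala patch version and the provided spark-sql dependency are assumptions for a project running against Spark 3.5.x supplied by the cluster.
// build.sbt (sketch)
ThisBuild / scalaVersion := "2.12.18"

// Needed only when depending on snapshot versions of qbeast-spark
ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "3.5.2" % "provided", // Spark comes from the runtime
  "io.qbeast"        %% "qbeast-spark" % "0.8.0-SNAPSHOT"
)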
Python
Configure your SparkSession:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp").getOrCreate()
# For custom config:
spark = pyspark.sql.SparkSession.builder \
.appName("MyApp") \
.config("spark.qbeast.tableFormat", "delta") \
.config("spark.sql.extensions", "io.qbeast.sql.DeltaQbeastSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "io.qbeast.catalog.QbeastCatalog") \
.getOrCreate()
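For a Scala or Java project set up with the Maven/SBT dependencies above, an equivalent session configuration looks roughly like this. It is a sketch mirroring the Python example and assumes the Delta backend; swap in the Hudi extension and catalog if that is your table format.
import org.apache.spark.sql.SparkSession

// Sketch: programmatic SparkSession configuration for a Qbeast project.
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.qbeast.tableFormat", "delta")
  .config("spark.sql.extensions", "io.qbeast.sql.DeltaQbeastSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "io.qbeast.catalog.QbeastCatalog")
  .getOrCreate()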
Working with Qbeast Tables
Create and manage tables using Spark DataFrame or SQL APIs.
Create a Table
Python:
data = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], "id: int, name:string")
data.write.mode("overwrite").option("columnsToIndex", "id,name").saveAsTable("qbeast_table")
Scala:
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
data.write.mode("overwrite").option("columnsToIndex", "id,name").saveAsTable("qbeast_table")
SQL:
CREATE TABLE qbeast_table (id INT, name STRING)
USING qbeast
OPTIONS ('columnsToIndex'='id,name');
To specify a location:
CREATE TABLE qbeast_table (id INT, name STRING)
USING qbeast
LOCATION '/tmp/qbeast_table'
OPTIONS ('columnsToIndex'='id,name');
Advanced Table Options
Option | Definition | Example |
---|---|---|
columnsToIndex | Columns used to index. Choose columns you’ll frequently query to maximize layout efficiency. | "id,name" |
cubeSize | Max number of elements per cube (soft limit). Default is 5 million. | 1000 |
columnStats | Min/max stats for indexed columns as JSON, skips stats computation. | """{"a_min":0,"a_max":10,"b_min":20.0,"b_max":70.0}""" |
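For example, these options can be combined in a single write. The following Scala sketch reuses the data DataFrame created above; the columnStats JSON follows the <column>_min/<column>_max pattern shown in the table, and its values are illustrative only.
// Sketch: write a DataFrame as a Qbeast table with explicit tuning options.
data.write
  .format("qbeast")
  .mode("overwrite")
  .option("columnsToIndex", "id,name")            // index the columns you filter on most
  .option("cubeSize", "1000")                     // soft limit on elements per cube
  .option("columnStats", """{"id_min":0,"id_max":1000000}""") // illustrative min/max for 'id'; must match your data
  .save("/tmp/qbeast_table")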
Append Data
Add data to a table or a path in append mode.
Python:
append = spark.createDataFrame([(4, "d"), (5, "e")], "id: int, name:string")
append.write.mode("append").insertInto("qbeast_table")
# Or, to append to a path:
append.write.mode("append").option("columnsToIndex", "id,name").format("qbeast").save("/tmp/qbeast_table")
Scala:
val append = Seq((4, "d"), (5, "e")).toDF("id", "name")
append.write.mode("append").insertInto("qbeast_table")
// Or, to append to a path:
append.write.mode("append").option("columnsToIndex", "id,name").format("qbeast").save("/tmp/qbeast_table")
SQL:
INSERT INTO qbeast_table VALUES (4, 'd'), (5, 'e');
Read Data
Read data by specifying the table path or name.
Python:
qbeast_df = spark.read.format("qbeast").load("/tmp/qbeast_table")
Scala:
val qbeastDF = spark.read.format("qbeast").load("/tmp/qbeast_table")
SQL:
SELECT * FROM qbeast_table;
Sampling
Sampling lets you analyze a subset of your data, saving time and compute.
You can sample directly at the storage level using Qbeast’s metadata. This way, Spark reads only the needed fraction, not the whole dataset.
Thanks to the Qbeast metadata, the sample method and the TABLESAMPLE clause (in SQL) select a fraction of the data directly from storage, instead of loading all records into memory and computing the result there.
Python:
qbeast_df.sample(0.3).show()
Scala:
qbeastDF.sample(0.3).show()
SQL:
SELECT * FROM qbeast_table TABLESAMPLE (30 PERCENT);
Check the query plan (also visible in the Spark Web UI) to see how sampling is pushed down to storage:
qbeastDF.sample(0.3).explain()
QbeastTable API (Scala)
You can get index and layout metrics using the QbeastTable API:
import io.qbeast.table.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
qbeastTable.getIndexMetrics()
Method | Definition |
---|---|
revisionIDs(): List[Long] | All Revision IDs in the table |
latestRevision(): Revision | Latest Revision |
latestRevisionID(): Long | Latest Revision ID |
revision(revisionID: Option[Int]) | Info for a specific or latest revision |
indexedColumns(revisionID: Option[Int]) | Indexed columns for a specific or latest revision |
cubeSize(revisionID: Option[Int]) | Cube size for a specific or latest revision |
getDenormalizedBlocks(revisionID: Option[Int]) | Denormalized Blocks for a revision |
getIndexMetrics(revisionID: Option[Int]) | Index metrics for a specific or latest revision |
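As a quick illustration, a few of the no-argument calls from the table can be combined with the snippet above (a sketch; the revision-specific variants take an optional revision ID as listed):
import io.qbeast.table.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")

// List all revisions and identify the most recent one.
val revisions = qbeastTable.revisionIDs()
println(s"Revisions: $revisions")
println(s"Latest revision ID: ${qbeastTable.latestRevisionID()}")

// Print index metrics for the latest revision, as in the example above.
println(qbeastTable.getIndexMetrics())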
Optimize Tables
Optimization rewrites parts of your table to improve its layout and query performance.
qbeastTable.optimize() // Optimize the latest revision
qbeastTable.optimize(2L) // Optimize a specific revision
qbeastTable.optimize(Seq("file1", "file2")) // Optimize specific files
To optimize the full table across all revisions:
val revisions = qbeastTable.revisionIDs()
revisions.foreach(revision => qbeastTable.optimize(revision))
See the QbeastTable documentation for more.
Deletes
Caution: Deleting data with the Delta Lake API will leave the Qbeast index in an inconsistent state. See issue #327.
Delete rows using the Delta Lake API:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/qbeast/people-10m")
deltaTable.delete("birthDate < '1955-01-01'")
Or with SQL functions:
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("birthDate") < "1955-01-01")
Table Tolerance (Experimental)
You can specify a tolerance and let Qbeast compute the sample fraction:
import io.qbeast.spark.implicits._
import org.apache.spark.sql.functions.avg

qbeastDf.agg(avg("user_id")).tolerance(0.1).show()
Cloud Providers
AWS S3
🚧 Amazon S3 requires Hadoop 3.2. Qbeast is compatible with public and private buckets.
For a public bucket:
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
--packages io.qbeast:qbeast-spark_2.12:0.8.0-SNAPSHOT,io.delta:delta-spark_2.12:3.1.0,com.amazonaws:aws-java-sdk:1.12.20,org.apache.hadoop:hadoop-common:3.2.0,org.apache.hadoop:hadoop-client:3.2.0,org.apache.hadoop:hadoop-aws:3.2.0
For a private bucket:
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
--conf spark.hadoop.fs.s3a.access.key=${AWS_ACCESS_KEY_ID} \
--conf spark.hadoop.fs.s3a.secret.key=${AWS_SECRET_ACCESS_KEY} \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--packages io.qbeast:qbeast-spark_2.12:0.8.0-SNAPSHOT,io.delta:delta-spark_2.12:3.1.0,com.amazonaws:aws-java-sdk:1.12.20,org.apache.hadoop:hadoop-common:3.2.0,org.apache.hadoop:hadoop-client:3.2.0,org.apache.hadoop:hadoop-aws:3.2.0
GCP
Qbeast works with many GCP services including Cloud Storage, BigQuery, BigLake, and DataProc.
Minimal Setup for BigQuery
1. Install the GCS Cloud Storage connector for Hadoop v3.
2. Provision one or more GCS Buckets.
3. Navigate to Google DataProc, select Metastore Services -> Metastore in the sidebar. Click Create and configure the metastore config overrides, setting hive.metastore.warehouse.dir: gs://<bucket name>/<nested path>/hive-warehouse. The nested path is optional.
4. Select an existing Spark node (in GCE or GKE) and modify its properties to enable the Google Cloud & Qbeast configurations:
   # Configure the Spark worker to use the Qbeast formatter library
   spark.sql.extensions io.qbeast.sql.DeltaQbeastSparkSessionExtension
   spark.sql.catalog.spark_catalog io.qbeast.catalog.QbeastCatalog
5. Create a schema in BigQuery Studio in the same region as the GCS Bucket.
6. Create an external connection with a connection type of Apache Spark, and configure it to point to the DataProc metastore described in step #3.
7. Create an external connection for BigQuery to address the Cloud Storage bucket:
   - Click Add, select Connections to external data sources, select Vertex AI remote models, remote functions and BigLake (Cloud Resource), choose a connection ID, and select the region used for the GCS Bucket.
   - Select the external connection you created (matching the name) in the left sidebar and copy the Service Account ID. Grant this service account access to the GCS Bucket by navigating to the bucket in Cloud Storage, clicking Grant Access, entering the BigQuery service account as the principal, and assigning a Storage Admin role (to be refined later).
8. Create an external table within BigQuery targeting the Qbeast-formatted table (using the Delta Lake connector):
   CREATE EXTERNAL TABLE `<project>.<schema>.<table name>`
   WITH CONNECTION `<connection id>`
   OPTIONS (format = "DELTA_LAKE", uris = ['<bucket location>']);
Version Compatibility
Qbeast Version | Spark | Hadoop | Delta Lake |
---|---|---|---|
0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
0.3.x | 3.2.x | 3.3.x | 1.2.x |
0.4.x | 3.3.x | 3.3.x | 2.1.x |
0.5.x | 3.4.x | 3.3.x | 2.4.x |
0.6.x | 3.5.x | 3.3.x | 3.1.x |
0.7.x | 3.5.x | 3.3.x | 3.1.x |
See Delta Lake releases for up-to-date compatibility info.