Qbeast Spark - Getting Started

Quickstart for Qbeast-Spark

Welcome! This page will guide you through setting up and running Qbeast-Spark. You’ll learn how to install dependencies, launch a Spark session, configure projects in Java, Scala, or Python, and work with Qbeast tables.

Before You Begin

You can use Qbeast-Spark interactively (via Spark shell) or as a library in a project.

  • Interactive: Start the Spark shell (Scala or Python) and run code snippets directly.
  • As a Project: Add Qbeast-Spark to a Maven or SBT project and run your own code.

Make sure you have the following:

  • Java 8+ (Java 8, 11, or 17) installed and configured.
  • SBT, Gradle, or Maven if you’re managing dependencies.
  • Apache Spark 3.5+ (with Scala 2.12 support).

Install Java

Refer to the official Apache Spark installation guide to install Java. Set up your PATH or JAVA_HOME environment variable.

If you use Windows, see this blog for setup instructions. Make sure your Spark version matches the Delta Lake and Qbeast-Spark versions you plan to use (see the Version Compatibility table at the end of this guide).

Install Apache Spark

Download Spark and set the SPARK_HOME environment variable:

wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
tar -xzvf spark-3.5.2-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.5.2-bin-hadoop3

ℹ️ You can use Hadoop 2.7, but it may cause issues with some cloud storage providers. See here for more info.

Launch Spark

You can use PySpark, Scala, or SQL shells. Qbeast works with both Delta Lake and Hudi.

Warning: Some cloud providers need specific Spark/Hadoop versions or extra libraries. See Cloud Storages for details.

PySpark

Install PySpark for your Spark version:

pip install pyspark==<compatible-spark-version>
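
To confirm that the installed PySpark matches the Spark distribution you downloaded (3.5.2 in the example above), you can print the version from Python:

import pyspark

# Should match the Spark distribution version, e.g. 3.5.2
print(pyspark.__version__)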

Launch PySpark with Qbeast and Delta Lake:

export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
pyspark --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
  --packages io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION,io.delta:delta-spark_2.12:3.1.0 \
  --conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.DeltaQbeastCatalog
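
Once the shell starts, you can verify that the Qbeast extension and catalog were applied by reading the session configuration back. A quick sanity check, assuming the shell was launched with the options above:

# Inside the PySpark shell started above
print(spark.conf.get("spark.sql.extensions"))             # io.qbeast.sql.DeltaQbeastSparkSessionExtension
print(spark.conf.get("spark.sql.catalog.spark_catalog"))  # io.qbeast.catalog.DeltaQbeastCatalog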

Launch PySpark with Hudi:

export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
pyspark --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:1.0.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
  --conf spark.sql.extensions=io.qbeast.sql.HudiQbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.HudiQbeastCatalog

Scala

Use the Spark shell from your Spark installation.

For Delta Lake:

export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-shell --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
  --packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
  --conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.DeltaQbeastCatalog

For Hudi:

export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-shell --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:1.0.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
  --conf spark.sql.extensions=io.qbeast.sql.HudiQbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.HudiQbeastCatalog

SQL Shell

You can also use Qbeast from the Spark SQL shell.

For Delta Lake:

export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-sql --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
  --packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
  --conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.DeltaQbeastCatalog

For Hudi:

export QBEAST_SPARK_VERSION=0.8.0-SNAPSHOT
$SPARK_HOME/bin/spark-sql --repositories https://maven.pkg.github.com/qbeast-io/qbeast-spark-private \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:1.0.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
  --conf spark.sql.extensions=io.qbeast.sql.HudiQbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.HudiQbeastCatalog

Advanced Configuration

Fine-tune performance with Qbeast Spark configuration options:

Configuration                                 | Definition                                | Default
spark.qbeast.index.defaultCubeSize            | Default cube size for new datasets        | 5000000
spark.qbeast.index.cubeDomainsBufferCapacity  | Buffer capacity for intermediate results  | 100000
spark.qbeast.index.columnsToIndex.auto        | Automatically select columns to index     | false
spark.qbeast.index.columnsToIndex.auto.max    | Max columns to index automatically        | 10
spark.qbeast.index.numberOfRetries            | Retries for writing data                  | 2

See the Qbeast-Spark advanced configuration for more.
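
These options can also be set programmatically when building the session. A PySpark sketch using the configuration keys from the table above (the values are illustrative, not recommendations):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("MyApp")
    # Use a smaller cube size than the 5000000 default, e.g. for smaller datasets
    .config("spark.qbeast.index.defaultCubeSize", "1000000")
    # Let Qbeast choose which columns to index, capped at 5 columns
    .config("spark.qbeast.index.columnsToIndex.auto", "true")
    .config("spark.qbeast.index.columnsToIndex.auto.max", "5")
    .getOrCreate()
)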

Set Up a Project

Add Qbeast-Spark to your Java, Scala, or Python project.

Maven

Add this to your pom.xml:

<dependency>
  <groupId>io.qbeast</groupId>
  <artifactId>qbeast-spark_2.12</artifactId>
  <version>0.7.0</version>
</dependency>

SBT

Add to your build.sbt:

libraryDependencies += "io.qbeast" %% "qbeast-spark" % "0.8.0-SNAPSHOT"

To use a snapshot, add:

ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"

Python

Configure your SparkSession:

import pyspark
 
spark = pyspark.sql.SparkSession.builder.appName("MyApp").getOrCreate()
 
# For custom config:
spark = pyspark.sql.SparkSession.builder \
    .appName("MyApp") \
    .config("spark.qbeast.tableFormat", "delta") \
    .config("spark.sql.extensions", "io.qbeast.sql.DeltaQbeastSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "io.qbeast.catalog.QbeastCatalog") \
    .getOrCreate()

Working with Qbeast Tables

Create and manage tables using Spark DataFrame or SQL APIs.

Create a Table

Python:

data = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], "id: int, name: string")
data.write.mode("overwrite").format("qbeast").option("columnsToIndex", "id,name").saveAsTable("qbeast_table")

Scala:

import spark.implicits._

val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
data.write.mode("overwrite").format("qbeast").option("columnsToIndex", "id,name").saveAsTable("qbeast_table")

SQL:

CREATE TABLE qbeast_table (id INT, name STRING)
USING qbeast
OPTIONS ('columnsToIndex'='id,name');

To specify a location:

CREATE TABLE qbeast_table (id INT, name STRING)
USING qbeast
LOCATION '/tmp/qbeast_table'
OPTIONS ('columnsToIndex'='id,name');
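
The same table can be created at a specific location from the DataFrame API by writing directly to a path, as the Append section below also shows. A PySpark sketch reusing the example DataFrame from above:

# Write the example DataFrame as a Qbeast table at an explicit path
data.write.mode("overwrite") \
    .format("qbeast") \
    .option("columnsToIndex", "id,name") \
    .save("/tmp/qbeast_table")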

Advanced Table Options

Option          | Definition                                                                                    | Example
columnsToIndex  | Columns used to index. Choose columns you’ll frequently query to maximize layout efficiency.  | "id,name"
cubeSize        | Max number of elements per cube (soft limit). Default is 5000000.                             | 1000
columnStats     | Min/max statistics for the indexed columns as JSON; skips stats computation at write time.    | {"a_min":0,"a_max":10,"b_min":20.0,"b_max":70.0}
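
For example, cubeSize and columnStats can be combined with columnsToIndex at write time. A hedged PySpark sketch; the DataFrame df, the columns a and b, and the target path are illustrative:

# df is assumed to have numeric columns "a" and "b"
df.write.mode("overwrite") \
    .format("qbeast") \
    .option("columnsToIndex", "a,b") \
    .option("cubeSize", "1000") \
    .option("columnStats", '{"a_min":0,"a_max":10,"b_min":20.0,"b_max":70.0}') \
    .save("/tmp/qbeast_table_with_stats")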

Append Data

Add data to a table or a path in append mode.

Python:

append = spark.createDataFrame([(4, "d"), (5, "e")], "id: int, name: string")
append.write.mode("append").insertInto("qbeast_table")
# Or, to append to a path:
append.write.mode("append").option("columnsToIndex", "id,name").format("qbeast").save("/tmp/qbeast_table")

Scala:

val append = Seq((4, "d"), (5, "e")).toDF("id", "name")
append.write.mode("append").insertInto("qbeast_table")
// Or, to append to a path:
append.write.mode("append").option("columnsToIndex", "id,name").format("qbeast").save("/tmp/qbeast_table")

SQL:

INSERT INTO qbeast_table VALUES (4, 'd'), (5, 'e');

Read Data

Read data by specifying the table path or name.

Python:

qbeast_df = spark.read.format("qbeast").load("/tmp/qbeast_table")

Scala:

val qbeastDF = spark.read.format("qbeast").load("/tmp/qbeast_table")

SQL:

SELECT * FROM qbeast_table;
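
If the table was registered in the catalog (for example via saveAsTable or CREATE TABLE), it can also be read by name instead of by path. A short PySpark sketch:

# Read a catalog-registered Qbeast table by its name
qbeast_df = spark.read.table("qbeast_table")
qbeast_df.show()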

Sampling

Sampling lets you analyze a subset of your data, saving time and compute.

Thanks to the Qbeast metadata, the sample method (and TABLESAMPLE in SQL) selects the requested fraction directly at the storage level: Spark reads only the files it needs instead of loading every record and sampling in memory.

Python:

qbeast_df.sample(0.3).show()

Scala:

qbeastDF.sample(0.3).show()

SQL:

SELECT * FROM qbeast_table TABLESAMPLE (30 PERCENT);

Check the query plan to see how the sample is pushed down to storage (the Spark Web UI shows the reduced number of files read):

qbeastDF.sample(0.3).explain()

QbeastTable API (Scala)

You can get index and layout metrics using the QbeastTable API:

import io.qbeast.table.QbeastTable
 
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
 
qbeastTable.getIndexMetrics()

Method                                          | Definition
revisionIDs(): List[Long]                       | All revision IDs in the table
latestRevision(): Revision                      | Latest revision
latestRevisionID(): Long                        | Latest revision ID
revision(revisionID: Option[Int])               | Info for a specific or the latest revision
indexedColumns(revisionID: Option[Int])         | Indexed columns for a specific or the latest revision
cubeSize(revisionID: Option[Int])               | Cube size for a specific or the latest revision
getDenormalizedBlocks(revisionID: Option[Int])  | Denormalized blocks for a revision
getIndexMetrics(revisionID: Option[Int])        | Index metrics for a specific or the latest revision

Optimize Tables

Optimizing rewrites parts of your table to improve layout and performance.

qbeastTable.optimize()         // Optimize the latest revision
qbeastTable.optimize(2L)       // Optimize a specific revision
qbeastTable.optimize(Seq("file1", "file2")) // Optimize specific files

To optimize the full table across all revisions:

val revisions = qbeastTable.revisionIDs()
revisions.foreach(revision => qbeastTable.optimize(revision))

See the QbeastTable documentation for more.

Deletes

Caution: Deleting data with the Delta Lake API will leave the Qbeast index in an inconsistent state. See issue #327.

Delete rows using the Delta Lake API:

import io.delta.tables._
 
val deltaTable = DeltaTable.forPath(spark, "/tmp/qbeast/people-10m")
 
deltaTable.delete("birthDate < '1955-01-01'")

Or with SQL functions:

import org.apache.spark.sql.functions._
import spark.implicits._
 
deltaTable.delete(col("birthDate") < "1955-01-01")
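
The same delete can be issued from Python with the Delta Lake Python API, assuming the delta-spark package is installed; a minimal sketch using the path from the Scala example above. The caution still applies: deleting through Delta Lake leaves the Qbeast index in an inconsistent state.

from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Same table and predicate as the Scala example; see the index-consistency caveat above
delta_table = DeltaTable.forPath(spark, "/tmp/qbeast/people-10m")
delta_table.delete(col("birthDate") < "1955-01-01")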

Table Tolerance (Experimental)

You can specify a tolerance and let Qbeast compute the sample fraction:

import io.qbeast.spark.implicits._
import org.apache.spark.sql.functions.avg
 
qbeastDf.agg(avg("user_id")).tolerance(0.1).show()

Cloud Providers

AWS S3

🚧 Amazon S3 requires Hadoop 3.2. Qbeast is compatible with public and private buckets.

For a public bucket:

$SPARK_HOME/bin/spark-shell \
  --conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
  --packages io.qbeast:qbeast-spark_2.12:0.8.0-SNAPSHOT,io.delta:delta-spark_2.12:3.1.0,com.amazonaws:aws-java-sdk:1.12.20,org.apache.hadoop:hadoop-common:3.2.0,org.apache.hadoop:hadoop-client:3.2.0,org.apache.hadoop:hadoop-aws:3.2.0

For a private bucket:

$SPARK_HOME/bin/spark-shell \
  --conf spark.sql.extensions=io.qbeast.sql.DeltaQbeastSparkSessionExtension \
  --conf spark.hadoop.fs.s3a.access.key=${AWS_ACCESS_KEY_ID} \
  --conf spark.hadoop.fs.s3a.secret.key=${AWS_SECRET_ACCESS_KEY} \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --packages io.qbeast:qbeast-spark_2.12:0.8.0-SNAPSHOT,io.delta:delta-spark_2.12:3.1.0,com.amazonaws:aws-java-sdk:1.12.20,org.apache.hadoop:hadoop-common:3.2.0,org.apache.hadoop:hadoop-client:3.2.0,org.apache.hadoop:hadoop-aws:3.2.0
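
The same credentials and filesystem settings can also be supplied from code rather than on the command line. A PySpark sketch, assuming the packages listed above are already on the classpath and that AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are exported in the environment:

import os
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("QbeastOnS3")  # app name is arbitrary
    .config("spark.sql.extensions", "io.qbeast.sql.DeltaQbeastSparkSessionExtension")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .getOrCreate()
)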

GCP

Qbeast works with many GCP services including Cloud Storage, BigQuery, BigLake, and DataProc.

Minimal Setup for BigQuery

  • Install the GCS Cloud Storage connector for Hadoop v3.

  • Provision one or more GCS Buckets.

  • Navigate to Google DataProc and select Metastore Services -> Metastore in the sidebar. Click Create and configure the metastore config overrides, setting hive.metastore.warehouse.dir to gs://<bucket name>/<nested path>/hive-warehouse (the nested path is optional).

  • Select an existing Spark node (in GCE or GKE) and modify its properties to enable the Google Cloud & Qbeast configurations:

    # Configure the Spark worker to use the Qbeast formatter library
    spark.sql.extensions io.qbeast.sql.DeltaQbeastSparkSessionExtension
    spark.sql.catalog.spark_catalog io.qbeast.catalog.QbeastCatalog
  • Create a schema in BigQuery Studio in the same region as the GCS bucket.

  • Create an external connection with a connection type of Apache Spark, and configure it to point to the DataProc metastore created above.

  • Create an external connection for BigQuery to access the Cloud Storage bucket:

    • Click Add, select Connections to external data sources, select Vertex AI remote models, remote functions and BigLake (Cloud Resource), choose a connection ID, and select the region used for the GCS bucket.
    • Select the newly created external connection (matching the name) in the left sidebar and copy its Service Account ID. Grant this service account access to the GCS bucket: navigate to the bucket in Cloud Storage, click Grant Access, enter the BigQuery service account as the principal, and assign a Storage Admin role (to be refined later).
  • Create an external table within BigQuery targeting the Qbeast-formatted table (using the Delta Lake connector).

    CREATE EXTERNAL TABLE `<project>.<schema>.<table name>`
    WITH CONNECTION `<connection id>`
    OPTIONS (format = "DELTA_LAKE", uris = ['<bucket location>']);

Version Compatibility

Qbeast Version | Spark | Hadoop | Delta Lake
0.1.0          | 3.0.0 | 3.2.0  | 0.8.0
0.2.0          | 3.1.x | 3.2.0  | 1.0.0
0.3.x          | 3.2.x | 3.3.x  | 1.2.x
0.4.x          | 3.3.x | 3.3.x  | 2.1.x
0.5.x          | 3.4.x | 3.3.x  | 2.4.x
0.6.x          | 3.5.x | 3.3.x  | 3.1.x
0.7.x          | 3.5.x | 3.3.x  | 3.1.x

See Delta Lake releases for up-to-date compatibility info.