
Quickstart for Qbeast-spark


Welcome to the documentation of the Qbeast-Spark project.

In these sections you will find guides to help you understand the technology behind the open format and start playing with it in just a few lines of code.

Writing

Inside the project folder, launch a spark-shell with the required dependencies. The following example uses AWS S3 for reading from a public dataset.

ℹ️ Warning: Different cloud providers may require specific versions of Spark or Hadoop, or specific libraries. Refer here to check compatibilities.

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.6.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
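
If you read the dataset from a public S3 bucket instead of a local copy, you will typically also need the hadoop-aws bindings and an S3A credentials configuration on top of the options above. The exact artifact versions depend on your Spark/Hadoop build (see the warning above); the following is only a sketch assuming Hadoop 3.3.x:

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.6.0,io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.3.4 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider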

As an extra configuration, you can also change two global parameters of the index:

  1. The default desired size of the written files (100000):
--conf spark.qbeast.index.defaultCubeSize=200000
  2. The default buffer capacity for intermediate results (100000):
--conf spark.qbeast.index.cubeWeightsBufferCapacity=200

Consult the Qbeast-Spark advanced configuration for more information.
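
Both settings are regular --conf entries, so they can simply be added to the spark-shell invocation shown above, for example:

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.6.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.qbeast.index.defaultCubeSize=200000 \
--conf spark.qbeast.index.cubeWeightsBufferCapacity=200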

Read the ecommerce test dataset from Kaggle.

val ecommerce = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/test/resources/ecommerce100K_2019_Oct.csv")

Index the data on the desired columns, in this case user_id and product_id.

val qbeastTablePath = "/tmp/qbeast-test-data/qtable"

(ecommerce.write
.mode("overwrite")
.format("qbeast") // Saving the dataframe in a qbeast datasource
.option("columnsToIndex", "user_id,product_id") // Indexing the table
.option("cubeSize", "500") // The desired number of records of the resulting files/cubes. Default is 5M
.save(qbeastTablePath))

Sampling

Qbeast allows the sample operator to be pushed down to the data source, reducing I/O and computational cost.

Perform sampling, open your Spark Web UI, and observe how the sample operator is converted into a filter and pushed down to the source!

val qbeastDf = (spark
.read
.format("qbeast")
.load(qbeastTablePath))

qbeastDf.sample(0.1).explain()
== Physical Plan ==
*(1) Filter ((qbeast_hash(ss_cdemo_sk#1091, ss_cdemo_sk#1091, 42) < -1717986918) AND (qbeast_hash(ss_cdemo_sk#1091, ss_cdemo_sk#1091, 42) >= -2147483648))
+- *(1) ColumnarToRow
+- FileScan parquet [ss_sold_time_sk#1088,ss_item_sk#1089,ss_customer_sk#1090,ss_cdemo_sk#1091,ss_hdemo_sk#1092,ss_addr_sk#1093,ss_store_sk#1094,ss_promo_sk#1095,ss_ticket_number#1096L,ss_quantity#1097,ss_wholesale_cost#1098,ss_list_price#1099,ss_sales_price#1100,ss_ext_discount_amt#1101,ss_ext_sales_price#1102,ss_ext_wholesale_cost#1103,ss_ext_list_price#1104,ss_ext_tax#1105,ss_coupon_amt#1106,ss_net_paid#1107,ss_net_paid_inc_tax#1108,ss_net_profit#1109,ss_sold_date_sk#1110] Batched: true, DataFilters: [(qbeast_hash(ss_cdemo_sk#1091, ss_cdemo_sk#1091, 42) < -1717986918), (qbeast_hash(ss_cdemo_sk#10..., Format: Parquet, Location: OTreeIndex[file:/tmp/qbeast-test-data/qtable], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_sold_time_sk:int,ss_item_sk:int,ss_customer_sk:int,ss_cdemo_sk:int,ss_hdemo_sk:int,ss_a...

Notice that the sample operator is no longer present in the physical plan. It is converted into a Filter (qbeast_hash) instead, which is used to select files during the scan (see DataFilters in the FileScan node). Many files are skipped this way, resulting in less I/O.
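
To see the skipping in practice, you can trigger a sampled read and a full read and compare the number of files reported by the FileScan node in the Spark UI (SQL tab); this is only a quick sanity check, not a benchmark:

qbeastDf.sample(0.1).count() // should read only the cubes needed for ~10% of the data
qbeastDf.count() // reads all the files of the table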

SQL

Thanks to the QbeastCatalog, you can use plain SQL statements such as CREATE TABLE or INSERT INTO with the qbeast format.

To check the different configuration options for the catalog, please go to the Advanced Configuration section.

ecommerce.createOrReplaceTempView("ecommerce_october")

spark.sql("CREATE OR REPLACE TABLE ecommerce_qbeast USING qbeast AS SELECT * FROM ecommerce_october")

//OR

val ecommerceNovember = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./src/test/resources/ecommerce100K_2019_Nov.csv")

ecommerceNovember.createOrReplaceTempView("ecommerce_november")

spark.sql("INSERT INTO ecommerce_qbeast SELECT * FROM ecommerce_november")

Sampling is also available in SQL through the TABLESAMPLE operator, which can be expressed in terms of either rows or a percentage.

spark.sql("SELECT avg(price) FROM ecommerce_qbeast TABLESAMPLE(10 PERCENT)").show()

Python

Similar to Scala and SQL, you can also use PySpark to operate on Qbeast tables. Here, you can find an end-to-end example following the same operations demonstrated earlier on this page.

from pyspark.sql import SparkSession

# Initialize Spark session with Qbeast and Delta support, running locally
spark = SparkSession.builder \
.appName("QbeastExample") \
.master("local[*]") \
.config("spark.jars.packages", "io.qbeast:qbeast-spark_2.12:0.6.0,io.delta:delta-spark_2.12:3.1.0") \
.config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "io.qbeast.spark.internal.sources.catalog.QbeastCatalog") \
.getOrCreate()

qbeast_table_path = "/tmp/qbeast-test-data/qtable"

# Creating a temporary view for the ecommerce data of October
ecommerce_october = spark.read \
.format("csv")\
.option("header", "true") \
.option("inferSchema", "true") \
.load("src/test/resources/ecommerce100K_2019_Oct.csv")

# Creating a Qbeast table using PySpark
ecommerce_october.write \
.format("qbeast") \
.mode("overwrite") \
.option("columnsToIndex", "user_id,product_id") \
.option("cubeSize", "500") \
.save(qbeast_table_path)

# Reading the ecommerce data for November
ecommerce_november = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("src/test/resources/ecommerce300k_2019_Nov.csv")

# Inserting data into the Qbeast table using PySpark
ecommerce_november.write \
.format("qbeast") \
.mode("append") \
.save(qbeast_table_path)

sampled_data = spark.read.format("qbeast").load(qbeast_table_path).sample(0.10)
sampled_data.selectExpr("avg(price)").show()

Analyze and Optimize

Analyze and optimize the index to announce and replicate cubes, respectively, keeping the tree structure in an optimal state.

WARNING: ISSUE #282: Replication is not enabled on the multi-block format (since v0.6.0). We do not encourage using these methods, since they can impact index consistency.

import io.qbeast.spark.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, qbeastTablePath)

// analyze the index structure and detect cubes that can be optimized
qbeastTable.analyze()

// optimize the index to improve read operations
qbeastTable.optimize()
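
To decide whether an optimization is worthwhile, you can also inspect the index metadata. The sketch below assumes the getIndexMetrics() method exposed by QbeastTable in recent releases:

// Inspect the current state of the index (revision, cube count, depth, ...)
val metrics = qbeastTable.getIndexMetrics()
println(metrics)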

Other DML Operations

DML (Data Manipulation Language) statements manipulate the data itself; in SQL, for example, INSERT, UPDATE, and DELETE are DML instructions.

Deletes

WARNING: Data can be removed from a Qbeast table with the Delta Lake API but, as currently constructed, this will leave the index in an inconsistent state. See issue #327.

You can delete rows from a table using the DeltaTable API; afterwards, the table should only be read using the delta format.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/qbeast/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
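
After deleting through the Delta API, the table should be read back with the delta format rather than qbeast, as noted in the warning above. For example:

// Read the table with the delta format after the delete
val people = spark.read.format("delta").load("/tmp/qbeast/people-10m")
people.count()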

Table Tolerance (Work In Progress)

Specify the tolerance you are willing to accept, and let Qbeast compute the optimal sample fraction to use.

import io.qbeast.spark.implicits._

qbeastDf.agg(avg("user_id")).tolerance(0.1).show()