# Qbeast Datasource Format



<p align="center">
    <img src="./images/Qbeast-spark.png" />
</p>



## Content
- Introduction
- Table Indexing
- Sample Pushdown
- Table Tolerance
- Analyze and Optimize

## Predicate PushDown
**Predicate pushdown** is of great importance when it comes to **optimizing the logical plan** of a query. Its benefits include reduced usage of computational resources and less I/O from secondary storage.

Having a **predicate** in a query generally means that subsequent operators work with less data. As long as the query output is unaffected, the further down the plan (that is, the closer to the data source) the filters are applied, the greater the benefit.

```sql
SELECT A.name
FROM A JOIN B ON A.id = B.id
WHERE CONDITION_ON_A AND CONDITION_ON_B

```

Taking the query above as an example, a possible physical plan without predicate pushdown looks as follows:



<p align="center">
    <img src="./images/g1.png" width="400" height="500" />
</p>



It starts by reading all the data from both tables **A** and **B**, performs the join on the id column (**A.id = B.id**), then applies the **predicates** specified in the **WHERE** clause (**cond_A** and **cond_B**), and finally projects the target column, **name**.

With **predicate pushdown**, both conditions are moved down to the source and used as filters that select only the satisfying records, **reducing disk I/O**. The **join** operator now also operates on less data.
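To make the effect concrete, here is a minimal pure-Python sketch (no Spark involved) that evaluates the example query both ways over two small, made-up tables. The data and the functions standing in for `CONDITION_ON_A` and `CONDITION_ON_B` are hypothetical, and "rows fed into the join" is only a rough proxy for the work a real engine does.

```python
# Hypothetical toy tables standing in for A and B.
A = [{"id": i, "name": f"a{i}", "x": i % 10} for i in range(1000)]
B = [{"id": i, "y": i % 7} for i in range(1000)]


def cond_a(row):
    """Hypothetical stand-in for CONDITION_ON_A."""
    return row["x"] == 3


def cond_b(row):
    """Hypothetical stand-in for CONDITION_ON_B."""
    return row["y"] == 0


def hash_join(left, right):
    """Inner equi-join on `id`. Returns the joined rows and how many input rows entered the join."""
    index = {}
    for right_row in right:
        index.setdefault(right_row["id"], []).append(right_row)
    joined = [{**left_row, **right_row}
              for left_row in left
              for right_row in index.get(left_row["id"], [])]
    return joined, len(left) + len(right)


# Plan without pushdown: read everything, join, filter, then project `name`.
joined, rows_into_join_plain = hash_join(A, B)
names_plain = [r["name"] for r in joined if cond_a(r) and cond_b(r)]

# Plan with pushdown: filter each table at the source, then join and project.
joined, rows_into_join_pushed = hash_join([r for r in A if cond_a(r)],
                                          [r for r in B if cond_b(r)])
names_pushed = [r["name"] for r in joined]

print(names_plain == names_pushed)                  # True: same query output
print(rows_into_join_plain, rows_into_join_pushed)  # 2000 243
```

Both plans return the same names, but the pushed-down plan feeds 243 rows into the join instead of 2,000.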



<p align="center">
    <img src="./images/g2.png" width="400" height="500" />
</p>



As **query complexity** increases, **optimization rules** are needed to guarantee that moving a predicate does not alter the final output. Such rules are present in all major SQL query engines, and **Spark SQL** is no exception.


## Sampling

**Sample** operators are yet another way to achieve the benefits of working with a reduced dataset. Unlike filters, where records are selected **deterministically**, a (uniform) sample operator builds a **representative subset** of the original data by selecting records **randomly and uniformly**. The resulting subset is expected to have a **distribution** that resembles that of the source, and users generally only need to provide the **fraction** of the source data they want to work with.

```python
df = spark.read.load(source_path)     # `spark` is an active SparkSession
sampled_df = df.sample(fraction=0.5)  # keeps roughly 50% of the rows
```

As with filters, sampling reduces **compute cost and latency**, except that the **accuracy** of the result inevitably degrades as the subset gets smaller.
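The accuracy side of this trade-off can be observed empirically. The pure-Python sketch below (synthetic data, no Spark) estimates the mean of a column from uniform samples of different fractions and reports the average absolute error over several repetitions. The data distribution, the fractions and the number of trials are arbitrary choices for illustration.

```python
import random
import statistics

# Synthetic column of 50,000 values (arbitrary distribution for illustration).
rng = random.Random(7)
population = [rng.gauss(100, 15) for _ in range(50_000)]
true_mean = statistics.fmean(population)


def mean_abs_error(fraction, trials=20):
    """Average |estimate - true mean| over `trials` independent uniform samples."""
    errors = []
    for seed in range(trials):
        r = random.Random(seed)
        sample = [x for x in population if r.random() < fraction]
        errors.append(abs(statistics.fmean(sample) - true_mean))
    return statistics.fmean(errors)


errors = {f: mean_abs_error(f) for f in (0.001, 0.01, 0.1)}
for f, err in errors.items():
    print(f"fraction={f}: mean absolute error ~ {err:.3f}")
```

Smaller fractions are cheaper to process but give noticeably larger errors; for a mean, the error shrinks roughly with the square root of the sample size.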

Unlike filters, however, a Sample operator in Spark SQL can only do its job after all the data has been retrieved from the source, so a complete disk I/O is still required. Moreover, there is no clear model of the **cost and latency vs. accuracy** trade-off when choosing the **fraction**, which is why samplers are generally avoided.
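The pure-Python sketch below models why sampling on top of a plain scan saves no I/O: every row is produced by the scan before the sampler decides whether to keep it. The sampler is a Bernoulli sampler (each row is kept independently with probability `fraction`), a common way to implement uniform sampling without replacement; this is consistent with the PySpark documentation noting that `sample` is not guaranteed to return exactly the requested fraction. The `scan` generator and the `stats` counter are hypothetical stand-ins for a file scan.

```python
import random


def scan(n_rows, stats):
    """Stand-in for a full file scan: every row is read before anything downstream sees it."""
    for row_id in range(n_rows):
        stats["rows_read"] += 1
        yield row_id


def bernoulli_sample(rows, fraction, seed=42):
    """Keep each incoming row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


stats = {"rows_read": 0}
sample = bernoulli_sample(scan(100_000, stats), fraction=0.1)

print(stats["rows_read"])  # 100000: the sampler only runs after a full read
print(len(sample))         # close to, but generally not exactly, 10000
```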


## Qbeast Format

To address these issues, we introduce the **qbeast datasource format** for Spark: a custom DataSource designed to enable **multidimensional indexing** of datasets. Together with a set of transformation rules, it converts the Sample operator into filters, so that random and uniform record selection can take place at the source. On top of that, we have also created our own operator, **Table Tolerance**, which, given the maximum **tolerance** of a query, determines by itself the most cost-effective **fraction** to use. The user is therefore no longer left wondering whether the chosen sample is accurate enough for their objectives.
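The key idea, turning a random sample into an ordinary predicate, can be sketched in a few lines of pure Python. The assumption here is that every record is given a deterministic pseudo-random weight in [0, 1); sampling a fraction `f` then becomes the filter `weight < f`, which, like any other predicate, could be evaluated at the source. The SHA-256-based `row_weight` helper is hypothetical and for illustration only, not qbeast's actual hashing scheme.

```python
import hashlib


def row_weight(row):
    """Hypothetical helper: deterministic pseudo-random weight in [0, 1) derived from the row's content."""
    digest = hashlib.sha256(repr(row).encode("utf-8")).digest()
    # Keep 53 bits so the division is exact and the result stays strictly below 1.0.
    return (int.from_bytes(digest[:8], "big") >> 11) / 2**53


def sample_as_filter(rows, fraction):
    """A uniform 'sample' expressed as a plain predicate on each row's weight."""
    return [row for row in rows if row_weight(row) < fraction]


rows = [(i, i % 100, i % 7) for i in range(50_000)]  # hypothetical records

s10 = sample_as_filter(rows, 0.10)
s20 = sample_as_filter(rows, 0.20)
print(len(s10), len(s20))    # roughly 5,000 and 10,000
print(set(s10) <= set(s20))  # True: with fixed weights, smaller samples nest inside larger ones
```

A side effect of fixed weights is that repeating a sample with the same fraction returns the same records.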

## Setup

Install PySpark version 3.1.1:

```python
!conda install pyspark=3.1.1 -y
#!pip install pyspark==3.1.1  # If you use pip (e.g. Google Colab), uncomment this line and comment out the previous one
```

Import the dependencies and initialize a Spark session:

```python
import os
from pyspark.sql import SparkSession


DATA_ROOT = "/tmp/qbeast-test/data"
parquet_table_path = "s3a://qbeast-public-datasets/store_sales"
qbeast_table_path = os.path.join(DATA_ROOT, "qbeast/qtable")


# Packages fetched at startup: qbeast-spark, Delta Lake, the AWS SDK and the Hadoop S3A libraries
hadoop_deps = ','.join(map(lambda a: 'org.apache.hadoop:hadoop-' + a + ':3.2.0', ['common','client','aws']))
deps = "io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0,com.amazonaws:aws-java-sdk:1.12.20," + hadoop_deps

spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")  # qbeast's SQL extensions
         .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")  # anonymous access to the public bucket
         .config("spark.jars.packages", deps)
         .getOrCreate())
spark.sparkContext.setLogLevel('OFF')
```


## Table Indexing

The dataset used here is the **store_sales** table from **TPC-DS**. It is stored in Parquet format, and its schema is shown below.

```python
parquet_df = spark.read.format("parquet").load(parquet_table_path)

print("Number of rows with na:", parquet_df.count())

# Display the schema
parquet_df.printSchema()
```

```
Number of rows with na: 2879789
root
 |-- ss_sold_time_sk: integer (nullable = true)
 |-- ss_item_sk: integer (nullable = true)
 |-- ss_customer_sk: integer (nullable = true)
 |-- ss_cdemo_sk: integer (nullable = true)
 |-- ss_hdemo_sk: integer (nullable = true)
 |-- ss_addr_sk: integer (nullable = true)
 |-- ss_store_sk: integer (nullable = true)
 |-- ss_promo_sk: integer (nullable = true)
 |-- ss_ticket_number: long (nullable = true)
 |-- ss_quantity: integer (nullable = true)
 |-- ss_wholesale_cost: decimal(7,2) (nullable = true)
 |-- ss_list_price: decimal(7,2) (nullable = true)
 |-- ss_sales_price: decimal(7,2) (nullable = true)
 |-- ss_ext_discount_amt: decimal(7,2) (nullable = true)
 |-- ss_ext_sales_price: decimal(7,2) (nullable = true)
 |-- ss_ext_wholesale_cost: decimal(7,2) (nullable = true)
 |-- ss_ext_list_price: decimal(7,2) (nullable = true)
 |-- ss_ext_tax: decimal(7,2) (nullable = true)
 |-- ss_coupon_amt: decimal(7,2) (nullable = true)
 |-- ss_net_paid: decimal(7,2) (
```

The table contains 23 columns in total; we work with only the first 5 to keep the query plans clean for later examination.

```python
processed_parquet_df = (
    parquet_df
    .select(
        "ss_sold_time_sk",
        "ss_item_sk",
        "ss_customer_sk",
        "ss_cdemo_sk",
        "ss_hdemo_sk") # Selecting only the first 5 columns
    .na.drop()         # dropping rows with null values
)

print(f"Number of rows in the resulting dataframe: {processed_parquet_df.count()}")
processed_parquet_df.printSchema()
```

```
Number of rows in the resulting dataframe: 2637520
root
 |-- ss_sold_time_sk: integer (nullable = true)
 |-- ss_item_sk: integer (nullable = true)
 |-- ss_customer_sk: integer (nullable = true)
 |-- ss_cdemo_sk: integer (nullable = true)
 |-- ss_hdemo_sk: integer (nullable = true)
```



With the dataset in place, we can write it into a **qbeast datasource**, indexing it on the columns **ss_cdemo_sk** and **ss_hdemo_sk**. The choice of columns is arbitrary here; at the moment, any numerical column would do. In general, one should index the columns that are queried most frequently.

```python
(processed_parquet_df
    .write
    .mode("overwrite")
    .format("qbeast")                                     # Saving the dataframe in a qbeast datasource
    .option("columnsToIndex", "ss_cdemo_sk,ss_hdemo_sk")  # Indexing the table
    .option("cubeSize", "300000")                         # Desired number of records per cube
    .save(qbeast_table_path)
)
```
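As a rough intuition for what indexing on two columns means, the sketch below maps each record to a point in the unit square by normalizing every indexed column with a linear min/max transformation, and then finds the grid cell that contains it when each dimension is split into `2**depth` slices. This is a simplified, hypothetical model: the real qbeast index organizes data into a tree of cubes whose size is governed by `cubeSize`, which the sketch does not attempt to reproduce. The (min, max) bounds are taken from the `LinearTransformation(2,1920793,...)` and `LinearTransformation(1,7200,...)` entries that appear in the qbeast query plan later in this notebook, assuming they denote the column ranges; the records are made up.

```python
def normalize(value, lo, hi):
    """Linear min/max transformation of `value` onto [0, 1]."""
    return (value - lo) / (hi - lo) if hi > lo else 0.0


def cell_at_depth(point, depth):
    """Grid cell containing a normalized point when each dimension is cut into 2**depth slices."""
    slices = 2 ** depth
    # min(...) keeps points lying exactly on the upper bound (1.0) inside the last slice.
    return tuple(min(int(x * slices), slices - 1) for x in point)


# Assumed (min, max) ranges of ss_cdemo_sk and ss_hdemo_sk.
bounds = [(2, 1_920_793), (1, 7_200)]

# Hypothetical (ss_cdemo_sk, ss_hdemo_sk) pairs.
records = [(17, 3), (960_000, 3_600), (1_920_793, 7_200), (480_000, 1_000)]

cells = [
    cell_at_depth([normalize(v, lo, hi) for v, (lo, hi) in zip(rec, bounds)], depth=2)
    for rec in records
]
print(cells)  # [(0, 0), (1, 1), (3, 3), (0, 0)]
```

Records that are close in both columns land in the same cell, so a query filtering on ranges of these two columns only needs to look at the cells overlapping those ranges.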

## Sampling PushDown

### Qbeast sample vs. vanilla Spark sample

To demonstrate the transformation of the **Sample** operator into **Filters** and the subsequent application of Predicate PushDown, we will examine the query plan of a sample operation on a qbeast table and compare it with that of the same operation on a regular parquet table, namely **processed_parquet_df** from above.

```python
# Write the processed parquet data to a new folder
# and re-read it so that the query plan is simpler to examine.
processed_parquet_dir = os.path.join(DATA_ROOT, "parquet/test_data")

processed_parquet_df.write.mode("overwrite").format("parquet").save(processed_parquet_dir)

processed_parquet_df = spark.read.format("parquet").load(processed_parquet_dir)
```

```python
qbeast_df = spark.read.format("qbeast").load(qbeast_table_path)

assert qbeast_df.count() == processed_parquet_df.count(), "Both tables should have the same number of rows"
# Both tables contain 2,637,520 rows
```

```python
print("Query Plan for Sampling on a parquet file\n")

processed_parquet_df.sample(fraction=0.1).explain(True)
```

```
Query Plan for Sampling on a parquet file

== Parsed Logical Plan ==
Sample 0.0, 0.1, false, 6984010462622187441
+- Relation[ss_sold_time_sk#918,ss_item_sk#919,ss_customer_sk#920,ss_cdemo_sk#921,ss_hdemo_sk#922] parquet

== Analyzed Logical Plan ==
ss_sold_time_sk: int, ss_item_sk: int, ss_customer_sk: int, ss_cdemo_sk: int, ss_hdemo_sk: int
Sample 0.0, 0.1, false, 6984010462622187441
+- Relation[ss_sold_time_sk#918,ss_item_sk#919,ss_customer_sk#920,ss_cdemo_sk#921,ss_hdemo_sk#922] parquet

== Optimized Logical Plan ==
Sample 0.0, 0.1, false, 6984010462622187441
+- Relation[ss_sold_time_sk#918,ss_item_sk#919,ss_customer_sk#920,ss_cdemo_sk#921,ss_hdemo_sk#922] parquet

== Physical Plan ==
*(1) Sample 0.0, 0.1, false, 6984010462622187441
+- *(1) ColumnarToRow
   +- FileScan parquet [ss_sold_time_sk#918,ss_item_sk#919,ss_customer_sk#920,ss_cdemo_sk#921,ss_hdemo_sk#922] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/qbeast-test/data/parquet/test_data
```

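Notice that in the query plan for the parquet table, **Sample** is the last operator applied, and it stays that way through every stage of query planning, from the parsed logical plan down to the physical plan. The scan itself reports **DataFilters: []**, so every row is read before sampling takes place.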

```python
print("\nQuery Plan for Sampling on a qbeast\n")

qbeast_df.sample(fraction=0.1).explain(True)
```

```
Query Plan for Sampling on a qbeast

== Parsed Logical Plan ==
Sample 0.0, 0.1, false, -6426792091855529389
+- Relation[ss_sold_time_sk#928,ss_item_sk#929,ss_customer_sk#930,ss_cdemo_sk#931,ss_hdemo_sk#932] QbeastBaseRelation(parquet,Revision(1,1637574535289,/tmp/qbeast-test/data/qbeast/qtable,300000,List(LinearTransformer(ss_cdemo_sk,io.qbeast.core.model.IntegerDataType$@10cd75bf), LinearTransformer(ss_hdemo_sk,io.qbeast.core.model.IntegerDataType$@10cd75bf)),List(LinearTransformation(2,1920793,io.qbeast.core.model.IntegerDataType$@10cd75bf), LinearTransformation(1,7200,io.qbeast.core.model.IntegerDataType$@10cd75bf))))

== Analyzed Logical Plan ==
ss_sold_time_sk: int, ss_item_sk: int, ss_customer_sk: int, ss_cdemo_sk: int, ss_hdemo_sk: int
Sample 0.0, 0.1, false, -6426792091855529389
+- Relation[ss_sold_time_sk#928,ss_item_sk#929,ss_customer_sk#930,ss_cdemo_sk#931,ss_hdemo_sk#932] QbeastBaseRelation(parquet,Revision(1,1637574535289,/tmp/qbeast-test/data/qbeast/qtable,300000,List(Li
```

For the qbeast table, on the other hand, the **Sample** operator is no longer present in the optimized logical plan; instead, the plan contains a **Filter** that uses **qbeast_hash** to discard unnecessary data.

Notice that the query plans for both dataframes are identical at the beginning and only start to differ after the optimization rules are applied, which in this case convert the **Sample** operator into **Filters** and then apply the **Predicate PushDown** rules of the Spark query engine.

These filters are pushed down to the data source in the physical plan and are used by Spark while it scans the source relation. The filters applied at the source are listed in the **DataFilters** field of **FileScan parquet**.
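The two-step rewrite described above can be pictured with a toy plan representation in pure Python. The node classes and rule functions are hypothetical simplifications, not Spark's or qbeast's actual classes: one rule replaces a `Sample` over a qbeast scan with a weight-based `Filter`, and a pushdown rule copies the `Filter` condition into the scan's `data_filters` while keeping the `Filter` itself, so rows are still checked individually after the scan.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class Scan:
    source: str
    data_filters: List[str] = field(default_factory=list)


@dataclass
class Sample:
    fraction: float
    child: "Plan"


@dataclass
class Filter:
    condition: str
    child: "Plan"


Plan = Union[Scan, Sample, Filter]


def sample_to_filter(plan: Plan) -> Plan:
    """Hypothetical rule: on a qbeast source, rewrite Sample(f) into Filter(weight < f)."""
    if isinstance(plan, Sample) and isinstance(plan.child, Scan) and plan.child.source == "qbeast":
        return Filter(f"weight < {plan.fraction}", plan.child)
    return plan


def push_filter_into_scan(plan: Plan) -> Plan:
    """Copy a Filter's condition into the Scan below it; keep the Filter for row-level checks."""
    if isinstance(plan, Filter) and isinstance(plan.child, Scan):
        scan = plan.child
        return Filter(plan.condition, Scan(scan.source, scan.data_filters + [plan.condition]))
    return plan


def optimize(plan: Plan) -> Plan:
    # Apply each rule once, in order (real optimizers iterate rule batches to a fixed point).
    for rule in (sample_to_filter, push_filter_into_scan):
        plan = rule(plan)
    return plan


parquet_plan = optimize(Sample(0.1, Scan("parquet")))
qbeast_plan = optimize(Sample(0.1, Scan("qbeast")))
print(parquet_plan)  # Sample stays on top; the scan has no data filters
print(qbeast_plan)   # Filter over a scan that carries 'weight < 0.1' as a data filter
```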

```python
# processed_parquet_df.sample(0.1).collect()
# qbeast_df.sample(fraction=0.1).collect()
```

Uncomment and run the queries in the previous cell, then inspect their query plans in the **Spark UI**.

That fewer files are accessed can be seen by comparing the total number of files in the table folder with the number of files read, shown in **Query Details**. The **number of output rows** of **Scan parquet** also indicates whether all the files are being read.

| Data Source | Total number of files | Number of files read | Number of rows read | Number of output rows |
|-------------|:---------------------:|:--------------------:|:-------------------:|:---------------------:|
| parquet     | 16                    | 16                   | 2,637,520           | 264,192               |
| qbeast      | 21                    | 1                    | 302,715             | 262,013               |
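A quick calculation on the figures in the table shows what the qbeast scan saves. The snippet only re-derives ratios from the numbers reported above; 2,637,520 is the row count shared by both tables.

```python
TOTAL_ROWS = 2_637_520

# (total files, files read, rows read, output rows) as reported in the table above
scans = {
    "parquet": (16, 16, 2_637_520, 264_192),
    "qbeast": (21, 1, 302_715, 262_013),
}

ratios = {}
for name, (files_total, files_read, rows_read, rows_out) in scans.items():
    ratios[name] = (rows_read / TOTAL_ROWS, rows_out / TOTAL_ROWS)
    print(f"{name}: {files_read}/{files_total} files read, "
          f"{ratios[name][0]:.1%} of rows read, {ratios[name][1]:.1%} of rows returned")
```

Both scans return close to the requested 10% of the rows (10.0% and 9.9%), but the qbeast scan reads only about 11.5% of the rows instead of all of them.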

Under the hood, a qbeast table is divided into different partitions according to their states, and each partition is stored in a separate parquet file. The filter applied at the source is used to select which partitions (files) to read, while the second filter is the one actually applied to individual rows.
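The two filtering levels can be sketched in pure Python. The sketch assumes, hypothetically, that every file carries one piece of metadata, the minimum record weight it stores, so files whose minimum is already at or above the requested fraction can be skipped without being opened; the surviving files are then filtered row by row. The file layout and the `min_weight` field are illustrative assumptions, not qbeast's actual metadata.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DataFile:
    name: str
    min_weight: float     # hypothetical per-file metadata
    weights: List[float]  # record weights stored in the file


def sampled_scan(files, fraction):
    # Level 1: file selection using metadata only (no rows are read here).
    selected = [f for f in files if f.min_weight < fraction]
    # Level 2: row-level filter over the rows of the selected files.
    rows_read = sum(len(f.weights) for f in selected)
    output = [w for f in selected for w in f.weights if w < fraction]
    return selected, rows_read, output


# Hypothetical layout: 8 files of 1,000 records, file k holding weights in [k/8, (k+1)/8).
files = []
for k in range(8):
    weights = [(k * 1000 + j) / 8000 for j in range(1000)]
    files.append(DataFile(f"part-{k}.parquet", min(weights), weights))

selected, rows_read, output = sampled_scan(files, fraction=0.1)
print(len(selected), rows_read, len(output))  # 1 1000 800
```

As in the measurements above, only one file out of several is read, and slightly more rows are read than returned.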