QUICKSTART
USE Core
USE Core is the part of the Universal Storage Engine (USE) that gathers all the table information to show to the user.
To test and quickstart the backend, you need to package it and configure your cluster accordingly.
The idea is to use it alongside Spark Connect, but you can still play with your (boring) Spark Cluster.
The next steps show you how.
Pre: Download and configure Spark Environment
export SPARK_VERSION=3.5.0
wget https://dlcdn.apache.org/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz
tar xzvf spark-$SPARK_VERSION-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-$SPARK_VERSION-bin-hadoop3
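Pointing SPARK_HOME at the wrong place (for example at the downloaded archive instead of the extracted folder) is an easy mistake at this step. The sketch below is only an illustration of a quick sanity check: the helper name `check_spark_home` is hypothetical, and it assumes nothing more than that a Spark distribution contains `bin/spark-submit` and a `jars` directory.

```python
import os
from pathlib import Path


def check_spark_home(spark_home):
    """Return a list of problems found with a candidate SPARK_HOME (empty if none)."""
    root = Path(spark_home)
    if not root.is_dir():
        # Typical mistake: pointing at the .tgz archive instead of the extracted folder.
        return [f"{root} is not a directory"]
    problems = []
    if not (root / "bin" / "spark-submit").is_file():
        problems.append("bin/spark-submit not found")
    if not (root / "jars").is_dir():
        problems.append("jars/ directory not found")
    return problems


if __name__ == "__main__":
    candidate = os.environ.get("SPARK_HOME", "")
    issues = check_spark_home(candidate) if candidate else ["SPARK_HOME is not set"]
    print("OK" if not issues else "; ".join(issues))
```

An empty list means the folder looks like a Spark distribution; it does not verify the Spark version.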
1. Run sbt package
Clone the repository, check out the show_the_metrics branch, and run sbt package inside the core folder.
git clone git@github.com:Qbeast-io/USE.git
cd USE
git fetch --all
git checkout show_the_metrics
cd core
sbt package
2. Copy packages to SPARK_CONNECT jars
This still needs a proper solution, but for now the only way to load the dependencies correctly is to copy them into the Spark jars folder.
cp target/scala-2.12/use-core_2.12-0.0.1.jar $SPARK_HOME/jars/
Other libraries that may be missing are Delta Lake's storage and Spark modules (the latter was named delta-core before Delta Lake 3.0). Please download them into the jars folder as well.
export DELTA_VERSION=3.1.0
wget -P $SPARK_HOME/jars/ https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/$DELTA_VERSION/delta-spark_2.12-$DELTA_VERSION.jar
wget -P $SPARK_HOME/jars/ https://repo1.maven.org/maven2/io/delta/delta-storage/$DELTA_VERSION/delta-storage-$DELTA_VERSION.jar
3. Start Spark Connect Server (optional)
Start the Spark Connect Server through the terminal.
$SPARK_HOME/sbin/start-connect-server.sh \
--packages org.apache.spark:spark-connect_2.12:$SPARK_VERSION \
--conf spark.sql.catalog.use_catalog=io.qbeast.use.catalog.USEReadOnlyCatalog \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
The remote endpoint is then available on localhost (Spark Connect listens on port 15002 by default).
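Before moving on, it can be useful to confirm that the server is actually listening. The following is a minimal sketch using only the Python standard library; the helper name `connect_endpoint_reachable` is hypothetical, and the port assumes the Spark Connect default of 15002 has not been overridden.

```python
import socket


def connect_endpoint_reachable(host="localhost", port=15002, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable.
        return False
```

This only checks that something accepts TCP connections on that port; it does not verify that the listener is a Spark Connect server.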
We've created a script that runs steps 1, 2 and 3 sequentially.
Execute it by doing:
cd docs
export SPARK_HOME=/path/to/your/spark
./start-spark-use-connect.sh
4. Start a PySpark shell
This guide uses the Python client for Spark Connect. We can either:
- Start a PySpark shell.
- Use Jupyter Notebooks and connect to remote Spark Connect Server.
We will show the code for both of them.
PySpark Shell
$SPARK_HOME/bin/pyspark --remote "sc://localhost"
5. Create a Table
Before dealing with Tables, verify that Spark Connect works by creating a DataFrame and showing its content.
columns = ["id","name"]
data = [(1,"Sarah"),(2,"Maria")]
df = spark.createDataFrame(data).toDF(*columns)
df.show()
If the DataFrame is shown, you can proceed to the next step.
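From a Jupyter notebook, the same check can be done by creating the session explicitly instead of relying on the `spark` variable that the shell provides. This is a minimal sketch, assuming PySpark 3.4 or later with the Spark Connect client dependencies installed and the server on the default port 15002; the helper names `connect_url` and `smoke_test` are hypothetical.

```python
def connect_url(host="localhost", port=15002):
    """Build a Spark Connect URL such as sc://localhost:15002."""
    return f"sc://{host}:{port}"


def smoke_test(url):
    """Connect to a Spark Connect server and return the rows of a tiny DataFrame."""
    # Imported lazily so that connect_url works without PySpark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.remote(url).getOrCreate()
    df = spark.createDataFrame([(1, "Sarah"), (2, "Maria")], ["id", "name"])
    return [tuple(row) for row in df.collect()]
```

In a notebook cell, `smoke_test(connect_url())` should return the two rows if the server is reachable.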
Create and Configure the Table
The Universal Storage Engine takes care of the tables you choose to manage.
If you want a specific table to be ingested and automatically optimized by Qbeast, set the path for its ingestion events. Those events allow the tool to monitor the data pipelines and give you access to aggregated statistics for better observability.
-- CREATE TABLE
CREATE TABLE t(id INT, name STRING) USING qbeast TBLPROPERTIES(use.ingestion.source.path.files='s3://bucket');
-- OR ALTER TABLE
ALTER TABLE t SET TBLPROPERTIES(use.ingestion.source.path.files='s3://bucket');
To also set up the Consumption Monitoring Process on the Spark backend, set the TBLPROPERTIES accordingly.
-- CREATE TABLE
CREATE TABLE t(id INT, name STRING) USING qbeast TBLPROPERTIES(use.consumption.enabled='true');
-- OR ALTER TABLE
ALTER TABLE t SET TBLPROPERTIES(use.consumption.enabled = 'true');
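When several properties are set from the PySpark shell, it may be convenient to build the statement from a dictionary. The sketch below is illustrative only: `alter_table_properties_sql` is a hypothetical helper, the property keys are the two shown above, and keys are emitted as quoted strings, which Spark SQL also accepts in TBLPROPERTIES.

```python
def alter_table_properties_sql(table, properties):
    """Build an ALTER TABLE ... SET TBLPROPERTIES statement from a dict."""
    if not properties:
        raise ValueError("at least one property is required")
    pairs = []
    for key, value in properties.items():
        # Keep the sketch simple: refuse values that would need escaping.
        if "'" in str(key) or "'" in str(value):
            raise ValueError("single quotes are not supported in keys or values")
        pairs.append(f"'{key}' = '{value}'")
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({', '.join(pairs)})"
```

The resulting string can be passed to `spark.sql(...)`, for example `spark.sql(alter_table_properties_sql("t", {"use.consumption.enabled": "true"}))`.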
Write Data to the Table
df.write.insertInto("t")
6. Explore USE Statistics
The statistics that USE exposes can be queried as tables through USEReadOnlyCatalog:
spark.sql("SHOW TABLES IN use_catalog").show(100, False)
The output looks something like this:
+------------------------------+-------------------+-----------+
|namespace                     |tableName          |isTemporary|
+------------------------------+-------------------+-----------+
|system_lake_tables            |history            |false      |
|system_lake_tables            |tables_summary     |false      |
|system_lake_tables            |files              |false      |
|system_lake_tables            |catalogs_summary   |false      |
|system_lake_tables            |namespaces_summary |false      |
|system_lake_tables            |metastore_summary  |false      |
|spark_catalog.default.students|history            |false      |
|spark_catalog.default.students|files              |false      |
|spark_catalog.default.students|files_hourly       |false      |
|spark_catalog.default.students|ingestions         |false      |
|spark_catalog.default.students|ingestions_hourly  |false      |
|spark_catalog.default.students|consumptions       |false      |
|spark_catalog.default.students|consumptions_hourly|false      |
+------------------------------+-------------------+-----------+
You can query global summaries by doing:
# TABLES
spark.sql("SELECT * FROM use_catalog.system_lake_tables.tables_summary").show()
# NAMESPACES
spark.sql("SELECT * FROM use_catalog.system_lake_tables.namespaces_summary").show()
# CATALOGS
spark.sql("SELECT * FROM use_catalog.system_lake_tables.catalogs_summary").show()
# ALL TOGETHER
spark.sql("SELECT * FROM use_catalog.system_lake_tables.metastore_summary").show()
You can also query per-table information for each Open Format Table:
# HISTORY
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.history").show()
# FILES
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.files").show()
# FILES HOURLY AGGREGATION
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.files_hourly").show()
# INGESTIONS
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.ingestions").show()
# INGESTIONS HOURLY AGGREGATION
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.ingestions_hourly").show()
# CONSUMPTIONS
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.consumptions").show()
# CONSUMPTIONS HOURLY AGGREGATION
spark.sql("SELECT * FROM use_catalog.spark_catalog.default.students.consumptions_hourly").show()
The Tables and Schemas that we save in the system are described in DATA MODEL.
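Since the per-table queries above differ only in the view name, they can be generated rather than typed out. This is a small sketch; the helper names are hypothetical, and the view list is copied from the SHOW TABLES output shown earlier rather than from any USE API.

```python
# View names as they appear in the SHOW TABLES output for a managed table.
TABLE_VIEWS = (
    "history",
    "files",
    "files_hourly",
    "ingestions",
    "ingestions_hourly",
    "consumptions",
    "consumptions_hourly",
)


def use_view_name(table, view, catalog="use_catalog"):
    """Return the fully qualified name of a USE view for a table."""
    return f"{catalog}.{table}.{view}"


def use_view_queries(table, views=TABLE_VIEWS, catalog="use_catalog"):
    """Map each view name to a SELECT * query against it."""
    return {
        view: f"SELECT * FROM {use_view_name(table, view, catalog)}"
        for view in views
    }
```

In the PySpark shell this could be used as `for view, query in use_view_queries("spark_catalog.default.students").items(): spark.sql(query).show()`.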
7. Optimization
The USE Core has three main optimization processes: CubeDefragmentation, SamplingReduction and DeltaVacuum.
Each of them can be executed manually from the CLI or from the spark-shell.
Example of Cube Defragmentation
First, copy the dataset into a tmp directory.
cp -R ./core/src/it/resources/test-with-offsets /tmp/test-with-offsets-defrag/
Scala
import io.qbeast.use.managed.index._
// Location of the copied dataset (assumes tableLocation takes a path string)
val tmpDir = "/tmp/test-with-offsets-defrag/"
// Optimize 1 MB of data; the total table size is around 4 MB.
val bytesToOptimize = 1L * 1024L * 1024L
// Initialize the configuration
val config = CubeDefragmentationConfiguration(
  tableLocation = tmpDir,
  revisionID = None,
  revisionRewrite = false,
  bytesToOptimize = bytesToOptimize)
// Optimize the table
CubeDefragmentation.optimize(config)
CLI
$SPARK_HOME/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.memory=10G \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.CubeDefragmentationCLI" \
./target/scala-2.12/use-core-assembly-0.0.1.jar \
--table-location="/tmp/test-with-offsets-defrag/" \
--revision-id=1 \
--revision-rewrite=false \
--bytes-to-optimize=50000000
Every optimization process emits an OptimizationEvent, saved under _qbeast/insights/optimization/events.
Read all about Optimizations in the Optimization Guide.
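To run the defragmentation from a script, the CLI call can be assembled as an argument list. The sketch below is only an illustration: `defrag_command` is a hypothetical helper, the flag names are taken from the CLI example above, and leaving out `--revision-id` when no revision is given is an assumption (mirroring `revisionID = None` in the Scala example) that has not been checked against the CLI.

```python
def defrag_command(spark_home, jar, table_location, bytes_to_optimize,
                   revision_id=None, revision_rewrite=False, spark_args=()):
    """Build the spark-submit argument list for CubeDefragmentationCLI."""
    cmd = [f"{spark_home}/bin/spark-submit"]
    # Extra spark-submit options (--packages, --conf, ...) go before the jar.
    cmd.extend(spark_args)
    cmd += ["--class", "io.qbeast.use.managed.cli.CubeDefragmentationCLI", jar]
    cmd.append(f"--table-location={table_location}")
    if revision_id is not None:
        # Assumption: the flag may be omitted when no revision is selected.
        cmd.append(f"--revision-id={revision_id}")
    cmd.append(f"--revision-rewrite={str(revision_rewrite).lower()}")
    cmd.append(f"--bytes-to-optimize={bytes_to_optimize}")
    return cmd
```

The list can then be handed to `subprocess.run(cmd, check=True)`; the `--packages` and `--conf` options from the CLI example still need to be passed through `spark_args`.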
8. Scheduling
The USE employs FAIR scheduling between two different pools to ensure quick serving of the UI. The pools are:
- default: this is the default pool used by any Spark Connect task. It has a higher priority and, by default, a minimum share of 1 CPU core.
- background: this is the pool used by any streaming task created by the user from a Listener (e.g. the StreamingSparkListener or the JaffleShopDbtStreamingGenerator).
You can change the priority (weights) and the minimum allocation of each pool by changing the configuration in the fairscheduler.xml file.
The default configuration looks as follows:
<?xml version="1.0"?>
<allocations>
  <pool name="default">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>1</minShare>
  </pool>
  <pool name="background">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
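After editing fairscheduler.xml, it can help to read the file back and check the values that Spark will see. This is a minimal sketch using the Python standard library; `read_pools` is a hypothetical helper, the embedded XML is the default configuration shown above, and the fallbacks used for missing elements (FIFO, weight 1, minShare 0) are the defaults that the Spark documentation lists for pools.

```python
import xml.etree.ElementTree as ET

DEFAULT_ALLOCATIONS = """<?xml version="1.0"?>
<allocations>
  <pool name="default">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>1</minShare>
  </pool>
  <pool name="background">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
"""


def read_pools(xml_text):
    """Parse a fair scheduler allocation file into {pool name: settings}."""
    root = ET.fromstring(xml_text)
    pools = {}
    for pool in root.findall("pool"):
        pools[pool.get("name")] = {
            "schedulingMode": pool.findtext("schedulingMode", default="FIFO"),
            "weight": int(pool.findtext("weight", default="1")),
            "minShare": int(pool.findtext("minShare", default="0")),
        }
    return pools
```

With the default file, the `default` pool has twice the weight of `background`, so it receives roughly twice the resources when both pools have runnable tasks.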
If you detect any bug, please report it in the Qbeast Users Slack channel.