OPTIMIZATION
As part of the Managed Tables features for USE, we are implementing a set of optimizations to improve the performance of the index.
Sampling overhead reduction
- Reduce the sampling overhead for a given fraction by re-indexing the files whose weight ranges intersect the sampling fraction.
- Re-indexing these files removes the offsets and consequently reduces the overhead.
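The selection described above can be sketched as follows. This is a minimal illustration, not the USE implementation: the `IndexFile` type, its field names, the assumption that weights are normalized to [0, 1], and the definition of the overhead factor as the sampled share of bytes divided by the requested fraction are all hypothetical.

```scala
// Hypothetical model of an indexed file; the type and its fields are assumptions.
final case class IndexFile(path: String, minWeight: Double, maxWeight: Double, bytes: Long)

object SamplingOverheadSketch {

  // Files a sample of the given fraction has to read:
  // those that may contain elements with weight <= fraction.
  def sampledFiles(files: Seq[IndexFile], fraction: Double): Seq[IndexFile] =
    files.filter(_.minWeight <= fraction)

  // Files whose weight range straddles the fraction. The sample reads them,
  // but they also carry elements above the fraction, which is the overhead.
  def filesToReindex(files: Seq[IndexFile], fraction: Double): Seq[IndexFile] =
    files.filter(f => f.minWeight <= fraction && f.maxWeight > fraction)

  // Assumed definition: share of table bytes read, relative to the requested fraction.
  def overheadFactor(files: Seq[IndexFile], fraction: Double): Double = {
    val tableBytes = files.map(_.bytes).sum.toDouble
    val sampledBytes = sampledFiles(files, fraction).map(_.bytes).sum.toDouble
    (sampledBytes / tableBytes) / fraction
  }
}
```

For example, with three files of 100, 100 and 800 bytes covering the weight ranges [0.0, 0.05], [0.05, 0.4] and [0.4, 1.0], a fraction of 0.1 reads the first two files (20% of the table), so the factor under this definition is 2.0, and only the second file is a candidate for re-indexing.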
Cube Defragmentation
Reduce cube fragmentation by re-indexing files from consecutive streaming ingestions.
Blocks from a given cube are written to different files as data is added on different occasions, and Rollup groups the small blocks into single files.
This fragments the cubes and gives files poor min/max values, because blocks from different regions are grouped together.
We must select the files to re-index such that their elements end up in a few cubes, each with enough records; otherwise, Rollup will merge them into a few files again. In other words, the files must be selected so that the resulting cubes are large.
Better selection strategies require more development time and study. Currently, we select the files with the largest number of blocks.
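The current heuristic (files with the largest number of blocks first) could look roughly like the sketch below. The `FileBlocks` type, the tie-breaking by path, and the rule of stopping before the byte budget is exceeded are assumptions made for illustration; only the "largest number of blocks" criterion and the `bytesToOptimize` budget come from this document.

```scala
// Hypothetical file descriptor; the type and its fields are assumptions.
final case class FileBlocks(path: String, blockCount: Int, bytes: Long)

object DefragSelectionSketch {

  // Take files with the most blocks first and stop before the
  // cumulative size goes over the bytesToOptimize budget.
  def select(files: Seq[FileBlocks], bytesToOptimize: Long): Seq[FileBlocks] = {
    // Sort by block count (descending); break ties by path to stay deterministic.
    val sorted = files.sortBy(f => (-f.blockCount, f.path))
    // Running total of bytes, aligned with the sorted files.
    val cumulative = sorted.scanLeft(0L)(_ + _.bytes).tail
    sorted
      .zip(cumulative)
      .takeWhile { case (_, total) => total <= bytesToOptimize }
      .map(_._1)
  }
}
```

Sorting by block count favours the most fragmented files, but it does not by itself guarantee that the re-indexed elements land in large cubes, which is the open question mentioned above.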
Delta Optimizations
Vacuum
Removes files not referenced by the _delta_log.
We can specify the retention period in hours and force vacuum by disabling the retention period check if needed.
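As a rough model of what vacuum decides, the sketch below lists the files that would be removed: those not referenced by the log and older than the retention period. It is plain Scala with no Spark or Delta dependency; `DataFile`, `filesToDelete` and their parameters are hypothetical names, and the 168-hour minimum mirrors Delta's default 7-day retention, which we assume is the threshold the retention check enforces.

```scala
// Hypothetical descriptor of a file found under the table path.
final case class DataFile(path: String, modifiedAtMs: Long)

object VacuumSketch {

  // Returns the files a vacuum would remove under the assumptions stated above.
  def filesToDelete(
      filesOnDisk: Seq[DataFile],
      referencedPaths: Set[String],
      retentionHours: Double,
      nowMs: Long,
      forceVacuum: Boolean = false,
      minRetentionHours: Double = 168.0): Seq[DataFile] = {
    // The retention check rejects short retention periods unless it is disabled.
    require(
      forceVacuum || retentionHours >= minRetentionHours,
      s"retentionHours=$retentionHours is below $minRetentionHours; set forceVacuum to bypass the check")
    val cutoffMs = nowMs - (retentionHours * 3600 * 1000).toLong
    // Keep referenced files and anything modified after the cutoff.
    filesOnDisk.filter(f => !referencedPaths.contains(f.path) && f.modifiedAtMs < cutoffMs)
  }
}
```

With `retentionHours = 0` and `forceVacuum = true`, as in the launch examples in this document, every unreferenced file modified before the current time becomes a candidate for removal.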
Optimization Event
We generate an optimization event to measure the changes to the index.
Events are written to <tablePath>/_qbeast/insights/optimization/events/
The following is the schema:
root
 |-- optimization_name: string (nullable = true)
 |-- write_id: string (nullable = true)
 |-- table_id: string (nullable = true)
 |-- revision_id: long (nullable = true)
 |-- started_at: long (nullable = true)
 |-- finished_at: long (nullable = true)
 |-- input_files_stats: struct (nullable = true)
 |    |-- bytes: long (nullable = true)
 |    |-- cubes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- elementCount: long (nullable = true)
 |    |-- files: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- numCubes: long (nullable = true)
 |    |-- numFiles: long (nullable = true)
 |-- output_files_stats: struct (nullable = true)
 |    |-- bytes: long (nullable = true)
 |    |-- cubes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- elementCount: long (nullable = true)
 |    |-- files: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- numCubes: long (nullable = true)
 |    |-- numFiles: long (nullable = true)
 |-- process_time_ms: long (nullable = true)
 |-- cube_defragmentation_stats: struct (nullable = true)
 |    |-- average_cube_count_per_file_after: double (nullable = true)
 |    |-- average_cube_count_per_file_before: double (nullable = true)
 |    |-- average_cube_count_per_file_reduction: double (nullable = true)
 |    |-- average_block_element_count_per_file_after: double (nullable = true)
 |    |-- average_block_element_count_per_file_before: double (nullable = true)
 |    |-- average_block_element_count_per_file_increase: double (nullable = true)
 |-- sampling_overhead_reduction_stats: struct (nullable = true)
 |    |-- fraction: double (nullable = true)
 |    |-- table_bytes_before: long (nullable = true)
 |    |-- table_bytes_after: long (nullable = true)
 |    |-- sampled_bytes_before: long (nullable = true)
 |    |-- sampled_bytes_after: long (nullable = true)
 |    |-- sampled_fraction_before: double (nullable = true)
 |    |-- sampled_fraction_after: double (nullable = true)
 |    |-- bytes_to_optimize: long (nullable = true)
 |    |-- optimized_bytes: long (nullable = true)
How to launch?
You can launch Optimization from the CLI (spark-submit) or interactively from the Scala shell.
Prepare the environment by building the project and launching the Spark shell with the required packages.
cd core
sbt assembly
export SPARK_350=~/spark-3.5.0-bin-hadoop3/
$SPARK_350/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.6.0,io.delta:delta-spark_2.12:3.1.0 \
--jars ./target/scala-2.12/use-core-assembly-0.0.1.jar \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
Sampling Reduction
First, copy the dataset into a tmp directory.
cp -R ./core/src/it/resources/test-with-offsets/ /tmp/test-with-offsets-sampling/
Scala
import io.qbeast.use.managed.index._
// Initialize the configuration
val config = SamplingOverheadReductionConfiguration(
  tableLocation = "/tmp/test-with-offsets-sampling",
  revisionID = None,
  fraction = 0.1,
  maxOverheadFactor = 1.1)
// Optimize the table
SamplingOverheadReduction.optimize(config)
CLI
$SPARK_350/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.memory=10G \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.SamplingOverheadReductionCLI" \
~/Developer/qbeast/USE/core/target/scala-2.12/use-core_2.12-0.0.1.jar \
--table-location="/tmp/test-with-offsets-sampling" \
--revision-id=1 \
--sampling-fraction=0.1 \
--max-overhead-factor=4
Cube Defragmentation
First, copy the dataset into a tmp directory.
cp -R ./core/src/it/resources/test-with-offsets/ /tmp/test-with-offsets-defrag/
Scala
import io.qbeast.use.managed.index._
// Optimize 1 MB of data, the total table size is around 4 MB.
val bytesToOptimize = 1L * 1024L * 1024L
// Initialize the configuration
val config = CubeDefragmentationConfiguration(
  tableLocation = "/tmp/test-with-offsets-defrag",
  revisionID = None,
  revisionRewrite = false,
  bytesToOptimize = bytesToOptimize)
// Optimize the table
CubeDefragmentation.optimize(config)
CLI
$SPARK_350/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.memory=10G \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.CubeDefragmentationCLI" \
./target/scala-2.12/use-core-assembly-0.0.1.jar \
--table-location="/tmp/test-with-offsets-defrag" \
--revision-id=1 \
--revision-rewrite=false \
--bytes-to-optimize=50000000
DeltaVacuum
First, copy the dataset into a tmp directory.
cp -R ./core/src/it/resources/test-vacuum/ /tmp/test-vacuum/
Scala
import io.qbeast.use.managed.index._
// Initialize the configuration
val config =
  DeltaVacuumConfiguration(tableLocation = "/tmp/test-vacuum", retentionHours = 0, forceVacuum = true)
// Optimize the table
DeltaVacuum.optimize(config)
CLI
$SPARK_350/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.log.level=WARN \
--conf spark.executor.log.level=WARN \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.DeltaVacuumCLI" \
~/Developer/qbeast/USE/core/target/scala-2.12/use-core_2.12-0.0.1.jar \
--table-location="/tmp/test-vacuum" \
--retention-hours=0 \
--force-vacuum=true