OPTIMIZATION

As part of the Managed Tables features for USE, we are implementing a set of optimizations to improve the performance of the index.

Sampling overhead reduction

  • Reduce the sampling overhead for a given fraction by reindexing files with weight ranges that intersect with the sampling fraction
  • The idea is to remove the offsets by reindexing these files, and consequently reducing the overhead

Cube De-fragmentation

Reduce cube fragmentation by re-indexing files from consecutive streaming ingestions Blocks from a given cube are written in different files as data are added in different instances, and Rollup groups the small blocks into single files.

This causes cubes to be fragmented, making files have poor min/max as blocks from different regions are grouped.

We must select a list of files to re-index such that their elements end up in a few cubes, each with enough records. Otherwise, rollup will merge them into a few files again. The files must be selected so the resulting cubes are large.

All these options require more development time and study. Currently, we are selecting files with the largest number of blocks.

Delta Optimizations

Vacuum

Removes files not referenced by the _delta_log. We can specify the retention period in hours and force vacuum by disabling the retention period check if needed.

Optimization Event

We generate an optimization event to measure the changes to the index.

Events are written to <tablePath>/_qbeast/insights/optimization/events/

The following is the schema:

root
 |-- optimization_name: string (nullable = true)
 |-- write_id: string (nullable = true)
 |-- table_id: string (nullable = true)
 |-- revision_id: long (nullable = true)
 |-- started_at: long (nullable = true)
 |-- finished_at: long (nullable = true)
 |-- input_files_stats: struct (nullable = true)
 |    |-- bytes: long (nullable = true)
 |    |-- cubes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- elementCount: long (nullable = true)
 |    |-- files: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- numCubes: long (nullable = true)
 |    |-- numFiles: long (nullable = true)
 |-- output_files_stats: struct (nullable = true)
 |    |-- bytes: long (nullable = true)
 |    |-- cubes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- elementCount: long (nullable = true)
 |    |-- files: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- numCubes: long (nullable = true)
 |    |-- numFiles: long (nullable = true)
 |-- process_time_ms: long (nullable = true)
 |-- cube_defragmentation_stats: struct (nullable = true)
 |    |-- average_cube_count_per_file_after: double (nullable = true)
 |    |-- average_cube_count_per_file_before: double (nullable = true)
 |    |-- average_cube_count_per_file_reduction: double (nullable = true)
 |    |-- average_block_element_count_per_file_after: double (nullable = true)
 |    |-- average_block_element_count_per_file_before: double (nullable = true)
 |    |-- average_block_element_count_per_file_increase: double (nullable = true)
 |-- sampling_overhead_reduction_stats: struct (nullable = true)
 |    |-- fraction: double (nullable = true)
 |    |-- table_bytes_before: long (nullable = true)
 |    |-- table_bytes_after: long (nullable = true)
 |    |-- sampled_bytes_before: long (nullable = true)
 |    |-- sampled_bytes_after: long (nullable = true)
 |    |-- sampled_fraction_before: double (nullable = true)
 |    |-- sampled_fraction_after: double (nullable = true)
 |    |-- bytes_to_optimize: long (nullable = true)
 |    |-- optimized_bytes: long (nullable = true)

How to launch?

You can launch Optimization from the CLI (spark-submit) or interactively from the Scala shell.

Prepare the environment by building the project and launching the Spark shell with the required packages.

cd core
 
sbt assembly
 
export SPARK_350=~/spark-3.5.0-bin-hadoop3/
 
$SPARK_350/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.6.0,io.delta:delta-spark_2.12:3.1.0 \
--jars ./target/scala-2.12/use-core-assembly-0.0.1.jar \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog 

Sampling Reduction

First, copy the dataset into a tmp directory.

cp -R ./core/src/it/resources/test-with-offsets/ /tmp/test-with-offsets-sampling/

Scala

import io.qbeast.use.managed.index._
 
// Initialize the configuration
val config = SamplingOverheadReductionConfiguration(
    tableLocation = "/tmp/test-with-offsets-sampling",
    revisionID = None,
    fraction = 0.1,
    maxOverheadFactor = 1.1)
// Optimize the table
SamplingOverheadReduction.optimize(config)

CLI

$SPARK_HOME350/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.memory=10G \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.SamplingOverheadReductionCLI" \
~/Developer/qbeast/USE/core/target/scala-2.12/use-core_2.12-0.0.1.jar \
--table-location="/tmp/test-with-offsets-sampling" \
--revision-id=1 \
--sampling-fraction=0.1 \
--max-overhead-factor=4

Cube DeFragmentation

First, copy the dataset into a tmp directory.

cp -R ./core/src/it/resources/test-with-offsets/ /tmp/test-with-offsets-defrag/

Scala

import io.qbeast.use.managed.index._
 
// Optimize 1 MB of data, the total table size is around 4 MB.
val bytesToOptimize = 1L * 1024L * 1024L
// Initialize the configuration
val config = CubeDefragmentationConfiguration(
  tableLocation = "/tmp/test-with-offsets-defrag",
  revisionID = None,
  revisionRewrite = false,
  bytesToOptimize = bytesToOptimize)
// Optimize the table
CubeDefragmentation.optimize(config)

CLI

$SPARK_HOME350/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.memory=10G \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.CubeDefragmentationCLI" \
./target/scala-2.12/use-core-assembly-0.0.1.jar \
--table-location="/tmp/test-with-offsets-defrag" \
--revision-id=1 \
--revision-rewrite=false \
--bytes-to-optimize=50000000

DeltaVacuum

First, copy the dataset into a tmp directory.

cp -R ./core/src/it/resources/test-vacuum/ /tmp/test-vacuum/

Scala

import io.qbeast.use.managed.index._
 
// Initialize the configuration
val config =
  DeltaVacuumConfiguration(tableLocation = "/tmp/test-vacuum", retentionHours = 0, forceVacuum = true)
// Optimize the table
DeltaVacuum.optimize(config)

CLI

$SPARK_HOME350/bin/spark-submit \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:0.6.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--conf spark.driver.log.level=WARN \
--conf spark.executor.log.level=WARN \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=~/Developer/log4j3.properties" \
--files ~/Developer/log4j3.properties \
--class "io.qbeast.use.managed.cli.DeltaVacuumCLI" \
~/Developer/qbeast/USE/core/target/scala-2.12/use-core_2.12-0.0.1.jar \
--table-location="/tmp/test-vacuum" \
--retention-hours=0 \
--force-vacuum=true