CONTINUOUS OPTIMIZATION

What is Continuous Optimization?

Continuous Optimization is the optimization process for a qbeast table that continuously selects files to optimize.

The previous defragmentation procedure does not guarantee convergence, as it selects the most fragmented files regardless of block overlap.

Level-wise Defragmentation

When optimizing, the level-wise approach introduced here ensures an increase of at least one order of magnitude in terms of elementCount for at least one file.

  • Place files in different levels. We characterize a file by its largest block, and the file level is the floor of log10 of that block's elementCount.
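def computeFileLevel(file: IndexFile): Int = {
  // A file is characterized by its largest block.
  val maxBlock = file.blocks.maxBy(_.elementCount)
  // floor(log10(n)) maps [1, 9] to 0, [10, 99] to 1, and so on.
  val level = math.log10(maxBlock.elementCount.toDouble)
  // math.floor returns a Double, so convert it to match the Int return type.
  math.floor(level).toInt
}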

In this way, we place files in the following elementCount ranges:

level | elementCount range
0 | [1, 9]
1 | [10, 99]
2 | [100, 999]
3 | [1000, 9999]
4 | [10000, 99999]

  • For all files within each level, we group their blocks by the CubeId and compute their total elementCount. Block groups with a total elementCount below the next level threshold are discarded. For example, a block group in level 1 needs at least math.pow(10, 2) = 100 elements to qualify.

  • We then order all block groups from the lowest file level by CubeId.depth and take their files iteratively until the optimization size threshold is reached. We prioritize the blocks from the lower levels as their offsets propagate downward and can be optimized in future iterations. Minimizing the level serves the same purpose: we create at least one file that goes to a higher level and can then be optimized, which avoids level starvation.
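
The three steps above can be sketched as follows. This is a minimal illustration, not the actual qbeast implementation: the Block, IndexFile and BlockGroup types, the field names, and the rule of taking files until the accumulated size reaches bytesPerIter are simplifying assumptions.

```scala
object LevelWiseSelection {
  // Hypothetical, simplified stand-ins for the index metadata types.
  final case class Block(cubeId: String, cubeDepth: Int, elementCount: Long)
  final case class IndexFile(path: String, bytes: Long, blocks: Seq[Block])
  final case class BlockGroup(
      level: Int,
      cubeId: String,
      depth: Int,
      elementCount: Long,
      files: Seq[IndexFile])

  // Level of a file: floor(log10) of the elementCount of its largest block.
  def fileLevel(file: IndexFile): Int =
    math.floor(math.log10(file.blocks.map(_.elementCount).max.toDouble)).toInt

  // Group the blocks of each level by cube and keep only the groups whose
  // total elementCount reaches the threshold of the next level.
  def qualifyingGroups(files: Seq[IndexFile]): Seq[BlockGroup] = {
    val groups = for {
      (level, levelFiles) <- files.groupBy(fileLevel).toSeq
      (cubeId, entries) <- levelFiles
        .flatMap(f => f.blocks.map(b => (b, f)))
        .groupBy(_._1.cubeId)
        .toSeq
    } yield {
      val blocks = entries.map(_._1)
      BlockGroup(
        level,
        cubeId,
        blocks.head.cubeDepth,
        blocks.map(_.elementCount).sum,
        entries.map(_._2).distinct)
    }
    groups
      .filter(g => g.elementCount >= math.pow(10, g.level + 1))
      // Lowest level first, then shallowest cube (cubeId only breaks ties).
      .sortBy(g => (g.level, g.depth, g.cubeId))
  }

  // Take files in group order until the accumulated size reaches bytesPerIter.
  def selectFiles(files: Seq[IndexFile], bytesPerIter: Long): Seq[IndexFile] = {
    val candidates = qualifyingGroups(files).flatMap(_.files).distinct
    val bytesBefore = candidates.scanLeft(0L)(_ + _.bytes)
    candidates.zip(bytesBefore).takeWhile(_._2 < bytesPerIter).map(_._1)
  }
}
```

For instance, two level-1 files holding blocks of the same cube with 60 and 50 elements form a group of 110 elements, which meets the level-2 threshold of 100 and is therefore selected, while a lone level-1 block of 20 elements is discarded.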

The experiments show that this approach optimizes the index iteratively and can reach a quasi-optimal state. The bytes we optimize in each iteration can affect the number of files left in sub-optimal states - the larger the optimization size, the better the results.

Sampling Overhead Reduction

  • Compute all files with overhead. A file is an overhead file when:
    • It has an optimizable block, i.e., the sampling fraction falls within the block weight range, and parts of this overhead are offsets. For example, f=0.15, block weight range = [0.1, 0.2], cube max weight = 0.15; the overhead from 0.15 to 0.2 can be removed here.
    • It has a combination of sampled and non-sampled blocks. As many blocks are written in a single file, some non-sampled blocks may be read. For example, f=0.15, file blocks: c1:[0.1, 0.2], c2:[0.2, 0.3]; here c1 and c2 are from the same file but only c1 is sampled.
  • Take bytesPerIter bytes from the overhead files and optimize them; the process repeats until all files are optimized. The larger the bytesPerIter, the better the overhead reduction.

  • Using a small value for bytesPerIter may render the execution futile.
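
The overhead rules above can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: the Block and IndexFile types are hypothetical, a block is assumed to be sampled when its weight range starts at or below the fraction f, and a block is assumed to carry offsets when its range extends beyond the max weight of its cube.

```scala
object SamplingOverhead {
  // Hypothetical, simplified block: its weight range and the max weight of its cube.
  final case class Block(
      cubeId: String,
      minWeight: Double,
      maxWeight: Double,
      cubeMaxWeight: Double)
  final case class IndexFile(path: String, bytes: Long, blocks: Seq[Block])

  // Assumption: a sample of fraction f reads every block whose range starts at or below f.
  def isSampled(b: Block, f: Double): Boolean = b.minWeight <= f

  // Assumption: a block is optimizable when f falls within its weight range
  // and the range extends beyond the cube max weight (the extra part being offsets).
  def isOptimizable(b: Block, f: Double): Boolean =
    b.minWeight <= f && f <= b.maxWeight && b.cubeMaxWeight < b.maxWeight

  // A file has overhead when it holds an optimizable block,
  // or when it mixes sampled and non-sampled blocks.
  def isOverheadFile(file: IndexFile, f: Double): Boolean = {
    val hasOptimizable = file.blocks.exists(isOptimizable(_, f))
    val hasSampled = file.blocks.exists(isSampled(_, f))
    val hasNonSampled = file.blocks.exists(!isSampled(_, f))
    hasOptimizable || (hasSampled && hasNonSampled)
  }

  // One iteration: take overhead files until their accumulated size reaches bytesPerIter.
  def nextBatch(files: Seq[IndexFile], f: Double, bytesPerIter: Long): Seq[IndexFile] = {
    val overhead = files.filter(isOverheadFile(_, f))
    val bytesBefore = overhead.scanLeft(0L)(_ + _.bytes)
    overhead.zip(bytesBefore).takeWhile(_._2 < bytesPerIter).map(_._1)
  }
}
```

With f=0.15, a file holding c1:[0.1, 0.2] and c2:[0.2, 0.3] counts as an overhead file under these assumptions, since c1 is sampled and c2 is not.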

OptimizationEvents

OptimizationEvents are written with PrecommitHooks as part of qbeast-spark.

Operation-specific event fields are removed; we only rely on FileStats.

case class OptimizationEvent(
    optimization_name: String,
    write_id: String,
    table_id: String,
    revision_id: Long = -1L,
    started_at: Long,
    finished_at: Long,
    process_time_ms: Long = -1L,
    input_files_stats: Option[FileStats] = None,
    output_files_stats: Option[FileStats] = None)
 
case class FileStats(
    files: Seq[String],
    file_levels: Seq[Long],
    element_count: Long,
    bytes: Long,
    num_files: Long,
    num_cubes: Long,
    num_blocks: Long,
    avg_num_blocks_per_file: Double,
    avg_block_element_count_per_file: Double)

Usage

  • Run sbt package for USE
  • Upload the jar to S3 and download it to your master machine on qbeast-cloud
  • Copy the jar to /opt/spark/jars
  • Run:
spark-submit \
--master "local[4]" \
--class "io.qbeast.use.managed.cli.CubeDefragmentationCLI" \
$PATH_TO_USE_JAR \
--table-location=$TABLE_PATH \
--revision-id=$REVISION_ID \
--bytes-per-iter=$BYTES_PER_ITER \
--revision-rewrite=$REVISION_REWRITE

How to set Continuous Optimization on a Table?

Automatic Optimization

First, set the interval at which the optimization is triggered with the following configuration:

--conf spark.qbeast.use.optimization-interval="10 minutes"

To run optimization continuously on a table, set the type of optimization and, optionally, its arguments as properties of the underlying table:

ALTER TABLE test SET TBLPROPERTIES('use.optimization.type'='CubeDefragmentation','use.optimization.bytes-per-iter'='5000000')

To unset the optimization, run:

ALTER TABLE test UNSET TBLPROPERTIES('use.optimization.type','use.optimization.bytes-per-iter')

The currently valid properties and their defaults are:

Configuration Parameter | Description | Default Value
use.optimization.type | The type of optimization to run. The only valid value is CubeDefragmentation. | -
use.optimization.bytes-per-iter | The number of bytes to optimize in each iteration. | 1000000000 (1GB)
use.optimization.revision-rewrite | If set to true, the optimization will rewrite the revision. | false
use.optimization.revision-id | The revision to optimize. | Latest Revision
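
As an illustration of how these properties and their defaults fit together, the sketch below parses a map of table properties into a configuration object. The OptimizationConfig type and the parse function are hypothetical names introduced for this example; only the property keys and default values come from the table above.

```scala
object OptimizationProperties {
  // Hypothetical configuration holder; revisionId = None stands for the latest revision.
  final case class OptimizationConfig(
      optimizationType: String,
      bytesPerIter: Long,
      revisionRewrite: Boolean,
      revisionId: Option[Long])

  // Default from the table above: 1000000000 bytes (1GB).
  val DefaultBytesPerIter: Long = 1000000000L

  // Returns None when the table has no optimization type configured.
  def parse(props: Map[String, String]): Option[OptimizationConfig] =
    props.get("use.optimization.type").map { tpe =>
      // CubeDefragmentation is the only valid value at the moment.
      require(tpe == "CubeDefragmentation", s"Unsupported optimization type: $tpe")
      OptimizationConfig(
        optimizationType = tpe,
        bytesPerIter = props
          .get("use.optimization.bytes-per-iter")
          .map(_.toLong)
          .getOrElse(DefaultBytesPerIter),
        revisionRewrite = props.get("use.optimization.revision-rewrite").exists(_.toBoolean),
        revisionId = props.get("use.optimization.revision-id").map(_.toLong))
    }
}
```

A table with only use.optimization.type set would then be optimized with 1GB per iteration, without revision rewrite, on the latest revision.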

You can check the properties of a table with:

SHOW TBLPROPERTIES test

Manual optimization

To run a manual optimization without scheduling anything in the USE backend, you can run the following command:

ALTER TABLE use_catalog.spark_catalog.default.test SET TBLPROPERTIES('use.optimization.type'='CubeDefragmentation','use.optimization.bytes-per-iter'='5000000')
  • This command does not change any of the optimization configurations in the TBLPROPERTIES; it only triggers a manual optimization for the table spark_catalog.default.test.
  • This command is ignored if:
    • an optimization is already running
    • the table has Continuous Optimization configured.

How is it executed?

  • When the application starts, the MetastoreChangeListener is configured to detect any Metastore changes (new tables, altered properties...). The ManagedTablesRegistry checks the changes reported by the listener and transforms them into actions (start and stop Ingestion/Consumption Insights, start/stop optimization...).
  • If a table has the optimization configured, it emits a StartOptimization action.
  • The StartOptimization action is captured by the StreamingInit, which runs every 30000 milliseconds.
  • Once a StartOptimization action is caught, the OptimizationChangeListener starts a continuous thread that runs every spark.qbeast.use.optimization-interval.
  • The Continuous Optimization code continuously checks for files to optimize within the configurable arguments (bytes per iteration...).
  • The MetastoreChangeListener sends a StopOptimization action for a table once the UNSET TBLPROPERTIES command is executed or a change on the table is detected.
  • The StopOptimization waits for the thread to finish and stops any current optimization.
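
The start/stop behavior described above can be sketched with a scheduled executor from the Java standard library. This is a generic illustration, not the USE backend code: OptimizationScheduler, Handle, start and stop are hypothetical names, and optimizeOnce stands for a single optimization iteration.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.util.control.NonFatal

object OptimizationScheduler {
  // Hypothetical handle kept per table so the schedule can be stopped later.
  final class Handle(executor: ScheduledExecutorService) {
    // Prevents further runs, lets an in-flight run finish, and waits for it.
    // Returns true if the executor terminated within the timeout.
    def stop(timeoutMs: Long): Boolean = {
      executor.shutdown()
      executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
    }
  }

  // Runs optimizeOnce immediately and then again intervalMs after each run ends.
  def start(intervalMs: Long)(optimizeOnce: () => Unit): Handle = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        // An uncaught exception would cancel all future runs, so failures are swallowed here.
        try optimizeOnce()
        catch { case NonFatal(_) => () }
    }
    executor.scheduleWithFixedDelay(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
    new Handle(executor)
  }
}
```

A fixed delay, rather than a fixed rate, is used so that a long optimization never overlaps with the next one; whether the real scheduler makes the same choice is an assumption.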

What happens if...?

  • If the optimization parameters change:
    • If an optimization is running, we wait for the end of the execution and start a new scheduling with the updated configuration.
    • If no optimization is running, we stop the current scheduling and start a new one with the updated configuration.
  • If the optimization is triggered manually:
    • If an optimization is already running, the manual execution would be ignored.
    • If no other optimization is in place, it will run.
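
The rule for manual triggers can be sketched with a per-table flag. TableOptimizationState and tryRunManual are hypothetical names used only for this illustration; the sketch assumes a manual run is skipped both when another run is in progress and when the table has Continuous Optimization configured.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ManualTrigger {
  // Hypothetical per-table state tracking whether an optimization is in progress.
  final class TableOptimizationState(val continuousConfigured: Boolean) {
    private val running = new AtomicBoolean(false)

    // Runs `optimize` and returns true, unless the table is continuously optimized
    // or another optimization is already running, in which case the request is ignored.
    def tryRunManual(optimize: () => Unit): Boolean =
      if (continuousConfigured || !running.compareAndSet(false, true)) false
      else
        try { optimize(); true }
        finally running.set(false)
  }
}
```

The compareAndSet call makes the check and the claim of the running slot a single atomic step, so two concurrent manual requests cannot both start.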

Optimization Insights

To check which optimizations have been executed on a table:

SELECT * FROM use_catalog.spark_catalog.default.test.optimizations

This outputs one row for each optimize operation on the table, with the schema of an OptimizationEvent.

To check which tables are being optimized (or at least, scheduled for optimization), we can issue this query:

SELECT * FROM use_catalog.system_lake_tables.metastore_summary WHERE table_properties['use.optimization.type'] is not null