CONTINUOUS OPTIMIZATION
What is Continuous Optimization?
Continuous Optimization is the optimization process for a qbeast table that continuously selects files to optimize.
The previous defragmentation procedure does not guarantee convergence, as it selects the most fragmented files regardless of block overlap.
Level-wise Defragmentation
When optimizing, the level-wise approach introduced here ensures an increase of at least one order of magnitude in terms of elementCount for at least one file.
- Place files in different levels, with the level defined as log10 of its largest elementCount. We characterize a file by its largest block, and the file level is then log10 of its elementCount.
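def computeFileLevel(file: IndexFile): Int = {
  // A file is characterized by its largest block
  val maxBlock = file.blocks.maxBy(_.elementCount)
  val level = math.log10(maxBlock.elementCount)
  // math.floor returns a Double, so convert it to match the Int return type
  math.floor(level).toInt
}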
In this way, we place files in the following elementCount ranges:
| level | elementCount range |
|---|---|
| 0 | [1, 9] |
| 1 | [10, 99] |
| 2 | [100, 999] |
| 3 | [1000, 9999] |
| 4 | [10000, 99999] |
- For all files within each level, we group their blocks by the CubeId and compute their total elementCount. Block groups with total elementCounts below the next level threshold are discarded. For a block group in level 1, it has to have at least math.pow(10, 2) = 100 elements to qualify.
- We then order all block groups from the lowest file level by CubeId.depth and take their files iteratively until the optimization size threshold is obtained. We prioritize the blocks from the lower levels as their offsets propagate downward and can be optimized in future iterations. Minimizing the level serves the same purpose - we create at least one file that goes to a higher level and can then be optimized to avoid level starvation.
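The three steps above can be sketched as follows. This is a minimal illustration, not the qbeast-spark implementation: the `Block`, `IndexFile`, and `BlockGroup` types, the `selectFiles` name, and the `maxBytes` parameter are hypothetical stand-ins, and the sketch assumes every block has an elementCount of at least 1. It also assumes the file that crosses the size threshold is still included in the selection.

```scala
object LevelWiseSelectionSketch {
  // Hypothetical, simplified stand-ins for the index metadata (not the qbeast-spark classes).
  final case class Block(cubeId: String, cubeDepth: Int, elementCount: Long)
  final case class IndexFile(path: String, bytes: Long, blocks: Seq[Block])
  final case class BlockGroup(
      level: Int,
      depth: Int,
      cubeId: String,
      totalElements: Long,
      files: Seq[IndexFile])

  // Step 1: file level = floor(log10) of the largest block elementCount in the file.
  def fileLevel(file: IndexFile): Int =
    math.floor(math.log10(file.blocks.map(_.elementCount).max.toDouble)).toInt

  def selectFiles(files: Seq[IndexFile], maxBytes: Long): Seq[IndexFile] = {
    val groups = files
      .filter(_.blocks.nonEmpty)
      .groupBy(fileLevel)
      .toSeq
      .flatMap { case (level, levelFiles) =>
        // Step 2: within a level, group blocks by cube and sum their elementCount.
        levelFiles
          .flatMap(f => f.blocks.map(b => (b, f)))
          .groupBy(_._1.cubeId)
          .toSeq
          .map { case (cubeId, members) =>
            BlockGroup(
              level,
              members.head._1.cubeDepth,
              cubeId,
              members.map(_._1.elementCount).sum,
              members.map(_._2).distinct)
          }
      }
      // Discard groups that cannot reach the next level: total < 10^(level + 1).
      .filter(g => g.totalElements >= math.pow(10, g.level + 1))
      // Step 3: lowest file level first, then by cube depth (cubeId only breaks ties).
      .sortBy(g => (g.level, g.depth, g.cubeId))

    // Take files in that order until the accumulated bytes reach maxBytes.
    val ordered = groups.flatMap(_.files).distinct
    val bytesBefore = ordered.scanLeft(0L)((acc, f) => acc + f.bytes)
    ordered.zip(bytesBefore).takeWhile { case (_, before) => before < maxBytes }.map(_._1)
  }
}
```

With two level-1 files holding 60 and 50 elements of the same cube, the group total of 110 passes the level-2 threshold of 100, so both files become candidates; a level-1 group with only 20 elements is discarded.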
The experiments show that this approach optimizes the index iteratively and can reach a quasi-optimal state. The bytes we optimize in each iteration can affect the number of files left in sub-optimal states - the larger the optimization size, the better the results.
Sampling Overhead Reduction
- Compute all files with overhead. A file is an overhead file when:
  - It has an optimizable block, i.e., the sampling fraction falls within the block weight range, and parts of this overhead are offsets. For example, f=0.15, block weight range = [0.1, 0.2], cube max weight = 0.15; the overhead from 0.15 to 0.2 can be removed here.
  - It has a combination of sampled and non-sampled blocks. As many blocks are written in a single file, some non-sampled blocks may be read. For example, f=0.15, file blocks: c1:[0.1, 0.2], c2:[0.2, 0.3]; here c1 and c2 are from the same file but only c1 is sampled.
- Remove bytesPerIter bytes from all overhead files and optimize them; the process is done until all files are optimized. The larger the bytesPerIter, the better the overhead reduction.
- Using a small value for bytesPerIter may render the execution futile.
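The two overhead conditions can be written as predicates. The sketch below is one possible reading of the description, not the actual implementation: the `WeightedBlock` and `SampledFile` types are hypothetical, a block is assumed to be sampled when its minimum weight does not exceed the sampling fraction `f`, and a block is assumed to carry removable offsets when its weight range extends beyond the cube max weight.

```scala
object SamplingOverheadSketch {
  // Hypothetical types: weights are normalized to [0, 1] for illustration only.
  final case class WeightedBlock(
      cubeId: String,
      minWeight: Double,
      maxWeight: Double,
      cubeMaxWeight: Double)
  final case class SampledFile(path: String, blocks: Seq[WeightedBlock])

  // Assumption: a block is read for sampling fraction f when its range starts at or below f.
  def isSampled(block: WeightedBlock, f: Double): Boolean = block.minWeight <= f

  // Condition 1: f falls within the block weight range and the block holds offsets,
  // i.e. elements with weights above the cube max weight.
  def hasOptimizableBlock(file: SampledFile, f: Double): Boolean =
    file.blocks.exists { b =>
      b.minWeight <= f && f <= b.maxWeight && b.cubeMaxWeight < b.maxWeight
    }

  // Condition 2: the file mixes sampled and non-sampled blocks.
  def hasMixedBlocks(file: SampledFile, f: Double): Boolean =
    file.blocks.exists(isSampled(_, f)) && file.blocks.exists(!isSampled(_, f))

  def isOverheadFile(file: SampledFile, f: Double): Boolean =
    hasOptimizableBlock(file, f) || hasMixedBlocks(file, f)
}
```

Both examples from the list qualify under this reading: the single block [0.1, 0.2] with cube max weight 0.15 satisfies the first condition at f=0.15, and the file holding c1:[0.1, 0.2] and c2:[0.2, 0.3] satisfies the second.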
OptimizationEvents
OptimizationEvents are written with PrecommitHooks as part of qbeast-spark.
Operation-specific event fields are removed; we only rely on FileStats.
case class OptimizationEvent(
    optimization_name: String,
    write_id: String,
    table_id: String,
    revision_id: Long = -1L,
    started_at: Long,
    finished_at: Long,
    process_time_ms: Long = -1L,
    input_files_stats: Option[FileStats] = None,
    output_files_stats: Option[FileStats] = None)
case class FileStats(
    files: Seq[String],
    file_levels: Seq[Long],
    element_count: Long,
    bytes: Long,
    num_files: Long,
    num_cubes: Long,
    num_blocks: Long,
    avg_num_blocks_per_file: Double,
    avg_block_element_count_per_file: Double)
Usage
- Run sbt package for USE
- Upload the jar to S3 and download it to your master machine on qbeast-cloud
- Copy the jar to /opt/spark/jars
- Run:
spark-submit \
  --master "local[4]" \
  --class "io.qbeast.use.managed.cli.CubeDefragmentationCLI" \
  $PATH_TO_USE_JAR \
  --table-location=$TABLE_PATH \
  --revision-id=$REVISION_ID \
  --bytes-per-iter=$BYTES_PER_ITER \
  --revision-rewrite=$REVISION_REWRITE
How to set Continuous Optimization on a Table?
Automatic Optimization
First, set the interval at which the optimization is triggered with the following configuration:
--conf spark.qbeast.use.optimization-interval="10 minutes"
To configure the table to run Optimization continuously, set the type of optimization and, optionally, its arguments in the table properties:
ALTER TABLE test SET TBLPROPERTIES('use.optimization.type'='CubeDefragmentation','use.optimization.bytes-per-iter'='5000000')
To unset the Optimization, run:
ALTER TABLE test UNSET TBLPROPERTIES('use.optimization.type','use.optimization.bytes-per-iter')
The valid arguments and types for the moment are:
| Configuration Parameter | Description | Default Value |
|---|---|---|
| use.optimization.type | The type of optimization to run. The only valid value is CubeDefragmentation. | - |
| use.optimization.bytes-per-iter | The number of bytes to optimize in each iteration. | 1000000000 (1GB) |
| use.optimization.revision-rewrite | If set to true, the optimization will rewrite the revision. | false |
| use.optimization.revision-id | The revision to optimize. | Latest Revision |
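To make the defaults in the table concrete, here is a small sketch of how these properties could be read into a typed configuration. The `OptimizationConfig` class and the `fromProperties` function are hypothetical names used only for illustration; only the property keys and default values come from the table above. A missing revision id is modelled as `None`, standing for the latest revision.

```scala
object OptimizationConfigSketch {
  // Hypothetical container for the table properties listed above.
  final case class OptimizationConfig(
      optimizationType: String,
      bytesPerIter: Long,
      revisionRewrite: Boolean,
      revisionId: Option[Long])

  // Returns None when the table has no optimization type configured.
  def fromProperties(props: Map[String, String]): Option[OptimizationConfig] =
    props.get("use.optimization.type").map { tpe =>
      // CubeDefragmentation is the only valid type at the moment.
      require(tpe == "CubeDefragmentation", s"Unsupported optimization type: $tpe")
      OptimizationConfig(
        optimizationType = tpe,
        bytesPerIter =
          props.get("use.optimization.bytes-per-iter").map(_.toLong).getOrElse(1000000000L),
        revisionRewrite = props.get("use.optimization.revision-rewrite").exists(_.toBoolean),
        revisionId = props.get("use.optimization.revision-id").map(_.toLong))
    }
}
```

For the ALTER TABLE example shown earlier, this yields a configuration with bytesPerIter = 5000000, revisionRewrite = false, and no explicit revision id.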
You can check the properties of a table with:
SHOW TBLPROPERTIES test
Manual Optimization
To run a manual optimization without automating anything in the USE backend, you can run the following command:
ALTER TABLE use_catalog.spark_catalog.default.test SET TBLPROPERTIES('use.optimization.type'='CubeDefragmentation','use.optimization.bytes-per-iter'='5000000')
- This command would not change any of the optimization configurations in the TBLPROPERTIES; it only triggers a manual optimization for the table spark_catalog.default.test.
- This command would be ignored if:
  - an optimization is already running
  - the Table has Continuous Optimization configured.
How is it executed?
- When the application starts, the MetastoreChangeListener is configured to detect any Metastore changes (new tables, altered properties...). The ManagedTablesRegistry checks the changes reflected by the listener and transforms them into actions (start/stop Ingestion/Consumption Insights, start/stop optimization...).
- If a Table has the optimization configured, it will emit a StartOptimization action.
- The StartOptimization action would be captured by the StreamingInit, which runs every 30000 milliseconds.
- Once a StartOptimization action is caught, the OptimizationChangeListener would start a continuous thread that would run every spark.qbeast.use.optimization-interval.
- The ContinousOptimization code would continuously check for files to optimize within the configurable arguments (bytes per iteration...).
- The MetastoreChangeListener would send a StopOptimization action over a table once the UNSET TBLPROPERTIES command is executed or a change on the table is detected.
- The StopOptimization would wait for the thread to finish and stop any current optimization.
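The start/stop behaviour of the optimization thread can be approximated with a standard `java.util.concurrent` scheduler. This is only a sketch of the pattern, not the USE code: the `ScheduledOptimization` class and its `optimizeOnce` callback are hypothetical, and the interval is passed in milliseconds instead of being read from spark.qbeast.use.optimization-interval.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object OptimizationSchedulerSketch {
  // Hypothetical wrapper: runs optimizeOnce repeatedly, waiting intervalMillis between runs.
  final class ScheduledOptimization(intervalMillis: Long)(optimizeOnce: () => Unit) {
    private val executor: ScheduledExecutorService =
      Executors.newSingleThreadScheduledExecutor()

    // Analogous to handling a StartOptimization action.
    def start(): Unit = {
      executor.scheduleWithFixedDelay(
        new Runnable { def run(): Unit = optimizeOnce() },
        0L,
        intervalMillis,
        TimeUnit.MILLISECONDS)
      ()
    }

    // Analogous to handling a StopOptimization action: shutdown() cancels future runs
    // but lets an iteration in progress finish; awaitTermination waits for it.
    // Returns true if the executor terminated within the timeout.
    def stop(timeoutMillis: Long): Boolean = {
      executor.shutdown()
      executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)
    }
  }
}
```

Note that with `scheduleWithFixedDelay`, an exception thrown by `optimizeOnce` suppresses all later runs, so a real implementation would need to catch and report failures inside the callback.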
What happens if...?
- If the optimization parameters change:
  - If an optimization is running, we wait for the end of the execution and start a new schedule with the updated configuration.
  - If no optimization is running, we stop the current schedule and start a new one with the updated configuration.
- If the optimization is triggered manually:
  - If an optimization is already running, the manual execution is ignored.
  - If no other optimization is in place, it will run.
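These rules amount to a small decision table, sketched below as pure functions. All names here (`TableState`, `Action`, and the two functions) are hypothetical; the sketch also folds in the rule from the Manual Optimization section that a manual trigger is ignored when the table has Continuous Optimization configured.

```scala
object OptimizationDecisionsSketch {
  sealed trait Action
  case object IgnoreManualRun extends Action
  case object RunManualOnce extends Action
  case object RescheduleAfterCurrentRun extends Action
  case object RescheduleNow extends Action

  // Hypothetical snapshot of a table's optimization state.
  final case class TableState(optimizationRunning: Boolean, continuousConfigured: Boolean)

  // A manual trigger only runs when nothing else is running or scheduled.
  def onManualTrigger(state: TableState): Action =
    if (state.optimizationRunning || state.continuousConfigured) IgnoreManualRun
    else RunManualOnce

  // A parameter change never interrupts a running optimization.
  def onParametersChanged(state: TableState): Action =
    if (state.optimizationRunning) RescheduleAfterCurrentRun
    else RescheduleNow
}
```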
Optimization Insights
To check which optimizations have been executed on a table:
SELECT * FROM use_catalog.spark_catalog.default.test.optimizations
This would output one row for each Optimize Operation on the table, with the schema of an OptimizationEvent.
To check which tables are being optimized (or at least, scheduled for optimization), we can issue this query:
SELECT * FROM use_catalog.system_lake_tables.metastore_summary WHERE table_properties['use.optimization.type'] is not null