Table Optimization Guide
The Universal Storage Engine (USE) includes two main optimization procedures to manage data layout and table metadata. This guide covers the RevisionOptimization procedure for improving query performance and the DeltaVacuum procedure for cleaning up unused files.
RevisionOptimization can be triggered periodically and automatically selects files to re-index for improved query performance. DeltaVacuum triggers Delta's VACUUM to remove files that are no longer referenced by the _delta_log.
RevisionOptimization
The goal of this optimization process is to reduce the discrepancy between the actual data layout and the one expected by the OTree index.
A qbeast table is generally composed of one or more Revisions, each representing an OTree index with a set of parameters such as column transformers and/or cube size. Each file from the table is assigned to a Revision, and files written with the underlying table format are assigned to the unindexed Revision (RevisionID = 0).
RevisionOptimization optimizes each Revision (OTree) separately, except the unindexed revision, to reduce the layout-index discrepancy.
Continuous Optimization Process
RevisionOptimizations are executed iteratively. Once triggered, the process keeps running until the user stops it or there are no more files to optimize.
In each iteration, an ordered sequence of FileSelectionStrategies is consulted to select files to optimize, and the first strategy that returns files is used for the optimization. Users can choose the strategies and their ordering via configuration.
```scala
// Simplified sketch of the continuous optimization loop.
// `optimizeFiles` and `fileSelectionStrategies` are provided by the engine.
def optimize(): Unit = {
  var continue = true
  while (continue) {
    val filesToOptimize = selectFiles()
    if (filesToOptimize.isEmpty) continue = false
    else optimizeFiles(filesToOptimize)
  }
}

// Consult the ordered strategies; the first one that returns files wins.
def selectFiles(): Seq[String] = {
  var files = Seq.empty[String]
  for (strategy <- fileSelectionStrategies) {
    if (files.isEmpty) {
      files = strategy.selectFiles()
    }
  }
  files
}
```
File Selection Strategies
Each file selection strategy decides which files to optimize next. When the targeted Revision is indexed, the process selects files that reduce cube fragmentation (LeveledCompactionSelection). When the Revision is not indexed, the process selects files and indexes them by adding them to the latest revision (UnindexedFilesSelection).
Leveled Compaction
As more files are added to a Revision, cube fragmentation increases and query performance degrades. Each time new data are added to a cube, offsets are created in the existing files. If an append is small relative to the size of the existing index, many small cubes are created; these are grouped together by rollup to avoid producing too many small files. While the index metadata is always updated as the index is manipulated, the data layout can drift out of sync with the index.
From a high-level perspective, appends and other table operations increase the number of blocks per cube and per file. Performance degrades as the number of blocks grows and the data layout becomes more fragmented. The LeveledCompactionSelection strategy reduces the number of blocks per file and per cube by selecting files to optimize based on the blocks they contain, and it does so in a leveled manner to avoid large data amplification.
Here is how it works:
Classify Files by Levels
Place files in levels, where a file's level is defined as the floor of log10 of its largest block elementCount:
```scala
// Assign a file to a level based on its largest block.
def computeFileLevel(file: IndexFile): Int = {
  val maxBlock = file.blocks.maxBy(_.elementCount)
  math.floor(math.log10(maxBlock.elementCount)).toInt
}
```
In this way, we place files in the following elementCount ranges:

| Level | ElementCount Range |
|-------|--------------------|
| 0 | [1, 9] |
| 1 | [10, 99] |
| 2 | [100, 999] |
| 3 | [1,000, 9,999] |
| 4 | [10,000, 99,999] |
Create Block Groups for Each Level
For all files within each level, group their blocks by CubeId and compute each group's total elementCount. Block groups whose total elementCount is below the next level's threshold are discarded. For example, a block group in level 1 must have at least math.pow(10, 2) = 100 elements to qualify.
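The grouping and threshold filter described above can be sketched as follows; `Block`, its fields, and the example values are illustrative assumptions, not the engine's actual types:

```scala
// Simplified stand-in for a block of an index file.
case class Block(cubeId: String, elementCount: Long)

// Group the blocks of all files in a level by CubeId and keep only the
// groups whose total elementCount reaches the next level's threshold.
def qualifyingGroups(level: Int, blocks: Seq[Block]): Map[String, Long] = {
  val threshold = math.pow(10, level + 1).toLong // e.g. 100 for level 1
  blocks
    .groupBy(_.cubeId)
    .map { case (cube, bs) => cube -> bs.map(_.elementCount).sum }
    .filter { case (_, total) => total >= threshold }
}

val level1Blocks = Seq(
  Block("cubeA", 60), Block("cubeA", 50), // total 110 >= 100: qualifies
  Block("cubeB", 40), Block("cubeB", 30)  // total 70 < 100: discarded
)
// qualifyingGroups(1, level1Blocks) keeps only cubeA with total 110.
```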
Order and Select Block Groups
The remaining block groups are ordered by their level, the depth of their CubeId, and the number of blocks they contain, all in ascending order, and the first block group is selected for optimization.
Cubes from the lower levels are prioritized because their offsets propagate downward and can be optimized in future iterations. Minimizing the level serves the same purpose: at least one file is created that moves to a higher level and can be optimized later, which avoids level starvation.
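The three-key ascending ordering can be sketched with a simple sortBy; `BlockGroup` and its fields are illustrative assumptions, not the engine's actual model:

```scala
// Candidate group of blocks sharing a CubeId within one level.
case class BlockGroup(level: Int, cubeDepth: Int, numBlocks: Int)

// Order ascending by level, then cube depth, then number of blocks,
// and pick the first group (if any) for optimization.
def selectNext(groups: Seq[BlockGroup]): Option[BlockGroup] =
  groups.sortBy(g => (g.level, g.cubeDepth, g.numBlocks)).headOption

val candidates = Seq(
  BlockGroup(level = 2, cubeDepth = 1, numBlocks = 4),
  BlockGroup(level = 1, cubeDepth = 3, numBlocks = 9),
  BlockGroup(level = 1, cubeDepth = 2, numBlocks = 5)
)
// The level-1, depth-2 group wins: level first, then depth, then block count.
```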
This strategy is used by default for the latest indexed revision if revisionID is not specified.
Experiments show that this approach optimizes the index iteratively and can reach a quasi-optimal state.
Unindexed Files Selection
This file selection strategy is used only for the unindexed revision (RevisionID = 0). This revision, as the name suggests, is not indexed, and the optimization process adds its files to the latest revision. Files are ordered by ascending modification time, and the first batchBytes bytes of files are selected for optimization. Their data are then indexed into the latest revision; a new revision can be created if the optimized data fall outside the space of the latest revision.
This strategy is enabled and prioritized by default.
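The batch selection can be sketched as a cumulative-size cut over the time-ordered files; `UnindexedFile` and its fields are illustrative assumptions, and the inclusive fromTimestamp boundary is assumed:

```scala
// Simplified stand-in for unindexed file metadata.
case class UnindexedFile(path: String, sizeBytes: Long, modificationTime: Long)

// Order candidates by modification time and take them while the
// cumulative size stays within the batchBytes budget.
def selectBatch(files: Seq[UnindexedFile],
                fromTimestamp: Long,
                batchBytes: Long): Seq[UnindexedFile] = {
  val ordered = files
    .filter(_.modificationTime >= fromTimestamp) // skip files older than fromTimestamp
    .sortBy(_.modificationTime)
  // Running totals of sizes, aligned with `ordered`.
  val cumulative = ordered.scanLeft(0L)(_ + _.sizeBytes).tail
  ordered.zip(cumulative).takeWhile { case (_, total) => total <= batchBytes }.map(_._1)
}

val files = Seq(
  UnindexedFile("a", sizeBytes = 4L, modificationTime = 100L),
  UnindexedFile("b", sizeBytes = 3L, modificationTime = 200L),
  UnindexedFile("c", sizeBytes = 5L, modificationTime = 300L)
)
// selectBatch(files, 0L, 7L) keeps "a" and "b" (4 + 3 <= 7 bytes).
```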
Configuration Options
The following table describes all available configuration options for optimization:
| Configuration Key | Description | Default Value |
|---|---|---|
| use.optimization.autoOptimize.enabled | Enable or disable auto-optimization | Not set |
| use.optimization.autoOptimize.revisionId | Revision ID for leveled compaction | None (the latest revision is optimized by default) |
| use.optimization.autoOptimize.optimizeUnindexedFiles.enabled | Enable or disable optimization for the unindexed revision | true (applies when auto-optimization is enabled) |
| use.optimization.autoOptimize.optimizeUnindexedFiles.prioritize | Whether to prioritize the optimization of unindexed files | true |
| use.optimization.autoOptimize.optimizeUnindexedFiles.batchBytes | The batch size for optimizing the unindexed files | 10000000000 (10 GB) |
| use.optimization.autoOptimize.optimizeUnindexedFiles.fromTimestamp | The modification timestamp from which unindexed files can be optimized | 0 |
Example Configuration
Here's an example configuration that:
- Enables auto-optimization
- Enables optimization for the unindexed revision
- Prioritizes the optimization of unindexed files
- Sets the batch size to 5 GB
- Selects only the unindexed files modified after 2023-01-01 00:00:00
```sql
ALTER TABLE table_name SET TBLPROPERTIES (
  'use.optimization.autoOptimize.enabled'='true',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.enabled'='true',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.prioritize'='true',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.batchBytes'='5000000000',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.fromTimestamp'='2023-01-01 00:00:00'
);
```
With this configuration, the auto-optimization will always check for unindexed files to optimize before applying the LeveledCompactionSelection strategy to an indexed revision.
If there are unindexed files, those modified before 2023-01-01 00:00:00 are ignored; the rest are ordered by modification time, and the first 5 GB of files are selected for optimization.
If there are no unindexed files, the optimization falls back to the LeveledCompactionSelection strategy. The revision id is not set in this case, so the latest revision is optimized.
If you want to optimize the unindexed revision only, set revisionId to 0 and enable the optimizeUnindexedFiles property.
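Under these properties, the ordered strategy list can be sketched as follows; `OptimizationConf` is an illustrative assumption, not the engine's actual configuration class:

```scala
// Illustrative view of the two flags that shape the strategy order.
case class OptimizationConf(
    optimizeUnindexedFiles: Boolean, // ...optimizeUnindexedFiles.enabled
    prioritizeUnindexed: Boolean     // ...optimizeUnindexedFiles.prioritize
)

// Build the ordered list of strategy names the optimizer consults.
def orderedStrategies(conf: OptimizationConf): Seq[String] = {
  val leveled = Seq("LeveledCompactionSelection")
  val unindexed =
    if (conf.optimizeUnindexedFiles) Seq("UnindexedFilesSelection") else Seq.empty
  // When prioritized, unindexed files are checked before leveled compaction.
  if (conf.prioritizeUnindexed) unindexed ++ leveled else leveled ++ unindexed
}
```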
Configuring Optimization
Automatic Optimization
Configure auto-optimization for your tables by setting table properties:
```sql
ALTER TABLE table_name SET TBLPROPERTIES (
  'use.optimization.autoOptimize.enabled'='true',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.enabled'='true',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.prioritize'='true',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.batchBytes'='5000000000',
  'use.optimization.autoOptimize.optimizeUnindexedFiles.fromTimestamp'='2023-01-01 00:00:00'
)
```
Check table properties:
```sql
SHOW TBLPROPERTIES test
```
Unset the auto-optimization:
```sql
ALTER TABLE test UNSET TBLPROPERTIES('use.optimization.autoOptimize.enabled')
```
Manual Trigger
You can trigger optimization manually by running:
```sql
ALTER TABLE use_catalog.spark_catalog.default.test SET TBLPROPERTIES(
  'use.optimization.type'='LeveledCompaction',
  'use.optimization.bytes-per-iter'='5000000'
)
```
This command triggers a manual execution for spark_catalog.default.test. The execution request is ignored when an optimization is already running or the table already has auto-optimization configured.
Delta Vacuum
The Delta Vacuum feature removes files that are no longer referenced by the _delta_log. You can specify the retention period in hours and, if needed, force the vacuum by disabling the retention period check.
Optimization Insights
Monitor and track your optimization processes using the following queries.
Check Optimization History
To check which optimizations have been executed on a table:
```sql
SELECT * FROM use_catalog.spark_catalog.default.test.optimizations
```
This outputs one row for each optimization operation on the table.
Check Scheduled Optimizations
To check which tables are being optimized (or are scheduled for optimization):
```sql
SELECT * FROM use_catalog.system_lake_tables.metastore_summary
WHERE table_properties['use.optimization.type'] IS NOT NULL
```