Data partitioning in Kusto (Azure Data Explorer)
Last modified: 12/31/2020
By default, tables in Kusto are partitioned according to the time at which data is ingested. In the majority of use cases there is no need to change that, unlike in other technologies, where data partitioning is often necessary to reach good performance.
In specific cases, it is possible and recommended to define a data partitioning policy, to reach improved performance at query time.
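If you're not sure whether a table already has such a policy defined, you can check with the following management command (shown here for a hypothetical table named T; a table with no policy returns a null policy):

    .show table T policy partitioning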
How data partitioning works
- When data is ingested into a table in a Kusto database, data shards (extents) are created.
- To maintain query efficiency, smaller extents are merged into larger ones.
- That is done automatically, as a background process.
- It reduces the management overhead of having a large number of extents to track.
- It also allows Kusto to optimize its indexes and improve compression.
- When a partitioning policy is defined on a table, extents participate in another background process after they’re created (ingested), and before they are merged.
- This process re-ingests the data from the source extents and creates homogeneous extents.
- In these, all values in the column that is the table’s partition key belong to the same partition.
- Extents that belong to different partitions do not get merged together.
- Because extents are not merged before they are homogeneous, it is important to make sure the cluster's maximum capacities for partitioning and for merges are balanced (see the sketch after this list), so that:
  - The rate of creating new extents isn't significantly higher than the rate of merging them.
  - The overall number of extents in the database/cluster doesn't keep growing significantly over time.
- Because the partitioning process requires reading and re-ingesting the data, it is expected to add continuous overhead to the cluster's resource utilization.
  - In certain cases, this means that scaling the cluster up or out may be required.
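One rough way to check that these capacities are balanced is to compare the cluster's merge and partitioning capacities via the .show capacity command. This is a minimal sketch; the resource names ExtentsMerge and ExtentsPartition are assumptions about the values returned in the Resource column:

    // Compare remaining vs. total capacity for merge and partitioning operations
    .show capacity
    | where Resource in ('ExtentsMerge', 'ExtentsPartition')
    | project Resource, Total, Remaining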
When to use data partitioning
There are very specific scenarios in which the benefits of partitioning a table outweigh the overhead of the resources continuously invested in the process. In such cases, the upside is expected to be significant, improving query performance several-fold. In other cases, however, partitioning can do more harm than good.
Filtering
Homogeneous extents include metadata on their partition values, which allows Kusto's query planner to filter out data shards without having to perform an index lookup or scan the data. As a result, when a query includes an equality filter on the partition key, a significant reduction in the resources required to serve it is expected.
Example
- Table T has a string-typed column named TenantId, which represents a unique identifier for a tenant that the data in the table belongs to.
- T includes data for multiple tenants, say 10,000 or more.
- The majority of queries running against T use equality filters (== or in()) on the TenantId column.
  - For example:
    T
    | where Timestamp > ago(30d)
    | where TenantId == 'f83a65e0-da2b-42be-b59b-a8e25ea3954c' // <--
    | where Severity == "Error"
    | summarize count() by bin(Timestamp, 1d)
    | render timechart
- The value f83a65e0-da2b-42be-b59b-a8e25ea3954c belongs to a single partition, out of the maximum number of partitions defined in the policy (for example: partition number 10 out of a total of 256).
- The filter on TenantId is highly efficient, as it allows Kusto's query planner to filter out any extents that belong to partitions other than partition number 10.
  - Assuming an equal distribution of data across tenants in T, that means we're left with 1/256 (~0.39%) of the extents, even before evaluating the datetime pre-filter and leveraging the default index on TenantId.
- When the number of concurrent queries is higher (dozens or more), the benefit increases significantly, as each such query consumes fewer resources.
- In this sample use case, it's appropriate to set the partition assignment mode for the hash partition key to Uniform (see the sketch after this list).
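For illustration, a policy matching this use case could look like the following; the table name T, the MaxPartitionCount of 256, and the hash function are example values rather than recommendations:

    // Hash-partition T by TenantId, assigning extents to nodes uniformly
    .alter table T policy partitioning ```
    {
      "PartitionKeys": [
        {
          "ColumnName": "TenantId",
          "Kind": "Hash",
          "Properties": {
            "Function": "XxHash64",
            "MaxPartitionCount": 256,
            "PartitionAssignmentMode": "Uniform"
          }
        }
      ]
    }
    ```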
Joins / aggregations
When a table has a hash partition key defined in its partitioning policy, there are two different assignment modes:
- Default: all homogeneous extents that belong to the same partition are assigned to the same node in the cluster.
- Uniform: extents' partition values are not taken into account; extents are assigned uniformly to the cluster's nodes.
Choosing the Default mode makes sense when the cardinality of the hash partition key is very high, partitions are expected to be roughly equally sized, and the query pattern often uses the shuffle strategy.
- Queries that use the shuffle strategy, and in which the shuffle key used in join, summarize or make-series is the table's hash partition key, are expected to perform better, because the amount of data required to move across the cluster's nodes is significantly reduced.
Example
- Table T has a string-typed column named DeviceId, which represents a unique identifier for the device that the data in the table was sent from.
- Tens of millions of such devices are spread across the world, and they all emit a similar amount of metrics per unit of time.
- Queries running against T aggregate over DeviceId using the shuffle strategy, with DeviceId as the shuffle key.
  - For example (see also the policy sketch after this list):
    T
    | where Timestamp > ago(1d)
    | summarize hint.shufflekey = DeviceId arg_max(Temperature, RelativeHumidity) by DeviceId // <--
    | where RelativeHumidity > 75
    | count
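For contrast with the previous sketch, a policy for this use case could keep the Default assignment mode, so that all extents of a given partition are assigned to the same node; the property values are, again, illustrative assumptions only:

    // Hash-partition T by DeviceId, co-locating each partition's extents on one node
    .alter table T policy partitioning ```
    {
      "PartitionKeys": [
        {
          "ColumnName": "DeviceId",
          "Kind": "Hash",
          "Properties": {
            "Function": "XxHash64",
            "MaxPartitionCount": 256,
            "PartitionAssignmentMode": "Default"
          }
        }
      ]
    }
    ```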
Backfill or unordered ingestion
In many systems, data is ingested close to the time it was generated at its source. Moreover, data is ingested in order, e.g. data from 10 minutes ago is ingested after data from 15 minutes ago. There are some cases, however, in which these patterns don't hold; those are the cases this section refers to.
Example
- Data can be spread across source files according to a different partition key, and not according to time.
  - For example: a single file may include all records for a single division, while those records span a period of 3 years.
- This data includes a datetime column named record_timestamp and is ingested into table T.
- When this data is ingested into T, the creation time of the resulting extents is set to the time of ingestion, and has no relationship with the datetime values in the dataset.
  - As the source files are mixed, the ingestor can't solve this by specifying the CreationTime property at ingestion time.
- Data retention and caching policies act according to the extents' creation times, so a different way of overriding them, based on the values of record_timestamp, is required.
- In this case, setting record_timestamp as the uniform range datetime partition key of T, while specifying the OverrideCreationTime property as true, will have the cluster re-partition the data and align the extents' creation times with the values in record_timestamp (see the sketch after this list).
  - Then, if the table has an effective caching policy of 30 days, any records older than that period will not be cached.
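A sketch of a policy for this scenario follows; the reference date and the 1-day range size are assumptions chosen for illustration:

    // Partition T into uniform 1-day ranges of record_timestamp,
    // overriding the extents' creation times with the partition values
    .alter table T policy partitioning ```
    {
      "PartitionKeys": [
        {
          "ColumnName": "record_timestamp",
          "Kind": "UniformRange",
          "Properties": {
            "Reference": "1970-01-01T00:00:00",
            "RangeSize": "1.00:00:00",
            "OverrideCreationTime": true
          }
        }
      ]
    }
    ```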
Data compression
Data partitioning results in similar values of the partition key ending up in the same data shards. In some cases, data in other columns behaves similarly, as values sent from the same device or tenant have similar characteristics, compared with those sent from others.
As a result, in many cases the compression ratio of such columns, and not only of the partition key, improves. Meaning: less storage is required to store the data, and a potential cost reduction is achievable.
As different data sets have different characteristics, your mileage may vary.
When not to use data partitioning
Any other case that doesn't meet the criteria mentioned above is unlikely to benefit from data partitioning, and may even be harmed by it.
Specifically, don’t use data partitioning when:
- You don’t frequently use equality filters or join/aggregate on the chosen hash partition key.
- Your data is unevenly distributed across the partitions defined by your policy, and you want each partition to be assigned to a single node.
  - If data is already ingested into a non-partitioned table in Kusto, an easy way to check this is by running a query such as the following, and making sure the values under the count_ column are ~even:

        T datascope=hotcache
        | summarize count() by p = hash(my_column, 256)
        | order by count_ desc
- The cardinality of the chosen hash partition key is low.
  - For example: table T has a column Severity with only 5 unique values ("Verbose", "Info", "Warning", "Error", "Critical"), even if you frequently filter on this column.
- You have many outliers in your data that will result in a large number of small extents.
  - For example: you defined a uniform range datetime partition key with a range of 1 day, but there are many records with 'bad' datetime values in the column, from the distant past or future (e.g. 0003-12-31 and 3020-07-11).
More importantly - make sure you:
- Follow the guidelines in the documentation
- Monitor & measure the impact of the policies you set on the cluster, for example using the sketch below.
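As a starting point for such monitoring, the following sketch queries the cluster's diagnostics for how partitioned the least-partitioned table is; the two column names are assumptions about the output of the .show diagnostics command:

    // Check partitioning progress for the least-partitioned table in the cluster
    .show diagnostics
    | project MinPartitioningPercentageInSingleTable, TableWithMinPartitioningPercentage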