1 - Kusto partition & compose intermediate aggregation results

Learn how to use the hll() and tdigest() functions to partition and compose intermediate results of aggregations.

Suppose you want to calculate the count of distinct users every day over the last seven days. You can run summarize dcount(user) once a day with a span filtered to the last seven days. This method is inefficient, because each time the calculation is run, there’s a six-day overlap with the previous calculation. You can also calculate an aggregate for each day, and then combine these aggregates. This method requires you to “remember” the last six results, but it’s much more efficient.
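
As a rough sketch of the second approach, you could persist one hll() state per day and then merge the stored states when you need the seven-day number. The table and column names below (Users, User, Timestamp, DailyUserHll) are hypothetical and used only for illustration:

.set-or-append DailyUserHll <| Users
| where Timestamp >= startofday(ago(1d)) and Timestamp < startofday(now()) // previous full day only
| summarize UserHll = hll(User) by Day = startofday(Timestamp)

Then the seven-day distinct count is answered from the small intermediate table instead of rescanning the raw data:

DailyUserHll
| where Day >= startofday(ago(7d))
// hll_merge() combines the stored per-day states; dcount_hll() extracts the final distinct count.
| summarize WeeklyUsers = dcount_hll(hll_merge(UserHll))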

Partitioning queries as described is easy for simple aggregates, such as count() and sum(). It can also be useful for complex aggregates, such as dcount() and percentiles(). This article explains how Kusto supports such calculations.

The following examples show how to use hll/tdigest and demonstrate that using these functions is highly performant in some scenarios. Note, however, that the objects they produce can be large: when persisted, an hll or tdigest value may exceed the default MaxValueSize of the column's encoding policy, in which case it's ingested as null:

range x from 1 to 1000000 step 1
| summarize hll(x,4)
| project sizeInMb = estimate_data_size(hll_x) / pow(1024,2)

Output

sizeInMb
1.0000524520874

Ingesting this object into a table before applying the bigobject encoding policy (described below) will ingest a null value, because the object exceeds the default MaxValueSize:

.set-or-append MyTable <| range x from 1 to 1000000 step 1
| summarize hll(x,4)

Querying the table shows that the ingested value is empty:

MyTable
| project isempty(hll_x)

Output

Column1
1

To avoid ingesting null, use the special encoding policy type bigobject, which overrides the MaxValueSize to 2 MB like this:

.alter column MyTable.hll_x policy encoding type='bigobject'

Ingesting a value into the same table now:

.set-or-append MyTable <| range x from 1 to 1000000 step 1
| summarize hll(x,4)

ingests the second value successfully:

MyTable
| project isempty(hll_x)

Output

Column1
1
0

Example: Count with binned timestamp

There’s a table, PageViewsHllTDigest, containing hll values of the pages viewed in each hour. You want these values binned to 12h: merge the hll values using the hll_merge() aggregate function, with the timestamp binned to 12h, and then use the dcount_hll() function to return the final dcount value:

PageViewsHllTDigest
| summarize merged_hll = hll_merge(hllPage) by bin(Timestamp, 12h)
| project Timestamp, dcount_hll(merged_hll)

Output

Timestamp | dcount_hll_merged_hll
2016-05-01 12:00:00.0000000 | 20056275
2016-05-02 00:00:00.0000000 | 38797623
2016-05-02 12:00:00.0000000 | 39316056
2016-05-03 00:00:00.0000000 | 13685621

To bin the timestamp to 1d:

PageViewsHllTDigest
| summarize merged_hll = hll_merge(hllPage) by bin(Timestamp, 1d)
| project Timestamp, dcount_hll(merged_hll)

Output

Timestamp | dcount_hll_merged_hll
2016-05-01 00:00:00.0000000 | 20056275
2016-05-02 00:00:00.0000000 | 64135183
2016-05-03 00:00:00.0000000 | 13685621

The same query may be done over the values of tdigest, which represent the BytesDelivered in each hour:

PageViewsHllTDigest
| summarize merged_tdigests = merge_tdigest(tdigestBytesDel) by bin(Timestamp, 12h)
| project Timestamp, percentile_tdigest(merged_tdigests, 95, typeof(long))

Output

Timestamp | percentile_tdigest_merged_tdigests
2016-05-01 12:00:00.0000000 | 170200
2016-05-02 00:00:00.0000000 | 152975
2016-05-02 12:00:00.0000000 | 181315
2016-05-03 00:00:00.0000000 | 146817

Example: Temporary table

Kusto limits are reached when datasets are too large to run regular queries that calculate percentile() or dcount() over them, yet you still need to run periodic queries over the data.

To solve this problem, newly added data can be added to a temp table as hll or tdigest values, using hll() when the required operation is dcount, or tdigest() when the required operation is percentile, via set/append commands or an update policy. In this case, the intermediate results of dcount or tdigest are saved into another dataset, which should be smaller than the large target one.

When you need to get the final results of these values, the queries may use the hll/tdigest mergers hll_merge()/merge_tdigest(). Then, after getting the merged values, percentile_tdigest()/dcount_hll() may be invoked on them to get the final result of dcount or percentiles.
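
For example, the saving side of this pattern could be wired up with an update policy along these lines. This is only a sketch: the function name SummarizePageViews is made up for illustration, and it assumes the target table PageViewsHllTDigest already exists with matching Timestamp, hllPage, and tdigestBytesDel columns:

.create-or-alter function SummarizePageViews() {
    // Reduce newly ingested PageViews rows to hourly hll/tdigest intermediate states.
    PageViews
    | summarize hllPage = hll(Page), tdigestBytesDel = tdigest(BytesDelivered) by Timestamp = bin(Timestamp, 1h)
}

.alter table PageViewsHllTDigest policy update
@'[{"IsEnabled": true, "Source": "PageViews", "Query": "SummarizePageViews()", "IsTransactional": false}]'

With such a policy in place, every ingestion into PageViews appends the corresponding intermediate rows to PageViewsHllTDigest, and query-time work is limited to merging them, as the next example shows.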

Assume there’s a table, PageViews, into which data is ingested daily. Every day, you want to calculate the 90th percentile of bytes delivered and the distinct count of pages viewed, binned to 1d, for timestamps later than date = datetime(2016-05-01 18:00:00.0000000).

Run the following query:

PageViews
| where Timestamp > datetime(2016-05-01 18:00:00.0000000)
| summarize percentile(BytesDelivered, 90), dcount(Page,2) by bin(Timestamp, 1d)

Output

Timestamp | percentile_BytesDelivered_90 | dcount_Page
2016-05-01 00:00:00.0000000 | 83634 | 20056275
2016-05-02 00:00:00.0000000 | 82770 | 64135183
2016-05-03 00:00:00.0000000 | 72920 | 13685621

This query aggregates all the values every time you run this query (for example, if you want to run it many times a day).

If you save the hll and tdigest values (which are the intermediate results of dcount and percentile) into a temp table, PageViewsHllTDigest, using an update policy or set/append commands, you may only merge the values and then use dcount_hll/percentile_tdigest using the following query:

PageViewsHllTDigest
| summarize  percentile_tdigest(merge_tdigest(tdigestBytesDel), 90), dcount_hll(hll_merge(hllPage)) by bin(Timestamp, 1d)

Output

Timestamp | percentile_tdigest_merge_tdigests_tdigestBytesDel | dcount_hll_hll_merge_hllPage
2016-05-01 00:00:00.0000000 | 84224 | 20056275
2016-05-02 00:00:00.0000000 | 83486 | 64135183
2016-05-03 00:00:00.0000000 | 72247 | 13685621

This query should be more performant, as it runs over a smaller table. In this example, the first query runs over ~215M records, while the second one runs over just 32 records.

Example: Intermediate results

The Retention Query. Assume you have a table that summarizes when each Wikipedia page was viewed (sample size is 10M), and you want to find, for each pair of dates date1 and date2 (date1 < date2), the percentage of pages viewed on both date1 and date2 relative to the pages viewed on date1.

The trivial way uses join and summarize operators:

// Get the total pages viewed each day
let totalPagesPerDay = PageViewsSample
| summarize by Page, Day = startofday(Timestamp)
| summarize count() by Day;
// Join the table to itself to get a grid where
// each row shows, for each page, a pair of dates
// on which it was viewed.
// Then count the pages for each pair of dates to
// get how many pages were viewed on both date1 and date2.
PageViewsSample
| summarize by Page, Day1 = startofday(Timestamp)
| join kind = inner
(
    PageViewsSample
    | summarize by Page, Day2 = startofday(Timestamp)
)
on Page
| where Day2 > Day1
| summarize count() by Day1, Day2
| join kind = inner
    totalPagesPerDay
on $left.Day1 == $right.Day
| project Day1, Day2, Percentage = count_*100.0/count_1

Output

Day1 | Day2 | Percentage
2016-05-01 00:00:00.0000000 | 2016-05-02 00:00:00.0000000 | 34.0645725975255
2016-05-01 00:00:00.0000000 | 2016-05-03 00:00:00.0000000 | 16.618368960101
2016-05-02 00:00:00.0000000 | 2016-05-03 00:00:00.0000000 | 14.6291376489636

The above query took ~18 seconds.

When you use the hll(), hll_merge(), and dcount_hll() functions, the equivalent query ends after ~1.3 seconds and shows that the hll functions speed up the query above by ~14 times:

let Stats=PageViewsSample | summarize pagehll=hll(Page, 2) by day=startofday(Timestamp); // saving the hll values (intermediate results of the dcount values)
let day0=toscalar(Stats | summarize min(day)); // finding the min date over all dates.
let dayn=toscalar(Stats | summarize max(day)); // finding the max date over all dates.
let daycount=tolong((dayn-day0)/1d); // finding the range between max and min
Stats
| project idx=tolong((day-day0)/1d), day, pagehll
| mv-expand pidx=range(0, daycount) to typeof(long)
// Extend the column to get the dcount value from hll'ed values for each date (same as totalPagesPerDay from the above query)
| extend key1=iff(idx < pidx, idx, pidx), key2=iff(idx < pidx, pidx, idx), pages=dcount_hll(pagehll)
// For each two dates, merge the hll'ed values to get the total dcount over each two dates, 
// This helps to get the pages viewed in both date1 and date2 (see the description below about the intersection_size)
| summarize (day1, pages1)=arg_min(day, pages), (day2, pages2)=arg_max(day, pages), union_size=dcount_hll(hll_merge(pagehll)) by key1, key2
| where day2 > day1
// To get pages viewed in date1 and also date2, look at the merged dcount of date1 and date2, subtract it from pages of date1 + pages on date2.
| project pages1, day1, day2, intersection_size=(pages1 + pages2 - union_size)
| project day1, day2, Percentage = intersection_size*100.0 / pages1

Output

day1 | day2 | Percentage
2016-05-01 00:00:00.0000000 | 2016-05-02 00:00:00.0000000 | 33.2298494510578
2016-05-01 00:00:00.0000000 | 2016-05-03 00:00:00.0000000 | 16.9773830213667
2016-05-02 00:00:00.0000000 | 2016-05-03 00:00:00.0000000 | 14.5160020350006

2 - summarize operator

Learn how to use the summarize operator to produce a table that summarizes the content of the input table.

Produces a table that aggregates the content of the input table.

Syntax

T | summarize [ SummarizeParameters ] [[Column =] Aggregation [, …]] [by [Column =] GroupExpression [, …]]

Parameters

Name | Type | Required | Description
Column | string | | The name for the result column. Defaults to a name derived from the expression.
Aggregation | string | ✔️ | A call to an aggregation function such as count() or avg(), with column names as arguments.
GroupExpression | scalar | ✔️ | A scalar expression that can reference the input data. The output will have as many records as there are distinct values of all the group expressions.
SummarizeParameters | string | | Zero or more space-separated parameters in the form of Name = Value that control the behavior. See supported parameters.

Supported parameters

Name | Description
hint.num_partitions | Specifies the number of partitions used to share the query load on cluster nodes. See shuffle query
hint.shufflekey=<key> | The shufflekey query shares the query load on cluster nodes, using a key to partition data. See shuffle query
hint.strategy=shuffle | The shuffle strategy query shares the query load on cluster nodes, where each node will process one partition of the data. See shuffle query

Returns

The input rows are arranged into groups having the same values of the by expressions. Then the specified aggregation functions are computed over each group, producing a row for each group. The result contains the by columns and also at least one column for each computed aggregate. (Some aggregation functions return multiple columns.)

The result has as many rows as there are distinct combinations of by values (which may be zero). If there are no group keys provided, the result has a single record.
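
For example, with no group keys at all, the following returns exactly one record (StormEvents is the sample table used in the examples below):

StormEvents
| summarize TotalEvents = count() // single row containing the overall row count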

To summarize over ranges of numeric values, use bin() to reduce ranges to discrete values.
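
A minimal sketch of binning a numeric column, assuming StormEvents has a numeric DamageProperty column as in the samples database:

StormEvents
// Reduce the continuous DamageProperty values to discrete 10,000-dollar buckets before grouping.
| summarize EventCount = count() by DamageBucket = bin(DamageProperty, 10000)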

Default values of aggregations

The following table summarizes the default values of aggregations:

Operator | Default value
count(), countif(), dcount(), dcountif(), count_distinct(), sum(), sumif(), variance(), varianceif(), stdev(), stdevif() | 0
make_bag(), make_bag_if(), make_list(), make_list_if(), make_set(), make_set_if() | empty dynamic array ([])
All others | null

Examples

The examples in this section show how to use the syntax to help you get started.

Unique combination

The following query determines what unique combinations of State and EventType there are for storms that resulted in direct injury. There are no aggregation functions, just group-by keys. The output will just show the columns for those results.

StormEvents
| where InjuriesDirect > 0
| summarize by State, EventType

Output

The following table shows only the first 5 rows. To see the full output, run the query.

State | EventType
TEXAS | Thunderstorm Wind
TEXAS | Flash Flood
TEXAS | Winter Weather
TEXAS | High Wind
TEXAS | Flood

Minimum and maximum timestamp

Finds the minimum and maximum duration of heavy rain storms in Hawaii. There’s no group-by clause, so there’s just one row in the output.

StormEvents
| where State == "HAWAII" and EventType == "Heavy Rain"
| project Duration = EndTime - StartTime
| summarize Min = min(Duration), Max = max(Duration)

Output

Min | Max
01:08:00 | 11:55:00

Distinct count

The following query calculates the number of unique storm event types for each state and sorts the results by the number of unique storm types:

StormEvents
| summarize TypesOfStorms=dcount(EventType) by State
| sort by TypesOfStorms

Output

The following table shows only the first 5 rows. To see the full output, run the query.

State | TypesOfStorms
TEXAS | 27
CALIFORNIA | 26
PENNSYLVANIA | 25
GEORGIA | 24
ILLINOIS | 23

Histogram

The following example calculates a histogram of storm event types that had storms lasting longer than 1 day. Because Duration has many values, use bin() to group its values into 1-day intervals.

StormEvents
| project EventType, Duration = EndTime - StartTime
| where Duration > 1d
| summarize EventCount=count() by EventType, Length=bin(Duration, 1d)
| sort by Length

Output

EventType | Length | EventCount
Drought | 30.00:00:00 | 1646
Wildfire | 30.00:00:00 | 11
Heat | 30.00:00:00 | 14
Flood | 30.00:00:00 | 20
Heavy Rain | 29.00:00:00 | 42

Aggregates default values

When the input of the summarize operator is empty and there is at least one group-by key, the result is empty, too.
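
For example, an empty input grouped by a key yields no rows at all:

datatable(x:long)[]
| summarize count() by x // empty result: there are no groups to aggregate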

When the input of the summarize operator is empty and there are no group-by keys, the result is a single row with the default values of the aggregates used in the summarize. For more information, see Default values of aggregations.

datatable(x:long)[]
| summarize any_x=take_any(x), arg_max_x=arg_max(x, *), arg_min_x=arg_min(x, *), avg(x), buildschema(todynamic(tostring(x))), max(x), min(x), percentile(x, 55), hll(x), stdev(x), sum(x), sumif(x, x > 0), tdigest(x), variance(x)

Output

any_x | arg_max_x | arg_min_x | avg_x | schema_x | max_x | min_x | percentile_x_55 | hll_x | stdev_x | sum_x | sumif_x | tdigest_x | variance_x
      |           |           | NaN   |          |       |       |                 |       | 0       | 0     | 0       |           | 0

The result of avg_x(x) is NaN due to dividing by 0.

datatable(x:long)[]
| summarize count(x), countif(x > 0), dcount(x), dcountif(x, x > 0)

Output

count_x | countif_ | dcount_x | dcountif_x
0 | 0 | 0 | 0

datatable(x:long)[]
| summarize  make_set(x), make_list(x)

Output

set_x | list_x
[] | []

The aggregate avg sums all the non-null values and counts only those that participated in the calculation (it doesn’t take nulls into account).

range x from 1 to 4 step 1
| extend y = iff(x == 1, real(null), real(5))
| summarize sum(y), avg(y)

Output

sum_y | avg_y
15 | 5

The regular count will count nulls:

range x from 1 to 2 step 1
| extend y = iff(x == 1, real(null), real(5))
| summarize count(y)

Output

count_y
2

range x from 1 to 2 step 1
| extend y = iff(x == 1, real(null), real(5))
| summarize make_set(y), make_set(y)

Output

set_y | set_y1
[5.0] | [5.0]