Query Details
// This query assumes a feed of threat indicators is ingested/synchronized periodically, and each synchronization ingests new indicators and only old indicators that have been modified.
// Active threat indicators in Sentinel are renovated as ThreatIntelligenceIndicator events every ~12 days.
let query_frequency = 1h;
let query_period = 14d;
let query_wait = 0h;
let table_query_lookback = 14d;
let _TIBenignProperty =
_GetWatchlist('ID-TIBenignProperty')
| where Notes has_any ("[SourceIPAddress]")
| project IndicatorId, BenignProperty
;
let _TIExcludedSources = toscalar(
_GetWatchlist('Activity-ExpectedSignificantActivity')
| where Activity == "ThreatIndicatorSource"
| summarize make_list(Auxiliar)
);
let _TITableMatch = (table_start: datetime, table_end: datetime, only_new_ti: boolean, ti_start: datetime = datetime(null)) {
// Scheduled Analytics rules have a query period limit of 14d
let _Indicators =// materialize(
ThreatIntelligenceIndicator
| where TimeGenerated > ago(query_period)
// Take the earliest TimeGenerated and the latest column info
| summarize hint.strategy=shuffle
minTimeGenerated = min(TimeGenerated),
arg_max(TimeGenerated, Active, Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, ExternalIndicatorId, NetworkIP, NetworkSourceIP, NetworkDestinationIP, EmailSourceIpAddress)
by IndicatorId
// Remove inactive or expired indicators
| where not(not(Active) or ExpirationDateTime < now())
// Pick indicators that contain the desired entity type
| mv-expand IPAddress = pack_array(NetworkIP, NetworkSourceIP, NetworkDestinationIP, EmailSourceIpAddress) to typeof(string)
| where isnotempty(IPAddress)
| extend TI_IPAddress = IPAddress
// Remove indicators from specific sources
| where not(AdditionalInformation has_any (_TIExcludedSources))
// Remove excluded indicators with benign properties
| join kind=leftanti _TIBenignProperty on IndicatorId, $left.IPAddress == $right.BenignProperty
// Deduplicate indicators by IPAddress column, equivalent to using join kind=innerunique afterwards
| summarize hint.strategy=shuffle
minTimeGenerated = min(minTimeGenerated),
take_any(*)
by IPAddress
// If we want only new indicators, remove indicators received previously
| where not(only_new_ti and minTimeGenerated < ti_start)
//)
;
//let _IndicatorsLength = toscalar(_Indicators | summarize count());
//let _IndicatorsPrefilter = toscalar(
// _Indicators
// | extend AuxiliarField = tostring(extract(@"([0-9A-Za-f]+)[\.\:]", 1, IPAddress))
// | summarize make_set_if(AuxiliarField, isnotempty(AuxiliarField), 10000)
//);
//let _IndicatorsPrefilterLength = array_length(_IndicatorsPrefilter);
let _TableEvents =
AADServicePrincipalSignInLogs
| where TimeGenerated between (table_start .. table_end)
// Filter events that may contain indicators
//| where not(_IndicatorsPrefilterLength < 10000 and not(IPAddress has_any (_IndicatorsPrefilter))) // "has_any" limit 10000
//| where not(_IndicatorsLength < 1000000 and not(IPAddress in (toscalar(_Indicators | summarize make_list(TI_IPAddress))))) // "in" limit 1.000.000
| project-rename AADServicePrincipalSignInLogs_TimeGenerated = TimeGenerated
;
_Indicators
| join kind=inner hint.strategy=shuffle _TableEvents on IPAddress
// Take only a single event by key columns
//| summarize hint.strategy=shuffle take_any(*) by IPAddress, ServicePrincipalName
| project
AADServicePrincipalSignInLogs_TimeGenerated,
Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, TI_IPAddress, NetworkIP, NetworkSourceIP, NetworkDestinationIP, EmailSourceIpAddress,
Category, ServicePrincipalName, IPAddress, Location, ResultType, ResultDescription, AuthenticationProcessingDetails, AppId, ResourceDisplayName, ServicePrincipalId, ServicePrincipalCredentialKeyId, ServicePrincipalCredentialThumbprint, CorrelationId
};
union// isfuzzy=true
// Match current table events all indicators available
_TITableMatch(ago(query_frequency + query_wait), ago(query_wait), false),
// Match past table events new indicators since last query execution
_TITableMatch(ago(table_query_lookback + query_wait), ago(query_frequency + query_wait), true, ago(query_frequency))
| summarize arg_max(AADServicePrincipalSignInLogs_TimeGenerated, *) by IndicatorId, ServicePrincipalName
| extend
timestamp = AADServicePrincipalSignInLogs_TimeGenerated,
AccountCustomEntity = ServicePrincipalName,
IPCustomEntity = TI_IPAddress
This query is used to match threat indicators with table events in Azure Sentinel. The query assumes that a feed of threat indicators is ingested periodically, and each synchronization only includes new indicators and modified old indicators. The query frequency, period, and wait time are defined at the beginning.
The query first retrieves a watchlist of benign properties and excluded sources. It then defines a function called "_TITableMatch" that takes the start and end times of the table events, a flag indicating whether to include only new indicators, and the start time of the previous query execution. Inside this function, it retrieves the threat intelligence indicators, filters out inactive or expired indicators, picks indicators with the desired entity type, removes indicators from specific sources and with benign properties, deduplicates indicators by IP address, and optionally removes indicators received previously.
The function then joins the threat indicators with the table events based on the IP address. It projects the relevant columns from both tables and takes only a single event for each key combination. The function returns the joined table events.
Finally, the query uses the union operator to combine the results of two "_TITableMatch" function calls. The first call matches the current table events with all available indicators, while the second call matches the past table events with new indicators since the last query execution. The query then summarizes the results by the IndicatorId and ServicePrincipalName columns and extends the result with additional columns for timestamp, account custom entity, and IP custom entity.

Jose Sebastián Canós
Released: December 5, 2022
Tables
Keywords
Operators