Query Details
// This query assumes a feed of threat indicators is ingested/synchronized periodically, and each synchronization ingests new indicators and only old indicators that have been modified.
// Active threat indicators in Sentinel are renovated as ThreatIntelligenceIndicator events every ~12 days.
let query_frequency = 1h;
let query_period = 14d;
let query_wait = 0h;
let table_query_lookback = 14d;
let _TIBenignProperty =
_GetWatchlist('ID-TIBenignProperty')
| where Notes has_any ("[SourceEmailAddress]", "[DestinationEmailAddress]")
| project IndicatorId, BenignProperty
;
let _TIExcludedSources = toscalar(
_GetWatchlist('Activity-ExpectedSignificantActivity')
| where Activity == "ThreatIndicatorSource"
| summarize make_list(Auxiliar)
);
let _ExcludedAlerts =
_GetWatchlist('AlertName-MonitoredDetections')
| where Notes has "[ExcludedTIAlerts]"
| project ProductName, AlertName, AnalyticsId = tostring(AnalyticsId)
;
let _EmailAddressRegex = toscalar(
_GetWatchlist('RegEx-SingleRegularExpressions')
| where UseCase == "EmailAddress"
| project RegEx
);
let _ExternalEmailAddressRegex = toscalar(
_GetWatchlist('RegEx-SingleRegularExpressions')
| where UseCase == "ExternalEmailAddress"
| project RegEx
);
let _TITableMatch = (table_start: datetime, table_end: datetime, only_new_ti: boolean, ti_start: datetime = datetime(null)) {
// Scheduled Analytics rules have a query period limit of 14d
let _Indicators =// materialize(
ThreatIntelligenceIndicator
| where TimeGenerated > ago(query_period)
// Take the earliest TimeGenerated and the latest column info
| summarize hint.strategy=shuffle
minTimeGenerated = min(TimeGenerated),
arg_max(TimeGenerated, Active, Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, ExternalIndicatorId, EmailSenderAddress)
by IndicatorId
// Remove inactive or expired indicators
| where not(not(Active) or ExpirationDateTime < now())
// Pick indicators that contain the desired entity type
| where isnotempty(EmailSenderAddress)
| extend EmailAddress = tolower(EmailSenderAddress)
// Remove indicators from specific sources
| where not(AdditionalInformation has_any (_TIExcludedSources))
// Remove excluded indicators with benign properties
| join kind=leftanti _TIBenignProperty on IndicatorId, $left.EmailAddress == $right.BenignProperty
// Deduplicate indicators by EmailAddress column, equivalent to using join kind=innerunique afterwards
| summarize hint.strategy=shuffle
minTimeGenerated = min(minTimeGenerated),
take_any(*)
by EmailAddress
// If we want only new indicators, remove indicators received previously
| where not(only_new_ti and minTimeGenerated < ti_start)
//)
;
//let _IndicatorsLength = toscalar(_Indicators | summarize count());
//let _IndicatorsPrefilter = toscalar(
// _Indicators
// | extend AuxiliarField = tostring(split(EmailAddress, ".")[-1])
// | summarize make_set_if(AuxiliarField, isnotempty(AuxiliarField))
//);
//let _IndicatorsPrefilterLength = array_length(_IndicatorsPrefilter);
let _TableEvents =
SecurityAlert
| where TimeGenerated between (table_start .. table_end)
// Remove TI map alerts
| extend MSTI = (VendorName == "Microsoft" and ProductName == 'Azure Sentinel' and (ProviderName == "Threat Intelligence Alerts" or AlertName has "TI map"))
| where MSTI == false //or MSTI == true
// Remove excluded alerts
| extend AnalyticsId = iff(ProductName == "Azure Sentinel", tostring(split(AlertType, "_")[-1]), AlertType)
| join kind=leftanti _ExcludedAlerts on ProductName, AlertName, AnalyticsId
// Filter events that may contain indicators
| where isnotempty(Entities)
//| where not(_IndicatorsPrefilterLength < 10000 and not(Entities has_any (_IndicatorsPrefilter))) // valid TLD ~1500 , "has_any" limit 10000
// Extract already recognized addresses
| mv-apply EntitiesDynamic = todynamic(Entities) on (
extend EntityUPNSuffix = tostring(EntitiesDynamic.UPNSuffix)
| extend EntityUPN = iff(isnotempty(EntityUPNSuffix), strcat(tostring(EntitiesDynamic.Name), "@", EntityUPNSuffix), "")
| summarize EntityEmails = make_set_if(EntityUPN, isnotempty(EntityUPN))
)
// Extract recognized and unrecognized addresses
| extend EntityEmails = todynamic(dynamic_to_json(extract_all(_EmailAddressRegex, dynamic([1]), strcat(tostring(EntityEmails), Entities))))
| mv-expand EmailAddress = EntityEmails
| extend EmailAddress = tolower(tostring(EmailAddress[0]))
| where isnotempty(EmailAddress)
| summarize take_any(*) by EmailAddress
// Parse original address
| extend EmailAddress = case(
EmailAddress has "#EXT#", replace_regex(EmailAddress, _ExternalEmailAddressRegex, @"\2@\3"),
EmailAddress startswith "live.com#" or EmailAddress startswith "guest#", replace_regex(EmailAddress, strcat(@"(?:live\.com#|guest#)", _EmailAddressRegex), @"\2@\3"),
EmailAddress
)
//| where not(_IndicatorsLength < 1000000 and not(EmailAddress in (toscalar(_Indicators | summarize make_list(EmailAddress))))) // "in" limit 1.000.000
// Extract one entity of each type of multiple possible
| mv-apply EntitiesDynamic = todynamic(Entities) on (
summarize
Alert_Account = take_anyif(strcat(tostring(EntitiesDynamic.Name), iff(isnotempty(tostring(EntitiesDynamic.UPNSuffix)), "@", ""), tostring(EntitiesDynamic.UPNSuffix)), EntitiesDynamic.Type == "account" and not(EntitiesDynamic.IsValid == "false")),
Alert_HostName = take_anyif(tostring(EntitiesDynamic.HostName), EntitiesDynamic.Type == "host"),
Alert_IPAddress = take_anyif(tostring(EntitiesDynamic.Address), EntitiesDynamic.Type == "ip")
)
| project-rename Alert_Description = Description
| project-rename SecurityAlert_TimeGenerated = TimeGenerated
;
_Indicators
| join kind=inner hint.strategy=shuffle _TableEvents on EmailAddress
// Take only a single event by key columns
//| summarize hint.strategy=shuffle take_any(*) by EmailAddress, AlertName
| project
SecurityAlert_TimeGenerated,
Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, EmailSenderAddress,
AlertName, AlertSeverity, Entities, ProviderName, ProductName, VendorName, AnalyticsId, Alert_Description, Alert_Account, Alert_IPAddress, Alert_HostName, CompromisedEntity
};
union// isfuzzy=true
// Match current table events all indicators available
_TITableMatch(ago(query_frequency + query_wait), ago(query_wait), false),
// Match past table events new indicators since last query execution
_TITableMatch(ago(table_query_lookback + query_wait), ago(query_frequency + query_wait), true, ago(query_frequency))
| summarize arg_max(SecurityAlert_TimeGenerated, *) by IndicatorId, AlertName
| extend
timestamp = SecurityAlert_TimeGenerated,
IPCustomEntity = Alert_IPAddress,
HostCustomEntity = Alert_HostName,
AccountCustomEntity = EmailSenderAddress
This query is used to match threat indicators with security alert events in Azure Sentinel. It starts by retrieving threat indicators and filtering out inactive or expired ones. It also removes indicators from specific sources and excludes indicators with benign properties.
Next, it retrieves security alert events and removes TI map alerts and excluded alerts. It filters events that may contain indicators and extracts recognized and unrecognized email addresses. It then parses the original address and extracts one entity of each type.
The query then joins the threat indicators with the security alert events based on the email address. It takes only a single event for each unique email address and alert name.
Finally, it summarizes the results and creates custom entities for IP addresses, host names, and email addresses.

Jose Sebastián Canós
Released: February 6, 2023
Tables
Keywords
Operators