Query Details
// This query assumes a feed of threat indicators is ingested/synchronized periodically, and that each synchronization ingests new indicators plus only those old indicators that have been modified.
// Active threat indicators in Sentinel are re-emitted (refreshed) as ThreatIntelligenceIndicator events every ~12 days.
// Length of the "current events" window, and the cut-off used to decide which indicators count as new.
let query_frequency = 1h;
// Lookback applied to ThreatIntelligenceIndicator when collecting indicators.
let query_period = 14d;
// Extra offset added to every event-window boundary; with 0h the current window ends at now().
let query_wait = 0h;
// How far back the "past events" window reaches.
let table_query_lookback = 14d;
// Indicator/property pairs to be treated as benign, read from the 'ID-TIBenignProperty' watchlist.
// Only rows whose Notes contain the "[URL]" term are kept; _TITableMatch anti-joins indicators against
// this table on IndicatorId and on Url == BenignProperty.
let _TIBenignProperty =
_GetWatchlist('ID-TIBenignProperty')
| where Notes has_any ("[URL]")
| project IndicatorId, BenignProperty
;
// Terms used to exclude indicators by source: the Auxiliar values of the
// 'Activity-ExpectedSignificantActivity' watchlist rows whose Activity is "ThreatIndicatorSource".
// toscalar() turns the single-row make_list() result into one dynamic array usable with has_any.
let _TIExcludedSources = toscalar(
_GetWatchlist('Activity-ExpectedSignificantActivity')
| where Activity == "ThreatIndicatorSource"
| summarize make_list(Auxiliar)
);
// Regular expression for the "Threat Intelligence Indicator URL" use case, read from the
// 'RegEx-SingleRegularExpressions' watchlist.
// toscalar() keeps only the first RegEx value, so the watchlist is presumably expected to hold a
// single row per use case — TODO confirm.
// NOTE(review): in the visible query this value is referenced only by the commented-out prefilter
// inside _TITableMatch.
let _URLRegex = toscalar(
_GetWatchlist('RegEx-SingleRegularExpressions')
| where UseCase == "Threat Intelligence Indicator URL"
| project RegEx
);
// Matches UrlClickEvents against active URL threat indicators.
// Parameters:
//   table_start, table_end: bounds (both inclusive, see "between") of the UrlClickEvents TimeGenerated window.
//   only_new_ti: when true, indicators whose earliest TimeGenerated is before ti_start are discarded.
//   ti_start: cut-off for "new" indicators; only evaluated together with only_new_ti.
// Returns the inner join of deduplicated indicators and expanded click URLs, projected to the
// indicator and click columns listed at the end of the function.
let _TITableMatch = (table_start: datetime, table_end: datetime, only_new_ti: boolean, ti_start: datetime = datetime(null)) {
    // Scheduled Analytics rules have a query period limit of 14d
    let _Indicators =// materialize(
        ThreatIntelligenceIndicator
        | where TimeGenerated > ago(query_period)
        // Take the earliest TimeGenerated and the latest column info
        // (IndicatorId is the group key, so it is not repeated among the arg_max columns)
        | summarize hint.strategy=shuffle
            minTimeGenerated = min(TimeGenerated),
            arg_max(TimeGenerated, Active, Description, ActivityGroupNames, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, SourceSystem, Tags, AdditionalInformation, ExternalIndicatorId)
            by IndicatorId
        // Remove inactive or expired indicators
        | where not(not(Active) or ExpirationDateTime < now())
        // Pick indicators that contain the desired entity type
        | where isnotempty(Url)
        // Normalize: drop a trailing slash so indicator and event URLs compare equal
        | extend Url = trim_end(@"\/", Url)
        // Remove indicators from specific sources
        | where not(AdditionalInformation has_any (_TIExcludedSources) or Description has_any (_TIExcludedSources))
        // Remove excluded indicators with benign properties
        | join kind=leftanti _TIBenignProperty on IndicatorId, $left.Url == $right.BenignProperty
        // Deduplicate indicators by Url column, equivalent to using join kind=innerunique afterwards
        | summarize hint.strategy=shuffle
            minTimeGenerated = min(minTimeGenerated),
            take_any(*)
            by Url
        // If we want only new indicators, remove indicators received previously
        | where not(only_new_ti and minTimeGenerated < ti_start)
    //)
    ;
    //let _IndicatorsLength = toscalar(_Indicators | summarize count());
    //let _IndicatorsPrefilter = toscalar(
    //    _Indicators
    //    | extend AuxiliarField = tostring(split(extract(_URLRegex, 3, Url), ".")[-1])
    //    | summarize make_set_if(AuxiliarField, isnotempty(AuxiliarField), 10000)
    //);
    //let _IndicatorsPrefilterLength = array_length(_IndicatorsPrefilter);
    let _TableEvents =
        UrlClickEvents
        | where TimeGenerated between (table_start .. table_end)
        | project-rename OriginalUrl = Url
        // Filter events that may contain indicators
        //| where not(_IndicatorsPrefilterLength < 10000 and not(UrlChain has_any (_IndicatorsPrefilter))) // valid TLD ~1500 , "has_any" limit 10000
        // Expand the clicked URL plus every URL in the redirection chain.
        // array_length() is null when UrlChain is empty or does not parse to an array; in that case
        // only the clicked URL is expanded, so the event is still evaluated instead of being dropped.
        | mv-expand Url = iff(
            isnotnull(array_length(todynamic(UrlChain))),
            array_concat(pack_array(OriginalUrl), todynamic(UrlChain)),
            pack_array(OriginalUrl)
            ) to typeof(string)
        | where isnotempty(Url)
        // Same trailing-slash normalization as applied to the indicators
        | extend Url = trim_end(@"\/", Url)
        //| where not(_IndicatorsLength < 1000000 and not(Url in (toscalar(_Indicators | summarize make_list(Url))))) // "in" limit 1.000.000
        | project-rename UrlClickEvents_TimeGenerated = TimeGenerated
    ;
    _Indicators
    | join kind=inner hint.strategy=shuffle _TableEvents on Url
    // Take only a single event by key columns
    //| summarize hint.strategy=shuffle take_any(*) by Url, NetworkMessageId
    | project
        UrlClickEvents_TimeGenerated,
        Description,
        ActivityGroupNames,
        IndicatorId,
        ThreatType,
        DomainName,
        Url,
        ExpirationDateTime,
        ConfidenceScore,
        SourceSystem,
        Tags,
        AdditionalInformation,
        Workload,
        AccountUpn,
        IPAddress,
        ActionType,
        IsClickedThrough,
        OriginalUrl,
        UrlChain,
        NetworkMessageId
};
// Window boundaries shared by the two matching passes below
let _PastStart = ago(table_query_lookback + query_wait);
let _CurrentStart = ago(query_frequency + query_wait);
let _CurrentEnd = ago(query_wait);
let _NewIndicatorCutoff = ago(query_frequency);
union// isfuzzy=true
    // Pass 1: events in the current window against all indicators available
    _TITableMatch(_CurrentStart, _CurrentEnd, false),
    // Pass 2: events in the past window against indicators that are new since the last query execution
    _TITableMatch(_PastStart, _CurrentStart, true, _NewIndicatorCutoff)
// Keep the row with the latest click time for each indicator and network message
| summarize arg_max(UrlClickEvents_TimeGenerated, *) by IndicatorId, NetworkMessageId
// Add the timestamp and *CustomEntity columns
| extend
    timestamp = UrlClickEvents_TimeGenerated,
    AccountCustomEntity = AccountUpn,
    IPCustomEntity = IPAddress,
    URLCustomEntity = Url
The query retrieves threat indicators from a feed and matches them against URL click events. It filters out inactive or expired indicators, removes indicators from specific sources, and excludes indicators with benign properties. Matching is done in two passes: click events from the most recent query interval are compared against all available indicators, while older click events within the lookback period are compared only against indicators that are new since the last query execution. The final result is one row per matched indicator and network message, with relevant information such as timestamp, account, IP address, and URL.

Jose Sebastián Canós
Released: December 14, 2023
Tables
Keywords
Operators