Query Details
// This query assumes a feed of threat indicators is ingested/synchronized periodically, and that each synchronization ingests only new indicators and previously ingested indicators that have been modified.
// Active threat indicators in Sentinel are re-published as ThreatIntelligenceIndicator events every ~12 days.
// How often the rule runs; also the width of the "current events" window.
let query_frequency = timespan(1h);
// How far back ThreatIntelligenceIndicator is read when collecting indicators.
let query_period = timespan(14d);
// Offset subtracted from now() for both event windows (0h = no offset).
let query_wait = timespan(0h);
// How far back past events are re-checked against newly received indicators.
let table_query_lookback = timespan(3h);
// Watchlist entries flagged with the "[DestinationIPAddress]" marker in Notes.
// Each row pairs an IndicatorId with a BenignProperty value; indicators whose
// IndicatorId and IP address match a row are excluded via a leftanti join.
let _TIBenignProperty =
    _GetWatchlist('ID-TIBenignProperty')
    | where Notes has_any ("[DestinationIPAddress]")
    | project IndicatorId, BenignProperty;
// List of terms (watchlist column Auxiliar) identifying threat indicator sources to ignore.
// Indicators whose AdditionalInformation or Description contains any of these terms are dropped.
let _TIExcludedSources = toscalar(
    _GetWatchlist('Activity-ExpectedSignificantActivity')
    | where Activity == "ThreatIndicatorSource"
    | summarize ExcludedSources = make_list(Auxiliar)
);
// List of IP addresses registered for the "WAF" service in the corporate services watchlist.
// Used to exclude firewall events in which the WAF is the destination or a known-benign source.
let _WAFAddresses = toscalar(
    _GetWatchlist('Service-PrivateCorporateServices')
    | where Service == "WAF"
    | summarize WAFAddresses = make_list(IPAddress)
);
// List of terms (watchlist column Auxiliar) marking expected WAF replies to listed addresses.
// A logged event sourced from a WAF address whose msg_s contains any of these terms is ignored.
let _BenignWAFReplies = toscalar(
    _GetWatchlist('Activity-ExpectedSignificantActivity')
    | where Activity == "MaliciousAddressReplyWAF"
    | summarize BenignReplies = make_list(Auxiliar)
);
// _TITableMatch: matches IP-address threat indicators against Azure Firewall events in AzureDiagnostics.
// Parameters:
//   table_start, table_end - bounds of the AzureDiagnostics TimeGenerated window to search (used with "between").
//   only_new_ti            - when true, indicators whose earliest TimeGenerated (within query_period) is before ti_start are dropped.
//   ti_start               - cut-off used together with only_new_ti; defaults to datetime(null).
// Returns the inner join of the deduplicated indicators with the firewall events on IPAddress,
// projected to the indicator and firewall columns listed at the end of the function.
let _TITableMatch = (table_start: datetime, table_end: datetime, only_new_ti: boolean, ti_start: datetime = datetime(null)) {
// Scheduled Analytics rules have a query period limit of 14d
let _Indicators =// materialize(
ThreatIntelligenceIndicator
| where TimeGenerated > ago(query_period)
// Per indicator: keep the earliest TimeGenerated (minTimeGenerated) and the column values of its latest record
| summarize hint.strategy=shuffle
minTimeGenerated = min(TimeGenerated),
arg_max(TimeGenerated, Active, Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, SourceSystem, Tags, AdditionalInformation, ExternalIndicatorId, NetworkIP, NetworkSourceIP, NetworkDestinationIP, EmailSourceIpAddress)
by IndicatorId
// Remove inactive or expired indicators
| where not(not(Active) or ExpirationDateTime < now())
// Pick indicators that contain the desired entity type: one row per non-empty IP address field
| mv-expand IPAddress = pack_array(NetworkIP, NetworkSourceIP, NetworkDestinationIP, EmailSourceIpAddress) to typeof(string)
| where isnotempty(IPAddress)
// Keep a copy of the indicator address under its own name; it is projected in the output as TI_IPAddress
| extend TI_IPAddress = IPAddress
// Remove indicators from specific sources
| where not(AdditionalInformation has_any (_TIExcludedSources) or Description has_any (_TIExcludedSources))
// Remove indicators of certain type
//| where not(todynamic(AdditionalInformation)["value_type"] in ("domain", "url"))
// Remove excluded indicators with benign properties (same IndicatorId and IPAddress equal to the watchlist BenignProperty)
| join kind=leftanti _TIBenignProperty on IndicatorId, $left.IPAddress == $right.BenignProperty
// Deduplicate indicators by IPAddress column, equivalent to using join kind=innerunique afterwards
| summarize hint.strategy=shuffle
minTimeGenerated = min(minTimeGenerated),
take_any(*)
by IPAddress
// If we want only new indicators, remove indicators received previously
| where not(only_new_ti and minTimeGenerated < ti_start)
//)
;
//let _IndicatorsLength = toscalar(_Indicators | summarize count());
//let _IndicatorsPrefilter = toscalar(
// _Indicators
// | extend AuxiliarField = tostring(extract(@"([0-9A-Za-f]+)[\.\:]", 1, IPAddress))
// | summarize make_set_if(AuxiliarField, isnotempty(AuxiliarField), 10000)
//);
//let _IndicatorsPrefilterLength = array_length(_IndicatorsPrefilter);
// Azure Firewall events within the requested window, expanded to one row per public source/destination address
let _TableEvents =
AzureDiagnostics
| where TimeGenerated between (table_start .. table_end)
| where ResourceProvider == "MICROSOFT.NETWORK" and ResourceType == "AZUREFIREWALLS"// and Category in ("AzureFirewallNetworkRule", "AzureFirewallApplicationRule")
// Some kinds of operations could be excluded
| where not(OperationName in ("AzureFirewallNatRuleLog"))// "AzureFirewallNetworkRuleLog", "AzureFirewallApplicationRuleLog", "AzureFirewallIDSLog", "AzureFirewallThreatIntelLog"
// Filter events that may contain indicators
//| where not(_IndicatorsPrefilterLength < 10000 and not(msg_s has_any (_IndicatorsPrefilter))) // "has_any" limit 10000
// Message layout 1: "<protocol> request from <address>:<port> to <address>:<port>..." with the action extracted separately
| parse msg_s with Protocol " request from " SourceAddress1 ":" SourcePort: int " to " DestinationAddress1 ":" DestinationPort: int *
| parse msg_s with * ". Action: " Action1a "." *
//| parse msg_s with * " was " Action1b " to " NatDestinationAddress ":" NatDestinationPort:int "." *
// Message layout 2: same sentence without ports, "<protocol> request from <address> to <address>. Action: <action>."
| parse msg_s with Protocol2 " request from " SourceAddress2 " to " DestinationAddress2 ". Action: " Action2 "."  *
// Coalesce the values extracted by both layouts, preferring layout 1 when it produced a value
| extend
//FirewallAction = case(isnotempty(Action1a), Action1a, isnotempty(Action1b), Action1b, Action2),
FirewallAction = case(isnotempty(Action1a), Action1a, Action2),
Protocol = iff(isnotempty(Protocol), Protocol, Protocol2),
SourceAddress = iff(isnotempty(SourceAddress1), SourceAddress1, SourceAddress2),
DestinationAddress = iff(isnotempty(DestinationAddress1), DestinationAddress1, DestinationAddress2)//,
//NatDestinationAddress = iff(isnotempty(NatDestination), NatDestination, "")
// Remove special cases where WAF is involved
| where not(DestinationAddress in (_WAFAddresses))
| where not(SourceAddress in (_WAFAddresses) and FirewallAction == "Log" and msg_s has_any (_BenignWAFReplies))
// One row per address of the event (source and destination), so either side can match an indicator
| mv-expand IPAddress = pack_array(SourceAddress, DestinationAddress) to typeof(string) //, NatDestinationAddress)
| where isnotempty(IPAddress)
// Remove denied traffic where the source was remote (drops the source-address row of events with action "Deny")
| where not(IPAddress == SourceAddress and FirewallAction == "Deny")
// Remove private IPv4 addresses; values that do not parse as IPv4 are kept
| where not(isnotempty(parse_ipv4(IPAddress)) and ipv4_is_private(IPAddress))
//| where not(_IndicatorsLength < 1000000 and not(IPAddress in (toscalar(_Indicators | summarize make_list(TI_IPAddress))))) // "in" limit 1.000.000
// Rename the event time so it stays distinguishable from the indicator's TimeGenerated after the join
| project-rename AzureDiagnostics_TimeGenerated = TimeGenerated
;
// Match indicators and events on the expanded IPAddress column
_Indicators
| join kind=inner hint.strategy=shuffle _TableEvents on IPAddress
// Take only a single event by key columns
//| summarize hint.strategy=shuffle take_any(*) by IPAddress, SourceAddress, DestinationAddress
| project
AzureDiagnostics_TimeGenerated,
Description,
ActivityGroupNames,
IndicatorId,
ThreatType,
DomainName,
Url,
ExpirationDateTime,
ConfidenceScore,
SourceSystem,
Tags,
AdditionalInformation,
TI_IPAddress,
NetworkIP,
NetworkSourceIP,
NetworkDestinationIP,
EmailSourceIpAddress,
Resource,
Category,
OperationName,
FirewallAction,
Protocol,
SourceAddress,
DestinationAddress,// NatDestinationAddress,
msg_s,
ResourceId,
ResourceType
};
// Final result: both matching passes combined, keeping the latest event per
// indicator / source address / destination address, plus the entity-mapping columns.
union // isfuzzy=true
    // Pass 1: match the current window of table events against all available indicators
    _TITableMatch(
        ago(query_frequency + query_wait),
        ago(query_wait),
        false
    ),
    // Pass 2: match past table events against indicators that are new since the last query execution
    _TITableMatch(
        ago(table_query_lookback + query_wait),
        ago(query_frequency + query_wait),
        true,
        ago(query_frequency)
    )
| summarize arg_max(AzureDiagnostics_TimeGenerated, *) by IndicatorId, SourceAddress, DestinationAddress
| extend
    timestamp = AzureDiagnostics_TimeGenerated,
    IPCustomEntity = TI_IPAddress
This query retrieves threat indicators from the ThreatIntelligenceIndicator table and matches their IP addresses against Azure Firewall events in the AzureDiagnostics table. It filters out inactive or expired indicators and removes indicators from specific sources listed in a watchlist. It also removes indicators with benign properties and deduplicates indicators by IP address. The query then joins the filtered indicators with the firewall events on the IP address, which may be either the source or the destination of the event. Finally, it keeps the latest matching event per indicator, source address and destination address, and adds the timestamp and IP address as custom entity fields (timestamp, IPCustomEntity). The query is meant to run periodically: each run matches the most recent events against all active indicators, and re-checks slightly older events only against indicators that are new since the previous run.

Jose Sebastián Canós
Released: December 14, 2023
Tables
Keywords
Operators