Query Details
// This query assumes a feed of threat indicators is ingested/synchronized periodically, and each synchronization ingests new indicators and only old indicators that have been modified.
// Active threat indicators in Sentinel are renovated as ThreatIntelligenceIndicator events every ~12 days.
let query_frequency = 1h;
let query_period = 14d;
let query_wait = 1h;
let table_query_lookback = 14d;
let _TIBenignProperty =
_GetWatchlist('ID-TIBenignProperty')
| where Notes has_any ("[Hash]")
| project IndicatorId, BenignProperty
;
let _TIExcludedSources = toscalar(
_GetWatchlist('Activity-ExpectedSignificantActivity')
| where Activity == "ThreatIndicatorSource"
| summarize make_list(Auxiliar)
);
let _TITableMatch = (table_start: datetime, table_end: datetime, only_new_ti: boolean, ti_start: datetime = datetime(null)) {
// Scheduled Analytics rules have a query period limit of 14d
let _Indicators =// materialize(
ThreatIntelligenceIndicator
| where TimeGenerated > ago(query_period)
// Take the earliest TimeGenerated and the latest column info
| summarize hint.strategy=shuffle
minTimeGenerated = min(TimeGenerated),
arg_max(TimeGenerated, Active, Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, ExternalIndicatorId, FileHashValue, FileHashType)
by IndicatorId
// Remove inactive or expired indicators
| where not(not(Active) or ExpirationDateTime < now())
// Pick indicators that contain the desired entity type
| where isnotempty(FileHashValue)
| extend FileHashValue = toupper(FileHashValue)
// Remove indicators from specific sources
| where not(AdditionalInformation has_any (_TIExcludedSources))
// Remove excluded indicators with benign properties
| join kind=leftanti _TIBenignProperty on IndicatorId, $left.FileHashValue == $right.BenignProperty
// Deduplicate indicators by FileHashValue column, equivalent to using join kind=innerunique afterwards
| summarize hint.strategy=shuffle
minTimeGenerated = min(minTimeGenerated),
take_any(*)
by FileHashValue
// If we want only new indicators, remove indicators received previously
| where not(only_new_ti and minTimeGenerated < ti_start)
//)
;
//let _IndicatorsLength = toscalar(_Indicators | summarize count());
let _TableEvents =
EmailAttachmentInfo
| where ingestion_time() between (table_start .. table_end)
// Filter events that may contain indicators
| where isnotempty(SHA256)
| extend FileHashValue = toupper(SHA256)
//| where not(_IndicatorsLength < 1000000 and not(FileHashValue in (toscalar(_Indicators | summarize make_list(FileHashValue))))) // "in" limit 1.000.000
| project-rename EmailAttachmentInfo_TimeGenerated = TimeGenerated
;
_Indicators
| join kind=inner hint.strategy=shuffle _TableEvents on FileHashValue
// Take only a single event by key columns
//| summarize hint.strategy=shuffle take_any(*) by FileHashValue, NetworkMessageId
| project
EmailAttachmentInfo_TimeGenerated,
Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, FileHashValue, FileHashType,
SenderFromAddress, SenderDisplayName, RecipientEmailAddress, FileName, FileType, FileSize, SHA256, ThreatTypes, DetectionMethods, NetworkMessageId, ReportId
| join kind=leftouter hint.strategy=shuffle (
EmailEvents
| where ingestion_time() between(table_start .. now())
| project
SenderFromDomain,
SenderMailFromAddress,
SenderMailFromDomain,
SenderIPv4,
SenderIPv6,
AuthenticationDetails,
EmailDirection,
Subject,
EmailLanguage,
UrlCount,
AttachmentCount,
AdditionalFields,
OrgLevelPolicy,
OrgLevelAction,
UserLevelPolicy,
UserLevelAction,
EmailActionPolicy,
EmailAction,
DeliveryAction,
DeliveryLocation,
ConfidenceLevel,
Connectors,
NetworkMessageId,
EmailEvents_ReportId = ReportId
)
on NetworkMessageId
| project-away NetworkMessageId1
};
union// isfuzzy=true
// Match current table events all indicators available
_TITableMatch(ago(query_frequency + query_wait), ago(query_wait), false),
// Match past table events new indicators since last query execution
_TITableMatch(ago(table_query_lookback + query_wait), ago(query_frequency + query_wait), true, ago(query_frequency))
| summarize arg_max(EmailAttachmentInfo_TimeGenerated, *) by IndicatorId, NetworkMessageId
| extend
timestamp = EmailAttachmentInfo_TimeGenerated,
FileHashCustomEntity = FileHashValue
The query is retrieving threat indicators from a feed and matching them with events in a table. It filters out inactive or expired indicators, removes indicators from specific sources, and excludes indicators with benign properties. It also removes indicators that have been received previously if only new indicators are desired. The matched indicators are then joined with events from a table and additional information is projected. Finally, the results are summarized by IndicatorId and NetworkMessageId, and a custom entity called FileHashCustomEntity is created.

Jose Sebastián Canós
Released: December 5, 2022
Tables
Keywords
Operators