Query Details
// This query assumes a feed of threat indicators is ingested/synchronized periodically, and each synchronization ingests new indicators and only old indicators that have been modified.
// Active threat indicators in Sentinel are renovated as ThreatIntelligenceIndicator events every ~12 days.
let query_frequency = 1h;
let query_period = 14d;
let query_wait = 0h;
let table_query_lookback = 14d;
let _TIBenignProperty =
_GetWatchlist('ID-TIBenignProperty')
| where Notes has_any ("[SourceEmailAddress]", "[DestinationEmailAddress]")
| project IndicatorId, BenignProperty
;
let _TIExcludedSources = toscalar(
_GetWatchlist('Activity-ExpectedSignificantActivity')
| where Activity == "ThreatIndicatorSource"
| summarize make_list(Auxiliar)
);
let _EmailAddressRegex = toscalar(
_GetWatchlist('RegEx-SingleRegularExpressions')
| where UseCase == "EmailAddress"
| project RegEx
);
let _ExternalEmailAddressRegex = toscalar(
_GetWatchlist('RegEx-SingleRegularExpressions')
| where UseCase == "ExternalEmailAddress"
| project RegEx
);
let _TITableMatch = (table_start: datetime, table_end: datetime, only_new_ti: boolean, ti_start: datetime = datetime(null)) {
// Scheduled Analytics rules have a query period limit of 14d
let _Indicators =// materialize(
ThreatIntelligenceIndicator
| where TimeGenerated > ago(query_period)
// Take the earliest TimeGenerated and the latest column info
| summarize hint.strategy=shuffle
minTimeGenerated = min(TimeGenerated),
arg_max(TimeGenerated, Active, Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, ExternalIndicatorId, EmailSenderAddress)
by IndicatorId
// Remove inactive or expired indicators
| where not(not(Active) or ExpirationDateTime < now())
// Pick indicators that contain the desired entity type
| where isnotempty(EmailSenderAddress)
| extend EmailAddress = tolower(EmailSenderAddress)
// Remove indicators from specific sources
| where not(AdditionalInformation has_any (_TIExcludedSources))
// Remove excluded indicators with benign properties
| join kind=leftanti _TIBenignProperty on IndicatorId, $left.EmailAddress == $right.BenignProperty
// Deduplicate indicators by EmailAddress column, equivalent to using join kind=innerunique afterwards
| summarize hint.strategy=shuffle
minTimeGenerated = min(minTimeGenerated),
take_any(*)
by EmailAddress
// If we want only new indicators, remove indicators received previously
| where not(only_new_ti and minTimeGenerated < ti_start)
//)
;
//let _IndicatorsLength = toscalar(_Indicators | summarize count());
//let _IndicatorsPrefilter = toscalar(
// _Indicators
// | extend AuxiliarField = tostring(split(EmailAddress, ".")[-1])
// | summarize make_set_if(AuxiliarField, isnotempty(AuxiliarField))
//);
//let _IndicatorsPrefilterLength = array_length(_IndicatorsPrefilter);
let _TableEvents =
OfficeActivity
| where ingestion_time() between (table_start .. table_end)
// Filter events that may contain indicators
| where not(isempty(UserId) and isempty(TargetUserOrGroupName))
//| where not(_IndicatorsPrefilterLength < 10000 and not(UserId has_any (_IndicatorsPrefilter) or TargetUserOrGroupName has_any (_IndicatorsPrefilter))) // valid TLD ~1500 , "has_any" limit 10000
| mv-expand EmailAddress = pack_array(UserId, TargetUserOrGroupName)
| where isnotempty(EmailAddress)
| extend EmailAddress = tolower(EmailAddress)
| where EmailAddress matches regex _EmailAddressRegex
| extend EmailAddress = case(
EmailAddress has "#EXT#", replace_regex(EmailAddress, _ExternalEmailAddressRegex, @"\2@\3"),
EmailAddress startswith "live.com#" or EmailAddress startswith "urn:spo:guest#", replace_regex(EmailAddress, strcat(@"(?:live\.com#|urn:spo:guest#)", _EmailAddressRegex), @"\2@\3"),
EmailAddress
)
//| where not(_IndicatorsLength < 1000000 and not(EmailAddress in (toscalar(_Indicators | summarize make_list(EmailAddress))))) // "in" limit 1.000.000
| project-rename OfficeActivity_TimeGenerated = TimeGenerated
;
_Indicators
| join kind=inner hint.strategy=shuffle _TableEvents on EmailAddress
// Take only a single event by key columns
//| summarize hint.strategy=shuffle take_any(*) by EmailAddress, UserId
| project
OfficeActivity_TimeGenerated,
Description, ActivityGroupNames, IndicatorId, ThreatType, DomainName, Url, ExpirationDateTime, ConfidenceScore, AdditionalInformation, EmailSenderAddress,
OfficeWorkload, RecordType, Operation, UserType, UserId, ClientIP, ResultStatus, OfficeObjectId, Parameters
};
union// isfuzzy=true
// Match current table events all indicators available
_TITableMatch(ago(query_frequency + query_wait), ago(query_wait), false),
// Match past table events new indicators since last query execution
_TITableMatch(ago(table_query_lookback + query_wait), ago(query_frequency + query_wait), true, ago(query_frequency))
| summarize arg_max(OfficeActivity_TimeGenerated, *) by IndicatorId, UserId
| extend
timestamp = OfficeActivity_TimeGenerated,
IPCustomEntity = ClientIP,
AccountCustomEntity = EmailSenderAddress
This query is used to match threat indicators with office activity events. It assumes that a feed of threat indicators is ingested periodically and synchronized, and only new or modified indicators are ingested. The query defines several variables for the frequency and period of the query, as well as the lookback period for the table query.
The query then retrieves a list of benign properties and excluded sources from watchlists. It also retrieves regular expressions for email addresses and external email addresses from another watchlist.
The main part of the query is the _TITableMatch function, which matches the threat indicators with office activity events. It filters the threat indicators based on their activity, expiration date, and source. It also removes indicators with benign properties. The function then matches the remaining indicators with office activity events based on email addresses.
Finally, the query combines the results from two _TITableMatch function calls, one for current table events and all available indicators, and another for past table events and new indicators since the last query execution. It summarizes the results by IndicatorId and UserId, and adds additional fields such as timestamp, IP address, and email address.

Jose Sebastián Canós
Released: December 5, 2022
Tables
Keywords
Operators