Query Details
// Rule : Workload Identity - SP Sign-in Surge After Owner's Risk Elevation with M365 Impact
// Severity: High
// Tactics : LateralMovement, Persistence, Collection
// MITRE : T1078.004 (Valid Accounts: Cloud Accounts), T1098.001, T1530
// Freq : PT1H Period: PT1H
// Tables : AADUserRiskEvents, AuditLogs, AADServicePrincipalSignInLogs, OfficeActivity
// Built-in differentiation: No Sentinel built-in correlates Identity Protection user risk
// events with subsequent SP sign-in surges, nor does any built-in cross-reference the
// resulting OfficeActivity to confirm M365 data plane impact. The four-table correlation
// provides both the attack-chain signal (human compromise -> SP authentication surge) and
// the impact confirmation (real M365 data access from the same IP addresses).
//==========================================================================================
// Attack chain: (1) Attacker compromises a user account. (2) Identity Protection fires a
// risk event. (3) Attacker uses SP credentials the compromised user controls to authenticate
// headlessly at scale. (4) The SP accesses Exchange/SharePoint/Teams resources.
// Detecting a sign-in spike (current hour >= 3x 14-day hourly average) on an SP owned by a
// recently risk-elevated user, confirmed by OfficeActivity from the same signing IPs,
// provides compound evidence of active lateral movement with real M365 data impact.
// ---- Network Allowlist (exclude trusted IPs / CIDR / ranges) --------------
let _allow = materialize(union isfuzzy=true (print R="" | take 0), (_GetWatchlist('NetworkAllowlist') | project R = tostring(IPOrRange)) | where isnotempty(R));
let _allowCIDR = toscalar(_allow | where not(R matches regex @'^\d+\.\d+\.\d+\.\d+-\d+\.\d+\.\d+\.\d+$') | extend R = iff(R has '/', R, strcat(R, '/32')) | summarize make_list(R));
let _allowRange = toscalar(_allow | where R matches regex @'^\d+\.\d+\.\d+\.\d+-\d+\.\d+\.\d+\.\d+$' | summarize make_list(R));
let _ExcludeAllowlistedIPs = (T:(IPAddress:string)) {
T
| extend IPAddress = tostring(IPAddress)
| where array_length(_allowCIDR) == 0 or isnull(ipv4_is_in_any_range(IPAddress, _allowCIDR)) or not(ipv4_is_in_any_range(IPAddress, _allowCIDR))
| mv-apply _r = _allowRange to typeof(string) on (
extend _lo = tostring(split(_r,'-')[0]), _hi = tostring(split(_r,'-')[1])
| extend _inRange = ipv4_compare(IPAddress, _lo) >= 0 and ipv4_compare(IPAddress, _hi) <= 0
| summarize _anyInRange = max(toint(_inRange)))
| where isnull(_anyInRange) or _anyInRange == 0
| project-away _anyInRange
};
// ---------------------------------------------------------------------------
let LookbackWindow = 1h;
let RiskLookback = 72h;
let SpikeMultiplier = 3.0;
// Users with a risk event in the past 72 hours
let RecentRiskyUsers = union isfuzzy=true
AADUserRiskEvents,
(datatable(UserId:string, UserPrincipalName:string, RiskEventType:string,
RiskLevel:string, DetectedDateTime:datetime)[])
| where DetectedDateTime > ago(RiskLookback)
| where RiskLevel in ("high", "medium")
| summarize
RiskEventTypes = make_set(RiskEventType, 10),
RiskLevel = max(RiskLevel),
FirstRisk = min(DetectedDateTime)
by UserId, UserPrincipalName;
// SPs created or owned by risky users (AuditLogs lookback: 30d for context)
let RiskyOwnedSPs = AuditLogs
| where TimeGenerated > ago(30d)
| where OperationName in~ (
"Add service principal",
"Add application",
"Add owner to service principal",
"Add owner to application")
| where Result =~ "success"
| where isnotempty(tostring(InitiatedBy.user.id))
| extend InitiatorUserId = tostring(InitiatedBy.user.id)
| extend SPId = tostring(TargetResources[0].id)
| extend SPName = tostring(TargetResources[0].displayName)
| join kind=inner RecentRiskyUsers on $left.InitiatorUserId == $right.UserId
| summarize
OwnerUserId = any(UserId),
OwnerUPN = any(UserPrincipalName),
RiskEventTypes = any(RiskEventTypes),
RiskLevel = any(RiskLevel),
FirstRisk = any(FirstRisk),
SPName = any(SPName)
by SPId;
// 14-day sign-in baseline (hourly average)
let HistoricalAvg = (AADServicePrincipalSignInLogs | invoke _ExcludeAllowlistedIPs())
| where TimeGenerated between (ago(14d) .. ago(1h))
| where ResultType == "0"
| summarize HistoricalSignins = count() by ServicePrincipalId
| extend HourlyAvg = todouble(HistoricalSignins) / (14.0 * 24.0);
// Current window SP sign-in activity
let CurrentSignins = (AADServicePrincipalSignInLogs | invoke _ExcludeAllowlistedIPs())
| where TimeGenerated > ago(LookbackWindow)
| where ResultType == "0"
| summarize
CurrentCount = count(),
UniqueIPs = dcount(IPAddress),
IPList = make_set(IPAddress, 5),
Countries = make_set(Location, 3),
Resources = make_set(ResourceDisplayName, 5),
CredTypes = make_set(ClientCredentialType, 3)
by ServicePrincipalId, ServicePrincipalName, AppId;
// OfficeActivity from the same IPs used by the SP — confirms M365 data plane impact
let OfficeImpact = OfficeActivity
| where TimeGenerated > ago(LookbackWindow)
| where isnotempty(ClientIP)
| summarize
OfficeOpCount = count(),
OfficeOps = make_set(Operation, 10),
OfficeWorkloads = make_set(OfficeWorkload, 5),
OfficeResources = make_set(OfficeObjectId, 5),
OfficeUsers = dcount(UserId)
by ClientIP;
// Compound detection: risky-owned SP + sign-in spike + M365 data plane access
let SpikyRiskySPs = CurrentSignins
| join kind=inner HistoricalAvg on ServicePrincipalId
| join kind=inner RiskyOwnedSPs on $left.ServicePrincipalId == $right.SPId
| where todouble(CurrentCount) >= SpikeMultiplier * iff(HourlyAvg < 1.0, 1.0, HourlyAvg)
| extend SpikeRatio = round(todouble(CurrentCount) / iff(HourlyAvg < 1.0, 1.0, HourlyAvg), 1);
SpikyRiskySPs
| mv-expand SigninIP = IPList to typeof(string)
| join kind=leftouter OfficeImpact on $left.SigninIP == $right.ClientIP
| summarize
OwnerUserId = any(OwnerUserId),
OwnerUPN = any(OwnerUPN),
RiskEventTypes = any(RiskEventTypes),
RiskLevel = any(RiskLevel),
FirstRisk = any(FirstRisk),
CurrentCount = any(CurrentCount),
HourlyAvg = any(HourlyAvg),
SpikeRatio = any(SpikeRatio),
UniqueIPs = any(UniqueIPs),
IPList = any(IPList),
Countries = any(Countries),
Resources = any(Resources),
CredTypes = any(CredTypes),
OfficeOpCount = sum(OfficeOpCount),
OfficeOps = make_set(OfficeOps, 10),
OfficeWorkloads = make_set(OfficeWorkloads, 5),
OfficeResources = make_set(OfficeResources, 5),
OfficeUsers = sum(OfficeUsers)
by ServicePrincipalId, ServicePrincipalName, AppId
| extend AlertSeverity = case(
RiskLevel == "high" and OfficeOpCount > 0 and SpikeRatio > 10, "Critical",
RiskLevel == "high" and OfficeOpCount > 0, "High",
RiskLevel == "high", "High",
OfficeOpCount > 0, "Medium",
"Medium")
| project
ServicePrincipalId, ServicePrincipalName, AppId,
OwnerUserId, OwnerUPN, RiskEventTypes, RiskLevel, FirstRisk,
CurrentCount, HourlyAvg, SpikeRatio,
UniqueIPs, IPList, Countries, Resources, CredTypes,
OfficeOpCount, OfficeOps, OfficeWorkloads, OfficeResources, OfficeUsers,
AlertSeverity
| order by SpikeRatio desc
This KQL query is designed to detect potential security threats involving Azure Active Directory (AAD) and Microsoft 365 (M365) services. Here's a simplified breakdown of what the query does:
Purpose: The query aims to identify suspicious activity where a service principal (SP) experiences a sudden surge in sign-ins after a user associated with that SP has been flagged for risky behavior. This could indicate a compromised account being used for unauthorized access.
Key Components:
Exclusions: The query excludes trusted IP addresses from the analysis to reduce false positives.
Detection Logic:
Output: The query outputs a list of service principals with details about the associated user, the nature of the risk, the sign-in spike, and any M365 data access, ordered by the severity of the spike.
In essence, this query helps security teams identify and prioritize potential security breaches involving compromised user accounts and unauthorized access to cloud resources.

David Alonso
Released: April 21, 2026
Tables
Keywords
Operators