Query Details

Workload Identity Info Xdr

Query

id: 0cd18905-c644-4ad1-b70d-95195620ed98
Function:
    Title: "Function to get summarized overview of application and workload identities from IdentityInfo and OAuthAppInfo table with API Permissions, Azure RBAC- and Entra ID roles with enriched details from my EntraOps classification, critical asset management and CSPM."
    Version: "1.3.0"
    LastUpdated: "2025-06-13"
Category: Microsoft Defender XDR Function
FunctionName: WorkloadIdentityInfoXdr
FunctionAlias: WorkloadIdentityInfoXdr
FunctionQuery: |
    let WorkloadIdentityInfoXdr = (ServicePrincipalName:string, ServicePrincipalObjectId:string) {
        let FirstPartyApps = externaldata(
            AppId: string,
            AppDisplayName: string,
            AppOwnerOrganizationId: string,
            Source: string
            ) ["https://raw.githubusercontent.com/merill/microsoft-info/main/_info/MicrosoftApps.json"] with(format='multijson')
            | project OAuthAppId = AppId, AppOwnerTenantId = AppOwnerOrganizationId, FirstPartyAppDisplayName = AppDisplayName;
        let SensitiveEntraDirectoryRoles = externaldata(
            RoleName: string,
            RoleId: string,
            isPrivileged: bool,
            Categories: string,
            Classification: dynamic
            ) ["https://raw.githubusercontent.com/Cloud-Architekt/AzurePrivilegedIAM/main/Classification/Classification_EntraIdDirectoryRoles.json"] with(format='multijson')
            | project RoleDefinitionName = RoleName, RoleId, RoleIsPrivileged = isPrivileged, Classification, RoleCategories = Categories;
        let SensitiveApiPermissions = externaldata(
            PermissionId: string,
            PermissionValue: string,
            PermissionType: string,
            TargetAppDisplayName: string,
            TargetAppId: string,
            Category: string,
            EAMTierLevelName: string
            ) [@"https://raw.githubusercontent.com/Cloud-Architekt/AzurePrivilegedIAM/refs/heads/main/Classification/Classification_ApiPermissions.json"] with (format="multijson");
        let PrivilegedAzureRoles = dynamic(['Owner','Contributor','Access Review Operator Service Role','Azure File Sync Administrator','Role Based Access Control Administrator','User Access Administrator']);
        let PrivilegedArmOperations = toscalar(
            externaldata(RoleAction: string)
                [@"https://raw.githubusercontent.com/Cloud-Architekt/AzurePrivilegedIAM/refs/heads/main/PrivilegedOperations/ArmApiRequest.csv"] with (format="csv", ignoreFirstRecord=true)
            | summarize make_set(RoleAction)
        );
        let PrivilegedArmOperationsPattern = @"Microsoft\.Authorization/.*/action";
        let PrivilegedGroupMinCriticalLevel = 2;
        // Materialize risk state — uses isfuzzy=true so the query succeeds even if AADRiskyServicePrincipals
        // does not exist in the workspace.
        let RiskData = materialize(
            union isfuzzy=true
                (AADRiskyServicePrincipals
                    | where TimeGenerated > ago(30d)
                    | summarize arg_max(TimeGenerated, RiskLevel, RiskState, RiskDetail) by Id
                    | project-rename ServicePrincipalId = Id),
                (datatable(ServicePrincipalId:string, RiskLevel:string, RiskState:string, RiskDetail:string)[])
            | where isnotempty(ServicePrincipalId)
        );
        // Materialize ExposureGraphEdges "has role on" subset — used in two separate joins
        let HasRoleEdges = materialize(
            ExposureGraphEdges | where EdgeLabel == @"has role on"
        );
        // Materialize ExposureGraphNodes to avoid scanning the table multiple times
        let AllGraphNodes = materialize(ExposureGraphNodes);
        // Materialize IdentityInfo once to avoid scanning the table twice
        let FilteredIdentityInfo = materialize(
            IdentityInfo
                | where Type == "ServiceAccount" and SourceProvider == "AzureActiveDirectory"
                | where Timestamp > ago(14d)
                | summarize arg_max(Timestamp, *) by AccountObjectId
        );
        // Materialize filtered OAuthAppInfo once to avoid scanning the table twice
        let FilteredOAuthApp = materialize(
            OAuthAppInfo
                | where Timestamp >ago(30d)
                | where AppName contains ServicePrincipalName and ServicePrincipalId contains tostring(ServicePrincipalObjectId)
                | summarize arg_max(Timestamp, *) by ServicePrincipalId
        );
        FilteredIdentityInfo
            | where AccountDisplayName contains ServicePrincipalName and AccountObjectId contains tostring(ServicePrincipalObjectId)
            | project ServicePrincipalName = AccountDisplayName, ServicePrincipalId = AccountObjectId, CriticalityLevel
        // Lookup for OAuth application details
        | lookup (
            FilteredOAuthApp
        ) on ServicePrincipalId
        // Lookup for Graph API Classification
        | lookup (
            FilteredOAuthApp
                | mv-expand Perm = parse_json(Permissions)
                | extend TargetAppId = tostring(Perm["TargetAppId"])
                | extend TargetAppDisplayName = tostring(Perm["TargetAppDisplayName"])
                | extend PermissionValue = tostring(Perm["PermissionValue"])
                | extend PermissionType = tostring(Perm["PermissionType"])
                | extend InUse = tostring(Perm["InUse"])
                | extend PrivilegeLevel = tostring(Perm["PrivilegeLevel"])
                | project-away Perm
                | join kind = leftouter (
                    SensitiveApiPermissions
                ) on PermissionValue, PermissionType, TargetAppId
                | extend ApiPermission = bag_pack_columns(PermissionId, PermissionValue, PermissionType, TargetAppId, TargetAppDisplayName, InUse, PrivilegeLevel, Category, EAMTierLevelName)
                | summarize ApiPermissions = make_set(ApiPermission) by ServicePrincipalId
        ) on ServicePrincipalId
        | project-away Permissions
        // Lookup for latest Identity Protection risk state
        | join kind=leftouter (
            RiskData
        ) on ServicePrincipalId
        // Lookup for First Party App Status
        | join kind=leftouter ( FirstPartyApps ) on OAuthAppId, AppOwnerTenantId
        | extend IsFirstPartyApp = isnotempty(FirstPartyAppDisplayName)
        | project-away FirstPartyAppDisplayName
        // Lookup for Permanent or Active Entra ID Roles with Classification to EntraOps
        | join kind=leftouter (
            FilteredIdentityInfo
                | where isnotempty(AssignedRoles)
                | mv-expand AssignedRoles
                | extend RoleDefinitionName = tostring(AssignedRoles)
                | join kind=inner ( SensitiveEntraDirectoryRoles
                ) on RoleDefinitionName
                | extend RoleDefinitionId = RoleId
                | extend Category = RoleCategories
                | extend EAMTierLevelName = tostring(Classification.EAMTierLevelName)
                | project RoleAssignments = bag_pack_columns(RoleDefinitionName, RoleDefinitionId, Category, EAMTierLevelName, RoleIsPrivileged), ServicePrincipalId = AccountObjectId
                | summarize AssignedEntraRoles = make_set(RoleAssignments) by ServicePrincipalId
        ) on ServicePrincipalId
        // Lookup for Critical asset and Graph node details
        | join kind=leftouter (
            AllGraphNodes
            | where NodeLabel == @"serviceprincipal" or NodeLabel == @"managedidentity"
            | extend rawData = parse_json(NodeProperties)["rawData"]
            | mv-expand EntityIds
            | where EntityIds.type == "AadObjectId"
            | extend EntityId = tostring(EntityIds.id)
            | extend ServicePrincipalId = tostring(extract("objectid=([\\w-]+)", 1, EntityId))
            | extend ServicePrincipalType = tostring(rawData["servicePrincipalType"])
            | extend XspmCriticalAssetDetails = rawData["criticalityLevel"]
            | extend XspmGraphNodeDetails = bag_pack_columns(NodeId, NodeName, NodeLabel)
            | project ServicePrincipalId, ServicePrincipalType, XspmGraphNodeId = NodeId, XspmGraphNodeDetails, XspmCriticalAssetDetails
        ) on ServicePrincipalId
        // Lookup for Graph node details of OAuth App
        | join kind=leftouter (
            AllGraphNodes
            | where NodeLabel == @"Microsoft Entra OAuth App"
            | mv-expand EntityIds
            | where EntityIds.type == "AadApplicationId"
            | extend OAuthAppId = tostring(EntityIds.id)
            | extend rawData = parse_json(NodeProperties)["rawData"]
            | extend IsExternalApp = tobool(rawData["isBelongToExternalApplication"])
            | extend XspmGraphOAuthAppNodeDetails = bag_pack_columns(NodeId, NodeName, NodeLabel)
            | project XspmGraphOAuthAppNodeDetails, OAuthAppId, IsExternalApp
            ) on OAuthAppId
        // Lookup for Azure roles from Graph edges
        | join kind=leftouter (
            HasRoleEdges
            | where SourceNodeLabel == "managedidentity" or SourceNodeLabel == "serviceprincipal"
            | where TargetNodeCategories contains 'environmentAzure'
            | mv-expand parse_json(EdgeProperties)["rawData"]["permissions"]["roles"]
            | extend RoleDefinitionName = parse_json(EdgeProperties_rawData_permissions_roles)["name"]
            | extend RoleDefinitionId = parse_json(EdgeProperties_rawData_permissions_roles)["id"]
            | extend RoleActions = parse_json(EdgeProperties_rawData_permissions_roles)["actions"]
            | extend RoleIsPrivileged = iff((
                RoleActions matches regex (PrivilegedArmOperationsPattern)
                or RoleActions has_any (PrivilegedArmOperations)) == true
                or RoleDefinitionName in~ (PrivilegedAzureRoles)
                or RoleActions[0] == "*",
                "true", "false")
            | extend IsOverProvisioned = EdgeProperties["rawData"]["isOverProvisioned"]
            | extend IsIdentityInactive = EdgeProperties["rawData"]["isIdentityInactive"]
            | project RoleAssignments = bag_pack_columns(RoleDefinitionName, RoleDefinitionId, RoleIsPrivileged, IsOverProvisioned, IsIdentityInactive), XspmGraphNodeId = SourceNodeId
            | summarize AssignedAzureRoles = make_set(RoleAssignments) by XspmGraphNodeId
        ) on XspmGraphNodeId
        // Lookup for Security Group assignments from Graph edges
        | join kind=leftouter (
            ExposureGraphEdges
            | where SourceNodeLabel == "managedidentity" or SourceNodeLabel == "serviceprincipal"
            | where EdgeLabel == @"member of"
            | where TargetNodeLabel == @"group"
            | join kind=inner ( AllGraphNodes
                | mv-expand EntityIds
                | where EntityIds.type == "AadObjectId"
                | extend EntityId = tostring(EntityIds.id)
                | extend GroupDisplayName = NodeName
                | extend GroupObjectId = tostring(extract("objectid=([\\w-]+)", 1, EntityId))
                | extend XspmCriticalAssetDetails = parse_json(NodeProperties)["rawData"]["criticalityLevel"]
            ) on $left.TargetNodeId == $right.NodeId
            | extend GroupIsPrivileged = iff(
                XspmCriticalAssetDetails["criticalityLevel"] <= PrivilegedGroupMinCriticalLevel or XspmCriticalAssetDetails["ruleBasedCriticalityLevel"] <= PrivilegedGroupMinCriticalLevel,
                "true",
                "false"
                )
            | project RoleAssignments = bag_pack_columns(GroupDisplayName, GroupObjectId, GroupIsPrivileged), XspmGraphNodeId = SourceNodeId
            | summarize AssignedGroupMembership = make_set(RoleAssignments) by XspmGraphNodeId
        ) on XspmGraphNodeId
        // Lookup for Nodes with "can authenticate as" relation from Graph edges (App Registration or Azure Resources with Managed Identities)
        | join kind=leftouter (
            ExposureGraphEdges
            | where EdgeLabel == @"can authenticate as"
            | where TargetNodeLabel == @"managedidentity" or TargetNodeLabel == @"serviceprincipal"
            | join kind=leftouter ( AllGraphNodes | project SourceNodeId = NodeId, EntityIds ) on SourceNodeId
            | extend NodeId = SourceNodeId, NodeName = SourceNodeName, NodeLabel = SourceNodeLabel
            | extend AuthenticatedBy = bag_pack_columns(NodeId, NodeName, NodeLabel, EntityIds)
            | summarize AuthenticatedBy = make_set(AuthenticatedBy) by TargetNodeId
        ) on $left.XspmGraphNodeId == $right.TargetNodeId
        // Lookup for Ownership (currently limited to Application Objects)
        | extend XspmGraphOAuthAppNodeId = tostring(XspmGraphOAuthAppNodeDetails.NodeId)
        | join kind=leftouter (
            HasRoleEdges
            // Currently limited to OAuth App edges
            | where TargetNodeLabel == "Microsoft Entra OAuth App"
            | extend RolePermissions = EdgeProperties["rawData"]["roles"]["rolePermissions"]
            | mv-expand RolePermissions
            | where RolePermissions.["roleValue"] startswith 'Owner'
            | join kind=leftouter (
                AllGraphNodes | project SourceNodeId = NodeId, EntityIds
            ) on SourceNodeId
            | extend NodeId = SourceNodeId, NodeName = SourceNodeName, NodeLabel = SourceNodeLabel
            | extend OwnedBy = bag_pack_columns(NodeId, NodeName, NodeLabel, EntityIds)
            | project-rename XspmGraphOAuthAppNodeId = TargetNodeId
            | summarize OwnedBy = make_set(OwnedBy) by XspmGraphOAuthAppNodeId
        ) on XspmGraphOAuthAppNodeId
        | extend CriticalityLevel = coalesce(toint(XspmCriticalAssetDetails["criticalityLevel"]), CriticalityLevel)
        // Compute highest privilege tier level and unique categories across Entra roles and API permissions
        | mv-apply EntraRole = AssignedEntraRoles to typeof(dynamic) on (
            summarize EntraRoleTierLevels = make_set_if(tostring(EntraRole.EAMTierLevelName), tostring(EntraRole.EAMTierLevelName) !in ("Unclassified", "")),
                        EntraRoleCategories = make_set_if(tostring(EntraRole.Category), isnotempty(tostring(EntraRole.Category)))
        )
        | mv-apply ApiPerm = ApiPermissions to typeof(dynamic) on (
            summarize ApiPermTierLevels = make_set_if(tostring(ApiPerm.EAMTierLevelName), tostring(ApiPerm.EAMTierLevelName) !in ("Unclassified", "")),
                        ApiPermCategories = make_set_if(tostring(ApiPerm.Category), isnotempty(tostring(ApiPerm.Category)))
        )
        | extend AllTierLevels = set_union(EntraRoleTierLevels, ApiPermTierLevels)
        | extend PrivilegeTierLevel = case(
            set_has_element(AllTierLevels, "ControlPlane"),   "ControlPlane",
            set_has_element(AllTierLevels, "ManagementPlane"), "ManagementPlane",
            set_has_element(AllTierLevels, "WorkloadPlane"),   "WorkloadPlane",
            set_has_element(AllTierLevels, "UserAccess"),      "UserAccess",
            "Unclassified"
        )
        | extend PrivilegeCategories = array_sort_asc(set_union(EntraRoleCategories, ApiPermCategories))
        | project-away EntraRoleTierLevels, EntraRoleCategories, ApiPermTierLevels, ApiPermCategories, AllTierLevels
        // Compute EntraOpsInvestigationScore (0–100) combining risk, privilege tier, asset criticality and privilege level
        | extend _TierScore = case(
            PrivilegeTierLevel == "ControlPlane",    100,
            PrivilegeTierLevel == "ManagementPlane",  75,
            PrivilegeTierLevel == "WorkloadPlane",    40,
            PrivilegeTierLevel == "UserAccess",       15,
            0)
        | extend _CriticalityScore = case(
            CriticalityLevel == 0, 100,
            CriticalityLevel == 1,  75,
            CriticalityLevel == 2,  50,
            CriticalityLevel == 3,  25,
            0)
        | extend _PrivilegeLevelScore = case(
            PrivilegeLevel =~ "High",   100,
            PrivilegeLevel =~ "Medium",  50,
            PrivilegeLevel =~ "Low",     10,
            0)
        | extend EntraOpsInvestigationScore = tolong(round(
                (coalesce(todouble(RiskScore), 0.0) * 0.35)
            + (_TierScore                          * 0.35)
            + (_CriticalityScore                   * 0.20)
            + (_PrivilegeLevelScore                * 0.10)))
        // Ensure ControlPlane always reaches at least score 80
        | extend EntraOpsInvestigationScore = iff(PrivilegeTierLevel == "ControlPlane", max_of(EntraOpsInvestigationScore, 80), EntraOpsInvestigationScore)
        // Boost score for confirmed compromise or high Identity Protection risk
        | extend EntraOpsInvestigationScore = iff(RiskState =~ "confirmedCompromised" or RiskLevel =~ "high", max_of(EntraOpsInvestigationScore, 90), EntraOpsInvestigationScore)
        | project-away _TierScore, _CriticalityScore, _PrivilegeLevelScore
        | project
            // Timestamps
            Timestamp, TimeGenerated,
            // Identity core
            ServicePrincipalName, ServicePrincipalId, ServicePrincipalType, OAuthAppId,
            // Risk & investigation scoring
            EntraOpsInvestigationScore, RiskScore, RiskLevel, RiskState, RiskDetail,
            // Privilege classification
            CriticalityLevel, PrivilegeTierLevel, PrivilegeLevel, PrivilegeCategories,
            // App metadata
            AppStatus, IsExternalApp, IsFirstPartyApp, IsAdminConsented, VerifiedPublisher, AppOrigin, AppOwnerTenantId, AddedOnTime, LastModifiedTime,
            // Permissions & assignments
            ApiPermissions, AssignedEntraRoles, AssignedAzureRoles, AssignedGroupMembership,
            // Graph relationships
            AuthenticatedBy, OwnedBy
        };
    WorkloadIdentityInfoXdr(ServicePrincipalName, ServicePrincipalObjectId)

Explanation

This KQL query is designed to provide a comprehensive overview of application and workload identities within a Microsoft environment, specifically focusing on their roles, permissions, and associated risks. Here's a simplified breakdown of what the query does:

  1. Data Sources: It pulls data from various external JSON and CSV files, as well as internal tables like IdentityInfo, OAuthAppInfo, AADRiskyServicePrincipals, ExposureGraphEdges, and ExposureGraphNodes.

  2. First-Party Apps: It identifies first-party applications by checking against a list of known Microsoft applications.

  3. Sensitive Roles and Permissions: It gathers information about sensitive roles and API permissions, classifying them based on their privilege levels.

  4. Risk Assessment: It evaluates the risk associated with service principals by checking recent risky activity and assigning a risk score.

  5. Role and Permission Assignments: It looks up and summarizes the roles and permissions assigned to service principals, including Azure RBAC roles and Entra ID roles.

  6. Critical Assets and Graph Nodes: It identifies critical assets and graph node details related to service principals and OAuth applications.

  7. Privilege and Criticality Scoring: It calculates an investigation score based on privilege tier, asset criticality, and risk level, ensuring that high-risk or compromised identities are flagged.

  8. Output: The query outputs a detailed summary including timestamps, identity details, risk scores, privilege classifications, app metadata, permissions, and graph relationships.

Overall, this query is used to enhance security monitoring and risk management by providing a detailed and enriched view of identities and their associated roles and permissions within a Microsoft environment.

Details

Thomas Naunheim profile picture

Thomas Naunheim

Released: June 13, 2026

Tables

AADRiskyServicePrincipalsExposureGraphEdgesExposureGraphNodesIdentityInfoOAuthAppInfo

Keywords

ApplicationWorkloadIdentitiesIdentityInfoOAuthAppInfoAPIPermissionsAzureRBACEntraIDEntraOpsCriticalAssetManagementCSPMServicePrincipalNameServicePrincipalObjectIdFirstPartyAppsSensitiveEntraDirectoryRolesSensitiveApiPermissionsPrivilegedAzureRolesPrivilegedArmOperationsRiskDataExposureGraphEdgesAllGraphNodesFilteredIdentityInfoFilteredOAuthAppRoleDefinitionNameRoleIdRoleIsPrivilegedClassificationRoleCategoriesPermissionIdPermissionValuePermissionTypeTargetAppDisplayNameTargetAppIdCategoryEAMTierLevelNamePrivilegedGroupMinCriticalLevelAADRiskyServicePrincipalsRiskLevelRiskStateRiskDetailAccountObjectIdAccountDisplayNameAssignedRolesRoleDefinitionIdXspmCriticalAssetDetailsXspmGraphNodeDetailsXspmGraphOAuthAppNodeDetailsIsExternalAppIsOverProvisionedIsIdentityInactiveGroupDisplayNameGroupObjectIdGroupIsPrivilegedAuthenticatedByOwnedByCriticalityLevelPrivilegeTierLevelPrivilegeLevelPrivilegeCategoriesEntraOpsInvestigationScoreRiskScoreAppStatusIsAdminConsentedVerifiedPublisherAppOriginAppOwnerTenantIdAddedOnTimeLastModifiedTime

Operators

letexternaldataprojectproject-renameproject-awaysummarizemake_settoscalarmaterializeunionisfuzzywherearg_maxdatatableisnotemptymv-expandparse_jsonextendtostringjoinkindleftouterbag_pack_columnsiffmatchesregexhas_anyin~coalescetointset_unionset_has_elementcasearray_sort_ascmake_set_ifroundtolongmax_ofiffprojectago

Actions