https://detective.kusto.io/inbox
Season 2 - Case 2 - Catch the Phishermen!
// Training
// Inspect the column names and types before writing any filters.
PhoneCalls
| getschema

// Row count, to gauge how large the data set is.
PhoneCalls
| summarize Count = count()

// A sample of raw rows.
PhoneCalls
| limit 100

// Lift fields out of the JSON Properties column into typed columns so they can be used as filters.
PhoneCalls
| where EventType == 'Connect'
| extend Origin = tostring(Properties['Origin'])
| extend Destination = tostring(Properties['Destination'])
| extend Hidden = tobool(Properties['IsHidden'])
| limit 10
// Test 1: the origin number with the most Connect events whose IsHidden flag is false or absent.
PhoneCalls
| where EventType == 'Connect'
    and (Properties !has 'IsHidden' or tobool(Properties.IsHidden) == false)
| extend Phone = tostring(Properties['Origin'])
| summarize Count = count() by Phone
| top 1 by Count desc
// Statistics: bucket Connect events per hour, then take the mean and the median of the hourly counts.
PhoneCalls
| where EventType == 'Connect'
| summarize CallsPerHour = count() by bin(Timestamp, 1h)
// Output names are spelled out so they match what summarize would auto-generate for a column called "calls".
| summarize
    avg_calls = avg(CallsPerHour),
    percentile_calls_50 = percentile(CallsPerHour, 50)
// Join: count Connect events (destination other than '0') whose matching Disconnect
// event reports DisconnectedBy == 'Destination'.
let DestinationHangups =
    PhoneCalls
    | where EventType == 'Disconnect'
    | where Properties.DisconnectedBy == 'Destination'
    | project CallConnectionId;
PhoneCalls
| where EventType == 'Connect' and Properties.Destination != '0'
| join kind=inner (DestinationHangups) on CallConnectionId
| count
//Solution
// For Connect events flagged IsHidden whose call was ended by the destination, count the
// distinct destinations per origin number and return the origin with the highest count.
// Output columns: Origin, PerpNumber (the distinct-destination count).
PhoneCalls
| where EventType == 'Connect'
// Test the flag's value rather than the presence of the term "IsHidden" in the JSON:
// `has` also matches records where the key is present but set to false.
| where tobool(Properties.IsHidden) == true
// Keep only the columns the join and the aggregation need.
| project
    CallConnectionId,
    Origin = tostring(Properties.Origin),
    Destination = tostring(Properties.Destination)
| join kind=inner (
    PhoneCalls
    | where EventType == 'Disconnect'
    | where tostring(Properties.DisconnectedBy) == 'Destination'
    | project CallConnectionId
) on CallConnectionId
| summarize PerpNumber = dcount(Destination) by Origin
| top 1 by PerpNumber
Season 2 - Case 1 - To bill or not to bill?
//Case 1 training
//T-SQL can be run directly: sum of Consumed * Cost over the join of Costs and Consumption on MeterType
SELECT SUM(Consumed * Cost) AS TotalCost
FROM Costs
JOIN Consumption ON Costs.MeterType = Consumption.MeterType
//Prefixing the same statement with EXPLAIN returns its KQL translation instead of running it
EXPLAIN
SELECT SUM(Consumed * Cost) AS TotalCost
FROM Costs
JOIN Consumption ON Costs.MeterType = Consumption.MeterType
//KQL produced by EXPLAIN, pasted as-is: the right-side MeterType is renamed so the join can
//reference both sides explicitly, then the product is summed into TotalCost
Costs
| join kind=inner
(Consumption
| project-rename ['Consumption.MeterType']=MeterType) on (['$left'].MeterType == ['$right'].['Consumption.MeterType'])
| summarize TotalCost=sum(__sql_multiply(Consumed, Cost))
| project TotalCost
// Hand-written equivalent using lookup: aggregate consumption per meter type first,
// then attach the per-type cost and sum the products.
Consumption
| summarize TotalConsumed = sum(Consumed) by MeterType
| lookup kind=leftouter (Costs) on MeterType
| extend TotalCost = TotalConsumed * Cost
| summarize sum(TotalCost)
// Ten households with the highest total water consumption.
Consumption
| where MeterType == "Water"
| summarize WaterWaster = sum(Consumed) by HouseholdId
| top 10 by WaterWaster desc

// Ten households with the highest total electricity consumption.
Consumption
| where MeterType == "Electricity"
| summarize ElectricityWaster = sum(Consumed) by HouseholdId
| top 10 by ElectricityWaster desc
// Total water consumption per day, drawn as a time chart.
Consumption
| where MeterType == "Water"
| summarize WaterWaster = sum(Consumed) by Timestamp = bin(Timestamp, 1d)
| render timechart

// Total electricity consumption per day, drawn as a time chart.
Consumption
| where MeterType == "Electricity"
| summarize Elec_Waster = sum(Consumed) by Timestamp = bin(Timestamp, 1d)
| render timechart
// Summary statistics for electricity readings: row count, minimum, maximum and mean of Consumed.
Consumption
| where MeterType == "Electricity"
| summarize
    Total_Count = count(),
    Lowest_Value = min(Consumed),
    Highest_Value = max(Consumed),
    Average_Value = avg(Consumed)
// Case 1 - Solved
// Keep one row per (HouseholdId, MeterType, Timestamp) - the one with the largest Consumed -
// then price each row with its meter type's Cost and total the bill.
Consumption
| summarize hint.strategy=shuffle arg_max(Consumed, *) by HouseholdId, MeterType, Timestamp
| join kind=inner (
    Costs
    | project MeterType, Cost
) on $left.MeterType == $right.MeterType
| summarize TotalBillsAmount = sum(Consumed * Cost)

// Same calculation with the lookup operator in place of the join.
Consumption
| summarize hint.strategy=shuffle arg_max(Consumed, *) by HouseholdId, MeterType, Timestamp
| lookup kind=leftouter (Costs) on MeterType
| summarize sum(Consumed * Cost)
// Shorter variant: drop exact duplicate readings, price them, and round the total to 2 decimals.
// NOTE(review): unlike the arg_max variants, this keeps rows that share HouseholdId, MeterType and
// Timestamp but differ in Consumed - confirm both approaches give the same total on this data.
Consumption
| summarize by Consumed, HouseholdId, MeterType, Timestamp
| lookup kind=leftouter (Costs) on MeterType
| summarize round(sum(Consumed * Cost),2)
Kusto Detective KQL Challenge Information
## Welcome to the Kusto Detective Agency - Season 2!
Here you can find my solutions for Season 2.
Feel free to reach out if you have any questions.
Season 2 - Case 0 - Onboarding
// Case 0 training
// Number of CaseOpened events (Bounty is converted to a real along the way; only rows are counted).
DetectiveCases
| where EventType == 'CaseOpened'
| extend Bounty = todouble(Properties['Bounty'])
| count

// Ten detectives with the most CaseSolved events.
DetectiveCases
| where EventType == 'CaseSolved'
| summarize CasesSolved = count() by DetectiveId
//| count
| top 10 by CasesSolved desc
// Average time from a case being opened to its first CaseAssigned event.
let FirstAssignment =
    DetectiveCases
    | where EventType == 'CaseAssigned'
    | summarize FirstAssigned = min(Timestamp) by CaseId;
DetectiveCases
| where EventType == 'CaseOpened'
| project CaseId, CaseOpened = Timestamp
| join kind=inner (FirstAssignment) on CaseId
| summarize Average = avg(FirstAssigned - CaseOpened)

// Average time from a case being opened to its earliest CaseSolved event.
let FirstSolution =
    DetectiveCases
    | where EventType == 'CaseSolved'
    | summarize CaseSolved = min(Timestamp) by CaseId;
DetectiveCases
| where EventType == 'CaseOpened'
| project CaseId, CaseOpened = Timestamp
| join kind=inner (FirstSolution) on CaseId
| summarize Average = avg(CaseSolved - CaseOpened)

// Same open-to-solved average again, written inline and leaving the result column unnamed.
DetectiveCases
| where EventType == 'CaseOpened'
| project CaseId, CaseOpened = Timestamp
| join kind=inner (
    DetectiveCases
    | where EventType == 'CaseSolved'
    | summarize CaseSolved = min(Timestamp) by CaseId
) on $left.CaseId == $right.CaseId
| summarize avg(CaseSolved - CaseOpened)
//Case 0 Solved
// Top 10 detectives by total bounty, for cases opened during 2022.
// Output columns: DetectiveId, TotalEarnings.
//
// The bounty is credited to the detective with the earliest CaseSolved event for the case.
// Joining on CaseAssigned instead would add the bounty once per assignment row, whether or
// not that detective (or anyone) solved the case.
// NOTE(review): "first solver earns the bounty" is the assumed payout rule - confirm against the case text.
let Bounties =
    DetectiveCases
    | where EventType == 'CaseOpened'
    | where Timestamp >= datetime(2022-01-01) and Timestamp < datetime(2023-01-01)
    // Properties is accessed with dot notation directly; no parse_json() needed.
    | extend Bounty = todouble(Properties.Bounty)
    // isnotempty() is used for the ids: isnotnull() is always true for string values.
    | where isnotnull(Bounty) and isnotempty(CaseId)
    | project CaseId, Bounty;
DetectiveCases
| where EventType == 'CaseSolved'
| where isnotempty(DetectiveId) and isnotempty(CaseId)
// One row per case: the detective on the earliest CaseSolved event.
| summarize arg_min(Timestamp, DetectiveId) by CaseId
| lookup kind=inner (Bounties) on CaseId
| summarize TotalEarnings = sum(Bounty) by DetectiveId
| top 10 by TotalEarnings desc
Season 2 - Case 3 - Return Stolen Cars!
// How many traffic sightings are there?
CarsTraffic
| summarize Count = count()

// A sample of traffic sightings.
CarsTraffic
| limit 100

// How many stolen-car records are there?
StolenCars
| summarize Count = count()

// A sample of stolen-car records.
StolenCars
| limit 100
// Count the sightings whose VIN is one of a fixed list.
let WantedVins = dynamic(['FD655964S', 'JO132865F', 'AD701526K']);
CarsTraffic
| where VIN in (WantedVins)
| count

// The stolen car with the most sightings.
CarsTraffic
| where VIN in (StolenCars)
| summarize Sightings = count() by VIN
| top 1 by Sightings desc
// First and last cars spotted at the intersection of Street 180 and Ave 121.
CarsTraffic
| where Street == 180
| where Ave == 121
| summarize
    First = arg_min(Timestamp, VIN),
    Last = arg_max(Timestamp, VIN)

// Street and avenue where the stolen car with VIN IR177866Y was first sighted.
CarsTraffic
| where VIN == 'IR177866Y'
| summarize First = arg_min(Timestamp, Street, Ave)
// Cars seen at the same Street/Ave both in the hour starting 08:00 and in the hour starting 20:00 on June 14.
let SightingsWithinHour = (windowStart: datetime) {
    CarsTraffic
    | where Timestamp between (windowStart .. 1h)
};
SightingsWithinHour(datetime(2023-06-14 08:00))
| join kind=inner (
    SightingsWithinHour(datetime(2023-06-14 20:00))
) on VIN, Street, Ave
| summarize dcount(VIN)

// Cars that have been at Street 228, Ave 145 but were never sighted at Street 121, Ave 180.
let SeenAtExcludedSpot =
    CarsTraffic
    | where Street == 121 and Ave == 180
    | distinct VIN;
CarsTraffic
| where Street == 228 and Ave == 145
| join kind=leftanti (SeenAtExcludedSpot) on VIN
| summarize by VIN
| count
// Build the joined data used to find the stash house for the stolen cars.
// Step 1: location of each stolen car's last sighting, bucketed into a 15-minute slot.
let LastStolenSighting =
    StolenCars
    | join kind=inner (CarsTraffic) on VIN
    | summarize arg_max(Timestamp, Ave, Street) by VIN
    | project TimeKey = bin(Timestamp, 15m), Ave, Street;
// Step 2: every VIN sighted at one of those locations within the same 15-minute slot.
let CoLocatedVins =
    LastStolenSighting
    | join kind=inner (
        CarsTraffic
        | extend TimeKey = bin(Timestamp, 15m)
    ) on TimeKey, Ave, Street
    | distinct VIN;
// Step 3: last sighting of each of those VINs, counted per location, most frequent first.
// The top two locations are where the plates were swapped; the next one is the stash spot.
CoLocatedVins
| join kind=inner (
    CarsTraffic
    | summarize arg_max(Timestamp, Ave, Street) by VIN
) on VIN
| summarize count() by Ave, Street
| sort by count_ desc
Season 2 - Case 4 - Triple trouble!
// Triple trouble
// Schema of the traffic metrics table.
NetworkMetrics
| getschema

// A few sample rows of traffic metrics.
NetworkMetrics
//| count
| limit 10

// Schema of the IP-range information table.
IpInfo
| getschema

// A few sample rows of IP-range information.
IpInfo
//| count
| limit 10
// bin() groups timestamps into fixed-size buckets: here, the record count per day.
NetworkMetrics
| summarize count() by Timestamp = bin(Timestamp, 1d)
| render timechart

// Average BytesSent per day, inspected visually on a time chart...
NetworkMetrics
| summarize avg(BytesSent) by Timestamp = bin(Timestamp, 1d)
| render timechart

// ...or computed directly: the day with the lowest average BytesSent.
NetworkMetrics
| summarize avg(BytesSent) by Timestamp = bin(Timestamp, 1d)
| top 1 by avg_BytesSent asc
// Enrich a few sample rows with IpInfo by matching ClientIP against the IpCidr ranges.
NetworkMetrics
| limit 10
| evaluate ipv4_lookup(IpInfo, ClientIP, IpCidr)

// Which company most frequently contacted IP address 178.248.55.129?
NetworkMetrics
| where TargetIP == '178.248.55.129'
| evaluate ipv4_lookup(IpInfo, ClientIP, IpCidr)
| summarize Count = count() by Info
| sort by Count desc

// Two daily series as two columns: the number of records per day and the average
// BytesReceived per day, rendered as separate panels.
NetworkMetrics
| make-series count(), avg(BytesReceived) on Timestamp step 1d
| render timechart with (ysplit=panels)
// Case brief:
// I suspect that someone has hacked into the Digitown municipality system and stolen these documents.
// Our system is a known data hub and hosts various information about the town itself, real-time monitoring
// systems of the city, tax payments, etc.
// It serves as a real-time data provider to many organizations around the world, so it receives a lot of traffic.
// Unfortunately, I don't have much data to give you.
// All I have is a 30-day traffic statistics report captured by the Digitown municipality system network routers.
//
// Clients whose distinct-target count is exactly one, labelled with their IpInfo entry.
let SingleTargetClients =
    NetworkMetrics
    | summarize Targets = dcount(TargetIP) by ClientIP
    | where Targets == 1
    | project ClientIP
    | evaluate ipv4_lookup(IpInfo, ClientIP, IpCidr)
    | project ClientIP, Info;
// Daily BytesSent per Info label for those clients, scored for anomalies; keep the two
// labels with the highest peak anomaly score.
NetworkMetrics
| lookup kind=inner (SingleTargetClients) on ClientIP
| make-series BytesSent = sum(BytesSent) on Timestamp step 1d by Info
| extend (flag, score, baseline) = series_decompose_anomalies(BytesSent)
| extend top_sus = toreal(series_stats_dynamic(score)['max'])
| top 2 by top_sus desc
Season 2 - Case 5 - El Puente is back!
// Size of the log table (schema and sample steps left commented out).
StorageArchiveLogs
//| getschema
//| take 100
| summarize Count = count()

// Which transaction types appear in EventText?
StorageArchiveLogs
| parse EventText with TransactionType " blob transaction:" *
| summarize by TransactionType
// Pull the blob URI and the read count out of Read events; show a few parsed rows.
StorageArchiveLogs
| parse EventText with "Read blob transaction: '" BlobURI "' read access (" ReadCount:long " reads) were detected on the origin"
| limit 10

// The blob with the highest total read count.
StorageArchiveLogs
| parse EventText with "Read blob transaction: '" BlobURI "' read access (" ReadCount:long " reads) were detected" *
| summarize sum(ReadCount) by BlobURI
| top 1 by sum_ReadCount desc
// Break the blob URI into its components with parse_url(); show a few rows.
StorageArchiveLogs
| parse EventText with TransactionType " blob transaction: '" BlobURI "'" *
| project EventText, BlobURI, Details = parse_url(BlobURI)
| limit 10

// The host with the most Delete transactions.
StorageArchiveLogs
| parse EventText with TransactionType " blob transaction: '" BlobURI "'" *
| where TransactionType == 'Delete'
| extend Details = parse_url(BlobURI)
| extend Host = tostring(Details['Host'])
| summarize count() by Host
| top 1 by count_ desc
// Conditional aggregates: per host, count the Delete events and total the reads of
// the Read events in a single pass; return the host with the most reads.
StorageArchiveLogs
| parse EventText with TransactionType " blob transaction: '" BlobURI "'" *
| parse EventText with * "(" Reads:long "reads)" *
| extend Host = tostring(parse_url(BlobURI)['Host'])
| summarize
    Deletes = countif(TransactionType == 'Delete'),
    Reads = sumif(Reads, TransactionType == 'Read')
    by Host
| top 1 by Reads desc
// Join-based alternative - it passes over the data twice, so it is slower.
let EventsByHost =
    StorageArchiveLogs
    | parse EventText with TransactionType " blob transaction: '" BlobURI "'" *
    | extend Host = tostring(parse_url(BlobURI)['Host']);
EventsByHost
| where TransactionType == 'Read'
| parse EventText with * "(" Reads:long "reads)" *
| summarize Reads = sum(Reads) by Host
| join kind=inner (
    EventsByHost
    | where TransactionType == 'Delete'
    | summarize count() by Host
) on $left.Host == $right.Host
| top 1 by Reads desc
// Paths of blobs whose first Delete came less than 30 minutes after their first Create.
let sus = StorageArchiveLogs
    // One parse pass yields both the transaction type and the blob URI. The read count
    // is not extracted because nothing in this query uses it.
    | parse EventText with TransactionType " blob transaction: '" BlobURI "'" *
    | extend All = parse_url(BlobURI)
    | extend Host = tostring(All.Host), Path = tostring(All.Path)
    // Per blob (Host + Path): published = earliest Create, deleted = earliest Delete.
    | summarize
        published = minif(Timestamp, TransactionType == 'Create'),
        deleted = minif(Timestamp, TransactionType == 'Delete')
        by Host, Path
    | extend lifespan = deleted - published
    // Keep only deletes that FOLLOWED the create within the 30-minute window. The lower bound
    // rejects negative lifespans (first Delete logged before the first Create); blobs missing
    // either event have a null lifespan and fail the comparison.
    | where lifespan >= 0s and lifespan < 30m
    | project Path;
// All log events whose blob path is one of the short-lived paths.
StorageArchiveLogs
| parse EventText with * "blob transaction: '" BlobURI "'" *
| extend All = parse_url(BlobURI)
| extend Path = tostring(All.Path)
| where Path in (sus)