6  Anomaly Detection

library(DBI)
library(tidyverse)
library(dbplyr)
library(knitr)
db <- dbConnect(duckdb::duckdb())
earthquakes <-
  tbl(db, "read_csv_auto('data/earthquakes*.csv')") |>
  compute(name = "earthquakes")

earthquakes |>
  filter(!is.na(mag)) |>
  group_by(mag) |>
  summarize(earthquakes = n()) |>
  mutate(pct_earthquakes = 1.0 * earthquakes / 
           sum(earthquakes, na.rm = TRUE)) |>
  arrange(desc(mag)) |>
  collect(n = 5) |>
  kable()

mag   earthquakes   pct_earthquakes
9.1             1           7.0e-07
8.8             1           7.0e-07
8.6             1           7.0e-07
8.3             2           1.3e-06
8.2             4           2.7e-06

earthquakes |>
  filter(!is.na(mag)) |>
  group_by(mag) |>
  summarize(earthquakes = n()) |>
  mutate(pct_earthquakes = earthquakes/sum(earthquakes, 
                                           na.rm = TRUE)) |>
  arrange(mag) |>
  collect(n = 5) |>
  kable()

  mag   earthquakes   pct_earthquakes
-9.99           258                 0
-9.00            29                 0
-5.00             1                 0
-2.60             2                 0
-2.50             3                 0

norcal <-
  earthquakes |>
  filter(!is.na(mag), place == 'Northern California') |>
  group_by(place, mag) |>
  summarize(count = n(), .groups = "drop") 

norcal |> 
  arrange(place, desc(mag)) |> 
  collect(n = 3) |>
  kable()

place                  mag   count
Northern California   5.60       1
Northern California   4.73       1
Northern California   4.51       1

norcal |> 
  arrange(place, mag) |> 
  collect(n = 3) |>
  kable()

place                  mag   count
Northern California   -1.6       1
Northern California   -1.2       2
Northern California   -1.1       7

earthquakes |>
  filter(!is.na(mag), place == 'Northern California') |>
  group_by(place) |>
  window_order(mag) |>
  mutate(percentile = percent_rank()) |>
  group_by(place, mag, percentile) |>
  summarize(count = n(), .groups = "drop") |>
  arrange(place, desc(mag)) |>
  collect(n = 6) |>
  kable()

place                  mag   percentile   count
Northern California   5.60    1.0000000       1
Northern California   4.73    0.9999871       1
Northern California   4.51    0.9999741       1
Northern California   4.43    0.9999482       2
Northern California   4.29    0.9999353       1
Northern California   4.28    0.9999224       1

earthquakes |>
  filter(!is.na(mag), place == 'Central Alaska') |>
  select(place, mag) |>
  mutate(percentile = ntile(100, order_by = "mag")) |>
  arrange(place, desc(mag)) |>
  collect(n = 5) |>
  kable()

place            mag   percentile
Central Alaska   5.4          100
Central Alaska   5.3          100
Central Alaska   5.2          100
Central Alaska   4.8          100
Central Alaska   4.6          100

earthquakes |>
  filter(!is.na(mag), place == 'Central Alaska') |>
  select(place, mag) |>
  group_by(place) |>
  window_order(mag) |>
  mutate(percentile = round(percent_rank() * 100, 0)) |>
  arrange(place, desc(mag)) |>
  show_query()
<SQL>
SELECT
  place,
  mag,
  ROUND(PERCENT_RANK() OVER (PARTITION BY place ORDER BY mag) * 100.0, CAST(ROUND(0.0, 0) AS INTEGER)) AS percentile
FROM earthquakes
WHERE (NOT((mag IS NULL))) AND (place = 'Central Alaska')
ORDER BY place, mag DESC

The ntile() offered by dbplyr works a little differently from other window functions. Rather than preceding the mutate with a window_order(), we need to specify an order_by argument.1

cen_ak_quartiles <-
  earthquakes |>
  filter(!is.na(mag), place == 'Central Alaska') |>
  select(place, mag) |>
  group_by(place) |>
  mutate(ntile = ntile(4, order_by = "mag"))
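
As with percent_rank() above, we can inspect the SQL that dbplyr generates for this ntile() call (a quick sanity check; output omitted here):

# Expect a window function along the lines of
# NTILE(4) OVER (PARTITION BY place ORDER BY mag)
cen_ak_quartiles |> show_query()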

cen_ak_quartiles |>
  group_by(place, ntile) |>
  summarize(maximum = max(mag, na.rm = TRUE),
            minimum = min(mag, na.rm = TRUE),
            .groups = "drop") |>
  arrange(place, desc(ntile)) |>
  kable()

place            ntile   maximum   minimum
Central Alaska       4       5.4       1.4
Central Alaska       3       1.4       1.1
Central Alaska       2       1.1       0.8
Central Alaska       1       0.8      -0.5

earthquakes |>
  filter(!is.na(mag), place == 'Central Alaska') |>
  select(place, mag) |>
  group_by(place) |>
  summarize(pct_25 = quantile(mag, probs = 0.25, na.rm = TRUE),
            pct_50 = quantile(mag, probs = 0.50, na.rm = TRUE),
            pct_75 = quantile(mag, probs = 0.75, na.rm = TRUE)) |>
  kable()

place            pct_25   pct_50   pct_75
Central Alaska      0.8      1.1      1.4

earthquakes |>
  filter(!is.na(mag), place == 'Central Alaska') |>
  summarize(pct_25_mag = quantile(mag, probs = 0.25, na.rm = TRUE),
            pct_25_depth = quantile(depth, probs = 0.25, na.rm = TRUE)) |>
  kable()

pct_25_mag   pct_25_depth
       0.8            7.1

earthquakes |>
  filter(!is.na(mag), 
         place %in% c('Central Alaska', 'Southern Alaska')) |>
  group_by(place) |>
  summarize(pct_25_mag = quantile(mag, probs = 0.25, na.rm = TRUE),
            pct_25_depth = quantile(depth, probs = 0.25, na.rm = TRUE)) |>
  kable()

place             pct_25_mag   pct_25_depth
Southern Alaska          1.2           10.1
Central Alaska           0.8            7.1

earthquakes |>
  summarize(sd_mag = sd(mag, na.rm = TRUE),
            stddev_samp_mag = stddev_samp(mag),
            stddev_pop_mag = stddev_pop(mag)) |>
  mutate(diff_samp = sd_mag - stddev_samp_mag,
         diff_pop = sd_mag - stddev_pop_mag) |>
  collect() |>
  kable()

  sd_mag   stddev_samp_mag   stddev_pop_mag   diff_samp   diff_pop
1.273606          1.273606         1.273606           0      4e-07

Below, Cathy uses an INNER JOIN with ON 1 = 1.

SELECT * 
FROM earthquakes a
INNER JOIN b ON 1 = 1

Instead, I use CROSS JOIN (this is cross_join in dplyr).

SELECT * 
FROM earthquakes a
CROSS JOIN b
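
As a quick check of the dplyr spelling, a minimal sketch (the one-row tables a and b here are mine, purely for illustration) shows the SQL that cross_join() emits:

a <- tbl(db, sql("SELECT 1 AS x"))
b <- tbl(db, sql("SELECT 2 AS y"))

# The printed SQL should contain a CROSS JOIN
cross_join(a, b) |> show_query()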

The output in the book differs from what one gets from running the code, so I add !(mag %in% c(-9, -9.99)) to get closer to the book’s output.

Note that in constructing mag_stats, I follow the book in using avg(mag) and stddev_pop(mag). In practice, I would probably lean towards the R-compatible mean(mag, na.rm = TRUE) and sd(mag, na.rm = TRUE), respectively. This makes little difference in practice (the only difference is that sd is translated into stddev_samp instead of stddev_pop, which is barely different in this case), but I believe it is helpful to be consistent where possible. I often find myself moving data processing from PostgreSQL to R or vice versa, and this is much easier if the dbplyr code is consistent with the dplyr equivalent.

mag_stats <-
  earthquakes |>
  filter(!is.na(mag)) |>
  summarize(avg_mag = avg(mag),
            std_dev = stddev_pop(mag))
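
For reference, here is a sketch of the R-compatible alternative described above (mag_stats_r is my name for it); show_query() can confirm that mean() becomes AVG and sd() becomes STDDEV_SAMP:

mag_stats_r <-
  earthquakes |>
  filter(!is.na(mag)) |>
  summarize(avg_mag = mean(mag, na.rm = TRUE),
            std_dev = sd(mag, na.rm = TRUE))

mag_stats_r |> show_query()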

z_scores <-
  earthquakes |>
  filter(!is.na(mag), !(mag %in% c(-9, -9.99))) |>
  select(place, mag) |>
  cross_join(mag_stats) |>
  mutate(z_score = (mag - avg_mag) / std_dev) 

z_scores |>
  arrange(desc(mag)) |>
  collect(n = 3) |>
  kable()

place                                     mag    avg_mag    std_dev    z_score
2011 Great Tohoku Earthquake, Japan       9.1   1.625101   1.273606   5.869083
offshore Bio-Bio, Chile                   8.8   1.625101   1.273606   5.633532
off the west coast of northern Sumatra    8.6   1.625101   1.273606   5.476497

z_scores |>
  arrange(mag) |>
  collect(n = 3) |>
  kable()

place                            mag    avg_mag    std_dev     z_score
20 km SSW of Pa‘auilo, Hawaii   -5.0   1.625101   1.273606   -5.201846
Nevada                          -2.6   1.625101   1.273606   -3.317433
Nevada                          -2.6   1.625101   1.273606   -3.317433

6.1 Graphing to find anomalies visually

earthquakes |>
  filter(!is.na(mag)) |>
  ggplot(aes(x = mag)) +
  geom_histogram(breaks = seq(-10, 10, 0.1))

earthquakes |>
  filter(!is.na(mag),
         between(mag, 7.2, 9.5)) |>
  ggplot(aes(x = mag)) +
  geom_histogram(binwidth = 0.1)

earthquakes |>
  filter(!is.na(mag),
         between(mag, 7.2, 9.5)) |>
  ggplot(aes(x = mag)) +
  geom_bar() +
  scale_x_binned(breaks = seq(7.2, 9.5, 0.1))

earthquakes |>
  filter(!is.na(mag), !is.na(depth)) |>
  distinct(mag, depth) |>
  ggplot(aes(x = mag, y = depth)) +
  geom_point(size = 0.1, colour = "blue")

earthquakes |>
  filter(!is.na(mag), !is.na(depth)) |>
  filter(between(mag, 4, 7), depth <= 50) |>
  ggplot(aes(x = mag, y = depth)) +
  geom_count(color = "blue")

japan_quakes <-
  earthquakes |>
  filter(!is.na(mag), !is.na(depth)) |>
  filter(grepl("Japan", place)) 

japan_quakes |>
  ggplot(aes(y = mag)) +
  geom_boxplot(width = 0.5)

japan_quakes |>
  summarize(p25 = quantile(mag, probs = 0.25, na.rm = TRUE),
            p50 = quantile(mag, probs = 0.50, na.rm = TRUE),
            p75 = quantile(mag, probs = 0.75, na.rm = TRUE)) |>
  mutate(iqr = (p75 - p25) * 1.5, # NB: this is 1.5 * IQR (the whisker offset), not the IQR itself
         lower_whisker = p25 - (p75 - p25) * 1.5,
         upper_whisker = p75 + (p75 - p25) * 1.5) |>
  kable()

p25   p50   p75   iqr   lower_whisker   upper_whisker
4.3   4.5   4.7   0.6             3.7             5.3

japan_quakes |>
  select(mag, time) |>
  collect() |>
  mutate(year = as.factor(year(time))) |>
  ggplot(aes(y = mag, x = year, group = year)) +
  geom_boxplot()

6.2 Forms of Anomalies

6.2.1 Anomalous Values

earthquakes |>
  filter(mag >= 1.08) |>
  group_by(mag) |>
  summarize(count = n()) |>
  arrange(mag) |>
  collect(n = 5) |>
  kable()

 mag   count
1.08    3863
1.08       1
1.09    3712
1.10   39728
1.11    3674

earthquakes |>
  filter(depth > 600) |>
  group_by(net) |>
  summarize(count = n()) |>
  arrange(net) |>
  collect(n = 5) |>
  kable()

net   count
us     1215

earthquakes |>
  filter(depth > 600) |>
  group_by(place) |>
  summarize(count = n()) |>
  arrange(place) |>
  collect(n = 5) |>
  kable()

place                            count
100km NW of Ndoi Island, Fiji        1
100km SSW of Ndoi Island, Fiji       1
100km SW of Ndoi Island, Fiji        1
101km ENE of Suva, Fiji              1
101km NNE of Ndoi Island, Fiji       1

earthquakes |>
  filter(depth > 600) |>
  mutate(place_name = case_when(grepl(' of ', place) ~
                                  split_part(place, ' of ', 2L),
                                TRUE ~ place)) |>
  group_by(place_name) |>
  summarize(count = n()) |>
  arrange(desc(count)) |>
  collect(n = 3) |>
  kable()

place_name          count
Ndoi Island, Fiji     487
Fiji region           186
Lambasa, Fiji         140

earthquakes |>
  summarize(distinct_types = n_distinct(type),
            distinct_lower = n_distinct(lower(type))) |>
  kable()

distinct_types   distinct_lower
            25               24

6.2.2 Anomalous Counts or Frequencies

Why does the book use date_trunc('year', time)::date AS earthquake_year? Extracting the year directly seems simpler:

SELECT EXTRACT(year FROM time) AS earthquake_year, 
  COUNT(*) AS earthquakes
FROM earthquakes
GROUP BY 1
ORDER BY 1;

Displaying records 1 - 10

earthquake_year   earthquakes
           2010        122322
           2011        107397
           2012        105693
           2013        114368
           2014        135247
           2015        122914
           2016        122420
           2017        130622
           2018        179304
           2019        171116
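
That said, the book's date_trunc() form also has a direct dbplyr spelling. Something like this sketch (not used elsewhere in this chapter) should translate floor_date() into DATE_TRUNC:

earthquakes |>
  mutate(earthquake_year = as.Date(floor_date(time, "year"))) |>
  count(earthquake_year, name = "earthquakes") |>
  show_query()
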
earthquakes |>
  mutate(earthquake_year = as.character(year(time))) |>
  group_by(earthquake_year) |>
  summarize(earthquakes = n()) |>
  ggplot(aes(x = earthquake_year, y = earthquakes)) +
  geom_bar(stat = "identity")

Alternatively, we can select just earthquake_year and let ggplot2 count the rows for us.2

earthquakes |>
  mutate(earthquake_year = as.character(year(time))) |>
  select(earthquake_year) |>
  ggplot(aes(x = earthquake_year)) +
  geom_bar()
earthquakes |>
  mutate(earthquake_month = floor_date(time, "month")) |>
  group_by(earthquake_month) |>
  summarize(earthquakes = n(), .groups = "drop") |>
  ggplot(aes(x = earthquake_month, y = earthquakes)) +
  geom_line()

From the book, “it turns out that the increase in earthquakes starting in 2017 can be at least partially explained by the status field. The status indicates whether the event has been reviewed by a human (‘reviewed’) or was directly posted by a system without review (‘automatic’).” This can be seen in the following plot.3

earthquakes |>
  mutate(earthquake_month = floor_date(time, "month")) |>
  group_by(earthquake_month, status) |>
  summarize(earthquakes = n(), .groups = "drop") |>
  ggplot(aes(x = earthquake_month, y = earthquakes, color = status)) +
  geom_line()

earthquakes |>
  filter(mag >= 6) |>
  group_by(place) |>
  summarize(earthquakes = n(), .groups = "drop") |>
  arrange(desc(earthquakes)) |>
  collect(n = 3) |>
  kable()

place                                  earthquakes
near the east coast of Honshu, Japan            52
off the east coast of Honshu, Japan             34
Vanuatu                                         28

For the next query, it seems easy enough to just put the result in a plot.

earthquakes |>
  filter(mag >= 6) |>
  mutate(place = if_else(grepl(' of ', place),
                         split_part(place, ' of ', 2L), 
                         place)) |>
  count(place, name = "earthquakes") |>
  arrange(desc(earthquakes)) |>
  collect(n = 10) |>
  ggplot(aes(y = fct_inorder(place), 
             x = earthquakes)) +
  geom_bar(stat = "identity")

6.2.3 Anomalies from the Absence of Data

6.3 Handling Anomalies

6.3.1 Investigation

6.3.2 Removal

earthquakes |>
  filter(!mag %in% c(-9,-9.99)) |>
  select(time, mag, type) |>
  collect(n = 10) |>
  kable()

time                   mag   type
2016-09-28 05:33:17   0.73   earthquake
2016-09-28 05:35:37   0.58   earthquake
2016-09-28 05:41:37   2.16   earthquake
2016-09-28 05:47:16   1.50   earthquake
2016-09-28 05:47:59   1.44   earthquake
2016-09-28 05:52:54   2.20   earthquake
2016-09-28 05:56:43   2.23   earthquake
2016-09-28 05:59:17   2.09   earthquake
2016-09-28 06:09:07   0.89   earthquake
2016-09-28 06:11:14   1.66   earthquake

earthquakes |>
  summarize(avg_mag = avg(mag),
            avg_mag_adjusted = avg(if_else(mag > -9, mag, NA))) |>
  kable()

 avg_mag   avg_mag_adjusted
1.625101           1.627323

earthquakes |>
  filter(place == 'Yellowstone National Park, Wyoming') |>
  summarize(avg_mag = avg(mag),
            avg_mag_adjusted = avg(if_else(mag > -9, mag, NA))) |>
  kable()

  avg_mag   avg_mag_adjusted
0.4063935          0.9233279

6.3.3 Replacement with Alternate Values

earthquakes |>
  mutate(event_type = if_else(type == 'earthquake', type, 'Other')) |>
  count(event_type) |>
  kable()

event_type         n
earthquake   1461750
Other          34176

extremes <-
  earthquakes |>
  summarize(p95 = quantile(mag, probs = 0.95, na.rm = TRUE),
            p05 = quantile(mag, probs = 0.05, na.rm = TRUE))

extremes |> kable()

p95    p05
4.5   0.12

Note that this SQL from the book4

CASE 
  WHEN mag > p95 THEN p95
  WHEN mag < p05 THEN p05
  ELSE mag
END AS mag_winsorized

can be replaced with a single line:

LEAST(GREATEST(mag, p05), p95) AS mag_winsorized

The R equivalents of LEAST and GREATEST are pmin and pmax, respectively. And dbplyr will translate pmin and pmax for us, so we can get winsorized data as follows.
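
As a sanity check, dbplyr::translate_sql() shows this translation directly (a quick sketch; quoting details vary by backend):

# Should print something equivalent to LEAST(GREATEST(mag, p05), p95)
dbplyr::translate_sql(pmin(pmax(mag, p05), p95), con = db)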

earthquakes_wins <-
  earthquakes |>
  mutate(mag = if_else(mag %in% c(-9.99, -9), NA, mag)) |>
  filter(!is.na(mag)) |>
  cross_join(extremes) |>
  mutate(mag_winsorized = pmin(pmax(mag, p05), p95)) |>
  select(time, place, mag, mag_winsorized) 

earthquakes_wins |>
  arrange(desc(mag)) |>
  collect(n = 3) |>
  kable()

time                  place                                     mag   mag_winsorized
2011-03-11 05:46:24   2011 Great Tohoku Earthquake, Japan       9.1              4.5
2010-02-27 06:34:11   offshore Bio-Bio, Chile                   8.8              4.5
2012-04-11 08:38:36   off the west coast of northern Sumatra    8.6              4.5

earthquakes_wins |>
  filter(mag == mag_winsorized) |>
  collect(n = 3) |>
  kable()

time                  place                                mag   mag_winsorized
2015-12-04 15:23:52   159km NNE of Cape Yakataga, Alaska   1.2              1.2
2015-12-04 15:26:03   87km WSW of Atka, Alaska             2.0              2.0
2015-12-04 15:32:17   112km N of Larsen Bay, Alaska        2.7              2.7

earthquakes_wins |>
  arrange(mag) |>
  collect(n = 3) |>
  kable()

time                  place                            mag   mag_winsorized
2016-06-20 22:16:25   20 km SSW of Pa‘auilo, Hawaii   -5.0             0.12
2012-06-11 01:59:01   Nevada                          -2.6             0.12
2012-06-27 01:14:37   Nevada                          -2.6             0.12

6.3.4 Rescaling

In the book, it says WHERE depth >= 0.05, but I need to use WHERE depth > 0.05 to match the results there.

quake_depths <-
  earthquakes |>
  filter(depth > 0.05) |>
  mutate(depth = round(depth, 1)) |>
  select(depth)

quake_depths |>
  mutate(log_depth = log(depth, base = 10)) |>
  count(depth, log_depth) |>
  arrange(depth) |>
  collect(n = 3) |>
  kable()

depth    log_depth      n
  0.1   -1.0000000   6994
  0.2   -0.6989700   6876
  0.3   -0.5228787   7269

quake_depths |>
  ggplot(aes(x = depth)) +
  geom_histogram(binwidth = 0.1)

quake_depths |>
  ggplot(aes(x = log(depth, base = 10))) +
  geom_histogram(binwidth = 0.1)


  1. The reason for this is not clear and I have filed a feature request to make the handling of ntile() similar to that of other window functions.↩︎

  2. Without the select(earthquake_year), the following plot would take 4 seconds to produce—versus 0.4 seconds needed here. This suggests that ggplot is triggering a collect() on fields that it does not need to make the plot.↩︎

  3. Unlike the plot in the book, I leave observations with “manual” status in the plot, as they are in the SQL query.↩︎

  4. I simplify the SQL from the book assuming that the table already includes a CROSS JOIN with the SQL equivalent of extremes and I use shorter variable names for the extremes.↩︎