Search code examples
dplyrdbplyrduckdbrpostgres

Is there any way to tell dbplyr to evaluate an aggregate function in a window context?


Below I can achieve the results I want by constructing the window w directly in SQL and joining it with other components myself. But is there any way to do this using the standard dbplyr verbs? (See error messages below.)

Is this something that would have to be implemented by the suppliers of the dbplyr backends for RPostgres and duckdb?

library(dplyr, warn.conflicts = FALSE)
library(dbplyr, warn.conflicts = FALSE)
library(DBI)
set.seed(2023)
n <- 100
betas <- tibble(id = 1:n, beta = runif(n, min = 0.7, max = 1.3))

start_date <- as.Date("1980-01-01")
end_date <- as.Date("2023-01-01")

rets <-
  tibble(date = seq.Date(start_date, end_date, by = "month"),
         r_m = rnorm(n = length(date), sd = 0.05)) |>
  cross_join(betas) |>
  mutate(e_i = rnorm(n = length(date), sd = 0.1)) |>
  mutate(r_i = beta * r_m + e_i) |>
  select(id, date, r_i, r_m) |>
  arrange(id, date)

rets
#> # A tibble: 51,700 × 4
#>       id date           r_i     r_m
#>    <int> <date>       <dbl>   <dbl>
#>  1     1 1980-01-01  0.170   0.0332
#>  2     1 1980-02-01  0.0412 -0.0545
#>  3     1 1980-03-01 -0.0784 -0.0211
#>  4     1 1980-04-01  0.268   0.0592
#>  5     1 1980-05-01  0.392   0.0792
#>  6     1 1980-06-01  0.230   0.114 
#>  7     1 1980-07-01 -0.0166 -0.103 
#>  8     1 1980-08-01  0.0441 -0.0204
#>  9     1 1980-09-01  0.150   0.0151
#> 10     1 1980-10-01 -0.0423  0.0348
#> # ℹ 51,690 more rows

db <- dbConnect(RPostgres::Postgres())

rets_db <- copy_to(db, rets, overwrite = TRUE)
rets_db |>
  group_by(id) |>
  window_order(date) |>
  window_frame(-60, 0) |>
  mutate(beta = regr_slope(r_i, r_m))
#> Error in `collect()`:
#> ! Failed to collect lazy table.
#> Caused by error:
#> ! Failed to prepare query : ERROR:  column "rets.id" must appear in the GROUP BY clause or be used in an aggregate function
#> LINE 1: SELECT "rets".*, regr_slope("r_i", "r_m") AS "beta"
#>                ^
#> Backtrace:
#>      ▆
#>   1. ├─base::tryCatch(...)
#>   2. │ └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   3. │   ├─base (local) tryCatchOne(...)
#>   4. │   │ └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   5. │   └─base (local) tryCatchList(expr, names[-nh], parentenv, handlers[-nh])
#>   6. │     └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7. │       └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   8. ├─base::withCallingHandlers(...)
#>   9. ├─base::saveRDS(...)
#>  10. ├─base::do.call(...)
#>  11. ├─base (local) `<fn>`(...)
#>  12. └─global `<fn>`(input = base::quote("keen-boa_reprex.R"))
#>  13.   └─rmarkdown::render(input, quiet = TRUE, envir = globalenv(), encoding = "UTF-8")
#>  14.     └─knitr::knit(knit_input, knit_output, envir = envir, quiet = quiet)
#>  15.       └─knitr:::process_file(text, output)
#>  16.         ├─knitr:::handle_error(...)
#>  17.         │ └─base::withCallingHandlers(...)
#>  18.         ├─base::withCallingHandlers(...)
#>  19.         ├─knitr:::process_group(group)
#>  20.         └─knitr:::process_group.block(group)
#>  21.           └─knitr:::call_block(x)
#>  22.             └─knitr:::block_exec(params)
#>  23.               └─knitr:::eng_r(options)
#>  24.                 ├─knitr:::in_input_dir(...)
#>  25.                 │ └─knitr:::in_dir(input_dir(), expr)
#>  26.                 └─knitr (local) evaluate(...)
#>  27.                   └─evaluate::evaluate(...)
#>  28.                     └─evaluate:::evaluate_call(...)
#>  29.                       ├─evaluate (local) handle(...)
#>  30.                       │ └─base::try(f, silent = TRUE)
#>  31.                       │   └─base::tryCatch(...)
#>  32.                       │     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  33.                       │       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  34.                       │         └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>  35.                       ├─base::withCallingHandlers(...)
#>  36.                       ├─base::withVisible(value_fun(ev$value, ev$visible))
#>  37.                       └─knitr (local) value_fun(ev$value, ev$visible)
#>  38.                         └─knitr (local) fun(x, options = options)
#>  39.                           ├─base::withVisible(knit_print(x, ...))
#>  40.                           ├─knitr::knit_print(x, ...)
#>  41.                           └─rmarkdown:::knit_print.tbl_sql(x, ...)
#>  42.                             ├─context$df_print(x)
#>  43.                             └─dbplyr:::print.tbl_sql(x)
#>  44.                               ├─dbplyr:::cat_line(format(x, ..., n = n, width = width, n_extra = n_extra))
#>  45.                               │ ├─base::cat(paste0(..., "\n"), sep = "")
#>  46.                               │ └─base::paste0(..., "\n")
#>  47.                               ├─base::format(x, ..., n = n, width = width, n_extra = n_extra)
#>  48.                               └─pillar:::format.tbl(x, ..., n = n, width = width, n_extra = n_extra)
#>  49.                                 └─pillar:::format_tbl(...)
#>  50.                                   └─pillar::tbl_format_setup(...)
#>  51.                                     ├─pillar:::tbl_format_setup_dispatch(...)
#>  52.                                     └─pillar:::tbl_format_setup.tbl(...)
#>  53.                                       └─pillar:::df_head(x, n + 1)
#>  54.                                         ├─base::as.data.frame(head(x, n))
#>  55.                                         └─dbplyr:::as.data.frame.tbl_sql(head(x, n))
#>  56.                                           ├─base::as.data.frame(collect(x, n = n))
#>  57.                                           ├─dplyr::collect(x, n = n)
#>  58.                                           └─dbplyr:::collect.tbl_sql(x, n = n)
#>  59.                                             └─base::tryCatch(...)
#>  60.                                               └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  61.                                                 └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  62.                                                   └─value[[3L]](cond)
#>  63.                                                     └─cli::cli_abort("Failed to collect lazy table.", parent = cnd)
#>  64.                                                       └─rlang::abort(...)

betas_db <-
  rets_db |>
  group_by(id) |>
  window_order(date) |>
  window_frame(-60, 0) |>
  mutate(beta = regr_slope(r_i, r_m))

betas_db |> show_query()
#> <SQL>
#> SELECT "rets".*, regr_slope("r_i", "r_m") AS "beta"
#> FROM "rets"
tryCatch(betas_db)
#> Error in `collect()`:
#> ! Failed to collect lazy table.
#> Caused by error:
#> ! Failed to prepare query : ERROR:  column "rets.id" must appear in the GROUP BY clause or be used in an aggregate function
#> LINE 1: SELECT "rets".*, regr_slope("r_i", "r_m") AS "beta"
#>                ^
#> Backtrace:
#>      ▆
#>   1. ├─base::tryCatch(...)
#>   2. │ └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   3. │   ├─base (local) tryCatchOne(...)
#>   4. │   │ └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   5. │   └─base (local) tryCatchList(expr, names[-nh], parentenv, handlers[-nh])
#>   6. │     └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7. │       └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   8. ├─base::withCallingHandlers(...)
#>   9. ├─base::saveRDS(...)
#>  10. ├─base::do.call(...)
#>  11. ├─base (local) `<fn>`(...)
#>  12. └─global `<fn>`(input = base::quote("keen-boa_reprex.R"))
#>  13.   └─rmarkdown::render(input, quiet = TRUE, envir = globalenv(), encoding = "UTF-8")
#>  14.     └─knitr::knit(knit_input, knit_output, envir = envir, quiet = quiet)
#>  15.       └─knitr:::process_file(text, output)
#>  16.         ├─knitr:::handle_error(...)
#>  17.         │ └─base::withCallingHandlers(...)
#>  18.         ├─base::withCallingHandlers(...)
#>  19.         ├─knitr:::process_group(group)
#>  20.         └─knitr:::process_group.block(group)
#>  21.           └─knitr:::call_block(x)
#>  22.             └─knitr:::block_exec(params)
#>  23.               └─knitr:::eng_r(options)
#>  24.                 ├─knitr:::in_input_dir(...)
#>  25.                 │ └─knitr:::in_dir(input_dir(), expr)
#>  26.                 └─knitr (local) evaluate(...)
#>  27.                   └─evaluate::evaluate(...)
#>  28.                     └─evaluate:::evaluate_call(...)
#>  29.                       ├─evaluate (local) handle(...)
#>  30.                       │ └─base::try(f, silent = TRUE)
#>  31.                       │   └─base::tryCatch(...)
#>  32.                       │     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  33.                       │       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  34.                       │         └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>  35.                       ├─base::withCallingHandlers(...)
#>  36.                       ├─base::withVisible(value_fun(ev$value, ev$visible))
#>  37.                       └─knitr (local) value_fun(ev$value, ev$visible)
#>  38.                         └─knitr (local) fun(x, options = options)
#>  39.                           ├─base::withVisible(knit_print(x, ...))
#>  40.                           ├─knitr::knit_print(x, ...)
#>  41.                           └─rmarkdown:::knit_print.tbl_sql(x, ...)
#>  42.                             ├─context$df_print(x)
#>  43.                             └─dbplyr:::print.tbl_sql(x)
#>  44.                               ├─dbplyr:::cat_line(format(x, ..., n = n, width = width, n_extra = n_extra))
#>  45.                               │ ├─base::cat(paste0(..., "\n"), sep = "")
#>  46.                               │ └─base::paste0(..., "\n")
#>  47.                               ├─base::format(x, ..., n = n, width = width, n_extra = n_extra)
#>  48.                               └─pillar:::format.tbl(x, ..., n = n, width = width, n_extra = n_extra)
#>  49.                                 └─pillar:::format_tbl(...)
#>  50.                                   └─pillar::tbl_format_setup(...)
#>  51.                                     ├─pillar:::tbl_format_setup_dispatch(...)
#>  52.                                     └─pillar:::tbl_format_setup.tbl(...)
#>  53.                                       └─pillar:::df_head(x, n + 1)
#>  54.                                         ├─base::as.data.frame(head(x, n))
#>  55.                                         └─dbplyr:::as.data.frame.tbl_sql(head(x, n))
#>  56.                                           ├─base::as.data.frame(collect(x, n = n))
#>  57.                                           ├─dplyr::collect(x, n = n)
#>  58.                                           └─dbplyr:::collect.tbl_sql(x, n = n)
#>  59.                                             └─base::tryCatch(...)
#>  60.                                               └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  61.                                                 └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  62.                                                   └─value[[3L]](cond)
#>  63.                                                     └─cli::cli_abort("Failed to collect lazy table.", parent = cnd)
#>  64.                                                       └─rlang::abort(...)

db <- dbConnect(duckdb::duckdb())

rets_db <- copy_to(db, rets, overwrite = TRUE)
rets_db |>
  group_by(id) |>
  window_order(date) |>
  window_frame(-60, 0) |>
  mutate(beta = regr_slope(r_i, r_m))
#> Error in `collect()`:
#> ! Failed to collect lazy table.
#> Caused by error:
#> ! rapi_prepare: Failed to prepare query SELECT rets.*, regr_slope(r_i, r_m) AS beta
#> FROM rets
#> LIMIT 11
#> Error: Binder Error: column "id" must appear in the GROUP BY clause or must be part of an aggregate function.
#> Either add it to the GROUP BY list, or use "ANY_VALUE(id)" if the exact value of "id" is not important.
#> Backtrace:
#>      ▆
#>   1. ├─base::tryCatch(...)
#>   2. │ └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   3. │   ├─base (local) tryCatchOne(...)
#>   4. │   │ └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   5. │   └─base (local) tryCatchList(expr, names[-nh], parentenv, handlers[-nh])
#>   6. │     └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7. │       └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   8. ├─base::withCallingHandlers(...)
#>   9. ├─base::saveRDS(...)
#>  10. ├─base::do.call(...)
#>  11. ├─base (local) `<fn>`(...)
#>  12. └─global `<fn>`(input = base::quote("keen-boa_reprex.R"))
#>  13.   └─rmarkdown::render(input, quiet = TRUE, envir = globalenv(), encoding = "UTF-8")
#>  14.     └─knitr::knit(knit_input, knit_output, envir = envir, quiet = quiet)
#>  15.       └─knitr:::process_file(text, output)
#>  16.         ├─knitr:::handle_error(...)
#>  17.         │ └─base::withCallingHandlers(...)
#>  18.         ├─base::withCallingHandlers(...)
#>  19.         ├─knitr:::process_group(group)
#>  20.         └─knitr:::process_group.block(group)
#>  21.           └─knitr:::call_block(x)
#>  22.             └─knitr:::block_exec(params)
#>  23.               └─knitr:::eng_r(options)
#>  24.                 ├─knitr:::in_input_dir(...)
#>  25.                 │ └─knitr:::in_dir(input_dir(), expr)
#>  26.                 └─knitr (local) evaluate(...)
#>  27.                   └─evaluate::evaluate(...)
#>  28.                     └─evaluate:::evaluate_call(...)
#>  29.                       ├─evaluate (local) handle(...)
#>  30.                       │ └─base::try(f, silent = TRUE)
#>  31.                       │   └─base::tryCatch(...)
#>  32.                       │     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  33.                       │       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  34.                       │         └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>  35.                       ├─base::withCallingHandlers(...)
#>  36.                       ├─base::withVisible(value_fun(ev$value, ev$visible))
#>  37.                       └─knitr (local) value_fun(ev$value, ev$visible)
#>  38.                         └─knitr (local) fun(x, options = options)
#>  39.                           ├─base::withVisible(knit_print(x, ...))
#>  40.                           ├─knitr::knit_print(x, ...)
#>  41.                           └─rmarkdown:::knit_print.tbl_sql(x, ...)
#>  42.                             ├─context$df_print(x)
#>  43.                             └─dbplyr:::print.tbl_sql(x)
#>  44.                               ├─dbplyr:::cat_line(format(x, ..., n = n, width = width, n_extra = n_extra))
#>  45.                               │ ├─base::cat(paste0(..., "\n"), sep = "")
#>  46.                               │ └─base::paste0(..., "\n")
#>  47.                               ├─base::format(x, ..., n = n, width = width, n_extra = n_extra)
#>  48.                               └─pillar:::format.tbl(x, ..., n = n, width = width, n_extra = n_extra)
#>  49.                                 └─pillar:::format_tbl(...)
#>  50.                                   └─pillar::tbl_format_setup(...)
#>  51.                                     ├─pillar:::tbl_format_setup_dispatch(...)
#>  52.                                     └─pillar:::tbl_format_setup.tbl(...)
#>  53.                                       └─pillar:::df_head(x, n + 1)
#>  54.                                         ├─base::as.data.frame(head(x, n))
#>  55.                                         └─dbplyr:::as.data.frame.tbl_sql(head(x, n))
#>  56.                                           ├─base::as.data.frame(collect(x, n = n))
#>  57.                                           ├─dplyr::collect(x, n = n)
#>  58.                                           └─dbplyr:::collect.tbl_sql(x, n = n)
#>  59.                                             └─base::tryCatch(...)
#>  60.                                               └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  61.                                                 └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  62.                                                   └─value[[3L]](cond)
#>  63.                                                     └─cli::cli_abort("Failed to collect lazy table.", parent = cnd)
#>  64.                                                       └─rlang::abort(...)

betas_all <-
  rets_db |>
  group_by(id) |>
  summarize(beta = regr_slope(r_i, r_m))

betas_all |> show_query()
#> <SQL>
#> SELECT id, regr_slope(r_i, r_m) AS beta
#> FROM rets
#> GROUP BY id
betas_all
#> # Source:   SQL [?? x 2]
#> # Database: DuckDB v0.9.2 [root@Darwin 23.3.0:R 4.3.2/:memory:]
#>       id  beta
#>    <int> <dbl>
#>  1     1 1.04 
#>  2     2 0.881
#>  3     3 0.841
#>  4     4 0.853
#>  5     5 0.812
#>  6     6 0.824
#>  7     7 0.988
#>  8     8 1.02 
#>  9     9 0.753
#> 10    10 1.02 
#> # ℹ more rows

betas_db <-
  rets_db |>
  group_by(id) |>
  window_order(date) |>
  window_frame(-60, 0) |>
  mutate(beta = regr_slope(r_i, r_m))

betas_db |> show_query()
#> <SQL>
#> SELECT rets.*, regr_slope(r_i, r_m) AS beta
#> FROM rets
try(betas_db)
#> Error in `collect()`:
#> ! Failed to collect lazy table.
#> Caused by error:
#> ! rapi_prepare: Failed to prepare query SELECT rets.*, regr_slope(r_i, r_m) AS beta
#> FROM rets
#> LIMIT 11
#> Error: Binder Error: column "id" must appear in the GROUP BY clause or must be part of an aggregate function.
#> Either add it to the GROUP BY list, or use "ANY_VALUE(id)" if the exact value of "id" is not important.
#> Backtrace:
#>      ▆
#>   1. ├─base::tryCatch(...)
#>   2. │ └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   3. │   ├─base (local) tryCatchOne(...)
#>   4. │   │ └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   5. │   └─base (local) tryCatchList(expr, names[-nh], parentenv, handlers[-nh])
#>   6. │     └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7. │       └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   8. ├─base::withCallingHandlers(...)
#>   9. ├─base::saveRDS(...)
#>  10. ├─base::do.call(...)
#>  11. ├─base (local) `<fn>`(...)
#>  12. └─global `<fn>`(input = base::quote("keen-boa_reprex.R"))
#>  13.   └─rmarkdown::render(input, quiet = TRUE, envir = globalenv(), encoding = "UTF-8")
#>  14.     └─knitr::knit(knit_input, knit_output, envir = envir, quiet = quiet)
#>  15.       └─knitr:::process_file(text, output)
#>  16.         ├─knitr:::handle_error(...)
#>  17.         │ └─base::withCallingHandlers(...)
#>  18.         ├─base::withCallingHandlers(...)
#>  19.         ├─knitr:::process_group(group)
#>  20.         └─knitr:::process_group.block(group)
#>  21.           └─knitr:::call_block(x)
#>  22.             └─knitr:::block_exec(params)
#>  23.               └─knitr:::eng_r(options)
#>  24.                 ├─knitr:::in_input_dir(...)
#>  25.                 │ └─knitr:::in_dir(input_dir(), expr)
#>  26.                 └─knitr (local) evaluate(...)
#>  27.                   └─evaluate::evaluate(...)
#>  28.                     └─evaluate:::evaluate_call(...)
#>  29.                       ├─evaluate (local) handle(...)
#>  30.                       │ └─base::try(f, silent = TRUE)
#>  31.                       │   └─base::tryCatch(...)
#>  32.                       │     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  33.                       │       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  34.                       │         └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>  35.                       ├─base::withCallingHandlers(...)
#>  36.                       ├─base::withVisible(value_fun(ev$value, ev$visible))
#>  37.                       └─knitr (local) value_fun(ev$value, ev$visible)
#>  38.                         └─knitr (local) fun(x, options = options)
#>  39.                           ├─base::withVisible(knit_print(x, ...))
#>  40.                           ├─knitr::knit_print(x, ...)
#>  41.                           └─rmarkdown:::knit_print.tbl_sql(x, ...)
#>  42.                             ├─context$df_print(x)
#>  43.                             └─dbplyr:::print.tbl_sql(x)
#>  44.                               ├─dbplyr:::cat_line(format(x, ..., n = n, width = width, n_extra = n_extra))
#>  45.                               │ ├─base::cat(paste0(..., "\n"), sep = "")
#>  46.                               │ └─base::paste0(..., "\n")
#>  47.                               ├─base::format(x, ..., n = n, width = width, n_extra = n_extra)
#>  48.                               └─pillar:::format.tbl(x, ..., n = n, width = width, n_extra = n_extra)
#>  49.                                 └─pillar:::format_tbl(...)
#>  50.                                   └─pillar::tbl_format_setup(...)
#>  51.                                     ├─pillar:::tbl_format_setup_dispatch(...)
#>  52.                                     └─pillar:::tbl_format_setup.tbl(...)
#>  53.                                       └─pillar:::df_head(x, n + 1)
#>  54.                                         ├─base::as.data.frame(head(x, n))
#>  55.                                         └─dbplyr:::as.data.frame.tbl_sql(head(x, n))
#>  56.                                           ├─base::as.data.frame(collect(x, n = n))
#>  57.                                           ├─dplyr::collect(x, n = n)
#>  58.                                           └─dbplyr:::collect.tbl_sql(x, n = n)
#>  59.                                             └─base::tryCatch(...)
#>  60.                                               └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  61.                                                 └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  62.                                                   └─value[[3L]](cond)
#>  63.                                                     └─cli::cli_abort("Failed to collect lazy table.", parent = cnd)
#>  64.                                                       └─rlang::abort(...)

w <- paste0(" OVER (PARTITION BY id ",
                "ORDER BY date ", 
                "ROWS BETWEEN 59 PRECEDING AND CURRENT ROW)")

betas_db <-
  rets_db |>
  mutate(beta = sql(paste0("regr_slope(r_i, r_m)", w)),
         n_obs = sql(paste0("regr_count(r_i, r_m)", w)))

betas_db |> show_query()
#> <SQL>
#> SELECT
#>   rets.*,
#>   regr_slope(r_i, r_m) OVER (PARTITION BY id ORDER BY date ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS beta,
#>   regr_count(r_i, r_m) OVER (PARTITION BY id ORDER BY date ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS n_obs
#> FROM rets
betas_db
#> # Source:   SQL [?? x 6]
#> # Database: DuckDB v0.9.2 [root@Darwin 23.3.0:R 4.3.2/:memory:]
#>       id date           r_i     r_m  beta n_obs
#>    <int> <date>       <dbl>   <dbl> <dbl> <dbl>
#>  1    42 1980-01-01  0.122   0.0332 NA        1
#>  2    42 1980-02-01 -0.205  -0.0545  3.73     2
#>  3    42 1980-03-01 -0.219  -0.0211  3.97     3
#>  4    42 1980-04-01  0.0316  0.0592  2.87     4
#>  5    42 1980-05-01  0.151   0.0792  2.89     5
#>  6    42 1980-06-01  0.180   0.114   2.61     6
#>  7    42 1980-07-01 -0.280  -0.103   2.38     7
#>  8    42 1980-08-01 -0.0404 -0.0204  2.31     8
#>  9    42 1980-09-01  0.119   0.0151  2.33     9
#> 10    42 1980-10-01  0.176   0.0348  2.41    10
#> # ℹ more rows

betas_db |>
  mutate(beta = if_else(n_obs > 48, beta, NA)) |>
  arrange(id, desc(date))
#> # Source:     SQL [?? x 6]
#> # Database:   DuckDB v0.9.2 [root@Darwin 23.3.0:R 4.3.2/:memory:]
#> # Ordered by: id, desc(date)
#>       id date            r_i     r_m  beta n_obs
#>    <int> <date>        <dbl>   <dbl> <dbl> <dbl>
#>  1     1 2023-01-01 -0.0936   0.0195 0.906    60
#>  2     1 2022-12-01  0.0962   0.0354 0.907    60
#>  3     1 2022-11-01  0.0573   0.0377 0.895    60
#>  4     1 2022-10-01  0.00196 -0.0481 0.917    60
#>  5     1 2022-09-01  0.0422   0.0539 0.946    60
#>  6     1 2022-08-01 -0.112    0.0404 0.963    60
#>  7     1 2022-07-01 -0.148   -0.0370 0.955    60
#>  8     1 2022-06-01  0.267   -0.0319 0.984    60
#>  9     1 2022-05-01  0.00671 -0.0677 1.05     60
#> 10     1 2022-04-01  0.0299  -0.0160 1.06     60
#> # ℹ more rows

Created on 2023-12-30 with reprex v2.0.2


Solution

  • This is most likely caused by how the dbplyr translation for regr_slope and regr_count is defined (or not defined). Based on this question and answer it is possible for dbplyr to produce SQL translations with window functions such as:

    SELECT col1
        , rank() OVER (PARTITION BY col2 ORDER BY col3) AS col4
    FROM table
    

    This must mean that for rank() the dbplyr translation has instructions for generating the required window function.

    For regr_* it seems likely that either:

    1. the dbplyr translation is not defined, hence dbplyr is passing the command untranslated, so there is no way for it to generate the window function
    2. the translation is defined, but does not include instructions for generating a window function. Hence this needs a bug-report or a feature-request.

    One way to confirm whether a dbplyr translation is defined is to search the source code. You can download the dbplyr source code from GitHub and search the folder for the regr_slope command.


    It looks like you already have an effective alternative: passing the code as already translated SQL text within sql():

    betas_db <-
      rets_db |>
      mutate(beta = sql("regr_slope(r_i, r_m) OVER (PARTITION BY id ORDER BY date ROWS BETWEEN 59 PRECEDING AND CURRENT ROW)"),
    

    This would be my recommended alternative.