0

I have a dataset like:

  ID    DATETIME    CODE  Value
999 1/2/2024 16:22  TX     100
123 1/2/2024 16:47  IP     100
666 1/2/2024 17:13  IP      85
666 1/2/2024 17:38  IP     100
123 1/2/2024 18:03  TX      90
666 1/2/2024 18:28  TX      85
666 1/2/2024 18:54  IP     100
123 1/2/2024 19:19  CA     100
666 1/2/2024 19:44  OX      95
999 1/2/2024 20:09  18      75
123 1/2/2024 20:35  12     100
654 1/2/2024 21:00  IP      85

Here's, the Reprex of above:

structure(list(ID = c("999", "123", "666", "666", "123", "666", 
"666", "123", "666", "999", "123", "654"), DATETIME = structure(c(1706804520, 
1706806020, 1706807580, 1706809080, 1706810580, 1706812080, 1706813640, 
1706815140, 1706816640, 1706818140, 1706819700, 1706821200), class = c("POSIXct", 
"POSIXt"), tzone = "UTC"), CODE = c("TX", "IP", "IP", "IP", "TX", 
"TX", "IP", "CA", "OX", "18", "12", "IP"), Value = c(100, 100, 
85, 100, 90, 85, 100, 100, 95, 75, 100, 85)), class = c("tbl_df", 
"tbl", "data.frame"), row.names = c(NA, -12L))

I'd like to add a column with the cumulative sum, by ID within the last 2 hours, according to a condition (CODE == IP). Like this:

ID  DATETIME       CODE Value   cum_IP
999 1/2/2024 16:22  TX  100      0
123 1/2/2024 16:47  IP  100    100
666 1/2/2024 17:13  IP   85     85
666 1/2/2024 17:38  IP  100    185
123 1/2/2024 18:03  TX   90      0
666 1/2/2024 18:28  TX   85      0
666 1/2/2024 18:54  IP  100    285
123 1/2/2024 19:19  CA  100      0
666 1/2/2024 19:44  OX   95      0
999 1/2/2024 20:09  18   75      0
123 1/2/2024 20:35  12  100      0
654 1/2/2024 21:00  IP   85     85

I expect not to had made any error computing manually the column, but the idea is understandable. A function that computes an aggregated operation (a sum or a simple count) over rows with a grouping var (ID), a predicate (CODE == IP in this case, but it would be v.g. is.number(CODE)) and a window (2 hours from the current row).

1
  • Please provide a reproducible input. Commented Feb 1, 2024 at 20:14

4 Answers 4

2

Try the below. I'm choosing to group by CODE as well to reduce the conditioning inside the code. We use sapply as a simple rolling-window for the 2 hour window. I'm doing the singular if conditional so that don't sapply(.) through the data unnecessarily.

dplyr

library(dplyr)
quux %>%
  mutate(
    cum_IP = if (first(CODE) == "IP") {
        sapply(DATETIME, function(tm) sum(Value[between(DATETIME, tm-7200, tm)]))
      } else 0,
    .by = c(ID, CODE))
# # A tibble: 12 × 5
#    ID    DATETIME                CODE  Value cum_IP
#    <chr> <dttm>                  <chr> <dbl>  <dbl>
#  1 999   2024-02-01 16:22:00.000 TX      100      0
#  2 123   2024-02-01 16:47:00.000 IP      100    100
#  3 666   2024-02-01 17:13:00.000 IP       85     85
#  4 666   2024-02-01 17:38:00.000 IP      100    185
#  5 123   2024-02-01 18:03:00.000 TX       90      0
#  6 666   2024-02-01 18:28:00.000 TX       85      0
#  7 666   2024-02-01 18:54:00.000 IP      100    285
#  8 123   2024-02-01 19:19:00.000 CA      100      0
#  9 666   2024-02-01 19:44:00.000 OX       95      0
# 10 999   2024-02-01 20:09:00.000 18       75      0
# 11 123   2024-02-01 20:35:00.000 12      100      0
# 12 654   2024-02-01 21:00:00.000 IP       85     85

.by= requires dplyr_1.1.0 or newer, replace with group_by(..) if on an older version.

data.table

(Since you tagged .)

library(data.table)
as.data.table(quux) |>
  _[, cum_IP := if (CODE == "IP") sapply(DATETIME, function(tm) sum(Value[between(DATETIME, tm-7200, tm)])) else 0,
    by = .(ID, CODE)]

We can do CODE instead of first(CODE) because in data.table when grouping by a variable, the inner expression only sees length-1 for the grouping variables.

The use of |> _[..] requires R-4.3 or newer. Other options exist for pipeline data.table for this.

Sign up to request clarification or add additional context in comments.

4 Comments

Now that I am thinking of it. I perceive the within 2hours is quite ambiguous. assume I have data at 8:00 then 9:00 then 10:20. first data and 2nd data are within 2hours and 2nd and 3rd are within 2hours. But 1st and 3rd are not. I am not quite sure whether OP wants to do a cumsum of 1,2,3 or just sum of 1 and 2 and then 2 and 3. its not quite clear to me. so my answer might be incorrect and yours correct +1
I agree, "cumulative conditional sum" is likely going to yield different numbers when the sample data gets to be a bit larger. I suspect the OP may need to revise/clarify the expectation of these calcs.
can i add other filters? like as.data.table(quux) |> _[, cum_IP := if (CODE == "IP") sapply(DATETIME, function(tm) sum(Value[between(DATETIME, tm-7200, tm)]) & Value > 100) else 0, by = .(ID, CODE)]
sure, does it not work when you tried it?
1

Here is an sql left self join:

library(sqldf)

sqldf("select a.*, (a.CODE = 'IP') * sum(b.Value * (b.CODE == 'IP')) cum_IP 
  from dat a
  left join dat b on a.ID = b.ID and 
                     b.DATETIME between a.DATETIME - 2 * 60 * 60 and a.DATETIME
  group by a.rowid")

giving

    ID            DATETIME CODE Value cum_IP
1  999 2024-02-01 11:22:00   TX   100      0
2  123 2024-02-01 11:47:00   IP   100    100
3  666 2024-02-01 12:13:00   IP    85     85
4  666 2024-02-01 12:38:00   IP   100    185
5  123 2024-02-01 13:03:00   TX    90      0
6  666 2024-02-01 13:28:00   TX    85      0
7  666 2024-02-01 13:54:00   IP   100    285
8  123 2024-02-01 14:19:00   CA   100      0
9  666 2024-02-01 14:44:00   OX    95      0
10 999 2024-02-01 15:09:00   18    75      0
11 123 2024-02-01 15:35:00   12   100      0
12 654 2024-02-01 16:00:00   IP    85     85

Comments

1
library(tidyverse)
df %>%
  mutate(cum_IP = CODE == 'IP',
         cum_IP = cum_IP  & c(0,as.numeric(diff(DATETIME), unit='hours')) <= 2,
         cum_IP  = cumsum(Value * cum_IP) * cum_IP,
                 .by = ID)

# A tibble: 12 × 5
   ID    DATETIME            CODE  Value cum_IP
   <chr> <dttm>              <chr> <dbl>  <dbl>
 1 999   2024-02-01 16:22:00 TX      100      0
 2 123   2024-02-01 16:47:00 IP      100    100
 3 666   2024-02-01 17:13:00 IP       85     85
 4 666   2024-02-01 17:38:00 IP      100    185
 5 123   2024-02-01 18:03:00 TX       90      0
 6 666   2024-02-01 18:28:00 TX       85      0
 7 666   2024-02-01 18:54:00 IP      100    285
 8 123   2024-02-01 19:19:00 CA      100      0
 9 666   2024-02-01 19:44:00 OX       95      0
10 999   2024-02-01 20:09:00 18       75      0
11 123   2024-02-01 20:35:00 12      100      0
12 654   2024-02-01 21:00:00 IP       85     85

1 Comment

thanks @r2evans. I believe to have fixed the issue
0

I could make this flexible function

build_counter.data.table <- function(
    data,
    predicate,
    aggregated_fun = list("CON", "SUM"),
    name,
    duration,
    ut = "minutes",
    rolling_over = "ID",
    count_myself = FALSE
){
  
  
  predicate <- enexpr(predicate)
  
  name <- sapply(
    aggregated_fun, 
    function(x){
      paste0(
        case_match(
          x,
          c("n","lenght","count", "N", "CON") ~ "CON_",
          c("sum", "SUM") ~ "SUM_"
        ),
        name
      )
    }
  ) 
  
  aggregated_fun <- setNames( aggregated_fun, name) 
  
  window <- duration(duration, ut)
  
  data[,
       PRED := fifelse(eval(predicate) == TRUE, 1L, 0L)
  ]
  
  key <- c(rolling_over, "DATETIME")
  setkeyv(data, key)
  setorderv(
    data, 
    key
  #  c(key, "SYSKEY")
  )  
  subkey <- "DATETIME"
  
  data[,
       `:=` (
         nombres_col,{
           ## 1 - join por índice único (en caso de haber repes, 
           ## sustituye por el mayor)
           tmp1 <- .SD[
             unique(.SD, by = key(.SD)),
             on = subkey,
             which = TRUE, 
             mult = "last", 
             allow.cartesian = TRUE
           ]
           
           ## 2 - construimos una copia de data, y copiamos DATETIME 
           ## con el delay DATETIME - window por referencia.
           ## Entonces, realizamos un rolling-join con roll = -Inf, 
           ## que nos trae la siguiente observación desde delante - 
           ## Next Observation Carried Backward (NOCB). En otras palabras,
           ## si no hay coincidencia exacta, rellena con el siguiente 
           ## valor disponible.
           
           data0 <- copy(.SD)[, DATETIME := DATETIME - window]
           setkeyv(data0, subkey)
           
           tmp2 <- .SD[
             data0, 
             on = subkey,
             roll = -Inf, 
             which = TRUE, 
             mult = "first"
           ]
           
           ## 3 - uno los indices
           # nro de items por grupo
           
           idx1 <- tmp1-tmp2+1L  
           
           ##  agregar grp y obtener el resultado deseado
           
           #ans <- copy(.SD)[
           ans <- .SD[
             # idx2: repito idx1 veces cada valor de tmp2, hasta un maximo 
             # de clamp = sum(idx1). Este tercer valor no es importante,
             # puede ser un número enorme.
             # Me quedo con cada una de estas repeticiones idx2, 
             # por eso joineo
             data.table:::vecseq(tmp2, idx1, clamp = sum(as.numeric(idx1)))
           ][, 
             c(
               .SD[.N],
               N = .N,
               
               lapply(seq_along(aggregated_fun), function(i){
                 
                 # considerar utilizar fifelse
                 if(
                   count_myself == TRUE
                 ){
                   if (aggregated_fun[[i]] == "CON")
                     res <- sum(PRED[-.N])
                   if (aggregated_fun[[i]] == "SUM") 
                     res <- sum(PRED[-.N] * Value[-.N])
                 } else {
                   if (aggregated_fun[[i]] == "CON")
                     res <- sum(PRED)
                   if (aggregated_fun[[i]] == "SUM") 
                     res <- sum(PRED * Value)
                 }
                 
                 res
                 
               })
               
             ),
             by = rep(seq_along(idx1), idx1),
             .SDcols = subkey
           ]
           # me quedo con las últimas columnas.
           # TODO: ver si se puede obnerlas previemente con nombre
           # idea: sustituir c por := y luego distinct
           ans[, 
               (ncol(ans) - length(aggregated_fun) + 1):ncol(ans)
           ]
           
         }
       ),
       by = rolling_over,
       env = list(
         nombres_col = I(name),
         aggregated_fun = I(aggregated_fun)
       )
  ][
    , PRED := NULL
  ]
  
}

Calling to the function

setDT(data)
build_counter.data.table(
      data = data,
      name = "MY_COUNTER",
      predicate = fifelse(
        CODE %in% c('IP'),
        TRUE, FALSE
      ),
      aggregated_fun = list("CON", "SUM"),
      duration = 2L,
      ut = "hours",
      rolling_over = "ID",
      count_myself = TRUE
    )
    data

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.