
I'm working with 50 Parquet files (each ~800 MB, with ~380,000 rows and ~8 columns). I need to perform a grouped summarisation in R, something like:

group_by(sample_id, gene1, gene2) %>% 
  summarise(mean_importance = mean(importance), 
            mean_count = mean(n_count)) %>%
  pivot_wider(names_from = "sample_id", 
              values_from = c("mean_importance", "mean_count"), 
              names_sep = "__")

Here, pivot_wider() is not available via arrow. So just before it, I need to collect() the data as a dataframe and then apply pivot_wider(). As soon as I apply collect(), I encounter memory issues (core dumped, bad_alloc). What is the best way to handle this large data without running into memory errors? The experimental batch processing seemed like an option, but I will not be able to make batches by random sub-setting. Rather, it would be ideal to subset via the group_by columns.

I have tried listing all the possible groups and then using mclapply() to process them:

pq_files <- list.files("path", full.names = TRUE)
pq_files <- open_dataset(sources = pq_files)

grp_list <- expand_grid("gene1" = gene1, 
                         "gene2" = gene2) %>% 
  filter(gene1 != gene2)

res <- mclapply(X = 1:nrow(grp_list), 
                  mc.cores = 60,
                  FUN = function(i){
                    
                    pq_files %>% 
                      filter((gene1 == grp_list$gene1[i]) & (gene1 == grp_list$gene1[i])) %>% 
                      group_by(sample_id, gene1, gene2) %>% 
                      summarise(mean_importance = mean(importance), 
                                mean_count = mean(n_count)) %>%
                      collect() %>%
                      pivot_wider(names_from = "sample_id", 
                                  values_from = c("mean_importance", "mean_count"), 
                                  names_sep = "__")
                    
                  })

But this means repeatedly scanning the files on disk, I suppose. Is there a more efficient way to do this? Would other file types or packages help improve performance?
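
One refinement I have been considering (just a sketch, untested): instead of one dataset scan per gene pair, filter on a block of gene1 values at a time, so each pass over the Parquet files covers many pairs at once while the collected result per block stays small (the number of blocks, 20 here, is arbitrary):

## sketch only: split the distinct gene1 values into 20 blocks
gene1_chunks <- split(gene1, cut(seq_along(gene1), breaks = 20, labels = FALSE))

res <- lapply(gene1_chunks, function(g1_block) {
  pq_ds %>% 
    filter(gene1 %in% g1_block) %>% 
    group_by(sample_id, gene1, gene2) %>% 
    summarise(mean_importance = mean(importance), 
              mean_count = mean(n_count)) %>%
    collect() %>%
    pivot_wider(names_from = "sample_id", 
                values_from = c("mean_importance", "mean_count"), 
                names_sep = "__")
}) %>% bind_rows()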

Note: this query is cross-posted in the arrow GitHub repository.

4 Comments
  • Are your Parquet files partitioned by any of your grouping vars? Perhaps one file per sample_id? Are you after a dplyr-like pipeline (arrow / dbplyr / duckplyr), or would duckdb + SQL be OK? Anyway, a small reproducible example dataset would certainly help others to help you. Commented Mar 1 at 21:04
  • Unfortunately no. Each file is from a different repetition of a calculation; I need to calculate the mean, SD, etc. across all the repeats. I am not very familiar with DuckDB and SQL, but chasing a solution, I have explored arrow + duckdb and dbplyr. The issue is that small datasets work fine even with this code, but the data I have is too large to handle in RAM. Commented Mar 1 at 22:37
  • Just to comment on your latest code snippet in the linked GH issue -- pivot_wider.tbl_lazy() collects data and thus triggers early materialization in duckdb. From the linked doc: "Note that pivot_wider() is not and cannot be lazy because we need to look at the data to figure out what the new column names will be. If you have a long running query you have two options ..." A small dataset would do just fine for testing strategies and checking the generated dbplyr SQL queries & duckdb execution plans to assess whether those might scale. Commented Mar 2 at 14:56
  • Did you consider using DuckDB's PIVOT statement (see the sketch after these comments): duckdb.org/docs/stable/sql/statements/pivot.html Commented Mar 2 at 19:34
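
For reference, a minimal sketch of the DuckDB PIVOT route suggested in the last two comments (untested; it assumes the Parquet files sit directly under path/, and note that DuckDB names the pivoted columns itself, e.g. 1_mean_importance, rather than following names_sep = "__"):

library(DBI)
library(duckdb)

con <- dbConnect(duckdb())

## expose the Parquet files as a single view; nothing is loaded into R yet
dbExecute(con, "
  CREATE OR REPLACE VIEW gene_data AS
  SELECT sample_id, gene1, gene2, importance, n_count
  FROM read_parquet('path/*.parquet')
")

## pivot inside DuckDB, so only the (much smaller) wide result reaches R
wide <- dbGetQuery(con, "
  PIVOT gene_data
  ON sample_id
  USING avg(importance) AS mean_importance, avg(n_count) AS mean_count
  GROUP BY gene1, gene2
")

dbDisconnect(con, shutdown = TRUE)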

2 Answers


Ignore this answer if your question is mainly about working within the Parquet format itself. However, for grouping operations on fifty 380K×8 data sets, you might consider a data.table approach.

> library(data.table)
> 
> ## load data into list
> calc_paths <- list.files('.', pattern='^calc', full.names=TRUE)
> lst <- lapply(calc_paths, fread)
> ## rbind, add id in the process
> DT <- rbindlist(lst, idcol=TRUE)
> rm(lst)
> head(DT, 3)
   sample_id         gene1         gene2  importance n_count        foo        bar        baz   .id
       <int>        <char>        <char>       <num>   <int>      <num>      <num>      <num> <int>
1:         1 R-HSA-3207141 R-HSA-4248729 0.002099328       2 -0.5163218 -2.0224784  0.1751066     1
2:         1 R-HSA-3207141 R-HSA-4818148 0.837731566      99 -0.4989981  0.0605347  3.0834226     1
3:         1 R-HSA-3207141 R-HSA-1122938 0.265865533      86 -0.5294986 -0.9458609 -0.3671904     1
> dim(DT)
[1] 18975000        9
> nrow(DT)/50
[1] 379500
> cat(as.numeric(object.size(DT))/1024^2, 'MB\n')  ## has altogether just 1 GB
1085.795 MB
> 
> ## perform averaging over 50 calculations, takes one minute
> res <- dcast(DT, gene1 + gene2 ~ .id, value.var=c('importance', 'n_count'),
+              fun.aggregate=c(mean, sd))
Processed 3795000 groups out of 3795000. 100% done. Time elapsed: 58s. ETA: 0s.

Gives

> res[, c(1:5, 53:54, 103:104, 153:154)]
Key: <gene1, gene2>
               gene1         gene2 importance_mean_1 importance_mean_2 importance_mean_3 n_count_mean_1
              <char>        <char>             <num>             <num>             <num>          <num>
    1: R-HSA-1044954 R-HSA-1048670         0.5034147         0.4911527         0.4260874           43.4
    2: R-HSA-1044954 R-HSA-1060926         0.6358689         0.4799145         0.7599004           49.0
    3: R-HSA-1044954 R-HSA-1060946         0.3787561         0.6303922         0.5512408           54.8
    4: R-HSA-1044954 R-HSA-1122938         0.5456925         0.3828336         0.3285011           44.6
    5: R-HSA-1044954 R-HSA-1140279         0.4362493         0.3417474         0.5717543           64.8
   ---                                                                                                 
75896:  R-HSA-902336 R-HSA-7967969         0.6908568         0.5781419         0.5774333           39.2
75897:  R-HSA-902336   R-HSA-80412         0.4561290         0.3573683         0.4122798           35.4
75898:  R-HSA-902336    R-HSA-8081         0.4873442         0.6198936         0.6669743           62.8
75899:  R-HSA-902336  R-HSA-821215         0.3568274         0.3085601         0.2843851           42.0
75900:  R-HSA-902336  R-HSA-864879         0.3326516         0.3117023         0.5920007           49.2
       n_count_mean_2 importance_sd_1 importance_sd_2 n_count_sd_1 n_count_sd_2
                <num>           <num>           <num>        <num>        <num>
    1:           47.8       0.3610905      0.33258767     36.14277     41.44514
    2:           50.4       0.2390627      0.33043443     33.63034     29.46693
    3:           44.8       0.1894234      0.42000253     27.49000     19.72815
    4:           55.0       0.3544943      0.04998031     28.78020     26.23928
    5:           59.8       0.2463601      0.22524469     22.73104     33.28213
   ---                                                                         
75896:           24.4       0.2358344      0.39416754     28.56046     33.42604
75897:           69.4       0.1917775      0.26613427     34.07052     21.93855
75898:           51.2       0.1939475      0.21306278     29.43128     29.92825
75899:           47.0       0.1616249      0.19064479     37.70279     23.38803
75900:           34.0       0.2480584      0.12403069     28.78715     14.83240

Data:

Below, I've simulated the data based on your description. However, I don't understand how you arrived at 800 MB per file; that's much more than one would expect. Since I don't think Parquet adds that much overhead, you may want to drop unnecessary columns before proceeding. Of course, I could be completely off the mark here.

> RcppAlgos::permuteCount(276, 2)*5  ## rows per file implied by 276 genes and 5 samples
[1] 379500
> 
> genes <- paste0('R-HSA-', sample.int(8e6, 276))  ## sample genes
> 
> make_data <- \(n_genes=276, n_samp=5) {
+   ## sample function
+   df <- lapply(seq_len(n_samp), \(i) {
+     cbind(sample_id=i, RcppAlgos::permuteGeneral(genes, 2) |>
+             `colnames<-`(c('gene1', 'gene2')))
+   }) |> do.call(what='rbind') |> as.data.frame()
+   df$importance <- truncnorm::rtruncnorm(nrow(df), 0, 1)
+   df$n_count <- round(runif(nrow(df), 0, 100))
+   df$foo <- rnorm(nrow(df))
+   df$bar <- rnorm(nrow(df))
+   df$baz <- rnorm(nrow(df))
+   df |> as.data.table()
+ }
> 
> ## test
> tmp <- make_data()
> 
> dim(tmp)
[1] 379500      8
> cat(as.numeric(object.size(tmp))/1024^2, 'MB\n')  ## <---- you seem to have ~800 MB here, why?
23.19916 MB
> rm(tmp)
> 
> ## create data files in .tsv format
> ncpu <- parallel::detectCores() - 1L
> pbmcapply::pbmclapply(seq_len(50), \(i) {
+   write.table(make_data(), sprintf('calc_%02d.tsv', i), sep='\t', quote=FALSE, 
+               row.names=FALSE)
+ }, mc.cores=ncpu) |> invisible()
  |=======================================================================================| 100%, Elapsed 00:06
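
Since your real input is Parquet rather than the .tsv files simulated above, here is a minimal sketch (untested) of the equivalent loading step with arrow, after which rbindlist()/dcast() proceed exactly as shown:

library(arrow)
library(data.table)

## read all 50 Parquet files and stack them into one data.table;
## the .id column (1..50) identifies the source file / calculation
pq_paths <- list.files("path", pattern = "\\.parquet$", full.names = TRUE)
DT <- rbindlist(lapply(pq_paths, read_parquet), idcol = TRUE)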

2 Comments

Why lst <- Map(`[<-`, lst, 'calc', value=seq_along(lst)) instead of lst = data.table::rbindlist(lst, idcol = TRUE)?
@Friede Cool, didn't know that feature. Updated!

Consider splitting each Parquet file out to disk by gene pair, so you avoid holding all the data in memory, then read the pieces back iteratively or in parallel for the per-gene-pair aggregation.

Split Each Parquet by Gene Pairs (can parallelize by pq_files and grp_list)

pq_files <- list.files(path = "path", full.names = TRUE)

grp_list <- expand_grid(
  "gene1" = gene1, "gene2" = gene2, stringsAsFactors = FALSE
) %>% filter(gene1 != gene2)

for(i in seq_along(pq_files)) {
  for(j in 1:nrow(grp_list)) {
      pq_df <- read_parquet(pq_files[i]) %>% 
        filter(gene1 == grp_list$gene1[j] & gene2 == grp_list$gene2[j])
      write_parquet(pq_df, paste0(g1, "_", g2, "_", i, ".parquet")) 
    },
  }
}

Combine and Aggregate Each Gene Pair Parquet (can parallelize by grp_list)

for (i in seq_len(nrow(grp_list))) {
  ## match only the per-file splits (e.g. gene1_gene2_7.parquet), not the *_agg.parquet outputs
  patt <- paste0(grp_list$gene1[i], "_", grp_list$gene2[i], "_[0-9]+\\.parquet$")
  pq_files <- list.files(path = "path", pattern = patt, full.names = TRUE)
  pq_df <- bind_rows(lapply(pq_files, read_parquet))

  pq_agg <- pq_df %>% 
    group_by(sample_id, gene1, gene2) %>% 
    summarise(mean_importance = mean(importance), 
              mean_count = mean(n_count)) %>%
    pivot_wider(names_from = "sample_id", 
                values_from = c("mean_importance", "mean_count"), 
                names_sep = "__")

  write_parquet(
    pq_agg,
    paste0(grp_list$gene1[i], "_", grp_list$gene2[i], "_agg.parquet")
  )
}

Combine All Aggregated Parquets

pq_agg_files <- list.files(path = "path", pattern = "_agg\\.parquet$", full.names = TRUE)

final_pq_df <- bind_rows(lapply(pq_agg_files, read_parquet))

3 Comments

That is going to be a lot of I/O operations. That does not sound like a good idea.
@ArindamGhosh it's a good idea for avoiding a memory bottleneck. If you also have an I/O bottleneck, then you should probably mention that in the question.
Yes, it is a trade-off. How many gene pairs do you have? If all the data can't fit in memory, leverage disk space, or look to high-performance, cluster, or cloud computing resources.
