
I'm trying to read through multiple compressed tables, each 5 GB+ in size, in R. I don't have enough memory to read them in all at once, so I need to process them one chunk at a time, for example the first 1000 rows of each file, then the next 1000 rows of each file, and so on. I know how to keep a file open with a saved cursor or file pointer in basically any language other than R. How can I do that here?

I'm currently doing something a lot like this:

library(data.table)
library(R.utils)

inFiles = c("file1.tsv.gz", "file2.tsv.gz", "file3.tsv.gz")
totallines <- 10000
chunksize <- 1000

iters          <- 1
skip_val       <- 0
max_iters      <- ceiling(totallines/chunksize)

while (iters <= max_iters) {

    
    # Read the next chunk from each file; `data_colnames` is defined elsewhere
    data <- lapply(inFiles, function(file) {
      data.table::fread(file, nrows=chunksize, skip=skip_val,
                        col.names=data_colnames, sep="\t")
    })

    # Process the data in omitted code here

    # Move on to the next chunk
    iters    <- iters + 1
    skip_val <- skip_val + chunksize
}

The problem is that these files are large-ish and compressed, and the smaller the chunk size or the larger the file, the more of its time the program spends just reading because of the skipped lines: every single time it reads the next chunk, it also has to decompress and skip over all of the previous lines.

I looked at readr::read_delim_chunked, but am not sure how I could use it to iterate through many files at once.
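
For reference, the chunked readr API looks roughly like this for a single file (a sketch only; the per-chunk summary is just a placeholder). It streams through the gzip file incrementally, but each call consumes one file from start to finish, so I don't see how to advance several files in lockstep:

library(readr)

## placeholder per-chunk processing: return one summary row per chunk
summarise_chunk <- function(x, pos) data.frame(t(colSums(x)))

res <- read_tsv_chunked(
  "file1.tsv.gz",                                      # gzip is decompressed as a stream
  callback   = DataFrameCallback$new(summarise_chunk),
  chunk_size = 1000,
  col_names  = FALSE                                    # files have no header row
)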

  • One easy way is to download the miller utility, which is available on all platforms that R runs on, and run this from the command line/shell (not from R), assuming you want 10,000-row chunks: mlr --csv split -n 10000 file1.csv.gz file2.csv.gz. That will split them into split_*.csv files, and then you can read each one into R in a loop (see the sketch below). If the files do not have the same format, use separate mlr calls for each. See mlr split --help for more info. Commented Apr 5 at 14:58
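
A sketch of the follow-up read step described in the comment (the split_*.csv pattern is what mlr split produces):

pieces <- Sys.glob("split_*.csv")   # chunk files produced by `mlr split`
for (p in pieces) {
  chunk <- data.table::fread(p)
  ## process `chunk` here
}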

1 Answer


You're looking for pipe(). When readLines() is called repeatedly on the open connection, e.g. inside a repeat loop, it continues from the current position in the stream; it doesn't restart gunzip or re-decompress content that has already been read.

process_chunks <- \(x, total.lines=1e5, chunk.size=1e3) {
  n_chunks <- ceiling(total.lines/chunk.size)
  unix <- identical(.Platform$OS.type, "unix")
  ## open pipe
  if (!unix) {
    con <- pipe(sprintf("7z e -so %s", shQuote(x)), open="r")  ## Windows fallback (not tested)
  } else {
    con <- pipe(sprintf("gunzip -c %s", shQuote(x)), open="r")
  }
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- vector(mode='list', length=n_chunks)
  i <- 1
  repeat {
    lins <- readLines(con, n=chunk.size)
    if (length(lins) == 0) break
    df <- data.table::fread(text=lins)
    ## Process data, save in list
    res_list[[i]] <- colSums(df)  
    ## ++++++++++++++++++++++++++
    i <- i + 1
  }
  do.call(rbind, res_list)  ## rbind result
}

Note: the solution as-is assumes the .tsv files contain only data, with no header row.

Usage

Single file:

> process_chunks("foo1.tsv.gz") |> head()
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

Multiple files:

> in_Files <- c("foo1.tsv.gz", "foo2.tsv.gz", "foo3.tsv.gz")
> lapply(in_Files, process_chunks, total.lines=1e5, chunk.size=1e3) |> lapply(head)
[[1]]
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

[[2]]
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

[[3]]
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

On Linux we might use parallel::mclapply:

parallel::mclapply(in_Files, process_chunks, mc.cores=parallel::detectCores() - 1)
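
On Windows, where forking is not available, a PSOCK cluster would be the rough equivalent (a sketch; parLapply ships the function to the workers):

cl  <- parallel::makeCluster(parallel::detectCores() - 1)
res <- parallel::parLapply(cl, in_Files, process_chunks)
parallel::stopCluster(cl)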

Enhanced Alternative

There is no need to specify the total number of lines; a flexible function (FX) is applied per chunk, metadata lines can be skipped (skip), and a header is supported. The shell command (unz) is customizable for any decompression tool. Matrix calculations are supported by default, and a warning is issued if the last chunk is smaller than expected.

process_chunks2 <- \(x, FX, csz=1e3, skip=0L, header=FALSE, matrix=TRUE, 
                     unz='gunzip -c', warn=TRUE, ...) {
  unix <- identical(.Platform$OS.type, "unix")
  xq <- shQuote(x, if (!unix) 'cmd' else 'sh')
  con <- pipe(sprintf("%s %s", unz, xq), open="r")  ## open pipe
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- list()
  i <- 1
  if (skip > 0L) {
    readLines(con, n=skip)
  }
  if (header) {
    hd <- colnames(data.table::fread(text=readLines(con, n=1)))
  }
  repeat {
    lins <- readLines(con, n=csz)
    if (length(lins) == 0) break
    ch <- data.table::fread(text=lins)
    if (matrix) {
      ch <- as.matrix(ch)
    }
    if (warn && (nr <- nrow(ch)) < csz) {
      warning(sprintf("Final chunk short: %d < %d", nr, csz))
    }
    res_list[[i]] <- FX(ch, ...)  ## process chunk
    i <- i + 1
  }
  out <- do.call(rbind, res_list)  ## rbind result
  if (header) {
    `colnames<-`(out, hd)
  } else {
    `colnames<-`(out, NULL)
  }
}

> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
             [,1]        [,2]         [,3]       [,4]
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=5, header=TRUE) |> head(2)
               A1          A2           A3         A4
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
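
Because unz is just a shell command prefix, any streaming decompressor can be substituted; for example (hypothetical: the same table compressed with zstd instead of gzip, assuming zstd is installed):

## hypothetical .zst copy of the table; `zstd -dc` decompresses to stdout
process_chunks2(x='bar.tsv.zst', FX=matrixStats::colMeans2, skip=5, header=TRUE,
                unz='zstd -dc')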

Example where total rows are not divisible by chunk size (e.g., m <- 1e5 - 1 in Data, infra):

> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
             [,1]        [,2]        [,3]       [,4]
[1,] -0.025824427 -0.03763184 -0.01190839 0.01348543
[2,] -0.005317994 -0.06963092 -0.00367911 0.03837964
Warning message:
In process_chunks2(x = "bar.tsv.gz", FX = matrixStats::colMeans2,  :
  Final chunk short: 999 < 1000

Data:

(For Linux. Eight files will be created in the current directory.)

m <- 1e5; n <- 4
set.seed(42)
mat <- matrix(rnorm(m*n), m, n)
mat |> 
  write.table('foo.tsv', row.names=FALSE, col.names=FALSE, sep='\t')
system('pigz -p 7 -f foo.tsv')
system('for i in 1 2 3; do cp foo.tsv.gz foo${i}.tsv.gz; done')

mat |> 
  `colnames<-`(paste0('A', seq_len(n))) |> 
  data.table::fwrite('bar.tmp', row.names=FALSE, col.names=TRUE, sep='\t')
writeLines(c(
  "# File:       bar.tsv.gz",
  "# Created:    2025-04-06",
  "# Rows:       100000 (approx.)",
  "# Delimiter:  tab",
  "# Generator:  R/data.table::fwrite()"
), "meta.tmp")
system("cat meta.txt bar.tmp > bar.tsv")
file.remove("meta.tmp", "bar.tmp")
system('pigz -p 7 -f bar.tsv')
system('for i in 1 2 3; do cp bar.tsv.gz bar${i}.tsv.gz; done')

3 Comments

Niiiiiiiiiiice, well done jay.sf. My only two thoughts are: (1) not sure total.lines=1e5 is known before starting? I generally like preallocation of res_list, but do you know of known copy/realloc problems with extending res_list one at a time? (This should occur naturally when i > n_chunks, I think.) (2) I'm assuming the preference for 7z over gunzip is mostly speed/efficiency, or perhaps assumed availability ... perhaps there's no need for unix and you can do this? exe <- Filter(nzchar, Sys.which(c("7zz", "7z", "gunzip")))[1] and perhaps check is.na(exe) if nothing was found.
(... and then adjust cli args based on which is found, with switch or such)
@r2evans Had a bit of time, so I put together an enhanced version. Turns out you were right: preallocating doesn't make much of a difference in this case; I tested it with a few data sizes. I assumed 7z would be the default on Windows, but the unzip command is fully customizable now, so the user can set whatever they prefer. Just noticed shQuote() needs the correct type= depending on the OS, so that's worth keeping in mind. I also added skip and header support, super easy to slot in. Now it throws a warning if the final chunk's short. And yeah, fast matrix ops are in too.
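
A sketch of the decompressor auto-detection idea from the first comment (command names and flags assumed: 7-Zip writes to stdout with e -so, gunzip with -c):

## pick the first decompressor found on PATH and build a matching command prefix
exe <- Filter(nzchar, Sys.which(c("7zz", "7z", "gunzip")))[1]
if (is.na(exe)) stop("no decompressor found on PATH")
unz_cmd <- switch(names(exe),
                  `7zz`  = paste(shQuote(exe), "e -so"),
                  `7z`   = paste(shQuote(exe), "e -so"),
                  gunzip = paste(shQuote(exe), "-c"))
process_chunks2('bar.tsv.gz', FX=colSums, skip=6, unz=unz_cmd)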
