You're looking for pipe(). Because the connection stays open, each readLines() call inside a loop like repeat() continues from the current position; it doesn't restart gunzip or re-decompress previous content.
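A minimal sketch of that streaming behavior (assuming a hypothetical lines.txt.gz in the working directory): each call picks up where the previous one stopped.
con <- pipe("gunzip -c lines.txt.gz", open="r")
readLines(con, n=2)  ## lines 1-2
readLines(con, n=2)  ## lines 3-4, not 1-2 again
close(con)
The chunked reader below builds on exactly that: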
process_chunks <- \(x, total.lines=1e5, chunk.size=1e3) {
  n_chunks <- ceiling(total.lines/chunk.size)
  unix <- identical(.Platform$OS.type, "unix")
  ## open pipe
  if (!unix) {
    con <- pipe(sprintf("7z e -so %s", shQuote(x)), open="r")  ## Windows fallback (not tested)
  } else {
    con <- pipe(sprintf("gunzip -c %s", shQuote(x)), open="r")
  }
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- vector(mode='list', length=n_chunks)
  i <- 1
  repeat {
    lins <- readLines(con, n=chunk.size)
    if (length(lins) == 0) break
    df <- data.table::fread(text=lins)
    ## Process data, save in list
    res_list[[i]] <- colSums(df)
    ## ++++++++++++++++++++++++++
    i <- i + 1
  }
  do.call(rbind, res_list)  ## rbind result
}
Note: the solution as-is assumes the .tsv files contain only data, no header.
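If a header line were present, a minimal workaround (inside process_chunks, right after opening con) is to discard it once before the repeat loop:
invisible(readLines(con, n=1))  ## drop the header line
The enhanced alternative further down handles this via an argument instead.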
Usage
Single file:
> process_chunks("foo1.tsv.gz") |> head()
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
Multiple files:
> in_Files <- c("foo1.tsv.gz", "foo2.tsv.gz", "foo3.tsv.gz")
> lapply(in_Files, process_chunks, total.lines=1e5, chunk.size=1e3) |> lapply(head)
[[1]]
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
[[2]]
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
[[3]]
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
On Linux (or other unix-alikes) we might process the files in parallel with parallel::mclapply:
parallel::mclapply(in_Files, process_chunks, mc.cores=parallel::detectCores() - 1)
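On Windows, where mclapply cannot fork, a PSOCK cluster is a possible substitute (a sketch, assuming process_chunks and in_Files from above):
cl <- parallel::makeCluster(parallel::detectCores() - 1)
parallel::clusterExport(cl, "process_chunks")  ## workers need the function
res <- parallel::parLapply(cl, in_Files, process_chunks)
parallel::stopCluster(cl)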
Enhanced Alternative
No need to specify the total line count here. A user-supplied function (FX) is applied per chunk, leading metadata lines can be skipped (skip), and a header row is supported (header). The shell command (unz) is customizable for any streaming decompressor. Chunks are converted to matrices by default (matrix=TRUE), and a warning is issued if a chunk is shorter than csz, which normally only happens for the final one.
process_chunks2 <- \(x, FX, csz=1e3, skip=0L, header=FALSE, matrix=TRUE,
                     unz='gunzip -c', warn=TRUE, ...) {
  unix <- identical(.Platform$OS.type, "unix")
  xq <- shQuote(x, if (!unix) 'cmd' else 'sh')
  con <- pipe(sprintf("%s %s", unz, xq), open="r")  ## open pipe
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- list()
  i <- 1
  if (skip > 0L) {
    readLines(con, n=skip)  ## discard leading metadata lines
  }
  if (header) {
    hd <- colnames(data.table::fread(text=readLines(con, n=1)))  ## grab column names
  }
  repeat {
    lins <- readLines(con, n=csz)
    if (length(lins) == 0) break
    ch <- data.table::fread(text=lins)
    if (matrix) {
      ch <- as.matrix(ch)
    }
    if (warn && (nr <- nrow(ch)) < csz) {
      warning(sprintf("Final chunk short: %d < %d", nr, csz))
    }
    res_list[[i]] <- FX(ch, ...)  ## process chunk
    i <- i + 1
  }
  out <- do.call(rbind, res_list)  ## rbind result
  if (header) {
    `colnames<-`(out, hd)
  } else {
    `colnames<-`(out, NULL)
  }
}
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
[,1] [,2] [,3] [,4]
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=5, header=TRUE) |> head(2)
A1 A2 A3 A4
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
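Since unz is just the shell command feeding the pipe, other streaming decompressors plug in unchanged. A sketch, assuming a hypothetical zstd-compressed baz.tsv.zst and zstd available on the PATH:
process_chunks2(x='baz.tsv.zst', FX=matrixStats::colMeans2, unz='zstd -dc')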
Example where total rows are not divisible by chunk size (e.g., m <- 1e5 - 1 in Data, infra):
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
[,1] [,2] [,3] [,4]
[1,] -0.025824427 -0.03763184 -0.01190839 0.01348543
[2,] -0.005317994 -0.06963092 -0.00367911 0.03837964
Warning message:
In process_chunks2(x = "bar.tsv.gz", FX = matrixStats::colMeans2, :
Final chunk short: 999 < 1000
Data:
(For Linux. Eight files will be created in the current directory.)
m <- 1e5; n <- 4
set.seed(42)
mat <- matrix(rnorm(m*n), m, n)
mat |>
write.table('foo.tsv', row.names=FALSE, col.names=FALSE, sep='\t')
system('pigz -p 7 -f foo.tsv')
system('for i in 1 2 3; do cp foo.tsv.gz foo${i}.tsv.gz; done')
mat |>
`colnames<-`(paste0('A', seq_len(n))) |>
data.table::fwrite('bar.tmp', row.names=FALSE, col.names=TRUE, sep='\t')
writeLines(c(
"# File: bar.tsv.gz",
"# Created: 2025-04-06",
"# Rows: 100000 (approx.)",
"# Delimiter: tab",
"# Generator: R/data.table::fwrite()"
), "meta.tmp")
system("cat meta.txt bar.tmp > bar.tsv")
file.remove("meta.tmp", "bar.tmp")
system('pigz -p 7 -f bar.tsv')
system('for i in 1 2 3; do cp bar.tsv.gz bar${i}.tsv.gz; done')
Alternatively, split the files up front with Miller (mlr):
mlr --csv split -n 10000 file1.csv.gz file2.csv.gz
That will split them into split_*.csv files, and then you can read each one into R in a loop. If the files do not have the same format, use separate mlr calls for each. See mlr split --help for more info.
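A sketch of the read-back loop, assuming mlr's default split_*.csv naming and the current working directory:
files <- list.files(pattern="^split_.*\\.csv$")
res_list <- lapply(files, data.table::fread)  ## one data.table per split file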