Handling large data with polars and tidypolars

Motivation



The use of big data in all disciplines is growing:

  • full censuses
  • genomics data
  • mobile phone data
  • and more…

Motivation



Usually, we load datasets directly in R (or Stata, or whatever).


This means that we are limited by the amount of RAM on our computer.


You want to handle more data? Get more RAM.

Motivation



But we can’t necessarily have all the RAM we need.



And we don’t necessarily need all this RAM in the first place.


Introduction to polars

Introduction


polars is a recent DataFrame library that is available for several languages:

  • Python
  • R
  • Rust
  • and more


Built to have a syntax similar (though not identical) to Python's pandas.


Built to be very fast and memory-efficient thanks to several mechanisms (lazy evaluation, query optimization, streaming) that we'll see below.

Eager vs Lazy

Eager vs Lazy



Eager evaluation: all operations are run line by line, in order, and directly applied on the data. This is the way we’re used to.


Lazy evaluation: operations are only run when we call a specific function at the end of the chain, usually called collect().


When dealing with large data, it is better to use lazy evaluation.
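
For instance, a minimal sketch with the R polars package (the file path is just a placeholder): the eager version loads the whole file in memory immediately, while the lazy version only records the operations until collect() is called.

library(polars)

# Eager: the file is loaded in memory right away and each step runs immediately
eager_data = pl$read_parquet("path/to/file.parquet")

# Lazy: scan_parquet() only records the operations...
lazy_data = pl$scan_parquet("path/to/file.parquet")$
   filter(pl$col("country") == "Japan")

# ...nothing is computed until we call collect()
lazy_data$collect()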

Eager vs Lazy


Why is it better to use lazy evaluation?


  1. Optimize the code
  2. Catch errors before computations
  3. Use streaming mode

1. Optimize the code


The code below takes some data, sorts it by a variable, and then filters it based on a condition:


library(dplyr)

test |> 
   arrange(country) |> 
   filter(country %in% c("United Kingdom", "Japan", "Vietnam"))


Do you see what could be improved here?

1. Optimize the code


The problem lies in the order of operations: sorting data is much more expensive than filtering it, so we should filter first and only sort the rows that remain.


Let’s test with a dataset of 50M observations and 10 columns:

# Sort first, then filter
system.time({
  test |> 
   arrange(country) |> 
   filter(country %in% c("United Kingdom", "Japan", "Vietnam"))
})
   user  system elapsed 
   8.64   13.64   28.38 

# Filter first, then sort
system.time({
  test |> 
   filter(country %in% c("United Kingdom", "Japan", "Vietnam")) |> 
   arrange(country)
})
   user  system elapsed 
   1.21    0.86    3.18 

1. Optimize the query plan



There is probably a ton of suboptimal code in our scripts.


But it’s already hard enough to write scripts that work and are reproducible; we don’t want to spend even more time trying to optimize them.


  Let polars do this automatically.

1. Optimize the query plan


Workflow:

  1. scan the data to get it in lazy mode.
Python:

import polars as pl

raw_data = pl.scan_parquet("path/to/file.parquet")

# Or
pl.scan_csv("path/to/file.csv")
pl.scan_ndjson("path/to/file.ndjson")
...

R (polars):

library(polars)

raw_data = pl$scan_parquet("path/to/file.parquet")

# Or
pl$scan_csv("path/to/file.csv")
pl$scan_ndjson("path/to/file.ndjson")
...

This only returns the schema of the data: the column names and their types (character, integers, …).
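
As a quick check, a minimal sketch reusing the raw_data LazyFrame scanned above (depending on the polars version, this accessor may be named slightly differently):

# No data is loaded yet: we only get the column names and their types
raw_data$schema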

1. Optimize the query plan


Workflow:

  2. Write the code that you want to run on the data: filter, sort, create new variables, etc.

Python:

my_data = (
   raw_data
   .sort("iso")
   .filter(
      pl.col("gdp") > 123,
      pl.col("country").is_in(["United Kingdom", "Japan", "Vietnam"])
   )
   .with_columns(gdp_per_cap = pl.col("gdp") / pl.col("population"))
)

R (polars):

my_data = raw_data$
   sort("iso")$
   filter(
      pl$col("gdp") > 123,
      pl$col("country")$is_in(c("United Kingdom", "Japan", "Vietnam"))
   )$
   with_columns(gdp_per_cap = pl$col("gdp") / pl$col("population"))

R (tidypolars):

my_data = raw_data |> 
   arrange(iso) |> 
   filter(
      gdp > 123,
      country %in% c("United Kingdom", "Japan", "Vietnam")
   ) |> 
   mutate(gdp_per_cap = gdp / population)

1. Optimize the query plan


Workflow:


  3. Call collect() at the end of the code to execute it.

my_data.collect()     # Python
my_data$collect()     # R (polars)
my_data |> collect()  # R (tidypolars)

1. Optimize the query plan


polars doesn’t directly execute the code. Before that, it applies a lot of optimizations to make sure we don’t run inefficient operations.


Examples of optimizations:

  • do not load rows that are filtered out in the query;
  • do not load variables (columns) that are never used;
  • cache and reuse computations;
  • and many more.
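
We can actually see these optimizations by printing the query plan, as in the appendix. A minimal sketch, reusing the my_data query written above:

# The plan as we wrote it, without optimizations
my_data$explain(optimized = FALSE) |> cat()

# The plan that polars will actually run (e.g. the filter is applied while reading the data)
my_data$explain(optimized = TRUE) |> cat()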

2. Catch errors before computations

2. Catch errors before computations



Calling collect() doesn’t start computations right away.


First, polars scans the code to ensure there are no schema errors, i.e. it checks that we don’t do “forbidden” operations.


For instance, doing pl.col("gdp") > "France" would be an error: we can’t compare a number to a character.


In this case, that would return:

polars.exceptions.ComputeError: cannot compare string with numeric data
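
A minimal sketch of what this looks like in R, reusing the raw_data LazyFrame from before (and assuming it contains a numeric gdp column): the error is raised as soon as collect() is called, before any data is actually processed.

raw_data$
   filter(pl$col("gdp") > "France")$
   collect()
# Fails immediately with something like:
# "cannot compare string with numeric data"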

3. Use streaming mode

3. Use streaming mode



It is possible that the collected dataset is still too big for our RAM.


In this case, this will crash the Python or R session.


Streaming is a way to run the code on the data in batches, so that it never has to fit entirely in memory.


polars takes care of splitting the data and runs the code piece by piece.

3. Use streaming mode



Using this is extremely simple: instead of calling collect(), we call collect(streaming = TRUE).


Warning

This is still an early, incomplete feature of polars.


Not all operations can be run in streaming mode.
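
A minimal sketch, reusing the my_data query from before (in Python the equivalent is collect(streaming=True); the sink_parquet() part assumes a polars version that provides it):

# Run the query in batches so the full data never has to fit in RAM
my_data$collect(streaming = TRUE)

# If even the result is too large for RAM, stream it straight to a file instead
my_data$sink_parquet("path/to/output.parquet")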

A note on file formats

A note on file formats


We’re used to a few file formats: CSV, Excel, .dta. Polars can read most of them (.dta is not supported for now).


When possible, use the Parquet format (.parquet).


Pros:

  • strong compression: much smaller files on disk
  • allows much faster filtering of rows

Cons:

  • cannot be opened in Excel (but that wouldn’t be workable with millions of observations anyway)
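
For instance, a one-off conversion of an existing CSV file to Parquet could look like this (a sketch; the file paths are placeholders, and the arrow package's write_parquet() would work just as well):

library(polars)

# Read the CSV once and store it compressed as Parquet
pl$read_csv("path/to/file.csv")$write_parquet("path/to/file.parquet")

# From now on, scan the Parquet file directly
raw_data = pl$scan_parquet("path/to/file.parquet")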







(Quick) Demo time!



  • UK Census data

  • About 40M observations, 110+ variables

Going further



If you’re interested:

Conclusion

Conclusion



Getting bigger computers shouldn’t necessarily be the first reflex for handling large data.


There are several other tools available: arrow, DuckDB, Spark, and others.


Do you use any of them? How do you deal with large datasets?





Slides: https://github.com/etiennebacher/handling-large-data



Typos, comments: https://github.com/etiennebacher/handling-large-data/issues

Appendix

Appendix


Using the previous example, the non-optimized query looks like this:

lazy_query$explain(optimized = FALSE) |> cat()
FILTER col("country").is_in([Series]) FROM
  SORT BY [col("country")]
    DF ["country", "year", "var1", "var2"]; PROJECT */10 COLUMNS; SELECTION: None


And the optimized query looks like:

lazy_query$explain(optimized = TRUE) |> cat()
SORT BY [col("country")]
  DF ["country", "year", "var1", "var2"]; PROJECT */10 COLUMNS; SELECTION: col("country").is_in([Series])
