Handling large data with polars and tidypolars

Motivation



The use of big data in economics is growing:

  • full censuses
  • genomics data
  • mobile phone data
  • and more…

Motivation



Usually, we load datasets directly in R (or Stata, or whatever).


This means that we are limited by the amount of RAM on our computer.


You want to handle more data? Get more RAM.

Motivation



But we can’t necessarily have all the RAM we need.



And we don’t necessarily need all this RAM in the first place.


Introduction to polars

Introduction


polars is a recent DataFrame library that is available for several languages:

  • Python
  • R
  • Rust
  • and more


Built to have a syntax similar (though not identical) to pandas in Python.


Built to be very fast and memory-efficient, thanks to several mechanisms: a Rust core, the Apache Arrow columnar memory format, multithreading, and automatic query optimization.
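
As a rough illustration of the syntax similarity, here is a minimal sketch with made-up toy data (not from the slides):

import pandas as pd
import polars as pl

# the same toy data in both libraries
pdf = pd.DataFrame({"country": ["Japan", "Vietnam"], "gdp": [4.2, 0.4]})
pldf = pl.DataFrame({"country": ["Japan", "Vietnam"], "gdp": [4.2, 0.4]})

pdf[pdf["gdp"] > 1]                 # pandas: boolean indexing
pldf.filter(pl.col("gdp") > 1)      # polars: expression-based filtering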

Eager vs Lazy

Eager vs Lazy



Eager evaluation: all operations are run line by line, in order, and directly applied on the data. This is the way we’re used to.


Lazy evaluation: operations are only run when we call a specific function at the end of the chain, usually called collect().


When dealing with large data, it is better to use lazy evaluation.
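
A minimal sketch of the two modes in Python polars, assuming a hypothetical CSV file with a country column:

import polars as pl

# Eager: the file is read and each operation runs immediately
df = pl.read_csv("path/to/file.csv")
df_small = df.filter(pl.col("country") == "Japan")

# Lazy: nothing is read or computed until collect() is called at the end
lf = pl.scan_csv("path/to/file.csv")
df_small = lf.filter(pl.col("country") == "Japan").collect()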

Eager vs Lazy


Why is it better to use lazy evaluation?


  1. Optimize the query plan
  2. Catch schema errors before computations
  3. Use streaming mode

1. Optimize the query plan


The code below takes some data, sorts it by a variable, and then filters it based on a condition:


tmp <- test[order(test$country), ]
subset(tmp, country %in% c("United Kingdom", "Japan", "Vietnam"))


Do you see what could be improved here?

1. Optimize the query plan


The problem lies in the order of operations: sorting is much more expensive than filtering, so we should filter first and only sort the rows we keep.


Let’s test with a dataset of 2M observations and 2 columns:

system.time({
  tmp <- test[order(test$country), ]
  subset(tmp, country %in% c("United Kingdom", "Japan", "Vietnam"))
})
   user  system elapsed 
  12.00    0.02   12.12 
system.time({
  tmp <- subset(test, country %in% c("United Kingdom", "Japan", "Vietnam"))
  tmp[order(tmp$country), ]
})
   user  system elapsed 
   2.11    0.00    2.15 


We went from about 12 seconds to about 2 seconds just by reordering these two operations.

1. Optimize the query plan



There is probably a lot of suboptimal code in our scripts.


But it's already hard enough to write scripts that work and are reproducible; we don't want to spend even more time trying to optimize them.


  Let polars do this automatically.

1. Optimize the query plan


Workflow:

  1. Scan the data to get it in lazy mode.
# Python
import polars as pl

raw_data = pl.scan_parquet("path/to/file.parquet")

# Or
pl.scan_csv("path/to/file.csv")
pl.scan_ndjson("path/to/file.json")
...
# R
library(polars)

raw_data = pl$scan_parquet("path/to/file.parquet")

# Or
pl$scan_csv("path/to/file.csv")
pl$scan_ndjson("path/to/file.json")
...

This only returns the schema of the data: the column names and their types (character, integers, …).
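
For instance, in Python we can print that schema without reading any rows (a sketch; the column names and types shown are just an assumption based on the example that follows):

import polars as pl

raw_data = pl.scan_parquet("path/to/file.parquet")
print(raw_data.schema)
# e.g. {'country': Utf8, 'iso': Utf8, 'year': Int64, 'gdp': Float64, 'population': Int64}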

1. Optimize the query plan


Workflow:

  2. Write the query that you want to run on the data: filter, sort, create new variables, etc.
# Python
my_data = (
   raw_data
   .sort("iso")
   .filter(
      pl.col("gdp") > 123,
      pl.col("country").is_in(["United Kingdom", "Japan", "Vietnam"])
   )
   .with_columns(gdp_per_cap = pl.col("gdp") / pl.col("population"))
)
# R (polars)
my_data = raw_data$
   sort("iso")$
   filter(
      pl$col("gdp") > 123,
      pl$col("country")$is_in(c("United Kingdom", "Japan", "Vietnam"))
   )$
   with_columns(gdp_per_cap = pl$col("gdp") / pl$col("population"))
# R (tidypolars)
my_data = raw_data |>
   arrange(iso) |>
   filter(
      gdp > 123,
      country %in% c("United Kingdom", "Japan", "Vietnam")
   ) |>
   mutate(gdp_per_cap = gdp / population)

1. Optimize the query plan


Workflow:


  3. Call collect() at the end of the query to execute it.
my_data.collect()      # Python
my_data$collect()      # R (polars)
my_data |> collect()   # R (tidypolars)

1. Optimize the query plan


polars doesn’t execute the code right away. Before running it, it applies a lot of optimizations to make sure we don’t run inefficient operations.


Examples of optimizations:

  • predicate pushdown: do not load rows that are filtered out in the query;
  • projection pushdown: do not load variables (columns) that are never used;
  • caching and reusing computations;
  • and many more (the sketch below illustrates the first two).
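
To make the first two concrete, here is a small Python sketch (the file path and column names are assumptions); explain() prints the optimized plan:

import polars as pl

query = (
    pl.scan_parquet("path/to/file.parquet")
    .filter(pl.col("country") == "Japan")   # predicate pushdown: only matching rows are read
    .select("country", "gdp")               # projection pushdown: only these two columns are read
)
print(query.explain())   # shows the optimized query plan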

1. Optimize the query plan



Using the previous example, the non-optimized query plan looks like this:

lazy_query$describe_plan()
FILTER col("country").is_in([Series]) FROM
SORT BY [col("country")]
  DF ["country", "year"]; PROJECT */2 COLUMNS; SELECTION: "None"


And the optimized query plan looks like this:

lazy_query$describe_optimized_plan()
SORT BY [col("country")]
  DF ["country", "year"]; PROJECT */2 COLUMNS; SELECTION: "col(\"country\").is_in([Series])"

2. Catch schema errors before computations

2. Catch schema errors before computations



Calling collect() doesn’t start computations right away.


First, polars scans the query to ensure there are no schema errors, e.g. comparing a numeric column with a string, as in pl.col("gdp") > "France".


In this specific case, that would return:

polars.exceptions.ComputeError: cannot compare string with numeric data
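
A sketch of how this looks in Python (hypothetical file; the exact exception type and message can vary across polars versions):

import polars as pl

bad_query = (
    pl.scan_parquet("path/to/file.parquet")
    .filter(pl.col("gdp") > "France")   # comparing a numeric column with a string
)

# The mistake is caught when we call collect(), before any heavy computation starts:
bad_query.collect()
# polars.exceptions.ComputeError: cannot compare string with numeric data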

3. Use streaming mode

3. Use streaming mode



It is possible that the collected dataset is still too big for our RAM.


In this case, the R / Python session will crash.


Streaming is a way to run the code on batches of data to avoid using all memory at the same time.

3. Use streaming mode



Using this is extremely simple: instead of calling collect(), we call collect(streaming = TRUE) (collect(streaming=True) in Python).
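
For example, in Python (a sketch with an assumed file path and filter):

import polars as pl

result = (
    pl.scan_parquet("path/to/file.parquet")
    .filter(pl.col("country").is_in(["United Kingdom", "Japan", "Vietnam"]))
    .collect(streaming=True)   # process the query in batches instead of all at once
)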


Warning

This is still an early, incomplete feature of polars.







(Quick) Demo time!

Going further



If you’re interested:


These projects are rapidly evolving, so please report any bugs on GitHub!

Conclusion

Conclusion



Getting a bigger computer shouldn’t necessarily be the first reflex for handling large data.


There are several other tools available: arrow, DuckDB, Spark, and others.


Do you use any of them? How do you deal with large datasets?