R LanguageSpark API (SparkR)


The SparkR package let's you work with distributed data frames on top of a Spark cluster. These allow you to do operations like selection, filtering, aggregation on very large datasets. SparkR overview SparkR package documentation

Setup Spark context

Setup Spark context in R

To start working with Sparks distributed dataframes, you must connect your R program with an existing Spark Cluster.

sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context

Here are infos how to connect your IDE to a Spark cluster.

Get Spark Cluster

There is an Apache Spark introduction topic with install instructions. Basically, you can employ a Spark Cluster locally via java (see instructions) or use (non-free) cloud applications (e.g. Microsoft Azure [topic site], IBM).

Cache data


Caching can optimize computation in Spark. Caching stores data in memory and is a special case of persistence. Here is explained what happens when you cache an RDD in Spark.


Basically, caching saves an interim partial result - usually after transformations - of your original data. So, when you use the cached RDD, the already transformed data from memory is accessed without recomputing the earlier transformations.


Here is an example how to quickly access large data (here 3 GB big csv) from in-memory storage when accessing it more then once:

# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# loading 3 GB big csv file:  
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
# output: time elapsed: 125 s. This action invokes the caching at this point.
# output: time elapsed: 0.2 s (!!)

Create RDDs (Resilient Distributed Datasets)

From dataframe:

mtrdd <- createDataFrame(sqlContext, mtcars)

From csv:

For csv's, you need to add the csv package to the environment before initiating the Spark context:

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv -> 
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

Then, you can load the csv either by infering the data schema of the data in the columns:

train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")

Or by specifying the data schema beforehand:

 customSchema <- structType(
    structField("margin", "integer"),
    structField("gross", "integer"),
    structField("name", "string"))

 train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)