Installing and using Spark 1.6.1 and SparkR on MacOSX or Linux

With the help of Julien Nauroy (DI Université Paris Sud) and Maxime Jacquemin (ESE Université Paris Sud)
__________________________________________________________________________

To install SparkR, first go here:
https://spark.apache.org

Download the Spark software with these options:
- Most recent Spark release (1.6.1 at the time this post was written)
- Pre-built for Hadoop 2.6 and later
- Direct download

You will get a file spark-1.6.1-bin-hadoop2.6.tgz in your downloads folder.

Untar it and then you will have a folder:
spark-1.6.1-bin-hadoop2.6

Copy this folder to a safe place, for example:
~/Documents/spark-1.6.1-bin-hadoop2.6
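If you prefer, the untar and copy can also be done from R; here is a minimal sketch, assuming the archive was saved in ~/Downloads and that you keep the folder in ~/Documents:

untar(path.expand("~/Downloads/spark-1.6.1-bin-hadoop2.6.tgz"),
      exdir = path.expand("~/Documents"))
file.exists("~/Documents/spark-1.6.1-bin-hadoop2.6")  # should return TRUE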

________________________
There are two ways to install the package. First option, use devtools in R (install() here is devtools::install()):
library("devtools")
Sys.setenv(SPARK_HOME = "~/Documents/spark-1.6.1-bin-hadoop2.6")

install(file.path(Sys.getenv("SPARK_HOME"), "R", "lib", "SparkR"))

Or, second option, create a symbolic link. Enter these commands in R:
Sys.setenv(SPARK_HOME = "~/Documents/spark-1.6.1-bin-hadoop2.6")

paste0("ln -s " , file.path(Sys.getenv("SPARK_HOME"), "R", "lib", "SparkR"), " ", file.path(.libPaths()[1], "SparkR"))

Then copy what this command returns into a terminal session. For example, here:

> paste0("ln -s " , file.path(Sys.getenv("SPARK_HOME"), "R", "lib", "SparkR"), " ", file.path(.libPaths()[1], "SparkR"))
[1] "ln -s ~/Documents/spark-1.6.1-bin-hadoop2.6/R/lib/SparkR /Library/Frameworks/R.framework/Versions/3.4/Resources/library/SparkR"

So here you would paste this into the terminal:
ln -s ~/Documents/spark-1.6.1-bin-hadoop2.6/R/lib/SparkR /Library/Frameworks/R.framework/Versions/3.4/Resources/library/SparkR




This creates a symbolic link in your R package folder pointing to the place where the SparkR library is located.
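If you prefer not to copy and paste, the same thing can be done directly from R by passing the command to system(); this sketch just wraps the paste0() call shown above:

system(paste0("ln -s ", file.path(Sys.getenv("SPARK_HOME"), "R", "lib", "SparkR"),
              " ", file.path(.libPaths()[1], "SparkR")))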
________________________

Then edit the file ~/.Rprofile and add the lines below (better yet, create an RStudio project named SparkR and put the .Rprofile in that project folder):

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "~/Documents/spark-1.6.1-bin-hadoop2.6")
}
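Another possible approach (a sketch of an alternative to the symlink, not what is used in the rest of this post) is to add the SparkR library folder to R's library path in the same .Rprofile:

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "~/Documents/spark-1.6.1-bin-hadoop2.6")
}
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))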

Then open R; to load SparkR, just use:
library(SparkR)
sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))
sqlContext <- sparkRSQL.init(sc)

To check that everything works:

df <- data.frame(COL1=c(1, 2, 3), COL2=c(8, 9, 10))

sdf <- createDataFrame(sqlContext, df)
str(sdf)

collect(sdf)

Note that you must refresh the package information in RStudio to see that SparkR is loaded.

If you don't want to see all the information shown in your console, copy this file:
~/Documents/spark-1.6.1-bin-hadoop2.6/conf/log4j.properties.template
to 
~/Documents/spark-1.6.1-bin-hadoop2.6/conf/log4j.properties

and change the parameter log4j.rootCategory to:
log4j.rootCategory=WARN, console
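The copy itself can also be done from R, using the SPARK_HOME variable set above (you still have to edit log4j.rootCategory by hand afterwards):

file.copy(file.path(Sys.getenv("SPARK_HOME"), "conf", "log4j.properties.template"),
          file.path(Sys.getenv("SPARK_HOME"), "conf", "log4j.properties"))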

=======================
And now a first real test: estimating pi by Monte Carlo. Random points are drawn in the square [-1, 1] x [-1, 1]; the fraction that falls inside the unit circle tends to pi/4.


library(SparkR)

# Count how many random points in [-1, 1] x [-1, 1] fall inside the unit circle
piFuncVec <- function(elems) {
  rands1 <- runif(n = length(elems), min = -1, max = 1)
  rands2 <- runif(n = length(elems), min = -1, max = 1)
  val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
  sum(val)
}

if (exists(as.character(substitute(sc)))) sparkR.stop()
sc <- sparkR.init(master = "local[*]", appName = "PiR", 
                  sparkEnvir = list(spark.driver.memory = "2g"))

slices <- 10

n <- 10000 * slices

rdd <- SparkR:::parallelize(sc, 1:n, slices)
count <- SparkR:::reduce(SparkR:::lapplyPartition(rdd, piFuncVec), sum)
cat("Pi is roughly", 4.0 * count / n, "\n")
cat("Num elements in RDD ", count(rdd), "\n")

> cat("Pi is roughly", 4.0 * count / n, "\n")
Pi is roughly 3.1448 
> cat("Num elements in RDD ", count(rdd), "\n")
Num elements in RDD  100000 




______________________________________________________________
Another example:

library(SparkR)

sparkR.stop()
sc <- sparkR.init(master = "local[*]", appName = "SparkR-data-manipulation-example", 
                  sparkEnvir = list(spark.driver.memory = "2g"))


## Initialize SQLContext
sqlContext <- sparkRSQL.init(sc)

# The data can be downloaded from: http://s3-us-west-2.amazonaws.com/sparkr-data/flights.csv
flightsCsvPath <- "flights.csv"

# Create a local R dataframe
flights_df <- read.csv(flightsCsvPath, header = TRUE)
flights_df$date <- as.Date(flights_df$date)

## Filter flights whose destination is San Francisco and write to a local data frame
SFO_df <- flights_df[flights_df$dest == "SFO", ]

# Convert the local data frame into a SparkR DataFrame
SFO_DF <- createDataFrame(sqlContext, SFO_df)

# Print the schema of this Spark DataFrame
printSchema(SFO_DF)

# Cache the DataFrame
cache(SFO_DF)

# Print the first 6 rows of the DataFrame
showDF(SFO_DF, numRows = 6) ## or
head(SFO_DF)

# Show the number of rows in the DataFrame
count(SFO_DF)

# Select specific columns
destDF <- select(SFO_DF, "dest", "cancelled")

# Using SQL to select columns of data
# First, register the flights DataFrame as a table
registerTempTable(SFO_DF, "flightsTable")
destDF <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable")

# Use collect to create a local R data frame
local_df <- collect(destDF)

# Print the newly created local data frame
head(local_df)

# Filter rows with an SQL-style string or with a column expression
# (note: SFO_DF only contains flights to SFO, so filtering on dest == "JFK"
# returns an empty DataFrame; the point here is just the two filter syntaxes)
jfkDF <- filter(SFO_DF, "dest = \"JFK\"") ## OR
jfkDF <- filter(SFO_DF, SFO_DF$dest == "JFK")

# If the magrittr library is available, we can use it to
# chain data frame operations
if("magrittr" %in% rownames(installed.packages())) {
  library(magrittr)
  
  # Group the flights by date and then find the average daily delay
  # Write the result into a DataFrame
  groupBy(SFO_DF, SFO_DF$date) %>%
    summarize(avg(SFO_DF$dep_delay), avg(SFO_DF$arr_delay)) -> dailyDelayDF
  
  # Print the computed data frame
  head(dailyDelayDF)
}

sparkR.stop()

______________________________________________________________
While a Spark context is running, you can check how things are going in the Spark web UI:
http://localhost:4040

If you have problems getting Java connected to R, try this in R:
install.packages("rJava")
And in a terminal:

R CMD javareconf -e
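As a quick sanity check from R, you can also print which Java version the session picks up (just informative):

system("java -version")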
