Taller de procesamiento de BigData en Spark + R

Manuel Parra (manuelparra@decsai.ugr.es). Soft Computing and Intelligent Information Systems . Distributed Computational Intelligence and Time Series. University of Granada. logos

Utilizando spark-submit para enviar trabajos al clúster de Spark

El script spark-submit de Spark se utiliza para iniciar aplicaciones en un clúster sin tener que hacerlo de modo interactivo.

En el taller hemos realizado un trabajo interactivo con R. Es posible que sea necesario lanzar un experimento en el clúster sin tener que realizarlo de modo interactivo por las característica propias del desarrollo, como por ejemplo:

  • Coste en tiempo, alto
  • Análisis profundos
  • Baterias de análsis
  • etc.

Esto es importante, pues permite enviar trabajos a la plataforma Spark de una forma cómoda independientemente del lenguaje se se haya utilizado para hacer los experimentos y esperar el resultado cuando se haya realizado completamente el experimento (o ejecución) en el clúster.

De modo que se podrán enviar trabajos al clúster en:

  • Scala
  • Python
  • R
  • Java

La sintaxis es la siguiente:

spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Las opciones más usadas son:

  • --class : El punto de entrada de tu aplicación o experimento.
  • --master : La URL del Master de tu cluster. Si es local usamos local
  • --deploy-mode : Desplegar en los nodos worker o localmente (local) como cliente externo.
  • --conf : Configuración genérica de Spark.
  • --num-executors : Número de nodos que ejecutarán el trabajo
  • --num-cores : Número de cores por nodo ejecutor
  • --executor-memory : Memoria en GB por utilizada máxima por cada nodo ejecutor

Este comando se usa directamente desde el shell de la Máquina virtual

Ejemplo de aplicación en R para spark-submit

Este ejemplo recibe dos parámetros desde la línea de comandos, uno con la cantidad (amount) y otro con el fichero de salida de los datos (resultpath). Conecta con Spark, lee un dataset de ejemplo (/SparkR/datasets/csv/buy_costumers_amazon01.csv) y luego filtra ese dataset por la cantidad (amount). Finalmente almacena el resultado en la ruta indicada por el parametro de entrada resultpath.

Copia el siguiente código en un fichero en R con el nombre miaplicacion.R.

# Leo los agurmentos
args <- commandArgs(trailingOnly =TRUE)

# Tomo el parametyos amount para hacer alguna operación
amount <- args[1]    
resultpath <- args[2]

# Fijamos la ruta donde está instalado Spark
Sys.setenv("SPARK_HOME"='/usr/local/spark/')
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))

library(SparkR)

# Abro la sesion de SparkR
sparkR.session(appName="EntornoInicio", master = "local[*]", sparkConfig = list(spark.driver.memory = "1g"))

# Leo un dataset
df <- read.df("/SparkR/datasets/csv/buy_costumers_amazon01.csv", "csv", header = "true", inferSchema = "true")

# Filtro el dataset por el parametro de entrada a la aplicacion
df1 <- filter(df,df$amount>amount)

#Escribimos el SparkDataFrame df1
write.df(df1, path = resultpath, source = "csv", mode = "overwrite")

sparkR.session.stop()

Ejecución para R en local

Para ejecutar una aplicación en R y enviarla como trabajo a Spark, hay que usar la siguiente sentencia:

spark-submit --deploy-mode <modo> --master <master> --num-executors <num> --executor-memory <mem g> <aplicacion.R> <parametro1> <parametro2> <parametro3> ...

In [ ]:
spark-submit --deploy-mode client  --master local  --num-executors 1  --executor-memory 1g miaplicacion.R 2000.0 /tmp/resultados.csv

Ejecución para R usando Rscript tanto en modo local como en clúster

In [ ]:
nohup Rscript miaplicacion.R 2000.0 /tmp/resultados.csv &

En R en clúster

Para ejecutar una aplicación en R y enviarla como trabajo a Spark, hay que usar la siguiente sentencia:

In [ ]:
spark-submit --deploy-mode cluster  --master spark://HOST:PORT  --num-executors 5  --executor-cores 5  --executor-memory 4g  miaplicacion.R 2000.0 /tmp/resultados.csv


Ejercicio práctico:

Crea una aplicación con el nombre seleccion.R que reciba 4 parámetros:

  • nombre de la columna: colname
  • número de registros: numrows
  • fichero de entrada: source_file
  • fichero de salida: destination_file

Tome el fichero de entrada, extraiga (seleccione: select(...)) sólo la columna colname y sólo numrows filas del dataset. El dataset resultante ( de una columna y numrows filas ) debe ser almacenado en el fichero destination_file en CSV.

Prueba como fichero de entrada: /SparkR/datasets/databig/ECBDL14_10tst.data y como fichero de salida: /tmp/salida.csv

Ejecuta la aplicación utilizando spark-submit

Verifica que al terminar de ejecutar el trabajo, aparece el dataset en /tmp/salida.csv.