Taller de procesamiento de BigData en Spark + R

Manuel Parra (manuelparra@decsai.ugr.es). Soft Computing and Intelligent Information Systems . Distributed Computational Intelligence and Time Series. University of Granada. logos

Biblioteca sparklyr

Este paquete admite la conexión a clústeres con Apache Spark locales y remotos. Proporciona un backend compatible con "dplyr" y proporciona una interfaz para los algoritmos de aprendizaje de máquina incorporados en Spark.

Permite

  • Conectar a Spark desde R. El paquete sparklyr proporciona un backend dplyr .
  • Filtrar y agrupar los datasets de Spark para el análisis y visualización.
  • Utilizar la biblioteca de aprendizaje distribuida de Sparks de R.
  • Crear extensiones que llamen a la API Spark completa y proporcione interfaces con los paquetes Spark.

Además junto con la interfaz dplyr de sparklyr, puede crear y afinar fácilmente los flujos de trabajo de ML en Spark, orquestados dentro de R.

Sparklyr proporciona tres familias de funciones que puede utilizar con el aprendizaje de máquina Spark:

  • Algoritmos de aprendizaje automático para el análisis de datos (funciones ml_*)
  • Transformadores de características para manipular características individuales (funciones ft_*)
  • Funciones para manipular Spark DataFrames (funciones sdf_*)

Disponible:

https://github.com/rstudio/sparklyr

logos

El paquete sparklyr proporciona una interfaz dplyr a Spark DataFrames, así como una interfaz R para los méodos de ML de Spark.

Spark es un sistema de computación de clusters de uso general, y existen muchas otras interfaces R que podrían ser implementadas (por ejemplo, interfaces con pipelines de ML, interfaces con paquetes de Spark de terceros, etc.).

El flujo de trabajo para el análisis de datos con sparklyr podría estar compuesto de las siguientes etapas:

  • Realizar consultas SQL a través de la interfaz sparklyr dplyr,
  • Utilizar la familia de funciones sdf y ft para generar nuevas columnas o particionar su conjunto de datos,
  • Elegir un algoritmo de aprendizaje automático apropiado de la familia de funciones ml_ * para modelar los datos,
  • Inspeccionar la calidad del ajuste de su modelo y usarlo para hacer predicciones con nuevos datos,
  • Recopilar los resultados para la visualización y análisis posterior en R

Inicialización del entorno con sparklyr

Es necesario reiniciar Spark para poder trabajar con esta sesión de sparklyr
Para ello, ve a la Máquina Virtual para el proceso de Spark, para luego volver a cargarlo

In [ ]:
# Usamos la libreria sparklyr y dplyr.
# Ajustar el nivel de visualización de errores !
options(warn=0)

# Incluimos la bilbioteca de sparklyr
library(sparklyr)
# Usamos la biblioteca para el manejo de los datos.
library(dplyr)

# Abrimos la conexión. Importante indicar la versión de Spark que tenemos instalada. En nuestro caso tenemos la 2.0.1
sc <- spark_connect(master = "local", version = "2.0.1")

Características


La biblioteca sparklyr tiene asociado un paquete que hace de complemento ideal para la manipulación de datos masivos. Este paquete es dplyr un paquete en R para trabajar con datos estructurados dentro y fuera de R. dplyr hace la manipulación de datos muy sencilla para los usuarios de R, además ofrece interfaces consistentes y con un buen rendimiento. La librería tiene las siguientes funcionalidades básicas:

  • Seleccion, filtrado y agregación.
  • Funciones para muestreo.
  • Funciones de JOIN para Dataframes.
  • Funciones Collect para transformar datos de Spark a R (importante!). ...

Lectura y escritura de datos con sparklyr


Para la lectura de datos tenemos las siguientes funciones:

  • spark_read_csv: Lee un CSV y el resultado lo hace compatible con las funciones de dplyr.
  • spark_read_json: Lee un fichero JSON y el resultado es compatible con la interfaz de dplyr.
  • spark_read_parquet: Lee un fichero PARQUET.

Además del formato de los datos, Spark soporta la lectura de datos desde una variedad de fuentes de datos. Estos incluyen, almacenamiento en

  • HDFS (hdfs:// protocol),
  • Amazon S3 (s3n:// protocol), o
  • ficheros locales disponibles en en los nodos (file:// protocol).

Para la escritura de DataFrames existen las mismas funcione según el tipo de fuente de datos:

  • spark_write_csv: Escribe a CSV y recibe una fuente de datos compatible con dplyr.
  • spark_write_json: Escribe a JSON.
  • spark_write_parquet: Escribe a parquet desde cualquier fuente compatible con dplyr.
In [ ]:
# Lectura de un fichero de datos CSV

tttm <- spark_read_csv(sc, 
                       name="tttm", 
                       path="/SparkR/datasets/BNGhearth.dat", 
                       delimiter = ",", 
                       header=TRUE,
                       overwrite = TRUE)

¿Ha cargado los datos rápido?

In [ ]:
count(tttm)

La escritura de datos es sencilla y simplemente requiere la función concreta para almacenar los datos. El valor del parámetros path puede ser de diferente origen de datos:

  • HDFS (path="hdfs://....")
  • AmazonS3 (path="s3://...")
  • Local (path="...")
In [ ]:
# Escritura de un fichero de datos CSV (en local)
spark_write_csv(tttm, 
                path="/SparkR/datasets/results/BNGhearth_RESULT.csv", 
                delimiter = ",", 
                header=TRUE)

# Escritura de un fichero en S3 de Amazon 
# spark_write_csv(tttm, 
#                path="s3://mybucket/BNGhearth_RESULT.csv", 
#                delimiter = ",", 
#                header=TRUE)


# Escritura de un fichero en HDFS 
# spark_write_csv(tttm, 
#                path="hdfs://user/manuelparra/BNGhearth_RESULT.csv", 
#                delimiter = ",", 
#                header=TRUE)

Para los demás formatos, se usa la función correspondiente, teniendo en cuenta que la entrada de la función de escritura, siempre tiene que ser compatible con dplyr

In [ ]:
#spark_write_json(tttm, .....

#spark_write_parquet(tttm, .....

Filtrado, selección, agrupación.

Las funciones de manejo de datos son comandos dplyr para manipular datos

Cuando se conecta a un Spark DataFrame, dplyr traduce los comandos a las sentencias Spark SQL. Las fuentes de datos remotas utilizan exactamente los mismos cinco verbos que las fuentes de datos locales. Aquí están los cinco verbos con sus comandos SQL correspondientes:

  • select ~ SELECT
  • filter ~ WHERE
  • arrange ~ ORDER
  • summarise ~ aggregators: sum, min, sd, etc.
  • mutate ~ operators: +, *, log, etc.
In [ ]:
# Veamos la cabecera del dataset

head(tttm)
In [ ]:
# Seleccionamos las columnas que queremos con select

select(tttm, age,sex,class)
In [ ]:
# Filtro los registros cuando sex y class son 1
res_filtered<-filter(tttm, sex==1 & class==1)

# Contamos los registros.
count(res_filtered)

¿Cómo es el dataset /SparkR/datasets/BNGhearth.dat : Balanceado o no Balanceado con respecto a la variable de clase class ?

In [ ]:
res_class1 <- filter(tttm, class==1)
count(res_class1)

res_class0 <- filter(tttm, class==0)
count(res_class0)

Otro modo de saber si es o no balanceado de una forma más directa:

In [ ]:
# Agrupamos el dataset por clase y luego contamos los registros.

# En SQL sería select count(class) ...group by class

# --> Importante, uso collect(...)
num_regs <- as.integer(collect(count(tttm)))

# Mostramos el número de registros
print(num_regs)

# Agrupamos por clase y contamos el numero de elementos de cada clase, ademñas 
# añadimos una columna que calcula el porcentaje que supone cada clase del total
summarize( group_by(tttm,class), count = n(), percent= n()/num_regs *100.0)

¿Cómo es el dataset /SparkR/datasets/databig/ECBDL14_10tst.data : Balanceado o no Balanceado con respecto a la variable de clase class ?

In [ ]:
## Ejercicio 
tttm2 <- spark_read_csv(sc, 
                       name="tttm1", 
                       path="/SparkR/datasets/databig/ECBDL14_10tst.data", 
                       delimiter = ",", 
                       header=TRUE,
                       overwrite = TRUE)
In [ ]:
## Ejercicio 
num_regs <- as.integer(collect(count(tttm2)))

# Mostramos el número de registros
print(num_regs)

# Agrupamos por clase y contamos el numero de elementos de cada clase, ademñas 
# añadimos una columna que calcula el porcentaje que supone cada clase del total
summarize( group_by(tttm2,class), count = n(), percent= n()/num_regs *100.0)

Preprocesado de dataframes

En sparklyr el interfaz de dplyr utiliza la SparkSQL para realizar los metodos de procesamiento de datos como agrupación, selección, mutacion, etc.

In [ ]:
library(magrittr)

# Atención collect (...)
num_regs <- as.integer(collect(count(tttm)))

# También podemos usar magittr para hacer los mismo de un modo más claro. El ejemplo de arriba y este son lo mismo.
tttm %>% 
    group_by(class) %>%
    summarize(count = n(), percent= n()/num_regs *100.0 )

tttm %>% 
    group_by(class) %>%
    summarize(count = n(), maxf1=max(f1),minf1= min(f1))

# Imprimimos los primeros registros
head(tttm)
In [ ]:
# Añadimos una columna con el doble del valor de la columna f6
tttm <- tttm %>%
             mutate(chest2=chest*2.0)

# Selecciono la columna chest2 que hemos creado recientemente.
tttm %>% select(chest2)
In [ ]:
# Si del anterior ejemplo queremos construir un nuevo dataframe, usamos 
# select para tomar las columnas que nos interesen.
tttm_new <- tttm %>%
               select (age,sex, chest )

head(tttm_new)
In [ ]:
# La función arrange permite aplicar funciones de ordenación, ... sobre dataframes.
tttm %>%
    select (age,chest,bloodpressure ) %>%
    arrange (desc(age)) %>%
    filter (chest> 1.5)

¿Cómo sabemos cuantos objetos tenemos en el contexto de Spark?

In [ ]:
src_tbls(sc)

Partición de un SparkDataFrame

Permite la partición de un SparkDataFrame en varios grupos. Esta función es útil para dividir un DataFrame en, por ejemplo, conjuntos de datos de entrenamiento y prueba.

sdf_partition(x, ..., weights = NULL, seed = sample(.Machine$integer.max,  1))

Los pesos de muestreo definen la probabilidad de que una determinada observación se asignará a una partición en particular, no el tamaño resultante de la partición. Esto implica que particionar un DataFrame con, por ejemplo, sdf_partition (x, training = 0.5, test = 0.5)

Nota: No está garantizado para producir particiones de entrenamiento y prueba de igual tamaño.

In [ ]:
partitions <- tttm %>%
                sdf_partition(training = 0.6, test = 0.4)

print(count(partitions$test))

print(count(partitions$training))

Realizar predicciones sobre los datos

Dado un modelo ml_model junto a un nuevo conjunto de datos, producir un nuevo Spark DataFrame con valores predichos codificados en la columna "predicción".

sdf_predict(object, newdata, ...)

Lectura de una columna

Lee una sola columna de un SparkDataFrame y devuelva el contenido de esa columna en R.

sdf_read_column (x, columna)

Devuelve el contenido en un objeto manipulable por R.

Registra un SparkDataFrame

Regista un SparkDataFrame asignándole un nombre para la table en el contexto de SparkSQL.

sdf_register(x, name = NULL)

Permite usar ese DataFrame desde una consulta en SQL con Spark.

Extracción de una muestra de un SparkDataFrame

Extrae un muestreo aleatorio de filas de un SparkDataFrame

sdf_sample(x, fraction = 1, replacement = TRUE, seed = NULL)
In [ ]:
# Indico la fracción de la muestra y la semilla para la extracción.

sdf_sample(tttm,fraction=0.2,seed=98765)

Ordenar un SparkDataFrame

Ordena por una o varias columnas, con cada columna ordenada en orden ascendente.

sdf_sort(x, columns)
In [ ]:
sdf_sort(tttm, c("age"))

Preparación del dataset para Machine Learning

Al tener un conjunto de datos grande y desbalanceado, podemos tomar varias alternativas para trabajar con el mismo:

  • Hacer un sobremuestreo de la clase minoritaria
  • Hacer un submuestreo de la clase mayoritaria

Vamos a trabajar con los algoritmos de ML utilizando submuestreo Random UnderSampling sencillo, para ello:

  • Calculamos el número de instacias de la clase minoritaria.
  • Hacemos un muestreo sólo de la clase mayoritaria para igualar en instancias la clase minoritaria.
  • Fusionamos ambos muestreos
In [ ]:
# Contamos los registros de la clase minoritaria
regs_minor <- tttm2 %>% 
            filter(class==1) %>% 
            count %>%
            collect %>% 
            as.integer


# Extraemos un sample con un numero similar de instancias que de la clase minoritaria
only_class_0 <- tttm2 %>% 
        filter(class==0) %>%
        sdf_sample(regs_minor, fraction=as.double(regs_minor/as.integer(collect(count(tttm)))))

# Extraemos las instancias de la clase minoritaria 
only_class_1 <- tttm2 %>% 
            filter(class==1)

Contamos los registros de ambos

In [ ]:
count(only_class_0)
# as.integer(collect(count(only_class_0)))
# only_class_0 %>% count %>% collect %>% as.integer
count(only_class_1)
# as.integer(collect(count(only_class_1)))
# only_class_1 %>% count %>% collect %>% as.integer

Unimos los dos dataframes con rbind

In [ ]:
# Unimos 
ds_ml <- rbind(only_class_1,only_class_0, name="ds_ml")

# Calculamos el tamaño de cada clase.
ds_ml %>% 
        filter(class==0) %>%
        count()

ds_ml %>% 
        filter(class==1) %>%
        count()

Para el particionado usamos una función de sparklyr: sdf_partition

In [ ]:
# La función sdf_partition devuelve el dataframe separado en training y test.
# Para acceder a cada dataframe usamos ...$training , ...$test
partitions <- sdf_partition(ds_ml,training=0.80,test=0.20)

Contamos el número de registros de cada conjunto:

In [ ]:
count(partitions$test)
count(partitions$training)

Usando SPARKSQL para el tratamiento de datos

In [ ]:
library(DBI)
In [ ]:
resultssql <- dbGetQuery(sc,"select sex as sexo, age as edad from tttm LIMIT 10 ")
print(resultssql)
In [ ]:
resultssql <- dbGetQuery(sc,"select count(class) as clases1 from tttm where class=1")
print(resultssql)
In [ ]:
resultssql <- dbGetQuery(sc,"select sex, count(class) as clases1 from tttm group by sex")
print(resultssql)
In [ ]:
resultssql <- dbGetQuery(sc,"select age from tttm where bloodpressure>=130 LIMIT 10")
print(resultssql)
In [ ]:
resultssql <- dbGetQuery(sc,"select age from tttm where bloodpressure>=130 order by age desc LIMIT 10")
print(resultssql)