Manuel Parra (manuelparra@decsai.ugr.es). Soft Computing and Intelligent Information Systems
. Distributed Computational Intelligence and Time Series. University of Granada.

sparklyr¶Este paquete admite la conexión a clústeres con Apache Spark locales y remotos. Proporciona un backend compatible con "dplyr" y proporciona una interfaz para los algoritmos de aprendizaje de máquina incorporados en Spark.
Permite
Además junto con la interfaz dplyr de sparklyr, puede crear y afinar fácilmente los flujos de trabajo de ML en Spark, orquestados dentro de R.
Sparklyr proporciona tres familias de funciones que puede utilizar con el aprendizaje de máquina Spark:
ml_*)ft_*)sdf_*)Disponible:
https://github.com/rstudio/sparklyr

El paquete sparklyr proporciona una interfaz dplyr a Spark DataFrames, así como una interfaz R para los méodos de ML de Spark.
Spark es un sistema de computación de clusters de uso general, y existen muchas otras interfaces R que podrían ser implementadas (por ejemplo, interfaces con pipelines de ML, interfaces con paquetes de Spark de terceros, etc.).
El flujo de trabajo para el análisis de datos con sparklyr podría estar compuesto de las siguientes etapas:
sparklyr¶Es necesario reiniciar Spark para poder trabajar con esta sesión de sparklyr
Para ello, ve a la Máquina Virtual para el proceso de Spark, para luego volver a cargarlo
# Usamos la libreria sparklyr y dplyr.
# Ajustar el nivel de visualización de errores !
options(warn=0)
# Incluimos la bilbioteca de sparklyr
library(sparklyr)
# Usamos la biblioteca para el manejo de los datos.
library(dplyr)
# Abrimos la conexión. Importante indicar la versión de Spark que tenemos instalada. En nuestro caso tenemos la 2.0.1
sc <- spark_connect(master = "local", version = "2.0.1")
Para la lectura de datos tenemos las siguientes funciones:
Además del formato de los datos, Spark soporta la lectura de datos desde una variedad de fuentes de datos. Estos incluyen, almacenamiento en
Para la escritura de DataFrames existen las mismas funcione según el tipo de fuente de datos:
# Lectura de un fichero de datos CSV
tttm <- spark_read_csv(sc,
name="tttm",
path="/SparkR/datasets/BNGhearth.dat",
delimiter = ",",
header=TRUE,
overwrite = TRUE)
¿Ha cargado los datos rápido?
count(tttm)
La escritura de datos es sencilla y simplemente requiere la función concreta para almacenar los datos. El valor del parámetros path puede ser de diferente origen de datos:
path="hdfs://....")path="s3://...")path="...")# Escritura de un fichero de datos CSV (en local)
spark_write_csv(tttm,
path="/SparkR/datasets/results/BNGhearth_RESULT.csv",
delimiter = ",",
header=TRUE)
# Escritura de un fichero en S3 de Amazon
# spark_write_csv(tttm,
# path="s3://mybucket/BNGhearth_RESULT.csv",
# delimiter = ",",
# header=TRUE)
# Escritura de un fichero en HDFS
# spark_write_csv(tttm,
# path="hdfs://user/manuelparra/BNGhearth_RESULT.csv",
# delimiter = ",",
# header=TRUE)
Para los demás formatos, se usa la función correspondiente, teniendo en cuenta que la entrada de la función de escritura, siempre tiene que ser compatible con dplyr
#spark_write_json(tttm, .....
#spark_write_parquet(tttm, .....
Las funciones de manejo de datos son comandos dplyr para manipular datos
Cuando se conecta a un Spark DataFrame, dplyr traduce los comandos a las sentencias Spark SQL. Las fuentes de datos remotas utilizan exactamente los mismos cinco verbos que las fuentes de datos locales. Aquí están los cinco verbos con sus comandos SQL correspondientes:
# Veamos la cabecera del dataset
head(tttm)
# Seleccionamos las columnas que queremos con select
select(tttm, age,sex,class)
# Filtro los registros cuando sex y class son 1
res_filtered<-filter(tttm, sex==1 & class==1)
# Contamos los registros.
count(res_filtered)
/SparkR/datasets/BNGhearth.dat : Balanceado o no Balanceado con respecto a la variable de clase class ?¶res_class1 <- filter(tttm, class==1)
count(res_class1)
res_class0 <- filter(tttm, class==0)
count(res_class0)
Otro modo de saber si es o no balanceado de una forma más directa:
# Agrupamos el dataset por clase y luego contamos los registros.
# En SQL sería select count(class) ...group by class
# --> Importante, uso collect(...)
num_regs <- as.integer(collect(count(tttm)))
# Mostramos el número de registros
print(num_regs)
# Agrupamos por clase y contamos el numero de elementos de cada clase, ademñas
# añadimos una columna que calcula el porcentaje que supone cada clase del total
summarize( group_by(tttm,class), count = n(), percent= n()/num_regs *100.0)
/SparkR/datasets/databig/ECBDL14_10tst.data : Balanceado o no Balanceado con respecto a la variable de clase class ?¶## Ejercicio
tttm2 <- spark_read_csv(sc,
name="tttm1",
path="/SparkR/datasets/databig/ECBDL14_10tst.data",
delimiter = ",",
header=TRUE,
overwrite = TRUE)
## Ejercicio
num_regs <- as.integer(collect(count(tttm2)))
# Mostramos el número de registros
print(num_regs)
# Agrupamos por clase y contamos el numero de elementos de cada clase, ademñas
# añadimos una columna que calcula el porcentaje que supone cada clase del total
summarize( group_by(tttm2,class), count = n(), percent= n()/num_regs *100.0)
En sparklyr el interfaz de dplyr utiliza la SparkSQL para realizar los metodos de procesamiento de datos como agrupación, selección, mutacion, etc.
library(magrittr)
# Atención collect (...)
num_regs <- as.integer(collect(count(tttm)))
# También podemos usar magittr para hacer los mismo de un modo más claro. El ejemplo de arriba y este son lo mismo.
tttm %>%
group_by(class) %>%
summarize(count = n(), percent= n()/num_regs *100.0 )
tttm %>%
group_by(class) %>%
summarize(count = n(), maxf1=max(f1),minf1= min(f1))
# Imprimimos los primeros registros
head(tttm)
# Añadimos una columna con el doble del valor de la columna f6
tttm <- tttm %>%
mutate(chest2=chest*2.0)
# Selecciono la columna chest2 que hemos creado recientemente.
tttm %>% select(chest2)
# Si del anterior ejemplo queremos construir un nuevo dataframe, usamos
# select para tomar las columnas que nos interesen.
tttm_new <- tttm %>%
select (age,sex, chest )
head(tttm_new)
# La función arrange permite aplicar funciones de ordenación, ... sobre dataframes.
tttm %>%
select (age,chest,bloodpressure ) %>%
arrange (desc(age)) %>%
filter (chest> 1.5)
src_tbls(sc)
Permite la partición de un SparkDataFrame en varios grupos. Esta función es útil para dividir un DataFrame en, por ejemplo, conjuntos de datos de entrenamiento y prueba.
sdf_partition(x, ..., weights = NULL, seed = sample(.Machine$integer.max, 1))
Los pesos de muestreo definen la probabilidad de que una determinada observación se asignará a una partición en particular, no el tamaño resultante de la partición. Esto implica que particionar un DataFrame con, por ejemplo, sdf_partition (x, training = 0.5, test = 0.5)
Nota: No está garantizado para producir particiones de entrenamiento y prueba de igual tamaño.
partitions <- tttm %>%
sdf_partition(training = 0.6, test = 0.4)
print(count(partitions$test))
print(count(partitions$training))
Dado un modelo ml_model junto a un nuevo conjunto de datos, producir un nuevo Spark DataFrame con valores predichos codificados en la columna "predicción".
sdf_predict(object, newdata, ...)
Lee una sola columna de un SparkDataFrame y devuelva el contenido de esa columna en R.
sdf_read_column (x, columna)
Devuelve el contenido en un objeto manipulable por R.
Regista un SparkDataFrame asignándole un nombre para la table en el contexto de SparkSQL.
sdf_register(x, name = NULL)
Permite usar ese DataFrame desde una consulta en SQL con Spark.
Extrae un muestreo aleatorio de filas de un SparkDataFrame
sdf_sample(x, fraction = 1, replacement = TRUE, seed = NULL)
# Indico la fracción de la muestra y la semilla para la extracción.
sdf_sample(tttm,fraction=0.2,seed=98765)
Ordena por una o varias columnas, con cada columna ordenada en orden ascendente.
sdf_sort(x, columns)
sdf_sort(tttm, c("age"))
Al tener un conjunto de datos grande y desbalanceado, podemos tomar varias alternativas para trabajar con el mismo:
Vamos a trabajar con los algoritmos de ML utilizando submuestreo Random UnderSampling sencillo, para ello:
# Contamos los registros de la clase minoritaria
regs_minor <- tttm2 %>%
filter(class==1) %>%
count %>%
collect %>%
as.integer
# Extraemos un sample con un numero similar de instancias que de la clase minoritaria
only_class_0 <- tttm2 %>%
filter(class==0) %>%
sdf_sample(regs_minor, fraction=as.double(regs_minor/as.integer(collect(count(tttm)))))
# Extraemos las instancias de la clase minoritaria
only_class_1 <- tttm2 %>%
filter(class==1)
Contamos los registros de ambos
count(only_class_0)
# as.integer(collect(count(only_class_0)))
# only_class_0 %>% count %>% collect %>% as.integer
count(only_class_1)
# as.integer(collect(count(only_class_1)))
# only_class_1 %>% count %>% collect %>% as.integer
Unimos los dos dataframes con rbind
# Unimos
ds_ml <- rbind(only_class_1,only_class_0, name="ds_ml")
# Calculamos el tamaño de cada clase.
ds_ml %>%
filter(class==0) %>%
count()
ds_ml %>%
filter(class==1) %>%
count()
Para el particionado usamos una función de sparklyr: sdf_partition
# La función sdf_partition devuelve el dataframe separado en training y test.
# Para acceder a cada dataframe usamos ...$training , ...$test
partitions <- sdf_partition(ds_ml,training=0.80,test=0.20)
Contamos el número de registros de cada conjunto:
count(partitions$test)
count(partitions$training)
library(DBI)
resultssql <- dbGetQuery(sc,"select sex as sexo, age as edad from tttm LIMIT 10 ")
print(resultssql)
resultssql <- dbGetQuery(sc,"select count(class) as clases1 from tttm where class=1")
print(resultssql)
resultssql <- dbGetQuery(sc,"select sex, count(class) as clases1 from tttm group by sex")
print(resultssql)
resultssql <- dbGetQuery(sc,"select age from tttm where bloodpressure>=130 LIMIT 10")
print(resultssql)
resultssql <- dbGetQuery(sc,"select age from tttm where bloodpressure>=130 order by age desc LIMIT 10")
print(resultssql)