Taller de procesamiento de BigData en Spark + R

Manuel Parra (manuelparra@decsai.ugr.es). Soft Computing and Intelligent Information Systems . Distributed Computational Intelligence and Time Series. University of Granada. logos

Procesando datos con SparkSQL

SQL (por sus siglas en inglés Structured Query Language; en español lenguaje de consulta estructurada) es un lenguaje específico del dominio que da acceso a un sistema de gestión de bases de datos relacionales que permite especificar diversos tipos de operaciones en ellos. Una de sus características es el manejo del álgebra y el cálculo relacional que permiten efectuar consultas con el fin de recuperar, de forma sencilla, información de bases de datos, así como hacer cambios en ellas. (+info: https://es.wikipedia.org/wiki/SQL)

logos

Forma básica:

SELECT [ALL | DISTINCT ]
             <nombre_campo> 
            FROM <nombre_tabla>                
            [WHERE <condición> [{ AND|OR <condición>}]]
            [GROUP BY <nombre_campo> [{,<nombre_campo >}]]
            [HAVING <condición>[{ AND|OR <condición>}]]
            [ORDER BY <nombre_campo>|<indice_campo> [ASC | DESC]
                [{,<nombre_campo>|<indice_campo> [ASC | DESC ]}]]

Como siempre para todos nuestros scripts con SparkR, cargamos la biblioteca, y creamos una nueva sesión de SparkR.

En este caso:

    Cuidado con la cantidad de MEMORIA que usamos para esta sección !    

In [ ]:
#Fijamos la ruta donde está instalado Spark
Sys.setenv("SPARK_HOME"='/usr/local/spark/')

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
sparkR.session(appName="EntornoInicio", master = "local[*]", sparkConfig = list(spark.driver.memory = "1g"))

Los SparkDataFrames y SparkSQL soportan un alto número de funciones para hacer un procesado de datos estructurado.

Vamos a poner en práctica las más utilizadas. La lista completa de operaciones que se pueden aplicar se puede ver desde API de SparkR en https://spark.apache.org/docs/latest/api/R/index.html

funcSparkR

Operando con SparkSQL sobre cojuntos masivos de datos.

Una utilidad importante de Spark SQL es ejecutar consultas SQL.

Spark SQL también se puede utilizar para leer datos de una instalación de HIVE existente. Al ejecutar SQL desde otro lenguaje de programación, los resultados se devolverán como Dataset / DataFrame. También puede interactuar con la interfaz SQL utilizando la línea de comandos o sobre JDBC / ODBC.

Todas las funciones de manejo de datos que se han usado con SparkR, pueden hacerse de una forma sencilla e intuitiva con SparkSQL

In [ ]:
df_nyctrips <- read.df("/SparkR/datasets/yellow_tripdata_2016-02_small1.csv", "csv", header = "true", inferSchema = "true")

¿Cómo se crea un vista de un SparkDataFrame?

In [ ]:
# df_nyctrips es nuestro DataFrameSpark de SQL y el nombre que le pondremos a la 'vista' es slqdf_filtered_nyc 
# y será usado para trabajar desde SPARKSQL.
createOrReplaceTempView(df_nyctrips,"slqdf_filtered_nyc")

Para aplicar una consulta a una vista de un SparkDataFrame usamos la función SQL sql e indicamos como nombres de las tablas, las vistas que hemos creado de SparkDataFrames disponibles.

In [ ]:
# Hacemos una consulta para los 3 primeros registros del dataset.
results <- sql("select * from slqdf_filtered_nyc LIMIT 3 ")
In [ ]:
# Vemos el resultado.
head(results)

Buscamos el total de kilómetros recorridos por cada vendedor:

In [ ]:
results <- sql("select VendorID, SUM(trip_distance) from slqdf_filtered_nyc GROUP BY VendorID ")

# Vemos el resultado
head(results)
In [ ]:
results <- sql("select VendorID, SUM(trip_distance) as Dist from slqdf_filtered_nyc GROUP BY VendorID ")

# Vemos el resultado
head(results)


Ejercicio práctico:

Crea una SparkDataDrame que se llame sql_nyc y que tenga una nueva columna que calcule el tiempo de cada viaje en segundos; llama a esa nueva columna trip_time y contenga los campos: VendorID, passenger_count,trip_distance,``total_amount```

Pista: INT(unix_timestamp(tpep_dropoff_datetime)- unix_timestamp(tpep_pickup_datetime))

Necesitamos ese SparkDataFrame para poder seguir con los ejemplos siguientes


Calculamos el tiempo en segundos consumido en los viajes de cada Vendedor.

In [ ]:
results <- sql("select VendorID, SUM(trip_time) from sql_nyc GROUP BY VendorID ")

# Vemos los resultados
head(results)

Calculamos el tiempo en minutos

In [ ]:
results <- sql("select VendorID, SUM(trip_time)/60.0 as min_trip from sql_nyc GROUP BY VendorID ")

# Vemos los resultados
head(results)

Buscamos la ganacia total cada vendedor:

In [ ]:
results <- sql("select VendorID, SUM(total_amount)*1.10373 as Total_Amount_Euro from slqdf_filtered_nyc GROUP BY VendorID ")

# Vemos el resultado
head(results)

Calculamos la media y la desviación típica del tiempo de recorrido y ganancia por numero de personas:

In [ ]:
results <- sql("select passenger_count, AVG(trip_time), AVG(total_amount) ,AVG(trip_distance)   
                from sql_nyc 
                GROUP BY passenger_count 
                order by passenger_count ASC ")
head(results)

Coeficiente de correlación

In [ ]:
results <- sql("select corr(total_amount,trip_distance) as correlation_coef
                from 
                slqdf_filtered_nyc")
# Ver resultados
head(results)
In [ ]:
results <- sql("select corr(total_amount,trip_time) as correlation_coef
                from 
                sql_nyc")
head(results)


Pregunta:

¿Existe alguna correlación entre tiempo de viaje y distancia de viaje?


In [ ]:
results <- sql("select corr(trip_time,trip_distance) as correlation_coef
                from 
                sql_nyc")
head(results)


Ejercicio práctico:

¿ Qué deducimos de estos coeficiente de corelación ?


Un ejemplo más completo:

In [ ]:
# Calculamos el número de viajes por hora del día y los dibujamos:
results <- sql("select hour(tpep_pickup_datetime)  as hourpick ,count(*) as numtrips from slqdf_filtered_nyc group by  hour(tpep_pickup_datetime) order by hourpick ASC")

dframeR <- collect(results)

library(ggplot2)
ggplot(data=dframeR, aes(x=hourpick, y=numtrips)) +
    geom_line() + scale_x_continuous(  breaks = c(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23))+ 
    geom_point()


Ejercicio práctico:

Compara la media del tiempo de viaje por hora del día e imprime el gráfico resultante.


Ahora utilizamos el conjunto de datos de BNGHeart para realizar un análisis exploratorio.

In [ ]:
heart_df <- read.df("/SparkR/datasets/BNGhearth.dat", "csv", header = "true", inferSchema = "true")

printSchema(heart_df)

head(heart_df)

count(heart_df)
In [ ]:
# Calculamos por sexos la media de bloodpressure, choresterol y heartrate
createOrReplaceTempView(heart_df,"heart")

res_heart <- sql("SELECT sex, avg(bloodpressure),avg(choresterol),avg(heartrate) from heart group by sex")

head(res_heart)


Ejercicio práctico:

¿Existe alguna correlación entre age y heartrate?



Ejercicio práctico:

¿Cuántos individuos hay por clase y por sexo?