GraphX Property Graph
GraphX extiende Spark RDD con un Resilient Distributed Property Graph.
El Property Graph es un multigrafo dirigido el cual puede tener multiples aristas en paralelo.
Cada vértice y arista tiene asociados propiedades definidas por el usuario.
Las aristas en paralelo permiten múltiples relaciones entre los mismos vértices.
Nosotros usaremos GraphX para analizar los datos de los vuelos.
Análisis de Datos de Vuelo Reales con GraphX
El escenario
Nuestros datos pertenecen a https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time La informacion de los vuelos es de Enero de 2015. Para cada vuelo tenemos la siguiente información:
Field | Description | Example Value |
dOfM(String) | Day of month | 1 |
dOfW (String) | Day of week | 4 |
carrier (String) | Carrier code | AA |
tailNum (String) | Unique identifier for the plane - tail number | N787AA |
flnum(Int) | Flight number | 21 |
org_id(String) | Origin airport ID | 12478 |
origin(String) | Origin Airport Code | JFK |
dest_id (String) | Destination airport ID | 12892 |
dest (String) | Destination airport code | LAX |
crsdeptime(Double) | Scheduled departure time | 900 |
deptime (Double) | Actual departure time | 855 |
depdelaymins (Double) | Departure delay in minutes | 0 |
crsarrtime (Double) | Scheduled arrival time | 1230 |
arrtime (Double) | Actual arrival time | 1237 |
arrdelaymins (Double) | Arrival delay minutes | 7 |
crselapsedtime (Double) | Elapsed time | 390 |
dist (Int) | Distance | 2475 |
Comenzamos por analizar tres vuelos. Por cada uno disponemos de la siguiente información:
Originating Airport | Destination Airport | Distance |
SFO | ORD | 1800 miles |
ORD | DFW> | 800 miles |
DFW | SFO> | 1400 miles |
En éste escenario se representará los aeropuertos como vértices y las rutas como aristas. En nuestro grafo tendremos tres vértices, cada uno representando un aeropuerto.. La distancia entre dos aeropuertos es una propiedad de la ruta, como se muestra más abajo:
Tabla de Vértices para Aeropuertos
Definimos los aeropuertos como vértices. Los vértices tienen un Id y pueden tener asociados propiedades o atributos. Cada vértice consiste en:
- Id del vértice → Id
- Propiedad del vértice → nombre
ID | Property |
1 | SFO |
2 | ORD |
3 | DFW |
Tabla de Aristas para las Rutas
Las aristas son las rutas entre dos aeropuertos. Una arista ha de tener una fuente, un destino, y puede tener propiedades. En nuestro ejemplo, una arista consiste en:
- Id del Origen de la Arista → src
- Id del Destino de la Arista → dest
- Propiedad distancia de la Arista → distance
SrcId | DestId | Property |
1 | 2 | 1800 |
2 | 3 | 800 |
3 | 1 | 1400 |
Software
Este tutorial se puede ejecutar sobre el MapR Sandbox incluido en Spark.
- Puedes descargar el código y los datos para ejecutar este ejemplo desde aquí:
case class Flight(dofM: String, dofW: String, carrier: String, tailnum: String, flnum: Int, org_id: Long, origin: String, dest_id: Long, dest: String, crsdeptime: Double, deptime: Double, depdelaymins: Double, crsarrtime: Double, arrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Int)
Y seguidamente creamos la aplicación. En la primera parte desarrollamos el código que nos va a permitir leer el fichero CSV. Para ello la función definida abajo parsea cada linea del fichero CSV en la clase Flight,
def parseFlight(line: Array[String]): Flight = { Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong, line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble, line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt) }
Cada linea en el fichero de datos corresponderá a un objeto Flight diferente
// each row is an array of strings (the columns in the csv file) val rows = ArrayBuffer[Array[String]]() val temp = ArrayBuffer[Array[String]]() val bufferedSource = scala.io.Source.fromFile("rita2014jan.csv") var textRDDPage : RDD[Array[String]] = null var endProgram = false // val cont = 0 breakable { for (line <- bufferedSource.getLines) { var cols = line.split(",").map(_.trim) var row = ArrayBuffer[String]() for (col <- cols if !col.isEmpty) { row += col } if(row.size < 17) break rows += row.toArray } }Vamos leyendo las líneas del fichero de datos una a una comprobando que no leemos una línea vacía y que está formada por 17 columnas. Cada una de las cuales será añadida al ArrayBuffer[String] 'row'. Finalmente cada fila es añadida al conjunto de filas definido como un ArrayBuffer[Array[String]]. La lectura se interrumpe con la primera línea vacía. Utilizaremos un RDD para los vuelos y otro para los aeropuertos obteniendo estos últimos a partir de los vuelos.
var flightsRDD : RDD[Flight] = null var airports : RDD[(Long, String)] = null if (rows.size > 0) { textRDDPage = sc.parallelize(rows) println(textRDDPage.count() + " rows gotten") rows.clear() //Create RDD with the January 2014 data flightsRDD = textRDDPage.map(parseFlight).cache() airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct val items = airports.collectAsMap() val keys = items.keySet if (keys.size > 0) { println("Hello World, " + keys.size + " keys") for (key <- keys) { val airportId = key val airportName = items.get(key) println("id: " + airportId + ", name:" + airportName) } } }
Y finalmente verificamos listando los aeropuertos.