Sunday, September 15, 2019

Apache Spark GraphX with Scala and Neo4j (I)

    Basado en el trabajo de Carol McDonaldHow to Get Started Using Apache Spark GraphX with Scala he construido una base de datos Neo4j con los datos aportados en el fichero CSV adjunto en la página en el apartado Software.


GraphX Property Graph


GraphX extiende Spark RDD con un Resilient Distributed Property Graph.

El Property Graph es un multigrafo dirigido el cual puede tener multiples aristas en paralelo. Cada vértice y arista tiene asociados propiedades definidas por el usuario. Las aristas en paralelo permiten múltiples relaciones entre los mismos vértices.


Nosotros usaremos GraphX para analizar los datos de los vuelos.

Análisis de Datos de Vuelo Reales con GraphX


El escenario


Nuestros datos pertenecen a https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time La informacion de los vuelos es de Enero de 2015. Para cada vuelo tenemos la siguiente información:

Field Description Example Value
dOfM(String) Day of month 1
dOfW (String) Day of week 4
carrier (String) Carrier code AA
tailNum (String) Unique identifier for the plane - tail number N787AA
flnum(Int) Flight number 21
org_id(String) Origin airport ID 12478
origin(String) Origin Airport Code JFK
dest_id (String) Destination airport ID 12892
dest (String) Destination airport code LAX
crsdeptime(Double) Scheduled departure time 900
deptime (Double) Actual departure time 855
depdelaymins (Double) Departure delay in minutes 0
crsarrtime (Double) Scheduled arrival time 1230
arrtime (Double) Actual arrival time 1237
arrdelaymins (Double) Arrival delay minutes 7
crselapsedtime (Double) Elapsed time 390
dist (Int) Distance 2475

Comenzamos por analizar tres vuelos. Por cada uno disponemos de la siguiente información:

Originating Airport Destination Airport Distance
SFO ORD 1800 miles
ORD DFW> 800 miles
DFW SFO> 1400 miles

En éste escenario se representará los aeropuertos como vértices y las rutas como aristas. En nuestro grafo tendremos tres vértices, cada uno representando un aeropuerto.. La distancia entre dos aeropuertos es una propiedad de la ruta, como se muestra más abajo:


Tabla de Vértices para Aeropuertos


Definimos los aeropuertos como vértices. Los vértices tienen un Id y pueden tener asociados propiedades o atributos. Cada vértice consiste en:
  • Id del vértice → Id
  • Propiedad del vértice → nombre

ID Property
1 SFO
2 ORD
3 DFW


Tabla de Aristas para las Rutas


Las aristas son las rutas entre dos aeropuertos. Una arista ha de tener una fuente, un destino, y puede tener propiedades. En nuestro ejemplo, una arista consiste en:
  • Id del Origen de la Arista → src
  • Id del Destino de la Arista → dest
  • Propiedad distancia de la Arista → distance

SrcId DestId Property
1 2 1800
2 3 800
3 1 1400


Software


Este tutorial se puede ejecutar sobre el MapR Sandbox incluido en Spark.
El código en Scala está actualizado y listado hasta el apartado PageRank inclusive. Inicialmente definimos el Esquema Flight como sigue,

case class Flight(dofM: String, dofW: String, carrier: String, tailnum: String,
                  flnum: Int, org_id: Long, origin: String, dest_id: Long, dest: String,
                  crsdeptime: Double, deptime: Double, depdelaymins: Double, crsarrtime: Double,
                  arrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Int)


Y seguidamente creamos la aplicación. En la primera parte desarrollamos el código que nos va a permitir leer el fichero CSV. Para ello la función definida abajo parsea cada linea del fichero CSV en la clase Flight,

def parseFlight(line: Array[String]): Flight = {
    Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong,
    line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble,
    line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt)
  }


Cada linea en el fichero de datos corresponderá a un objeto Flight diferente

// each row is an array of strings (the columns in the csv file)
  val rows = ArrayBuffer[Array[String]]()
  val temp = ArrayBuffer[Array[String]]()

  val bufferedSource = scala.io.Source.fromFile("rita2014jan.csv")
  var textRDDPage : RDD[Array[String]] = null
  var endProgram = false

  // val cont = 0
   breakable {
    for (line <- bufferedSource.getLines) {
      var cols = line.split(",").map(_.trim)
      var row = ArrayBuffer[String]()
      for (col <- cols if !col.isEmpty) {
        row += col
      }

      if(row.size < 17)
        break

      rows += row.toArray
    }
  }
 
Vamos leyendo las líneas del fichero de datos una a una comprobando que no leemos una línea vacía y que está formada por 17 columnas. Cada una de las cuales será añadida al ArrayBuffer[String] 'row'. Finalmente cada fila es añadida al conjunto de filas definido como un ArrayBuffer[Array[String]]. La lectura se interrumpe con la primera línea vacía. Utilizaremos un RDD para los vuelos y otro para los aeropuertos obteniendo estos últimos a partir de los vuelos.

  var flightsRDD : RDD[Flight] = null
  var airports : RDD[(Long, String)] = null
  if (rows.size > 0) {
    textRDDPage = sc.parallelize(rows)
    println(textRDDPage.count() + " rows gotten")
    rows.clear()

    //Create RDD with the January 2014 data
    flightsRDD = textRDDPage.map(parseFlight).cache()
    airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct

    val items = airports.collectAsMap()
    val keys = items.keySet
    if (keys.size > 0) {
      println("Hello World, " + keys.size + " keys")
      for (key <- keys) {
        val airportId = key
        val airportName = items.get(key)
        println("id: " + airportId +
          ", name:" + airportName)
      }
    }
  }

Y finalmente verificamos listando los aeropuertos.