Friday, December 12, 2014

Spark practice (2): query text using SQL

In a class of a few children, use SQL to find those who are male and weight over 100.
class.txt (including Name Sex Age Height Weight)
Alfred M 14 69.0 112.5 
Alice F 13 56.5 84.0
Barbara F 13 65.3 98.0
Carol F 14 62.8 102.5
Henry M 14 63.5 102.5
James M 12 57.3 83.0
Jane F 12 59.8 84.5
Janet F 15 62.5 112.5
Jeffrey M 13 62.5 84.0
John M 12 59.0 99.5
Joyce F 11 51.3 50.5
Judy F 14 64.3 90.0
Louise F 12 56.3 77.0
Mary F 15 66.5 112.0
Philip M 16 72.0 150.0
Robert M 12 64.8 128.0
Ronald M 15 67.0 133.0
Thomas M 11 57.5 85.0
William M 15 66.5 112.0

Thoughts

The challenge is to transform unstructured data to structured data. In this question, a schema has to be applied including column name and type, so that the syntax of SQL is able to query the pure text.

Single machine solution

Straight-forward and simple if with Python’s built-in module sqlite3.
import sqlite3

conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
(Name text, Sex text, Age real, Height real, Weight real)"""
)

with open('class.txt') as infile:
for l in infile:
line = l.split()
c.execute('INSERT INTO class VALUES (?,?,?,?,?)', line)
conn.commit()

for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
print x
conn.close()

Cluster solution

Spark SQL is built on Hive, and seamlessly queries the JSON formatted data that is semi-structured. To dump the JSON file on the file system will be the first step.
import os
import subprocess
import json
from pyspark import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext()
hiveCtx = HiveContext(sc)
def trans(x):
return {'Name': x[0], 'Sex': x[1], 'Age': int(x[2]), \
'Height': float(x[3]), 'Weight': float(x[4])}
# Remove the output directory for JSON if it exists
if 'class-output' in os.listdir('.'):
subprocess.call(['rm', '-rf', 'class-output'])

rdd = sc.textFile('class.txt')
rdd1 = rdd.map(lambda x: x.split()).map(lambda x: trans(x))
rdd1.map(lambda x: json.dumps(x)).saveAsTextFile('class-output')

infile = hiveCtx.jsonFile("class-output/part-00000")
infile.registerTempTable("class")

query = hiveCtx.sql("""SELECT * FROM class WHERE Sex = 'M' AND Weight > 100
"""
)
for x in query.collect():
print x

sc.stop()
 In a conclusion, JSON should be considered if SQL is desired on Spark.

No comments:

Post a Comment