Tuesday, December 23, 2014

Spark practice (3): clean and sort Social Security numbers

Sample.txt
Requirements:
1. separate valid SSN and invalid SSN
2. count the number of valid SSN
402-94-7709 
283-90-3049
124-01-2425
1231232
088-57-9593
905-60-3585
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is commonly seen and stored in many file systems. The trick to speed this up on Spark is to build a numerical key and use the sortByKey operator. Besides, the accumulator provides a global variable that exists across the machines in a cluster, which is especially useful for counting data.
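For example, the formatted string 'AAA-GG-SSSS' can be packed into a single 9-digit integer that sorts in the same order as the zero-padded text. A minimal sketch of the key idea (plain Python, no Spark needed; the value 402-94-7709 is taken from the sample above):
ssn = '402-94-7709'
area, group, serial = ssn.split('-')
# 3-digit area, 2-digit group and 4-digit serial packed into one 9-digit integer
key = 1000000 * int(area) + 10000 * int(group) + int(serial)
print key   # 402947709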

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}
valid_cnt = 0
with open('sample.txt', 'rb') as infile, open('sample_bad.txt', 'wb') as outfile:
    for l in infile:
        l = l.strip()
        nums = l.split('-')
        key = -1
        if l.isdigit() and len(l) == 9:
            key = int(l)
        if len(nums) == 3 and map(len, nums) == [3, 2, 4]:
            key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
        if key == -1:
            outfile.write(l + '\n')
        else:
            if key not in htable:
                htable[key] = l
                valid_cnt += 1

with open('sample_sorted.txt', 'wb') as outfile:
    for x in sorted(htable):
        outfile.write(htable[x] + '\n')

print valid_cnt

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    l = l.strip()
    nums = l.split('-')
    cdn1 = (l.isdigit() and len(l) == 9)
    cdn2 = (len(nums) == 3 and map(len, nums) == [3, 2, 4])
    if cdn1 or cdn2:
        return True
    return False

def set_key(l):
    global valid_cnt
    valid_cnt += 1
    l = l.strip()
    if len(l) == 9:
        return (int(l), l)
    nums = l.split('-')
    return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

rdd = sc.textFile('sample.txt')
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

rdd2 = rdd.filter(is_validSSN).distinct() \
    .map(lambda x: set_key(x)) \
    .sortByKey().map(lambda x: x[1])

for x in rdd1.collect():
    print 'Invalid SSN\t', x

for x in rdd2.collect():
    print 'valid SSN\t', x

print '\nNumber of valid SSN is {}'.format(valid_cnt.value)

# Save RDD to file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()
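One caveat on the accumulator: Spark only guarantees exactly-once updates for accumulators used inside actions; updates made inside a transformation such as map may be applied again whenever a stage is recomputed. In the script above, saveAsTextFile re-evaluates rdd2 after the collect, so the accumulator keeps growing after the count is printed. When an exact count matters, counting with an action is safer. A minimal sketch, assuming it runs before sc.stop():
# A recomputation-proof count: use an action instead of an accumulator
valid_total = rdd.filter(is_validSSN).distinct().count()
print 'Number of valid SSN is {}'.format(valid_total)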

Friday, December 12, 2014

Spark practice (2): query text using SQL

In a class of a few children, use SQL to find those who are male and weigh over 100.
class.txt (including Name Sex Age Height Weight)
Alfred M 14 69.0 112.5 
Alice F 13 56.5 84.0
Barbara F 13 65.3 98.0
Carol F 14 62.8 102.5
Henry M 14 63.5 102.5
James M 12 57.3 83.0
Jane F 12 59.8 84.5
Janet F 15 62.5 112.5
Jeffrey M 13 62.5 84.0
John M 12 59.0 99.5
Joyce F 11 51.3 50.5
Judy F 14 64.3 90.0
Louise F 12 56.3 77.0
Mary F 15 66.5 112.0
Philip M 16 72.0 150.0
Robert M 12 64.8 128.0
Ronald M 15 67.0 133.0
Thomas M 11 57.5 85.0
William M 15 66.5 112.0

Thoughts

The challenge is to transform unstructured data to structured data. In this question, a schema has to be applied including column name and type, so that the syntax of SQL is able to query the pure text.

Single machine solution

This is straightforward and simple with Python's built-in sqlite3 module.
import sqlite3

conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
             (Name text, Sex text, Age real, Height real, Weight real)""")

with open('class.txt') as infile:
    for l in infile:
        line = l.split()
        c.execute('INSERT INTO class VALUES (?,?,?,?,?)', line)
conn.commit()

for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
    print x
conn.close()

Cluster solution

Spark SQL is built on Hive and seamlessly queries semi-structured, JSON-formatted data. The first step is therefore to dump the data as JSON on the file system.
import os
import subprocess
import json
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hiveCtx = HiveContext(sc)

def trans(x):
    return {'Name': x[0], 'Sex': x[1], 'Age': int(x[2]),
            'Height': float(x[3]), 'Weight': float(x[4])}

# Remove the output directory for JSON if it exists
if 'class-output' in os.listdir('.'):
    subprocess.call(['rm', '-rf', 'class-output'])

rdd = sc.textFile('class.txt')
rdd1 = rdd.map(lambda x: x.split()).map(lambda x: trans(x))
rdd1.map(lambda x: json.dumps(x)).saveAsTextFile('class-output')

infile = hiveCtx.jsonFile("class-output/part-00000")
infile.registerTempTable("class")

query = hiveCtx.sql("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100")
for x in query.collect():
    print x

sc.stop()
In conclusion, JSON should be considered if SQL is desired on Spark.

Sunday, December 7, 2014

Spark practice (1): find the stranger that shares the most friends with me

Given the friend pairs in the sample text below (each line contains two people who are friends), find the stranger that shares the most friends with me.
sample.txt
me Alice
Henry me
Henry Alice
me Jane
Alice John
Jane John
Judy Alice
me Mary
Mary Joyce
Joyce Henry
Judy me
Judy Jane
John Carol
Carol me
Mary Henry
Louise Ronald
Ronald Thomas
William Thomas

Thoughts

The scenario is common for social network users. Spark offers three ways to query such data:
  • MapReduce
  • GraphX
  • Spark SQL
Starting with the simplest MapReduce-style approach, I use two hash tables in Python. First I scan all friend pairs and store each person's friends in a hash table. Second, I use another hash table to count my friends' friends and pick out the ones who are strangers to me.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable1 = {}
with open('sample.txt', 'rb') as infile:
    for l in infile:
        line = l.split()
        if line[0] not in htable1:
            htable1[line[0]] = [line[1]]
        else:
            htable1[line[0]] += [line[1]]
        if line[1] not in htable1:
            htable1[line[1]] = [line[0]]
        else:
            htable1[line[1]] += [line[0]]

lst = htable1['me']
htable2 = {}
for key, value in htable1.iteritems():
    if key in lst:
        for x in value:
            if x not in lst and x != 'me': # should only limit to strangers
                if x not in htable2:
                    htable2[x] = 1
                else:
                    htable2[x] += 1

for x in sorted(htable2, key = htable2.get, reverse = True):
    print "The stranger {} has {} common friends with me".format(x, \
        htable2[x])
The result shows that John shares three friends with me, followed by Joyce with two. Therefore, John is the stranger most likely to be recommended to me by the social network.

Cluster solution

If the log file of friend pairs is quite big, say several gigabytes, the single-machine solution cannot load the data into memory and we have to turn to a cluster.
Spark provides the pair RDD, which is similar to a hash table and is essentially a key-value structure. To translate the single-machine solution to a cluster one, I use operators from Spark's Python API including map, reduceByKey, filter, union and sortByKey.
#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()

# Load data from hdfs
rdd = sc.textFile('hdfs://sample.txt')
# Build the first pair RDD
rdd1 = rdd.map(lambda x: x.split()).union(rdd.map(lambda x: x.split()[::-1]))
# Bring my friend list to local
lst = rdd1.filter(lambda x: x[0] == 'me').map(lambda x: x[1]).collect()
# Build the second pair RDD
rdd2 = rdd1.filter(lambda x: x[0] in lst).map(lambda x: x[1]) \
    .filter(lambda x: x != 'me' and x not in lst) \
    .map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b) \
    .map(lambda (x, y): (y, x)).sortByKey(ascending = False)
# Save the result to hdfs
rdd2.saveAsTextFile("hdfs://sample_output")
# Bring the result to local since the sample is small
for x, y in rdd2.collect():
    print "The stranger {} has {} common friends with me".format(y, x)

sc.stop()
The result is the same. In this experiment, most of the time is spent loading the data from HDFS into memory; the subsequent MapReduce operations cost only a small fraction of the overall time. In conclusion, Spark fits well for iterative data analysis against an existing RDD.

Friday, December 5, 2014

Use a vector to print Pascal's triangle in SAS

Yesterday Rick Wicklin showed a cool SAS/IML function that uses a matrix to print Pascal's triangle. I came up with an alternative solution that uses a vector in SAS/IML.

Method

Two functions are used: a main function PascalRule and a helper function _PascalRule. The helper function recycles the vector on each call and fills in the updated values; the main function grows the vector from length 1 to n.

Pro

Get the nth row directly (for example, PascalRule(10) returns the 10th row); no need for a matrix or matrix-related operators; less memory use, which allows a possibly bigger n.

Con

More lines of code; slower to print the whole triangle, since there is no data structure such as a matrix to remember the intermediate values.
proc iml;
/* The main function */
start PascalRule(n);
    if n <= 1 then
        return({1});
    answer = {1, 1};
    do i = 1 to n - 2;
        answer = _PascalRule(answer);
    end;
    return(answer);
finish;

/* The helper function */
start _PascalRule(vector);
    previous = 1;
    do i = 2 to nrow(vector);
        current = vector[i];
        vector[i] = previous + current;
        previous = current;
    end;
    vector = vector // {1};
    return(vector);
finish;

/* Print the Pascal's triangle */
do i = 1 to 10;
    x = PascalRule(i);
    x = x`;
    print x;
end;
quit;
Theoretically, Rick's solution has a time complexity of O(N^2) and a space complexity of O(N^2), while my solution has a time complexity of O(N^3) (unfortunately it takes three levels of do loop in IML) and a space complexity of O(N). It is essentially a trade-off between speed and memory cost.

Friday, November 28, 2014

Minimize complexity by Spark

There is always a trade-off between time complexity and space complexity in computer programs: decreasing the time cost increases the space cost, and vice versa. The ideal solution is to parallelize the program across multiple cores on a multi-core computer, or even scale it out to multiple machines across a cluster, which would eventually reduce both the time cost and the space cost.
Spark is currently the hottest platform for cluster computing on top of Hadoop, and its Python interface provides map, reduce and many other methods, which express a MapReduce job in a straightforward way and therefore make it easy to migrate an algorithm from a single machine to a cluster of many machines.
  • Minimize space complexity
There is a question that asks for the single unpaired number in an array where every other number appears twice.
Single Number
  Given an array of integers, every element appears twice except for one.
Find that single one.
Note:
Your algorithm should have a linear runtime complexity.
Could you implement it without using extra memory?
The optimal space complexity for this question is O(1), achieved with the bitwise operator xor. On a cluster, since Spark aggregates memory across machines, the space cost per machine may become O(1/k), where k is the number of machines in the cluster.
# Space.py
import pyspark
from random import shuffle
from operator import xor
sc = pyspark.SparkContext()

# Create the test case and the target is 99
testCase = range(0, 99) + range(0, 100)
shuffle(testCase)
# Run the testing with Spark
result = sc.parallelize(testCase).reduce(xor)
# Show the result
print result
sc.stop()
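As a quick sanity check, the same reduction can be run locally with Python's built-in reduce, no cluster needed. A minimal sketch using the same test case:
from operator import xor
from random import shuffle

# 0..98 appear twice, 99 appears once
test_case = range(0, 99) + range(0, 100)
shuffle(test_case)
# xor is associative and commutative, and a ^ a == 0, so every pair cancels out
print reduce(xor, test_case)   # prints 99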
  • Minimize time complexity
There is a question that asks to implement a function (or a method) that returns the square root of an integer.
Sqrt(x)
Implement int sqrt(int x).
Compute and return the square root of x.
The optimal solution achieves a time complexity of O(lg N) by using binary search. If we hand the sqrt function to Spark, then the time cost may decrease to O(lg N / k), where k is the number of machines in the cluster.
# Time.py
import pyspark
sc = pyspark.SparkContext()

# Implement binary search for the square root function
def sqrt(x):
    if x < 0 or not isinstance(x, int):
        raise ValueError
    hi, lo = x/2 + 1, 0
    while hi >= lo:
        mid = (hi + lo) / 2
        if mid * mid > x:
            hi = mid - 1
        else:
            lo = mid + 1
    return int(hi)

# Test the square root algorithm
testCase = sc.parallelize(xrange(1, 100))
result = testCase.map(lambda x: sqrt(x))
# Show the result
for x in result.collect():
    print x
sc.stop()
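A few quick local checks confirm the boundary handling; these asserts assume the sqrt function defined in Time.py above and need no Spark:
# Quick local checks of the binary-search sqrt
assert sqrt(0) == 0
assert sqrt(1) == 1
assert sqrt(8) == 2      # floor of 2.828...
assert sqrt(99) == 9
assert sqrt(100) == 10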
  • Find the worst rating by accounts
There is a question that asks to find the worst among several rating letters for each account number.
Want to find the worst rating for each account number.
sample.txt is below
Account_number    Rating
1 A
1 B
2 A
2 B
2 C
3 A
3 C
The desired result should be like:
1 B
2 C
3 C
The question is essentially a grouping question. Spark's pair RDD, which captures the key-value relationship of the groups, supplies a one-line solution for it.
import pyspark
sc = pyspark.SparkContext()

# Squeeze the letters by keys
rdd = sc.textFile('sample.txt')
result = rdd.map(lambda x: x.split()).filter(lambda x: x[0].isdigit()).reduceByKey(max)
# Show the result
for x in result.collect():
    print x
sc.stop()
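The one-liner works because the alphabetical order of the rating letters happens to match their severity here, with A the best and C the worst, so max picks the worst rating. If the rating scale were ordered differently, a severity map could drive the reduce instead. A hypothetical sketch, with the severity values assumed purely for illustration:
import pyspark

sc = pyspark.SparkContext()
# Assumed mapping: a larger number means a worse rating
severity = {'A': 1, 'B': 2, 'C': 3}

rdd = sc.textFile('sample.txt')
worst = rdd.map(lambda x: x.split()) \
    .filter(lambda x: x[0].isdigit()) \
    .map(lambda x: (x[0], x[1])) \
    .reduceByKey(lambda a, b: a if severity[a] >= severity[b] else b)
for x in worst.collect():
    print x
sc.stop()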
In conclusion, Spark significantly changes the way we think about data analysis.

Monday, October 20, 2014

Automated testing by pytest

The hardest part of testing is writing the test cases, which is time-consuming and error-prone. Fortunately, besides Python's built-in modules such as doctest and unittest, there are quite a few third-party packages that help with automated testing. My favorite is pytest, which has a proven track record and pleasant syntactic sugar.

Step 1: test-driven development

For example, there is a coding challenge on Leetcode:
Find Minimum in Rotated Sorted Array
Suppose a sorted array is rotated at some pivot unknown to you beforehand.
(i.e., 0 1 2 4 5 6 7 might become 4 5 6 7 0 1 2).
Find the minimum element.
You may assume no duplicate exists in the array.
The straightforward way to find the minimal element in an array (or a list in Python) is sequential search, which goes through every element and has a time complexity of O(N). If the array is sorted, the minimal element is simply the first one, which costs only O(1).
However, this question provides a rotated sorted array, which suggests a binary search and reduces the complexity from O(N) to O(logN).
As usual, I write the test cases first. The great thing about pytest is that it significantly simplifies the effort of coding test cases: in this example, I use only 3 lines to generate 101 test cases, covering every rotation from 0 to 99 plus a null test.
The next step is to code the function. It is easy to transplant the iterative binary search to this question: if the current window is already a sorted segment, return its leftmost element as the minimum; otherwise, adjust the left or right boundary and keep searching.
# test1.py
import pytest

# Prepare 101 test cases
array = list(range(100))
_testdata = [[array[i: ] + array[ :i], 0] for i in range(100)]
_testdata += [pytest.mark.empty(([], None))]

# Code the initial binary search function
def findMinPrev(num):
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) / 2
        if num[mid] < num[hi]:
            hi = mid - 1
        else:
            lo = mid + 1

@pytest.mark.parametrize('input, expected', _testdata)
def test_findMinPrev(input, expected):
    assert findMinPrev(input) == expected
After running the py.test -v test1.py command, part of the result is shown below. 65 tests passed and 36 failed; the failed cases return much larger values, which suggests the search window moves past the minimum during the loop, and the boundary adjustment may be too aggressive.
test1.py:20: AssertionError
_________________________ test_findMinPrev[input98-0] _________________________

input = [98, 99, 0, 1, 2, 3, ...], expected = 0

@pytest.mark.parametrize('input, expected', _testdata)
def test_findMinPrev(input, expected):
> assert findMinPrev(input) == expected
E assert 98 == 0
E + where 98 = findMinPrev([98, 99, 0, 1, 2, 3, ...])

test1.py:20: AssertionError
==================== 36 failed, 65 passed in 0.72 seconds =====================
Now I adjust the right boundary slightly and finally come up with a solution that passes all the tests.
def findMin(num):
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) / 2
        if num[mid] < num[hi]:
            hi = mid
        else:
            lo = mid + 1
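To confirm the fix, the same 101 parametrized cases can be pointed at the new function. A minimal sketch appended to test1.py:
# Re-run the same 101 cases against the fixed function
@pytest.mark.parametrize('input, expected', _testdata)
def test_findMin(input, expected):
    assert findMin(input) == expected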

Step 2: performance profiling

Besides getting the right solution, I am also interested in whether binary search has indeed improved the performance. For this step I choose line_profiler, given its ability to profile code line by line. I take the most basic method, sequential search, as the benchmark, and also include the built-in min function, since built-ins like it are implemented at C level and run much faster than explicit Python loops. The test case is a rotated sorted array with 10 million elements.
# test2.py
from line_profiler import LineProfiler
from sys import maxint

@profile
def findMinRaw(num):
    """Sequential searching"""
    if not num:
        return
    min_val = maxint
    for x in num:
        if x < min_val:
            min_val = x
    return min_val

@profile
def findMinLst(num):
    """Searching by the built-in min function"""
    if not num:
        return
    return min(num)

@profile
def findMin(num):
    """Binary search"""
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) / 2
        if num[mid] < num[hi]:
            hi = mid
        else:
            lo = mid + 1

# Prepare a rotated array
array = list(range(10000000))
_testdata = array[56780: ] + array[ :56780]
# Test the three functions
findMinRaw(_testdata)
findMinLst(_testdata)
findMin(_testdata)
After running kernprof -l -v test2.py, I get the output below. The sequential search hits the loop 10,000,001 times and costs almost 14 seconds. The min function encapsulates all the details and uses 0.5 seconds, which is about 28 times faster. In contrast, the binary search method takes only 20 loop iterations to find the minimal value and spends just 0.0001 seconds. As a result, when dealing with a large amount of data, an improved algorithm really saves time.
Total time: 13.8512 s
File: test2.py
Function: findMinRaw at line 4

Line # Hits Time Per Hit % Time Line Contents
==============================================================
4 @profile
5 def findMinRaw(num):
6 1 13 13.0 0.0 if not num:
7 return
8 1 3 3.0 0.0 min_val = maxint
9 10000001 16031900 1.6 47.5 for x in num:
10 10000000 17707821 1.8 52.5 if x < min_val:
11 2 5 2.5 0.0 min_val = x
12 1 3 3.0 0.0 return min_val

Total time: 0.510298 s
File: test2.py
Function: findMinLst at line 15

Line # Hits Time Per Hit % Time Line Contents
==============================================================
15 @profile
16 def findMinLst(num):
17 1 4 4.0 0.0 if not num:
18 return
19 1 1243016 1243016.0 100.0 return min(num)

Total time: 0.000101812 s
File: test2.py
Function: findMin at line 22

Line # Hits Time Per Hit % Time Line Contents
==============================================================
22 @profile
23 def findMin(num):
24 1 15 15.0 6.0 lo, hi = 0, len(num) - 1
25 20 40 2.0 16.1 while lo <= hi:
26 20 48 2.4 19.4 if num[lo] <= num[hi]:
27 1 2 2.0 0.8 return num[lo]
28 19 54 2.8 21.8 mid = (lo + hi) / 2
29 19 50 2.6 20.2 if num[mid] < num[hi]:
30 5 10 2.0 4.0 hi = mid
31 else:
32 14 29 2.1 11.7 lo = mid + 1