Wednesday, April 30, 2014

Count a large chunk of data in Python

Python's ability to read data line by line makes it well suited to counting data that lives on disk rather than in memory. The most frequently used data structures in Python are the list and the dictionary. In many cases the dictionary has the advantage, since it is basically a hash table that offers O(1) lookups and updates.
However, for the task of counting values, the two options do not differ much, and we can choose either one for convenience. I list two examples below.
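As a quick illustration with some made-up values (not taken from either example below), the plain-dictionary idiom and the standard library's collections.Counter give the same counts:

# A minimal sketch with made-up values
from collections import Counter

values = ['A', 'B', 'A', 'C', 'B', 'A']

# A plain dictionary as a counter: O(1) per lookup and update
counts = {}
for v in values:
    counts[v] = counts.get(v, 0) + 1

# collections.Counter wraps the same idea in one line
assert counts == dict(Counter(values))
print counts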

Use a dictionary as a counter

There is a question about counting strings in Excel.
Count the unique values in one column in Excel 2010. The worksheet has 1 million rows and 10 columns of strings or numbers.
For example,
A5389579_10
A1543848_6
A5389579_8
The part after (and including) the underscore needs to be cut off, for example from A5389579_10 to A5389579.
Excel on a desktop usually can't handle data of this size, while Python handles the job easily.
# Load the Excel file with the xlrd package
import xlrd

book = xlrd.open_workbook("test.xlsx")
sh = book.sheet_by_index(0)
print sh.name, sh.nrows, sh.ncols
print "Cell D30 is", sh.cell_value(rowx=29, colx=3)

# Count the unique values in a dictionary
c = {}
for rx in range(sh.nrows):
    # Keep only the part before the underscore, e.g. A5389579_10 -> A5389579
    word = str(sh.row(rx)[1].value).split('_')[0]
    try:
        c[word] += 1
    except KeyError:
        c[word] = 1

print c
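
For the same task, collections.Counter (available since Python 2.7) is a convenient alternative. The sketch below assumes the same test.xlsx layout as above, with the IDs in the second column:

# An alternative sketch with collections.Counter, assuming the same
# test.xlsx layout as above (IDs in the second column)
from collections import Counter
import xlrd

book = xlrd.open_workbook("test.xlsx")
sh = book.sheet_by_index(0)

c = Counter(str(sh.row(rx)[1].value).split('_')[0] for rx in range(sh.nrows))
print len(c)             # number of unique values
print c.most_common(5)   # the five most frequent values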

Use a list as a counter

There is a question about email response times.
A three-column data set contains sender, receiver, and timestamp. How do we calculate the time between when the sender sends an email
and when the receiver sends the reply?
The challenge is to scale from the small sample data to a much larger size. The solution I have has O(n log n) complexity, limited only by the sorting step.
raw_data = """
SENDER|RECEIVER|TIMESTAMP
A B 56
A A 7
A C 5
C D 9
B B 12
B A 8
F G 12
B A 18
G F 2
A B 20
"""


# Transform the raw data into a nested list
data = raw_data.split()
data.pop(0)  # Remove the header
data = zip(data[0::3], data[1::3], map(int, data[2::3]))

# Sort the nested list by the timestamp
from operator import itemgetter
data.sort(key=itemgetter(2))
for r in data:
    print r

# Count the time differences in a list
c = []
while len(data) != 1:
    y = data.pop(0)
    for x in data:
        if x[0] == y[1] and x[1] == y[0]:
            diff = x[2] - y[2]
            print y, x, '---->', diff
            c.append(diff)
            break  # Only keep the quickest time to respond
print c

P.S.

I came up with the O(n) solution below, which uses two hash tables to reduce the complexity.
__author__ = 'dapangmao'

def find_duration(data):
    # Construct two hash tables
    h1 = {}
    h2 = {}
    # Find the earliest sending time for each ID pair
    for x in data:
        if x[0] != x[1]:
            key = x[0] + x[1]
            try:
                h1[key] = min(h1[key], x[2])
            except KeyError:
                h1[key] = x[2]
    # Find the minimum duration for each ID pair
    for x in data:
        key = x[1] + x[0]
        if h1.has_key(key):
            duration = x[2] - h1[key]
            try:
                h2[key] = min(h2[key], duration)
            except KeyError:
                h2[key] = duration
    return h2

if __name__ == "__main__":
    raw_data = """
    SENDER|RECEIVER|TIMESTAMP
    A B 56
    A A 7
    A C 5
    C D 9
    B B 12
    B A 8
    F G 12
    B A 18
    G F 2
    A B 20
    """

    # Transform the raw data into a nested list
    data = raw_data.split()
    data.pop(0)  # Remove the header
    data = zip(data[0::3], data[1::3], map(int, data[2::3]))
    # Verify the result
    print find_duration(data)

Sunday, April 6, 2014

10 popular Linux commands for Hadoop

Hadoop has its own shell language, called FS. Compared with the common Bash shell in the Linux ecosystem, the FS shell has far fewer commands. To deal with the huge amount of data distributed across the Hadoop nodes, I rely on 10 popular Linux commands in my daily work.
1. sort
A good practice when running Hadoop is to always test the map/reduce programs on the local machine before releasing the time-consuming map/reduce code to the cluster environment. The sort command simulates the sort and shuffle step of the map/reduce process. For example, I can run the piped commands below to verify whether the Python code has any bugs.
./mapper.py | sort | ./reducer.py
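The mapper.py and reducer.py above stand for whatever streaming scripts are being tested; a minimal word-count pair in that style might look like the following (hypothetical file names, kept deliberately simple):

# mapper.py -- emit a (word, 1) pair for every word read from stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print '%s\t%d' % (word, 1)

# reducer.py -- sum the counts per word; it relies on sorted input,
# which is exactly what the local sort (or Hadoop's shuffle) provides
import sys

current, total = None, 0
for line in sys.stdin:
    line = line.rstrip('\n')
    if not line:
        continue
    word, count = line.split('\t')
    if word == current:
        total += int(count)
    else:
        if current is not None:
            print '%s\t%d' % (current, total)
        current, total = word, int(count)
if current is not None:
    print '%s\t%d' % (current, total)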
2. tail
Interestingly, the FS shell in Hadoop only supports the tail command, not the head command, so I can only grab the bottom of a file stored in HDFS (-tail returns the last kilobyte of the file).
hadoop fs -tail data/web.log.9
3. sed
Since the FS shell doesn't provide the head command, the alternative is to use the sed command, which actually has more flexible options.
hadoop fs -cat data/web.log.9 | sed '1,+5!d'
4. stat
The stat command tells me when the file was last modified.
hadoop fs -stat data/web.log.9
5. awk
The commands that the FS shell supports usually have very few options. For example, the du command under the FS shell does not support the -sh option to aggregate the disk usage of sub-directories. In that case, I have to turn to the awk command to get what I need.
hadoop fs -du data | awk '{sum+=$1} END {print sum}'
6. wc
One of the most basic things to know about a file stored in Hadoop is its total number of lines.
hadoop fs -cat data/web.log.9 | wc -l
7. cut
The cut command is convenient for selecting specific columns from a file. For example, I can count the lines for each unique group formed by the characters in positions 5 through 14.
hadoop fs -cat data/web.log.9 | cut -c 5-14 | uniq -c
8. getmerge
The great thing about the getmerge command is that it fetches all of the map/reduce output to the local file system as a single file.
hadoop fs -getmerge result result_merged.txt
9. grep
I can start a mapper-only job with just the grep command from the Bash shell to search for the lines that contain the keywords I am interested in. Since this is a map-only task, the number of reducers is set to zero.
hadoop jar $STREAMING -D mapred.reduce.tasks=0 -input data -output result -mapper "bash -c 'grep -e Texas'"
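If the filter ever needs to grow beyond what grep handles, the same map-only job can be driven by a small Python mapper instead. The script below is only a sketch, with the keyword and the filter_mapper.py name taken as hypothetical stand-ins for the example above:

# filter_mapper.py -- a map-only filter equivalent to grep -e Texas
import sys

KEYWORD = 'Texas'

for line in sys.stdin:
    if KEYWORD in line:
        # Emit the matching line unchanged; no reducer is needed
        sys.stdout.write(line)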
10. at and crontab
The at and crontab commands are my favorites for scheduling jobs on Hadoop. For example, I can use the commands below to clean up the map/reduce results overnight.
at 0212
at> hadoop fs -rmr result