Easy Use Mapreduce

本文用来记录MR的使用,已经遇到的一些问题和解决方法

#使用Python执行MR

Mapper的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import time
#加载编码
reload(sys)
sys.setdefaultencoding('utf-8')
for line in sys.stdin:
j = json.loads(line.strip())
print "%s\t%s" % (j.get("name"), j.get("age"))

Reducer的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import time
#加载编码
reload(sys)
sys.setdefaultencoding('utf-8')
uri_count = {}
for line in sys.stdin:
data = line.strip().split("\t")
key = "%s-%s" % (data[0], data[1])
c = uri_count.get(key, 0)
uri_count[key] = c + 1
for key in uri_count:
print "%s\t%s" % (key, uri_count.get(key, 0))

从上面的代码可以看出来,python的脚本需要从标准输入(sys.stdin)中接入数据。

执行Mapreduce

1
2
3
4
5
6
7
8
9
/home/hadoop/yarn-2.8.1/bin/hadoop jar /home/hadoop/yarn-2.8.1/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar \
-D mapreduce.job.queuename=bigdata.queue \
-input hdfs://nameservice1/data/mylogs/api_request/2018/07/03/*/* \
-output /tmp/20180703SpecialUri \
-mapper "specialUriMapper.py" \
-reducer "specialUriReducer.py" \
-file /home/hadoop/script/user_action/specialUriMapper.py \
-file /home/hadoop/script/user_action/specialUriReducer.py \
-file /home/hadoop/script/user_action/kickA.log

参数说明:

-D mapreduce.job.queuename用指定需要运行MR的队列
-input MR的输入
-output MR的输出
-mapper 指定执行MR中Mapper的程序
-reducer 指定执行MR中Reducer的程序
-file 需要一起上传的文件,如果python程序中使用了其他的数据文件,可以通过这个参数一起上传。

其他一些参数:

-D mapreduce.job.name Job的名称
-D mapreduce.job.user.name
-D mapreduce.job.node-label-expression
-D mapreduce.job.queuename
-D mapreduce.map.memory.mb
-D mapreduce.reduce.memory.mb