MapReduce文件切分个数计算方法

Hadoop的MapReduce计算的第一个阶段是InputFormat处理的,先将文件进行切分,然后将每个切分传递给每个Map任务来执行,本文阐述切分个数,也就是Map任务数目的计算方法;

Hadoop首先会计算每个切分的大小,然后使用”文件总大小/每个切分的大小“来决定划分的总数,如果不足一个切分的大小,则当做1个;

在org.apache.hadoop.mapred.FileInputFormat中给出了计算每个划分大小的方法:

  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

其中几个变量的解释如下:

  • blockSize:HDFS存储的基本单元,默认为64MB或者128MB;
  • minSize:由用户设置的最小切分大小,配置项为mapred.min.split.size;
  • goalSize:计算公式为"文件总大小/用户设定的Map任务个数",即用户间接期望的大小;

由此可以推断出选定策略:

  • 划分大小为blockSize:blockSize小于用户期望的大小,比用户设定的最小值要大;也就是说如果用户设定的最小值太小的话,会使用block size作为划分大小
  • 划分大小为goalSize:用户设定了Map的任务数目,那么即使算出来的划分大小比block size小也会使用,这个时候出现了两个用户设定值:最小值和期望值,hadoop会选择两者中大的那个;
  • 划分大小为minSize:如果用户期望的值,还有blocksize只都比用户设定的最小值要小,那么就会使用这个最小值;

针对这几个值,用户可以根据输入数据的情况,合理的设置mapred.min.split.size和map.tasks.size来实现优化,InputSplit和blockSize相同是非常好的一种方法,因为不需要拆分block了.

文章地址:http://crazyant.net/1423.html

 

《大数据时代》是一部科幻小说

虽然一直在使用HADOOP、HIVE等技术在处理T级别的数据,认为自己也属于大数据领域的人了,然而对于什么是大数据一直是模糊的概念。自己处理的是T级别数据,感觉HADOOP技术其实就是分布式计算的演变版,并不是什么新奇的事物。然而最近读的《大数据时代》一书,作者从思维、商业、风险、掌控等方面对大数据给出了自己独特的见解,其中的有些观点有些匪夷所思,然而毕竟是作者自己看法,很是新颖。

102265124_360

1、不是随机样本,而是全体数据

大数据时代的来临,一切数据将是全体数据。比如在之前我们要统计春运期间哪里的车站人最多,或者乘客流量趋势,只能通过人工的方式采样几个地区的车站,统计其人流量,然后对整体情况作出预测,而如今通过手机GPS定位,百度能够收集任何一个人所在的位置,使用最全量的位置信息绘制出所有的流动情况。

这正是因为目前存储能力(廉价的磁盘)和计算能力(HADOOP等云计算技术兴起)的大幅改进所带来的变革,只要能检测到,就能对其数字化、存储,然后分析其中的规律或者预测其中蕴含的趋势。通过获取全量的数据而不是局部的采样,能够得到最全面不留任何死角的信息。得到的决策判断也就更加准确,往往能揭示出限于局部无法察觉的规律。

“一叶障目不见泰山”、“不识庐山真面目,只缘身在此山中”,这些都是说人们往往由于视野、见识等限制,无法看到事情的全貌,从而会做出局限性、局部性的判断和认识。现在如果能看到整个泰山,能看到庐山全面目,那么人们看到的可能不只是山的雄伟,可能还有山的趋势、山的变迁。

2、大数据对人类的威胁?

作者举了一个例子,未来通过大数据能够预测到一个人会在下一周犯罪的几率有多大,然后警察提前逮捕此人甚至定罪,就因为通过出行、电话、购物、行为表现等各种信息,大数据发现其跟某种犯罪特征极为相近,判定其要犯罪的几率非常的大而得出结论。

看到这里倒像是感觉作者在写一部科幻小说,未来政府可能只需要一个大数据中心,一方面通过互联网、摄像头、传感器、刷卡消费记录等各种信息收集到几乎方方面面每个人所有的数据;另一方面能够建立各种犯罪、治安、暴力事情的特征库,通过和每个人的行为特征对比,政府能够判定这个人是否将来会犯罪、是否会酒驾、是否会逃税,从而加重对这个人的监控,甚至抓起来审问“你为什么将来会偷邻居王小二家的西瓜?”而商业领域也会同样的建立每个人的消费数据、习惯、洗好等信息,有一天当你洗发水用完了要出门去买,打开门发现货到付款的快递已经到你楼下了,你是应该感到恐怖呢还是应该高兴呢。

作者倒像是一个科幻小说家,构想了这么一副宏达的未来世界之后,他还对其防范措施、法律指定、反垄断等实际方案进行了阐述。信誓旦旦的提到,未来应该制定法律,应该保护人类自由选择、为行为负责的权利,而不能过分的相信大数据判定某人可能有罪的结论。

很荒诞,也很有趣,能自圆其说实在是了不起。从这本书我真的感觉到了,其实要有所突出就要敢提出自己的想法,哪怕这个想法很荒诞很错误,但是只要是自己的想法,就要敢于提出。

3、大数据技能的发展

作者有些观点我也挺赞同的,他提到目前来说大数据领域仍在兴起阶段,因此相当一段时间内大数据的分析师、算法师等人才是极为稀缺的职位,同时大数据技能也是很快发展但是非常重要的技术。然而随着时间的发展,技术并不是门槛,因为各种数据、教程的出现,懂并且熟练掌握这种技术的人会越来越多,技术会变得越来越不值钱。

而一直价值不会降低的,是数据。因为数据存放的时间即使很长,由于新的分析、挖掘想法的出现,这些旧的数据蕴含的金矿才会被一次次重复性的发现出来。

联系到个人的发展,首先自己的大数据技能目前只限于简单的处理,并没有涉及到分析、挖掘内涵、发现规律等领域,要想涉足大数据领域,只会文本分析和简单处理是不够的,需要继续深入到分析领域。不只是表层的技术,而是机器学习、数据挖掘领域等探索性的技术。

或者将来可以拥兵自重,以数据中间人的方式来谋生,比如数据采集、存储、整合等领域,活生生的例子是微博爬萌和同学的数据采集自游职业,能拿来买卖的是数据而不是技能。

换句话说,之前的是金子,而不是挖矿技能。

 

总的来说,这本书让我一个自认为处于大数据领域,而其实只是边缘化的人来说,涨了一些见识,懂得了什么是大数据,大数据究竟以为着什么。为什么都在热捧大数据它到底带来了什么变化。思想上的提升才能带来现实的改变,作者毫无束缚天马行空的思想,让我感觉这确实是一个广阔的天地,能发挥的空间非常大。

 

转载请注明链接:http://crazyant.net/1413.html

shell/hadoop/hive一些有用命令收集

有些命令工作中经常用到,记录在一个文章里用于查阅,本文经常更新。

shell命令

linux统计某个目录下所有文件的行数的命令
find /home/crazyant -type f -name "*" | xargs cat | wc -l

用find查找crazyant目录下所有文本文件的行数之和。不过该命令执行挺慢的。

linux统计某个目录下所有目录和总目录的大小命令
du -h --max-depth=1 /home/crazyant/

统计crazyant目录下的所有文件的大小,这里我只想看到一层目录的大小,因此加上了—max-depth=1如果不加这个参数,该命令会以递归的方式列出所有子目录的文件大小

scp命令的使用:

从本地复制到远程:scp -r  logs_jx pss@crazyant.net/home/pss/logs

hive命令

hive建立和执行索引
create index table02_index on table table02(id) as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild; 
alter index table02_index on table02 rebuild;

第一句hive在表上建立了索引但没有生效,第二句真正的执行建立索引的过程,其本身也是一个map –reduce过程。

hive的Load data命令
hive -e "
	use crazyant_net; 
	LOAD DATA INPATH '/app/crazyant/student/*' INTO TABLE student;"

其中inpath的意思是input path的意思,所以不管是从本地local还是hdfs上load data,都要带上。

 

hadoop命令

hadoop的distcp命令带用户名密码的方法
hadoop distcp -su user1,pass1 -du user1,pass1 hdfs://src1 hdfs://dest1

有时候两个集群之间是没有distcp权限的,这时候需要带上两个集群的账号密码,在-su后面带上第一个集群的账号密码,在-du后面带上目标集群的账号密码。

Mysql命令

 查看数据表的最详细的字段描述信息
SHOW FULL FIELDS FROM student

该命令把注释、权限、默认值、类型等表字段信息都列出来了。

查看正在执行的mysql线程

mysql> show processlist;

+———+———–+——————–+————-+———+——+——-+——————+

| Id      |    User   | Host               | db          | Command | Time | State | Info             |

+———+———–+——————–+————-+———+——+——-+——————+

| 2153201 | crazyant  | 127.0.0.1:25357    | pulse       | Sleep   |  914 |       | NULL             |

| 2153733 | crazyant  | 127.0.0.1:48814    | hive        | Query   |    0 | NULL  | show processlist |

| 2153735 | crazyant  | 127.0.0.1:39639    | pulse       | Sleep   |   13 |       | NULL             |

| 2153736 | crazyant  | 127.0.0.1:39640    | pulse       | Sleep   |   13 |       | NULL             |

+———+———–+——————–+————-+———+——+——-+——————+

4 rows in set (0.01 sec)

mysql>

使用kill命令可以把其中的一个进程给删了

mysql> kill 2153474;

Query OK, 0 rows affected (0.00 sec)

将数据LOAD到MYSQL的方法:

LOAD DATA LOCAL INFILE ‘D:/workbench/python/result.txt’ REPLACE INTO TABLE my_urlvisit FIELDS TERMINATED BY’\t’ LINES TERMINATED BY’\n’ IGNORE 0 LINES (url,pdate,COUNT);

vim命令

如果编辑错误,按ESC回到命令模式,按u可以撤销刚才的编辑;

vim编辑中文文本出现乱码

VIM的文本经常会出现中文乱码,这是因为fileencoding和termencoding编码不一致造成的,设置一直就可以了;

:set termencoding

termencoding=cp936

:set fileencoding

fileencoding=utf-8

:set termencoding=utf8

这样设置一下vim就能正常显示中文了;

转载请注明来源:http://crazyant.net/1209.html

hadoop第一个程序WordCount.java的编译运行过程

java是hadoop开发的标准官方语言,本文下载了官方的WordCount.java并对其进行了编译和打包,然后使用测试数据运行了该hadoop程序。

 

这里假定已经装好了hadoop的环境,在Linux下运行hadoop命令能够正常执行;

下载java版本的WordCount.java程序。

 

将WordCount.java复制到linux下的一个目录,这里我复制到/home/crazyant/hadoop_wordcount

[crazyant@dev.mechine hadoop_wordcount]$ ll

total 4

-rwxr–r–  1 crazyant crazyant 1921 Aug 16 20:03 WordCount.java

在该目录(/home/crazyant/hadoop_wordcount)下创建wordcount_classes目录,用于存放编译WordCount.java生成的class文件。

[crazyant@dev.mechine hadoop_wordcount]$ mkdir wordcount_classes

[crazyant@dev.mechine hadoop_wordcount]$ ll

total 8

drwxrwxr-x  2 crazyant crazyant 4096 Aug 16 20:07 wordcount_classes

-rwxr–r–  1 crazyant crazyant 1921 Aug 16 20:03 WordCount.java

编译WordCount.java文件,其中-classpath选项表示要引用hadoop官方的包,-d选项表示要将编译后的class文件生成的目标目录。

[crazyant@dev.mechine hadoop_wordcount]$ javac -classpath /home/crazyant/app/hadoop/hadoop-2-core.jar -d wordcount_classes WordCount.java

[crazyant@dev.mechine hadoop_wordcount]$ ll -R

.:

total 8

drwxrwxr-x  3 crazyant crazyant 4096 Aug 16 20:09 wordcount_classes

-rwxr–r–  1 crazyant crazyant 1921 Aug 16 20:03 WordCount.java

 

./wordcount_classes:

total 4

drwxrwxr-x  3 crazyant crazyant 4096 Aug 16 20:09 org

 

./wordcount_classes/org:

total 4

drwxrwxr-x  2 crazyant crazyant 4096 Aug 16 20:09 myorg

 

./wordcount_classes/org/myorg:

total 12

-rw-rw-r–  1 crazyant crazyant 1546 Aug 16 20:09 WordCount.class

-rw-rw-r–  1 crazyant crazyant 1938 Aug 16 20:09 WordCount$Map.class

-rw-rw-r–  1 crazyant crazyant 1611 Aug 16 20:09 WordCount$Reduce.class

然后将编译后的class文件打包:

[crazyant@dev.mechine hadoop_wordcount]$ jar -cvf wordcount.jar -C wordcount_classes/ .

added manifest

adding: org/(in = 0) (out= 0)(stored 0%)

adding: org/myorg/(in = 0) (out= 0)(stored 0%)

adding: org/myorg/WordCount$Map.class(in = 1938) (out= 798)(deflated 58%)

adding: org/myorg/WordCount$Reduce.class(in = 1611) (out= 649)(deflated 59%)

adding: org/myorg/WordCount.class(in = 1546) (out= 749)(deflated 51%)

[crazyant@dev.mechine hadoop_wordcount]$ ll

total 12

drwxrwxr-x  3 crazyant crazyant 4096 Aug 16 20:09 wordcount_classes

-rw-rw-r–  1 crazyant crazyant 3169 Aug 16 20:11 wordcount.jar

-rwxr–r–  1 crazyant crazyant 1921 Aug 16 20:03 WordCount.java

 

在本地用echo生成一个文件,用于输入数据:

[crazyant@dev.mechine hadoop_wordcount]$ echo “hello world, hello crazyant, i am the ant, i am your brother” > inputfile

[crazyant@dev.mechine hadoop_wordcount]$ more inputfile

hello world, hello crazyant, i am the ant, i am your brother

在hadoop上建立一个目录,里面建立输入文件的目录

[crazyant@dev.mechine hadoop_wordcount]$ hadoop fs -mkdir /app/word_count/input

[crazyant@dev.mechine hadoop_wordcount]$ hadoop fs -ls /app/word_count

Found 1 items

drwxr-xr-x   3 czt czt          0 2013-08-16 20:16 /app/word_count/input

 

将本地刚刚写的的inputfile上传到hadoop上的input目录

[crazyant@dev.mechine hadoop_wordcount]$ hadoop fs -put inputfile /app/word_count/input

[crazyant@dev.mechine hadoop_wordcount]$ hadoop fs -ls /app/word_count/input

Found 1 items

-rw-r–r–   3 czt czt         61 2013-08-16 20:18 /app/word_count/input/inputfile

 

运行jar,以建立的Input目录作为输入参数

[crazyant@dev.mechine hadoop_wordcount]$ hadoop jar wordcount.jar org.myorg.WordCount /app/word_count/input /app/word_count/output

13/08/16 20:19:38 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

13/08/16 20:19:40 INFO util.NativeCodeLoader: Loaded the native-hadoop library

13/08/16 20:19:40 INFO compress.LzoCodec: Successfully loaded & initialized native-lzo library

13/08/16 20:19:40 INFO compress.LzmaCodec: Successfully loaded & initialized native-lzma library

13/08/16 20:19:40 INFO compress.QuickLzCodec: Successfully loaded & initialized native-quicklz library

13/08/16 20:19:40 INFO mapred.FileInputFormat: Total input paths to process : 1

13/08/16 20:19:41 INFO mapred.JobClient: splits size : 61

13/08/16 20:19:41 INFO mapred.JobClient: Running job: job_20130813122541_105844

13/08/16 20:19:43 INFO mapred.JobClient:  map 0% reduce 0%

13/08/16 20:19:57 INFO mapred.JobClient:  map 24% reduce 0%

13/08/16 20:20:07 INFO mapred.JobClient:  map 93% reduce 0%

13/08/16 20:20:16 INFO mapred.JobClient:  map 100% reduce 1%

13/08/16 20:20:26 INFO mapred.JobClient:  map 100% reduce 61%

13/08/16 20:20:36 INFO mapred.JobClient:  map 100% reduce 89%

13/08/16 20:20:47 INFO mapred.JobClient:  map 100% reduce 96%

13/08/16 20:20:57 INFO mapred.JobClient:  map 100% reduce 98%

13/08/16 20:21:00 INFO mapred.JobClient: Updating completed job! Ignoring …

13/08/16 20:21:00 INFO mapred.JobClient: Updating completed job! Ignoring …

13/08/16 20:21:00 INFO mapred.JobClient: Job complete: job_20130813122541_105844

13/08/16 20:21:00 INFO mapred.JobClient: Counters: 19

13/08/16 20:21:00 INFO mapred.JobClient:   File Systems

13/08/16 20:21:00 INFO mapred.JobClient:     HDFS bytes read=1951

13/08/16 20:21:00 INFO mapred.JobClient:     HDFS bytes written=68

13/08/16 20:21:00 INFO mapred.JobClient:     Local bytes read=5174715

13/08/16 20:21:00 INFO mapred.JobClient:     Local bytes written=256814

13/08/16 20:21:00 INFO mapred.JobClient:   Job Counters

13/08/16 20:21:00 INFO mapred.JobClient:     Launched reduce tasks=100

13/08/16 20:21:00 INFO mapred.JobClient:     Rack-local map tasks=61

13/08/16 20:21:00 INFO mapred.JobClient:     ORIGINAL_REDUCES=100

13/08/16 20:21:00 INFO mapred.JobClient:     Launched map tasks=61

13/08/16 20:21:00 INFO mapred.JobClient:     MISS_SCHEDULED_REDUCES=15

13/08/16 20:21:00 INFO mapred.JobClient:   TASK_STATISTICS

13/08/16 20:21:00 INFO mapred.JobClient:     Total Map Slot Time=34

13/08/16 20:21:00 INFO mapred.JobClient:     Attempt_0 Map Task Count=61

13/08/16 20:21:00 INFO mapred.JobClient:     Total Reduce Slot Time=892

13/08/16 20:21:00 INFO mapred.JobClient:   Map-Reduce Framework

13/08/16 20:21:00 INFO mapred.JobClient:     Reduce input groups=9

13/08/16 20:21:00 INFO mapred.JobClient:     Combine output records=0

13/08/16 20:21:00 INFO mapred.JobClient:     Map input records=1

13/08/16 20:21:00 INFO mapred.JobClient:     Reduce output records=9

13/08/16 20:21:00 INFO mapred.JobClient:     Map input bytes=61

13/08/16 20:21:00 INFO mapred.JobClient:     Combine input records=0

13/08/16 20:21:00 INFO mapred.JobClient:     Reduce input records=9

查看output目录是否有结果

[crazyant@dev.mechine hadoop_wordcount]$ hadoop fs -ls /app/word_count/output                                                    Found 100 items

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00000

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00001

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00002

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00003

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00004

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00005

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00006

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00007

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00008

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00009

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00010

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00011

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00012

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00013

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00014

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00015

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00016

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00017

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00018

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00019

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00020

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00021

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00022

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00023

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00024

-rw-r–r–   3 czt czt          0 2013-08-16 20:20 /app/word_count/output/part-00025

 

将该目录下所有文本文件合并后下载到本地

[crazyant@dev.mechine hadoop_wordcount]$ hadoop fs -getmerge /app/word_count/output wordcount_result

[crazyant@dev.mechine hadoop_wordcount]$ ls

inputfile  wordcount_classes  wordcount.jar  WordCount.java  wordcount_result

查看一下下载下来的计算结果

[crazyant@dev.mechine hadoop_wordcount]$ more wordcount_result

i       2

your    1

crazyant,       1

brother 1

hello   2

am      2

world,  1

the     1

ant,    1

 

统计结果正确;

 

参考文章:http://hadoop.apache.org/docs/r0.18.3/mapred_tutorial.html#Example%3A+WordCount+v1.0

Hadoop-Streaming实战经验及问题解决方法总结

目录

1.   Join操作分清join的类型很重要…

2.  启动程序中key字段和partition字段的设定…

3.  控制hadoop程序内存的方法…

4.   对于数字key的排序问题…

5.   在mapper中获取map_input_file环境变量的方法…

6.   运行过程中记录数据的方法…

7.  多次运行Hadoop之是否成功的判断…

8.  对stdin读取的 line的预处理…

9.  Python字符串的连接方法…

10.  怎样查看mapper程序的输出…

11.  SHELL脚本中变量名的命名方法…

12.  提前设计好流程能简化很多重复工作…

13.  其他一些实用经验…

1. Join操作分清join的类型很重要

Join操作是hadoop计算中非常常见的需求,它要求将两个不同数据源的数据根据一个或多个key字段连接成一个合并数据输出,由于key字段数据的特殊性,导致join分成三种类型,处理方法各有不同,如果一个key在数据中可以重复,则记该数据源为N类型,如果只能出现一次,则记为1类型。

1)  类型1-1的join

比如(学号,姓名)和(学号,班级)两个数据集根据学号字段进行join,因为同一个学号只能指向单个名字和单个班级,所以为1-1类型,处理方法是map阶段加上标记后,reduce阶段接收到的数据是每两个一个分组,这样的话只需要读取第一行,将非key字段连到第二行后面即可。

每个学号输出数据:1*1=1个

2)  类型1-N或者N-1的join

比如(学号,姓名)和(学号,选修的课程)两个数据集根据学号字段的join,由于第二个数据源的数据中每个学号会对应很多的课程,所以为1-N类型join,处理方法是map阶段给第一个数据源(类型1)加上标记为1,第二个数据源加上标记为2。这样的话reduce阶段收到的数据以标记为1的行分组,同时每组行数会大于2,join方法是先读取标记1的行,记录其非key字段Field Value 1,然后往下遍历,每次遇到标记2的行都将Field Value 1添加到该行的末尾并输出。

每个学号输出数据:1*N=N*1=N个

3)  类型M-N的join

比如(学号,选修的课程)和(学号,喜欢的水果)根据学号字段做join,由于每个数据源的单个学号都会对应多个相应数据,所以为M*N类型。处理方法是map阶段给数据源小的加上标记1(目的是reduce阶段的节省内存),给数据源大的加上标记2,reduce阶段每个分组会有M*N行,并且标记1的全部在标记2的前面。Join方法是先初始化一个空数组,遇到标记1的行时,将非key数据都记录在数组中,然后遇到标记2的行时,将数组中的数据添加在该行之后输出。

每个学号输出数据:M*N个

2. 启动程序中key字段和partition字段的设定

在join计算过程中,有两个字段非常的重要并需要对其理解,就是排序字段key和分区字段partition的指定。

字段字段说明

num.key.fields.for.partition

用于分区,只影响数据被分发到哪个reduce机器,但不影响排序

stream.num.map.output.key.fields

Key的意思就是主键,这个主键会影响到数据根据前几列的排序
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner如果需要对字段排序、分区,默认都得加上此设置

上面三个配置尤其会影响到join计算时的配置:

1)  如果是单key的join,因为要加上标记字段排序,所以设定key=2,同时设定partition=1对第一个字段分区来保证同Key的数据都在同一台机器上;

2)  如果是N个联合key的join,首先需要加上标记字段,所以设定key=N+1,用来对其进行排序,然后需要partition为N来对其按key分区。

3. 控制hadoop程序内存的方法

Hadoop程序是针对海量数据的,因此任何一个保存变量的操作都会在内存中造成N倍的存储,如果尝试用一个数组记录每一行或某些行的单个字段,用不到程序运行结束,hadoop平台就会爆出137内存超出的错误而被kill掉。

控制内存的方法就是少用变量、尤其数组来记录数据,最终实现当前行的处理与数据总规模的无关,汇总、M*N的join等处理不得不记录历史数据,对这种处理要做到用后及时释放,同时尽量记录在单变量而不是数组中,比如汇总计算可以每次记录累加值,而不是先记录所有的元素最后才汇总。

4. 对于数字key的排序问题

如果不加以处理,排序处理过程中数字1会排在10之后,处理方法是需要在数字前面补0,比如如果全部有2位,就将个位数补1个零,让01和10比较,最终reduce输出的时候,再转回来,需要先预测数字的位数。

在mapper.py中:

Print ‘%010d\t%s’%(int(key),value)

其中key既然是数字,就需要用数字的格式化输出%010d表示将输出10位的字符串,如果不够10位,前面补0。

在reducer.py中,最终输出时,使用转int的方法去掉前面的0:

Print ‘%d\t%s’%(int(key),value)

5. 在mapper中获取map_input_file环境变量的方法

在mapper中,有时候为了区分不同的数据文件来源,这时候可以用map_input_file变量来记录当前正在处理的脚本的文件路径。以下是两种判别方法:

a)        用文件名判断

Import os

filepath = os.environ[“map_input_file”]
filename = os.path.split(filepath)[-1]

if filename==”filename1”:

#process 1

elif filename==”filename2”:

#process2

b)        用文件路径是否包含确定字符串判断

filepath = os.environ[“map_input_file”]

if filepath.find(sys.argv[2])!=-1:

#process

6. 运行过程中记录数据的方法

Hadoop程序不同于本地程序的调试方法,可以使用错误日志来查看错误信息,提交任务前也可以在本地用cat input | mapper.py | sort | reducer.py > output这种方法来先过滤基本的错误,在运行过程中也可以通过以下方法记录信息:

1)  可以直接将信息输出到std output,程序运行结束后,需要手工筛选记录的数据,或者用awk直接查看,但是会污染结果数据

2)  大多采用的是用错误输出的方法,这样运行后可以在stderr日志里面查看自己输出的数据:sys.stderr.write(‘filename:%s\t’%(filename))

7.  多次运行Hadoop之是否成功的判断

如果要运行多次的hadoop计算,并且前一次的计算结果是下一次计算的输入,那么如果上一次计算失败了,下一次很明显不需要启动计算。因此在shell文件中可以通过$?来判断上一次是否运行成功,示例代码:

if [ $? –ne 0 ];then

   exit 1

fi

8. 对stdin读取的 line的预处理

Mapper和reducer程序都是从标准输入读取数据的,然而如果直接进行split会发现最后一个字段后面跟了个’\n’,解决方法有两种:

1)  datas = line[:-1].split(‘\t’)

2)  datas=line.strip().split(‘\t’)

第一种方法直接去除最后一个字符\n,然后split,第二种方法是去除行两边的空格 (包括换行),然后split。个人喜欢用第二种,因为我不确定是否所有行都是\n结尾的,但是有些数据两边会有空格,如果strip掉的话就会伤害数据,所以可以根据情景选用。

9. Python字符串的连接方法

Mapper和reducer的输出或者中间的处理经常需要将不同类型的字符串结合在一起,python中实现字符串连接的方法有格式化输出、字符串连接(加号)和join操作(需要将每个字段转化成字符类型)。

使用格式化输出:’%d\t%s’%(inti,str)

使用字符串的+号进行连接:’%d\t’%i+’\t’.join(list)

写成元祖的\t的Join:’\t’.join((‘%d’%i, ‘\t’.join(list)))

10. 怎样查看mapper程序的输出

一般来说,mapper程序经过处理后,会经过排序然后partition给不同的reducer来做下一步的处理,然而在开发过程中常常需要查看当前的mapper输出是否是预期的结果,对其输出的查看有两种需求。

需求一,查看mapper的直接输出:

在运行脚本中,不设定-reducer参数,也就是没有reducer程序,然后把-D mapred.reduce.tasks=0,即不需要任何reduce的处理,但是同时要设定-output选项,这样的话,在output的目录中会看到每个mapper机器输出的一个文件,就是mapper程序的直接输出。

需求二,查看mapper的输出被partition并排序后的内容,即reducer的输入是什么样子:在运行脚本中,不设定-reducer参数,也就是没有自己的reducer程序,然后把-D mapred.reduce.tasks=1或者更大的值,即有reduce机器,但是没有reducer程序,hadoop会认为有reducer是存在的,因此会继续对mapper的输出调用shuffle打乱和sort操作,这样的话就在output目录下面看到了reducer的输入文件,并且数目等于reducer设定的tasks个数。

11. SHELL脚本中变量名的命名方法

如果遇到很多的输入数据源和很多输出的中间结果,每个hadoop的输出都会用到下一步的输入,并且该人物也用到了其他的输出,这样的话最好在一个统一的shell配置文件中配置所有的文件路径名字,同时一定避免InputDir1、InputDir2这样的命名方法,变量命名是一种功力,一定要多练直观并且显而易见,这样随着程序规模的增加不会变的越来越乱。

12. 提前设计好流程能简化很多重复工作

近期自己接到一个较为复杂的hadoop数据处理流程,大大小小的处理估算的话得十几个hadoop任务才能完成,不过幸好没有直接开始写代码,而是把这些任务统一整理了一下,最后竟然发现很多个问题可以直接合并成一类代码处理,过程中同时将整个任务拆分成了很多小任务并列了个顺序,然后挨个解决小任务非常的快。Hadoop处理流程中如果任务之间错综复杂并相互依赖对方的处理结果,都需要事先设计好处理流程再开始事先。

13. 其他一些实用经验

1)  Mapper和reducer脚本写在同一个Python程序,便于对比和查看;

2)  独立编写数据源的字段信息和位置映射字典,不容易混淆;

3)  抽取常用的如输出数据、读入数据模块为独立函数;

4)  测试脚本及数据、run脚本、map-reduce程序分目录放置;

 

 

Hadoop之使用python实现数据集合间join操作

hadoop之steaming介绍

hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:

使用原生java语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理后,输出O1
  3. hadoop将O1打散、排序,然后传给不同的reduce机器
  4. 每个reduce机器将传来的数据传给reduce程序
  5. reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使用python语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
  3. python的map程序将数据处理后,将结果传回给java的map程序
  4. java的map程序将数据输出为O1
  5. hadoop将O1打散、排序,然后传给不同的reduce机器
  6. 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
  7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
  8. java的reduce程序将数据处理,输出最终数据O2

上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。

hadoop之实现集合join的需求

hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。

比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)

数据事例1-学生信息:

学号sno姓名name
01name1
02name2
03name3
04name4

数据事例2:-学生成绩:

学号sno课程号courseno成绩grade
010180
010290
020182
020295

期待的最终输出:

学号sno姓名name课程courseno成绩grade
01name10180
01name10290
02name20182
02name20295

实现join的注意点和易踩坑总结

如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:

  1. 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
  2. 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
  3. 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空

第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。

hadoop实现join操作的思路

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出

hadoop使用python实现join的map和reduce代码

mapper.py的代码:

# -*- coding: utf-8 -*-
#Mapper.py
#来自疯狂的蚂蚁www.crazyant.net
import os
import sys

#mapper脚本
def mapper():
	#获取当前正在处理的文件的名字,这里我们有两个输入文件
	#所以要加以区分
	filepath = os.environ["map_input_file"]
	filename = os.path.split(filepath)[-1]
	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		#以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记
		if filename == 'data_info':
			name = fields[1]
			#下面的数字'0'就是为数据源1加上的统一标记
			print '\t'.join((sno,'0',name))
		elif filename == 'data_grade':
			courseno = fields[1]
			grade = fields[2]
			#下面的数字'1'就是为数据源1加上的统一标记
			print '\t'.join((sno,'1',courseno,grade))

if __name__=='__main__':
	mapper()

reducer的代码:

# -*- coding: utf-8 -*-
#reducer.py
#来自疯狂的蚂蚁www.crazyant.net
import sys

def reducer():
	#为了记录和上一个记录的区别,用lastsno记录上个sno
	lastsno = ""

	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		'''
		处理思路:
		遇见当前key与上一条key不同并且label=0,就记录下来name值,
		当前key与上一条key相同并且label==1,则将本条数据的courseno、
		grade联通上一条记录的name一起输出成最终结果
		'''
		if sno != lastsno:
			name=""
			#这里没有判断label==1的情况,
			#因为sno!=lastno,并且label=1表示该条key没有数据源1的数据
			if fields[1]=="0":
				name=fields[2]
		elif sno==lastno:
			#这里没有判断label==0的情况,
			#因为sno==lastno并且label==0表示该条key没有数据源2的数据
			if fields[2]=="1":
				courseno=fields[2]
				grade=fields[3]
				if name:
					print '\t'.join((lastsno,name,courseno,grade))
		lastsno = sno

if __name__=='__main__':
	reducer()

使用shell脚本启动hadoop程序的方法:

#先删除输出目录
~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output
#来自疯狂的蚂蚁www.crazyant.net
#注意,下面配置中的环境值每个人机器不一样
~/hadoop-client/hadoop/bin/hadoop streaming \
	-D mapred.map.tasks=10 \
	-D mapred.reduce.tasks=5 \
	-D mapred.job.map.capacity=10 \
	-D mapred.job.reduce.capacity=5 \
	-D mapred.job.name="join--sno_name-sno_courseno_grade" \
	-D num.key.fields.for.partition=1 \
	-D stream.num.map.output.key.fields=2 \
	-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
	-input "/hdfs/jointest/input/*" \
	-output "/hdfs/jointest/output" \
	-mapper "python26/bin/python26.sh mapper.py" \
	-reducer "python26/bin/python26.sh reducer.py" \
	-file "mapper.py" \
	-file "reducer.py" \
	-cacheArchive "/share/python26.tar.gz#python26"

#看看运行成功没,若输出0则表示成功了
echo $?

可以自己手工构造输入输出数据进行测试,本程序是验证过的。

更多需要注意的地方

hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:

影响类型影响的范围
key字段数目1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置

3、map和reduce脚本中key的获取

4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型

1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;

M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;

M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来

原文链接 转载须注明!