行业报告 AI展会 数据标注 标注供求
数据标注数据集
主页 > 数据挖掘 正文

hdfs,mapreduce相关流程总结

HDFS读写数据的过程

 

 

1)client向namenode发送请求,namenode从元数据中检查目标文件是否存在,上传路径路径是否合法

2)namenode返回是否可以上传文件,假设可以上传

3)client请求第一个 block该传输到哪些datanode服务器上

4)namenode返回可以上传的datanode 服务器dn1 dn2 dn4

5)client将于datanode中最近的一个datanode 建立RPC连接 ,形成pipeline 通道 , pipeline会连接所有将被上传的datanode机器,

6)、client开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn4

7)当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。最终完成上传

 

 

1)client向namenode通信查询元数据,找到存放block文件的datanode

2)挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流 利用FileChannel读取数据

3)datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)

4)客户端以packet为单位接收,现在本地缓存,然后写入目标文件

mapreduce的执行流程 (大体流程)

 

 

1、 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2、 maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:T

a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对

b) 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存

c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文.件

3、 MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4、 Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

mapTast 的 并行机制

一个job的map阶段并行度由客户端在提交job时决定。而客户端对应map阶段并行度的划分规则:

将文件划分为多个split 每个split 交给一个mapTast进行处理 一个split的大小默认为128MB

切片的形成是由FileInputFormat实现类的getSplits()方法完成,

如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB

如果input的文件的文件非常小 ,且非常多 启动多个maptast明显会降低程序的效率 , 可以先将小文件合并成大文件 再交给mapreduce处理

FileInputFormat切片机制

1、切片定义在InputFormat类中的getSplit()方法

2、FileInputFormat中默认的切片机制:

简单地按照文件的内容长度进行切片 切片大小,默认等于block大小 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

file1.txt 320M
file2.txt 10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:

file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M

3、FileInputFormat中切片的大小的参数配置

通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定

minsize:默认值:1

配置参数:
mapreduce.input.fileinputformat.split.minsize

maxsize:默认值:Long.MAXValue

配置参数:mapreduce.input.fileinputformat.split.maxsize

blocksize
因此,默认情况下,切片大小=blocksize

maxsize(切片最大值):
参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值

minsize (切片最小值):
参数调的比blockSize大,则可以让切片变得比blocksize还大

选择并发数的影响因素:
运算节点的硬件配置 运算任务的类型:CPU密集型还是IO密集型 运算任务的数据量

ReduceTask并行度的决定

这个是由我们自己设置的

//默认值是1,手动设置为4

job.setNumReduceTasks(4);

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜

注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask

尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

mapreduce的shuffle机制(只能简单概述一下以统计统计单词为例)

shuffle 分为map端和reduce端

map端

当map函数通过context.write()开始输出数据数,outputCollector将数据以输出到输出到环形缓冲

环形缓冲区(kvbuffer) 默认大小为100 实际上是是一个 字节数组 ,其中包括了数据区,和索引区 当输出到环形缓冲区的数据达到阀值的时候,

默认为80% 系统就会开启一个线程将数据写入磁盘, 这个过程叫做spill (索引区的内容保存在内存, 也肯也写入到文件中)

在写入之前 , 通过调用Partitioner的getPartition(),计算出数据要分到那个reduce,将这个信息同数据一起保存kvbuffer, 并对数据的key进行了一个快速排序(QuickSort)

排序后的数据被写入到mapreduce.cluster.local.dir配置的目录中的其中一个,Spill文件名像sipll0.out,spill1.out等

最后, 多个spill小文件 ,会合并成一个大文件 (这里进行了一个合并排序, 分区),并最终形成一个唯一大文件 以及相对于的索引文件 建立索引文件是为了方便进行数据的查找,

 

 

Reducer端

Reducer端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。

copy: Mrappmaster告知reduce ,mapTast的位置 , 每个Reducer只获取属于自己分区的数据

sort:这个阶段按照 ,在每个reduce对自己这个分区的数据 进行一个归并排序 ,

reduce: 合并后的文件作为输入传递给Reducer,Reducer针对每个key及其排序的数据调用reduce函数。产生的reduce输出一般写入到HDFS

 

微信公众号

声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。

网友评论:

发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
最新文章
SEM推广服务
热门文章
热点图文

Copyright©2005-2028 Sykv.com 可思数据 版权所有    京ICP备14056871号

关于我们   免责声明   广告合作   版权声明   联系我们   原创投稿   网站地图  

可思数据 数据标注

扫码入群
扫码关注

微信公众号

返回顶部