MapReduce环形缓冲区底层实现
时间:2023-09-06 10:07:02
实现环形缓冲区底层
首先,了解改革过程的发生Map——Collect阶段:用户编写map()
在函数中,当数据处理完成后,通常会调用OutputCollector.collect()
输出结果。它将在函数内生成key/value分片(通过调用Partitioner),并写入环形内存缓冲区。
MapOutputBuffer
缓冲区暂时存储用户输出数据,当缓冲区利用率达到一定阈值时,将缓冲区数据写在磁盘上。
数据缓冲区的设计方法直接影响Map Task写作效率,现有的数据结构可供选择,最简单的是单向缓冲区,生产者单向写入输出到缓冲区,当缓冲区满时,一次写在磁盘上,所以,不断写缓冲区,直到所有数据都写在磁盘上。单向缓冲区最大的问题是性能不高,不支持同时读写数据。
双缓冲区是对单向缓冲区的改进。它使用两个缓冲区,一个用于写入数据,另一个用于写入数据 写在磁盘上,使两个缓冲区交替读写,从而提高效率。事实上,双缓冲区只能在一定程度上平行读写,读写等待问题仍然存在。
一种更好的缓冲区设计方法是使用环形缓冲区:当缓冲区的利用率达到一定阈值时,开始将数据写入磁盘同时,生产者仍然可以将数据循环到不断增加的剩余空间中,以实现真正的读写并行。
底层是字节数组:数组前记录KV记录数组后面的索引位置KV数据。首尾连接形成一个环形缓冲区,中间是赤道。用于数据spll溢出处理。
其中,MapOutputBuffer的collect
方法和MapOutputBuffer.Buffer
的write
方法是生 产者,spillThread
线程是消费者,他们同步是通过可重入的互斥锁spillLock
和spillLock
上述两个条件变量(spillDone
和spillReady
)完成 的.生产者的主要代码如下
///获得下一个可写入位置 spillLock.lock(); if(缓冲区利用率达到阈值){
//唤醒SpillThread将缓冲区数据写入磁盘 spillReady.signal(); } if(缓冲区满){
//等待SpillThread线程结束 spillDone.wait(); } spillLock.lock(); ///将数据写入缓冲区
MapOutputBuffer
内部采用两级索引结构,涉及三个环形内存缓冲区kvoffsets
、kvindices
和kvbuffer
,这三个缓冲 内存空间的总大小io.sort.mb
(默认是100 MB)。
kvoffsets用于保存偏移量索引数组key/value
位置索引信息kvindices
中等偏移。考虑到一对key/value
需占用数组kvoffsets
的1个 int
(整形)大小,数组kvindices
的3个int大小(分别保存partition号、key开始位置和value所以Hadoop按比例1:3将大小分成比例 ${io.sort.record.percent}*${io.sort.mb}
内存空间分配给数组kvoffsets
和kvindices
,
该过 程由指针kvstart/kvend/kvindex控制,其中kvstart内存段的初始位置表示数据,kvindex表示未存储数据的内存段初始位置,而在正常写 入情况下,kvend=kvstart,一旦满足溢写条件,kvend=kvindex,此时指针区间[kvstart, kvend)有效的数据范围。
kvindices用于保存位置索引数组key/value值在数据缓冲区kvbuffer中间的起始位置。
kvbuffer即数据缓冲区,用于保存实际情况key/value默认情况下最多可以使用值io.sort.mb
当缓冲区利用率超过95%时 io.sort.spill.percent
(默认80%)之后,会触发线程SpillThread将数据写入磁盘。
您可以参考更多详细信息《Hadoop技术内幕:深入分析MapReduce结构设计与实现原则 》
点击此处扫描二维码(或微信搜索) :孙中明) 回复关键字 3006 获取相关书籍等