博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop源代码分析(四零)
阅读量:6625 次
发布时间:2019-06-25

本文共 3091 字,大约阅读时间需要 10 分钟。

  hot3.png

有了上面的基础,我们可以来解剖DFSOutputStream了。先看构造函数:

    privateDFSOutputStream(String src, longblockSize, Progressable progress,

        intbytesPerChecksum) throws IOException

    DFSOutputStream(String src, FsPermissionmasked, boolean overwrite,

        shortreplication, long blockSize,Progressable progress,

        intbuffersize, intbytesPerChecksum) throwsIOException

    DFSOutputStream(String src, intbuffersize, Progressable progress,

        LocatedBlock lastBlock, FileStatusstat,

        intbytesPerChecksum) throwsIOException {

这些构造函数的参数主要有:文件名src;进度回调函数progress(预留接口,目前未使用);数据块大小blockSize;Block副本数replication;每个校验chunk的大小bytesPerChecksum;文件权限masked;是否覆盖原文件标记overwrite;文件状态信息stat;文件的最后一个Block信息lastBlock;buffersize(?未见引用)。

后面两个构造函数会调用第一个构造函数,这个函数会调用父类的构造函数,并设置对象的src,blockSize,progress和checksum属性。
第二个构造函数会调用namenode.create方法,在文件空间中建立文件,并启动DataStreamer,它被DFSClient的create方法调用。第三个构造函数被DFSClient的append方法调用,显然,这种情况比价复杂,文件拥有一些数据块,添加数据往往添加在最后的数据块上。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。结合这些信息,构造函数需要计算并设置一些对象成员变量的值,并试图从可能的错误中恢复(调用processDatanodeError),最后启动DataStreamer。
我们先看正常流程,前面已经分析过,通过FSOutputSummer,HDFS客户端能将流转换成package,这个包是通过writeChunk,发送出去的,下面是它们的调用关系。
11153346_TR57.jpg 
在检查完一系列的状态以后,writeChunk先等待,直到dataQueue中未发送的包小于门限值。如果现在没有可用的Packet对象,则创建一个Packet对象,往Packet中写数据,包括校验值和数据。如果数据包被写满,那么,将它放入发送队列dataQueue中。writeChunk的过程比较简单,这里的写入,也只是把数据写到本地队列,等待DataStreamer发送,没有实际写到DataNode上。
createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:

private booleancreateBlockOutputStream(DatanodeInfo[] nodes, String client,

                    booleanrecoveryFlag)

nodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。createBlockOutputStream很简单,打开到第一个DataNode的连接,然后发送下面格式的数据包,并等待来自DataNode的Ack。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。

11153347_Svkg.jpg 
当recoveryFlag指示为真时,意味着这次写是一次恢复操作,对于DataNode来说,这意味着为写准备的临时文件(在tmp目录中)可能已经存在,需要进行一些特殊处理,具体请看FSDataset的实现。
当Client写数据需要一个新的Block的时候,可以调用nextBlockOutputStream方法。

    privateDatanodeInfo[] nextBlockOutputStream(String client) throwsIOException

这个方法的实现很简单,首先调用locateFollowingBlock(包含了重试和出错处理),通过namenode.addBlock获取一个新的数据块,返回的是DatanodeInfo列表,有了这个列表,就可以建立写数据的pipe了。下一个大动作就是调用上面的createBlockOutputStream,建立到DataNode的连接了。

有了上面的准备,我们来分析processDatanodeError,它的主要流程是:

l          参数检查;

l          关闭可能还打开着的blockStream和blockReplyStream;

l          将未收到应答的数据块(在ackQueue中)挪到dataQueue中;

l          循环执行:

1.      计算目前还活着的DataNode列表;

2.      选择一个主DataNode,通过DataNode RPC的recoverBlock方法启动它上面的恢复过程;

3.      处理可能的出错;

4.      处理恢复后Block可能的变化(如Stamp变化);

5.      调用createBlockOutputStream到DataNode的连接。

l          启动ResponseProcessor。

这个过程涉及了DataNode上的recoverBlock方法和createBlockOutputStream中可能的Block恢复,是一个相当耗资源的方法,当系统出错的概率比较小,而且数据块上能恢复的数据很多(平均32M),还是值得这样做的。

写的流程就分析到着,接下来我们来看流的关闭,这个过程也涉及了一系列的方法,它们的调用关系如下:
11153347_vZ4E.jpg 
flushInternal会一直等待到发送队列(包括可能的currentPacket)和应答队列都为空,这意味着数据都被DataNode顺利接收。
sync作用和UNIX的sync类似,将写入数据持久化。它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。然后调用namenode.fsync,持久化命名空间上的数据。
closeInternal比较复杂一点,它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。接着结束两个工作线程,关闭socket,最后调用amenode.complete,通知NameNode结束一次写操作。close方法先调用closeInternal,然后再本地的leasechecker中移除对应的信息。

更多精彩内容请关注:

关注超人学院微信二维码:153133_dusP_2273204.jpg

转载于:https://my.oschina.net/crxy/blog/465596

你可能感兴趣的文章
【BZOJ】3832: [Poi2014]Rally
查看>>
[转]看懂ExtJS的API
查看>>
推荐15款制作 SVG 动画的 JavaScript 库
查看>>
转:OpenResty最佳实践(推荐了解lua语法)
查看>>
转:CEO, CFO, CIO, CTO, CSO是什么
查看>>
andriod自定义视图
查看>>
linux下vim更改注释颜色
查看>>
在SSL / https下托管SignalR
查看>>
Using JRuby with Maven
查看>>
poj 3308 (最大流)
查看>>
Netty了解与小试
查看>>
醒醒吧少年,只用Cucumber不能帮助你BDD
查看>>
一名女程序员对iOS的想法
查看>>
西班牙现新型电费退款网络诈骗 侨胞需谨防上当
查看>>
ArrayList
查看>>
Angular学习笔记(一) - 之安装教程
查看>>
Spring Websocket实现文本、图片、声音、文件下载及推送、接收及显示(集群模式)...
查看>>
最严新规发布 网络短视频平台该如何降低违规风险? ...
查看>>
云服务器ECS出现速度变慢 以及突然断开怎么办?
查看>>
208亿背后的“秘密”
查看>>