有了上面的基础,我们可以来解剖DFSOutputStream了。先看构造函数:
privateDFSOutputStream(String src, longblockSize, Progressable progress,
intbytesPerChecksum) throws IOException
DFSOutputStream(String src, FsPermissionmasked, boolean overwrite,
shortreplication, long blockSize,Progressable progress,
intbuffersize, intbytesPerChecksum) throwsIOException
DFSOutputStream(String src, intbuffersize, Progressable progress,
LocatedBlock lastBlock, FileStatusstat,
intbytesPerChecksum) throwsIOException {
这些构造函数的参数主要有:文件名src;进度回调函数progress(预留接口,目前未使用);数据块大小blockSize;Block副本数replication;每个校验chunk的大小bytesPerChecksum;文件权限masked;是否覆盖原文件标记overwrite;文件状态信息stat;文件的最后一个Block信息lastBlock;buffersize(?未见引用)。
后面两个构造函数会调用第一个构造函数,这个函数会调用父类的构造函数,并设置对象的src,blockSize,progress和checksum属性。第二个构造函数会调用namenode.create方法,在文件空间中建立文件,并启动DataStreamer,它被DFSClient的create方法调用。第三个构造函数被DFSClient的append方法调用,显然,这种情况比价复杂,文件拥有一些数据块,添加数据往往添加在最后的数据块上。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。结合这些信息,构造函数需要计算并设置一些对象成员变量的值,并试图从可能的错误中恢复(调用processDatanodeError),最后启动DataStreamer。我们先看正常流程,前面已经分析过,通过FSOutputSummer,HDFS客户端能将流转换成package,这个包是通过writeChunk,发送出去的,下面是它们的调用关系。 在检查完一系列的状态以后,writeChunk先等待,直到dataQueue中未发送的包小于门限值。如果现在没有可用的Packet对象,则创建一个Packet对象,往Packet中写数据,包括校验值和数据。如果数据包被写满,那么,将它放入发送队列dataQueue中。writeChunk的过程比较简单,这里的写入,也只是把数据写到本地队列,等待DataStreamer发送,没有实际写到DataNode上。createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:private booleancreateBlockOutputStream(DatanodeInfo[] nodes, String client,
booleanrecoveryFlag)
nodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。createBlockOutputStream很简单,打开到第一个DataNode的连接,然后发送下面格式的数据包,并等待来自DataNode的Ack。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。
当recoveryFlag指示为真时,意味着这次写是一次恢复操作,对于DataNode来说,这意味着为写准备的临时文件(在tmp目录中)可能已经存在,需要进行一些特殊处理,具体请看FSDataset的实现。当Client写数据需要一个新的Block的时候,可以调用nextBlockOutputStream方法。privateDatanodeInfo[] nextBlockOutputStream(String client) throwsIOException
这个方法的实现很简单,首先调用locateFollowingBlock(包含了重试和出错处理),通过namenode.addBlock获取一个新的数据块,返回的是DatanodeInfo列表,有了这个列表,就可以建立写数据的pipe了。下一个大动作就是调用上面的createBlockOutputStream,建立到DataNode的连接了。
有了上面的准备,我们来分析processDatanodeError,它的主要流程是:l 参数检查;
l 关闭可能还打开着的blockStream和blockReplyStream;
l 将未收到应答的数据块(在ackQueue中)挪到dataQueue中;
l 循环执行:
1. 计算目前还活着的DataNode列表;
2. 选择一个主DataNode,通过DataNode RPC的recoverBlock方法启动它上面的恢复过程;
3. 处理可能的出错;
4. 处理恢复后Block可能的变化(如Stamp变化);
5. 调用createBlockOutputStream到DataNode的连接。
l 启动ResponseProcessor。
这个过程涉及了DataNode上的recoverBlock方法和createBlockOutputStream中可能的Block恢复,是一个相当耗资源的方法,当系统出错的概率比较小,而且数据块上能恢复的数据很多(平均32M),还是值得这样做的。
写的流程就分析到着,接下来我们来看流的关闭,这个过程也涉及了一系列的方法,它们的调用关系如下: flushInternal会一直等待到发送队列(包括可能的currentPacket)和应答队列都为空,这意味着数据都被DataNode顺利接收。sync作用和UNIX的sync类似,将写入数据持久化。它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。然后调用namenode.fsync,持久化命名空间上的数据。closeInternal比较复杂一点,它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。接着结束两个工作线程,关闭socket,最后调用amenode.complete,通知NameNode结束一次写操作。close方法先调用closeInternal,然后再本地的leasechecker中移除对应的信息。更多精彩内容请关注:
关注超人学院微信二维码: