Hadoop IO

2022-02-28 09:26 更新
  1. 輸入文件從HDFS進行讀取.
  2. 輸出文件會存入本地磁盤.
  3. Reducer和Mapper間的網(wǎng)絡(luò)I/O,從Mapper節(jié)點得到Reducer的檢索文件.
  4. 使用Reducer實例從本地磁盤回讀數(shù)據(jù).
  5. Reducer輸出- 回傳到HDFS.

序列化

序列化是指將結(jié)構(gòu)化對象轉(zhuǎn)化為字節(jié)流以便在網(wǎng)絡(luò)上傳輸或?qū)懙酱疟P進行永久存儲的過程。反序列化是指將字節(jié)流轉(zhuǎn)回結(jié)構(gòu)化對象的逆過程。

序列化用于分布式數(shù)據(jù)處理的兩大領(lǐng)域:進程間通信和永久存儲

在Hadoop中,系統(tǒng)中多個節(jié)點進程間的通信是通過“遠程過程調(diào)用”(RPC)實現(xiàn)的。RPC協(xié)議將消息序列化成二進制流后發(fā)送到遠程節(jié)點,遠程節(jié)點接著將二進制流反序列化為原始消息。通常情況下,RPC序列化格式如下:

1.緊湊

緊湊格式能充分利用網(wǎng)絡(luò)帶寬(數(shù)據(jù)中心最稀缺的資源)

2.快速

進程間通信形成了分布式系統(tǒng)的骨架,所以需要盡量減少序列化和反序列化的性能開銷,這是基本的。

3.可擴展

為了滿足新的需求,協(xié)議不斷變化。所以在控制客戶端和服務(wù)期的過程中,需要直接引進相應(yīng)的協(xié)議。例如,需要能夠在方法調(diào)用的過程中增加新的參數(shù),并且新的服務(wù)器需要能夠接受來自老客戶端的老格式的消息(無新增的參數(shù))。

4.支持互操作

對于系統(tǒng)來說,希望能夠支持以不同語言寫的客戶端與服務(wù)器交互,所以需要設(shè)計一種特定的格式來滿足這一需求。


Writable 接口

Writable 接口定義了兩個方法:一個將其狀態(tài)寫入 DataOutput 二進制流,另一個從 DataInput二進制流讀取狀態(tài)。

BytesWritable

BytesWritable 是對二進制數(shù)據(jù)數(shù)組的封裝。它的序列化格式為一個指定所含數(shù)據(jù)字節(jié)數(shù)的整數(shù)域(4字節(jié)),后跟數(shù)據(jù)內(nèi)容的本身。例如,長度為2的字節(jié)數(shù)組包含數(shù)值3和5,序列化形式為一個4字節(jié)的整數(shù)(00000002)和該數(shù)組中的兩個字節(jié)(03和05)

NullWritable

NullWritable 是 writable 的特殊類型,它的序列化長度為0。它并不從數(shù)據(jù)流中讀取數(shù)據(jù),也不寫入數(shù)據(jù)。它充當占位符。

ObjectWritable和GenericWritable

ObjectWritable 是對 java 基本類型(String,enum,Writable,null或這些類型組成的數(shù)組)的一個通用封裝。它在 Hadoop RPC 中用于對方法的參數(shù)和返回類型進行封裝和解封裝。

Wriable集合類

io 軟件包共有6個 Writable 集合類,分別是 ArrayWritable,ArrayPrimitiveWritable,TwoDArrayWritable,MapWritable,SortedMapWritable以及EnumMapWritable

ArrayWritable 和 TwoDArrayWritable 是對 Writeble 的數(shù)組和兩維數(shù)組(數(shù)組的數(shù)組)的實現(xiàn)。ArrayWritable 或 TwoDArrayWritable 中所有元素必須是同一類的實例。ArrayWritable 和 TwoDArrayWritable 都有g(shù)et() ,set() 和 toArray()方法,toArray() 方法用于新建該數(shù)組的一個淺拷貝。 

ArrayPrimitiveWritable 是對 Java 基本數(shù)組類型的一個封裝。調(diào)用 set() 方法時,可以識別相應(yīng)組件類型,因而無需通過繼承該類來設(shè)置類型。

序列化框架

盡管大多數(shù) Mapreduce 程序使用的都是 Writable 類型的鍵和值,但這并不是 MapReduce API 強制要求使用的。事實上,可以使用任何類型,只要能有一個機制對每個類型進行類型與二進制表示的來回轉(zhuǎn)換就可以。

為了支持這個機制,Hadoop 有一個針對可替換序列化框架的 API 。序列化框架用一個 Serialization 實現(xiàn)來表示。Serialization 對象定義了從類型到 Serializer 實例(將對象轉(zhuǎn)換為字節(jié)流)和 Deserializer 實例(將字節(jié)流轉(zhuǎn)換為對象)的映射方式。

序列化IDL

還有許多其他序列化框架從不同的角度來解決問題:不通過代碼來定義類型,而是使用接口定義語言以不依賴與具體語言的方式進行聲明。由此,系統(tǒng)能夠為其他語言生成模型,這種形式能有效提高互操作能力。它們一般還會定義版本控制方案。

兩個比較流行的序列化框架 Apache Thrift 和Google的Protocol Buffers,常常用作二進制數(shù)據(jù)的永久存儲格式。Mapreduce 格式對該類的支持有限,但在 Hadoop 內(nèi)部,部分組件仍使用上述兩個序列化框架來實現(xiàn) RPC 和數(shù)據(jù)交換。

基于文件的數(shù)據(jù)結(jié)構(gòu)

對于某些應(yīng)用,我們需要一種特殊的數(shù)據(jù)結(jié)構(gòu)來存儲自己的數(shù)據(jù)。對于基于 Mapreduce 的數(shù)據(jù)處理,將每個二進制數(shù)據(jù)大對象單獨放在各自的文件中不能實現(xiàn)可擴展性,所以 Hadoop 為此開發(fā)了很多更高層次的容器。

關(guān)于 SequenceFile 。

考慮日志文件,其中每一行文本代表一條日志記錄。純文本不適合記錄二進制類型的數(shù)據(jù)。在這種情況下,Hadoop 的 SequenceFile 類非常合適,為二進制鍵值對提供了一個持久數(shù)據(jù)結(jié)構(gòu)。將它作為日志文件的存儲格式時,你可以自己選擇鍵,以及值可以是 Writable 類型。

SequenceFile 也可以作為小文件的容器。HDFS和Mapreduce 是針對大文件優(yōu)化的,所以通過 SequenceFile 類型將小文件包裝起來,可以獲得更高效率的存儲和處理。

SequnceFile的寫操作

通過 createWriter()靜態(tài)方法可以創(chuàng)建 SequenceFile 對象,并返回 SequnceFile.Writer 實例。該靜態(tài)方法有多個重載版本,但都需要制定待寫入的數(shù)據(jù)流,Configuration 對象,以及鍵和值的類型。存儲在 SequenceFIle 中的鍵和值并不一定是 Writable 類型。只要能夠被 Sertialization 序列化和反序列化,任何類型都可以。

SequenceFile的讀操作

從頭到尾讀取順序文件不外乎創(chuàng)建 SequenceFile.reader 實例后反復(fù)調(diào)用 next() 方法迭代讀取記錄。讀取的是哪條記錄與使用的序列化框架有關(guān)。如果使用的是 Writable 類型,那么通過鍵和值作為參數(shù)的 next() 方法可以將數(shù)據(jù)流的下一條鍵值對讀入變量中。 

1.通過命令行接口顯示 SequenceFile。

hadoop fs 命令有一個 -text 選項可以以文本形式顯示順序文件。該選項可以查看文件的代碼,由此檢測出文件的類型并將其轉(zhuǎn)換為相應(yīng)的文本。該選項可以識別 gzip 壓縮文件,順序文件和 Avro 數(shù)據(jù)文件;否則,假設(shè)輸入為純文本文件。

2. SequenceFile 的排序和合并。

 Mapreduce 是對多個順序文件進行排序(或合并)最有效的方法。Mapreduce 本身是并行的,并且可由你制定使用多少個 reducer 。例如,通過制定一個 reducer ,可以得到一個輸出文件。

3.SequenceFile 的格式。

順序文件由文件頭和隨后的一條或多條記錄組成。順序文件的前三個字節(jié)為 SEQ,緊隨其后的一個字節(jié)表示順序文件的版本號。文件頭還包括其他字段,例如鍵和值的名稱,數(shù)據(jù)壓縮細節(jié),用戶定義的元數(shù)據(jù)以及同步標識。同步標識用于在讀取文件時能夠從任意位置開始識別記錄邊界。每個文件都有一個隨機生成的同步標識,其值存儲在文件頭中,位于順序文件中的記錄與記錄之間。同步標識的額外存儲開銷要求小于1%,所以沒有必要在每條記錄末尾添加該標識。

關(guān)于MapFile

MapFile 是已經(jīng)排過序的 SequenceFile ,它有索引,所以可以按鍵查找。索引本身就是一個 SequenceFile ,包含 map 中的一小部分鍵。由于索引能夠加載進內(nèi)存,因此可以提供對主數(shù)據(jù)文件的快速查找。主數(shù)據(jù)文件則是另一個 SequenceFIle ,包含了所有的 map 條目,這些條目都按照鍵順序進行了排序。

其他文件格式和面向列的格式

順序文件和 map 文件是 Hadoop 中最早的,但并不是僅有的二進制文件格式,事實上,對于新項目而言,有更好的二進制格式可供選擇。

Avro 數(shù)據(jù)文件在某些方面類似順序文件,是面向大規(guī)模數(shù)據(jù)處理而設(shè)計的。但是 Avro 數(shù)據(jù)文件又是可移植的,它們可以跨越不同的編程語言使用。順序文件,map 文件和 Avro 數(shù)據(jù)文件都是面向行的格式,意味著每一行的值在文件中是連續(xù)存儲的。在面向列的格式中,文件中的行被分割成行的分片,然后每個分片以面向列的形式存儲:首先存儲每行第一列的值,然后是每行第2列的值,如此以往。

壓縮

能夠減少磁盤的占用空間和網(wǎng)絡(luò)傳輸?shù)牧?,并加速?shù)據(jù)在網(wǎng)絡(luò)和磁盤上的傳輸。


Hadoop 應(yīng)用處理的數(shù)據(jù)集非常大,因此需要借助于壓縮。使用哪種壓縮格式與待處理的文件的大小,格式和所用的工具有關(guān)。比較各種壓縮算法的壓縮比和性能(從高到低):

1. 使用容器文件格式,例如順序文件, Avro 數(shù)據(jù)文件。 ORCF 了說 Parquet 文件

2. 使用支持切分的壓縮格式,例如 bzip2 或者通過索引實現(xiàn)切分的壓縮格式,例子如LZO。

3. 在應(yīng)用中將文件中切分成塊,并使用任意一種他所格式為每個數(shù)據(jù)塊建立壓縮文件(不論它是否支持切分)。在這種情況下,需要合理選擇數(shù)據(jù)大小,以確保壓縮后的數(shù)據(jù)塊的大小近似于HDFS塊的大小。

4. 存儲未經(jīng)壓縮的文件。


重點:壓縮和拆分一般是沖突的(壓縮后的文件的 block 是不能很好地拆分獨立運行,很多時候某個文件的拆分點是被拆分到兩個壓縮文件中,這時 Map 任務(wù)就無法處理,所以對于這些壓縮,Hadoop 往往是直接使用一個 Map 任務(wù)處理整個文件的分析)。對大文件不可使用不支持切分整個文件的壓縮格式,會失去數(shù)據(jù)的特性,從而造成 Mapreduce 應(yīng)用效率低下。

Map 的輸出結(jié)果也可以進行壓縮,這樣可以減少 Map 結(jié)果到 Reduce 的傳輸?shù)臄?shù)據(jù)量,加快傳輸速率。


在 Mapreduce 中使用壓縮

FileOutputFormat.setCompressOutput(job,true);
FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);

如果輸出生成順序文件,可以設(shè)置 mapreduce.output.fileoutputformat.compress.type 屬性來控制限制使用壓縮格式。默認值是RECORD,即針對每條記錄進行壓縮。如果將其改為BLOCK,將針對一組記錄進行壓縮,這是推薦的壓縮策略,因為它的壓縮效率更高。

完整性

  • 檢測數(shù)據(jù)是否損壞的常見措施是,在數(shù)據(jù)第一次引入系統(tǒng)時計算校驗和并在數(shù)據(jù)通過一個不可靠的通道進行傳輸時再次計算校驗和,這樣就能發(fā)現(xiàn)數(shù)據(jù)是否損壞,如果計算所得的新校驗和和原來的校驗和不匹配,我們就認為數(shù)據(jù)已損壞。但該技術(shù)并不能修復(fù)數(shù)據(jù)。常見的錯誤檢測碼是 CRC-32(32位循環(huán)冗余檢驗),任何大小的數(shù)據(jù)輸入均計算得到一個32位的整數(shù)校驗和。
  • datanode 負責在收到數(shù)據(jù)后存儲該數(shù)據(jù)及其校驗和之前對數(shù)據(jù)進行驗證。它在收到客戶端的數(shù)據(jù)或復(fù)制其他 datanode 的數(shù)據(jù)時執(zhí)行這個操作。正在寫數(shù)據(jù)的客戶端將數(shù)據(jù)及其校驗和發(fā)送到由一系列 datanode 組成的管線,管線中最后一個 datanode 負責驗證校驗和。如果 datanode 檢測到錯誤,客戶端就會收到一個 IOException 異常的子類。
  • 客戶端從 datanode 讀取數(shù)據(jù)時,也會驗證校驗和,將它們與 datanode 中存儲的校驗和進行比較。每個datanode均持久保存有一個驗證的校驗和日志,所以它知道每個數(shù)據(jù)塊的最后一次驗證時間??蛻舳顺晒︱炞C一個數(shù)據(jù)塊后,會告訴這個 datanode , datanode 由此更新日志。保存這些統(tǒng)計信息對于檢測損壞的磁盤很有價值。
  • 不只是客戶端在讀取數(shù)據(jù)塊時會驗證校驗和,每個 datanode 也會在一個后臺線程中運行一個 DataBlockScanner ,從而定期驗證存儲在這個 datanode 上的所有數(shù)據(jù)塊。該項措施是解決物理存儲媒體上位損壞的有力措施。
  • 由于 HDFS 存儲著每個數(shù)據(jù)塊的復(fù)本,因此它可以通過數(shù)據(jù)復(fù)本來修復(fù)損壞的數(shù)據(jù)塊,進而得到一個新的,完好無損的復(fù)本?;舅悸肥?,客戶端在讀取數(shù)據(jù)塊時,如果檢測到錯誤,首先向 namenode 報告已損壞的數(shù)據(jù)塊及其正在嘗試讀取操作的這個 datanode ,再拋出 ChecksumException 異常。namenode 將這個數(shù)據(jù)塊復(fù)本標記為已損壞,這樣它不再將客戶端處理請求直接發(fā)送到這個節(jié)點,或嘗試將這個復(fù)本復(fù)制到另一個 datanode 。之后,它安排這個數(shù)據(jù)塊的一個復(fù)本復(fù)制到另一個 datanode ,這樣一來,數(shù)據(jù)塊的復(fù)本因子又回到期望水平。此后,已損壞的數(shù)據(jù)塊復(fù)本便被刪除。
  • Hadoop的LocalFileSystem 執(zhí)行客戶端的校驗和驗證。這意味著在你寫入一個名為 filename 的文件時,文件系統(tǒng)客戶端會明確在包含每個文件快校驗和的同一個目錄內(nèi)新建一個 filename.crc 隱藏文件。文件塊的大小作為元數(shù)據(jù)存儲在.crc文件中,所以即使文件塊大小的設(shè)置已經(jīng)發(fā)生變化,仍然可以正確讀回文件。在讀取文件時需要驗證校驗和,并且如果檢測到錯誤,LocalFileSystem 還會拋出一個 ChecksumException 異常。



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號