佳木斯湛栽影视文化发展公司

主頁 > 知識庫 > 美團DB數(shù)據(jù)同步到數(shù)據(jù)倉庫的架構(gòu)與實踐

美團DB數(shù)據(jù)同步到數(shù)據(jù)倉庫的架構(gòu)與實踐

熱門標簽:鐵路電話系統(tǒng) 網(wǎng)站排名優(yōu)化 Linux服務器 服務外包 AI電銷 呼叫中心市場需求 地方門戶網(wǎng)站 百度競價排名

背景

在數(shù)據(jù)倉庫建模中,未經(jīng)任何加工處理的原始業(yè)務層數(shù)據(jù),我們稱之為ODS(Operational Data Store)數(shù)據(jù)。在互聯(lián)網(wǎng)企業(yè)中,常見的ODS數(shù)據(jù)有業(yè)務日志數(shù)據(jù)(Log)和業(yè)務DB數(shù)據(jù)(DB)兩類。對于業(yè)務DB數(shù)據(jù)來說,從MySQL等關(guān)系型數(shù)據(jù)庫的業(yè)務數(shù)據(jù)進行采集,然后導入到Hive中,是進行數(shù)據(jù)倉庫生產(chǎn)的重要環(huán)節(jié)。

如何準確、高效地把MySQL數(shù)據(jù)同步到Hive中?一般常用的解決方案是批量取數(shù)并Load:直連MySQL去Select表中的數(shù)據(jù),然后存到本地文件作為中間存儲,最后把文件Load到Hive表中。這種方案的優(yōu)點是實現(xiàn)簡單,但是隨著業(yè)務的發(fā)展,缺點也逐漸暴露出來:

  • 性能瓶頸:隨著業(yè)務規(guī)模的增長,Select From MySQL -> Save to Localfile -> Load to Hive這種數(shù)據(jù)流花費的時間越來越長,無法滿足下游數(shù)倉生產(chǎn)的時間要求。
  • 直接從MySQL中Select大量數(shù)據(jù),對MySQL的影響非常大,容易造成慢查詢,影響業(yè)務線上的正常服務。
  • 由于Hive本身的語法不支持更新、刪除等SQL原語,對于MySQL中發(fā)生Update/Delete的數(shù)據(jù)無法很好地進行支持。

為了徹底解決這些問題,我們逐步轉(zhuǎn)向CDC (Change Data Capture) + Merge的技術(shù)方案,即實時Binlog采集 + 離線處理Binlog還原業(yè)務數(shù)據(jù)這樣一套解決方案。Binlog是MySQL的二進制日志,記錄了MySQL中發(fā)生的所有數(shù)據(jù)變更,MySQL集群自身的主從同步就是基于Binlog做的。

本文主要從Binlog實時采集和離線處理Binlog還原業(yè)務數(shù)據(jù)兩個方面,來介紹如何實現(xiàn)DB數(shù)據(jù)準確、高效地進入數(shù)倉。

整體架構(gòu)

整體的架構(gòu)如上圖所示。在Binlog實時采集方面,我們采用了阿里巴巴的開源項目Canal,負責從MySQL實時拉取Binlog并完成適當解析。Binlog采集后會暫存到Kafka上供下游消費。整體實時采集部分如圖中紅色箭頭所示。

離線處理Binlog的部分,如圖中黑色箭頭所示,通過下面的步驟在Hive上還原一張MySQL表:

  • 采用Linkedin的開源項目Camus,負責每小時把Kafka上的Binlog數(shù)據(jù)拉取到Hive上。
  • 對每張ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量數(shù)據(jù)讀取到Hive上,這一過程底層采用直連MySQL去Select數(shù)據(jù)的方式。
  • 對每張ODS表,每天基于存量數(shù)據(jù)和當天增量產(chǎn)生的Binlog做Merge,從而還原出業(yè)務數(shù)據(jù)。

我們回過頭來看看,背景中介紹的批量取數(shù)并Load方案遇到的各種問題,為什么用這種方案能解決上面的問題呢?

  • 首先,Binlog是流式產(chǎn)生的,通過對Binlog的實時采集,把部分數(shù)據(jù)處理需求由每天一次的批處理分攤到實時流上。無論從性能上還是對MySQL的訪問壓力上,都會有明顯地改善。
  • 第二,Binlog本身記錄了數(shù)據(jù)變更的類型(Insert/Update/Delete),通過一些語義方面的處理,完全能夠做到精準的數(shù)據(jù)還原。

Binlog實時采集

對Binlog的實時采集包含兩個主要模塊:一是CanalManager,主要負責采集任務的分配、監(jiān)控報警、元數(shù)據(jù)管理以及和外部依賴系統(tǒng)的對接;二是真正執(zhí)行采集任務的Canal和CanalClient。

當用戶提交某個DB的Binlog采集請求時,CanalManager首先會調(diào)用DBA平臺的相關(guān)接口,獲取這一DB所在MySQL實例的相關(guān)信息,目的是從中選出最適合Binlog采集的機器。然后把采集實例(Canal Instance)分發(fā)到合適的Canal服務器上,即CanalServer上。在選擇具體的CanalServer時,CanalManager會考慮負載均衡、跨機房傳輸?shù)纫蛩兀瑑?yōu)先選擇負載較低且同地域傳輸?shù)臋C器。

CanalServer收到采集請求后,會在ZooKeeper上對收集信息進行注冊。注冊的內(nèi)容包括:

  • 以Instance名稱命名的永久節(jié)點。
  • 在該永久節(jié)點下注冊以自身ip:port命名的臨時節(jié)點。

這樣做的目的有兩個:

  • 高可用:CanalManager對Instance進行分發(fā)時,會選擇兩臺CanalServer,一臺是Running節(jié)點,另一臺作為Standby節(jié)點。Standby節(jié)點會對該Instance進行監(jiān)聽,當Running節(jié)點出現(xiàn)故障后,臨時節(jié)點消失,然后Standby節(jié)點進行搶占。這樣就達到了容災的目的。
  • 與CanalClient交互:CanalClient檢測到自己負責的Instance所在的Running CanalServer后,便會進行連接,從而接收到CanalServer發(fā)來的Binlog數(shù)據(jù)。

對Binlog的訂閱以MySQL的DB為粒度,一個DB的Binlog對應了一個Kafka Topic。底層實現(xiàn)時,一個MySQL實例下所有訂閱的DB,都由同一個Canal Instance進行處理。這是因為Binlog的產(chǎn)生是以MySQL實例為粒度的。CanalServer會拋棄掉未訂閱的Binlog數(shù)據(jù),然后CanalClient將接收到的Binlog按DB粒度分發(fā)到Kafka上。

離線還原MySQL數(shù)據(jù)

完成Binlog采集后,下一步就是利用Binlog來還原業(yè)務數(shù)據(jù)。首先要解決的第一個問題是把Binlog從Kafka同步到Hive上。

Kafka2Hive

整個Kafka2Hive任務的管理,在美團數(shù)據(jù)平臺的ETL框架下進行,包括任務原語的表達和調(diào)度機制等,都同其他ETL類似。而底層采用LinkedIn的開源項目Camus,并進行了有針對性的二次開發(fā),來完成真正的Kafka2Hive數(shù)據(jù)傳輸工作。

對Camus的二次開發(fā)

Kafka上存儲的Binlog未帶Schema,而Hive表必須有Schema,并且其分區(qū)、字段等的設計,都要便于下游的高效消費。對Camus做的第一個改造,便是將Kafka上的Binlog解析成符合目標Schema的格式。

對Camus做的第二個改造,由美團的ETL框架所決定。在我們的任務調(diào)度系統(tǒng)中,目前只對同調(diào)度隊列的任務做上下游依賴關(guān)系的解析,跨調(diào)度隊列是不能建立依賴關(guān)系的。而在MySQL2Hive的整個流程中,Kafka2Hive的任務需要每小時執(zhí)行一次(小時隊列),Merge任務每天執(zhí)行一次(天隊列)。而Merge任務的啟動必須要嚴格依賴小時Kafka2Hive任務的完成。

為了解決這一問題,我們引入了Checkdone任務。Checkdone任務是天任務,主要負責檢測前一天的Kafka2Hive是否成功完成。如果成功完成了,則Checkdone任務執(zhí)行成功,這樣下游的Merge任務就可以正確啟動了。

Checkdone的檢測邏輯

Checkdone是怎樣檢測的呢?每個Kafka2Hive任務成功完成數(shù)據(jù)傳輸后,由Camus負責在相應的HDFS目錄下記錄該任務的啟動時間。Checkdone會掃描前一天的所有時間戳,如果最大的時間戳已經(jīng)超過了0點,就說明前一天的Kafka2Hive任務都成功完成了,這樣Checkdone就完成了檢測。

此外,由于Camus本身只是完成了讀Kafka然后寫HDFS文件的過程,還必須完成對Hive分區(qū)的加載才能使下游查詢到。因此,整個Kafka2Hive任務的最后一步是加載Hive分區(qū)。這樣,整個任務才算成功執(zhí)行。

每個Kafka2Hive任務負責讀取一個特定的Topic,把Binlog數(shù)據(jù)寫入original_binlog庫下的一張表中,即前面圖中的original_binlog.db,其中存儲的是對應到一個MySQL DB的全部Binlog。

上圖說明了一個Kafka2Hive完成后,文件在HDFS上的目錄結(jié)構(gòu)。假如一個MySQL DB叫做user,對應的Binlog存儲在original_binlog.user表中。ready目錄中,按天存儲了當天所有成功執(zhí)行的Kafka2Hive任務的啟動時間,供Checkdone使用。每張表的Binlog,被組織到一個分區(qū)中,例如userinfo表的Binlog,存儲在table_name=userinfo這一分區(qū)中。每個table_name一級分區(qū)下,按dt組織二級分區(qū)。圖中的xxx.lzo和xxx.lzo.index文件,存儲的是經(jīng)過lzo壓縮的Binlog數(shù)據(jù)。

Merge

Binlog成功入倉后,下一步要做的就是基于Binlog對MySQL數(shù)據(jù)進行還原。Merge流程做了兩件事,首先把當天生成的Binlog數(shù)據(jù)存放到Delta表中,然后和已有的存量數(shù)據(jù)做一個基于主鍵的Merge。Delta表中的數(shù)據(jù)是當天的最新數(shù)據(jù),當一條數(shù)據(jù)在一天內(nèi)發(fā)生多次變更時,Delta表中只存儲最后一次變更后的數(shù)據(jù)。

把Delta數(shù)據(jù)和存量數(shù)據(jù)進行Merge的過程中,需要有唯一鍵來判定是否是同一條數(shù)據(jù)。如果同一條數(shù)據(jù)既出現(xiàn)在存量表中,又出現(xiàn)在Delta表中,說明這一條數(shù)據(jù)發(fā)生了更新,則選取Delta表的數(shù)據(jù)作為最終結(jié)果;否則說明沒有發(fā)生任何變動,保留原來存量表中的數(shù)據(jù)作為最終結(jié)果。Merge的結(jié)果數(shù)據(jù)會Insert Overwrite到原表中,即圖中的origindb.table。

Merge流程舉例

下面用一個例子來具體說明Merge的流程。

數(shù)據(jù)表共id、value兩列,其中id是主鍵。在提取Delta數(shù)據(jù)時,對同一條數(shù)據(jù)的多次更新,只選擇最后更新的一條。所以對id=1的數(shù)據(jù),Delta表中記錄最后一條更新后的值value=120。Delta數(shù)據(jù)和存量數(shù)據(jù)做Merge后,最終結(jié)果中,新插入一條數(shù)據(jù)(id=4),兩條數(shù)據(jù)發(fā)生了更新(id=1和id=2),一條數(shù)據(jù)未變(id=3)。

默認情況下,我們采用MySQL表的主鍵作為這一判重的唯一鍵,業(yè)務也可以根據(jù)實際情況配置不同于MySQL的唯一鍵。

上面介紹了基于Binlog的數(shù)據(jù)采集和ODS數(shù)據(jù)還原的整體架構(gòu)。下面主要從兩個方面介紹我們解決的實際業(yè)務問題。

實踐一:分庫分表的支持

隨著業(yè)務規(guī)模的擴大,MySQL的分庫分表情況越來越多,很多業(yè)務的分表數(shù)目都在幾千個這樣的量級。而一般數(shù)據(jù)開發(fā)同學需要把這些數(shù)據(jù)聚合到一起進行分析。如果對每個分表都進行手動同步,再在Hive上進行聚合,這個成本很難被我們接受。因此,我們需要在ODS層就完成分表的聚合。

首先,在Binlog實時采集時,我們支持把不同DB的Binlog寫入到同一個Kafka Topic。用戶可以在申請Binlog采集時,同時勾選同一個業(yè)務邏輯下的多個物理DB。通過在Binlog采集層的匯集,所有分庫的Binlog會寫入到同一張Hive表中,這樣下游在進行Merge時,依然只需要讀取一張Hive表。

第二,Merge任務的配置支持正則匹配。通過配置符合業(yè)務分表命名規(guī)則的正則表達式,Merge任務就能了解自己需要聚合哪些MySQL表的Binlog,從而選取相應分區(qū)的數(shù)據(jù)來執(zhí)行。

這樣通過兩個層面的工作,就完成了分庫分表在ODS層的合并。

這里面有一個技術(shù)上的優(yōu)化,在進行Kafka2Hive時,我們按業(yè)務分表規(guī)則對表名進行了處理,把物理表名轉(zhuǎn)換成了邏輯表名。例如userinfo123這張表名會被轉(zhuǎn)換為userinfo,其Binlog數(shù)據(jù)存儲在original_binlog.user表的table_name=userinfo分區(qū)中。這樣做的目的是防止過多的HDFS小文件和Hive分區(qū)造成的底層壓力。

實踐二:刪除事件的支持

Delete操作在MySQL中非常常見,由于Hive不支持Delete,如果想把MySQL中刪除的數(shù)據(jù)在Hive中刪掉,需要采用“迂回”的方式進行。

對需要處理Delete事件的Merge流程,采用如下兩個步驟:

  • 首先,提取出發(fā)生了Delete事件的數(shù)據(jù),由于Binlog本身記錄了事件類型,這一步很容易做到。將存量數(shù)據(jù)(表A)與被刪掉的數(shù)據(jù)(表B)在主鍵上做左外連接(Left outer join),如果能夠全部join到雙方的數(shù)據(jù),說明該條數(shù)據(jù)被刪掉了。因此,選擇結(jié)果中表B對應的記錄為NULL的數(shù)據(jù),即是應當被保留的數(shù)據(jù)。
  • 然后,對上面得到的被保留下來的數(shù)據(jù),按照前面描述的流程做常規(guī)的Merge。

展望

作為數(shù)據(jù)倉庫生產(chǎn)的基礎(chǔ),美團數(shù)據(jù)平臺提供的基于Binlog的MySQL2Hive服務,基本覆蓋了美團內(nèi)部的各個業(yè)務線,目前已經(jīng)能夠滿足絕大部分業(yè)務的數(shù)據(jù)同步需求,實現(xiàn)DB數(shù)據(jù)準確、高效地入倉。在后面的發(fā)展中,我們會集中解決CanalManager的單點問題,并構(gòu)建跨機房容災的架構(gòu),從而更加穩(wěn)定地支撐業(yè)務的發(fā)展。

本文主要從Binlog流式采集和基于Binlog的ODS數(shù)據(jù)還原兩方面,介紹了這一服務的架構(gòu),并介紹了我們在實踐中遇到的一些典型問題和解決方案。希望能夠給其他開發(fā)者一些參考價值,同時也歡迎大家和我們一起交流。

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對腳本之家的支持。如果你想了解更多相關(guān)內(nèi)容請查看下面相關(guān)鏈接

您可能感興趣的文章:
  • MySQL 8忘記密碼的最佳處理方式淺析
  • MySQL主從延遲現(xiàn)象及原理分析詳解
  • Mysql主鍵和唯一鍵的區(qū)別點總結(jié)
  • MySQL按時間統(tǒng)計數(shù)據(jù)的方法總結(jié)
  • 實例講解MySQL中樂觀鎖和悲觀鎖
  • MySQL limit性能分析與優(yōu)化
  • mybatis學習之路mysql批量新增數(shù)據(jù)的方法
  • MySQL和Redis實現(xiàn)二級緩存的方法詳解
  • MySQL普通索引和唯一索引的深入講解
  • Docker創(chuàng)建MySQL的講解

標簽:蘭州 崇左 仙桃 湘潭 湖南 銅川 衡水 黃山

巨人網(wǎng)絡通訊聲明:本文標題《美團DB數(shù)據(jù)同步到數(shù)據(jù)倉庫的架構(gòu)與實踐》,本文關(guān)鍵詞  ;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡,涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 收縮
    • 微信客服
    • 微信二維碼
    • 電話咨詢

    • 400-1100-266
    武邑县| 福建省| 吴桥县| 胶州市| 纳雍县| 沁阳市| 涟水县| 衢州市| 同德县| 江口县| 汪清县| 墨脱县| 四子王旗| 灵山县| 固镇县| 浦县| 锦屏县| 西丰县| 东乡县| 泰州市| 仙桃市| 宜宾市| 霞浦县| 长沙市| 勃利县| 河北省| 报价| 东乌| 泸州市| 海门市| 平罗县| 临清市| 乌兰浩特市| 邢台县| 罗田县| 舟山市| 拉萨市| 普陀区| 东乌珠穆沁旗| 合川市| 定兴县|