Geometry 判斷幾何是否被另一個幾何/線段分割成多段

如下圖,如何判斷幾何多邊形A被多邊形B,切割為多段幾何?

 幾何A被幾何B切割

1. 獲取幾何A與幾何B的交集C

 var intersectGeometry = new CombinedGeometry(GeometryCombineMode.Intersect, geometry1, geometry2); 

 

 

2.幾何A排除交集C,得到餘下空白區域D

 var combinedGeometry = new CombinedGeometry(GeometryCombineMode.Exclude, geometry1, intersectGeometry); 

 

 3.判斷幾何D區域是否包含多段幾何

幾何D區分為倆段,獲取域的邊框近似點集,發現含有倆段線條的描述(倆段M->z的文本),與真實幾何分段對應。

所以,可以通過線條終止字符”z”個數,來判斷幾何的分段數量。

  • 獲取幾何的近似多邊形值
  • 獲取其路徑內的點集
  • 判斷點集中是否含有2個及以上的線條繪製結束字符”z”
1     var flattenedPathGeometry = combinedGeometry.GetFlattenedPathGeometry();
2     var outerPointsString = flattenedPathGeometry.Figures.ToString();
3     if (outerPointsString.Length > 2
4         && outerPointsString.Replace("z", string.Empty).Length == outerPointsString.Length - 2)
5     {
6         return true;
7     }

 完整函數見下方代碼

 1     /// <summary>
 2     /// 檢查幾何是否被另一個幾何分割成多段
 3     /// </summary>
 4     /// <param name="geometry1"></param>
 5     /// <param name="geometry2"></param>
 6     /// <returns></returns>
 7     private bool CheckGeometryIsDividedByAnotherGeometry(PathGeometry geometry1, Geometry geometry2)
 8     {
 9         var intersectGeometry = new CombinedGeometry(GeometryCombineMode.Intersect, geometry1, geometry2);
10         var combinedGeometry = new CombinedGeometry(GeometryCombineMode.Exclude, geometry1, intersectGeometry);
11         var flattenedPathGeometry = combinedGeometry.GetFlattenedPathGeometry();
12         var outerPointsString = flattenedPathGeometry.Figures.ToString();
13         var geometryList = outerPointsString.Split(new[] { 'M' }, StringSplitOptions.RemoveEmptyEntries).Where(i => i.Contains("z")).Select(i => $"M{i}").ToList();
14         if (geometryList.Count >= 2 && HintStrokePath.Data == null)
15         {
16             var a = Geometry.Parse(geometryList[0]); ;
17             var b = Geometry.Parse(geometryList[1]); ;
18         }
19         if (outerPointsString.Length > 2
20             && outerPointsString.Replace("z", string.Empty).Length == outerPointsString.Length - 2)
21         {
22             return true;
23         }
24         return false;
25     }

View Code

4. 獲取幾何被分割后的多段幾何內容

解析”M”、”z”,分別獲取倆段幾何數據

1     var geometryList = outerPointsString.Split(new[] { 'M' }, StringSplitOptions.RemoveEmptyEntries).Where(i => i.Contains("z")).Select(i => $"M{i}").ToList();
2     if (geometryList.Count >= 2)
3     {
4         var geometry1 = Geometry.Parse(geometryList[0]); ;
5         var geometry2 = Geometry.Parse(geometryList[1]); ;
6     }

幾何被直線分割

幾何被線段分割,如何判斷或者獲取分割后的多段幾何?

直接用線段與幾何重複上面的步驟,是有問題的。

線段類似“M150,130L150,1300 150,170z”去與幾何去交集,CombinedGeometry中的數據是空的

需要給線條添加1的粗細:

  var geometry2 = lineGeometry.GetWidenedPathGeometry(new System.Windows.Media.Pen(System.Windows.Media.Brushes.Black, 1)); 

結果如下圖:

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

強颱哈吉貝沖走91袋核災污染廢棄物 日官方估:12存放所有相同危機

摘錄自2020年3月17日自由時報報導

去年第19號颱風哈吉貝重創日本關東地區,更衝刷走核災污染物臨時存放所中的污染物,引發擔憂。經日本環境省調查,在同樣降雨量下,部分臨存放所都隱藏著廢棄物流出危機;今(17日)內閣會議之後,環境大臣小泉進次郎對此做出回應。

環境省就存放所週邊河流與潛在淹水地區的322個地點進行調查,按強颱哈吉貝的降雨強度標準評估,共有12處臨時存放所,都有污染廢棄物流出的可能性,甚至還有土石流撕破塑膠袋等潛藏危機。

小泉進次郎在記者會上表示,鑑於近年暴雨的危機逐漸高升,望盡可能對此提早採取應變措施。對此,環境省將以搬運污染物至臨時存儲設施中存放、加裝圍欄以防流出等方式作為5月底以前的應變對策。

公害污染
廢棄物
核能
能源議題
國際新聞
日本
核災

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

武漢肺炎拖慢氣候行動腳步 開發中國家恐交不出升級版減碳目標

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

第六屆中國國際新能源汽車論壇2016即將在下周隆重開幕!

2016年4月20-22日∣中國·上海安亭穎奕皇冠假日酒店
電池、電控電機、充電設施、汽車電子
車聯網、車後市、創新核心技術、無線充電

由希邁商務諮詢(上海)有限公司主辦的2016年第六屆中國國際新能源汽車論壇即將于下周4月20日-4月22日在上海隆重舉行。此次論壇獲得了亞太電動車學會、國家新能源機動車產品品質監督檢驗中心、上海交大密西根學院、賽迪顧問及中國綠色能源產業技術創新戰略聯盟的大力支持。截止3月14日,論壇已經確認42位演講嘉賓出席本次論壇並做高品質學術演講。演講嘉賓分別來自菲律賓電動車協會、國家新能源機動車產品品質監督檢驗中心、美國國家能源局、中國工程院等在內的政府單位與研究機構,以及包括寶馬、通用汽車、特斯拉、長安汽車、宇通客車、奇瑞、比亞迪戴姆勒、上汽、北汽、觀致等在內的多個知名整車商,將在論壇上共同研討新能源汽車行業發展趨勢、技術路線及難點、基礎設施建設、商業模式,延續以往的豐碩成果,繼續為新能源汽車行業作出貢獻。

本屆得到行業內世界百強企業麥格納動力總成與格特拉克聯合作為論壇的鉑金贊助,同時也獲得了寧波招寶磁業有限公司作為論壇的銀牌贊助,廈門宏發電力電器有限公司作為論壇的銅牌贊助及北京易微行科技有限公司作為論壇的晚宴贊助的大力支持。此外,論壇還獲得了30多家來自全世界各地的知名媒體進行宣傳,並且將會在現場為部分企業進行專題訪問,將在為期3天的論壇上進行全方位的即時報導。

本屆論壇相比歷屆舉辦規模最大的第六屆新能源汽車論壇,涉及主論壇及三個分論壇、考察活動、頒獎典禮和交流晚宴。屆時將有全球範圍內的整車製造商、電網電力公司、電池廠商、零部件供應商、核心技術提供商和政府官員一起,對新能源汽車產業面臨的挑戰,機遇與對策各方面進行為期三天更深層次並具有建設和戰略性的探討。歡迎各位行業內人士積極參與,如有意向請聯繫組委會,期待與您下周共同參與!

會議結構

大會連絡人:Hill ZENG(曾先生)
聯繫電話:0086 21-6045 1760 或郵箱
我們期待與貴單位一起出席於2016年4月20-22日在上海舉辦的第六屆中國國際新能源汽車論壇2016,以利決策!

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

蘋果電動車專案未止步,挖角、研發行動低調進行中

市場瘋傳蘋果(Apple)將推出電動車,但至今仍只聞樓梯響。不過,海外媒體持續傳出蘋果電動車專案仍有大小動作,包含聘用新人、營運研發實驗室等,計畫看來並未胎死腹中。

蘋果電動車的計畫代號傳聞為「Titan」,傳言將在2019年正式開始出貨。蘋果電動車被報導將搭載自動駕駛技術,也會研究採用載運工具共享服務。

科技網站Electrek報導,蘋果將聘用原特斯拉(Tesla)工程開發副總裁Chris Porritt接替將要離職的Steve Zadesky,繼續執行與電動車專案相關的工作。而在這之前,蘋果已從特斯拉、福特、賓士、通用汽車等國際車廠與電池廠挖角專家,也聘用曾在福斯汽車、Nvidia等公司服務,專注於先進駕駛輔助系統與自動駕駛系統領域的專業人士。

此外,MacRumors也引述德國FrankfurterAllgemeine Zeitung報導,指出蘋果正在德國柏林暗中營運一座研發實驗室,共有15~20名研發人員,主要來自德國汽車產業界,或許正是蘋果電動車專案的一部分。

蘋果執行長Tim Cook過去曾參訪BMW的電動車i3產線,與電動車相關的傳言滿天飛。但蘋果對於電動車計畫始終守口如瓶,沒有正面回應。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

長安李偉:汽車智慧化分為智慧駕駛、智慧互聯和智慧交互三大發展範疇

未來汽車應該是什麼樣子?每個人都有自己的想像,每個車企都有各自的概念。而在差異化之中的相似之處是,許多企業都把智慧化作為發展方向,將無人駕駛作為嘗試的關鍵步驟。

長安汽車副總裁李偉表示智慧汽車可分為三大發展範疇:智慧駕駛、智慧互聯和智慧交互,而智慧駕駛又可分為四級技術水準。

智慧汽車的第一個範疇——智慧駕駛。對於長安汽車來講,智慧駕駛的一級技術已經成熟且在車上搭載,例如全速的自我調整巡航,緊急高速自動、緊急制動等,這些技術都已經在16款睿騁、CS75、逸動等車型上實現量產;智慧駕駛的二級技術,現在已經在做產品的研發和測試,二級技術主要是在一級自我調整巡航系統的基礎上升級,爭取把手解脫了,另外再加一個全自動倒車,二級的系統長安預計在2017年要量產;智慧駕駛三級技術水準是實現在高速路段的無人駕駛,從重慶到北京的整個無人駕駛汽車,實際上就是智慧駕駛的三級水準,長安汽車計畫在2018年實現整個技術儲備開發,全部匹配結束,2019年能夠得以上市。另外全自動化駕駛技術,就是智慧駕駛的最高級四級,長安努力爭取在2025年前能夠實現量產。

智能汽車的第二個範疇——智能互聯。李偉簡單舉了個例子,“現在長安在美國MTC現場進行叫智慧互聯汽車,它實際上是車和車可以通訊,車和路可以通訊,車和交通信號可以通訊等等。大家如果設想一下,我們現在長安目前的無人駕駛狀態,實際上是靠車本身的信號識別來判斷我的交通情況。未來如果說我們城市是智慧城市,我的交通都是數位的信號,在一公里之前車就能感知我那邊的紅綠燈什麼時候變紅燈什麼時候變綠燈,除了前邊車之外還有什麼車,通過車和車的通訊就會知道,這樣整個交通就會更加更加智慧。未來智慧城市,車聯網和車更加融合,這個車就會更加智慧,沒看到就會知道是什麼前邊情況,從這個方面來講,傳統自動智慧駕駛汽車和智慧互聯再有機的融合,就會帶來更聰明的汽車,就會有更自動的汽車。”

智能汽車的第三個範疇——智能交互。這個階段就是所謂的”人機交互”階段,需要發出什麼指令,不用操作,也不用說出來,只需要在腦袋裡一想,汽車就能執行相應命令。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

在 ASP.NET Core 項目中使用 MediatR 實現中介者模式

 一、前言

   最近有在看 DDD 的相關資料以及微軟的  這個項目中基於 DDD 的架構設計,在  這個示例服務中,可以看到各層之間的代碼調用與我們之前傳統的調用方式似乎差異很大,整個項目各個層之間的代碼全部是通過注入 IMediator 進行調用的,F12 查看源碼后可以看到該接口是屬於  這個組件的。既然要照葫蘆畫瓢,那我們就先來了解下如何在 ASP.NET Core 項目中使用 。

  代碼倉儲:

 二、Step by Step

    從 github 的項目主頁上可以看到作者對於這個項目的描述是基於中介者模式的 .NET 實現,是一種基於進程內的數據傳遞。也就是說這個組件主要實現的是在一個應用中實現數據傳遞,如果想要實現多個應用間的數據傳遞就不太適合了。從作者的 github 個人主頁上可以看到,他還是  這個 OOM 組件的作者,PS,如果你想要了解如何在 ASP.NET Core 項目中使用 AutoMapper,你可以查看我之前寫的這一篇文章()。而對於 MediatR 來說,在具體的學習使用之前,我們先來了解下什麼是中介者模式。

  1、什麼是中介者模式

  很多舶來詞的中文翻譯其實最終都會與實際的含義相匹配,例如軟件開發過程中的 23 種設計模式的中文名稱,我們其實可以比較容易的從中文名稱中得知出該設計模式具體想要實現的作用,就像這裏介紹的中介者模式。

  在我們通過代碼實現實際的業務邏輯時,如果涉及到多個對象類之間的交互,通常我們都是會採用直接引用的形式,隨着業務邏輯變的越來越複雜,對於一個簡單的業務抽象出的實現方法中,可能會被我們添加上各種判斷的邏輯或是對於數據的業務邏輯處理方法。

  例如一個簡單的用戶登錄事件,我們可能最終會抽象出如下的業務流程實現。

public bool Login(AppUserLoginDto dto, out string msg)
{
    bool flag = false;
    try
    {
        // 1、驗證碼是否正確
        flag = _redisLogic.GetValueByKey(dto.VerificationCode);
        if (!flag)
        {
            msg = "驗證碼錯誤,請重試";
            return false;
        }

        // 2、驗證賬戶密碼是否正確
        flag = _userLogic.GetAppUser(dto.Account.Trim(), dto.Password.Trim(), out AppUserDetailDto appUser);
        if (!flag)
        {
            msg = "賬戶或密碼錯誤,請重試";
            return false;
        }

        // 3、驗證賬戶是否可以登錄當前的站點(未被鎖定 or 具有登錄當前系統的權限...)
        flag = _authLogic.CheckIsAvailable(appUser);
        if (!flag)
        {
            msg = "用戶被禁止登錄當前系統,請重試";
            return false;
        }

        // 4、設置當前登錄用戶信息
        _authLogic.SetCurrentUser(appUser);

        // 5、記錄登錄記錄
        _userLogic.SaveLoginRecord(appUser);

        msg = "";
        return true;
    }
    catch (Exception ex)
    {
        // 記錄錯誤信息
        msg = $"用戶登錄失敗:{ex.Message}";
        return false;
    }
}

  這裏我們假設對於登錄事件的實現方法存在於 UserAppService 這個類中,對於 redis 資源的操作在 RedisLogic 類中,對於用戶相關資源的操作在 UserLogic 中,而對於權限校驗相關的資源操作位於 AuthLogic 類中。

  可以看到,為了實現 UserAppService 類中定義的登錄方法,我們至少需要依賴於 RedisLogic、UserLogic 以及 AuthLogic,甚至在某些情況下可能在 UserLogic 和 AuthLogic 之間也存在着某種依賴關係,因此我們可以從中得到如下圖所示的類之間的依賴關係。

  一個簡單的登錄業務尚且如此,如果我們需要對登錄業務添加新的需求,例如現在很多網站的登錄和註冊其實是放在一起的,當登錄時如果判斷沒有當前的用戶信息,其實會催生創建新用戶的流程,那麼,對於原本的登錄功能實現,是不是會存在繼續添加新的依賴關係的情況。同時對於很多本身就很複雜的業務,最終實現出來的方法是不是會有更多的對象類之間存在各種的依賴關係,牽一發而動全身,後期修改測試的成本會不會變得更高。

  那麼,中介者模式是如何解決這個問題呢?

  在上文有提到,對於舶來詞的中文名稱,中文更多的會根據實際的含義進行命名,試想一下我們在現實生活中提到中介,是不是更多的會想到房屋中介這一角色。當我們來到一個新的城市,面臨着租房的問題,絕大多數的情況下,我們最終需要通過中介去達成我們租房的目的。在租房這個案例中,房屋中介其實就是一个中介者,他承接我們對於想要租的房子的各種需求,從自己的房屋數據庫中去尋找符合條件的,最終以一個橋樑的形式,連接我們與房東,最終就房屋的租住達成一致。

  而在軟件開發中,中介者模式則是要求我們根據實際的業務去定義一個包含各種對象之間交互關係的對象類,之後,所有涉及到該業務的對象都只關聯於這一个中介對象類,不再顯式的調用其它類。採用了中介者模式之後設計的登錄功能所涉及到的類依賴如下圖所示,這裏的 AppUserLoginEventHandler 其實就是我們的中介類。

  當然,任何事都會有利有弊,不會存在百分百完美的事情,就像我們通過房租中介去尋找合適的房屋,最終我們需要付給中介一筆費用去作為酬勞,採用中介者模式設計的代碼架構也會存在別的問題。因為在代碼中引入了中介者這一對象,勢必會增加我們代碼的複雜度,可能會使原本很輕鬆就實現的代碼變得複雜。同時,我們引入中介者模式的初衷是為了解決各個對象類之間複雜的引用關係,對於某些業務來說,本身就很複雜,最終必定會導致這个中介者對象異常複雜。

  畢竟,軟件開發的過程中不會存在銀彈去幫我們解決所有的問題。

  那麼,在本篇文章的示例代碼中,我將使用 MediatR 這一組件,通過引入中介者模式的思想來完成上面的用戶登錄這一案例。

  2、組件加載

  在使用 MediatR 之前,這裏簡單介紹下這篇文章的示例 demo 項目。這個示例項目的架構分層可以看成是介於傳統的多層架構與採用 DDD 的思想的架構分層。嗯,你可以理解成四不像,屬於那種傳統模式下的開發人員在往 DDD 思想上進行遷移的成品,具體的代碼分層說明解釋如下。

  01_Infrastructure:基礎架構層,這層會包含一些對於基礎組件的配置或是幫助類的代碼,對於每個新建的服務來說,該層的代碼幾乎都是差不多的,所以對於基礎架構層的代碼其實最好是發布到公有 or 私有的 Nuget 倉庫中,然後我們直接在項目中通過 Nuget 去引用。

  對於採用 DDD 的思想構建的項目來說,很多人可能習慣將一些實體的配置也放置在基礎架構層,我的個人理解還是應該置於領域層,對於基礎架構層,只做一些基礎組件的封裝。如果有什麼不對的地方,歡迎在評論區提出。

  02_Domain:領域層,這層會包含我們根據業務劃分出的領域的幾乎所有重要的部分,有領域對象(Domain Object)、值對象(Value Object)、領域事件(Domain Event)、以及倉儲(Repository)等等領域組件。

  這裏雖然我創建了 AggregateModels(聚合實體)這個文件夾,其實在這個項目中,我創建的還是不包含任何業務邏輯的貧血模型。同時,對於倉儲(Repository)在領域分層中是置於 Infrastructure(基礎架構層)還是位於 Domain(領域層),每個人都會有自己的理解,這裏我還是更傾向於放在 Domain 層中更符合其定位。

  03_Application:應用層,這一層會包含我們基於領域所封裝出的各種實際的業務邏輯,每個封裝出的服務應用之間並不會出現互相調用的情況。

  Sample.Api:API 接口層,這層就很簡單了,主要是通過 API 接口暴露出我們基於領域對外提供的各種服務。

  整個示例項目的分層結構如下圖所示。

  與使用其它的第三方組件的使用方式相同,在使用之前,我們需要在項目中通過 Nuget 添加對於 MediatR 的程序集引用。

  這裏需要注意,因為我們主要是通過引用 MediatR 來實現中介者模式,所以我們只需要在領域層和應用層加載 MediatR 即可。而對於 Sample.Api 這個 Web API 項目,因為需要通過依賴注入的方式來使用我們基於 MediatR 所構建出的各種服務,所以這裏我們還要添加 MediatR.Extensions.Microsoft.DependencyInjection 這個程序集到 Sample.Api 中。

Install-Package MediatR
Install-Package MediatR.Extensions.Microsoft.DependencyInjection

  3、案例實現

  首先我們在 Sample.Domain 這個類庫的 AggregateModels 文件夾下添加 AppUser(用戶信息)類 和 Address(地址信息) 類,這裏雖然並沒有採用 DDD 的思想去劃分領域對象和值對象,我們創建出來的都是不含任何業務邏輯的貧血模型。但是在用戶管理這個業務中,對於用戶所包含的聯繫地址信息,其實是一種無狀態的數據。也就是說對於同一個地址信息,不會因為置於多個用戶中而出現數據的二義性。因此,對於地址信息來說,是不需要唯一的標識就可以區分出這個數據的,所以這裏的 Address 類就不需要添加主鍵,其實也就是對應於領域建模中的值對象。

  這裏我是使用的 EF Core 作為項目的 ORM 組件,當創建好需要使用實體之後,我們在 Sample.Domain 這個類庫下面新建一個 SeedWorks 文件夾,添加自定義的 DbContext 對象和用於執行 EF Core 第一次生成數據庫時寫入預置種子數據的信息類。

  這裏需要注意,在 EF Core 中,當我們需要將編寫的 C# 類通過 Code First 創建出數據庫表時,我們的 C# 類必須包含主鍵信息。而對應到我們這裏的 Address 類來說,它更多的是作為 AppUser 類中的屬性信息來展示的,所以這裏我們需要對 EF Core 生成數據庫表的過程進行重寫。

  這裏我們在 SeedWorks 文件夾下創建一個新的文件夾 EntityConfigurations,在這裏用來存放我們自定義的 EF Core 創建表的規則。新建一個繼承於 IEntityTypeConfiguration<AppUser> 接口的 AppUserConfiguration 配置類,在接口默認 Configure 方法中,我們需要編寫映射規則,將 Address 類作為 AppUser 類中的字段進行显示,最終實現后的代碼如下所示。 

public class AppUserConfiguration : IEntityTypeConfiguration<AppUser>
{
    public void Configure(EntityTypeBuilder<AppUser> builder)
    {
        // 表名稱
        builder.ToTable("appuser");

        // 實體屬性配置
        builder.OwnsOne(i => i.Address, n =>
        {
            n.Property(p => p.Province).HasMaxLength(50)
                .HasColumnName("Province")
                .HasDefaultValue("");

            n.Property(p => p.City).HasMaxLength(50)
                .HasColumnName("City")
                .HasDefaultValue("");

            n.Property(p => p.Street).HasMaxLength(50)
                .HasColumnName("Street")
                .HasDefaultValue("");

            n.Property(p => p.ZipCode).HasMaxLength(50)
                .HasColumnName("ZipCode")
                .HasDefaultValue("");
        });
    }
}

  當創建表的映射規則編寫完成后,我們就可以對 UserApplicationDbContext 類進行重寫 OnModelCreating 方法。在這個方法中,我們就可以去應用我們自定義設置的實體映射規則,從而讓 EF Core 按照我們的想法去創建數據庫,最終實現的代碼如下所示。

public class UserApplicationDbContext : DbContext
{
    public DbSet<AppUser> AppUsers { get; set; }

    public UserApplicationDbContext(DbContextOptions<UserApplicationDbContext> options)
        : base(options)
    {
    }

    /// <summary>
    ///
    /// </summary>
    /// <param name="modelBuilder"></param>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // 自定義 AppUser 表創建規則
        modelBuilder.ApplyConfiguration(new AppUserConfiguration());
    }
}

  當我們創建好 DbContext 后,我們需要在 Startup 類的 ConfigureServices 方法中進行注入。在示例代碼中,我使用的是 MySQL 8.0 數據庫,將配置文件寫入到 appsettings.json 文件中,最終注入 DbContext 的代碼如下所示。

public void ConfigureServices(IServiceCollection services)
{
    // 配置數據庫連接字符串
    services.AddDbContext<UserApplicationDbContext>(options =>
        options.UseMySql(Configuration.GetConnectionString("SampleConnection")));
}

  數據庫的連接字符串配置如下。

{
  "ConnectionStrings": {
    "SampleConnection": "server=127.0.0.1;database=sample.application;user=root;password=123456@sql;port=3306;persistsecurityinfo=True;"
  }
}

  在上文有提到,除了創建一個 DbContext 對象,我們還創建了一個 DbInitializer 類用於在 EF Core 第一次執行創建數據庫操作時將我們預置的信息寫入到對應的數據庫表中。這裏我們只是簡單的判斷下 AppUser 這張表是否存在數據,如果沒有數據,我們就添加一條新的記錄,最終實現的代碼如下所示。

public class DbInitializer
{
    public static void Initialize(UserApplicationDbContext context)
    {
        context.Database.EnsureCreated();

        if (context.AppUsers.Any())
            return;

        AppUser admin = new AppUser()
        {
            Id = Guid.NewGuid(),
            Name = "墨墨墨墨小宇",
            Account = "danvic.wang",
            Phone = "13912345678",
            Age = 12,
            Password = "123456",
            Gender = true,
            IsEnabled = true,
            Address = new Address("啦啦啦啦街道", "啦啦啦市", "啦啦啦省", "12345"),
            Email = "danvic.wang@yuiter.com",
        };

        context.AppUsers.Add(admin);
        context.SaveChanges();
    }
}

  當我們完成種子數據植入的代碼,我們需要在程序啟動之前就去執行我們的代碼。因此我們需要修改 Program 類中的 Main 方法,實現在運行 web 程序之前去執行種子數據的植入。

public class Program
{
    public static void Main(string[] args)
    {
        var host = CreateWebHostBuilder(args).Build();

        using (var scope = host.Services.CreateScope())
        {
            // 執行種子數據植入
            //
            var services = scope.ServiceProvider;
            var context = services.GetRequiredService<UserApplicationDbContext>();
            DbInitializer.Initialize(context);
        }
    }

    public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>();
}

  這時,運行我們的項目,程序就會自動執行創建數據庫的操作,同時會將我們預設好的種子數據寫入到數據庫表中,最終實現的效果如下圖所示。

  基礎的項目代碼已經完成之後,我們就可以開始學習如何通過 MediatR 來實現中介者模式。在這一章的示例項目中,我們會使用到 MediatR 中兩個很重要的接口類型:IRequest 和 INotification。

  在 Github 上,作者針對這兩個接口做了如下的,這裏我會按照我的理解去進行使用。同時,為了防止我的理解出現了偏差,從而對各位造成影響,這裏貼上作者回復解釋的原文。

Requests are for:
1 request to 1 handler. Handler may or may not return a value
Notifications are for:
1 notification to n handlers. Handler may not return a value.


In practical terms, requests are "commands", notifications are "events".
Command would be directing MediatR to do something like "ApproveInvoiceCommand -> ApproveInvoiceHandler". Event would be
notifications, like "InvoiceApprovedEvent -> SendThankYouEmailToCustomerHandler"

 

  對於繼承於 IRequest 接口的類來說,一個請求(request)只會有一個針對這個請求的處理程序(requestHandler),它可以返回值或者不返回任何信息;

  而對於繼承於 INotification 接口的類來說,一個通知(notification)會對應多個針對這個通知的處理程序(notificationHandlers),而它們不會返回任何的數據。

  請求(request)更像是一種命令(command),而通知(notification)更像是一種事件(event)。嗯,可能看起來更暈了,jbogard 這裏給了一個案例給我們進一步的解釋了 request 與 notification 之間的差異性。

  雙十一剛過,很多人都會瘋狂剁手,對於購買大件來說,為了能夠更好地擁有售後服務,我們在購買后肯定會期望商家給我們提供發票,這裏的要求商家提供發票就是一種 request,而針對我們的這個請求,商家會做出回應,不管能否開出來發票,商家都應當通知到我們,這裏的通知用戶就是一種 notification。

  對於提供發票這個 request 來說,不管最終的結果如何,它只會存在一種處理方式;而對於通知用戶這個 notification 來說,商家可以通過短信通知,可以通過公眾號推送,也可以通過郵件通知,不管採用什麼方式,只要完成了通知,對於這個事件來說也就已經完成了。    

  而對應於用戶登錄這個業務來說,用戶的登錄行為其實就是一個 request,對於這個 request 來說,我們可能會去數據庫查詢賬戶是否存在,判斷是不是具有登錄系統的權限等等。而不管我們在這個過程中做了多少的邏輯判斷,它只會有兩種結果,登錄成功或登錄失敗。而對於用戶登錄系統之後可能需要設置當前登錄人員信息,記錄用戶登錄日誌這些行為來說,則是歸屬於 notification 的。

  弄清楚了用戶登錄事件中的 request 和 notification 劃分,那麼接下來我們就可以通過代碼來實現我們的功能。這裏對於示例項目中的一些基礎組件的配置我就跳過了,如果你想要具體的了解這裏使用到的一些組件的使用方法,你可以查閱我之前的文章。

  首先,我們在 Sample.Application 這個類庫下面創建一個 Commands 文件夾,在下面存放用戶的請求信息。現在我們創建一個用於映射用戶登錄請求的 UserLoginCommand 類,它需要繼承於 IRequest<T> 這個泛型接口。因為對於用戶登錄這個請求來說,只會有可以或不可以這兩個結果,所以對於這個請求的響應的結果是 bool 類型的,也就是說,我們具體應該繼承的是 IRequest<bool>。

  對於用戶發起的各種請求來說,它其實只是包含了對於這次請求的一些基本信息,而對於 UserLoginCommand 這個用戶登錄請求類來說,它可能只會有賬號、密碼、驗證碼這三個信息,請求類代碼如下所示。

public class UserLoginCommand : IRequest<bool>
{
    /// <summary>
    /// 賬戶
    /// </summary>
    public string Account { get; private set; }

    /// <summary>
    /// 密碼
    /// </summary>
    public string Password { get; private set; }

    /// <summary>
    /// 驗證碼
    /// </summary>
    public string VerificationCode { get; private set; }

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="account">賬戶</param>
    /// <param name="password">密碼</param>
    /// <param name="verificationCode">驗證碼</param>
    public UserLoginCommand(string account, string password, string verificationCode)
    {
        Account = account;
        Password = password;
        VerificationCode = verificationCode;
    }
}

  當我們擁有了存儲用戶登錄請求信息的類之後,我們就需要對用戶的登錄請求進行處理。這裏,我們在 Sample.Application 這個類庫下面新建一個 CommandHandlers 文件夾用來存放用戶請求的處理類。

  現在我們創建一個繼承於 IRequestHandler 接口的 UserLoginCommandHandler 類用來實現對於用戶登錄請求的處理。IRequestHandler 是一個泛型的接口,它需要我們在繼承時聲明我們需要實現的請求,以及該請求的返回信息。因此,對於 UserLoginCommand 這個請求來說,UserLoginCommandHandler 這個請求的處理類,最終需要繼承於 IRequestHandler<UserLoginCommand, bool>。

  就像上面提到的一樣,我們需要在這個請求的處理類中對用戶請求的信息進行處理,在 UserLoginCommandHandler 類中,我們應該在 Handle 方法中去執行我們的判斷邏輯,這裏我們會引用到倉儲來獲取用戶的相關信息。倉儲中的代碼這裏我就不展示了,最終我們實現后的代碼如下所示。

public class UserLoginCommandHandler : IRequestHandler<UserLoginCommand, bool>
{
    #region Initizalize

    /// <summary>
    /// 倉儲實例
    /// </summary>
    private readonly IUserRepository _userRepository;

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="userRepository"></param>
    public UserLoginCommandHandler(IUserRepository userRepository)
    {
        _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
    }

    #endregion Initizalize

    /// <summary>
    /// Command Handler
    /// </summary>
    /// <param name="request"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public async Task<bool> Handle(UserLoginCommand request, CancellationToken cancellationToken)
    {
        // 1、判斷驗證碼是否正確
        if (string.IsNullOrEmpty(request.VerificationCode))
            return false;

        // 2、驗證登錄密碼是否正確
        var appUser = await _userRepository.GetAppUserInfo(request.Account.Trim(), request.Password.Trim());
        if (appUser == null)
            return false;

        return true;
    }
}

  當我們完成了對於請求的處理代碼后,就可以在 controller 中提供用戶訪問的入口。當然,因為我們需要採用依賴注入的方式去使用 MediatR,所以在使用之前,我們需要將請求的對應處理關係注入到依賴注入容器中。

  在通過依賴注入的方式使用 MediatR 時,我們需要將所有的事件(請求以及通知)注入到容器中,而 MediatR 則會自動尋找對應事件的處理類,除此之外,我們也需要將通過依賴注入使用到的 IMediator 接口的實現類注入到容器中。而在這個示例項目中,我們主要是在 Sample.Domain、Sample.Application 以及我們的 Web Api 項目中使用到了 MediatR,因此,我們需要將這三個項目中使用到 MediatR 的類全部注入到容器中。

  一個個的注入會比較的麻煩,所以這裏我還是採用對指定的程序集進行反射操作,去獲取需要加載的信息批量的進行注入操作,最終實現后的代碼如下。

public static IServiceCollection AddCustomMediatR(this IServiceCollection services, MediatorDescriptionOptions options)
{
    // 獲取 Startup 類的 type 類型
    var mediators = new List<Type> { options.StartupClassType };

    // IRequest<T> 接口的 type 類型
    var parentRequestType = typeof(IRequest<>);

    // INotification 接口的 type 類型
    var parentNotificationType = typeof(INotification);

    foreach (var item in options.Assembly)
    {
        var instances = Assembly.Load(item).GetTypes();

        foreach (var instance in instances)
        {
            // 判斷是否繼承了接口
            //
            var baseInterfaces = instance.GetInterfaces();
            if (baseInterfaces.Count() == 0 || !baseInterfaces.Any())
                continue;

            // 判斷是否繼承了 IRequest<T> 接口
            //
            var requestTypes = baseInterfaces.Where(i => i.IsGenericType
                && i.GetGenericTypeDefinition() == parentRequestType);

            if (requestTypes.Count() != 0 || requestTypes.Any())
                mediators.Add(instance);

            // 判斷是否繼承了 INotification 接口
            //
            var notificationTypes = baseInterfaces.Where(i => i.FullName == parentNotificationType.FullName);

            if (notificationTypes.Count() != 0 || notificationTypes.Any())
                mediators.Add(instance);
        }
    }

    // 添加到依賴注入容器中
    services.AddMediatR(mediators.ToArray());

    return services;
}

  因為需要知道哪些程序集應該進行反射獲取信息,而對於 Web Api 這個項目來說,它只會通過依賴注入使用到 IMediator 這一個接口,所以這裏需要採用不同的參數的形式去確定具體需要通過反射加載哪些程序集。

public class MediatorDescriptionOptions
{
    /// <summary>
    /// Startup 類的 type 類型
    /// </summary>
    public Type StartupClassType { get; set; }

    /// <summary>
    /// 包含使用到 MediatR 組件的程序集
    /// </summary>
    public IEnumerable<string> Assembly { get; set; }
}

  最終,我們就可以在 Startup 類中通過擴展方法的信息進行快速的注入,實際使用的代碼如下,這裏我是將需要加載的程序集信息放在 appsetting 這個配置文件中的,你可以根據你的喜好進行調整。

public class Startup
{
    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        // Config mediatr
        services.AddCustomMediatR(new MediatorDescriptionOptions
        {
            StartupClassType = typeof(Startup),
            Assembly = Configuration["Assembly:Mediator"].Split("|", StringSplitOptions.RemoveEmptyEntries)
        });
    }
}

  在這個示例項目中的配置信息如下所示。

{
  "Assembly": {
    "Function": "Sample.Domain",
    "Mapper": "Sample.Application",
    "Mediator": "Sample.Application|Sample.Domain"
  }
}

  當我們注入完成后,就可以直接在 controller 中進行使用。對於繼承了 IRequest 的方法,可以直接通過 Send 方法進行調用請求信息,MediatR 會幫我們找到對應請求的處理方法,最終登錄 action 中的代碼如下。

[ApiVersion("1.0")]
[ApiController]
[Route("api/v{version:apiVersion}/[controller]")]
public class UsersController : ControllerBase
{
    #region Initizalize

    /// <summary>
    ///
    /// </summary>
    private readonly IMediator _mediator;

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="mediator"></param>
    public UsersController(IMediator mediator)
    {
        _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
    }

    #endregion Initizalize

    #region APIs

    /// <summary>
    /// 用戶登錄
    /// </summary>
    /// <param name="login">用戶登錄數據傳輸對象</param>
    /// <returns></returns>
    [HttpPost("login")]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status401Unauthorized)]
    public async Task<IActionResult> Post([FromBody] AppUserLoginDto login)
    {
        // 實體映射轉換
        var command = new UserLoginCommand(login.Account, login.Password, login.VerificationCode);

        bool flag = await _mediator.Send(command);

        if (flag)
            return Ok(new
            {
                code = 20001,
                msg = $"{login.Account} 用戶登錄成功",
                data = login
            });
        else
            return Unauthorized(new
            {
                code = 40101,
                msg = $"{login.Account} 用戶登錄失敗",
                data = login
            });
    }

    #endregion APIs
}

  當我們完成了對於用戶登錄請求的處理之後,就可以去執行後續的“通知類”的事件。與用戶登錄的請求信息類相似,對於用戶登錄事件的通知類也只是包含一些通知的基礎信息。在 Smaple.Domain 這個類庫下面,創建一個 Events 文件用來存放我們的事件,我們來新建一個繼承於 INotification 接口的 AppUserLoginEvent 類,用來對用戶登錄事件進行相關的處理。

public class AppUserLoginEvent : INotification
{
    /// <summary>
    /// 賬戶
    /// </summary>
    public string Account { get; }

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="account"></param>
    public AppUserLoginEvent(string account)
    {
        Account = account;
    }
}

  在上文中有提到過,對於一個通知事件可能會存在着多種處理方式,所以這裏我們在 Smaple.Application 這個類庫的 DomainEventHandlers 文件夾下面會按照事件去創建對應的文件夾去存放實際處理方法。

  對於繼承了 INotification 接口的通知類來說,在 MediatR 中我們可以通過創建繼承於 INotificationHandler 接口的類去處理對應的事件。因為一個 notification 可以有多個的處理程序,所以我們可以創建多個的 NotificationHandler 類去處理同一個 notification。一個示例的 NotificationHandler 類如下所示。

public class SetCurrentUserEventHandler : INotificationHandler<AppUserLoginEvent>
{
    #region Initizalize

    /// <summary>
    ///
    /// </summary>
    private readonly ILogger<SetCurrentUserEventHandler> _logger;

    /// <summary>
    ///
    /// </summary>
    /// <param name="logger"></param>
    public SetCurrentUserEventHandler(ILogger<SetCurrentUserEventHandler> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    #endregion Initizalize

    /// <summary>
    /// Notification handler
    /// </summary>
    /// <param name="notification"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public Task Handle(AppUserLoginEvent notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation($"CurrentUser with Account: {notification.Account} has been successfully setup");

        return Task.FromResult(true);
    }
}

  如何去引發這個事件,對於領域驅動設計的架構來說,一個更好的方法是將各種領域事件添加到事件的集合中,然後在提交事務之前或之後立即調度這些域事件,而對於我們這個項目來說,因為這不在這篇文章考慮的範圍內,只是演示如何去使用 MediatR 這個組件,所以這裏我就採取在請求邏輯處理完成后直接觸發事件的方式。

  在 UserLoginCommandHandler 類中,修改我們的代碼,在確認登錄成功后,通過調用 AppUser 類的 SetUserLoginRecord 方法來觸發我們的通知事件,修改后的代碼如下所示。

public class UserLoginCommandHandler : IRequestHandler<UserLoginCommand, bool>
{
    #region Initizalize

    /// <summary>
    /// 倉儲實例
    /// </summary>
    private readonly IUserRepository _userRepository;

    /// <summary>
    ///
    /// </summary>
    private readonly IMediator _mediator;

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="userRepository"></param>
    /// <param name="mediator"></param>
    public UserLoginCommandHandler(IUserRepository userRepository, IMediator mediator)
    {
        _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
        _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
    }

    #endregion Initizalize

    /// <summary>
    /// Command Handler
    /// </summary>
    /// <param name="request"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public async Task<bool> Handle(UserLoginCommand request, CancellationToken cancellationToken)
    {
        // 1、判斷驗證碼是否正確
        if (string.IsNullOrEmpty(request.VerificationCode))
            return false;

        // 2、驗證登錄密碼是否正確
        var appUser = await _userRepository.GetAppUserInfo(request.Account.Trim(), request.Password.Trim());
        if (appUser == null)
            return false;

        // 3、觸發登錄事件
        appUser.SetUserLoginRecord(_mediator);

        return true;
    }
}

  與使用 Send 方法去調用 request 類的請求不同,對於繼承於 INotification 接口的事件通知類,我們需要採用 Publish 的方法去調用。至此,對於一個採用中介者模式設計的登錄流程就結束了,SetUserLoginRecord 方法的定義,以及最終我們實現的效果如下所示。

public void SetUserLoginRecord(IMediator mediator)
{
    mediator.Publish(new AppUserLoginEvent(Account));
}

 三、總結

  這一章主要是介紹了如何通過 MediatR 來實現中介者模式,因為自己也是第一次接觸這種思想,對於 MediatR 這個組件也是第一次使用,所以僅僅是採用案例分享的方式對中介者模式的使用方法進行了一個解釋。如果你想要對中介者模式的具體定義與基礎的概念進行進一步的了解的話,可能需要你自己去找資料去弄明白具體的定義。因為初次接觸,難免會有遺漏或錯誤,如果從文章中發現有不對的地方,歡迎在評論區中指出,先行感謝。

 四、參考

  1、

  2、

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

Kafka needs no Keeper(關於KIP-500的討論)

寫在前面的

最近看了Kafka Summit上的這個分享,覺得名字很霸氣,標題直接沿用了。這個分享源於社區的,大體的意思今後Apache Kafka不再需要ZooKeeper。整個分享大約40幾分鐘。完整看下來感覺乾貨很多,這裏特意總結出來。如果你把這個分享看做是《三國志》的話,那麼姑且就把我的這篇看做是裴松之注吧:)

客戶端演進

首先,社區committer給出了Kafka Java客戶端移除ZooKeeper依賴的演進過程。下面兩張圖總結了0.8.x版本和0.11.x版本(是否真的是從0.11版本開始的變化並不重要)及以後的功能變遷:在Kafka 0.8時代,Kafka有3個客戶端,分別是Producer、Consumer和Admin Tool。其中Producer負責向Kafka寫消息,Consumer負責從Kafka讀消息,而Admin Tool執行各種運維任務,比如創建或刪除主題等。其中Consumer的位移數據保存在ZooKeeper上,因此Consumer端的位移提交和位移獲取操作都需要訪問ZooKeeper。另外Admin Tool執行運維操作也要訪問ZooKeeper,比如在對應的ZooKeeper znode上創建一個臨時節點,然後由預定義的Watch觸發相應的處理邏輯。

後面隨着Kafka的演進,社區引入了__consumer_offsets位移主題,同時定義了OffsetFetch和OffsetCommit等新的RPC協議,這樣Consumer的位移提交和位移獲取操作全部轉移到與位移主題進行交互,避免了對ZooKeeper的訪問。同時社區引入了新的運維工具AdminClient以及相應的CreateTopics、DeleteTopics、AlterConfigs等RPC協議,替換了原先的Admin Tool,這樣創建和刪除主題這樣的運維操作也完全移動Kafka這一端來做,就像下面右邊這張圖展示的:

至此, Kafka的3個客戶端基本上都不需要和ZooKeeper交互了。應該說移除ZooKeeper的工作完成了大部分,但依然還有一部分工作要在ZooKeeper的幫助下完成,即Consumer的Rebalance操作。在0.8時代,Consumer Group的管理是交由ZooKeeper完成的,包括組成員的管理和訂閱分區的分配。這個設計在新版Consumer中也得到了修正。全部的Group管理操作交由Kafka Broker端新引入的Coordinator組件來完成。要完成這些工作,Broker端新增了很多RPC協議,比如JoinGroup、SyncGroup、Heartbeat、LeaveGroup等。

  

此時,Kafka的Java客戶端除了AdminClient還有一點要依賴ZooKeeper之外,所有其他的組件全部擺脫了對ZooKeeper的依賴。

之後,社區引入了Kafka安全層,實現了對用戶的認證和授權。這個額外的安全層也是不需要訪問ZooKeeper的,因此之前依賴ZooKeeper的客戶端是無法“享用”這個安全層。一旦啟用,新版Clients都需要首先接入這一層並通過審核之後才能訪問到Broker,如下圖所示:

這麼做的好處在於統一了Clients訪問Broker的模式,即定義RPC協議,比如我們熟知的PRODUCE協議、FETCH協議、METADATA協議、CreateTopics協議等。如果後面需要實現更多的功能,社區只需要定義新的RPC協議即可。同時新引入的安全層負責對這套RPC協議進行安全校驗,統一了訪問模式。另外這些協議都是版本化的(versioned),因此能夠獨立地進行演進,同時也兼顧了兼容性方面的考量。

Broker間交互

說完了Clients端,我們說下Broker端的現狀。目前,應該說Kafka Broker端對ZooKeeper是重度依賴的,主要表現在以下幾個方面:

  • Broker註冊管理
  • ACL安全層配置管理
  • 動態參數管理
  • 副本ISR管理
  • Controller選舉

我們拿一張圖來說明,圖中有4個Broker節點和一個ZooKeeper,左上角的Broker充當Controller的角色。當前,所有的Broker啟動后都必須維持與ZooKeeper的會話。Kafka依賴於這個會話實現Broker端的註冊,而且Kafka集群中的所有配置信息、副本信息、主題信息也都保存在ZooKeeper上。最後Controller與集群中每個Broker都維持了一個TCP長連接用於向這些Broker發送RPC請求。當前的Controller RPC類型主要有3大類:

  • LeaderAndIsr:主要用於向集群廣播主題分區Leader和ISR的變更情況,比如對應的Broker應該是特定分區的Leader還是Follower
  • StopReplica:向集群廣播執行停止副本的命令
  • UpdateMetadata:向集群廣播執行變更元數據信息的命令

圖中還新增了一個AlterISR RPC,這是KIP-497要實現的新RPC協議。現階段Kafka各個主題的ISR信息全部保存在ZooKeeper中。如果後續要捨棄ZooKeeper,必須要將這些信息從ZooKeeper中移出來,放在了Controller一端來做。同時還要在程序層面支持對ISR的管理。因此社區計劃在KIP-497上增加AlterISR協議。對了,還要提一句,當前Controller的選舉也是依靠ZooKeeper完成的。

所以後面Broker端的演進可能和Clients端的路線差不多:首先是把Broker與ZooKeeper的交互全部幹掉,只讓Controller與ZooKeeper進行交互,而其他所有Broker都只與Controller交互,如下圖所示:

 

看上去這種演進路線社區已經走得輕車熟路了,但實際上還有遺留了一些問題需要解決。

Broker Liveness

首先就是Broker的liveness問題,即Kafka如何判斷一個Broker到底是否存活?在目前的設計中,Broker的生存性監測完全依賴於與ZooKeeper之間的會話。一旦會話超時或斷開Controller自動觸發ZooKeeper端的Watch來移除該Broker,並對其上的分區做善後處理。如果移除了ZooKeeper,Kafka應該採用什麼機制來判斷Broker的生存性是一個問題。

Network Partition

如何防範網絡分區也是一個需要討論的話題。當前可能出現的Network Partition有4種:1、單個Broker完全與集群隔離;2、Broker間無法通訊;3、Broker與ZooKeeper無法通訊;4、Broker與Controller無法通訊。下面4張圖分別展示了這4種情況:

 

我們分別討論下。首先是第一種情況,單Broker與集群其他Broker隔離,這其實並不算太嚴重的問題。當前的設計已然能夠保證很好地應對此種情況。一旦Broker被隔離,Controller會將其從集群中摘除,雖然可用性降低了,但是整個集群的一致性依然能夠得到保證。第二種情況是Broker間無法通訊,可能的後果是消息的備份機制無法執行,Kafka要收縮ISR,依然是可用性上的降低,但是一致性狀態並沒有被破壞。情況三是Broker無法與ZooKeeper通訊。Broker能正常運轉,它只是無法與ZooKeeper進行通訊。此時我們說該Broker處於殭屍狀態,即所謂的Zoobie狀態。因Zoobie狀態引入的一致性bug社區jira中一直沒有斷過,社區這幾年也一直在修正這方面的問題,主要對抗的機制就是fencing。比如leader epoch等。最後一類情況是Broker無法與Controller通訊,那麼所有的元數據更新通道被堵死,即使這個Broker依然是healthy的,但是它保存的元數據信息可能是非常過期的。這樣連接該Broker的客戶端可能會看到各種非常古怪的問題。之前在知乎上回答過類似的問題:4。目前,社區對這種情況並沒有太好的解決辦法,主要的原因是Broker的liveness完全交由ZooKeeper來做的。一旦Broker與ZooKeeper之間的交互沒有問題,其他原因導致的liveness問題就無法徹底規避。

第四類Network Partition引入了一個經典的場景:元數據不一致。目前每個Broker都緩存了一份集群的元數據信息,這份數據是異步更新的。當第四類Partition發生時,Broker端緩存的元數據信息必然與Controller的不同步,從而造成各種各樣的問題。

下面簡要介紹一下元數據更新的過程。主要的流程就是Controller啟動時會同步地從ZooKeeper上拉取集群全量的元數據信息,之後再以異步的方式同步給其他Broker。其他Broker與Controller之間的同步往往有一個時間差,也就是說可能Clients訪問的元數據並不是最新的。我個人認為現在社區很多flaky test failure都是因為這個原因導致的。 事實上,實際使用過程中有很多場景是Broker端的元數據與Controller端永遠不同步。通常情況下如果我們不重啟Broker的話,那麼這個Broker上的元數據將永遠“錯誤”下去。好在社區還給出了一個最後的“大招”: 登錄到ZooKeeper SHELL,手動執行rmr /controller,強迫Controller重選舉,然後重新加載元數據,並給所有Broker重刷一份。不過在實際生產環境,我懷疑是否有人真的要這麼干,畢竟代價不小,而且最關鍵的是這麼做依然可能存在兩個問題:1. 我們如何確保Controller和Broker的數據是一致的?2. 加載元數據的過程通常很慢。

這裏詳細說說第二點,即加載元數據的性能問題。總體來說,加載元數據是一個O(N)時間複雜度的過程,這裏的N就是你集群中總的分區數。考慮到Controller從ZooKeeper加載之後還要推給其他的Broker,那麼做這件事的總的時間複雜度就是O(N * M),其中M是集群中Broker的數量。可以想見,當M和N都很大時,在集群中廣播元數據不是一個很快的過程。

Metadata as an Event Log

Okay,鑒於以上所提到的所有問題,當Kafka拋棄了ZooKeeper之後,社區應該如何解決它們呢?總體的思路就是Metadata as an Event Log + Controller quorum。我們先說metadata as an event log。如果你讀過Jay Kreps的《I ️Logs》,你應該有感觸,整個Kafka的架構其實都是構建在Log上的。每個topic的分區本質上就是一個Commit Log,但元數據信息的保存卻不是Log形式。在現有的架構設計中你基本上可以認為元數據的數據結構是KV形式的。這一次,社區採用了與消息相同的數據保存方式,即將元數據作為Log的方式保存起來,如下錶所示:

 

這樣做的好處在於每次元數據的變更都被當做是一條消息保存在Log中,而這個Log可以被視作是一個普通的Kafka主題被備份到多台Broker上。Log的一個好處在於它有清晰的前後順序關係,即每個事件發生的時間是可以排序的,配合以恰當的處理邏輯,我們就能保證對元數據變更的處理是按照變更發生時間順序處理,不出現亂序的情形。另外Log機制還有一個好處是,在Broker間同步元數據時,我們可以選擇同步增量數據(delta),而非全量狀態。現在Kafka Broker間同步元數據都是全量狀態同步的。前面說過了,當集群分區數很大時,這個開銷是很可觀的。如果我們能夠只同步增量狀態,勢必能極大地降低同步成本。最後一個好處是,我們可以很容易地量化元數據同步的進度,因為對Log的消費有位移數據,因此通過監控Log Lag就能算出當前同步的進度或是落後的進度。

採用Log機制后,其他Broker像是一個普通的Consumer,從Controller拉取元數據變更消息或事件。由於每個Broker都是一個Consumer,所以它們會維護自己的消費位移,就像下面這張圖一樣:

 這種設計下,Controller所在的Broker必須要承擔起所有元數據topic的管理工作,包括創建topic、管理topic分區的leader以及為每個元數據變更創建相應的事件等。既然社區選擇和__consumer_offsets類似的處理方式,一個很自然的問題在於這個元數據topic的管理是否能夠復用Kafka現有的副本機制?答案是:不可行。理由是現有的副本機制依賴於Controller,因此Kafka沒法依靠現有的副本機制來實現Controller——按照我們的俗語來說,這有點雞生蛋、蛋生雞的問題,屬於典型的循環依賴。為了實現這個,Kafka需要一套leader選舉協議,而這套協議或算法是不依賴於Controller的,即它是一個自管理的集群quorum(抱歉,在分佈式領域內,特別是分佈式共識算法領域中,針對quorum的恰當翻譯我目前還未找到,因此直接使用quorum原詞了)。最終社區決定採用Raft來實現這組quorum。這就是上面我們提到的第二個解決思路:Controller quorum。

Controller Quorum

與藉助Controller幫忙選擇Leader不同,Raft是讓自己的節點自行選擇Leader並最終令所有節點達成共識——對選擇Controller而言,這是一個很好的特性。其實Kafka現有的備份機制與Raft已經很接近了,下錶羅列了一下它們的異同:

 一眼掃過去,其實Kafka的備份機制和Raft很類似,比如Kafka中的offset其實就是Raft中的index,epoch對應於term。當然Raft中採用的半數機制來確保消息被提交以及Leader選舉,而Kafka設計了ISR機制來實現這兩點。總體來說,社區認為只需要對備份機製做一些小改動就應該可以很容易地切換到Raft-based算法。

下面這張圖展示Controller quorum可能更加直觀:

整個controller quorum類似於一個小的集群。和ZooKeeper類似,這個quorum通常是3台或5台機器,不需要讓Kafka中的每個Broker都自動稱為這個quorum中的一個節點。該quorum裏面有一個Leader負責處理客戶端發來的讀寫請求,這個Leader就是Kafka中的active controller。根據ZooKeeper的Zab協議,leader處理所有的寫請求,而follower是可以處理讀請求的。當寫請求發送給follower后,follower會將該請求轉發給leader處理。不過我猜Kafka應該不會這樣實現,它應該只會讓leader(即active controller)處理所有的讀寫請求,而客戶端(也就是其他Broker)壓根就不會發送讀寫請求給follower。在這一點上,這種設計和現有的Kafka請求處理機制是一致的。

現在還需要解決一個問題,即Leader是怎麼被選出來的?既然是Raft-based,那麼採用的也是Raft算法中的Leader選舉策略。讓Raft選出的Leader稱為active controller。網上有很多關於Raft選主的文章,這裏就不在贅述了,有興趣的可以讀一讀Raft的論文:《In Search of an Understandable Consensus Algorithm(Extended Version)》。

這套Raft quorum的一個好處在於它天然提供了低延時的failover,因此leader的切換會非常的迅速和及時,因為理論上不再有元數據加載的過程了,所有的元數據現在都同步保存follower節點的內存中,它已經有其他Broker需要拉取的所有元數據信息了!更酷的是,它避免了現在機制中一旦Controller切換要全量拉取元數據的低效行為,Broker無需重新拉取之前已經“消費”的元數據變更消息,它只需要從新Leader繼續“消費”即可。

另一個好處在於:採用了這套機制后,Kafka可以做元數據的緩存了(metadata caching):即Broker能夠把元數據保存在磁盤上,同時就像剛才說的,Broker只需讀取它關心的那部分數據即可。還有,和現在snapshot機制類似,如果一個Broker保存的元數據落後Controller太多或者是一個全新的Broker,Kafka甚至可以像Raft那樣直接發送一個snapshot文件,快速令其追上進度。當然大多數情況下,Broker只需要拉取delta增量數據即可。

Post KIP-500 Broker註冊

當前Broker啟動之後會向ZooKeeper註冊自己的信息,比如自己的主機名、端口、監聽協議等數據。移除ZooKeeper之後,Broker的註冊機制也要發生變化:Broker需要向active controller發送心跳來進行註冊。Controller收集心跳中包含的Broker數據構建整個Kafka集群信息,如下圖所示:

 同時Controller也會對心跳進行響應,顯式地告知Broker它們是否被允許加入集群——如果不允許,則可能需要被隔離(fenced)。當然controller自己也可以對自己進行隔離。我們針對前面提到的隔離場景討論下KIP-500是怎麼應對的。

Fencing

首先是普通Broker與集群完全隔離的場景,比如該Broker無法與controller和其他Broker進行通信,但它依然可以和客戶端程序交互。此時,fencing機制就很簡單了,直接讓controller令其下線即可。這和現在依靠ZooKeeper會話機制維持Broker判活的機制是一模一樣的,沒有太大改進。

第二種情況是Broker間的通訊中斷。此時消息無法在leader、follower間進行備份。但是對於元數據而言,我們不會看到數據不一致的情形,因為Broker依然可以和controller通訊,因此也不會有什麼問題。

第三種情況是Broker與Controller的隔離。現有機制下這是個問題,但KIP-500之後,Controller僅僅將該Broker“踢出場”即可,不會造成元數據的不一致。

最後一種情況是Broker與ZooKeeper的隔離, 既然ZooKeeper要被移除了,自然這也不是問題了。

部署

終於聊到KIP-500之後的Kafka運維了。下錶總結了KIP-500前後的部署情況對比:

很簡單,現在任何時候部署和運維Kafka都要考慮對ZooKeeper的運維管理。在KIP-500之後我們只需要關心Kafka即可。

Controller quorum共享模式

如前所述,controller改成Raft quorum機制后,可能使用3或5台機器構成一個小的quorum。那麼一個很自然的問題是,這些Broker機器還能否用作他用,是唯一用作controller quorum還是和其他Broker一樣正常處理。社區對此也做了解釋:兩種都支持!

如果你的Kafka集群資源很緊張,你可以使用共享controller模式(Shared Controller Mode),即充當controller quorum的Broker機器也能處理普通的客戶端請求;相反地,如果你的Kafka資源很充足,專屬controller模式(Separate Controller Mode)可能是更適合的,即在controller quorum中的Broker機器排它地用作Controller的選舉之用,不再對客戶端提供讀寫服務。這樣可以實現更好的資源隔離,適用於大集群。

Roadmap

最後說一下KIP-500的計劃。社區計劃分三步走:

第一步是移除客戶端對ZooKeeper的依賴——這一步基本上已經完成了,除了目前AdminClient還有少量的API依賴ZooKeeper之外,其他客戶端應該說都不需要訪問ZooKeeper了;第二步是移除Broker端的ZooKeeper依賴:這主要包括移除Broker端需要訪問ZooKeeper的代碼,以及增加新的Broker端API,如前面所說的AlterISR等,最後是將對ZooKeeper的訪問全部集中在controller端;最後一步就是實現controller quorum,實現Raft-based的quorum負責controller的選舉。

至於Kafka升級,如果從現有的Kafka直接升級到KIP-500之後的Kafka會比較困難,因此社區打算引入一個名為Bridge Release的中間過渡版本,如下圖所示:

這個Bridge版本的特點在於所有對ZooKeeper的訪問都集中到了controller端,Broker訪問ZooKeeper的其他代碼都被移除了。 

總結

KIP-500應該說是最近幾年社區提出的最重磅的KIP改進了。它幾乎是顛覆了Kafka已有的使用模式,摒棄了之前重度依賴的Apache ZooKeeper。就我個人而言,我是很期待這個KIP,後續有最新消息我也會在一併同步出來。讓我們靜觀其變吧~~~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

SpringBoot基本配置詳解

SpringBoot項目有一些基本的配置,比如啟動圖案(banner),比如默認配置文件application.properties,以及相關的默認配置項。

示例項目代碼在:

一、啟動圖案banner

編寫banner.txt放入resources文件夾下,然後啟動項目即可修改默認圖案。

關於banner的生成,可以去一些專門的網站。

比如:https://www.bootschool.net/ascii

二、配置文件application

2.1 application.properties/yml

resources下通常會默認生成一個application.properties文件,這個文件包含了SpringBoot項目的全局配置文件。裏面的配置項通常是這樣的:

server.port=8080

在這個文件里我們可以添加框架支持的配置項,比如項目端口號、JDBC連接的數據源、日誌級別等等。

現在比較流行的是將properties文件改為yml文件。yml文件的格式yaml是這樣的:

server:
    port: 8080

yml和properties的作用是一樣的。而yml的好處是顯而易見的——更易寫易讀。

屬性之間互相調用使用${name}:

eknown:
    email: eknown@163.com
    uri: http://www.eknown.cn
    title: 'hello, link to ${eknown.uri} or email to ${eknown.email}'

鏈接:

2.2 多環境配置文件

通常開發一個應用會有多個環境,常見如dev/prod,也會有test,甚至其他一些自定義的環境,SpringBoot支持配置文件的靈活切換。

定義新配置文件需要遵循以下格式:application-{profile}.properties 或者application-{profile}.yml

比如現在有dev和prod兩個環境,我需要在application.yml文件之外新建兩個文件:

  1. application-dev.yml

    server:
       port: 8080
  2. application-prod.yml

    server:
      port: 8081

然後在application.yml中通過application.profiles.active={profile}指明啟用那個配置:

application:
    profiles:
      active: dev

除了在application.yml中指定配置文件外,還可以通過啟動命令指定:java -jar xxx.jar --spring.profiles.active=dev

2.2 自定義配置項並獲取它

主要介紹兩種方式,獲取單個配置項和獲取多個配置項。

舉例:

eknown:
    email: eknown@163.com
    uri: http://www.eknown.cn

2.2.1 使用@Value註解獲取單個配置項

@Value("${eknown.email}")
private String email;

@Value("${eknown.uri}")
private String url;

注意:使用@Value註解的時候,所在類必須被Spring容器管理,也就是被@Component、@Controller、@Service等註解定義的類。

2.2.2 獲取多個配置項

第一種,定義一個bean類,通過@Value獲取多個配置項:

@Component
public class MyConfigBean {
  
}

然後我們通過get方法來獲取這些值:

@RestController
public class BasicAction {
  
  @Autowired
  private MyConfigBean myConfigBean;

}

第二種,使用註解@ConfigurationProperties:

@Component
@ConfigurationProperties(perfix="eknown")
public class MyConfigBean {

  private String email;
  private String uri;
}

這裏只需要通過prefix指定前綴即可,後面的值自動匹配。

這裏我們還使用了@Component註解來讓spring容器管理這個MyConfigBean。

此外,我們可以不需要引入@Component,轉而在Application啟動類上加上@EnableConfigurationProperties({MyConfigBean.class})來啟動這個配置。

注意:我們這裡是從主配置文件,也就是SpringBoot默認的application-profile文件中獲取配置數據的。

而從自定義的配置文件,比如test.yml這種形式中獲取配置項時,情況是有點不大一樣的。

三、自定義配置文件

上面介紹的配置文件都是springboot默認的application開頭的文件。如果要自定義一個配置文件呢,比如test.yml或test.properties,怎麼獲取其中的配置項呢?

使用@PageResource註解即可。

首先我們來看一下讀取自定義的properties文件里的內容:

test.properties

hello.time=2019.11.19
hello.name=eknown

定義Configuration類:

@Configuration
@PropertySource("classpath:test.properties")
//@PropertySource("classpath:test.yml") // 注意,yml文件不能直接這樣寫,會讀不出數據
@ConfigurationProperties(prefix = "hello")
public class TestConfiguration {
    private String name;
    private String time;

    // hide get and set methods
}

測試一下:

@RestController
@RequestMapping(value = "test")
public class TestAction {

    @Autowired
    private TestConfiguration testConfiguration;

    @GetMapping(value = "config")
    public String test() {
        return testConfiguration.getName() + "<br/>" + testConfiguration.getTime();
    }
}

如果將properties文件換成yml文件呢?

我們嘗試一下,發現:

讀不出數據?

分析一下@PropertySource註解,發現其使用的PropertySourceFactory是DefaultPropertySourceFactory.

這個類的源碼如下:

public class DefaultPropertySourceFactory implements PropertySourceFactory {
    public DefaultPropertySourceFactory() {
    }

    public PropertySource<?> createPropertySource(@Nullable String name, EncodedResource resource) throws IOException {
        return name != null ? new ResourcePropertySource(name, resource) : new ResourcePropertySource(resource);
    }
}

這個類只能處理properties文件,無法處理yml文件。所以我們需要自定義一個YmlSourceFactory。

public class YamlSourceFactory extends DefaultPropertySourceFactory {

    @Override
    public PropertySource<?> createPropertySource(String name, EncodedResource resource) throws IOException {
        return new YamlPropertySourceLoader().load(resource.getResource().getFilename()
                , resource.getResource()).get(0);
    }
}

然後定義test.yml文件的config類:

@Configuration
@PropertySource(value = "classpath:test.yml", encoding = "utf-8", factory = YamlSourceFactory.class)
@ConfigurationProperties(prefix = "yml.hello")
public class TestYamlConfiguration {
    private String name;
    private String time;

    // hide get and set methods
}

注:為了區分test.properties和test.yml,這裏的test.yml中的屬性以yml.hello開頭。

編寫一下測試:

    @Autowired
    private TestYamlConfiguration ymlConfiguration;

    @GetMapping(value = "yml")
    public String testYml() {
        return "yml config: <br/>" + ymlConfiguration.getName() + "<br/>" + ymlConfiguration.getTime();
    }

訪問:

四、補充@ConfigurationProperties

網上一些資料中,為配合使用@ConfigurationProperties,還使用了@EnableConfigurationProperties註解。

經過測試發現:

  1. 從SpringBoot默認配置文件讀取配置信息,使用@ConfigurationProperties + @Component/@Configuration,或者@ConfigurationProperties + 在啟動類添加@EnableConfigurationProperties({class})。這兩種方式都能解決問題

  2. 從非默認配置文件讀取配置信息,需要利用@PropertySource註解。同樣兩種方式:

    2.1 @PropertySource + @ConfigurationProperties + @Component/@Configuration

    2.2 @PropertySource + @ConfigurationProperties + @Component/@Configuration + @EnableConfigurationProperties,第二種方式存在一個問題,即還是必須要使用@Component註解,如果不使用,則會導致讀取配置信息為null,但程序不會報錯;而如果採用了,則會導致bean類的set方法被執行兩次(也就是生成了兩個同樣類型的bean類)。這種方式不建議!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

Hadoop壓縮的圖文教程

近期由於Hadoop集群機器硬盤資源緊張,有需求讓把 Hadoop 集群上的歷史數據進行下壓縮,開始從網上查找的都是關於各種壓縮機制的對比,很少有關於怎麼壓縮的教程(我沒找到。。),再此特記錄下本次壓縮的過程,方便以後查閱,利己利人。

 

本文涉及的所有 jar包、腳本、native lib 見文末的相關下載 ~

 

我的壓縮版本: 

Jdk 1.7及以上

Hadoop-2.2.0 版本

 

壓縮前環境準備:

關於壓縮算法對比,網上資料很多,這裏我用的是 Bzip2 的壓縮方式,比較中庸,由於是Hadoop自帶的壓縮機制,也不需要額外下載別的東西,只需要在 Hadoop根目錄下 lib/native 文件下有如下文件即可:

 

 

 

 

壓縮之前要檢查 Hadoop 集群支持的壓縮算法: hadoop checknative

每台機器都要檢查一下,都显示如圖 true 則說明 集群支持 bzip2 壓縮,

如果显示false 則需要將上圖的文件下載拷貝到 Hadoop根目錄下 lib/native

 

 

 

壓縮程序介紹:

 

壓縮程序用到的類 getFileList(獲取文件路徑) 、 FileHdfsCompress(壓縮類)、FileHdfsDeCompress(解壓縮類) ,只用到這三個類即可完成壓縮/解壓縮操作。

 

 

 

 

getFileList 作用:遞歸打印 傳入文件目錄下文件的根路徑,包括子目錄下的文件。開始想直接輸出到文件中,後來打包放到集群上運行時,發現文件沒有內容,可能是由於分佈式運行的關係,所以就把路徑打印出來,人工在放到文件中。

 

核心代碼:
public static void listAllFiles(String path, List<String> strings) throws IOException {
        FileSystem hdfs = getHdfs(path);
        Path[] filesAndDirs = getFilesAndDirs(path);

        for(Path p : filesAndDirs){
            if(hdfs.getFileStatus(p).isFile()){
                if(!p.getName().contains("SUCCESS")){
                    System.out.println(p);
                }
            }else{
                listAllFiles(p.toString(),strings);
            }
        }
       // FileUtils.writeLines(new File(FILE_LIST_OUTPUT_PATH), strings,true);

    }

public static FileSystem getHdfs(String path) throws IOException {
        Configuration conf = new Configuration();
        return FileSystem.get(URI.create(path),conf);
    }


public static Path[] getFilesAndDirs(String path) throws IOException { FileStatus[] fs = getHdfs(path).listStatus(new Path(path)); return FileUtil.stat2Paths(fs); }

  

 FileHdfsCompress:壓縮程序非常簡單,對應程序里的 FileHdfsCompress 類,(解壓縮是 FileHdfsDeCompress),採用的是Hadoop 原生API  ,將Hadoop集群上原文件讀入作為輸入流,將壓縮路徑的輸入流作為輸出,再使用相關的壓縮算法即,代碼如下:

 

核心代碼:

 //指定壓縮方式
            Class<?> codecClass = Class.forName(COMPRESS_CLASS_NAME);

            Configuration conf = new Configuration();
            CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
            // FileSystem fs = FileSystem.get(conf);
            FileSystem fs = FileSystem.get(URI.create(inputPath),conf);

            //原文件路徑 txt 用原本的輸入流讀入
            FSDataInputStream in = fs.open(new Path(inputPath));

            //創建 HDFS 上的輸出流,壓縮路徑
            //通過文件系統獲取輸出流
            OutputStream out = fs.create(new Path(FILE_OUTPUT_PATH));
            //對輸出流的數據壓縮
            CompressionOutputStream compressOut = codec.createOutputStream(out);
            //讀入原文件 壓縮到HDFS上 輸入--普通流  輸出-壓縮流
            IOUtils.copyBytes(in, compressOut, 4096,true);

  

以下是代碼優化的過程,不涉及壓縮程序使用,不感興趣同學可以跳過 ~ :

 

在實際編碼中,我其實是走了彎路的,一開始並沒有想到用 Hadoop API 就能實現壓縮解壓縮功能,代碼到此其實是經歷了優化迭代的過程。

 

最開始時壓縮的思路就是 將文件讀進來,再壓縮出去,一開始使用了 MapReduce 的方式,在編碼過程中,由於對生成壓縮文件的路徑還有要求,又在 Hadoop 輸出時自定義了輸出類來使的輸出文件的名字符合要求,不是 part-r-0000.txt ,而是時間戳.txt 的格式,至此符合原線上路徑的要求。

 

而在實際運行過程中發現,MR 程序需要啟動 Yarn,並佔用Yarn 資源,由於壓縮時間較長,有可能會長時間佔用 集群資源不釋放,後來發現 MR 程序的初衷是用來做并行計算的,而壓縮僅僅是 map 任務讀取一條就寫一條,不涉及計算,就是內容的簡單搬運。所以這裏放棄了使用 MR 想着可不可以就用簡單的 Hadoop API 就完成壓縮功能,經過一番嘗試后,發現真的可行! 使用了 Hadoop API 釋放了集群資源,壓縮速度也還可以,這樣就把這個壓縮程序當做一個後台進程跑就行了也不用考慮集群資源分配的問題

 

實測壓縮步驟:

 

1 將項目打包,上傳到hadoop 集群任一節點即可,準備好相應的腳本,輸入數據文件,日誌文件,如下圖:

 

 

 

 

2 使用獲取文件路徑腳本,打印路徑: 

 

 

 

getFileLish.sh 腳本內容如下,就是簡單調用,傳入參數為 hadoop集群上 HDFS 上目錄路徑

#!/bin/sh

echo "begin get fileList"

echo "第一個參數$1"

if [ ! -n "$1" ]; then
echo "check param!"
fi

#original file
hadoop jar hadoop-compress-1.0.jar com.people.util.getFileList  $1

  

3 將 打印的路徑 粘貼到 compress.txt 中,第 2 步中會把目錄的文件路徑包括子目錄路徑都打印出來,將其粘貼進  compress.txt 中即可,注意 文件名可隨意定

 

4 使用壓縮腳本即可,sh compress.sh /data/new_compress/compress.txt  ,加粗的部分是腳本的參數意思是 第3步中文件的路徑,注意:這裏只能是絕對路徑,不然可能報找不到文件的異常。

 

 

 compress.sh 腳本內容如下,就是簡單調用,傳入參數為 第3步中文件的絕對路徑

 

#!/bin/sh
echo "begin compress"

echo "第一個參數$1"

if [ ! -n "$1" ]; then
echo "check param!"
fi


hadoop jar hadoop-compress-1.0.jar com.people.compress.FileHdfsCompress  $1 >> /data/new_compress/compress.log 2>&1 &

  

 

5 查看壓縮日誌,發現後台程序已經開始壓縮了!,tail  -f  compress.log

 

6 如果感覺壓縮速度不夠,可以多台機器執行腳本,也可以一台機器執行多個任務,因為這個腳本任務是一個後台進程,不會佔用集群 Yarn 資源。

 

 相關下載:

 

程序源碼下載: git@github.com:fanpengyi/hadoop-compress.git

 

Hadoop 壓縮相關需要的 腳本、jar包、lib 下載: 關注公眾號 “大數據江湖”,後台回復 “hadoop壓縮”,即可下載

 

長按即可關注

 

 — The End —

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?