環境篇:Kylin3.0.1集成CDH6.2.0

環境篇:Kylin3.0.1集成CDH6.2.0

Kylin是什麼?

Apache Kylin™是一個開源的、分佈式的分析型數據倉庫,提供Hadoop/Spark 之上的 SQL 查詢接口及多維分析(OLAP)能力以支持超大規模數據,最初由 eBay 開發並貢獻至開源社區。它能在亞秒內查詢巨大的表。

Apache Kylin™ 令使用者僅需三步,即可實現超大數據集上的亞秒級查詢。

  1. 定義數據集上的一個星形或雪花形模型
  2. 在定義的數據表上構建cube
  3. 使用標準 SQL 通過 ODBC、JDBC 或 RESTFUL API 進行查詢,僅需亞秒級響應時間即可獲得查詢結果

如果沒有Kylin

大數據在數據積累后,需要計算,而數據越多,算力越差,內存需求也越高,詢時間與數據量成線性增長,而這些對於Kylin影響不大,大數據中硬盤往往比內存要更便宜,Kylin通過與計算的形式,以空間換時間,亞秒級的響應讓人們愛不釋手。

注:所謂詢時間與數據量成線性增長:假設查詢 1 億條記錄耗時 1 分鐘,那麼查詢 10 億條記錄就需 10分鐘,100 億條記錄就至少需要 1 小時 40 分鐘。

http://kylin.apache.org/cn/

1 Kylin架構

Kylin 提供與多種數據可視化工具的整合能力,如 Tableau,PowerBI 等,令用戶可以使用 BI 工具對 Hadoop 數據進行分析

  1. REST Server REST Server

是一套面嚮應用程序開發的入口點,旨在實現針對 Kylin 平台的應用開發 工作。 此類應用程序可以提供查詢、獲取結果、觸發 cube 構建任務、獲取元數據以及獲取 用戶權限等等。另外可以通過 Restful 接口實現 SQL 查詢。

  1. 查詢引擎(Query Engine)

當 cube 準備就緒后,查詢引擎就能夠獲取並解析用戶查詢。它隨後會與系統中的其它 組件進行交互,從而向用戶返回對應的結果。

  1. 路由器(Routing)

在最初設計時曾考慮過將 Kylin 不能執行的查詢引導去 Hive 中繼續執行,但在實踐后 發現 Hive 與 Kylin 的速度差異過大,導致用戶無法對查詢的速度有一致的期望,很可能大 多數查詢幾秒內就返回結果了,而有些查詢則要等幾分鐘到幾十分鐘,因此體驗非常糟糕。 最後這個路由功能在發行版中默認關閉。

  1. 元數據管理工具(Metadata)

Kylin 是一款元數據驅動型應用程序。元數據管理工具是一大關鍵性組件,用於對保存 在 Kylin 當中的所有元數據進行管理,其中包括最為重要的 cube 元數據。其它全部組件的 正常運作都需以元數據管理工具為基礎。 Kylin 的元數據存儲在 hbase 中。

  1. 任務引擎(Cube Build Engine)

這套引擎的設計目的在於處理所有離線任務,其中包括 shell 腳本、Java API 以及 MapReduce 任務等等。任務引擎對 Kylin 當中的全部任務加以管理與協調,從而確保每一項任務 都能得到切實執行並解決其間出現的故障。

2 Kylin軟硬件要求

  • 軟件要求
    • Hadoop: 2.7+, 3.1+ (since v2.5)
    • Hive: 0.13 – 1.2.1+
    • HBase: 1.1+, 2.0 (since v2.5)
    • Spark (optional) 2.3.0+
    • Kafka (optional) 1.0.0+ (since v2.5)
    • JDK: 1.8+ (since v2.5)
    • OS: Linux only, CentOS 6.5+ or Ubuntu 16.0.4+
  • 硬件要求
    • 最低配置:4 core CPU, 16 GB memory
    • 高負載場景:24 core CPU, 64 GB memory

3 Kylin單機安裝

3.1 修改環境變量

vim /etc/profile 
#>>>注意地址指定為自己的
#kylin
export KYLIN_HOME=/usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60
export PATH=$PATH:$KYLIN_HOME/bin
    
#cdh
export CDH_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373

#hadoop
export HADOOP_HOME=${CDH_HOME}/lib/hadoop
export HADOOP_DIR=${HADOOP_HOME}
export HADOOP_CLASSPATH=${HADOOP_HOME}
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
    
#hbase
export HBASE_HOME=${CDH_HOME}/lib/hbase
export PATH=$PATH:$HBASE_HOME/bin
    
 #hive
export HIVE_HOME=${CDH_HOME}/lib/hive
export PATH=$PATH:$HIVE_HOME/bin
    
#spark
export SPARK_HOME=${CDH_HOME}/lib/spark
export PATH=$PATH:$SPARK_HOME/bin   

#kafka
export KAFKA_HOME=${CDH_HOME}/lib/kafka
export PATH=$PATH:$KAFKA_HOME/bin 
#<<<

source /etc/profile 

3.2 修改hdfs用戶權限

usermod -s /bin/bash hdfs
su hdfs
hdfs dfs -mkdir /kylin
hdfs dfs -chmod a+rwx /kylin
su

3.3 上傳安裝包解壓

mkdir /usr/local/src/kylin
cd /usr/local/src/kylin
tar -zxvf apache-kylin-3.0.1-bin-cdh60.tar.gz
cd /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60

3.4 Java兼容hbase

  • hbase 所有節點

在CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar后添加

>>---
:/opt/cloudera/parcels/CDH/lib/hbase/lib/*
<<---
  • Kylin節點添加jar包
cp /opt/cloudera/cm/common_jars/commons-configuration-1.9.cf57559743f64f0b3a504aba449c9649.jar /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60/tomcat/lib

這2步不做會引起 Could not find or load main class org.apache.hadoop.hbase.util.GetJavaProperty

3.5 啟動停止

./bin/kylin.sh start
#停止  ./bin/kylin.sh stop

3.6 web頁面

訪問端口7070

賬號密碼:ADMIN / KYLIN

4 Kylin集群安裝

4.1 修改環境變量

vim /etc/profile 
#>>>注意地址指定為自己的
#kylin
export KYLIN_HOME=/usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60
export PATH=$PATH:$KYLIN_HOME/bin
    
#cdh
export CDH_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373

#hadoop
export HADOOP_HOME=${CDH_HOME}/lib/hadoop
export HADOOP_DIR=${HADOOP_HOME}
export HADOOP_CLASSPATH=${HADOOP_HOME}
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
    
#hbase
export HBASE_HOME=${CDH_HOME}/lib/hbase
export PATH=$PATH:$HBASE_HOME/bin
    
 #hive
export HIVE_HOME=${CDH_HOME}/lib/hive
export PATH=$PATH:$HIVE_HOME/bin
    
#spark
export SPARK_HOME=${CDH_HOME}/lib/spark
export PATH=$PATH:$SPARK_HOME/bin   

#kafka
export KAFKA_HOME=${CDH_HOME}/lib/kafka
export PATH=$PATH:$KAFKA_HOME/bin 
#<<<

source /etc/profile 

4.2 修改hdfs用戶權限

usermod -s /bin/bash hdfs
su hdfs
hdfs dfs -mkdir /kylin
hdfs dfs -chmod a+rwx /kylin
su

4.3 上傳安裝包解壓

mkdir /usr/local/src/kylin
cd /usr/local/src/kylin
tar -zxvf apache-kylin-3.0.1-bin-cdh60.tar.gz
cd /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60

4.4 Java兼容hbase

  • hbase 所有節點

在CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar后添加

vim /opt/cloudera/parcels/CDH/lib/hbase/bin/hbase
>>---
:/opt/cloudera/parcels/CDH/lib/hbase/lib/*
<<---
  • Kylin節點添加jar包
cp /opt/cloudera/cm/common_jars/commons-configuration-1.9.cf57559743f64f0b3a504aba449c9649.jar /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60/tomcat/lib

這2步不做會引起 Could not find or load main class org.apache.hadoop.hbase.util.GetJavaProperty

4.5 修改kylin配置文件

Kylin根據自己的運行職責狀態,可以劃分為以下三大類角色

  • Job節點:僅用於任務調度,不用於查詢
  • Query節點:僅用於查詢,不用於構建任務的調度
  • All節點:模式代表該服務同時用於任務調度和 SQL 查詢
    • 2.0以前同一個集群只能有一個節點(Kylin實例)用於job調度(all或者job模式的只能有一個實例)
    • 2.0開始可以多個job或者all節點實現HA
vim conf/kylin.properties
>>----
#指定元數據庫路徑,默認值為 kylin_metadata@hbase,確保kylin集群使用一致
kylin.metadata.url=kylin_metadata@hbase
#指定 Kylin 服務所用的 HDFS 路徑,默認值為 /kylin,請確保啟動 Kylin 實例的用戶有讀寫該目錄的權限
kylin.env.hdfs-working-dir=/kylin
kylin.server.mode=all
kylin.server.cluster-servers=cdh01.cm:7070,cdh02.cm:7070,cdh03.cm:7070
kylin.storage.url=hbase
#構建任務失敗后的重試次數,默認值為 0
kylin.job.retry=2
#最大構建併發數,默認值為 10
kylin.job.max-concurrent-jobs=10
#構建引擎間隔多久檢查 Hadoop 任務的狀態,默認值為 10(s)
kylin.engine.mr.yarn-check-interval-seconds=10
#MapReduce 任務啟動前會依據輸入預估 Reducer 接收數據的總量,再除以該參數得出 Reducer 的數目,默認值為 500(MB)
kylin.engine.mr.reduce-input-mb=500
#MapReduce 任務中 Reducer 數目的最大值,默認值為 500
kylin.engine.mr.max-reducer-number=500
#每個 Mapper 可以處理的行數,默認值為 1000000,如果將這個值調小,會起更多的 Mapper
kylin.engine.mr.mapper-input-rows=1000000
#啟用分佈式任務鎖
kylin.job.scheduler.default=2
kylin.job.lock=org.apache.kylin.storage.hbase.util.ZookeeperJobLock
<<----

4.6 啟動停止

所有Kylin節點

./bin/kylin.sh start
#停止  ./bin/kylin.sh stop

4.7 nginx負載均衡

yum -y install nginx

vim /etc/nginx/nginx.conf
>>---http中添加替換內容
upstream kylin {
        least_conn;
        server 192.168.37.10:7070 weight=8;
        server 192.168.37.11:7070 weight=7;
        server 192.168.37.12:7070 weight=7;
	}
    server {
        listen       9090;
        server_name  localhost;

        location / {
                proxy_pass http://kylin;
        }
    }

<<---

#重啟 nginx 服務
systemctl restart nginx  

4.8 訪問web頁面

訪問任何節點的7070端口都可以進入kylin

訪問nginx所在機器9090端口/kylin負載均衡進入kylin

賬號密碼:ADMIN / KYLIN

4 大規模并行處理@列式存儲

自從 10 年前 Hadoop 誕生以來,大數據的存儲和批處理問題均得到了妥善解決,而如何高速地分析數據也就成為了下一個挑戰。於是各式各樣的“SQL on Hadoop”技術應運而生,其中以 Hive 為代表,Impala、Presto、Phoenix、Drill、 SparkSQL 等緊隨其後(何以解憂–唯有CV SQL BOY)。它們的主要技術是“大規模并行處理”(Massive Parallel Processing,MPP)和“列式存儲”(Columnar Storage)

大規模并行處理可以調動多台機器一起進行并行計算,用線性增加的資源來換取計算時間的線性下降

列式存儲則將記錄按列存放,這樣做不僅可以在訪問時只讀取需要的列,還可以利用存儲設備擅長連續讀取的特點,大大提高讀取的速率。

這兩項關鍵技術使得 Hadoop 上的 SQL 查詢速度從小時提高到了分鐘。 然而分鐘級別的查詢響應仍然離交互式分析的現實需求還很遠。分析師敲入 查詢指令,按下回車,還需要去倒杯咖啡,靜靜地等待查詢結果。得到結果之後才能根據情況調整查詢,再做下一輪分析。如此反覆,一個具體的場景分析常常需要幾小時甚至幾天才能完成,效率低下。 這是因為大規模并行處理和列式存儲雖然提高了計算和存儲的速度,但並沒有改變查詢問題本身的時間複雜度,也沒有改變查詢時間與數據量成線性增長的關係這一事實。

假設查詢 1 億條記錄耗時 1 分鐘,那麼查詢 10 億條記錄就需 10分鐘,100 億條記錄就至少需要 1 小時 40 分鐘。 當然,可以用很多的優化技術縮短查詢的時間,比如更快的存儲、更高效的壓縮算法,等等,但總體來說,查詢性能與數據量呈線性相關這一點是無法改變的。雖然大規模并行處理允許十倍或百倍地擴張計算集群,以期望保持分鐘級別的查詢速度,但購買和部署十倍或百倍的計算集群又怎能輕易做到,更何況還有 高昂的硬件運維成本。 另外,對於分析師來說,完備的、經過驗證的數據模型比分析性能更加重要, 直接訪問紛繁複雜的原始數據並進行相關分析其實並不是很友好的體驗,特別是在超大規模的數據集上,分析師將更多的精力花在了等待查詢結果上,而不是在更加重要的建立領域模型上

5 Kylin如何解決海量數據的查詢問題

**Apache Kylin 的初衷就是要解決千億條、萬億條記錄的秒級查詢問題,其中的關鍵就是要打破查詢時間隨着數據量成線性增長的這個規律。根據OLAP分析,可以注意到兩個結論: **

  • 大數據查詢要的一般是統計結果,是多條記錄經過聚合函數計算后的統計值。原始的記錄則不是必需的,或者訪問頻率和概率都極低。

  • 聚合是按維度進行的,由於業務範圍和分析需求是有限的,有意義的維度聚合組合也是相對有限的,一般不會隨着數據的膨脹而增長。

**基於以上兩點,我們可以得到一個新的思路——“預計算”。應盡量多地預先計算聚合結果,在查詢時刻應盡量使用預算的結果得出查詢結果,從而避免直 接掃描可能無限增長的原始記錄。 **

舉例來說,使用如下的 SQL 來查詢 11月 11日 那天銷量最高的商品:

select item,sum(sell_amount)
from sell_details
where sell_date='2020-11-11'
group by item
order by sum(sell_amount) desc

用傳統的方法時需要掃描所有的記錄,再找到 11月 11日 的銷售記錄,然後按商品聚合銷售額,最後排序返回。

假如 11月 11日 有 1 億條交易,那麼查詢必須讀取並累計至少 1 億條記錄,且這個查詢速度會隨將來銷量的增加而逐步下降。如果日交易量提高一倍到 2 億,那麼查詢執行的時間可能也會增加一倍。

而使用預 計算的方法則會事先按維度 [sell_date , item] 計 算 sum(sell_amount)並存儲下來,在查詢時找到 11月 11日 的銷售商品就可以直接排序返回了。讀取的記錄數最大不會超過維度[sell_date,item]的組合數。

顯然這個数字將遠遠小於實際的銷售記錄,比如 11月 11日 的 1 億條交易包含了 100萬條商品,那麼預計算后就只有 100 萬條記錄了,是原來的百分之一。並且這些 記錄已經是按商品聚合的結果,因此又省去了運行時的聚合運算。從未來的發展來看,查詢速度只會隨日期和商品數目(時間,商品維度)的增長而變化,與銷售記錄的總數不再有直接聯繫。假如日交易量提高一倍到 2 億,但只要商品的總數不變,那麼預計算的結果記錄總數就不會變,查詢的速度也不會變。

預計算就是 Kylin 在“大規模并行處理”和“列式存儲”之外,提供給大數據分析的第三個關鍵技術。

6 Kylin 入門案例

6.1 hive數據準備

--創建數據庫kylin_hive
create database kylin_hive; 

--創建表部門表dept
create external table if not exists kylin_hive.dept(
deptno int,
dname string,
loc int )
row format delimited fields terminated by '\t';
--添加數據
INSERT INTO TABLE kylin_hive.dept VALUES(10,"ACCOUNTING",1700),(20,"RESEARCH",1800),(30,"SALES",1900),(40,"OPERATIONS",1700)
--查看數據
SELECT * FROM kylin_hive.dept

--創建員工表emp
create external table if not exists kylin_hive.emp(
empno int,
ename string,
job string,
mgr int,
hiredate string, 
sal double, 
comm double,
deptno int)
row format delimited fields terminated by '\t';

--添加數據
INSERT INTO TABLE kylin_hive.emp VALUES(7369,"SMITHC","LERK",7902,"1980-12-17",800.00,0.00,20),(7499,"ALLENS","ALESMAN",7698,"1981-2-20",1600.00,300.00,30),(7521,"WARDSA","LESMAN",7698,"1981-2-22",1250.00,500.00,30),(7566,"JONESM","ANAGER",7839,"1981-4-2",2975.00,0.00,20),(7654,"MARTIN","SALESMAN",7698,"1981-9-28",1250.00,1400.00,30),(7698,"BLAKEM","ANAGER",7839,"1981-5-1",2850.00,0.00,30),(7782,"CLARKM","ANAGER",7839,"1981-6-9",2450.00,0.00,10),(7788,"SCOTTA","NALYST",7566,"1987-4-19",3000.00,0.00,20),(7839,"KINGPR","ESIDENT",7533,"1981-11-17",5000.00,0.00,10),(7844,"TURNER","SALESMAN",7698,"1981-9-8",1500.00,0.00,30),(7876,"ADAMSC","LERK",7788,"1987-5-23",1100.00,0.00,20),(7900,"JAMESC","LERK",7698,"1981-12-3",950.00,0.00,30),(7902,"FORDAN","ALYST",7566,"1981-12-3",3000.00,0.00,20),(7934,"MILLER","CLERK",7782,"1982-1-23",1300.00,0.00,10)
--查看數據
SELECT * FROM kylin_hive.emp

6.2 創建工程

  • 輸入工程名稱以及工程描述

6.3 Kylin加載Hive表

雖然 Kylin 使用 SQL 作為查詢接口並利用 Hive 元數據,Kylin 不會讓用戶查詢所有的 hive 表,因為到目前為止它是一個預構建 OLAP(MOLAP) 系統。為了使表在 Kylin 中可用,使用 “Sync” 方法能夠方便地從 Hive 中同步表。

  • 選擇項目添加hive數據源
  • 添加數據源表–>hive庫名稱.表名稱(以逗號分隔)

  • 這裏只添加了表的Schema元信息,如果需要加載數據,還需要點擊Reload Table

6.4 Kylin添加Models(模型)

  • 填寫模型名字
  • 選擇事實表,這裏選擇員工EMP表為事實表
  • 添加維度表,這裏選擇部門DEPT表為維度表,並選擇我們的join方式,以及join連接字段

  • 選擇聚合維度信息
  • 選擇度量信息
  • 添加分區信息及過濾條件之後“Save”

6.5 Kylin構建Cube

Kylin 的 OLAP Cube 是從星型模式的 Hive 表中獲取的預計算數據集,這是供用戶探索、管理所有 cube 的網頁管理頁面。由菜單欄進入Model 頁面,系統中所有可用的 cube 將被列出。

  • 創建一個new cube
  • 選擇我們的model以及指定cube name
  • 添加我們的自定義維度,這裡是在創建Models模型時指定的事實表和維度表中取
    • LookUpTable可選擇normal或derived(一般列、衍生列)
    • normal緯度作為普通獨立的緯度,而derived 維度不會計算入cube,將由事實表的外鍵推算出

  • 添加統計維度,勾選相應列作為度量,kylin提供8種度量:SUM、MAX、MIN、COUNT、COUNT_DISTINCT、TOP_N、EXTENDED_COLUMN、PERCENTILE
    • DISTINCT_COUNT有兩個實現:
      1. 近似實現 HyperLogLog,選擇可接受的錯誤率,低錯誤率需要更多存儲;
      2. 精確實現 bitmap
    • TopN 度量在每個維度結合時預計算,需要兩個參數:
      1. 一是被用來作為 Top 記錄的度量列,Kylin 將計算它的 SUM 值並做倒序排列,如sum(price)
      2. 二是 literal ID,代表最 Top 的記錄,如seller_id
    • EXTENDED_COLUMN
      • Extended_Column 作為度量比作為維度更節省空間。一列和零一列可以生成新的列
    • PERCENTILE
      • Percentile 代表了百分比。值越大,錯誤就越少。100為最合適的值

  • 設置多個分區cube合併信息

如果是分區統計,需要關於歷史cube的合併,

這裡是全量統計,不涉及多個分區cube進行合併,所以不用設置歷史多個cube進行合併

  • Auto Merge Thresholds:

    • 自動合併小的 segments 到中等甚至更大的 segment。如果不想自動合併,刪除默認2個選項
  • Volatile Range:

    • 默認為0,會自動合併所有可能的cube segments,或者用 ‘Auto Merge’ 將不會合併最新的 [Volatile Range] 天的 cube segments
  • Retention Threshold:

    • 默認為0,只會保存 cube 過去幾天的 segment,舊的 segment 將會自動從頭部刪除
  • Partition Start Date:

    • cube 的開始日期
  • 高級設置

暫時也不做任何設

置高級設定關係到立方體是否足夠優化,可根據實際情況將維度列定義為強制維度、層級維度、聯合維度

  • Mandatory維度指的是總會存在於group by或where中的維度
  • Hierarchy是一組有層級關係的維度,如國家、省份、城市
  • Joint是將多個維度組合成一個維度

  • 額外的其他的配置屬性

這裏也暫時不做配置

Kylin 允許在 Cube 級別覆蓋部分 kylin.properties 中的配置

  • 完成保存配置

通過Planner計劃者,可以看到4個維度,得到Cuboid Conut=15,為2的4次方-1,因為全部沒有的指標不會使用,所以結果等於15。

  • 構建Cube

6.6 數據查詢

  • 根據部門查詢,部門工資總和
SELECT  DEPT.DNAME,SUM(EMP.SAL) 
FROM EMP 
LEFT JOIN DEPT 
ON DEPT.DEPTNO = EMP.DEPTNO  
GROUP BY DEPT.DNAME

7 入門案例構建流程

  • 動畫演示

8 Kylin的工作原理

就是對數據模型做 Cube 預計算,並利用計算的結果加速查詢,具體工作過程如下:

  1. 指定數據模型,定義維度和度量。

  2. 預計算 Cube,計算所有 Cuboid 並保存為物化視圖。

  3. 執行查詢時,讀取 Cuboid,運算,產生查詢結果。

由於 Kylin 的查詢過程不會掃描原始記錄,而是通過預計算預先完成表的關聯、聚合等複雜運算,並利用預計算的結果來執行查詢,因此相比非預計算的查詢技術,其速度一般要快一到兩個數量級,並且這點在超大的數據集上優勢更明顯。當數據集達到千億乃至萬億級別時,Kylin 的速度甚至可以超越其他非預計算技術 1000 倍以上。

9 Cube 和 Cuboid

Cube(或 Data Cube),即數據立方體,是一種常用於數據分析與索引的技術;它可以對原始數據建立多維度索引。通過 Cube 對數據進行分析,可以大大加快數據的查詢效率。

Cuboid 特指在某一種維度組合下所計算的數據。 給定一個數據模型,我們可以對其上的所有維度進行組合。對於 N 個維度來說,組合的所有可能性共有 2 的 N 次方種。對於每一種維度的組合,將度量做 聚合運算,然後將運算的結果保存為一個物化視圖,稱為 Cuboid。

所有維度組合的 Cuboid 作為一個整體,被稱為 Cube。所以簡單來說,一個 Cube 就是許多按維度聚合的物化視圖的集合。

下面來列舉一個具體的例子:

假定有一個電商的銷售數據集,其中維度包括 時間(Time)、商品(Item)、地點(Location)和供應商(Supplier),度量為銷售額(GMV)。

  • 那麼所有維度的組合就有 2 的 4 次方 =16 種
    • 一維度(1D) 的組合有[Time]、[Item]、[Location]、[Supplier]4 種
    • 二維度(2D)的組合 有[Time,Item]、[Time,Location]、[Time、Supplier]、[Item,Location]、 [Item,Supplier]、[Location,Supplier]6 種
    • 三維度(3D)的組合也有 4 種
    • 零維度(0D)的組合有 1 種
    • 四維度(4D)的組合有 1 種

10 cube構建算法

10.1 逐層構建算法

我們知道,一個N維的Cube,是由1個N維子立方體、N個(N-1)維子立方體、N*(N-1)/2個(N-2)維子立方體、……、N個1維子立方體和1個0維子立方體構成,總共有2^N個子立方體組成。

在逐層算法中,按維度數逐層減少來計算,每個層級的計算(除了第一層,它是從原始數據聚合而來),是基於它上一層級的結果來計算的。比如,[Group by A, B]的結果,可以基於[Group by A, B, C]的結果,通過去掉C后聚合得來的;這樣可以減少重複計算;當 0維度Cuboid計算出來的時候,整個Cube的計算也就完成了。

每一輪的計算都是一個MapReduce任務,且串行執行;一個N維的Cube,至少需要N次MapReduce Job。

算法優點:

  1. 此算法充分利用了MapReduce的優點,處理了中間複雜的排序和shuffle工作,故而算法代碼清晰簡單,易於維護;

  2. 受益於Hadoop的日趨成熟,此算法非常穩定,即便是集群資源緊張時,也能保證最終能夠完成。

算法缺點:

  1. 當Cube有比較多維度的時候,所需要的MapReduce任務也相應增加;由於Hadoop的任務調度需要耗費額外資源,特別是集群較龐大的時候,反覆遞交任務造成的額外開銷會相當可觀;

  2. 由於Mapper邏輯中並未進行聚合操作,所以每輪MR的shuffle工作量都很大,導致效率低下。

  3. 對HDFS的讀寫操作較多:由於每一層計算的輸出會用做下一層計算的輸入,這些Key-Value需要寫到HDFS上;當所有計算都完成后,Kylin還需要額外的一輪任務將這些文件轉成HBase的HFile格式,以導入到HBase中去;

總體而言,該算法的效率較低,尤其是當Cube維度數較大的時候。

10.2 快速構建算法

也被稱作“逐段”(By Segment) 或“逐塊”(By Split) 算法,從1.5.x開始引入該算法,該算法的主要思想是,每個Mapper將其所分配到的數據塊,計算成一個完整的小Cube 段(包含所有Cuboid)。每個Mapper將計算完的Cube段輸出給Reducer做合併,生成大Cube,也就是最終結果。如圖所示解釋了此流程。

與舊的逐層構建算法相比,快速算法主要有兩點不同:

  1. Mapper會利用內存做預聚合,算出所有組合;Mapper輸出的每個Key都是不同的,這樣會減少輸出到Hadoop MapReduce的數據量,Combiner也不再需要;

  2. 一輪MapReduce便會完成所有層次的計算,減少Hadoop任務的調配。

11 備份及恢復

Kylin將它全部的元數據(包括cube描述和實例、項目、倒排索引描述和實例、任務、表和字典)組織成層級文件系統的形式。然而,Kylin使用hbase來存儲元數據,而不是一個普通的文件系統。如果你查看過Kylin的配置文件(kylin.properties),你會發現這樣一行:

## The metadata store in hbase
kylin.metadata.url=kylin_metadata@hbase

這表明元數據會被保存在一個叫作“kylin_metadata”的htable里。你可以在hbase shell里scan該htbale來獲取它。

11.1 使用二進制包來備份Metadata Store

有時你需要將Kylin的Metadata Store從hbase備份到磁盤文件系統。在這種情況下,假設你在部署Kylin的hadoop命令行(或沙盒)里,你可以到KYLIN_HOME並運行:

./bin/metastore.sh backup

來將你的元數據導出到本地目錄,這個目錄在KYLIN_HOME/metadata_backps下,它的命名規則使用了當前時間作為參數:KYLIN_HOME/meta_backups/meta_year_month_day_hour_minute_second,如:meta_backups/meta_2020_06_18_19_37_49/

11.2 使用二進制包來恢復Metatdara Store

萬一你發現你的元數據被搞得一團糟,想要恢復先前的備份:

  1. 首先,重置Metatdara Store(這個會清理Kylin在hbase的Metadata Store的所有信息,請確保先備份):
./bin/metastore.sh reset
  1. 然後上傳備份的元數據到Kylin的Metadata Store:
./bin/metastore.sh restore $KYLIN_HOME/meta_backups/meta_xxxx_xx_xx_xx_xx_xx
  1. 等恢復操作完成,可以在“Web UI”的“System”頁面單擊“Reload Metadata”按鈕對元數據緩存進行刷新,即可看到最新的元數據

做完備份,刪除一些文件,然後進行恢複測試,完美恢復,叮叮叮!

12 kylin的垃圾清理

Kylin在構建cube期間會在HDFS上生成中間文件;除此之外,當清理/刪除/合併cube時,一些HBase表可能被遺留在HBase卻以後再也不會被查詢;雖然Kylin已經開始做自動化的垃圾回收,但不一定能覆蓋到所有的情況;你可以定期做離線的存儲清理:

  1. 檢查哪些資源可以清理,這一步不會刪除任何東西:
${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete false
  1. 你可以抽查一兩個資源來檢查它們是否已經沒有被引用了;然後加上“–delete true”選項進行清理。
${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true

完成后,中間HDFS上的中間文件和HTable會被移除。

13 Kylin優化

13.1 維度優化

如果不進行任何維度優化,直接將所有的維度放在一個聚集組裡,Kylin就會計算所有的維度組合(cuboid)。

比如,有12個維度,Kylin就會計算2的12次方即4096個cuboid,實際上查詢可能用到的cuboid不到1000個,甚至更少。 如果對維度不進行優化,會造成集群計算和存儲資源的浪費,也會影響cube的build時間和查詢性能,所以我們需要進行cube的維度優化。

當你在保存cube時遇到下面的異常信息時,意味1個聚集組的維度組合數已經大於 4096 ,你就必須進行維度優化了。

或者發現cube的膨脹率過大。

但在現實情況中,用戶的維度數量一般遠遠大於4個。假設用戶有10 個維度,那麼沒有經過任何優化的Cube就會存在 2的10次方 = 1024個Cuboid;雖然每個Cuboid的大小存在很大的差異,但是單單想到Cuboid的數量就足以讓人想象到這樣的Cube對構建引擎、存儲引擎來說壓力有多麼巨大。因此,在構建維度數量較多的Cube時,尤其要注意Cube的剪枝優化(即減少Cuboid的生成)。

13.2 使用衍生維度

  • 衍生維度:維表中可以由主鍵推導出值的列可以作為衍⽣維度。

  • 使用場景:以星型模型接入時。例如用戶維表可以從userid推導出用戶的姓名,年齡,性別。

  • 優化效果:維度表的N個維度組合成的cuboid個數會從2的N次方降為2。

衍生維度用於在有效維度內將維度表上的非主鍵維度排除掉,並使用維度表的主鍵(其實是事實表上相應的外鍵)來替代它們。Kylin會在底層記錄維度表主鍵與維度表其他維度之間的映射關係,以便在查詢時能夠動態地將維度表的主鍵“翻譯”成這些非主鍵維度,並進行實時聚合。

雖然衍生維度具有非常大的吸引力,但這也並不是說所有維度表上的維度都得變成衍生維度,如果從維度表主鍵到某個維度表維度所需要的聚合工作量非常大,則不建議使用衍生維度。

13.3 使用聚合組(Aggregation group)

聚合組(Aggregation Group)是一種強大的剪枝工具。聚合組假設一個Cube的所有維度均可以根據業務需求劃分成若干組(當然也可以是一個組),由於同一個組內的維度更可能同時被同一個查詢用到,因此會表現出更加緊密的內在關聯。每個分組的維度集合均是Cube所有維度的一個子集,不同的分組各自擁有一套維度集合,它們可能與其他分組有相同的維度,也可能沒有相同的維度。每個分組各自獨立地根據自身的規則貢獻出一批需要被物化的Cuboid,所有分組貢獻的Cuboid的並集就成為了當前Cube中所有需要物化的Cuboid的集合。不同的分組有可能會貢獻出相同的Cuboid,構建引擎會察覺到這點,並且保證每一個Cuboid無論在多少個分組中出現,它都只會被物化一次。

對於每個分組內部的維度,用戶可以使用如下三種可選的方式定義,它們之間的關係,具體如下。

  1. 強制維度(Mandatory)

    • 強制維度:所有cuboid必須包含的維度,不會計算不包含強制維度的cuboid。

    • 適用場景:可以將確定在查詢時一定會使用的維度設為強制維度。例如,時間維度。

    • 優化效果:將一個維度設為強制維度,則cuboid個數直接減半。

如果一個維度被定義為強制維度,那麼這個分組產生的所有Cuboid中每一個Cuboid都會包含該維度。每個分組中都可以有0個、1個或多個強制維度。如果根據這個分組的業務邏輯,則相關的查詢一定會在過濾條件或分組條件中,因此可以在該分組中把該維度設置為強制維度。

  1. 層級維度(Hierarchy),

    • 層級維度:具有一定層次關係的維度。

    • 使用場景:像年,月,日;國家,省份,城市這類具有層次關係的維度。

    • 優化效果:將N個維度設置為層次維度,則這N個維度組合成的cuboid個數會從2的N次方減少到N+1。

每個層級包含兩個或更多個維度。假設一個層級中包含D1,D2…Dn這n個維度,那麼在該分組產生的任何Cuboid中, 這n個維度只會以(),(D1),(D1,D2)…(D1,D2…Dn)這n+1種形式中的一種出現。每個分組中可以有0個、1個或多個層級,不同的層級之間不應當有共享的維度。如果根據這個分組的業務邏輯,則多個維度直接存在層級關係,因此可以在該分組中把這些維度設置為層級維度。

  1. 聯合維度(Joint),

    • 聯合維度:將幾個維度視為一個維度。

    • 適用場景:

      1. 可以將確定在查詢時一定會同時使用的幾個維度設為一個聯合維度。
      2. 可以將基數很小的幾個維度設為一個聯合維度。
      3. 可以將查詢時很少使用的幾個維度設為一個聯合維度。
    • 優化效果:將N個維度設置為聯合維度,則這N個維度組合成的cuboid個數會從2的N次方減少到1。

每個聯合中包含兩個或更多個維度,如果某些列形成一個聯合,那麼在該分組產生的任何Cuboid中,這些聯合維度要麼一起出現,要麼都不出現。每個分組中可以有0個或多個聯合,但是不同的聯合之間不應當有共享的維度(否則它們可以合併成一個聯合)。如果根據這個分組的業務邏輯,多個維度在查詢中總是同時出現,則可以在該分組中把這些維度設置為聯合維度。

這些操作可以在Cube Designer的Advanced Setting中的Aggregation Groups區域完成,如下圖所示。

聚合組的設計非常靈活,甚至可以用來描述一些極端的設計。假設我們的業務需求非常單一,只需要某些特定的Cuboid,那麼可以創建多個聚合組,每個聚合組代表一個Cuboid。具體的方法是在聚合組中先包含某個Cuboid所需的所有維度,然後把這些維度都設置為強制維度。這樣當前的聚合組就只能產生我們想要的那一個Cuboid了。

再比如,有的時候我們的Cube中有一些基數非常大的維度,如果不做特殊處理,它就會和其他的維度進行各種組合,從而產生一大堆包含它的Cuboid。包含高基數維度的Cuboid在行數和體積上往往非常龐大,這會導致整個Cube的膨脹率變大。如果根據業務需求知道這個高基數的維度只會與若干個維度(而不是所有維度)同時被查詢到,那麼就可以通過聚合組對這個高基數維度做一定的“隔離”。我們把這個高基數的維度放入一個單獨的聚合組,再把所有可能會與這個高基數維度一起被查詢到的其他維度也放進來。這樣,這個高基數的維度就被“隔離”在一個聚合組中了,所有不會與它一起被查詢到的維度都沒有和它一起出現在任何一個分組中,因此也就不會有多餘的Cuboid產生。這點也大大減少了包含該高基數維度的Cuboid的數量,可以有效地控制Cube的膨脹率。

13.4 併發粒度優化

當Segment中某一個Cuboid的大小超出一定的閾值時,系統會將該Cuboid的數據分片到多個分區中,以實現Cuboid數據讀取的并行化,從而優化Cube的查詢速度。具體的實現方式如下:構建引擎根據Segment估計的大小,以及參數“kylin.hbase.region.cut”的設置決定Segment在存儲引擎中總共需要幾個分區來存儲,如果存儲引擎是HBase,那麼分區的數量就對應於HBase中的Region數量。kylin.hbase.region.cut的默認值是5.0,單位是GB,也就是說對於一個大小估計是50GB的Segment,構建引擎會給它分配10個分區。用戶還可以通過設置kylin.hbase.region.count.min(默認為1)和kylin.hbase.region.count.max(默認為500)兩個配置來決定每個Segment最少或最多被劃分成多少個分區。

由於每個Cube的併發粒度控制不盡相同,因此建議在Cube Designer 的Configuration Overwrites(上圖所示)中為每個Cube量身定製控制併發粒度的參數。假設將把當前Cube的kylin.hbase.region.count.min設置為2,kylin.hbase.region.count.max設置為100。這樣無論Segment的大小如何變化,它的分區數量最小都不會低於2,最大都不會超過100。相應地,這個Segment背後的存儲引擎(HBase)為了存儲這個Segment,也不會使用小於兩個或超過100個的分區。我們還調整了默認的kylin.hbase.region.cut,這樣50GB的Segment基本上會被分配到50個分區,相比默認設置,我們的Cuboid可能最多會獲得5倍的併發量。

13.5 Row Key優化

Kylin會把所有的維度按照順序組合成一個完整的Rowkey,並且按照這個Rowkey升序排列Cuboid中所有的行。

設計良好的Rowkey將更有效地完成數據的查詢過濾和定位,減少IO次數,提高查詢速度,維度在rowkey中的次序,對查詢性能有顯著的影響。

Row key的設計原則如下:

  1. 被用作where過濾的維度放在前邊。
  1. 基數大的維度放在基數小的維度前邊。

13.6 增量cube構建

構建全量cube,也可以實現增量cube的構建,就是通過分區表的分區時間字段來進行增量構建

  1. 更改model

  1. 更改cube

14 Kafka 流構建 Cube(Kylin實時案例)

Kylin v1.6 發布了可擴展的 streaming cubing 功能,它利用 Hadoop 消費 Kafka 數據的方式構建 cube。

參考:http://kylin.apache.org/blog/2016/10/18/new-nrt-streaming/

前期準備:kylin v1.6.0 或以上版本 和 可運行的 Kafka(v0.10.0 或以上版本)的 Hadoop 環境

14.1 Kafka創建Topic

  • 創建樣例名為 “kylin_streaming_topic” 具有一個副本三個分區的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kylin_streaming_topic

  • 將樣例數據放入 topic,Kylin 有一個實用類可以做這項工作;
cd $KYLIN_HOME
./bin/kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer --topic kylin_streaming_topic --broker cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092

工具每一秒會向 Kafka 發送 100 條記錄。直至本案例結束請讓其一直運行。

14.2 用streaming定義一張表

登陸 Kylin Web GUI,選擇一個已存在的 project 或創建一個新的 project;點擊 “Model” -> “Data Source”,點擊 “Add Streaming Table” 圖標

  • 在彈出的對話框中,輸入您從 kafka-console-consumer 中獲得的樣例記錄,點擊 “»” 按鈕,Kylin 會解析 JSON 消息並列出所有的消息
{"country":"CHINA","amount":41.53789973661185,"qty":6,"currency":"USD","order_time":1592485535129,"category":"TOY","device":"iOS","user":{"gender":"Male","id":"12d127ab-707e-592f-2e4c-69ad654afa48","first_name":"unknown","age":25}}
  • 您需要為這個 streaming 數據源起一個邏輯表名;該名字會在後續用於 SQL 查詢;這裡是在 “Table Name” 字段輸入 “STREAMING_SALES_TABLE” 作為樣例。

  • 您需要選擇一個時間戳字段用來標識消息的時間;Kylin 可以從這列值中獲得其他時間值,如 “year_start”,”quarter_start”,這為您構建和查詢 cube 提供了更高的靈活性。這裏可以查看 “order_time”。您可以取消選擇那些 cube 不需要的屬性。這裏我們保留了所有字段。

  • 注意 Kylin 從 1.6 版本開始支持結構化 (或稱為 “嵌入”) 消息,會將其轉換成一個 flat table structure。默認使用 “_” 作為結構化屬性的分隔符。

  • 點擊 “Next”。在這個頁面,提供了 Kafka 集群信息;輸入 “kylin_streaming_topic” 作為 “Topic” 名;集群有 3 個 broker,其主機名為”cdh01.cm,cdh02.cm,cdh03.cm“,端口為 “9092”,點擊 “Save”。
  • 在 “Advanced setting” 部分,”timeout” 和 “buffer size” 是和 Kafka 進行連接的配置,保留它們。

  • 在 “Parser Setting”,Kylin 默認您的消息為 JSON 格式,每一個記錄的時間戳列 (由 “tsColName” 指定) 是 bigint (新紀元時間) 類型值;在這個例子中,您只需設置 “tsColumn” 為 “order_time”;

  • 在現實情況中如果時間戳值為 string 如 “Jul 20,2016 9:59:17 AM”,您需要用 “tsParser” 指定解析類和時間模式例如:
  • 點擊 “Submit” 保存設置。現在 “Streaming” 表就創建好了。

14.3 定義數據模型

  • 有了上一步創建的表,現在我們可以創建數據模型了。步驟和您創建普通數據模型是一樣的,但有兩個要求:

    • Streaming Cube 不支持與 lookup 表進行 join;當定義數據模型時,只選擇 fact 表,不選 lookup 表;
    • Streaming Cube 必須進行分區;如果您想要在分鐘級別增量的構建 Cube,選擇 “MINUTE_START” 作為 cube 的分區日期列。如果是在小時級別,選擇 “HOUR_START”。
  • 這裏我們選擇 13 個 dimension 和 2 個 measure 列:

保存數據模型。

14.4 創建 Cube

Streaming Cube 和普通的 cube 大致上一樣. 有以下幾點需要您注意:

  • 分區時間列應該是 Cube 的一個 dimension。在 Streaming OLAP 中時間總是一個查詢條件,Kylin 利用它來縮小掃描分區的範圍。
  • 不要使用 “order_time” 作為 dimension 因為它非常的精細;建議使用 “mintue_start”,”hour_start” 或其他,取決於您如何檢查數據。
  • 定義 “year_start”,”quarter_start”,”month_start”,”day_start”,”hour_start”,”minute_start” 作為層級以減少組合計算。
  • 在 “refersh setting” 這一步,創建更多合併的範圍,如 0.5 小時,4 小時,1 天,然後是 7 天;這將會幫助您控制 cube segment 的數量。
  • 在 “rowkeys” 部分,拖拽 “minute_start” 到最上面的位置,對於 streaming 查詢,時間條件會一直显示;將其放到前面將會幫助您縮小掃描範圍。

保存 cube。

14.5 運行Cube

可以在 web GUI 觸發 build,通過點擊 “Actions” -> “Build”,或用 ‘curl’ 命令發送一個請求到 Kylin RESTful API:

curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://localhost:7070/kylin/api/cubes/{your_cube_name}/build2

請注意 API 終端和普通 cube 不一樣 (這個 URL 以 “build2” 結尾)。

這裏的 0 表示從最後一個位置開始,9223372036854775807 (Long 類型的最大值) 表示到 Kafka topic 的結束位置。如果這是第一次 build (沒有以前的 segment),Kylin 將會尋找 topics 的開頭作為開始位置。

在 “Monitor” 頁面,一個新的 job 生成了;等待其直到 100% 完成。

14.6 查看結果

點擊 “Insight” 標籤,編寫 SQL 運行,例如:

select minute_start, count(*), sum(amount), sum(qty) from streaming_sales_table group by minute_start order by minute_start

14.7 自動 build

一旦第一個 build 和查詢成功了,您可以按照一定的頻率調度增量 build。Kylin 將會記錄每一個 build 的 offsets;當收到一個 build 請求,它將會從上一個結束的位置開始,然後從 Kafka 獲取最新的 offsets。有了 REST API 您可以使用任何像 Linux cron 調度工具觸發它:

crontab -e
*/5 * * * * curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://localhost:7070/kylin/api/cubes/{your_cube_name}/build2

現在您可以觀看 cube 從 streaming 中自動 built。當 cube segments 累積到更大的時間範圍,Kylin 將會自動的將其合併到一個更大的 segment 中。

15 JDBC查詢kylin

  • maven依賴
    <dependencies>
        <dependency>
            <groupId>org.apache.kylin</groupId>
            <artifactId>kylin-jdbc</artifactId>
            <version>3.0.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 限制jdk版本插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • java類
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class KylinJdbc {
    public static void main(String[] args) throws Exception {
        //Kylin_JDBC 驅動
        String KYLIN_DRIVER = "org.apache.kylin.jdbc.Driver";
        //Kylin_URL
        String KYLIN_URL = "jdbc:kylin://localhost:9090/kylin_hive";
        //Kylin的用戶名
        String KYLIN_USER = "ADMIN";
        //Kylin的密碼
        String KYLIN_PASSWD = "KYLIN";
        //添加驅動信息
        Class.forName(KYLIN_DRIVER);
        //獲取連接
        Connection connection = DriverManager.getConnection(KYLIN_URL, KYLIN_USER, KYLIN_PASSWD);
        //預編譯SQL
        PreparedStatement ps = connection.prepareStatement("SELECT sum(sal) FROM emp group by deptno");
        //執行查詢
        ResultSet resultSet = ps.executeQuery();
        //遍歷打印
        while (resultSet.next()) {
                    System.out.println(resultSet.getInt(1));
        }
    }
}

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

聚甘新

Spring Data 教程 – Redis

1. Redis簡介

Redis(Remote Dictionary Server ),即遠程字典服務,是一個開源的使用ANSI C語言編寫、支持網絡、可基於內存亦可持久化的日誌型、Key-Value 數據庫,並提供多種語言的API。Redis 是一個高性能的key-value數據庫。 redis的出現,在部分場合可以對關係數據庫起到很好的補充作用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客戶端,使用很方便。

Redis支持主從同步。數據可以從主服務器向任意數量的從服務器上同步,從服務器可以是關聯其他從服務器的主服務器。這使得Redis可執行單層樹複製。存盤可以有意無意的對數據進行寫操作。由於完全實現了發布/訂閱機制,使得從數據庫在任何地方同步樹時,可訂閱一個頻道並接收主服務器完整的消息發布記錄。同步對讀取操作的可擴展性和數據冗餘很有幫助。

redis的key都是字符串String類型,它的value是多樣化的,如下圖:

redis數據類型 ENCODING返回的編碼 底層對應的數據結構
string int long類型的整數
string embstr embstr編碼的簡單動態字符串
string raw 簡單動態字符串
list ziplist 壓縮列表
list linkedlist 雙向鏈表
hash ziplist 壓縮列表
hash ht 字典
set intset 整數集合
set ht 字典
zset ziplist 壓縮列表
zset skiplist 跳錶

2. Redis的五種數據類型

2.1 字符串對象(String)

字符串對象的模型:

redis底層提供了三種不同的數據結構實現字符串對象,根據不同的數據自動選擇合適的數據結構。這裏的字符串對象並不是指的純粹的字符串,数字也是可以的。

  • int:當數據是long類型的整数字符串時,底層使用long類型的整數實現。這個值會直接存儲在字符串對象的ptr屬性中,同時OBJECT ENCODING為int。

  • raw:當數據為長度大於44字節的字符串時,底層使用簡單動態字符串實現,說到這裏就不得不提下redis的簡單隨機字符串(Simple Dynamic String,SDS),SDS有三個屬性,free,len和buf。free存的是還剩多少空間,len存的是目前字符串長度,不包含結尾的空字符。buf是一個list,存放真實字符串數據,包含free和空字符。針對SDS本文不做詳細介紹,歡迎點擊SDS了解。

  • embstr:當數據為長度小於44字節的字符串時,底層使用embstr編碼的簡單動態字符串實現。相比於raw,embstr內存分配只需要一次就可完成,分配的是一塊連續的內存空間。

2.2 列表對象(List)

列表對象的模型:

redis中的列表對象經常被用作消息隊列使用,底層採用ziplist和linkedlist實現。大家使用的時候當作鏈表使用就可以了。

  • ziplist

    列表對象使用ziplist編碼需要滿足兩個要求,一是所有字符串長度都小於設定值值64字節(可以在配置文件中修改list-max-ziplist-value字段改變)。二是所存元素數量小於設定值512個(可以在配置文件中修改list-max-ziplist-entries字段改變)。ziplist類似與python中的list,佔用一段連續的內存地址,由此減小指針內存佔用。

    zlbytes:占內存總數

    zltail:到尾部的偏移量

    zllen:內部節點數

    node:節點

    zlend:尾部標識

    previous_entry_length:前一節點的長度

    encoding:數據類型

    content:真實數據

    遍歷的時候會根據zlbytes和zltail直接找到尾部節點nodeN,然後根據每個節點的previous_entry_length反向遍歷。增加和刪除節點會導致其他節點連鎖更新,因為每個節點都存儲了前一節點的長度。

  • linkedlist

    linkedlist有三個屬性,head,tail和len。head指向鏈表的頭部,tail指向鏈表的尾部,len為鏈表的長度。

2.3 哈希類型對象(Hash)

哈希類型對象的模型:

redis的value類型hash類型,其實就是map類型,就是在值的位置放一個map類型的數據。大家想詳細了解一下,可以參考一下這篇文章:https://www.jianshu.com/p/658365f0abfc 。

2.4 集合對象(Set)

集合對象類型的模型:

Set類型的value保證每個值都不重複。

redis中的集合對象底層有兩種實現方式,分別有整數集合和hashtable。當所有元素都是整數且元素數小於512(可在配置文件中set-max-intset-entries字段配置)時採用整數集合實現,其餘情況都採用hashtable實現。hashtable請移駕上文鏈接查閱,接下來介紹整數集合intset。intset有三個屬性,encoding:記錄数字的類型,有int16,int32和int64等,length:記錄集合的長度,content:存儲具體數據。具體結構如下圖:

2.5 有序集合對象

有序集合對象(zset)和集合對象(set)沒有很大區別,僅僅是多了一個分數(score)用來排序。

redis中的有序集合底層採用ziplist和skiplist跳錶實現,當所有字符串長度都小於設定值值64字節(可以在配置文件中修改list-max-ziplist-value字段改變),並且所存元素數量小於設定值512個(可以在配置文件中修改list-max-ziplist-entries字段改變)使用ziplist實現,其他情況均使用skiplist實現,跳躍表的實現原理這裏偷個懶,給大家推薦一篇寫的非常好的博客,點擊查看跳躍表原理。

3. Redis的安裝

可以去官網或者中文網下載Redis。redis的windows版本現在已經不更新了,所以我們安裝redis的6.0.3版本,這個版本支持的東西很多,在此次教程中,我們只對redis的五種數據類型做解釋和學習。

官網:https://redis.io/

中文網:https://www.redis.net.cn/

本教程安裝的redis版本為6.0.3版本,redis使用C語言編寫的,CentOS7的gcc自帶版本為4.8.5,而redis6.0+需要的gcc版本為5.3及以上,所以需要升級gcc版本。

下載Linux版本的tar.gz包,解壓以後進入解壓產生的包:

cd redis-6.0.3

發現沒有bin目錄,這裏需要通過make進行安裝。

# 先檢查gcc的環境 
gcc -v 
# 查看gcc版本 
yum -y install centos-release-scl 
# 升級到9.1版本 
yum -y install devtoolset-9-gcc devtoolset-9-gcc- c++ devtoolset-9-binutils 

scl enable devtoolset-9 bash 
#以上為臨時啟用,如果要長期使用gcc 9.1的話: 
echo "source /opt/rh/devtoolset-9/enable" >>/etc/profile 
# 進入redis解壓文件 
make 
# 6.0的坑,gcc版本 9.0 以上
# 等待完畢

執行完make操作之後,就可以在redis目錄看到src目錄了。進入src目錄后就可以看到redis-serverredis-cli

這裏建議將Redis的配置文件複製,保留一份原生的配置文件。

redis的配置大家可以在網上搜一下常用的配置,在這裏給大家推薦一個常用的配置,比較詳細:

https://blog.csdn.net/ymrfzr/article/details/51362125

到這裏redis就可以啟動並且正常訪問了。

注意:一定要將redis的IP地址綁定註釋掉,允許所有的IP地址訪問,不然我們從Windows訪問就訪問不了。

註釋掉下面的這一行:

同時關閉Redis的服務保護模式,將protected-mode設置為no。如下:

4. Spring Boot 整合 Redis

  • 4.1 搭建工程,引入依賴

    搭建工程的操作我這裏就不在寫出來了。直接上pom.xml

    <!--springboot父工程-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
    <dependencies>
        <!--springboot-web組件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>
        <!--redis整合springboot組件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>
        <!--lombok組件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>
    </dependencies>
    
  • 4.2 redis的配置

    項目的配置文件,application.yml

    butterflytri:
      host: 127.0.0.1
    server:
      port: 8080 # 應用端口
      servlet:
        context-path: /butterflytri # 應用映射
    spring:
      application:
        name: redis # 應用名稱
      redis:
        host: ${butterflytri.host} # redis地址
        port: 6379 # redis端口,默認是6379
        timeout: 10000 # 連接超時時間(ms)
        database: 0 # redis默認情況下有16個分片,這裏配置具體使用的分片,默認是0
        jedis: # 使用連接redis的工具-jedis
          pool:
            max-active: 8 # 連接池最大連接數(使用負值表示沒有限制) 默認 8
            max-wait: -1 # 連接池最大阻塞等待時間(使用負值表示沒有限制) 默認 -1
            max-idle: 8 # 連接池中的最大空閑連接 默認 8
            min-idle: 0 # 連接池中的最小空閑連接 默認 0
    

    另外還有額外的配置類RedisConfig.java

    package com.butterflytri.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    /**
     * @author: WJF
     * @date: 2020/5/24
     * @description: RedisConfig
     */
    
    @Configuration
    public class RedisConfig {
    
        /**
         * redis鍵值對的值的序列化方式:通用方式
         * @return RedisSerializer
         */
        private RedisSerializer redisValueSerializer() {
            return new GenericJackson2JsonRedisSerializer();
        }
    
        /**
         * redis鍵值對的健的序列化方式:所有的健都是字符串
         * @return RedisSerializer
         */
        private RedisSerializer redisKeySerializer() {
            return new StringRedisSerializer();
        }
    
        @Bean("redisTemplate")
        public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            redisTemplate.setKeySerializer(redisKeySerializer());
            redisTemplate.setValueSerializer(redisValueSerializer());
            return redisTemplate;
        }
    
    }
    
  • 4.3 redisTemplate的使用

    value類型的值的CRUD:

    ValueServiceImpl.java

    package com.butterflytri.service.impl;
    
    import com.butterflytri.service.ValueService;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    
    /**
     * @author: WJF
     * @date: 2020/5/27
     * @description: ValueServiceImpl
     */
    @Service
    public class ValueServiceImpl implements ValueService {
    
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void addValue(String key, Object value) {
            redisTemplate.opsForValue().set(key,value);
        }
    
        @Override
        public Object get(String key) {
            return redisTemplate.opsForValue().get(key);
        }
    
        @Override
        public Object update(String key, Object newValue) {
            return redisTemplate.opsForValue().getAndSet(key,newValue);
        }
    
        @Override
        public void delete(String key) {
            redisTemplate.delete(key);
        }
    }	
    

    List類型的值的CRUD:

    這裏我加了枚舉類型用來控制增加的位置,因為List類型對應的是鏈表。

    ListServiceImpl.java

    package com.butterflytri.service.impl;
    
    import com.butterflytri.enums.OpsType;
    import com.butterflytri.service.ListService;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    /**
     * @author: WJF
     * @date: 2020/5/28
     * @description: ListServiceImpl
     */
    @Service
    public class ListServiceImpl implements ListService {
    
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void addList(String key, List<Object> list, OpsType type) {
            switch (type) {
                case RIGHT:
                    redisTemplate.opsForList().rightPushAll(key, list);
                    break;
                case LEFT:
                    redisTemplate.opsForList().leftPushAll(key, list);
                    break;
                default:
                    throw new RuntimeException("type不能為null");
            }
        }
    
        @Override
        public void add(String redisKey, Object value, OpsType type) {
            switch (type) {
                case RIGHT:
                    redisTemplate.opsForList().rightPush(redisKey, value);
                    break;
                case LEFT:
                    redisTemplate.opsForList().leftPush(redisKey, value);
                    break;
                default:
                    throw new RuntimeException("type不能為null");
            }
        }
    
        @Override
        public List<Object> get(String key) {
            return redisTemplate.opsForList().range(key, 0, -1);
        }
    
        @Override
        public Object update(String key, Object value, Integer index) {
            Object obj = redisTemplate.opsForList().index(key, index);
            redisTemplate.opsForList().set(key,index,value);
            return obj;
        }
    
        @Override
        public void delete(String key) {
            redisTemplate.delete(key);
        }
    
        @Override
        public void deleteValue(String redisKey, OpsType type) {
            switch (type) {
                case RIGHT:
                    redisTemplate.opsForList().rightPop(redisKey);
                    break;
                case LEFT:
                    redisTemplate.opsForList().leftPop(redisKey);
                    break;
                default:
                    throw new RuntimeException("type不能為null");
            }
        }
    }
    

    Hash類型的值的CRUD:

    hash類型是我們使用最常用的類型。

    HashServiceImpl.java:

    package com.butterflytri.service.impl;
    
    import com.butterflytri.service.HashService;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.Map;
    
    /**
     * @author: WJF
     * @date: 2020/5/28
     * @description: HashServiceImpl
     */
    @Service
    public class HashServiceImpl implements HashService {
    
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void addHashAll(String key, Map<String, Object> value) {
            redisTemplate.opsForHash().putAll(key, value);
        }
    
        @Override
        public void addHash(String redisKey, String key, Object value) {
            redisTemplate.opsForHash().put(redisKey, key, value);
        }
    
        @Override
        public Object get(String redisKey, String key) {
            return redisTemplate.opsForHash().get(redisKey, key);
        }
    
        @Override
        public Object update(String redisKey, String key, Object value) {
            Object obj = this.get(redisKey, key);
            this.delete(redisKey,key);
            redisTemplate.opsForHash().put(redisKey, key, value);
            return obj;
        }
    
        @Override
        public void delete(String redisKey, String key) {
            redisTemplate.opsForHash().delete(redisKey, key);
        }
    
        @Override
        public void deleteAll(String redisKey) {
            redisTemplate.delete(redisKey);
        }
    }
    

    Set的值的CRUD:

    package com.butterflytri.service.impl;
    
    import com.butterflytri.service.SetService;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.Set;
    
    /**
     * @author: WJF
     * @date: 2020/5/28
     * @description: SetServiceImpl
     */
    @Service
    public class SetServiceImpl implements SetService {
    
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
    
        @Override
        public void addAll(String key, Set<Object> set) {
            redisTemplate.opsForSet().add(key,set);
        }
    
        @Override
        public void add(String key, Object value) {
            redisTemplate.opsForSet().add(key,value);
        }
    
        @Override
        public Set<Object> findAll(String key) {
            return redisTemplate.opsForSet().members(key);
        }
    
        @Override
        public void deleteValue(String key, Object value) {
            redisTemplate.opsForSet().remove(key,value);
        }
    
        @Override
        public void delete(String key) {
            redisTemplate.delete(key);
        }
    }
    

    ZSet類型的值的CRUD:

    package com.butterflytri.service.impl;
    
    import com.butterflytri.service.SortedSetService;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.LinkedHashSet;
    
    /**
     * @author: WJF
     * @date: 2020/5/28
     * @description: SortedSetServiceImpl
     */
    @Service
    public class SortedSetServiceImpl implements SortedSetService {
    
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void add(String key, String value, Double score) {
            redisTemplate.opsForZSet().add(key, value, score);
        }
    
        @Override
        public LinkedHashSet<Object> findAll(String key) {
            return (LinkedHashSet<Object>) redisTemplate.opsForZSet().range(key,0,-1);
        }
    
        @Override
        public Long count(String key, Double scoreFrom, Double scoreTo) {
            return redisTemplate.opsForZSet().count(key,scoreFrom,scoreTo);
        }
    
        @Override
        public LinkedHashSet<Object> findByScore(String key, Double scoreFrom, Double scoreTo) {
            return (LinkedHashSet<Object>) redisTemplate.opsForZSet().rangeByScore(key,scoreFrom,scoreTo);
        }
    
        @Override
        public Long rank(String key, Object value) {
            return redisTemplate.opsForZSet().rank(key,value);
        }
    
        @Override
        public void remove(String key, String value) {
            redisTemplate.opsForZSet().remove(key,value);
        }
    
        @Override
        public void delete(String key) {
            redisTemplate.delete(key);
        }
    
    }
    

    redis的Java客戶端有很多,在這裏我們使用的是jedis,還有一個很好的Java語言的客戶端叫lettuce,大家可以去了解一下,Spring從不重複造輪子,只會簡化輪子的使用,redisTemplate就是一個超級簡單的使用實現。到這裏redis整合Spring Boot 就結束了。

5. 項目地址

本項目傳送門:

  • GitHub —> spring-data-redis
  • Gitee —> spring-data-redis

此教程會一直更新下去,覺得博主寫的可以的話,關注一下,也可以更方便下次來學習。

  • 作者:Butterfly-Tri
  • 出處:Butterfly-Tri個人博客
  • 版權所有,歡迎保留原文鏈接進行轉載

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

聚甘新

xenomai內核解析之雙核系統調用(一)

版權聲明:本文為本文為博主原創文章,轉載請註明出處。如有錯誤,歡迎指正。博客地址:https://www.cnblogs.com/wsg1100/

目錄

  • xenomai 內核系統調用
    • 一、32位Linux系統調用
    • 二、32位實時系統調用
    • 三、 64位系統調用
    • 五、 實時系統調用表cobalt_syscalls
    • 六、實時系統調用權限控制cobalt_sysmodes
    • 參考

xenomai 內核系統調用

解析系統調用是了解內核架構最有力的一把鑰匙,在這之前先搞懂xenomai與linux兩個內核共存后系統調用是如何實現的。

為什麼需要系統調用

linux內核中設置了一組用於實現系統功能的子程序,稱為系統調用。系統調用和普通庫函數調用非常相似,只是系統調用由操作系統核心提供,運行於內核態,而普通的函數調用由函數庫或用戶自己提供,運行於用戶態

一般的,進程是不能訪問內核的。它不能訪問內核所佔內存空間也不能調用內核函數。CPU硬件決定了這些(這就是為什麼它被稱作“保護模式”

為了和用戶空間上運行的進程進行交互,內核提供了一組接口。透過該接口,應用程序可以訪問硬件設備和其他操作系統資源。這組接口在應用程序和內核之間扮演了使者的角色,應用程序發送各種請求,而內核負責滿足這些請求(或者讓應用程序暫時擱置)。實際上提供這組接口主要是為了保證系統穩定可靠,避免應用程序肆意妄行,惹出大麻煩。

系統調用在用戶空間進程和硬件設備之間添加了一个中間層。該層主要作用有三個:

  • 它為用戶空間提供了一種統一的硬件的抽象接口。比如當需要讀些文件的時候,應用程序就可以不去管磁盤類型和介質,甚至不用去管文件所在的文件系統到底是哪種類型。
  • 系統調用保證了系統的穩定和安全。作為硬件設備和應用程序之間的中間人,內核可以基於權限和其他一些規則對需要進行的訪問進行裁決。舉例來說,這樣可以避免應用程序不正確地使用硬件設備,竊取其他進程的資源,或做出其他什麼危害系統的事情。
  • 每個進程都運行在虛擬系統中,而在用戶空間和系統的其餘部分提供這樣一層公共接口,也是出於這種考慮。如果應用程序可以隨意訪問硬件而內核又對此一無所知的話,幾乎就沒法實現多任務和虛擬內存,當然也不可能實現良好的穩定性和安全性。在Linux中,系統調用是用戶空間訪問內核的惟一手段;除異常和中斷外,它們是內核惟一的合法入口。

Linux加上實時系統內核xenomai后,實時任務常調用xenomai系統調用來完成實時的服務,如果實時任務需要用到linux的服務,還會調用linux的系統調用。

一、32位Linux系統調用

linux應用程序除直接系統調用外還會由glibc觸發系統調用,glibc為了提高應用程序的性能,對一些系統調用進行了封裝。
32位系統系統調用使用軟中斷int 0x80指令實現,軟中斷屬於異常的一種,通過它陷入(trap)內核,trap在整理的文檔x86 Linux中斷系統有說明。tarp_init()中設置IDT(Interrupt Descriptor Table 每个中斷處理程序的地址都保存在一個特殊的位置)由關int 0x80的IDT如下:

static const __initconst struct idt_data def_idts[] = {
	......
	SYSG(IA32_SYSCALL_VECTOR,	entry_INT80_32),
	......
};

當生系統調用時,硬件根據向量號在 IDT 中找到對應的表項,即中斷描述符,進行特權級檢查,發現 DPL = CPL = 3 ,允許調用。然後硬件將切換到內核棧 (tss.ss0 : tss.esp0)。接着根據中斷描述符的 segment selector 在 GDT / LDT 中找到對應的段描述符,從段描述符拿到段的基址,加載到 cs 。將 offset 加載到 eip。最後硬件將 ss / sp / eflags / cs / ip / error code 依次壓到內核棧。於是開始執行entry_INT80_32函數,該函數在entry_32.S定義:

ENTRY(entry_INT80_32)
	ASM_CLAC
	pushl	%eax		/* pt_regs->orig_ax */
	SAVE_ALL pt_regs_ax=$-ENOSYS	/* *存儲當前用戶態寄存器,保存在pt_regs結構里*/
	/*
	 * User mode is traced as though IRQs are on, and the interrupt gate
	 * turned them off.
	 */
	TRACE_IRQS_OFF

	movl	%esp, %eax
	call	do_int80_syscall_32
.Lsyscall_32_done:
	.......
.Lirq_return:
	INTERRUPT_RETURN/*iret 指令將原來用戶態保存的現場恢復回來,包含代碼段、指令指針寄存器等。這時候用戶態
進程恢復執行。*/

在內核棧的最高地址端,存放的是結構 pt_regs,首先通過 push 和 SAVE_ALL 將當前用戶態的寄存器,保存在棧中 pt_regs 結構裏面.保存完畢后,關閉中斷,將當前棧指針保存到 eax,即do_int80_syscall_32的參數1。
調用do_int80_syscall_32=>do_syscall_32_irqs_on。先看看沒有ipipe時Linux實現如下:

__always_inline void do_syscall_32_irqs_on(struct pt_regs *regs)
{
	struct thread_info *ti = pt_regs_to_thread_info(regs);
	unsigned int nr = (unsigned int)regs->orig_ax;

	.....
	if (likely(nr < IA32_NR_syscalls)) {
		nr = array_index_nospec(nr, IA32_NR_syscalls);
		regs->ax = ia32_sys_call_table[nr](	/*根據系統調用號索引直接執行*/
			(unsigned int)regs->bx, (unsigned int)regs->cx,
			(unsigned int)regs->dx, (unsigned int)regs->si,
			(unsigned int)regs->di, (unsigned int)regs->bp);
	}
	syscall_return_slowpath(regs);
}

在這裏,將系統調用號從pt_reges中eax 裏面取出來,然後根據系統調用號,在系統調用表中找到相應的函數進行調用,並將寄存器中保存的參數取出來,作為函數參數。如果仔細比對,就能發現,這些參數所對應的寄存器,和 Linux 的註釋是一樣的。ia32_sys_call_table系統調用表生成後面解析(此圖來源於網絡)。

相關內核調用執行完后,一直返回到 do_syscall_32_irqs_on ,如果系統調用有返回值,會被保存到 regs->ax 中。接着返回 entry_INT80_32 繼續執行,最後執行 INTERRUPT_RETURN 。 INTERRUPT_RETURN 在 arch/x86/include/asm/irqflags.h 中定義為 iret ,iret 指令將原來用戶態保存的現場恢復回來,包含代碼段、指令指針寄存器等。這時候用戶態進程恢復執行。

系統調用執行完畢。

二、32位實時系統調用

Xenomai使用I-pipe 攔截常規Linux系統調用調度程序,並將系統調用定向到實現它們的系統。

實時系統調用,除了直接系統調用外,xenomai還實現了libcoblat實時庫,相當於glibc,通過libcoblat進行xenomai系統調用,以libcoblat庫函數sem_open為例,libcolat庫中C函數實現如下:

COBALT_IMPL(sem_t *, sem_open, (const char *name, int oflags, ...))
{
	......
	err = XENOMAI_SYSCALL5(sc_cobalt_sem_open,
			       &rsem, name, oflags, mode, value);
	if (err == 0) {
		if (rsem != sem)
			free(sem);
		return &rsem->native_sem;
	}
	.......
	return SEM_FAILED;
}

libcolat庫調用系統調用使用宏XENOMAI_SYSCALL5XENOAI_SYSCALL宏在\include\asm\xenomai\syscall.h中聲明,XENOMAI_SYSCALL5中的’5’代表’該系統調用有五個參數:

#define XENOMAI_DO_SYSCALL(nr, op, args...)			\
({								\
	unsigned __resultvar;					\
	asm volatile (						\
		LOADARGS_##nr					\
		"movl %1, %%eax\n\t"				\
		DOSYSCALL					\
		RESTOREARGS_##nr				\
		: "=a" (__resultvar)				\
		: "i" (__xn_syscode(op)) ASMFMT_##nr(args)	\
		: "memory", "cc");				\
	(int) __resultvar;					\
})

#define XENOMAI_SYSCALL0(op)			XENOMAI_DO_SYSCALL(0,op)
#define XENOMAI_SYSCALL1(op,a1)			XENOMAI_DO_SYSCALL(1,op,a1)
#define XENOMAI_SYSCALL2(op,a1,a2)		XENOMAI_DO_SYSCALL(2,op,a1,a2)
#define XENOMAI_SYSCALL3(op,a1,a2,a3)		XENOMAI_DO_SYSCALL(3,op,a1,a2,a3)
#define XENOMAI_SYSCALL4(op,a1,a2,a3,a4)	XENOMAI_DO_SYSCALL(4,op,a1,a2,a3,a4)
#define XENOMAI_SYSCALL5(op,a1,a2,a3,a4,a5)	XENOMAI_DO_SYSCALL(5,op,a1,a2,a3,a4,a5)

每個宏中,內嵌另一個宏DOSYSCALL,即實現系統調用的int指令:int $0x80

#define DOSYSCALL  "int $0x80\n\t"

系統調用過程硬件處理及中斷入口上節一致,從do_syscall_32_irqs_on開始不同,有ipipe后變成下面這樣子:

static __always_inline void do_syscall_32_irqs_on(struct pt_regs *regs)
{
	struct thread_info *ti = current_thread_info();
	unsigned int nr = (unsigned int)regs->orig_ax;/*取出系統調用號*/
	int ret;
	
	ret = pipeline_syscall(ti, nr, regs);/*pipeline 攔截系統調用*/
	......
done:
	syscall_return_slowpath(regs);
}

套路和ipipe接管中斷類似,在關鍵路徑上攔截系統調用,然後調用ipipe_handle_syscall(ti, nr, regs)讓ipipe來接管處理:

int ipipe_handle_syscall(struct thread_info *ti,
			 unsigned long nr, struct pt_regs *regs)
{
	unsigned long local_flags = READ_ONCE(ti->ipipe_flags);
	int ret; 
	if (nr >= NR_syscalls && (local_flags & _TIP_HEAD)) {/*運行在head域且者系統調用號超過linux*/
		ipipe_fastcall_hook(regs);			/*快速系統調用路徑*/
		local_flags = READ_ONCE(ti->ipipe_flags);
		if (local_flags & _TIP_HEAD) {
			if (local_flags &  _TIP_MAYDAY)
				__ipipe_call_mayday(regs);
			return 1; /* don't pass down, no tail work. */
		} else {
			sync_root_irqs();
			return -1; /* don't pass down, do tail work. */
		}
	}

	if ((local_flags & _TIP_NOTIFY) || nr >= NR_syscalls) {
		ret =__ipipe_notify_syscall(regs);
		local_flags = READ_ONCE(ti->ipipe_flags);
		if (local_flags & _TIP_HEAD)
			return 1; /* don't pass down, no tail work. */
		if (ret)
			return -1; /* don't pass down, do tail work. */
	}

	return 0; /* pass syscall down to the host. */
}

這個函數的處理邏輯是這樣,怎樣區分xenomai系統調用和linux系統調用?每個CPU架構不同linux系統調用總數不同,在x86系統中有300多個,用變量NR_syscalls表示,系統調用號與系統調用一一對應。首先獲取到的系統調用號nr >= NR_syscalls,不用多想,那這個系統調用是xenomai內核的系統調用。
另外還有個問題,如果是Linux非實時任務觸發的xenomai系統調用,或者xenomai 實時任務要調用linux的服務,這些交叉服務涉及實時任務與非實時任務在兩個內核之間運行,優先級怎麼處理等問題。這些涉及cobalt_sysmodes[].

首先看怎麼區分一個任務是realtime還是no_realtime。在task_struct結構的頭有一個成員結構體thread_info,存儲着當前線程的信息,ipipe在結構體thread_info中增加了兩個成員變量ipipe_flagsipipe_data,ipipe_flags用來來標示一個線程是實時還是非實時,_TIP_HEAD置位表示已經是實時上下文。對於需要切換到xenomai上下文的系統調用_TIP_NOTIFY置位。

struct thread_info {
	unsigned long		flags;		/* low level flags */
	u32			status;		/* thread synchronous flags */
#ifdef CONFIG_IPIPE
	unsigned long		ipipe_flags;
	struct ipipe_threadinfo ipipe_data;
#endif
};

ipipe_handle_syscall處理邏輯:
1.對於已經在實時上下文的實時任務發起xenomai的系統調用,使用快速調用路徑函數ipipe_fastcall_hook(regs);
2.需要切換到實時上下文或者非實時調用實時的,使用慢速調用路徑:

__ipipe_notify_syscall(regs)
->ipipe_syscall_hook(caller_domain, regs)

快速調用ipipe_fastcall_hook(regs)內直接handle_head_syscall執行代碼如下:

static int handle_head_syscall(struct ipipe_domain *ipd, struct pt_regs *regs)
{
	....
	code = __xn_syscall(regs);
	nr = code & (__NR_COBALT_SYSCALLS - 1);
	......
	handler = cobalt_syscalls[code];
	sysflags = cobalt_sysmodes[nr];
	........

	ret = handler(__xn_reg_arglist(regs));
	.......

	__xn_status_return(regs, ret);

	.......
}

這個函數很複雜,涉及xenomai與linux之間很多聯繫,代碼是簡化后的,先取出系統調用號,然後從cobalt_syscalls取出系統調用入口handler,然後執行handler(__xn_reg_arglist(regs))執行完成后將執行結果放到寄存器ax,後面的文章會詳細分析ipipe如何處理系統調用。

三、 64位系統調用

我們再來看 64 位的情況,系統調用,不是用中斷了,而是改用 syscall 指令。並且傳遞參數的寄存器也變了。

#define DO_SYSCALL(name, nr, args...)			\
({							\
	unsigned long __resultvar;			\
	LOAD_ARGS_##nr(args)				\
	LOAD_REGS_##nr					\
	asm volatile (					\
		"syscall\n\t"				\
		: "=a" (__resultvar)			\
		: "0" (name) ASM_ARGS_##nr		\
		: "memory", "cc", "r11", "cx");		\
	(int) __resultvar;				\
})

#define XENOMAI_DO_SYSCALL(nr, op, args...) \
	DO_SYSCALL(__xn_syscode(op), nr, args)

#define XENOMAI_SYSBIND(breq) \
	XENOMAI_DO_SYSCALL(1, sc_cobalt_bind, breq)

這裏將系統調用號使用__xn_syscode(op)處理了一下,把最高位置1,表示Cobalt系統調用,然後使用syscall 指令。

#define __COBALT_SYSCALL_BIT	0x10000000
#define __xn_syscode(__nr)	(__COBALT_SYSCALL_BIT | (__nr))

syscall 指令還使用了一種特殊的寄存器,我們叫特殊模塊寄存器(Model Specific Registers,簡稱 MSR)。這種寄存器是 CPU 為了完成某些特殊控制功能為目的的寄存器,其中就有系統調用。在系統初始化的時候,trap_init 除了初始化上面的中斷模式,這裏面還會調用 cpu_init->syscall_init。這裏面有這樣的代碼:

wrmsrl(MSR_LSTAR, (unsigned long)entry_SYSCALL_64);

rdmsr 和 wrmsr 是用來讀寫特殊模塊寄存器的。MSR_LSTAR 就是這樣一個特殊的寄存器, 當 syscall 指令調用的時候,會從這個寄存器裏面拿出函數地址來調用,也就是調entry_SYSCALL_64。
該函數在’entry_64.S’定義:

ENTRY(entry_SYSCALL_64)
	UNWIND_HINT_EMPTY
	......
	swapgs
	/*
	 * This path is only taken when PAGE_TABLE_ISOLATION is disabled so it
	 * is not required to switch CR3.
	 */
	movq	%rsp, PER_CPU_VAR(rsp_scratch)
	movq	PER_CPU_VAR(cpu_current_top_of_stack), %rsp

	/* Construct struct pt_regs on stack */
	pushq	$__USER_DS			/* pt_regs->ss */
	pushq	PER_CPU_VAR(rsp_scratch)	/* pt_regs->sp */
	pushq	%r11				/* pt_regs->flags */
	pushq	$__USER_CS			/* pt_regs->cs */
	pushq	%rcx				/* pt_regs->ip *//*保存用戶太指令指針寄存器*/
GLOBAL(entry_SYSCALL_64_after_hwframe)
	pushq	%rax				/* pt_regs->orig_ax */

	PUSH_AND_CLEAR_REGS rax=$-ENOSYS

	TRACE_IRQS_OFF

	/* IRQs are off. */
	movq	%rsp, %rdi
	call	do_syscall_64		/* returns with IRQs disabled */

	TRACE_IRQS_IRETQ		/* we're about to change IF */

	/*
	 * Try to use SYSRET instead of IRET if we're returning to
	 * a completely clean 64-bit userspace context.  If we're not,
	 * go to the slow exit path.
	 */
	movq	RCX(%rsp), %rcx
	movq	RIP(%rsp), %r11

	cmpq	%rcx, %r11	/* SYSRET requires RCX == RIP */
	jne	swapgs_restore_regs_and_return_to_usermode
	.......
	testq	$(X86_EFLAGS_RF|X86_EFLAGS_TF), %r11
	jnz	swapgs_restore_regs_and_return_to_usermode

	/* nothing to check for RSP */

	cmpq	$__USER_DS, SS(%rsp)		/* SS must match SYSRET */
	jne	swapgs_restore_regs_and_return_to_usermode

	/*
	 * We win! This label is here just for ease of understanding
	 * perf profiles. Nothing jumps here.
	 */
syscall_return_via_sysret:
	/* rcx and r11 are already restored (see code above) */
	UNWIND_HINT_EMPTY
	POP_REGS pop_rdi=0 skip_r11rcx=1

	/*
	 * Now all regs are restored except RSP and RDI.
	 * Save old stack pointer and switch to trampoline stack.
	 */
	movq	%rsp, %rdi
	movq	PER_CPU_VAR(cpu_tss_rw + TSS_sp0), %rsp

	pushq	RSP-RDI(%rdi)	/* RSP */
	pushq	(%rdi)		/* RDI */

	/*
	 * We are on the trampoline stack.  All regs except RDI are live.
	 * We can do future final exit work right here.
	 */
	SWITCH_TO_USER_CR3_STACK scratch_reg=%rdi

	popq	%rdi
	popq	%rsp
	USERGS_SYSRET64
END(entry_SYSCALL_64)

這裏先保存了很多寄存器到 pt_regs 結構裏面,例如用戶態的代碼段、數據段、保存參數的寄存器.

然後調用 entry_SYSCALL64_slow_pat->do_syscall_64

__visible void do_syscall_64(struct pt_regs *regs)
{
	struct thread_info *ti = current_thread_info();
	unsigned long nr = regs->orig_ax;	/*取出系統調用號*/
	int ret;

	enter_from_user_mode();
	enable_local_irqs();

	ret = ipipe_handle_syscall(ti, nr & __SYSCALL_MASK, regs);
	if (ret > 0) {
		disable_local_irqs();
		return;
	}
	if (ret < 0)
		goto done;
	......
	if (likely((nr & __SYSCALL_MASK) < NR_syscalls)) {
		nr = array_index_nospec(nr & __SYSCALL_MASK, NR_syscalls);
		regs->ax = sys_call_table[nr](
			regs->di, regs->si, regs->dx,
			regs->r10, regs->r8, regs->r9);
	}
done:
	syscall_return_slowpath(regs);
}

與32位一樣,ipipe攔截了系統調用,後面的處理流程類似所以,無論是 32 位,還是 64 位,都會到linux系統調用表 sys_call_table和xenomai系統調用表cobalt_syscalls[] 這裏來。

五、 實時系統調用表cobalt_syscalls

xenomai每個系統的系統系統調用號在\cobalt\uapi\syscall.h中:


#define sc_cobalt_bind				0
#define sc_cobalt_thread_create			1
#define sc_cobalt_thread_getpid			2
	......
#define sc_cobalt_extend			96

bind()函數在內核代碼中對應的聲明和實現為:

/*聲明*/
#define COBALT_SYSCALL_DECL(__name, __args)	\
	long CoBaLt_ ## __name __args
static COBALT_SYSCALL_DECL(bind, lostage,
		      (struct cobalt_bindreq __user *u_breq));
/*實現*/
#define COBALT_SYSCALL(__name, __mode, __args)	\
	long CoBaLt_ ## __name __args
static COBALT_SYSCALL(bind, lostage,
		      (struct cobalt_bindreq __user *u_breq)){......}

其中__name表示系統調用名對應bind、__mode表示該系統調用模式對應lostage。COBALT_SYSCALL展開定義的bind函數后如下:

long CoBaLt_bind(struct cobalt_bindreq __user *u_breq){......}

怎麼將CoBaLt_bind與系統調用號sc_cobalt_bind聯繫起來後放入cobalt_syscalls[]的呢?
在編譯過程中Makefile使用腳本gen-syscall-entries.sh處理各個.c文件中的COBALT_SYSCALL宏,生成一個頭文件syscall_entries.h,裏面是對每個COBALT_SYSCALL宏處理后后的項,以上面COBALT_SYSCALL(bind,...)為例syscall_entries.h中會生成如下兩項,第一項為系統調用入口,第二項為系統調用的模式:

#define __COBALT_CALL_ENTRIES __COBALT_CALL_ENTRY(bind)
#define __COBALT_CALL_MODES	__COBALT_MODE(lostage)

實時系統調用表cobalt_syscalls[]定義在文件kernel\cobalt\posix\syscall.c中:

#define __syshand__(__name)	((cobalt_syshand)(CoBaLt_ ## __name))

#define __COBALT_NI	__syshand__(ni)

#define __COBALT_CALL_NI				\
	[0 ... __NR_COBALT_SYSCALLS-1] = __COBALT_NI,	\
	__COBALT_CALL32_INITHAND(__COBALT_NI)

#define __COBALT_CALL_NFLAGS				\
	[0 ... __NR_COBALT_SYSCALLS-1] = 0,		\
	__COBALT_CALL32_INITMODE(0)

#define __COBALT_CALL_ENTRY(__name)				\
	[sc_cobalt_ ## __name] = __syshand__(__name),		\
	__COBALT_CALL32_ENTRY(__name, __syshand__(__name))

#define __COBALT_MODE(__name, __mode)	\
	[sc_cobalt_ ## __name] = __xn_exec_##__mode,
	
#include "syscall_entries.h"		/*該頭文件由腳本生成*/

static const cobalt_syshand cobalt_syscalls[] = {
	__COBALT_CALL_NI
	__COBALT_CALL_ENTRIES
};

static const int cobalt_sysmodes[] = {
	__COBALT_CALL_NFLAGS
	__COBALT_CALL_MODES
};

__COBALT_CALL_NI宏表示數組空間大小為__NR_COBALT_SYSCALLS(128),每一項由__COBALT_CALL_ENTRIES定義,即腳本頭文件syscall_entries.h中生成的每一項來填充:

#define __COBALT_CALL_ENTRY(__name)				\
	[sc_cobalt_ ## __name] = __syshand__(__name),		\
	__COBALT_CALL32_ENTRY(__name, __syshand__(__name))

__COBALT_CALL32_ENTRY是定義兼容的系統調用,宏展開如下,相當於在數組的多個位置定義包含了同一項CoBaLt_bind

#define __COBALT_CALL32_ENTRY(__name, __handler)	\
	__COBALT_CALL32x_ENTRY(__name, __handler)	\
	__COBALT_CALL32emu_ENTRY(__name, __handler)

#define __COBALT_CALL32emu_ENTRY(__name, __handler)		\
			[sc_cobalt_ ## __name + 256] = __handler,
#define __COBALT_CALL32x_ENTRY(__name, __handler)		\
		[sc_cobalt_ ## __name + 128] = __handler,

最後bind系統調用在cobalt_syscalls[]中如下

static const cobalt_syshand cobalt_syscalls[] = {
	[sc_cobalt_bind] = CoBaLt_bind,
    [sc_cobalt_bind + 128] = CoBaLt_bind,   /*x32 support */
    [sc_cobalt_bind + 256] = CoBaLt_bind,   /*ia32 emulation support*/
	.....
};

相應的數組cobalt_sysmodes[]中的內容如下:

static const int cobalt_sysmodes[] = {
	[sc_cobalt_bind] = __xn_exec_bind,
    [sc_cobalt_bind + 256] = __xn_exec_lostage, /*x32 support */
    [sc_cobalt_bind + 128] = __xn_exec_lostage, /*ia32 emulation support*/
    ......
};

六、實時系統調用權限控制cobalt_sysmodes

上面說到,ipipe管理應用的系統調用時需要分清該系統調用是否合法,是否需要域切換等等。cobalt_sysmodes[]就是每個系統調用對應的模式,控制着每個系統調用的調用路徑。系統調用號為下標,值為具體模式。每個系統調用的sysmode如何生成見上一節,還是以實時應用的bind系統調用為例:

static const int cobalt_sysmodes[] = {
	[sc_cobalt_bind] = __xn_exec_bind,
    [sc_cobalt_bind + 256] = __xn_exec_lostage, /*x32 support */
    [sc_cobalt_bind + 128] = __xn_exec_lostage, /*ia32 emulation support*/
    ......
};

xenomai中所有的系統調用模式定義如下:

/*xenomai\posix\syscall.c*/
#define __xn_exec_lostage    0x1	/*必須在linux域運行該系統調用*/	
#define __xn_exec_histage    0x2	/*必須在Xenomai域運行該系統調用*/	
#define __xn_exec_shadow     0x4		/*影子系統調用:必須映射調用方*/
#define __xn_exec_switchback 0x8 	/*切換回切換; 調用者必須返回其原始模式*/
#define __xn_exec_current    0x10		/*在不管域直接執行。*/
#define __xn_exec_conforming 0x20  	/*在兼容域(Xenomai或Linux)中執行*/
#define __xn_exec_adaptive   0x40	/* 先直接執行如果返回-ENOSYS,則嘗試在相反的域中重新執行系統調用 */
#define __xn_exec_norestart  0x80  /*收到信號后不要重新啟動syscall*/
 /*Shorthand初始化系統調用的簡寫*/
#define __xn_exec_init       __xn_exec_lostage 
/*Xenomai空間中shadow系統調用的簡寫*/
#define __xn_exec_primary   (__xn_exec_shadow|__xn_exec_histage) 
/*Linux空間中shadow系統調用的簡寫*/
#define __xn_exec_secondary (__xn_exec_shadow|__xn_exec_lostage)
/*Linux空間中syscall的簡寫,如果有shadow則切換回linux*/
#define __xn_exec_downup    (__xn_exec_lostage|__xn_exec_switchback)
/* 主域系統不可重啟調用的簡寫 */
#define __xn_exec_nonrestartable (__xn_exec_primary|__xn_exec_norestart)
/*域探測系統調用簡寫*/
#define __xn_exec_probing   (__xn_exec_conforming|__xn_exec_adaptive)
/*將模式選擇移交給syscall。*/
#define __xn_exec_handover  (__xn_exec_current|__xn_exec_adaptive)

使用一個無符號32 位數的每一位來表示一種模式,各模式註釋已經很清楚,不在解釋,後面文章解析ipipe是如何根據mode來處理的。

參考

英特爾® 64 位和 IA-32 架構軟件開發人員手冊第 3 卷 :系統編程指南
極客時間專欄-趣談Linux操作系統
《linux內核源代碼情景分析》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

聚甘新

如何只用5分鐘完成數據 列表、創建頁面

前言

我們當然希望能夠更快的完成我們的工作,這樣我們才能有更多的時間做其他的事情,比如說測試、學習、放鬆。

背景

軟件一般也就這麼幾個方面的工作要做,增、刪、改、查。如果歸結到頁面上來說,那麼無非也就這麼幾個頁面Form頁面(增)、列表頁面(查、刪)、編輯頁面(改)。很大程度上,你的項目就是由不同的實體的這麼幾個頁面組裝起來的。既然他們都是這麼幾個頁面,那麼,我們是不是可以考慮針對這幾個頁面進行抽象呢?然後使用數據描述這幾個頁面的行為。

效果

經典倒敘,先上效果圖

列表頁面

創建頁面

目前就簡單實現了列表頁面和創建頁面。編輯頁面,跟創建頁面太像了。暫時還沒有實現相關內容,不過,這個不是很重要了。

實現過程

需求分析

其實,每個頁面都是存在固定的路數的。

比如說:

列表頁面裡邊主要存在這麼幾個參數:列表名、列表頭上的按鈕、列表的表頭、列表內容、列表每一行中的操作、分頁控件。

表單頁面列表主要存在這麼幾個參數:表單名、表單內容項。

主要的參數出現的位置都是固定的。但是什麼地方出現什麼內容則是可以變化的,一般情況下,我們都是通過代碼,一遍一遍的重寫這些頁面,然後來達到不同的應用之間的變化的目的。其實我們是可以通過數據來描述他們的。比如說向下面這樣。

列表頁面的定義

Form表單頁面的定義

原始數據的定義

然後將這些定義好的屬性通過後端渲染到頁面上。

就可以達到,前邊展示的這種效果了。

數據存儲

因為數據類型是自定義的,所以數據存儲的字段也是可以自己隨便預設的。然後系統就可以直接支持這一數據類型。在這個Demo裡邊,我是簡單粗暴的使用了文件存儲Json文件的方式來進行保存的數據。

其實應該鏈接數據庫的。不過我在Demo項目裡邊留下了相關的接口,只要再實現一個數據庫版本的實例就可以無縫對接了。

其實

當然了這隻是他的最初級的形態,因為現在寫的配置文件都是通過手寫來實現,將來可以做一個編輯器。並且可以實時看到調整過的效果。

其實這個做法,是來源於PaaS項目中的一個很小很小的功能塊。真正的PaaS項目這一整套東西都是在線上直接編輯看效果的。

最後

系列

這個項目將來會融入到我寫的PaaS Demo中作為前端展示部分。 系列的目錄在 https://juejin.im/post/5eca2a186fb9a047e96b2884 這個部分會一點點完善。

開源

雖然東西不大,但是還是希望能給你一點點啟發。 項目地址 https://gitee.com/anxin1225/Dov.GenericWeb

簡單的體驗

部署到雲端了,可以簡單體驗一下。

http://gw.ash50p.com/Generic/Meeting.Record/List

轉載莫忘原文地址:https://juejin.im/post/5eeb85b8e51d45740850f755

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司“嚨底家”!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

聚甘新

80386學習(五) 80386分頁機制與虛擬內存

一. 頁式內存管理介紹

  80386能夠將內存分為不同屬性的段,並通過段描述符、段表以及段選擇子等機制,通過段基址和段內偏移量計算出線性地址進行訪問,這一內存管理方式被稱為段式內存管理

  這裏要介紹的是另一種內存管理的方式:80386在開啟了分頁機制后,便能夠將物理內存劃分為一個個大小相同且連續的物理內存頁,訪問時通過物理內存頁號和頁內偏移計算出最終需要訪問的線性地址進行訪問,由於內存管理單元由段變成了頁,因此這一內存管理方式被稱為頁式內存管理

  80386的分頁機制只能在保護模式下開啟。

為什麼需要頁式內存管理?

  在介紹80386分頁機制前,需要先理解為什麼CPU在管理內存時,要在段式內存管理的基礎上再引入一種有很大差異的頁式內存管理方式?頁式內存管理與純段式內存管理相比到底具有哪些優點?

  一個很重要的原因是為了解決多任務環境下,段式內存管理中多任務的創建與終止時會產生較多內存碎片,使得內存空間使用率不高的問題

  內存碎片分為外碎片和內碎片兩種。

外碎片

  對於指令和數據的訪問通常都是連續的,所以需要為一個任務分配連續的內存空間。在段式內存管理中,通常為任務分配一個完整的內存段,或是按照任務內段功能的不同,分配包括代碼段、數據段和堆棧段在內的多個完整連續段空間。支持多道任務的系統分配的內存空間,會在某些任務退出並釋放內存時,產生外部內存碎片。

  舉個例子,假設當前存在10MB的內存空間,存在A/B/C/D四個任務,併為每個任務分配一整塊的內存空間,其所佔用的內存空間分別為3MB/2MB/4MB/1MB,如下圖所示(一個格子代表1MB內存)。

  當任務B和任務D執行完成后,所佔用的內存空間被釋放,10MB的內存空間中出現了3MB大小的空閑內存。如果此時出現了一個任務E,需要為其分配3MB的內存空間,此時內存雖然存在3MB的內存空間,卻由於空閑內存的不連續,碎片化,導致無法直接分配給任務E使用。而這裏任務B、任務D結束后釋放的空餘內存空間就被視為外碎片。

  這裏的例子任務數量少且內存空間也很小。而在實際的32位甚至64位的系統中,物理內存空間少則4GB,多則幾十甚至上百GB,由於任務內存的反覆分配和釋放,導致出現的外碎片的數量及浪費的內存空間會很多,很大程度上降低了內存空間的利用率。

  雖然理論上能夠通過操作系統小心翼翼的挪動內存,使得外碎片能夠拼接為連續的大塊,得以被有效利用(內存緊縮)。但是操作系統挪動、複製內存本身很佔用CPU資源,且存在對指令進行地址重定位、暫時暫停對所挪動內存區域的訪問等附加問題,造成的效率降低程度幾乎是不可忍受的,因此這一解決方案並沒有被廣泛使用。

  

內碎片

  外碎片指的是不同任務內存之間的碎片,而內碎片指的是一個任務內產生的內存碎片。

  通常操作系統為了管理多任務環境下的物理內存,會將內存分隔為固定大小的分區,使用系統表記錄對應分區內存的使用情況(如是否已分配等)。分區的大小必須適當,如果分區過小,則相同物理內存大小下,系統表項過多使得所佔用的空間過大;可如果分區過大,則會產生過大的內碎片,造成不必要的內存空間浪費。

  以上述介紹外碎片的數據為例,系統中的內存分區固定大小為1MB,其中為任務C分配了4個內存分區,共4MB大小。可實際上任務C實際只需要3.5MB的空間即可滿足需求,但由於分區是內存管理的最小單元,只能為任務分配整數個的內存分區。3個分區3MB並不滿足任務C的3.5MB的內存需求,因此只能分配4個分區給任務C。而這裏任務C額外多佔用的0.5MB內存就是內碎片。 

  內碎片就是已經被分配出去,卻不能被有效利用的內存空間。

80386是如何解決內存碎片問題的?

外碎片的解決

  外碎片問題產生的主要原因是程序所需要分配的內存空間是連續的。為此,80386提供了分頁機制,使得最終分配給任務的物理內存空間可以不連續。如果任務所使用的內存不必連續,前面外碎片例子中提到的任務E就能夠在1MB+2MB的離散物理內存上正常運行,外碎片問題自然就得到了解決。

內碎片的解決

  內碎片從本質上來說是很難完全避免的(內存管理最小單元不能過小),主要的問題在於前面提到的內存分區管理單元大小的較優值不好確定。開啟了分頁管理的80386,允許將物理內存分割最小為4KB固定大小的管理單元,這個固定大小的內存管理單元被稱為頁,並由專門的被稱為頁表的數據結構來追蹤內存頁的使用情況。

  對於頁表項過多的問題,80386的設計者提供了多級頁表機制,減少了頁表所佔用的空間。

  對於內碎片過大的問題,由於80386所運行的任務所佔用的內存段一般遠大於一個內存頁的大小,因此頁機制下所產生的內部碎片是十分有限的,可以達到一個令人滿意的內存使用率。

二. 虛擬內存簡單介紹 

  為了解決應用程序高速增長的內存需求與物理內存增加緩慢的矛盾,計算機科學家們提供了虛擬內存的概念。使用了虛擬內存的系統,可以使得系統內運行的程序所佔用的內存空間總量,遠大於實際物理內存的容量。

  能夠實現虛擬內存的關鍵在於程序在特定時刻所需要訪問的內存地址是符合局部性原理的。通過操作系統和硬件的緊密配合,能夠將任務暫時不需要訪問的內存交換到外部硬盤中,而將物理內存留給真正需要訪問的那部分內存(工作集內存)。

  虛擬內存和分頁機制是一對好搭檔,分頁機制提供了管理內存的基本單位:頁,80386的頁式虛擬內存實現在工作集內存調度時也依賴分頁機制提供的頁來進行。隨着程序的執行,程序的工作集內存在動態變化,當CPU檢測到當前所訪問的內存頁不在物理內存中時,便會通知操作系統(內存缺頁異常),操作系統的缺頁異常處理程序會將硬盤交換區中的對應內存頁數據寫回物理內存。如果物理內存頁已經滿了的情況下,則還需要根據某種算法將另一個物理內存頁替換,來容納這一換入的內存頁。

三. 80386分頁機制原理

  在介紹分頁機制原理之前,需要先理解關於80386保護模式下32位內存尋址時幾種地址的概念。

物理地址(Physical Address):

  物理地址就是32位的地址總線所對應的真實的硬件存儲空間。對於物理內存的訪問,無論中間會經過多少次轉換,最終必須轉換為最終的物理地址進行訪問。

邏輯地址(Logical Address):

  在80386保護模式的程序指令中,對內存的訪問是由段選擇子和段內偏移決定的。段選擇子+段內偏移 –> 邏輯地址。

線性地址(Linear Address):

  CPU在內存尋址時,從指令中獲得段選擇子和段內偏移,即邏輯地址。由段選擇子在段表(GDT或LDT)中找到對應的段描述符,獲取段基址。段基址+段內偏移決定線性地址。

  如果沒有開啟分頁,CPU就使用生成的線性地址直接作為最終的物理地址進行訪問;如果開啟了分頁,則還需要通過頁表等機制,將線性地址進一步處理才能生成物理地址進行訪問。

頁式虛擬內存實現原理

  程序要求訪問一個段時,其線性地址必須是連續的。在純粹的段式內存管理中,線性地址等於物理地址的情況下,就會出現外碎片的問題。而在段式內存管理的基礎上,80386如果還開啟了頁機制,就能通過抽象出一層線性地址到物理地址的映射,使得最終分配給程序的物理內存段不必連續。

  80386中的內存頁大小為4KB,在32位的內存尋址空間中(4GB),存在着0x10000 = 1048576個頁。每個頁對應的起始地址低12位都為0,第一個物理內存頁的物理地址為0x00000000,第二個物理內存頁的物理地址為0x00001000,依此類推,最後一個物理頁的物理地址是0xFFFFF000。

頁表

  在80386的分頁機制的實現中,是通過頁表來實現線性地址到物理地址映射轉換的。每個任務都有一個自己的頁表記錄著任務的線性地址到物理地址的映射關係。

  開啟了頁機制后的線性地址也被稱為虛擬地址,這是因為線性地址已經不再直接對應真實的物理地址,而是一個不承載真實數據的虛擬內存地址。開啟了分頁機制后,一個任務的虛擬地址空間依然是連續的,但所佔用的物理地址空間卻可以不連續

  頁表保存着被稱為頁表項的數據結構集合,每一個頁表項都記載着一個虛擬內存頁到物理內存頁的映射關係。開啟了頁機制之後,CPU在內存尋址時,在通過段表計算出了線性地址(虛擬地址)后,便可以在連續排布的虛擬地址空間中找到對應的頁表項,通過頁表項獲取虛擬內存頁所對應的物理內存頁地址,進行物理內存的訪問。虛擬地址到物理地址映射的細節會在後面進行展開。

  由於是將不斷變化的虛擬內存頁裝載進相對不變的物理內存頁中,就像畫廊中展示的畫會不斷的更替,但畫框基本不變一樣。為了更好的區分這兩者,頁通常特指虛擬內存頁,而物理內存頁則被稱為頁框。

頁表項介紹

  頁表項是32位的,其結構如下圖所示。

  

P位:

  P(Present),存在位。標識當前虛擬內存頁是否存在於物理內存頁中。當P位為1時,表示當前虛擬內存頁存在於物理內存中,可以直接進行訪問。當P位為0時,表示對應的物理內存頁不存在,需要新分配物理內存頁或是從磁盤中將其調度回物理內存。

  分頁模式下的內存尋址,如果CPU發現對應的頁表項P位為0,會引發缺頁異常中斷,操作系統在缺頁異常處理程序中進行對應的處理,以實現虛擬內存。

RW位:

  RW(Read/Write)位,讀寫位。標識當前頁是否能夠寫入。當RW為1時,代表當前頁可讀可寫;當RW為0時,代表當前頁是只讀的。

US位:

  US(User/Supervisor)位,用戶/管理位。當US為1時,標識當前頁是用戶級別的,允許所有當前特權級的任務進行訪問。當US為0時,表示當前頁是屬於管理員級別的,只允許當前特權級為0、1、2的任務進行訪問,而當前特權級為3的用戶態任務無法進行訪問。

PWT位/PCD位:

  PWT(Page-level Write Through)位,頁級通寫位。PWT為1時,表示當前物理頁的高速緩存採用通寫法;PWT為0時,表示當前物理頁的高速緩存採用回寫法。

  PCD(Page-level Cache Disable)位,頁級高速緩存禁止位。PCD為1時,表示訪問當前物理頁禁用高速緩存;PCD為0時,表示訪問當前物理頁時允許使用高速緩存。

  PWT與PCD位的使用,涉及到了80386高速緩存的工作原理與內存一致性問題,限於篇幅不在這裏展開。

A位:

  A(Access)位,訪問位。A位為1時,代表當前頁曾經被訪問過;A位為0時,代表當前頁沒有被訪問過。

  A位的設置由CPU固件在對應內存頁訪問時自動設置為1,且可以由操作系統在適當的時候通過程序指令重置為0,用以計算內存頁的訪問頻率。通過訪問頻率,操作系統能夠以此作為虛擬內存調度算法中評估的依據,在物理內存緊張的情況下,可以選擇將最少使用的內存頁換出,以減少不必要的虛擬內存頁調度時的磁盤I/O,提高虛擬內存的效率。

D位:

  D(Dirty)位,臟位。當D位為1時,表示當前頁被寫入修改過;D位為0時,代表當前頁沒有被寫入修改過。

  臟位由CPU在對應內存頁被寫入時自動設置為1。操作系統在進行內存頁調度時,如果發現需要被換出的內存頁D位為1時,則需要將對應物理內存頁數據寫回虛擬頁對應的磁盤交換區,保證磁盤/內存數據的一致性;當發現需要被換出的物理內存頁的D位為0時,表示當前頁自從換入物理內存以來沒有被修改過,和磁盤交換區中的數據一致,便直接將其覆蓋,而不進行磁盤的寫回,減少不必要的I/O以提高效率。

PAT位:

  PAT(Page Attribute Table),頁屬性表支持位。PAT位的存在使得CPU能夠支持更複雜的,不同頁大小的分頁管理。當PAT=0時,每一頁的大小為4KB;當PAT=1時,每一頁的大小是4MB,或是其它大小(分CPU的情況而定)。

G位:

  G(Global),全局位。表示當前頁是否是全局的,而不是屬於某一特定任務的。G=1時,表示當前頁是全局的;G=0時,表示當前頁是屬於特定任務的。

  為了加速頁表項的訪問,80386提供了TLB快表,作為頁表訪問的高速緩存。當任務切換時,TLB內所有G=0的非全局頁將會被清除,G=1的全局頁將會被保留。將操作系統內核中關鍵的,頻繁訪問的頁設置為全局頁,使得其能夠一直保存在TLB快表中,加速對其的訪問速度,提高效率。

AVL位:

  AVL(Avaliable),可用位。和段描述符中的AVL位功能類似,CPU並不使用它,而是提供給操作系統軟件自定義使用。

頁物理基地址字段:     

  頁物理基地址字段用於標識對應的物理頁,共20位。

  由於32位的80386的頁最小是4KB,而4GB的物理內存被分解為了最多0x10000個4KB的物理頁。20位的頁物理基地址字段作為物理頁的索引標號與每一個具體的物理頁一一對應。通過頁物理基地址字段,便能找到唯一對應的物理內存頁。

多級頁表

  在32位的CPU中,操作系統可以給每個程序分配至多4GB的虛擬內存空間,如果一個內存頁佔4KB,那麼對應的每個程序的頁表中最多需要存放着0x10000個頁表項來進行映射。即使每個頁表項只佔小小的32位共4個字節(4Byte),這依然是一個不小的內存開銷(0x10000個頁表項的大小為4MB)。

  一個應用程序雖然可以被分配4GB的虛擬內存空間,但實際上可能只使用其中的一小部分,例如40MB的大小。通常程序的堆棧段和數據段都分別位於虛擬內存空間的高低兩端,並隨着程序的執行慢慢的向中間擴展,由於頁表項對應與虛擬地址空間的連續性,這就要求任務在執行時必須完整的定義整張頁表。

  可以看到,一級的平面頁表結構存在着明顯的頁表空間浪費的問題。雖然可以要求應用程序不要一下子就以4GB的內存規格進行編程,而是一開始用較小的內存,並在需要更大內存時梯度的申請更大的內存空間,並重新構造數據段和堆棧段以減少每個任務的無用頁表項空間的浪費。但這將頁表空間優化的繁重任務強加給了應用程序,並不是一個好的解決辦法。

  為此,計算機科學家們提出了多級頁表的方案來解決頁表項過多的問題。多級頁表顧名思義,頁表的結構不再是一個一級的平面結構(一級頁表),而是像一顆樹一樣,由頁目錄項節點頁表項節點組成。目錄節點中保存着下一級節點的物理頁地址等信息,恭弘=叶 恭弘子節點中則包含着真正的頁表項信息。查詢頁表項時,從一級頁目錄節點(根目錄)出發,按照一定的規則可以找到對應的下一級子目錄節點,直到查詢出對應的恭弘=叶 恭弘子節點為止。

  

80386頁目錄項介紹

  80386採用的是二級頁表的設計,二級頁表由頁目錄表和頁表共同組成。頁目錄表中存放的是頁目錄項,頁目錄項的大小和頁表項一致,為4字節。

  通過80386指令得到的32位線性地址,其中高20位作為頁表項索引,低12位作為頁內偏移地址(4KB大小的物理頁)。如果採用的是一級頁表結構,20位的頁表項索引能直接找到4MB頁表中的對應頁表項。

  而對於80386二級頁表的設計來說,由於一個物理頁大小為4KB,最多可以容納1024(2^10)個頁表項或者頁目錄項,所以將頁表項索引的高10位作為根目錄頁中頁目錄項的索引值,通過頁目錄項中的頁表項物理頁號可以找到對應的頁表物理頁;再根據頁表項索引的后10位找到頁表中對應的頁表項。

  

80386頁目錄項結構圖

   80386的二級頁表的頁目錄項佔32位,其低12位的含義與頁表項一致。主要區別在於其高20位存放的是下一級頁表的物理頁索引,而不是虛擬地址映射的物理內存頁地址。

  

頁表基址寄存器

  前面提到過,和LDT一樣,每個任務都擁有着自己獨立的頁表。為此80386CPU提供了一個專門的寄存器用於追蹤定位任務自己的頁表,這個寄存器的名稱叫做頁表基址寄存器(Page Directory Base Register,PDBR),也就是控制寄存器CR3。

  由於80386分頁機制使用的是二級頁表,因此PDBR指向的是二級頁表結構中的頁目錄,通過頁目錄表便能夠間接的訪問整個二級頁表。為了效率其中存放的直接就是頁目錄表的32位物理地址,一般由操作系統負責在任務切換時將新任務對應的頁目錄表預先加載進物理內存。

  由於PDBR是和當前任務有關的,在任務切換時會被新任務TSS中的PDBR字段值所替換,指向新任務的頁目錄表,而舊任務的PDBR的值則在保護現場時被存入對應的TSS中。

多級頁表是如何解決頁表項浪費問題的?

  以80386的二級頁表設計為例,最大4GB的虛擬內存空間下,無論如何一級頁目錄表是必須存在的。當不需要為應用程序分配過多的內存時,頁目錄表中的頁目錄項所指向的對應頁表可以不存在,即頁目錄項的P位為0,實際不使用的虛擬內存空間將沒有對應的二級頁表節點,相比一級頁表的設計其浪費的內存會少很多。

  假設需要為一個虛擬地址首尾各需要分配20MB,共佔用40MB內存的任務構建對應的頁表。

  1. 如果使用一級頁表,4GB的虛擬內存空間下需要提供0x10000個頁表項,共4MB,頁表的體積達到了任務自身所需40MB內存的10%,但其中絕大多數的頁表項都是沒用的(P位為0),不會對應實際的物理內存,空間效率很低。

  2. 如果使用二級頁表,除了佔一個物理頁4KB大小的頁目錄表是必須存在的外,其頁目錄表中只有首尾兩項的P位為1,分別指向一個實際存在的頁表(二級節點),頁目錄表中間其它的頁目錄項P位都為0,不需要為這些不會使用到的虛擬地址分配頁表。對於這個40MB的程序來說,其頁表只佔了3個物理頁面,共12KB,空間效率相比一級頁表高很多。

TLB快表

  前面提到了多級頁表所帶來的好處:通過頁表分層,可以減少順序排列的無效頁表項數量,節約內存空間;頁表的層級越多,空間效率也越高。

  計算機領域中,通常並沒有免費的午餐,一個問題的解決,往往會帶來新的問題:多級頁表本質上是一個樹狀結構,每一個節點頁都是離散的,因此每一層級訪問都需要進行一次內存尋址操作,頁表的層級越多,訪問的次數也就越多,虛擬頁地址映射過程也越慢。在32位的80386中,2級頁表下問題還不算特別嚴重;但64位CPU的出現帶來了更大的尋址空間,也需要更多的頁表項,頁表的層級也漸漸的從2級變成了3級、4級甚至更多。頁機制開啟之後,所有的內存尋址都需要經過CPU的頁部件進行轉化才能獲得最終的物理地址,因此這一過程必須要快,不能因為頁表的離散層次訪問就嚴重影響虛擬地址空間到物理地址空間的轉換速度。

  要加快原本相對耗時的查詢操作,一個常用的辦法便是引入緩存。為了加速通用內存的訪問,80386利用局部性原理提供了高速緩存;為了加速多級頁表的頁表項訪問,80386提供了TLB。

  TLB(Translation Lookaside Buffer)直譯為地址轉換後援緩衝器,根據其作用也被稱為頁表緩存或是快表(快速頁表)。TLB中存放着一張表,其中的每一項用於緩存當前任務虛擬頁號和對應頁表項中的關鍵信息,被稱為TLB項。

  TLB的工作原理和高速緩存類似:當CPU訪問某一虛擬頁時,通過虛擬頁號先在TLB中尋找,如果發現對應的TLB項存在,則直接以TLB項中的數據進行物理地址的轉換,這被稱為TLB命中;當發現對應的TLB項不存在時(TLB未命中),則進行內存的訪問,在獲取內存中頁表項數據的同時,也將對應頁表項緩存入TLB中。如果TLB已滿則需要通過某種置換算法選出一個已存在的TLB項將其替換。

  TLB的查詢速度比內存快,但容量相對內存小很多,因此只能緩存數量有限的頁表項。但由於內存訪問的局部性,只要通過合理的設計提高TLB的命中率(通常可以達到90%以上),就能達到很好的效果。 

四. 80386分頁機制下的內存尋址流程

  下面總結一下開啟了分頁機制的80386是如何進行內存尋址的。

  1. CPU首先從內存訪問指令中獲取段選擇子和段內偏移地址

  2. 根據段選擇子從段表(GDT或LDT)中查詢出對應的段描述符

  3. 根據段描述符中的段基址和指令中的段內偏移地址生成32位的線性地址(頁機制下的虛擬地址)

  4. 32位的線性地址根據80386二級頁表的設計,拆分成三個部分:高10位作為頁目錄項索引,中間次高10位作為頁表項索引,低12位作為頁內偏移地址。

  5. 通過高10位的頁目錄項索引從一級頁目錄表中獲取二級頁表的物理頁地址(通過物理頁框號可得),再根據中間10位的頁表項索引找到對應的物理頁框。根據物理頁框號與頁內偏移地址共同生成最終的物理地址,進行物理內存的訪問。

五. 總結

  想要通過學習操作系統來更好的理解計算機程序底層的工作原理,基礎的硬件知識是必須要了解的。紙上得來終覺淺,絕知此事要躬行,在理解了基礎原理后,還需要通過實踐來加深對原理知識的理解,而閱讀相關操作系統的實現源碼就是一個很好的將實踐與原理緊密結合的學習方式。

  希望通過對硬件和操作系統的學習能幫助我打開計算機程序底層運行的神秘黑盒子一窺究竟,在思考問題時能夠換一個角度從底層的視角出發,去更好的理解和掌握上層的應用技術,以避免迷失在快速發展的技術浪潮中。

   

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

聚甘新

【String註解驅動開發】面試官讓我說說:如何使用FactoryBean向Spring容器中註冊bean?

寫在前面

在前面的文章中,我們知道可以通過多種方式向Spring容器中註冊bean。可以使用@Configuration結合@Bean向Spring容器中註冊bean;可以按照條件向Spring容器中註冊bean;可以使用@Import向容器中快速導入bean對象;可以在@Import中使用ImportBeanDefinitionRegistrar向容器中註冊bean。

項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

FactoryBean概述

一般情況下,Spring通過反射機制利用bean的class屬性指定實現類來實例化bean 。在某些情況下,實例化bean過程比較複雜,如果按照傳統的方式,則需要在 標籤中提供大量的配置信息,配置方式的靈活性是受限的,這時採用編碼的方式可以得到一個更加簡單的方案。Spring為此提供了一個org.springframework.bean.factory.FactoryBean的工廠類接口,用戶可以通過實現該接口定製實例化bean的邏輯。

FactoryBean接口對於Spring框架來說佔有重要的地位,Spring 自身就提供了70多個FactoryBean的實現。它們隱藏了實例化一些複雜bean的細節,給上層應用帶來了便利。從Spring 3.0 開始, FactoryBean開始支持泛型,即接口聲明改為FactoryBean 的形式:

在Spring 5.2.6版本中,FactoryBean接口的定義如下所示。

package org.springframework.beans.factory;
import org.springframework.lang.Nullable;

public interface FactoryBean<T> {

	String OBJECT_TYPE_ATTRIBUTE = "factoryBeanObjectType";

	@Nullable
	T getObject() throws Exception;

	@Nullable
	Class<?> getObjectType();

	default boolean isSingleton() {
		return true;
	}
}
  • T getObject():返回由FactoryBean創建的bean實例,如果isSingleton()返回true,則該實例會放到Spring容器中單實例緩存池中。
  • boolean isSingleton():返回由FactoryBean創建的bean實例的作用域是singleton還是prototype。
  • Class getObjectType():返回FactoryBean創建的bean類型。

這裏,需要注意的是:當配置文件中 標籤的class屬性配置的實現類是FactoryBean時,通過 getBean()方法返回的不是FactoryBean本身,而是FactoryBean#getObject()方法所返回的對象,相當於FactoryBean#getObject()代理了getBean()方法。

FactoryBean實例

首先,創建一個PersonFactoryBean,實現FactoryBean接口,如下所示。

package io.mykit.spring.plugins.register.bean;

import org.springframework.beans.factory.FactoryBean;
/**
 * @author binghe
 * @version 1.0.0
 * @description 商品的FactoryBean,測試FactoryBean
 */
public class PersonFactoryBean implements FactoryBean<Person> {

    //返回一個Person對象,這個對象會被註冊到Spring容器中
    @Override
    public Person getObject() throws Exception {
        return new Person();
    }

    @Override
    public Class<?> getObjectType() {
        return Person.class;
    }

    //bean是否為單例;true:是;false:否
    @Override
    public boolean isSingleton() {
        return true;
    }
}

接下來,我們在PersonConfig2類中加入PersonFactoryBean的聲明,如下所示。

@Bean
public PersonFactoryBean personFactoryBean(){
    return new PersonFactoryBean();
}

這裏需要小夥伴們注意的是:我在這裏使用@Bean註解向Spring容器中添加的是PersonFactory對象。那我們就來看看Spring容器中有哪些bean。接下來,運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

org.springframework.context.annotation.internalConfigurationAnnotationProcessor
org.springframework.context.annotation.internalAutowiredAnnotationProcessor
org.springframework.context.annotation.internalCommonAnnotationProcessor
org.springframework.context.event.internalEventListenerProcessor
org.springframework.context.event.internalEventListenerFactory
personConfig2
io.mykit.spring.plugins.register.bean.Department
io.mykit.spring.plugins.register.bean.Employee
io.mykit.spring.plugins.register.bean.User
io.mykit.spring.plugins.register.bean.Role
person
binghe001
personFactoryBean
company

可以看到,結果信息中輸出了一個personFactoryBean,我們看下這個personFactoryBean到底是個什麼鬼!此時,我們對SpringBeanTest類中的testAnnotationConfig7()方法稍加改動,添加獲取personFactoryBean的代碼,並輸出personFactoryBean實例的類型,如下所示。

@Test
public void testAnnotationConfig7(){
    ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
    String[] names = context.getBeanDefinitionNames();
    Arrays.stream(names).forEach(System.out::println);

    Object personFactoryBean = context.getBean("personFactoryBean");
    System.out.println("personFactoryBean實例的類型為:" + personFactoryBean.getClass());
}

再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

org.springframework.context.annotation.internalConfigurationAnnotationProcessor
org.springframework.context.annotation.internalAutowiredAnnotationProcessor
org.springframework.context.annotation.internalCommonAnnotationProcessor
org.springframework.context.event.internalEventListenerProcessor
org.springframework.context.event.internalEventListenerFactory
personConfig2
io.mykit.spring.plugins.register.bean.Department
io.mykit.spring.plugins.register.bean.Employee
io.mykit.spring.plugins.register.bean.User
io.mykit.spring.plugins.register.bean.Role
person
binghe001
personFactoryBean
company
personFactoryBean實例的類型為:class io.mykit.spring.plugins.register.bean.Person

可以看到,雖然我在代碼中使用@Bean註解注入的PersonFactoryBean對象,但是,實際上從Spring容器中獲取到的bean對象卻是調用PersonFactoryBean類中的getObject()獲取到的Person對象。

看到這裏,是不是有種豁然開朗的感覺!!!

在PersonFactoryBean類中,我們將Person對象設置為單實例bean,接下來,我們在SpringBeanTest類中的testAnnotationConfig7()方法多次獲取Person對象,並輸出多次獲取的對象是否為同一對象,如下所示。

@Test
public void testAnnotationConfig7(){
    ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
    String[] names = context.getBeanDefinitionNames();
    Arrays.stream(names).forEach(System.out::println);

    Object personFactoryBean1 = context.getBean("personFactoryBean");
    Object personFactoryBean2 = context.getBean("personFactoryBean");
    System.out.println(personFactoryBean1 == personFactoryBean2);
}

運行testAnnotationConfig7()方法輸出的結果信息如下所示。

org.springframework.context.annotation.internalConfigurationAnnotationProcessor
org.springframework.context.annotation.internalAutowiredAnnotationProcessor
org.springframework.context.annotation.internalCommonAnnotationProcessor
org.springframework.context.event.internalEventListenerProcessor
org.springframework.context.event.internalEventListenerFactory
personConfig2
io.mykit.spring.plugins.register.bean.Department
io.mykit.spring.plugins.register.bean.Employee
io.mykit.spring.plugins.register.bean.User
io.mykit.spring.plugins.register.bean.Role
person
binghe001
personFactoryBean
company
true

可以看到,在PersonFactoryBean類的isSingleton()方法中返回true時,每次獲取到的Person對象都是同一個對象,說明Person對象是單實例bean。

這裏,可能就會有小夥伴要問了,如果將Person對象修改成多實例bean呢?別急,這裏我們只需要在PersonFactoryBean類的isSingleton()方法中返回false,即可將Person對象設置為多實例bean,如下所示。

//bean是否為單例;true:是;false:否
@Override
public boolean isSingleton() {
    return false;
}

再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

org.springframework.context.annotation.internalConfigurationAnnotationProcessor
org.springframework.context.annotation.internalAutowiredAnnotationProcessor
org.springframework.context.annotation.internalCommonAnnotationProcessor
org.springframework.context.event.internalEventListenerProcessor
org.springframework.context.event.internalEventListenerFactory
personConfig2
io.mykit.spring.plugins.register.bean.Department
io.mykit.spring.plugins.register.bean.Employee
io.mykit.spring.plugins.register.bean.User
io.mykit.spring.plugins.register.bean.Role
person
binghe001
personFactoryBean
company
false

可以看到,最終結果返回了false,說明此時Person對象是多實例bean。

如何在Spring容器中獲取到FactoryBean對象?

之前,我們使用@Bean註解向Spring容器中註冊的PersonFactoryBean,獲取出來的確實Person對象。那麼,小夥伴們可能會問:我就想獲取PersonFactoryBean實例,該怎麼辦呢?

其實,這也很簡單, 只需要在獲取bean對象時,在id前面加上&符號即可

打開我們的測試類SpringBeanTest,在testAnnotationConfig7()方法中添加獲取PersonFactoryBean實例的代碼,如下所示。

@Test
public void testAnnotationConfig7(){
    ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
    String[] names = context.getBeanDefinitionNames();
    Arrays.stream(names).forEach(System.out::println);

    Object personFactoryBean1 = context.getBean("personFactoryBean");
    Object personFactoryBean2 = context.getBean("personFactoryBean");
    System.out.println("personFactoryBean1類型:" + personFactoryBean1.getClass());
    System.out.println("personFactoryBean2類型:" + personFactoryBean2.getClass());
    System.out.println(personFactoryBean1 == personFactoryBean2);

    Object personFactoryBean3 = context.getBean("&personFactoryBean");
    System.out.println("personFactoryBean3類型:" + personFactoryBean3.getClass());
}

運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

org.springframework.context.annotation.internalConfigurationAnnotationProcessor
org.springframework.context.annotation.internalAutowiredAnnotationProcessor
org.springframework.context.annotation.internalCommonAnnotationProcessor
org.springframework.context.event.internalEventListenerProcessor
org.springframework.context.event.internalEventListenerFactory
personConfig2
io.mykit.spring.plugins.register.bean.Department
io.mykit.spring.plugins.register.bean.Employee
io.mykit.spring.plugins.register.bean.User
io.mykit.spring.plugins.register.bean.Role
person
binghe001
personFactoryBean
company
personFactoryBean1類型:class io.mykit.spring.plugins.register.bean.Person
personFactoryBean2類型:class io.mykit.spring.plugins.register.bean.Person
false
personFactoryBean3類型:class io.mykit.spring.plugins.register.bean.PersonFactoryBean

可以看到,在獲取bean時,在id前面加上&符號就會獲取到PersonFactoryBean實例對象。

那問題又來了!!為什麼在id前面加上&符號就會獲取到PersonFactoryBean實例對象呢?

接下來,我們就揭開這個神秘的面紗,打開BeanFactory接口,

package org.springframework.beans.factory;
import org.springframework.beans.BeansException;
import org.springframework.core.ResolvableType;
import org.springframework.lang.Nullable;

public interface BeanFactory {
	String FACTORY_BEAN_PREFIX = "&";
    /**************以下省略n行代碼***************/
}

看到這裏,是不是明白了呢?沒錯,在BeanFactory接口中定義了一個&前綴,只要我們使用bean的id來從Spring容器中獲取bean時,Spring就會知道我們是在獲取FactoryBean本身。

好了,咱們今天就聊到這兒吧!別忘了給個在看和轉發,讓更多的人看到,一起學習一起進步!!

項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

寫在最後

如果覺得文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習Spring註解驅動開發。公眾號回復“spring註解”關鍵字,領取Spring註解驅動開發核心知識圖,讓Spring註解驅動開發不再迷茫。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

聚甘新

Netty中的這些知識點,你需要知道!

一、Channel

Channel是一個接口,而且是一個很大的接口,我們稱之為“大而全”,囊括了server端及client端接口所需要的接口。

Channel是一個門面,封裝了包括網絡I/O及相關的所有操作。

Channel聚合了包括網絡讀寫、鏈路管理、網絡連接信息、獲取EventLoop、Pipeline等相關功能類;統一分配,調度實現相應場景的功能。

一個Channel 對應一個物理連接,是基於物理連接上的操作包裝。

二、EventLoop

EventLoop,Event意為事件、Loop意為環,EventLoo即為事件環

EventLoop是一種程序設計結構等待以及分發事件。

NioEventLoop,是一個Netty工作線程,又不僅僅是一個Netty工作線程。

標準的netty線程模型 中我們講過Netty的標準線程池模型,池子里的每個線程對象就是一個NioEventLoop對象。或負責接受連接,或負責網絡I/O

說它不僅僅是一個Netty線程,因為它實現了很多功能,我們可以看下它的繼承圖:

它的上方有兩個枝丫,一個線程屬性,一個EventLoop,它是Netty的Reactor線程

既然是Reactor線程,那麼首先我們需要一個多路復用器。在Netty NioEventLoop中,包就含一個 Selector,它的操作對象是Channel。

NioEventLoop的主要邏輯在它的run()方法,方法體內是一個無限循環 for (;;),循環體內實現Loop功能。這也是通用的NIO線程實現方式。

 

Loop 從任務隊列里獲取任務,然後檢查多路復用器中就緒的Channel進行處理。

三、Unsafe

Netty中的Unsafe,一個Channel內部聚合接口,用以處理實際的網絡I/O讀寫。當然,取Unsafe命名,源碼中釋義:提供的網絡相關的操作方法,永遠不應該被開發人員操作使用。

它是Channel的一個輔助接口,主要方法:

1、register:註冊Channel

2、deregister:取消註冊

3、bind:綁定地址,服務端綁定監聽特定端口;客戶端指定本地綁定Socket地址。

4、connect:建立連接

5、disconnect:斷開連接

6、close:關閉連接

7、write:調度寫,將數據寫入buffer,並未真正進入Channel

8、flush:將緩衝區中的數據寫入Channel

四、AdaptiveRecvByteBufAllocator

動態緩衝區分配器,源碼說明:根據實時的反饋動態的增加或者減少預需的緩衝區大小。

如果一次分配的緩衝區被填滿了,則調高下一次分配的緩衝區大小。

如果連續兩次實際使用的容量低於分配的緩衝區大小特定比例,則減小下一次分配的緩衝區大小。

其它情景,保持分配大小不變。

Netty的這種“智能化”處理,可以說是相當有用的:

1、首先,實際的應用場景千差萬別,同一場景下不同時刻的緩衝區需求也是實時變化(一句話可以是一個字,也可能是1000個字),這就需要Netty動態調整緩衝分配大小以適應不同的業務場景,時刻場景

2、其次,過大的不必要的內存分配,會導致Buffer處理性能下降;過小的內存分配,則會導致頻繁的分配釋放。這都是一個優良的網絡框架不應該有的。 

3、最後,動態的調整最直接的好處就是內存的的高效使用,一定程度上做到了按需分配。 

五、ChannelPipeline

Pipeline 管道,Channel的數據流通管道,在這個管道中,可以做很多事情。

ChannelPipeline 是一種職責鏈,可以對其中流動的數據進行過濾、攔截處理,是一種插拔式的鏈路裝配器

1、ChannelPipline是一個容器

支持查詢、添加、刪除、替換等容器操作。

2、ChannelPipline支持動態的添加和刪除 Handler

ChannelPipline的這種特性給了我們相當的想象空間,例如動態的添加系統擁塞保護Handler,敏感數據過濾Handler、日誌記錄Handler、性能統計Handler等。

3、ChannelPipline 是線程安全的

ChannelPipline使用 synchronized 實現線程安全,業務線程可以併發的操作ChannelPipline。但需要注意的是,Handler是非線程安全的

六、HandlerAdapter

Adapter是一種適配器,對於用戶自定義的Handler,可以通過繼承HandlerAdapter,來規避不必要的接口實現

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

聚甘新

【原創】強擼 .NET Redis Cluster 集群訪問組件

  Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。

  隨着業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支持集群模式。一通度娘翻牆無果后,決定自己強擼一個基於ServiceStack.Redis的Redis集群訪問組件。

  話不多說,先上運行效果圖:

 

  Redis-Cluster集群使用 hash slot 算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文檔)。

  由於 ServiceStack.Redis已經實現了單個實例的Redis命令,因此我們可以將即將要實現的 Redis 集群客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(尋址)然後將Redis命令轉發給對應的節點執行即可。

  ServiceStack.Redis的RedisClient是非線程安全的,ServiceStack.Redis 使用緩存客戶端管理器(PooledRedisClientManager)來提高性能和併發能力,我們的Redis Cluster集群客戶端也應集成PooledRedisClientManager來獲取 RedisClient 實例。

  同時,Redis-Cluster集群支持在線動態擴容和slot遷移,我們的Redis集群客戶端也應具備自動智能發現新節點和自動刷新 slot 分佈的能力。

  總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:

  • 根據 key 計算 hash slot
  • 自動讀取群集上所有的節點信息
  • 為節點分配緩存客戶端管理器
  • 將 hash slot 路由到正確的節點
  • 自動發現新節點和自動刷新slot分佈

  如下面類圖所示,接下來我們詳細分析具體的代碼實現。

  

  一、CRC16  

  CRC即循環冗餘校驗碼,是信息系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裏Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於字節查表法的CRC校驗碼生成算法”。 

 1 /// <summary>
 2 /// 根據 key 計算對應的哈希槽
 3 /// </summary>
 4 public static int GetSlot(string key)
 5 {
 6     key = CRC16.ExtractHashTag(key);
 7     // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384
 8     return GetCRC16(key) & (16384 - 1);
 9 }
10 
11 /// <summary>
12 /// 計算給定字節組的 crc16 檢驗碼
13 /// </summary>
14 public static int GetCRC16(byte[] bytes, int s, int e)
15 {
16     int crc = 0x0000;
17 
18     for (int i = s; i < e; i++)
19     {
20         crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]);
21     }
22     return crc & 0xFFFF;
23 }

 

  二、讀取集群節點

  從集群中的任意節點使用 CLUSTER NODES 命令可以讀取到集群中所有的節點信息,包括連接狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以串行格式提供所有這些信息,輸出示例:

d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected
2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383
654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected
ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected
754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460
484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922
2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected

  每個字段的含義如下:

  1. id:節點 ID,一個40個字符的隨機字符串,當一個節點被創建時不會再發生變化(除非CLUSTER RESET HARD被使用)。

  2. ip:port:客戶端應該聯繫節點以運行查詢的節點地址。

  3. flags:逗號列表分隔的標誌:myselfmasterslavefail?failhandshakenoaddrnoflags。標誌在下一節詳細解釋。

  4. master:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ – ”字符。

  5. ping-sent:以毫秒為單位的當前激活的ping發送的unix時間,如果沒有掛起的ping,則為零。

  6. pong-recv:毫秒 unix 時間收到最後一個乒乓球。

  7. config-epoch:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會創建一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的哈希槽,則具有較高配置時期的節點將獲勝。

  8. link-state:用於節點到節點集群總線的鏈路狀態。我們使用此鏈接與節點進行通信。可以是connecteddisconnected

  9. slot:散列槽號或範圍。從參數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的散列槽列表。如果條目僅僅是一個数字,則被解析為這樣。如果它是一個範圍,它是在形式start-end,並且意味着節點負責所有散列時隙從startend包括起始和結束值。

標誌的含義(字段編號3):

  • myself:您正在聯繫的節點。
  • master:節點是主人。
  • slave:節點是從屬的。
  • fail?:節點處於PFAIL狀態。對於正在聯繫的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL狀態)。
  • fail:節點處於FAIL狀態。對於將PFAIL狀態提升為FAIL的多個節點而言,這是無法訪問的。
  • handshake:不受信任的節點,我們握手。
  • noaddr:此節點沒有已知的地址。
  • noflags:根本沒有標誌。
  1 // 讀取集群上的節點信息
  2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source)
  3 {
  4     RedisClient c = null;
  5     StringReader reader = null;
  6     IList<InternalClusterNode> result = null;
  7 
  8     int index = 0;
  9     int rowCount = source.Count();
 10 
 11     foreach (var node in source)
 12     {
 13         try
 14         {
 15             // 從當前節點讀取REDIS集群節點信息
 16             index += 1;
 17             c = new RedisClient(node.Host, node.Port, node.Password);
 18             RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes());
 19             string info = Encoding.UTF8.GetString(data.Data);
 20 
 21             // 將讀回的字符文本轉成強類型節點實體
 22             reader = new StringReader(info);
 23             string line = reader.ReadLine();
 24             while (line != null)
 25             {
 26                 if (result == null) result = new List<InternalClusterNode>();
 27                 InternalClusterNode n = InternalClusterNode.Parse(line);
 28                 n.Password = node.Password;
 29                 result.Add(n);
 30 
 31                 line = reader.ReadLine();
 32             }
 33 
 34             // 只要任意一個節點拿到集群信息,直接退出
 35             if (result != null && result.Count > 0) break;
 36         }
 37         catch (Exception ex)
 38         {
 39             // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取集群信息
 40             // 否則拋出異常
 41             if (index < rowCount)
 42                 Thread.Sleep(100);
 43             else
 44                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
 45         }
 46         finally
 47         {
 48             if (reader != null) reader.Dispose();
 49             if (c != null) c.Dispose();
 50         }
 51     }
 52 
 53 
 54     if (result == null)
 55         result = new List<InternalClusterNode>(0);
 56     return result;
 57 }
 58 
 59 /// <summary>
 60 /// 從 cluster nodes 的每一行命令里讀取出集群節點的相關信息
 61 /// </summary>
 62 /// <param name="line">集群命令</param>
 63 /// <returns></returns>
 64 public static InternalClusterNode Parse(string line)
 65 {
 66     if (string.IsNullOrEmpty(line))
 67         throw new ArgumentException("line");
 68 
 69     InternalClusterNode node = new InternalClusterNode();
 70     node._nodeDescription = line;
 71     string[] segs = line.Split(' ');
 72 
 73     node.NodeId = segs[0];
 74     node.Host = segs[1].Split(':')[0];
 75     node.Port = int.Parse(segs[1].Split(':')[1]);
 76     node.MasterNodeId = segs[3] == "-" ? null : segs[3];
 77     node.PingSent = long.Parse(segs[4]);
 78     node.PongRecv = long.Parse(segs[5]);
 79     node.ConfigEpoch = int.Parse(segs[6]);
 80     node.LinkState = segs[7];
 81 
 82     string[] flags = segs[2].Split(',');
 83     node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER;
 84     node.IsSlave = !node.IsMater;
 85     int start = 0;
 86     if (flags[start] == MYSELF)
 87         start = 1;
 88     if (flags[start] == SLAVE || flags[start] == MASTER)
 89         start += 1;
 90     node.NodeFlag = string.Join(",", flags.Skip(start));
 91 
 92     if (segs.Length > 8)
 93     {
 94         string[] slots = segs[8].Split('-');
 95         node.Slot.Start = int.Parse(slots[0]);
 96         if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]);
 97 
 98         for (int index = 9; index < segs.Length; index++)
 99         {
100             if (node.RestSlots == null)
101                 node.RestSlots = new List<HashSlot>();
102 
103             slots = segs[index].Split('-');
104 
105             int s1 = 0;
106             int s2 = 0;
107             bool b1 = int.TryParse(slots[0], out s1);
108             bool b2 = int.TryParse(slots[1], out s2);
109             if (!b1 || !b2)
110                 continue;
111             else
112                 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null));
113         }
114     }
115 
116     return node;
117 }

View Code

 

  三、為節點分配緩存客戶端管理器

  在單實例的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑒這個思路,在Redis Cluster集群中,我們為每一個主節點實例化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共享一個 PooledRedisClientManager 實例。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一映射並緩存起來。

 1 // 初始化集群管理
 2 void Initialize(IList<InternalClusterNode> clusterNodes = null)
 3 {
 4     // 從 redis 讀取集群信息
 5     IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes;
 6 
 7     // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器
 8     IList<InternalClusterNode> masters = null;
 9     IDictionary<int, PooledRedisClientManager> managers = null;
10     foreach (var n in nodes)
11     {
12         // 節點無效或者
13         if (!(n.IsMater &&
14             !string.IsNullOrEmpty(n.Host) &&
15             string.IsNullOrEmpty(n.NodeFlag) &&
16             (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue;
17 
18         n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId);
19         if (masters == null)
20             masters = new List<InternalClusterNode>();
21         masters.Add(n);
22 
23         // 用每一個主節點的哈希槽做鍵,導入REDIS客戶端緩衝池管理器
24         // 然後,方法表指針(又名類型對象指針)上場,佔據 4 個字節。 4 * 16384 / 1024 = 64KB
25         if (managers == null)
26             managers = new Dictionary<int, PooledRedisClientManager>();
27 
28         string[] writeHosts = new[] { n.HostString };
29         string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray();
30         var pool = new PooledRedisClientManager(writeHosts, readHosts, _config);
31         managers.Add(n.Slot.Start, pool);
32         if (n.Slot.End != null)
33         {
34             // 這個範圍內的哈希槽都用同一個緩衝池
35             for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++)
36                 managers.Add(s, pool);
37         }
38         if (n.RestSlots != null)
39         {
40             foreach (var slot in n.RestSlots)
41             {
42                 managers.Add(slot.Start, pool);
43                 if (slot.End != null)
44                 {
45                     // 這個範圍內的哈希槽都用同一個緩衝池
46                     for (int s = slot.Start + 1; s <= slot.End.Value; s++)
47                         managers.Add(s, pool);
48                 }
49             }
50         }
51     }
52 
53     _masters = masters;
54     _redisClientManagers = managers;
55     _clusterNodes = nodes != null ? nodes : null;
56 
57     if (_masters == null) _masters = new List<InternalClusterNode>(0);
58     if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0);
59     if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0);
60 
61     if (_masters.Count > 0)
62         _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList();
63 }

View Code

 

  四、將 hash slot 路由到正確的節點

  在訪問一個 key 時,根據第三步緩存起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 實例,調用 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。

 1 // 執行指定動作並返回值
 2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action);
 3 
 4 // 執行指定動作並返回值
 5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
 6 {
 7     RedisClient c = null;
 8     try
 9     {
10         c = slot();
11         return action(c);
12     }
13     catch (Exception ex)
14     {
15         // 此處省略 ...
16     }
17     finally
18     {
19         if (c != null)
20             c.Dispose();
21     }
22 }
23 
24 // 獲取指定key對應的主設備節點
25 private RedisClient GetRedisClient(string key)
26 {
27     if (string.IsNullOrEmpty(key))
28         throw new ArgumentNullException("key");
29 
30     int slot = CRC16.GetSlot(key);
31     if (!_redisClientManagers.ContainsKey(slot))
32         throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key);
33 
34     var pool = _redisClientManagers[slot];
35     return (RedisClient)pool.GetClient();
36 }

   

  五、自動發現新節點和自動刷新slot分佈

  在實際生產環境中,Redis 集群經常會有添加/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 組件必須具備自動發現節點和刷新在 第三步  緩存起來的 slot 的能力。在這裏我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制刷新集群節點信息並重新緩存 slot 與 節點之間的映射。

  1 // 執行指定動作並返回值
  2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
  3 {
  4     RedisClient c = null;
  5     try
  6     {
  7         c = slot();
  8         return action(c);
  9     }
 10     catch (Exception ex)
 11     {
 12         if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
 13         else
 14         {
 15             tryTimes -= 1;
 16             // 嘗試重新刷新集群信息
 17             bool isRefresh = DiscoveryNodes(_source, _config);
 18             if (isRefresh)
 19                 // 集群節點有更新過,重新執行
 20                 return this.DoExecute(slot, action, tryTimes);
 21             else
 22                 // 集群節點未更新過,直接拋出異常
 23                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
 24         }
 25     }
 26     finally
 27     {
 28         if (c != null)
 29             c.Dispose();
 30     }
 31 }
 32 
 33 // 重新刷新集群信息
 34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config)
 35 {
 36     bool lockTaken = false;
 37     try
 38     {
 39         // noop
 40         if (_isDiscoverying) { }
 41 
 42         Monitor.Enter(_objLock, ref lockTaken);
 43 
 44         _source = source;
 45         _config = config;
 46         _isDiscoverying = true;
 47 
 48         // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步
 49         if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL)
 50         {
 51             bool isRefresh = false;
 52             IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source);
 53             foreach (var node in newNodes)
 54             {
 55                 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString);
 56                 isRefresh =
 57                     n == null ||                        // 新節點                                                                
 58                     n.Password != node.Password ||      // 密碼變了                                                                
 59                     n.IsMater != node.IsMater ||        // 主變從或者從變主                                                                
 60                     n.IsSlave != node.IsSlave ||        // 主變從或者從變主                                                                
 61                     n.NodeFlag != node.NodeFlag ||      // 節點標記位變了                                                                
 62                     n.LinkState != node.LinkState ||    // 節點狀態位變了                                                                
 63                     n.Slot.Start != node.Slot.Start ||  // 哈希槽變了                                                                
 64                     n.Slot.End != node.Slot.End ||      // 哈希槽變了
 65                     (n.RestSlots == null && node.RestSlots != null) ||
 66                     (n.RestSlots != null && node.RestSlots == null);
 67                 if (!isRefresh && n.RestSlots != null && node.RestSlots != null)
 68                 {
 69                     var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList();
 70                     var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList();
 71                     for (int index = 0; index < slots1.Count; index++)
 72                     {
 73                         isRefresh =
 74                             slots1[index].Start != slots2[index].Start ||   // 哈希槽變了                                                                
 75                             slots1[index].End != slots2[index].End;         // 哈希槽變了
 76                         if (isRefresh) break;
 77                     }
 78                 }
 79 
 80                 if (isRefresh) break;
 81             }
 82 
 83             if (isRefresh)
 84             {
 85                 // 重新初始化集群
 86                 this.Dispose();
 87                 this.Initialize(newNodes);
 88                 this._lastDiscoveryTime = DateTime.Now;
 89             }
 90         }
 91 
 92         // 最後刷新時間在 {MONITORINTERVAL} 內,表示是最新群集信息 newest
 93         return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL;
 94     }
 95     finally
 96     {
 97         if (lockTaken)
 98         {
 99             _isDiscoverying = false;
100             Monitor.Exit(_objLock);
101         }
102     }
103 }

View Code

 

  六、配置訪問組件調用入口

  最後我們需要為組件提供訪問入口,我們用 RedisCluster 類實現 字符串、列表、哈希、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單實例 Redis 那樣調用 Redis Cluster 集群。調用示例:

var node = new ClusterNode("127.0.0.1", 7001);
var redisCluster = RedisClusterFactory.Configure(node, config);
string key = "B070x14668";
redisCluster.Set(key, key);
string value = redisCluster.Get<string>(key);
redisCluster.Del(key);
 1 /// <summary>
 2 /// REDIS 集群工廠
 3 /// </summary>
 4 public class RedisClusterFactory
 5 {
 6     static RedisClusterFactory _factory = new RedisClusterFactory();
 7     static RedisCluster _cluster = null;
 8 
 9     /// <summary>
10     /// Redis 集群
11     /// </summary>
12     public static RedisCluster Cluster
13     {
14         get
15         {
16             if (_cluster == null)
17                 throw new Exception("You should call RedisClusterFactory.Configure to config cluster first.");
18             else
19                 return _cluster;
20         }
21     }
22 
23     /// <summary>
24     /// 初始化 <see cref="RedisClusterFactory"/> 類的新實例
25     /// </summary>
26     private RedisClusterFactory()
27     {
28     }
29 
30     /// <summary>
31     /// 配置 REDIS 集群
32     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
33     /// </summary>
34     /// <param name="node">集群節點</param>
35     /// <returns></returns>
36     public static RedisCluster Configure(ClusterNode node)
37     {
38         return RedisClusterFactory.Configure(node, null);
39     }
40 
41     /// <summary>
42     /// 配置 REDIS 集群
43     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
44     /// </summary>
45     /// <param name="node">集群節點</param>
46     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
47     /// <returns></returns>
48     public static RedisCluster Configure(ClusterNode node, RedisClientManagerConfig config)
49     {
50         return RedisClusterFactory.Configure(new List<ClusterNode> { node }, config);
51     }
52 
53     /// <summary>
54     /// 配置 REDIS 集群
55     /// </summary>
56     /// <param name="nodes">集群節點</param>
57     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
58     /// <returns></returns>
59     public static RedisCluster Configure(IEnumerable<ClusterNode> nodes, RedisClientManagerConfig config)
60     {
61         if (nodes == null)
62             throw new ArgumentNullException("nodes");
63 
64         if (nodes == null || nodes.Count() == 0)
65             throw new ArgumentException("There is no nodes to configure cluster.");
66 
67         if (_cluster == null)
68         {
69             lock (_factory)
70             {
71                 if (_cluster == null)
72                 {
73                     RedisCluster c = new RedisCluster(nodes, config);
74                     _cluster = c;
75                 }
76             }
77         }
78 
79         return _cluster;
80     }
81 }

View Code

 

  總結

  今天我們詳細介紹了如何從0手寫一個Redis Cluster集群客戶端訪問組件,相信對同樣在尋找類似解決方案的同學們會有一定的啟發,喜歡的同學請點個 star。在沒有相同案例可以參考的情況下筆者通過查閱官方說明文檔和借鑒 Java 的 JedisCluster 的實現思路,雖說磕磕碰碰但最終也初步完成這個組件並投入使用,必須給自己加一個雞腿!!在此我有一個小小的疑問,.NET 的同學們在用 Redis 集群時,你們是用什麼組件耍的,為何網上的相關介紹和現成組件幾乎都沒有?歡迎討論。

  GitHub 代碼託管:https://github.com/TANZAME/ServiceStack.Redis.Cluster

  技術交流 QQ 群:816425449

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

聚甘新

荷蘭開放首條塑膠再製自行車道 約含50萬個瓶蓋

摘錄自2018年09月18日科技新報報導

荷蘭人以愛好騎單車聞名,該國約 1,700 萬人,卻擁有超過 2,200 萬輛腳踏車,光是阿姆斯特丹就鋪設了近 800 公里的自行車道,東北部城鎮茲沃勒(Zwolle)日前又正式開放一條全由回收塑膠再製生成的自行車道。

該塑膠車道概念來自道路工程公司 KWS 的 Anne Koudstaal 和 Simon Jorritsma,道路全長 30 公尺,採用預製模塊,重量輕、易安裝,下方設計了排水系統讓管道、電纜通過,可讓水快速流通,遇到暴雨時還能充當臨時儲水槽、避免淹水。

據估計,這條自行車道包含了約 218,000 個塑膠杯,或 50 萬個塑膠瓶蓋,使用年限可比傳統道路長 2~3 倍(有待觀察),路面應該也不會出現裂縫或坑洞。

 

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

德國小城弗萊堡 交通轉型有成 35年私人汽車減半

環境資訊中心特約記者 陳文姿報導

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司“嚨底家”!

※推薦評價好的iphone維修中心

聚甘新