詳解 Seata Golang 客戶端 AT 模式及其使用

源碼seata-golang

概述

  我們知道 Seata Java Client 的 AT 模式,通過代理數據源,實現了對業務代碼無侵入的分佈式事務協調機制,將與 Transaction Coordinator (TC) 交互的邏輯、Commit 的邏輯、Rollback 的邏輯,隱藏在切面和代理數據源相應的代碼中,使開發者無感知。那如果這個方法,要用 Golang 來實現一遍,應該如何操作呢?關於這個問題,我想了很久,最初的設想是,對 database/sql 的 mysql driver 進行增強,在對包 github.com/go-sql-driver/mysql 研究了一段時間后,還是沒有頭緒,不知如何下手,最後轉而增強 database/sql 包。由於 AT 模式必須保證本地事務的正確處理,在具體業務開發時,首先要通過 db.Begin() 獲得一個 Tx 對象,然後再 tx.Exec() 執行數據庫操作,最後 tx.Commit() 提交或 tx.Rollback() 回滾。這種處理方式算是一個 Golang 數據庫事務處理的基本操作。 所以對 database/sql 的增強,我們重點關注這幾個方法 db.Begin()tx.Exec()tx.Commit()tx.Rollback

事務提交、回滾

  通過 Seata Java Client 的相關代碼,我們知道,在本地事務提交的時候,主要是將分支事務註冊到 TC 上,並將數據庫操作產生的 undoLog 一起寫入到 undoLog 表;本地事務回滾的時候,需要將分支事務(即本地事務)的執行狀態報告給 TC,使 TC 好知道是否通知參与全局事務的其他分支回滾。

func (tx *Tx) Commit() error {
        //註冊分支事務
	branchId,err := tx.register()
	if err != nil {
		return errors.WithStack(err)
	}
	tx.tx.Context.BranchId = branchId

	if tx.tx.Context.HasUndoLog() {
                //將 undoLog 寫入 undoLog 表
		err = manager.GetUndoLogManager().FlushUndoLogs(tx.tx)
		if err != nil {
			err1 := tx.report(false)
			if err1 != nil {
				return errors.WithStack(err1)
			}
			return errors.WithStack(err)
		}
		err = tx.tx.Commit()
		if err != nil {
			err1 := tx.report(false)
			if err1 != nil {
				return errors.WithStack(err1)
			}
			return errors.WithStack(err)
		}
	} else {
		return tx.tx.Commit()
	}
	if tx.reportSuccessEnable {
		tx.report(true)
	}
	tx.tx.Context.Reset()
	return nil
}

  db.Begin() 會產生一個 Tx 對象,tx.Exec() 會產生 undoLog,tx.Commit() 將 undoLog 刷到數據庫中。那麼 undoLog 保存到哪裡呢?答案是 TxContext 中。

type TxContext struct {
	*context.RootContext
	Xid string
	BranchId int64
	IsGlobalLockRequire bool

	LockKeysBuffer *model.Set
	SqlUndoItemsBuffer []*undo.SqlUndoLog
}

  Commit() 方法中的 tx.tx.Context,第一個 tx 是封裝的 Tx 對象,第二個 tx 是 database/sql 的 Tx,tx.tx.Context 則是 TxContext。UndoLogManager 則是操作 undoLog 的核心對象,處理 undoLog 的插入、刪除,並查詢出 undoLog 用於回滾。

func (tx *Tx) Rollback() error {
	err := tx.tx.Rollback()
	if tx.tx.Context.InGlobalTransaction() && tx.tx.Context.IsBranchRegistered() {
                // 報告 TC 分支事務執行失敗
		tx.report(false)
	}
	tx.tx.Context.Reset()
	return err
}

  通過上面的代碼呢,我們知道增強型 Tx 對象需要向 TC 註冊分支事務,並報告分支事務的執行狀態,相應代碼如下:

func (tx *Tx) register() (int64,error) {
	return dataSourceManager.BranchRegister(meta.BranchTypeAT,tx.tx.ResourceId,"",tx.tx.Context.Xid,
		nil,tx.tx.Context.BuildLockKeys())
}

func (tx *Tx) report(commitDone bool) error {
	retry := tx.reportRetryCount
	for retry > 0 {
		var err error
		if commitDone {
			err = dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId,
				meta.BranchStatusPhaseoneDone,nil)
		} else {
			err = dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId,
				meta.BranchStatusPhaseoneFailed,nil)
		}
		if err != nil {
			logging.Logger.Errorf("Failed to report [%d/%s] commit done [%t] Retry Countdown: %d",
				tx.tx.Context.BranchId,tx.tx.Context.Xid,commitDone,retry)
			retry = retry -1
			if retry == 0 {
				return errors.WithMessagef(err,"Failed to report branch status %t",commitDone)
			}
		}
	}
	return nil
}

  和 TC 進行通信的主要邏輯還是在 DataSourceManager 裏面。AT 模式涉及的兩個關鍵對象 DataSourceManager、UndoLogManager 就浮出水面。一個用於遠程 TC 交互,一個用於本地數據庫處理。

事務執行

func (tx *Tx) Exec(query string, args ...interface{}) (sql.Result, error) {
	var parser = p.New()
        // 解析業務 sql
	act,_ := parser.ParseOneStmt(query,"","")
	deleteStmt,isDelete := act.(*ast.DeleteStmt)
	if isDelete {
		executor := &DeleteExecutor{
			tx:            tx.tx,
			sqlRecognizer: mysql.NewMysqlDeleteRecognizer(query,deleteStmt),
			values:        args,
		}
		return executor.Execute()
	}

	insertStmt,isInsert := act.(*ast.InsertStmt)
	if isInsert {
		executor := &InsertExecutor{
			tx:            tx.tx,
			sqlRecognizer: mysql.NewMysqlInsertRecognizer(query,insertStmt),
			values:        args,
		}
		return executor.Execute()
	}

	updateStmt,isUpdate := act.(*ast.UpdateStmt)
	if isUpdate {
		executor := &UpdateExecutor{
			tx:            tx.tx,
			sqlRecognizer: mysql.NewMysqlUpdateRecognizer(query,updateStmt),
			values:        args,
		}
		return executor.Execute()
	}

	return tx.tx.Tx.Exec(query,args)
}

  執行業務 sql,並生成 undoLog 的關鍵,在於識別業務 sql 執行了什麼操作:插入?刪除?修改?這裏使用 tidb 的 sql parser 去解析業務 sql,再使用相應的執行器去執行業務 sql,生成 undoLog 保存在 Tx_Context 中。

事務開啟

  db.Begin() 返回增強型的 Tx 對象。

func (db *DB) Begin(ctx *context.RootContext) (*Tx,error) {
	tx,err := db.DB.Begin()
	if err != nil {
		return nil,err
	}
	proxyTx := &tx2.ProxyTx{
		Tx:         tx,
		DSN:        db.conf.DSN,
		ResourceId: db.GetResourceId(),
		Context:    tx2.NewTxContext(ctx),
	}
	return &Tx{
		tx: proxyTx,
		reportRetryCount: db.conf.ReportRetryCount,
		reportSuccessEnable: db.conf.ReportSuccessEnable,
	},nil
}

seata-golang at 模式的使用

sample 代碼

  • 首先執行 scripts 腳本,初始化數據庫
    如果之前沒有初始化過 seata 數據庫,先執行 seata-golang/scripts/server/db/mysql.sql 腳本
  • 修改 dsn 數據庫配置,修改下列文件:
seata-golang/tc/app/profiles/dev/config.yml
seata-golang/samples/at/product_svc/conf/client.yml
seata-golang/samples/at/product_svc/conf/client.yml
  • 將下列文件中的 configPath 修改為 client.yml 配置文件的路徑
seata-golang/samples/at/product_svc/main.go
seata-golang/samples/at/order_svc/main.go
seata-golang/samples/at/aggregation_svc/main.go
  • 依次運行 tc、order_svc、product_svc、aggragation_svc,訪問下列地址開始測試:
http://localhost:8003/createSoCommit
http://localhost:8003/createSoRollback

TC 啟動參考參与 Seata 社區到 go 與 Seata 的邂逅

seata-golang 後續安排

  接下來不打算再增加新的 feature。Java 版 Seata 畢竟發展了一年多時間,並且有很多社區成員一起維護,Go 版本目前主要是我在開發,時間不到2個月,現有的代碼,僅是完成了框架,還需要大量優化,改bug,後續的工作重心在於使 seata-golang 穩定運行,生產可用,希望對分佈式事務感興趣且對 Go 感興趣的同學一起加入進來,一起做些事情。進入微信群,請加我微信:scottlewis,備註進群。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

網頁設計最專業,超強功能平台可客製化

※別再煩惱如何寫文案,掌握八大原則!