剖析nsq消息隊列(三) 消息傳輸的可靠性和持久化[二]diskqueue

,大概提了一下消息的持久化,--mem-queue-size 設置為 0,所有的消息將會存儲到磁盤。
總有人說nsq的持久化問題,消除疑慮的方法就是閱讀原碼做benchmark測試,個人感覺nsq還是很靠譜的。
nsq自己實現了一個先進先出的消息文件隊列是把消息保存到本地文件內,很值得分析一下他的實現過程。

整體處理邏輯

go-diskqueue 會啟動一個gorouting進行讀寫數據也就是方法ioLoop
會根據你設置的參數來進行數據的讀寫,流程圖如下

這個圖畫的也不是特別的準確 ioLoop 用的是 select 並不是if else 當有多個條件為true時,會隨機選一個進行執行

nsq 生成的數據大致如下:

xxxx.diskqueue.meta.dat 元數據保存了未讀消息的長度,讀取和存入數據的編號和讀取位置
xxxx.diskqueue.編號.dat 消息保存的文件,每一個消息的存儲:4Byte消息的長度+消息

參數說明

一些主要的參數和約束說明
這些參數的使用在後面的處理邏輯中會提到

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
    // run-time state (also persisted to disk)
    // 讀取數據的位置    
    readPos      int64
    // 寫入數據的位置
    writePos     int64
    // 讀取文件的編號    
    readFileNum  int64
    // 寫入文件的編號
    writeFileNum int64
    // 未處理的消息總數    
    depth        int64

    // instantiation time metadata
    // 每個文件的大小限制    
    maxBytesPerFile int64 // currently this cannot change once created
    // 每條消息的最小大小限制    
    minMsgSize      int32
    // 每條消息的最大大小限制    
    maxMsgSize      int32
    // 緩存消息有多少條後進行寫入    
    syncEvery       int64         // number of writes per fsync
    // 自動寫入消息文件的時間間隔    
    syncTimeout     time.Duration // duration of time per fsync
    exitFlag        int32
    needSync        bool

    // keeps track of the position where we have read
    // (but not yet sent over readChan)
    // 下一條消息的位置    
    nextReadPos     int64
    // 下一條消息的文件編號    
    nextReadFileNum int64

    // 讀取的文件
    readFile  *os.File
    // 寫入的文件    
    writeFile *os.File
    // 讀取的buffer    
    reader    *bufio.Reader
    // 寫入的buffer    
    writeBuf  bytes.Buffer

    // exposed via ReadChan()
    // 讀取數據的channel    
    readChan chan []byte

    //.....
}

數據

元數據

讀寫數據信息的元數據保存在xxxxx.diskqueue.meta.data文件內主要用到代碼里的字段如下
未處理的消息總數 depth
讀取文件的編號 readFileNum 讀取數據的位置 readPos
寫入文件的編號 writeFileNum 寫入數據的位置 writePos
真實數據如下

15
0,22
3,24

保存元數據信息

func (d *diskQueue) persistMetaData() error {
    // ...
    fileName := d.metaDataFileName()
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
    // write to tmp file
    f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
    // 元數據信息
    _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
        atomic.LoadInt64(&d.depth),
        d.readFileNum, d.readPos,
        d.writeFileNum, d.writePos)
    // 保存
    f.Sync()
    f.Close()
    // atomically rename
    return os.Rename(tmpFileName, fileName)
}

得到元數據信息

func (d *diskQueue) retrieveMetaData() error {
    // ...
    fileName := d.metaDataFileName()
    f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
    // 讀取數據並賦值
    var depth int64
    _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
        &depth,
        &d.readFileNum, &d.readPos,
        &d.writeFileNum, &d.writePos)
    //...
    atomic.StoreInt64(&d.depth, depth)
    d.nextReadFileNum = d.readFileNum
    d.nextReadPos = d.readPos
    return nil
}

消息數據

寫入一條數據

ioLoop 中發現有數據寫入時,會調用writeOne方法,把消息保存到文件內

        select {
        // ...
        case dataWrite := <-d.writeChan:
            count++
            d.writeResponseChan <- d.writeOne(dataWrite)
        // ...
func (d *diskQueue) writeOne(data []byte) error {
    var err error

    if d.writeFile == nil {
        curFileName := d.fileName(d.writeFileNum)
        d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
        // ...
        if d.writePos > 0 {
            _, err = d.writeFile.Seek(d.writePos, 0)
            // ...
        }
    }

    dataLen := int32(len(data))
    // 判斷消息的長度是否合法
    if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
        return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
    }
    d.writeBuf.Reset()
    // 寫入4字節的消息長度,以大端序保存
    err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
    if err != nil {
        return err
    }
    // 寫入消息
    _, err = d.writeBuf.Write(data)
    if err != nil {
        return err
    }

    // 寫入到文件
    _, err = d.writeFile.Write(d.writeBuf.Bytes())
    // ...
    // 計算寫入位置,消息數量加1
    totalBytes := int64(4 + dataLen)
    d.writePos += totalBytes
    atomic.AddInt64(&d.depth, 1)
    // 如果寫入位置大於 單個文件的最大限制, 則持久化文件到硬盤
    if d.writePos > d.maxBytesPerFile {
        d.writeFileNum++
        d.writePos = 0

        // sync every time we start writing to a new file
        err = d.sync()
        // ...
    }
    return err
}

寫入完消息后,會判斷當前的文件大小是否已經已於maxBytesPerFile如果大,就持久化文件到硬盤,然後重新打開一個新編號文件,進行寫入。

什麼時候持久化文件到硬盤

調用sync()方法會持久化文件到硬盤,然後重新打開一個新編號文件,進行寫入。
有幾個地方調用會調用這個方法:

  • 一個寫入文件的條數達到了syncEvery的值時,也就是初始化時設置的最大的條數。會調用sync()
  • syncTimeout 初始化時設置的同步時間間隔,如果這個時間間隔到了,並且寫入的文件條數>0的時候,會調用sync()
  • 還有就是上面說過的writeOne方法,寫入完消息后,會判斷當前的文件大小是否已經已於maxBytesPerFile如果大,會調用sync()
  • 當讀取文件時,把整個文件讀取完時,會刪除這個文件並且會把needSync 設置為trueioLoop 會調用sync()
  • 還有就是Close的時候,會調用sync()
func (d *diskQueue) sync() error {
    if d.writeFile != nil {
        // 把數據 flash到硬盤,關閉文件並設置為 nil
        err := d.writeFile.Sync()
        if err != nil {
            d.writeFile.Close()
            d.writeFile = nil
            return err
        }
    }
    // 保存元數據信息
    err := d.persistMetaData()
    // ...
    d.needSync = false
    return nil
}

讀取一條數據

元數據保存着 讀取文件的編號 readFileNum 和讀取數據的位置 readPos
並且diskQueue暴露出了一個方法來,通過channel來讀取數據

func (d *diskQueue) ReadChan() chan []byte {
    return d.readChan
}

ioLoop里,當發現讀取位置小於寫入位置 或者讀文件編號小於寫文件編號,並且下一個讀取位置等於當前位置時才會讀取一條數據,然後放在一個外部全局變量 dataRead 里,並把 讀取的channel 賦值監聽 r = d.readChan,當外部有人讀取了消息,則進行moveForward操作

func (d *diskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64
    var r chan []byte
    for {
        // ...
        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // ...
        case r <- dataRead:
            count++
            // moveForward sets needSync flag if a file is removed
            d.moveForward()
        // ...
        }
    }

// ...
}

readOne 從文件里讀取一條消息,4個bit的大小,然後讀取具體的消息。如果讀取位置大於最大文件限制,則close。在moveForward里會進行刪除操作

func (d *diskQueue) readOne() ([]byte, error) {
    var err error
    var msgSize int32
    // 如果readFile是nil,打開一個新的
    if d.readFile == nil {
        curFileName := d.fileName(d.readFileNum)
        d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
        // ...
        d.reader = bufio.NewReader(d.readFile)
    }
    err = binary.Read(d.reader, binary.BigEndian, &msgSize)
    // ...
    readBuf := make([]byte, msgSize)
    _, err = io.ReadFull(d.reader, readBuf)
    totalBytes := int64(4 + msgSize)
    // ...
    d.nextReadPos = d.readPos + totalBytes
    d.nextReadFileNum = d.readFileNum
    // 如果讀取位置大於最大文件限制,則close。在moveForward里會進行刪除操作
    if d.nextReadPos > d.maxBytesPerFile {
        if d.readFile != nil {
            d.readFile.Close()
            d.readFile = nil
        }
        d.nextReadFileNum++
        d.nextReadPos = 0
    }
    return readBuf, nil
}

moveForward方法會查看讀取的編號,如果發現下一個編號 和當前的編號不同時,則刪除舊的文件。

func (d *diskQueue) moveForward() {
    oldReadFileNum := d.readFileNum
    d.readFileNum = d.nextReadFileNum
    d.readPos = d.nextReadPos
    depth := atomic.AddInt64(&d.depth, -1)

    // see if we need to clean up the old file
    if oldReadFileNum != d.nextReadFileNum {
        // sync every time we start reading from a new file
        d.needSync = true

        fn := d.fileName(oldReadFileNum)
        err := os.Remove(fn)
        // ...
    }
    d.checkTailCorruption(depth)

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!