探索 RabbitMQ 的內部架構
從 AMQP 到訊息處理過程的介紹
前言
學習使用 RabbitMQ 是因為現今的軟體開發領域都會用到 MQ,而 RabbitMQ 作為一款很熱門且被廣泛使用的開源的 MQ,所以是我可以較為容易學習的一大原因。
透過深入了解 RabbitMQ 一些運作原理,我們可以有效地建立和管理複雜的訊息傳遞系統。
AMQP
在講解內部結構之前,我想要介紹一下 AMQP,因為他會跟接下來介紹的 Exchanges 有關。
AMQP 是一種開放標準的應用層協議,專門設計用於標準化訊息的傳遞和佇列管理。AMQP 的核心設計理念是提供一個統一的、跨平台的訊息傳遞標準,使得來自不同語言和架構的系統能夠無縫地溝通。在 AMQP 中,消息被發送到稱為 Exchanges 的地方,這些 Exchanges 根據定義好的規則將訊息 routing 到一個或多個 Queues。當消費者(Consumers)訂閱這些佇列以接收訊息,處理後可以發送確認信號表示訊息已被正確處理。
AMQP 的一個最關鍵特性是其訊息確認機制,它確保了訊息傳遞的可靠性。當訊息從生產者發送到消費者時,消費者需要發送一個確認回應(ACK),表明訊息已被接收並處理。這種機制減少了訊息丟失的可能性,特別是在分散式系統,各個環節都可能出錯的環境下。
Exchanges
在 RabbitMQ 中,Exchanges 是 AMQP 的實體用來作為訊息的分發中心。一個訊息被發送到 Exchange,隨後根據設定的路由規則,分發到一個或多個佇列中。RabbitMQ 支援多種類型的 Exchanges,包括 Direct, Fanout, Topic 和 Header
其中 AMQP 本身預設的 Exchanges 即是 Direct Exchange。
Direct Exchange
這個交換機制是最基本的,也是最簡單的交換機制。它的主要功能是根據 routing key 來決定消息的分發目的地。在 Direct Exchange 中,當一條消息發送到該 Exchange 時,它會將這條消息轉發到與該 Exchange 綁定且 routing key 完全匹配的佇列。
那什麼是 routing key?Routing key 是附加在訊息上的一個 identifier,它在 MQ 中用來辨識訊息應該被送往哪個 Queue。這個標籤通常是一個字串,它在訊息被發送到 Exchange 時用來判斷遵循哪個路由規則。
而在 Direct Exchange 中,Routing 的工作原理非常直接,他是精確的 routing key 配對。當一個 Producer 發送訊息到 Direct Exchange 時,它會指定一個 routing key。然後 Exchange 會檢查所有與這個 routing key 對應的 Queues,並將訊息傳送到這些符合的 Queues。如果一個訊息的 routing key 沒有對應的 Queues,那麼這個訊息將不會被轉發甚至可能被丟棄。
由於只是單純的精確配對,Direct Exchange 在處理大量訊息時就會非常有效率,只要好好地設計 routing key,就可以讓訊息有效地被處理。
Fanout Exchange
與 Direct Exchange 不一樣的運作機制。Fanout Exchange 則是不考慮 routing key,它會將收到的所有訊息廣播到所有訂閱這個 exchanges queues。這種類型的 Exchange 適合用於廣播場景,如系統通知。
說到廣播,大家通常就會想到 pub/sub 的模式,一個訊息由發布者上傳,所有人就會取得這些資料。在 RabbitMQ 中設定 Fanout Exchange 就非常簡單,只需要把 Queues 跟 Exchange 連結即可,不需要設定 routing key。
當然因為涉及廣播,效能一定不會比 Direct Exchange 好,今天 Fanout Exchange 如果一時間有大量的訊息要處理,效能就會非常糟糕。在設計上可能要先確認到底哪些 Queues 要跟 Fanout Exchange 綁在一起。
有沒有辦法做到廣播,但又可以設定一些機制不要每個訊息通通送到每個 Queues 來解決效能的問題?接下來介紹的 Exchange 就是要解決這樣的問題。
Topic Exchange
為了解決 Fanout Exchange 的一些問題,Topic Exchange 在廣播的概念中也加入了 Direct Exchange 的 routing key 的概念。所以當一個訊息被送到 Topic Exchange 時,它的 routing key 會先找出指定好的 Queues 將資訊傳送過去。在 routing key 方面跟 Direct Exchange 差異是,前面說了 Direct Exchange 是精確配對,而 Topic Exchange 可以使用 routing key pattern match 來跟 Queues 搭配:使用 * 這個符號,或是使用 #,來達到靈活度。
舉例來說 #.health.*
表示 在 health 這個字之前,可以 0 到多的字,在 health 之後的字至少一個。所以 #.阿.*
的 topic 可以是 阿門、阿拉、阿彌陀佛
,或是沒有阿
,但 好了拉做個效果
就不會進到 Queue 裡面。
所以你可以認為 Fanout Exchange 無條件地將訊息廣播到所有綁定的 Queues,而 Topic Exchange 則可以根據 routing key pattern match 來決定把訊息送到哪些 Queues。
在系統設計中,我們就可以利用 Topic Exchange 來做到把不同訊息轉送到不同的服務或處理流程進行後續的處理。但是如果我的訊息本身有不同屬性的話,我要怎麼設計 routing key?接下來介紹的 Header Exchange 就是要解決 routing key 不太彈性的問題。
Header Exchange
顧名思義 Header Exchange 是在訊息的 Header 加入多個 key-value pair 的內容。然後在這個 Exchange 中利用 Header 決定要把訊息送到哪些 Queues。所以你可以在 header 中設置多個不同屬性例如 x-match:any or x-match: all
,甚至可以在 Header 中設定優先處理程度或是訊息內容的機敏程度,然後送到指定的 Queues。
所以如果你的訊息處理牽涉到不同的分類、屬性的時候,考慮使用 Header Exchange 就會更靈活。
Queues
講完了 Exchanges 之後,我們來看看 Queues。
Queue 是把訊息先進先出處理的儲存容器,在 RabbitMQ 中,每個 Queue 都可以有其特定的屬性,例如 name
、durable
、exclusive
跟auto-delete
。
這些我舉例的屬性是因為他真的很常被使用,Name 是用來標誌 Queue;durable 確保這個 Queue 在整個 RabbitMQ 重啟後是不是要持續存在;Auto-delete 用在如果 consumer,例如下游的應用程式斷開對這個 Queue 的連接時,Queue 要不要自動刪除;Exclusive 是只有建立 Queue 的連接可以使用這個 Queue,且當連接終止時會自動刪除這個 Queue。
在 RabbitMQ 中的 Queue 怎麼確保訊息被正確的處理?Queue 會透過 Consumer 發送的 ACK(acknowledgement) 表達訊息的有處理成功,反之如果訊息沒有被正確處理,則會回傳 reject 或是請求訊息重新排隊(requeue)。
如果訊息的產生速度太快,Consumer 來不及處理完成,就會產生訊息堆積的問題。除了設定 Queue 的最大長度 x-max-length
,在Queue 滿載的時候丟棄最舊的訊息或是拒絕收到新的訊息以外,增加 Consumer 數量或是改善 Consumer 本身的處理能力(比如改良演算法)以外,往訊息本身的改善也是一個可以思考的要點,這些處理包括確認有沒有所謂的 hot shard 的問題,只有一個 Queue 被過度使用而其他的 Queue 閒置著。或是針對 Queue 設定不同優先處理的程度(搭配前面說的 Header Exchange)。
另外的設定也可以考慮針對訊息設定 TTL,避免塞了一堆過期但不用重新處理的訊息在裡面。
還有訊息堆積可能來自於被拒絕處理的訊息,使用 DLQ(Dead Letter Queue) 來放這些處理失敗的訊息,才不會在主要的 Queue 中產生堆疊。
針對重新排隊的訊息,我們可以使用 Exponential Backoff 的方式避免過短的時間重新嘗試造成堆積。
Bindings
介紹了 Exchanges 和 Queues,下面一個小篇幅介紹把兩者建立關連的概念說明一下,因為也是前面兩個部分的統整。
Bindings 是定義 Exchange 如何將訊息 routing 到 Queues 的規則。一個 Binding 是一個連接,它定義了從一個 Exchange 到一個 Queue 的路由規則。當一個訊息到達 Exchange 時,它是否被轉發到一個特定的 Queue 取決於這個 Binding 的規則。
在大多數類型的 Exchanges(除了 Fanout Exchange)中,Bindings 依賴於 routing key 的參數。Routing key 在前面提過是訊息的一部分,用於決定這個訊息應該被 routing 到哪個 Queue。
Comsumers
在 RabbitMQ 中,消費者(Consumers)就是著接收和處理訊息的角色,可以是,系統也可以是其他的流程。基本上在 RabbitMQ 中 Queues 是可以被一到多個 Consumers「訂閱」。Consumers 在訊息正確的接收後會回傳 ACK (message acknowledgements) 給 RabbitMQ,用以表明訊息已經收到。如果Consumers 因為某種原因無法處理該訊息,它可以選擇拒絕接收這個訊息。這個被拒絕的訊息就會重新回到 Queue 中排隊或是被丟棄,至於具體的行為則是根據 AMQP 協議中定義或是自己客製化的設定。
如果正常處理完畢後,Consumers 會回傳一個確認訊號給 RabbitMQ。
結論
RabbitMQ 本身的設計有很多為了處理訊息所做的設計,是值得我們參考學習的,可以知道這些設計,推論出要面對哪些可能存在的系統架構。