允許多個并發(fā)用戶處理在同一個通訊通道接收的消息。這種模式使系統(tǒng)能夠同時處理多個郵件,以優(yōu)化吞吐量,提高可擴展性和可用性,以及平衡工作負載。
在云中運行的應(yīng)用程序,可以預(yù)計,以處理大量的請求。而不是過程的每個請求同步地,一個常用的方法是通過一個消息傳送系統(tǒng)到該異步地處理它們的另一服務(wù)(消費者服務(wù)),以通過他們的應(yīng)用程序。這種策略有助于確保在應(yīng)用程序的業(yè)務(wù)邏輯沒有被阻塞,而正在處理的請求。
請求的數(shù)量可以隨著時間的原因有很多顯著變化。突然一陣在用戶活動或聚集的請求,來自多個租戶未來可能會導(dǎo)致不可預(yù)測的工作負載。在高峰時間的系統(tǒng)可能需要處理許多每秒數(shù)百個請求,而在其他時間的數(shù)量可能是非常小的。此外,該工作的性質(zhì)進行的處理這些請求可能是高度可變的。使用消費者服務(wù)的單個實例,可能會導(dǎo)致該實例成為充斥請求或消息傳送系統(tǒng)可通過消息從應(yīng)用程序來的流入被重載。為了處理這種波動的負載,該系統(tǒng)可以運行消費者服務(wù)的多個實例。然而這些消費者必須協(xié)調(diào),以確保每個消息只傳送給一個單個消費者。工作量也需要跨消費者被負載平衡,以防止一個實例成為瓶頸。
使用消息隊列來實現(xiàn)應(yīng)用和消費者服務(wù)的實例之間的通信信道。在消息隊列中的形式應(yīng)用帖請求,以及消費者的服務(wù)實例從隊列中接收消息并對其進行處理。這種方法使消費者的服務(wù)實例的同一池中從應(yīng)用程序的任何實例處理消息。圖 1 示出了該架構(gòu)。
圖1 - 使用消息隊列分發(fā)工作提高到一個服務(wù)的實例
該解決方案具有以下優(yōu)點:
在決定如何實現(xiàn)這個模式時,請考慮以下幾點:
注意
微軟 Azure 服務(wù)總線隊列可以通過使用消息會先入先出消息的順序工具保證。欲了解更多信息,請參閱消息傳遞模式 MSDN 上使用會話。
注意
如果您正在使用 Azure 的工作進程可能能夠通過使用專用的郵件回復(fù)隊列回傳結(jié)果的應(yīng)用程序邏輯。應(yīng)用邏輯必須能夠?qū)⑦@些結(jié)果與原來的消息關(guān)聯(lián)起來。這種情況下進行了更詳細的異步消息的引物進行說明。
使用這種模式時:
這種模式可能不適合時:
有些郵件系統(tǒng)支持會話,使生產(chǎn)者對消息進行分組在一起,并確保它們都被同一個接收者處理。這個機制可以與優(yōu)先消息使用(如果它們支持)來實現(xiàn)消息排序的一種形式,在順序從生產(chǎn)者傳送消息到單個消費者。
Azure 提供存儲隊列和服務(wù)總線隊列,可作為一個合適的機制來實現(xiàn)這種模式。應(yīng)用邏輯可以發(fā)布消息到一個隊列,而消費者實現(xiàn)為在一個或多個角色的任務(wù)可以檢索從這個隊列中的消息并進行處理。對于彈性,一個服務(wù)總線隊列使得消費者使用 PeekLock 模式,當(dāng)它從隊列檢索消息。這種模式實際上不是刪除消息,而只是從其他消費者隱藏它。當(dāng)處理完它原來的用戶可以刪除該郵件。如果消費者要失敗,偷看鎖將超時,消息將再次變得可見,讓消費者又找回它。
有關(guān)使用 Azure 的服務(wù)總線隊列的詳細信息,請參閱服務(wù)總線隊列,主題和 MSDN 上的訂閱。有關(guān)使用 Azure 存儲隊列的信息,請參閱如何 MSDN 上使用隊列存儲服務(wù)。
從可供下載的例子 CompetingConsumers 解決方案的 QueueManager 類下面的代碼顯示了本指南說明了如何通過在網(wǎng)絡(luò)或輔助角色開始的事件處理程序使用 QueueClient 實例中創(chuàng)建一個隊列。
private string queueName = ...;
private string connectionString = ...;
...
?
public async Task Start()
{
// Check if the queue already exists.
var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
if (!manager.QueueExists(this.queueName))
{
var queueDescription = new QueueDescription(this.queueName);
?
// Set the maximum delivery count for messages in the queue. A message
// is automatically dead-lettered after this number of deliveries. The
// default value for dead letter count is 10.
queueDescription.MaxDeliveryCount = 3;
?
await manager.CreateQueueAsync(queueDescription);
}
...
?
// Create the queue client. By default the PeekLock method is used.
this.client = QueueClient.CreateFromConnectionString(
this.connectionString, this.queueName);
}
下面的代碼片段顯示了一個應(yīng)用程序如何創(chuàng)建和發(fā)送一批消息隊列。
public async Task SendMessagesAsync()
{
// Simulate sending a batch of messages to the queue.
var messages = new List<BrokeredMessage>();
?
for (int i = 0; i < 10; i++)
{
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
messages.Add(message);
}
await this.client.SendBatchAsync(messages);
}
下面的代碼顯示了如何消費服務(wù)實例可以從隊列中下一個事件驅(qū)動的方式接收消息。該 processMessageTask 參數(shù)的 ReceiveMessages 法為代表,它引用在收到消息時運行的代碼。此代碼是異步運行。
private ManualResetEvent pauseProcessingEvent;
...
?
public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
// Set up the options for the message pump.
var options = new OnMessageOptions();
?
// When AutoComplete is disabled it is necessary to manually
// complete or abandon the messages and handle any errors.
options.AutoComplete = false;
options.MaxConcurrentCalls = 10;
options.ExceptionReceived += this.OptionsOnExceptionReceived;
?
// Use of the Service Bus OnMessage message pump.
// The OnMessage method must be called once, otherwise an exception will occur.
this.client.OnMessageAsync(
async (msg) =>
{
// Will block the current thread if Stop is called.
this.pauseProcessingEvent.WaitOne();
?
// Execute processing task here.
await processMessageTask(msg);
},
options);
}
...
?
private void OptionsOnExceptionReceived(object sender,
ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
...
}
需要注意的是自動縮放的功能,例如可在天青,可用于啟動和停止的角色實例的隊列長度的波動。欲了解更多信息,請參閱自動縮放指導(dǎo)。另外,沒有必要維持角色實例和工人之間的一對一的對應(yīng)過程,單個角色實例可以實現(xiàn)多個工作進程。欲了解更多信息,請參閱計算資源整合模式。
更多建議: