作者 Boris Lublinsky ,譯者 王麗娟
AWS提供兩種服務——Amazon簡單通知服務(Simple Notification Service)和Amazon簡單隊列服務(Simple Queue Service),兩者結合起來可以為完整的發(fā)布/訂閱服務提供支撐。
Amazon簡單通知服務(Amazon SNS)是一個Web服務,能讓應用、最終用戶和設備立即從云端發(fā)送和接收通知。簡化的SNS架構如下圖所示(圖1):
圖1:Amazon SNS的基礎架構
多個發(fā)布應用和多個訂閱應用可以將SNS主題作為中介互相通訊。這樣實現(xiàn)的優(yōu)點是發(fā)布者和訂閱者不需要知道對方,因此,應用可以完全動態(tài)地進行集成。SNS支持用多種傳輸協(xié)議傳遞通知,包括HTTP、HTTPS、Email、SMS和Amazon簡單隊列(Simple Queue)。
Amazon簡單隊列服務(Amazon SQS)提供可靠、可伸縮的托管隊列,用來存儲計算機之間傳輸?shù)南?。使用Amazon SQS,你可以在執(zhí)行不同任務的應用分布式組件之間移動數(shù)據(jù),而不會丟失消息,也不必要求每個組件始終都是可用的。SQS和SNS結合起來會帶來兩個額外的優(yōu)勢——解除時間上的耦合度,根據(jù)消費應用特定的情況提供負載均衡——這是SNS無法單獨提供的。要做到第二個附加優(yōu)勢,需要同一個應用的多個實例從同一個隊列里讀取消息。下圖展示了SNS和SQS結合的總體架構(圖2)。其中的一個訂閱應用顯示為負載均衡的。
圖2:結合SNS和SQS
這個實現(xiàn)的主要缺點是,發(fā)布者和訂閱者需要明確統(tǒng)一SNS主題的名稱。此外,如果一個特定的消費者想從多個主題獲取信息,那他需要把隊列注冊到多個主題上。
這個問題的典型解決方案是采用基于樹的主題組織,大部分發(fā)布/訂閱引擎都是這樣實現(xiàn)的。OASIS規(guī)范的Web Services Topics 1.3概述了這種組織的主要原則。
這個規(guī)范將主題定義為:
“……主題是一組通知的組織和分類方式。主題機制為訂閱者推斷出感興趣的通知提供了便捷的方式……發(fā)布者可以將通知發(fā)布和一或多個主題關聯(lián)起來。當訂閱者創(chuàng)建訂閱的時候,可以提供一個主題的過濾器表達式,將訂閱和一或多個主題關聯(lián)起來……每個主題都可以有零或多個子主題,子主題本身也可以進一步包含子主題。沒有‘父親’的主題叫根主題。特定的根主題和它所有的后代會形成一個層次結構(稱為主題樹)?!?/p>
下面是手機銷售的一個主題樹例子(圖3)。
圖3:主題樹示例
主題樹的根表示銷售。銷售可以按區(qū)域細分(在我們的例子中有北美、歐洲和亞太地區(qū))。特定區(qū)域的銷售還可以按照手機類型進一步細分,依此類推。
在發(fā)布/訂閱系統(tǒng)中,這樣的結構之所以重要是因為樹反映了數(shù)據(jù)的組織。如果消費者對北美的智能手機銷售感興趣,他可以監(jiān)聽這個特定的主題。如果他對北美所有的銷售都感興趣,那他就可以監(jiān)聽北美的主題,從子主題獲取所有的通知。
當然,這種方法并不能解決所有的問題。比如說,如果消費者想監(jiān)聽所有智能手機銷售的事件,他就需要明確訂閱所有地區(qū)的智能手機銷售事件。這種情況通常是主題樹設計的問題。樹的設計基于信息的組織和典型的使用模式。在某些情況下,會設計多個主題來滿足不同的內部需求(參見Web Services Topics 1.3里的主題命名空間)。發(fā)布/訂閱架構的另一個重要特性就是基于內容的消息過濾:
“在基于內容的系統(tǒng)中,如果消息的屬性或內容與訂閱者定義的約束相匹配,消息就只會傳遞給這個訂閱者。訂閱者負責消息的分類。”
換句話說,訂閱者在這種情況下可以使用正則表達式列表,明確指定他們感興趣的消息內容。
把這種過濾和結構化的主題結構結合起來,可以創(chuàng)建出非常靈活和強大的發(fā)布/訂閱實現(xiàn)。
我們將在本文中展示如何用AWS組件輕松構建這類系統(tǒng)。
建議給大家的架構如下圖所示(圖4)。在這個架構中,發(fā)布/訂閱服務器的實現(xiàn)是一個Tomcat容器里運行的Web應用。我們還充分利用了AWS的彈性負載均衡器(Elastic Load Balancer),它可以根據(jù)當前的負載動態(tài)擴展或縮減發(fā)布/訂閱服務器集群的大小。此外,架構還用關系型數(shù)據(jù)服務(Relational Data Service)存儲當前的配置,以便動態(tài)新增發(fā)布/訂閱實例。為了提高整體性能,我們在內存里保留了當前的拓撲結構,盡量減少數(shù)據(jù)庫訪問的次數(shù)。這樣的話,實際的消息路由會非常迅速。這個解決方案需要一種機制,能在拓撲結構發(fā)生變化的時候去通知所有的服務器(因為任何服務器都能處理負載均衡器)。Amazon SNS能輕而易舉地做到這一點。最后,我們用Amazon SQS將通知分發(fā)給消費者。需要注意的是,一個消費者可以監(jiān)聽多個隊列。
圖4:整體架構建議
這個實現(xiàn)的核心是一個自定義的發(fā)布/訂閱服務器。服務器實現(xiàn)包括三個主要的層——持久化、域和服務。
服務器持久化層采用JPA 2.0實現(xiàn),定義了三個主要的實體——主題、訂閱和語義過濾器。
主題實體(清單1)描述了特定主題要存儲的相關信息,包括主題ID(數(shù)據(jù)庫的內部ID)、主題名稱(標識主題的字符串)、一個布爾變量(定義該主題是否是個根主題)、到父主題和孩子主題的引用(以便對主題層次結構進行遍歷),以及與給定主題關聯(lián)的訂閱列表。
@Entity
@NamedQueries({
??? @NamedQuery(name="Topic.RootTopics",
??????????????????? query="SELECT t FROM Topic t where t.root='true'"),
??? @NamedQuery(name="Topic.AllTopics",
???????????????????? query="SELECT t FROM Topic t")
})
@Table(name = "Topic")
public class Topic {
?@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
?private long id;?? ?// 自動生成的ID
?@Column(name = "name",nullable = false, length = 32)
?private String name;?? ??? ??? ??? ??????// 主題名稱
??
?@Column(name = "root",nullable = false)
?private Boolean root = false;?? ??? ???// 根主題標識?? ?
?@ManyToOne(fetch=FetchType.LAZY)
?@JoinColumn(name="TOPIC_ID")
?private Topic parent;
?@OneToMany(mappedBy="parent",cascade=CascadeType.ALL,orphanRemoval=true)
?private List<Topic> children;
?
?@OneToMany(mappedBy="topic",cascade=CascadeType.ALL,orphanRemoval=true)
?private List<Subscription> subscriptions;
清單1:主題實體
我們定義了兩個命名的查詢,用來訪問主題:RootTopics獲取從根開始的主題結構,AllTopics獲取所有現(xiàn)有的主題。
這個實體提供了一個完整的主題定義,也可以支持多個主題樹(而不是實現(xiàn)示例的一部分)。
訂閱實體(清單2)描述了訂閱相關的信息,包括訂閱ID(數(shù)據(jù)庫的內部ID)、隊列名稱(SQS隊列的ARN,ARN即Amazon Resource Name)、對訂閱關聯(lián)主題的引用,還有一個語義過濾器列表。只有所有的過濾器都接受消息(見下文),通知才會分發(fā)給給定的隊列(客戶端)。如果通知不包含語義過濾器,那來自于關聯(lián)主題的所有消息都會直接傳遞給隊列。
@Entity
@NamedQueries({
??@NamedQuery(name="Subscription.AllSubscriptions",
??????????????????? query="SELECT s FROM Subscription s")
})
@Table(name = "Subscription")
public class Subscription {
????@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
????private long id;?? ?// 自動生成的ID
????@Column(name = "queue",nullable = false, length = 128)
????private String queue;
?
????@ManyToOne(fetch=FetchType.LAZY)
????@JoinColumn(name="TOPIC_ID")
????private Topic topic;
?? ?
????@OneToMany(mappedBy="subscription",
????? ????????????cascade=CascadeType.ALL,orphanRemoval=true)
????private List<SemanticFilter> filters;?
????……………………………………………………………
清單2:訂閱實體
我們還定義了一個命名的查詢,獲得所有存在的訂閱。
最后,語義過濾器實體(清單3)描述了特定語義過濾器的信息,包括語義過濾器ID(數(shù)據(jù)庫的內部ID)、該語義過濾器測試的屬性名稱、使用的正則表達式,以及對語義過濾器關聯(lián)訂閱的引用。
@Entity
@NamedQueries({
??@NamedQuery(name="SemanticFilter.AllSemanticFilters",
??????????????????? query="SELECT sf FROM SemanticFilter sf")
})
@Table(name = "Filter")
public class SemanticFilter {
????@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
????private long id;?? ?// 自動生成的ID
?? ?
????@Column(name = "attribute",nullable = false, length = 32)
????private String attribute;?? ??? ??? ??? ????// 屬性名稱
????@Column(name = "filter",nullable = false, length = 128)
????private String filter;?? ??? ??? ??? ?????// 正則表達式過濾器
????@ManyToOne(fetch=FetchType.LAZY)
????@JoinColumn(name="SUBSCRIPTION_ID")
????private Subscription subscription;
????……………………………………………………………
清單3:語義過濾器實體
我們一樣定義一個命名的查詢,用來獲取所有現(xiàn)有的語義過濾器。
除了實體,持久化層還包含一個持久化管理類,負責:
管理數(shù)據(jù)庫訪問和事務
從數(shù)據(jù)庫讀取、寫入對象
對域對象(見下文)和持久化實體進行相互轉換
發(fā)送拓撲結構變化的通知
域模型對象的主要職責是支持服務操作,包括數(shù)據(jù)的訂閱和發(fā)布,并把通知真正發(fā)布到訂閱的隊列上。在這個簡單的實現(xiàn)里,域模型和持久化模型是合在一起的,但為了闡述得更清楚,我們分開介紹。這兩層的數(shù)據(jù)模型是一樣的,但域對象會多一些明確支持發(fā)布/訂閱實現(xiàn)的方法。
過濾器處理的實現(xiàn)(清單4)利用了Java String里對正則表達式處理的內置支持。
?public boolean accept(String value){
?? ??? if(value == null)
?? ??? ??? ??return false;
?? ??? return value.matches(_pattern);
?}
清單4:過濾器處理方法
發(fā)布實現(xiàn)(清單5)是訂閱類的一個方法。請注意,這個方法對語義過濾器進行了或操作。如果給定的客戶端能有多個訂閱,或者對訂閱實現(xiàn)進行擴展、讓它支持Boolean函數(shù),那就可以突破這個限制了。
public void publish(Map<String, String> attributes, String message){
??
????if((_filters != null) && (_filters.size() > 0)){
????????for(DomainSemanticFilter f : _filters){
????????????String av = attributes.get(f.getField());
????????????if(av == null)
????????????????return;
????????????if(!f.accept(av))
????????????????return;
????????}
????}
????SQSPublisher.getPublisher().sendMessage(_queue, message);
}
清單5:發(fā)布實現(xiàn)
這個實現(xiàn)利用了基于現(xiàn)有AWS Java API的SQSPublisher類(清單6)。
import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
public class SQSPublisher {
?private static SQSPublisher _publisher;
??
?private AmazonSQSClient _sqs;?? ?
?? ?
?private SQSPublisher()throws IOException {
?? ??? ???AWSCredentials credentials = new PropertiesCredentials(
?? ??? ??? ??? ?this.getClass().getClassLoader().
getResourceAsStream("AwsCredentials.properties"));
?? ??? ???_sqs = new AmazonSQSClient(credentials);
}
?public String createQueue(String name){
?? ??? ??CreateQueueRequest request = new CreateQueueRequest(name);
????? ??? return _sqs.createQueue(request).getQueueUrl();
?}
?public void sendMessage(String queueURL, String message){
?? ??? ??SendMessageRequest request = new SendMessageRequest(queueURL,
message);
?????? ?? _sqs.sendMessage(request);
?}
?????public void deleteQueue(String queueURL){
?? ??? ??DeleteQueueRequest request = new DeleteQueueRequest(queueURL);
?????? ?? _sqs.deleteQueue(request);
?}
?public static synchronized SQSPublisher getPublisher(){
?? ?????if(_publisher == null)
?? ??? ??? ?????try {
?? ??? ??? ??? ????????_publisher = new SQSPublisher();
?? ??? ??? ?????}catch (IOException e) {
?? ??? ??? ??? ???????e.printStackTrace();
?? ??? ??? ?????}
?? ??? ??return _publisher;
?}
}
清單6:SQS發(fā)布者
訂閱者可以利用這個類的其他方法創(chuàng)建/銷毀SQS隊列。
除了SQS隊列,我們的實現(xiàn)還利用SNS進行數(shù)據(jù)庫變化的同步。與SNS的交互由SNSPubSub類實現(xiàn)(清單7),這個實現(xiàn)也利用了AWS SNS Java API。
import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sns.model.UnsubscribeRequest;
public class SNSPubSub {
? private static SNSPubSub _topicPublisher;
? private static String _topicARN;
? private static String _endpoint;
?? ?
? private AmazonSNSClient _sns;
? private String _protocol = "http";
? private String _subscriptionARN;
?? ?
? private SNSPubSub()throws IOException {
?? ??? ???AWSCredentials credentials = new PropertiesCredentials(
?? ??? ??? ??? ?this.getClass().getClassLoader().
getResourceAsStream("AwsCredentials.properties"));
?? ??? ???_sns = new AmazonSNSClient(credentials);
? }
? public void publish(String message){
?? ??? ???PublishRequest request = new PublishRequest(_topicARN, message);
?? ??? ???_sns.publish(request);
? }
??
? public void subscribe(){
?? ??? ???SubscribeRequest request = new SubscribeRequest
(_topicARN, _protocol, _endpoint);
?? ??? ???_sns.subscribe(request);
? }
?? ?
? public void confirmSubscription(String token){
?? ??? ???ConfirmSubscriptionRequest request = new
?ConfirmSubscriptionRequest(_topicARN, token);
?? ??? ???ConfirmSubscriptionResult result = _sns
.confirmSubscription(request);
?? ??? ???_subscriptionARN = result.getSubscriptionArn();
? }
?? ?
? public void unSubscribe(){
?? ??? ???if(_subscribed){
?? ??? ??? ??????UnsubscribeRequest request = new UnsubscribeRequest(_subscriptionARN);
?? ??? ??????????_sns
.unsubscribe(request);
?? ??? ???}
? }
?? ?
? public static void configureSNS(String topicARN, String endpoint){
?? ??? ????_topicARN = topicARN;
?? ???????_endpoint = endpoint;
? }
?? ???
? public static synchronized SNSPubSub getSNS(){
?? ??? ????if(_topicPublisher == null){
?? ??? ??? ???????try{
?? ??? ??? ?????????????_topicPublisher = new SNSPubSub();
?? ??? ??? ???????}
?? ??? ??? ???????catch(Exception e){
?? ??? ??? ?????????????e.printStackTrace();
?? ??? ??? ???????}
?? ??? ????}
?? ??? ????return _topicPublisher;
? }
}
清單7:SNS Pub/Sub
使用SNS
使用SNS的時候要謹記:訂閱主題并不意味著你已經(jīng)準備好監(jiān)聽主題。SNS訂閱的過程包含兩個步驟。向SNS發(fā)送訂閱請求時,SNS返回的響應表明確認訂閱的必要性。這正是清單8既有subscribe方法又有confirmSubscription方法的原因。
<xsd:complextype name="NotificationType">
??<xsd:sequence>
????<xsd:element name="Type" type="xsd:string" />
????<xsd:element name="MessageId" type="xsd:string" />
????<xsd:element name="Token" type="xsd:string" minoccurs="0" />
????<xsd:element name="TopicArn" type="xsd:string" />
????<xsd:element name="Message" type="xsd:string" />
????<xsd:element name="SubscribeURL" type="xsd:string" minoccurs="0" />
????<xsd:element name="Timestamp" type="xsd:string" />
????<xsd:element name="SignatureVersion" type="xsd:string" />
????<xsd:element name="Signature" type="xsd:string" />
????<xsd:element name="SigningCertURL" type="xsd:string" />
????<xsd:element name="UnsubscribeURL" type="xsd:string" minoccurs="0" />
??</xsd:sequence>
</xsd:complextype>
上面的Schema描述了兩種消息類型——確認請求和實際的通知。兩種類型通過Type元素進行區(qū)分。如果元素值是“SubscriptionConfirmation”,那它就是訂閱確認的請求,如果是“Notification”,就表明是個真正的通知。
主題類實現(xiàn)了兩個方法(清單8),以便支持發(fā)布。
public void publish(Map<String, String> attributes, String message){
????
????if(_subscriptions == null)
????????return;
????for(DomainSubscription ds : _subscriptions)
????????ds.publish(attributes, message);
}
public void processPublications(List<DomainTopic> tList, StringTokenizer st) throws PublicationException{
????
????tList.add(this);
????if(!st.hasMoreTokens())
????????return;
????String topic = st.nextToken();
????for(DomainTopic dt : _children){
????????if(topic.equalsIgnoreCase(dt.getName())){
????????????dt.processPublications(tList, st);
????????????return;
????????}
????}
????throw new PublicationException("Subtopic " + topic + " is not found in topic " + _name);
}
清單8:主題對發(fā)布的支持
processPublications方法創(chuàng)建了一個主題列表,這些主題與給定的消息相關聯(lián)。這個方法有一個標記過的主題樹字符串,如果標記和主題名稱相對應,就會把當前的主題添加到列表中。主題的publish方法維護一個消息屬性的映射,對主題相關的每個訂閱來說,publish方法還會嘗試著去發(fā)布一條消息。
上面的方法都由Domain管理器類的publish方法調用(清單9)。這個方法首先標記主題字符串,然后用processPublications方法創(chuàng)建一個訂閱者感興趣的主題列表。列表一旦被創(chuàng)建好,就會構建一個消息屬性的映射(我們假設是一個XML消息),并把這個映射發(fā)布給列表里的所有主題。
???public void publish (String topic, String message){
?? ?? StringTokenizer st = new StringTokenizer(topic, ".");
?? ?? List<DomainTopic> topics = new LinkedList<Domaintopic>();
?? ?? DomainTopic root = PersistenceManager.getPersistenceManager().getRoot();
?? ?? try {
?? ??? ??? ???if(!st.hasMoreTokens())
?? ??? ??? ??? ????return;
?? ??? ??? ???String t = st.nextToken();
?? ??? ??? ???if(!t.equalsIgnoreCase(root.getName()))
?? ??? ??? ??? ????throw new PublicationException("Unrecognized subtopic name " + topic);
?? ??? ??? ???root.processPublications(topics, st);
?? ?? }catch (PublicationException e) {
?? ??? ??? ???e.printStackTrace();
?? ??? ??? ???return;
?? ?? }
?? ?? MessageType msg = null;
?? ?? try {
?? ??? ??? ???JAXBElement<MessageType> msgEl = (JAXBElement<MessageType>)
?? ??? ??? ??? ????_unmarshaller.unmarshal(new ByteArrayInputStream(message.getBytes()));
?? ??? ??? ???msg = msgEl.getValue();
?? ?? } catch (JAXBException e) {
?? ??? ??? ???e.printStackTrace();
?? ??? ??? ???return;
?? ?? }
?? ?? Map<String, String> attributes = new HashMap<String, String>();
?? ?? MessageEnvelopeType envelope = msg.getEnvelope();
?? ?? if(envelope != null){
?? ??? ??? ???for(MessageAttributeType attribute : envelope.getAttribute()){
?? ??? ??? ??? ????attributes.put(attribute.getName(), attribute.getValue());
?? ??? ??? ???}
?? ?? }
?? ?? for(DomainTopic t : topics)
?? ??? ??? ???t.publish(attributes, message);
}
清單9:發(fā)布方法實現(xiàn)
我們用一組REST服務對發(fā)布/訂閱功能進行訪問(清單10)。
@Path("/")
public class PubSubServiceImplementation {
? // 功能方法
? @POST
? @Path("publish")
? @Consumes("application/text")
? public void publish (@QueryParam("topic")String topic, String message) throws PublicationException{
?? ??? ???DomainManager.getDomainManager().publish(topic, message);
? }
? @GET
? @Path("publish")
? public void publishGet (@QueryParam("topic")String topic, @QueryParam("message")String message)? throws
publicationException{
?? ??? ???DomainManager.getDomainManager().publish(topic, message);
? }
? @POST
? @Path("synch")
? @Consumes("text/plain")
? public void getSynchNotification (Object message){
?? ??? ???PersistenceManager.setUpdated();
? }
? // 配置方法
? @GET
? @Path("root")
? @Produces("application/json")
? public TopicType getRoot()throws PublicationException {
?? ??? ???return DomainManager.getDomainManager().getRoot();
? }
? @GET
? @Path("filters")
? @Produces("application/json")
? public FiltersType getFilters() throws PublicationException {
?? ??? ???return DomainManager.getDomainManager().getFilters();
? }
? @POST
? @Path("filter")
? @Consumes("application/json")
? public long addFilter(FilterType filter) throws PublicationException {
?? ??? ???return DomainManager.getDomainManager().addFilter(filter);
? }
? @DELETE
? @Path("filter/{id}")
? public void deleteFilter(@PathParam("id")long id) throws PublicationException {
?? ??? ???DomainManager.getDomainManager().removeFilter(id);
? }
? @GET
? @Path("subscriptions")
? @Produces("application/json")
? public SubscriptionsType getSubscriptions() throws PublicationException {
?? ??? ???return DomainManager.getDomainManager().getSubscriptions();
? }
? @POST
? @Path("subscription")
? @Consumes("application/json")
? public long addSubscription(SubscriptionType s) throws PublicationException {
?? ??? ???return DomainManager.getDomainManager().addSubscription(s, null);
? }
? @DELETE
? @Path("subscription/{id}")
? public void deleteSubscription(@PathParam("id")long id) throws PublicationException {
?? ??? ???DomainManager.getDomainManager().removeSubscription(id);
? }
? @POST
? @Path("subscriptionFilters/{sid}")
? @Consumes("application/json")
? public long assignFilersToSubscription(@PathParam("sid")long sid, IDsType ids)throws PublicationException{
?? ??? ???return DomainManager.getDomainManager().assignFilersToSubscription(sid, ids);
? }?? ?
? @POST
? @Path("topic")
? @Consumes("application/json")
? public long addTopic(TopicType t) throws PublicationException {
?? ??? ???return DomainManager.getDomainManager().addTopic(t, null);
? }
? @DELETE
? @Path("topic/{id}")
? public void deleteTopic(@PathParam("id")long id) throws PublicationException {
?? ??? ???DomainManager.getDomainManager().removeTopic(id);
? }
? @POST
? @Path("topicsubscription/{tid}")
? @Consumes("application/json")
? public void assignTopicHierarchy(@PathParam("tid")long tid, IDsType ids) throws PublicationException{
?? ??? ???DomainManager.getDomainManager().assignTopicHierarchy(tid, ids);
? }
? @POST
? @Path("topicsubscription/{tid}")
? @Consumes("application/json")
? public long assignTopicSubscriptions(@PathParam("tid")long tid, IDsType ids)throws PublicationException{
?? ??? ???return DomainManager.getDomainManager().assignTopicSubscriptions(tid, ids);
? }
清單10:發(fā)布/訂閱服務
這些服務的使用者有消息發(fā)布者(publish方法)、服務訂閱者(創(chuàng)建/刪除語義過濾器,訂閱,還有訂閱和主題訂閱相關的過濾器)、內部的發(fā)布/訂閱實現(xiàn)(獲取同步的服務)和管理應用。
這個實現(xiàn)雖然簡單,但創(chuàng)建了一個非常強大、可擴展的發(fā)布/訂閱實現(xiàn),同時利用了很多現(xiàn)有的AWS功能和少量的Java定制代碼。另外它還充分利用了現(xiàn)有AWS部署功能對負載均衡和容錯的支持。
Boris Lublinsky博士是Nokia的主要架構師,參與大數(shù)據(jù)、SOA、BPM和中間件實現(xiàn)的相關工作。Boris去Nokia前是Herzum軟件的主要架構師,負責為客戶設計大型、可伸縮的SOA系統(tǒng);在此之前,他是CNA保險的企業(yè)架構師,參與CNA集成和SOA策略的設計及實現(xiàn),構建應用框架,實現(xiàn)面向服務的架構。Boris在企業(yè)技術架構和軟件工程方面有二十五年多的經(jīng)驗。他是OASIS SOA RM委員會的活躍成員,和他人一起編著了《Applied SOA: Service-Oriented Architecture and Design Strategies》一書,另外他還寫了很多關于架構、編程、大數(shù)據(jù)、SOA和BPM的文章。
查看英文原文:基于AWS技術實現(xiàn)發(fā)布/訂閱服務
更多建議: