基于AWS技術實現(xiàn)發(fā)布/訂閱服務

2018-02-24 16:04 更新

作者 Boris Lublinsky ,譯者 王麗娟

AWS提供兩種服務——Amazon簡單通知服務(Simple Notification Service)和Amazon簡單隊列服務(Simple Queue Service),兩者結合起來可以為完整的發(fā)布/訂閱服務提供支撐。

現(xiàn)有的AWS功能

Amazon簡單通知服務(Amazon SNS)是一個Web服務,能讓應用、最終用戶和設備立即從云端發(fā)送和接收通知。簡化的SNS架構如下圖所示(圖1):

圖1:Amazon SNS的基礎架構

多個發(fā)布應用和多個訂閱應用可以將SNS主題作為中介互相通訊。這樣實現(xiàn)的優(yōu)點是發(fā)布者和訂閱者不需要知道對方,因此,應用可以完全動態(tài)地進行集成。SNS支持用多種傳輸協(xié)議傳遞通知,包括HTTP、HTTPS、Email、SMS和Amazon簡單隊列(Simple Queue)。

Amazon簡單隊列服務(Amazon SQS)提供可靠、可伸縮的托管隊列,用來存儲計算機之間傳輸?shù)南?。使用Amazon SQS,你可以在執(zhí)行不同任務的應用分布式組件之間移動數(shù)據(jù),而不會丟失消息,也不必要求每個組件始終都是可用的。SQS和SNS結合起來會帶來兩個額外的優(yōu)勢——解除時間上的耦合度,根據(jù)消費應用特定的情況提供負載均衡——這是SNS無法單獨提供的。要做到第二個附加優(yōu)勢,需要同一個應用的多個實例從同一個隊列里讀取消息。下圖展示了SNS和SQS結合的總體架構(圖2)。其中的一個訂閱應用顯示為負載均衡的。

圖2:結合SNS和SQS

這個實現(xiàn)的主要缺點是,發(fā)布者和訂閱者需要明確統(tǒng)一SNS主題的名稱。此外,如果一個特定的消費者想從多個主題獲取信息,那他需要把隊列注冊到多個主題上。

期望中的發(fā)布/訂閱實現(xiàn)

這個問題的典型解決方案是采用基于樹的主題組織,大部分發(fā)布/訂閱引擎都是這樣實現(xiàn)的。OASIS規(guī)范的Web Services Topics 1.3概述了這種組織的主要原則。

這個規(guī)范將主題定義為:

“……主題是一組通知的組織和分類方式。主題機制為訂閱者推斷出感興趣的通知提供了便捷的方式……發(fā)布者可以將通知發(fā)布和一或多個主題關聯(lián)起來。當訂閱者創(chuàng)建訂閱的時候,可以提供一個主題的過濾器表達式,將訂閱和一或多個主題關聯(lián)起來……每個主題都可以有零或多個子主題,子主題本身也可以進一步包含子主題。沒有‘父親’的主題叫根主題。特定的根主題和它所有的后代會形成一個層次結構(稱為主題樹)?!?/p>

下面是手機銷售的一個主題樹例子(圖3)。

圖3:主題樹示例

主題樹的根表示銷售。銷售可以按區(qū)域細分(在我們的例子中有北美、歐洲和亞太地區(qū))。特定區(qū)域的銷售還可以按照手機類型進一步細分,依此類推。

在發(fā)布/訂閱系統(tǒng)中,這樣的結構之所以重要是因為樹反映了數(shù)據(jù)的組織。如果消費者對北美的智能手機銷售感興趣,他可以監(jiān)聽這個特定的主題。如果他對北美所有的銷售都感興趣,那他就可以監(jiān)聽北美的主題,從子主題獲取所有的通知。

當然,這種方法并不能解決所有的問題。比如說,如果消費者想監(jiān)聽所有智能手機銷售的事件,他就需要明確訂閱所有地區(qū)的智能手機銷售事件。這種情況通常是主題樹設計的問題。樹的設計基于信息的組織和典型的使用模式。在某些情況下,會設計多個主題來滿足不同的內部需求(參見Web Services Topics 1.3里的主題命名空間)。發(fā)布/訂閱架構的另一個重要特性就是基于內容的消息過濾

“在基于內容的系統(tǒng)中,如果消息的屬性或內容與訂閱者定義的約束相匹配,消息就只會傳遞給這個訂閱者。訂閱者負責消息的分類。”

換句話說,訂閱者在這種情況下可以使用正則表達式列表,明確指定他們感興趣的消息內容。

把這種過濾和結構化的主題結構結合起來,可以創(chuàng)建出非常靈活和強大的發(fā)布/訂閱實現(xiàn)。

我們將在本文中展示如何用AWS組件輕松構建這類系統(tǒng)。

發(fā)布/訂閱架構建議

建議給大家的架構如下圖所示(圖4)。在這個架構中,發(fā)布/訂閱服務器的實現(xiàn)是一個Tomcat容器里運行的Web應用。我們還充分利用了AWS的彈性負載均衡器(Elastic Load Balancer),它可以根據(jù)當前的負載動態(tài)擴展或縮減發(fā)布/訂閱服務器集群的大小。此外,架構還用關系型數(shù)據(jù)服務(Relational Data Service)存儲當前的配置,以便動態(tài)新增發(fā)布/訂閱實例。為了提高整體性能,我們在內存里保留了當前的拓撲結構,盡量減少數(shù)據(jù)庫訪問的次數(shù)。這樣的話,實際的消息路由會非常迅速。這個解決方案需要一種機制,能在拓撲結構發(fā)生變化的時候去通知所有的服務器(因為任何服務器都能處理負載均衡器)。Amazon SNS能輕而易舉地做到這一點。最后,我們用Amazon SQS將通知分發(fā)給消費者。需要注意的是,一個消費者可以監(jiān)聽多個隊列。

圖4:整體架構建議

發(fā)布/訂閱服務器

這個實現(xiàn)的核心是一個自定義的發(fā)布/訂閱服務器。服務器實現(xiàn)包括三個主要的層——持久化、域和服務。

持久化

服務器持久化層采用JPA 2.0實現(xiàn),定義了三個主要的實體——主題、訂閱和語義過濾器。

主題實體(清單1)描述了特定主題要存儲的相關信息,包括主題ID(數(shù)據(jù)庫的內部ID)、主題名稱(標識主題的字符串)、一個布爾變量(定義該主題是否是個根主題)、到父主題和孩子主題的引用(以便對主題層次結構進行遍歷),以及與給定主題關聯(lián)的訂閱列表。

@Entity

@NamedQueries({

??? @NamedQuery(name="Topic.RootTopics",

??????????????????? query="SELECT t FROM Topic t where t.root='true'"),

??? @NamedQuery(name="Topic.AllTopics",

  ???????????????????? query="SELECT t FROM Topic t")

})

@Table(name = "Topic")

public class Topic {

?@Id @GeneratedValue(strategy=GenerationType.IDENTITY)

?private long id;?? ?// 自動生成的ID

?@Column(name = "name",nullable = false, length = 32)

?private String name;?? ??? ??? ??? ??????// 主題名稱

??

?@Column(name = "root",nullable = false)

?private Boolean root = false;?? ??? ???// 根主題標識?? ?

?@ManyToOne(fetch=FetchType.LAZY)

?@JoinColumn(name="TOPIC_ID")

?private Topic parent;

?@OneToMany(mappedBy="parent",cascade=CascadeType.ALL,orphanRemoval=true)

?private List<Topic> children;

?

?@OneToMany(mappedBy="topic",cascade=CascadeType.ALL,orphanRemoval=true)

?private List<Subscription> subscriptions;

清單1:主題實體

我們定義了兩個命名的查詢,用來訪問主題:RootTopics獲取從根開始的主題結構,AllTopics獲取所有現(xiàn)有的主題。

這個實體提供了一個完整的主題定義,也可以支持多個主題樹(而不是實現(xiàn)示例的一部分)。

訂閱實體(清單2)描述了訂閱相關的信息,包括訂閱ID(數(shù)據(jù)庫的內部ID)、隊列名稱(SQS隊列的ARN,ARN即Amazon Resource Name)、對訂閱關聯(lián)主題的引用,還有一個語義過濾器列表。只有所有的過濾器都接受消息(見下文),通知才會分發(fā)給給定的隊列(客戶端)。如果通知不包含語義過濾器,那來自于關聯(lián)主題的所有消息都會直接傳遞給隊列。

@Entity

@NamedQueries({

??@NamedQuery(name="Subscription.AllSubscriptions",

  ??????????????????? query="SELECT s FROM Subscription s")

})

@Table(name = "Subscription")

public class Subscription {

????@Id @GeneratedValue(strategy=GenerationType.IDENTITY)

????private long id;?? ?// 自動生成的ID

????@Column(name = "queue",nullable = false, length = 128)

????private String queue;

?

????@ManyToOne(fetch=FetchType.LAZY)

????@JoinColumn(name="TOPIC_ID")

????private Topic topic;

?? ?

????@OneToMany(mappedBy="subscription",

?????  ????????????cascade=CascadeType.ALL,orphanRemoval=true)

????private List<SemanticFilter> filters;?

????……………………………………………………………

清單2:訂閱實體

我們還定義了一個命名的查詢,獲得所有存在的訂閱。

最后,語義過濾器實體(清單3)描述了特定語義過濾器的信息,包括語義過濾器ID(數(shù)據(jù)庫的內部ID)、該語義過濾器測試的屬性名稱、使用的正則表達式,以及對語義過濾器關聯(lián)訂閱的引用。

@Entity

@NamedQueries({

??@NamedQuery(name="SemanticFilter.AllSemanticFilters",

  ??????????????????? query="SELECT sf FROM SemanticFilter sf")

})

@Table(name = "Filter")

public class SemanticFilter {

????@Id @GeneratedValue(strategy=GenerationType.IDENTITY)

????private long id;?? ?// 自動生成的ID

?? ?

????@Column(name = "attribute",nullable = false, length = 32)

????private String attribute;?? ??? ??? ??? ????// 屬性名稱

????@Column(name = "filter",nullable = false, length = 128)

????private String filter;?? ??? ??? ??? ?????// 正則表達式過濾器

????@ManyToOne(fetch=FetchType.LAZY)

????@JoinColumn(name="SUBSCRIPTION_ID")

????private Subscription subscription;

????……………………………………………………………

清單3:語義過濾器實體

我們一樣定義一個命名的查詢,用來獲取所有現(xiàn)有的語義過濾器。

除了實體,持久化層還包含一個持久化管理類,負責:

管理數(shù)據(jù)庫訪問和事務

從數(shù)據(jù)庫讀取、寫入對象

對域對象(見下文)和持久化實體進行相互轉換

發(fā)送拓撲結構變化的通知

域模型

域模型對象的主要職責是支持服務操作,包括數(shù)據(jù)的訂閱和發(fā)布,并把通知真正發(fā)布到訂閱的隊列上。在這個簡單的實現(xiàn)里,域模型和持久化模型是合在一起的,但為了闡述得更清楚,我們分開介紹。這兩層的數(shù)據(jù)模型是一樣的,但域對象會多一些明確支持發(fā)布/訂閱實現(xiàn)的方法。

過濾器處理的實現(xiàn)(清單4)利用了Java String里對正則表達式處理的內置支持。

?public boolean accept(String value){

  ?? ??? if(value == null)

  ?? ??? ??? ??return false;

  ?? ??? return value.matches(_pattern);

?}

清單4:過濾器處理方法

發(fā)布實現(xiàn)(清單5)是訂閱類的一個方法。請注意,這個方法對語義過濾器進行了或操作。如果給定的客戶端能有多個訂閱,或者對訂閱實現(xiàn)進行擴展、讓它支持Boolean函數(shù),那就可以突破這個限制了。

public void publish(Map<String, String> attributes, String message){

??

????if((_filters != null) && (_filters.size() > 0)){

????????for(DomainSemanticFilter f : _filters){

????????????String av = attributes.get(f.getField());

????????????if(av == null)

????????????????return;

????????????if(!f.accept(av))

????????????????return;

????????}

????}

????SQSPublisher.getPublisher().sendMessage(_queue, message);

}

清單5:發(fā)布實現(xiàn)

這個實現(xiàn)利用了基于現(xiàn)有AWS Java API的SQSPublisher類(清單6)。

import java.io.IOException;

import com.amazonaws.auth.AWSCredentials;

import com.amazonaws.auth.PropertiesCredentials;

import com.amazonaws.services.sqs.AmazonSQSClient;

import com.amazonaws.services.sqs.model.CreateQueueRequest;

import com.amazonaws.services.sqs.model.DeleteQueueRequest;

import com.amazonaws.services.sqs.model.SendMessageRequest;

public class SQSPublisher {

  ?private static SQSPublisher _publisher;

  ??

  ?private AmazonSQSClient _sqs;?? ?

  ?? ?

  ?private SQSPublisher()throws IOException {

  ?? ??? ???AWSCredentials credentials = new PropertiesCredentials(

  ?? ??? ??? ??? ?this.getClass().getClassLoader().

getResourceAsStream("AwsCredentials.properties"));

  ?? ??? ???_sqs = new AmazonSQSClient(credentials);

   }

  ?public String createQueue(String name){

  ?? ??? ??CreateQueueRequest request = new CreateQueueRequest(name);

  ????? ??? return _sqs.createQueue(request).getQueueUrl();

  ?}

  ?public void sendMessage(String queueURL, String message){

  ?? ??? ??SendMessageRequest request = new SendMessageRequest(queueURL,

message);

  ?????? ?? _sqs.sendMessage(request);

  ?}

?????public void deleteQueue(String queueURL){

  ?? ??? ??DeleteQueueRequest request = new DeleteQueueRequest(queueURL);

  ?????? ?? _sqs.deleteQueue(request);

  ?}

  ?public static synchronized SQSPublisher getPublisher(){

  ?? ?????if(_publisher == null)

  ?? ??? ??? ?????try {

  ?? ??? ??? ??? ????????_publisher = new SQSPublisher();

  ?? ??? ??? ?????}catch (IOException e) {

  ?? ??? ??? ??? ???????e.printStackTrace();

  ?? ??? ??? ?????}

  ?? ??? ??return _publisher;

  ?}

}

清單6:SQS發(fā)布者

訂閱者可以利用這個類的其他方法創(chuàng)建/銷毀SQS隊列。

除了SQS隊列,我們的實現(xiàn)還利用SNS進行數(shù)據(jù)庫變化的同步。與SNS的交互由SNSPubSub類實現(xiàn)(清單7),這個實現(xiàn)也利用了AWS SNS Java API。

import java.io.IOException;

import com.amazonaws.auth.AWSCredentials;

import com.amazonaws.auth.PropertiesCredentials;

import com.amazonaws.services.sns.AmazonSNSClient;

import com.amazonaws.services.sns.model.PublishRequest;

import com.amazonaws.services.sns.model.SubscribeRequest;

import com.amazonaws.services.sns.model.SubscribeResult;

import com.amazonaws.services.sns.model.UnsubscribeRequest;

public class SNSPubSub {

  ? private static SNSPubSub _topicPublisher;

  ? private static String _topicARN;

  ? private static String _endpoint;

  ?? ?

  ? private AmazonSNSClient _sns;

  ? private String _protocol = "http";

  ? private String _subscriptionARN;

  ?? ?

  ? private SNSPubSub()throws IOException {

  ?? ??? ???AWSCredentials credentials = new PropertiesCredentials(

  ?? ??? ??? ??? ?this.getClass().getClassLoader().

getResourceAsStream("AwsCredentials.properties"));

  ?? ??? ???_sns = new AmazonSNSClient(credentials);

  ? }

  ? public void publish(String message){

  ?? ??? ???PublishRequest request = new PublishRequest(_topicARN, message);

  ?? ??? ???_sns.publish(request);

  ? }

  ??

  ? public void subscribe(){

  ?? ??? ???SubscribeRequest request = new SubscribeRequest

(_topicARN, _protocol, _endpoint);

  ?? ??? ???_sns.subscribe(request);

  ? }

  ?? ?

  ? public void confirmSubscription(String token){

  ?? ??? ???ConfirmSubscriptionRequest request = new

?ConfirmSubscriptionRequest(_topicARN, token);

  ?? ??? ???ConfirmSubscriptionResult result = _sns

.confirmSubscription(request);

  ?? ??? ???_subscriptionARN = result.getSubscriptionArn();

  ? }

?? ?

  ? public void unSubscribe(){

  ?? ??? ???if(_subscribed){

  ?? ??? ??? ??????UnsubscribeRequest request = new UnsubscribeRequest(_subscriptionARN);

  ?? ??? ??????????_sns

.unsubscribe(request);

  ?? ??? ???}

  ? }

  ?? ?

  ? public static void configureSNS(String topicARN, String endpoint){

  ?? ??? ????_topicARN = topicARN;

  ?? ???????_endpoint = endpoint;

  ? }

  ?? ???

  ? public static synchronized SNSPubSub getSNS(){

  ?? ??? ????if(_topicPublisher == null){

  ?? ??? ??? ???????try{

  ?? ??? ??? ?????????????_topicPublisher = new SNSPubSub();

  ?? ??? ??? ???????}

  ?? ??? ??? ???????catch(Exception e){

  ?? ??? ??? ?????????????e.printStackTrace();

  ?? ??? ??? ???????}

  ?? ??? ????}

  ?? ??? ????return _topicPublisher;

  ? }

}

清單7:SNS Pub/Sub

使用SNS

使用SNS的時候要謹記:訂閱主題并不意味著你已經(jīng)準備好監(jiān)聽主題。SNS訂閱的過程包含兩個步驟。向SNS發(fā)送訂閱請求時,SNS返回的響應表明確認訂閱的必要性。這正是清單8既有subscribe方法又有confirmSubscription方法的原因。

<xsd:complextype name="NotificationType">

??<xsd:sequence>

????<xsd:element name="Type" type="xsd:string" />

????<xsd:element name="MessageId" type="xsd:string" />

????<xsd:element name="Token" type="xsd:string" minoccurs="0" />

????<xsd:element name="TopicArn" type="xsd:string" />

????<xsd:element name="Message" type="xsd:string" />

????<xsd:element name="SubscribeURL" type="xsd:string" minoccurs="0" />

????<xsd:element name="Timestamp" type="xsd:string" />

????<xsd:element name="SignatureVersion" type="xsd:string" />

????<xsd:element name="Signature" type="xsd:string" />

????<xsd:element name="SigningCertURL" type="xsd:string" />

????<xsd:element name="UnsubscribeURL" type="xsd:string" minoccurs="0" />

??</xsd:sequence>

</xsd:complextype>

上面的Schema描述了兩種消息類型——確認請求和實際的通知。兩種類型通過Type元素進行區(qū)分。如果元素值是“SubscriptionConfirmation”,那它就是訂閱確認的請求,如果是“Notification”,就表明是個真正的通知。

主題類實現(xiàn)了兩個方法(清單8),以便支持發(fā)布。

public void publish(Map<String, String> attributes, String message){

????

????if(_subscriptions == null)

????????return;

????for(DomainSubscription ds : _subscriptions)

????????ds.publish(attributes, message);

}

public void processPublications(List<DomainTopic> tList, StringTokenizer st) throws PublicationException{

????

????tList.add(this);

????if(!st.hasMoreTokens())

????????return;

????String topic = st.nextToken();

????for(DomainTopic dt : _children){

????????if(topic.equalsIgnoreCase(dt.getName())){

????????????dt.processPublications(tList, st);

????????????return;

????????}

????}

????throw new PublicationException("Subtopic " + topic + " is not found in topic " + _name);

}

清單8:主題對發(fā)布的支持

processPublications方法創(chuàng)建了一個主題列表,這些主題與給定的消息相關聯(lián)。這個方法有一個標記過的主題樹字符串,如果標記和主題名稱相對應,就會把當前的主題添加到列表中。主題的publish方法維護一個消息屬性的映射,對主題相關的每個訂閱來說,publish方法還會嘗試著去發(fā)布一條消息。

上面的方法都由Domain管理器類的publish方法調用(清單9)。這個方法首先標記主題字符串,然后用processPublications方法創(chuàng)建一個訂閱者感興趣的主題列表。列表一旦被創(chuàng)建好,就會構建一個消息屬性的映射(我們假設是一個XML消息),并把這個映射發(fā)布給列表里的所有主題。

???public void publish (String topic, String message){

  ?? ?? StringTokenizer st = new StringTokenizer(topic, ".");

  ?? ?? List<DomainTopic> topics = new LinkedList<Domaintopic>(); 

  ?? ?? DomainTopic root = PersistenceManager.getPersistenceManager().getRoot();

  ?? ?? try {   

  ?? ??? ??? ???if(!st.hasMoreTokens())

  ?? ??? ??? ??? ????return;

  ?? ??? ??? ???String t = st.nextToken();   

  ?? ??? ??? ???if(!t.equalsIgnoreCase(root.getName()))

  ?? ??? ??? ??? ????throw new PublicationException("Unrecognized subtopic name " + topic);   

  ?? ??? ??? ???root.processPublications(topics, st);   

  ?? ?? }catch (PublicationException e) {   

  ?? ??? ??? ???e.printStackTrace();

  ?? ??? ??? ???return;

  ?? ?? }   

  ?? ?? MessageType msg = null;   

  ?? ?? try {   

  ?? ??? ??? ???JAXBElement<MessageType> msgEl = (JAXBElement<MessageType>)

  ?? ??? ??? ??? ????_unmarshaller.unmarshal(new ByteArrayInputStream(message.getBytes()));  

  ?? ??? ??? ???msg = msgEl.getValue();   

  ?? ?? } catch (JAXBException e) {   

  ?? ??? ??? ???e.printStackTrace();

  ?? ??? ??? ???return;

  ?? ?? }   

  ?? ?? Map<String, String> attributes = new HashMap<String, String>();

  ?? ?? MessageEnvelopeType envelope = msg.getEnvelope();

  ?? ?? if(envelope != null){

  ?? ??? ??? ???for(MessageAttributeType attribute : envelope.getAttribute()){

  ?? ??? ??? ??? ????attributes.put(attribute.getName(), attribute.getValue());

  ?? ??? ??? ???}

  ?? ?? }

  ?? ?? for(DomainTopic t : topics)

  ?? ??? ??? ???t.publish(attributes, message);

}

清單9:發(fā)布方法實現(xiàn)

服務模型

我們用一組REST服務對發(fā)布/訂閱功能進行訪問(清單10)。

@Path("/")

public class PubSubServiceImplementation {

  ? // 功能方法

  ? @POST

  ? @Path("publish")

  ? @Consumes("application/text")

  ? public void publish (@QueryParam("topic")String topic, String message) throws PublicationException{

  ?? ??? ???DomainManager.getDomainManager().publish(topic, message);

  ? }

  ? @GET

  ? @Path("publish")

  ? public void publishGet (@QueryParam("topic")String topic, @QueryParam("message")String message)? throws

publicationException{

  ?? ??? ???DomainManager.getDomainManager().publish(topic, message);

  ? }

  ? @POST

  ? @Path("synch")

  ? @Consumes("text/plain")

  ? public void getSynchNotification (Object message){

  ?? ??? ???PersistenceManager.setUpdated();

  ? }

  ? // 配置方法

  ? @GET

  ? @Path("root")

  ? @Produces("application/json")

  ? public TopicType getRoot()throws PublicationException {

  ?? ??? ???return DomainManager.getDomainManager().getRoot();

  ? }

  ? @GET

  ? @Path("filters")

  ? @Produces("application/json")

  ? public FiltersType getFilters() throws PublicationException {

  ?? ??? ???return DomainManager.getDomainManager().getFilters();

  ? }

  ? @POST

  ? @Path("filter")

  ? @Consumes("application/json")

  ? public long addFilter(FilterType filter) throws PublicationException {

  ?? ??? ???return DomainManager.getDomainManager().addFilter(filter);

  ? }

  ? @DELETE

  ? @Path("filter/{id}")

  ? public void deleteFilter(@PathParam("id")long id) throws PublicationException {

  ?? ??? ???DomainManager.getDomainManager().removeFilter(id);

  ? }

  ? @GET

  ? @Path("subscriptions")

  ? @Produces("application/json")

  ? public SubscriptionsType getSubscriptions() throws PublicationException {

  ?? ??? ???return DomainManager.getDomainManager().getSubscriptions();

  ? }

  ? @POST

  ? @Path("subscription")

  ? @Consumes("application/json")

  ? public long addSubscription(SubscriptionType s) throws PublicationException {

  ?? ??? ???return DomainManager.getDomainManager().addSubscription(s, null);

  ? }

  ? @DELETE

  ? @Path("subscription/{id}")

  ? public void deleteSubscription(@PathParam("id")long id) throws PublicationException {

  ?? ??? ???DomainManager.getDomainManager().removeSubscription(id);

  ? }

  ? @POST

  ? @Path("subscriptionFilters/{sid}")

  ? @Consumes("application/json")

  ? public long assignFilersToSubscription(@PathParam("sid")long sid, IDsType ids)throws PublicationException{

  ?? ??? ???return DomainManager.getDomainManager().assignFilersToSubscription(sid, ids);

  ? }?? ?

  ? @POST

  ? @Path("topic")

  ? @Consumes("application/json")

  ? public long addTopic(TopicType t) throws PublicationException {

  ?? ??? ???return DomainManager.getDomainManager().addTopic(t, null);

  ? }

  ? @DELETE

  ? @Path("topic/{id}")

  ? public void deleteTopic(@PathParam("id")long id) throws PublicationException {

  ?? ??? ???DomainManager.getDomainManager().removeTopic(id);

  ? }

  ? @POST

  ? @Path("topicsubscription/{tid}")

  ? @Consumes("application/json")

  ? public void assignTopicHierarchy(@PathParam("tid")long tid, IDsType ids) throws PublicationException{

  ?? ??? ???DomainManager.getDomainManager().assignTopicHierarchy(tid, ids);

  ? }

  ? @POST

  ? @Path("topicsubscription/{tid}")

  ? @Consumes("application/json")

  ? public long assignTopicSubscriptions(@PathParam("tid")long tid, IDsType ids)throws PublicationException{

  ?? ??? ???return DomainManager.getDomainManager().assignTopicSubscriptions(tid, ids);

  ? }

清單10:發(fā)布/訂閱服務

這些服務的使用者有消息發(fā)布者(publish方法)、服務訂閱者(創(chuàng)建/刪除語義過濾器,訂閱,還有訂閱和主題訂閱相關的過濾器)、內部的發(fā)布/訂閱實現(xiàn)(獲取同步的服務)和管理應用。

結論

這個實現(xiàn)雖然簡單,但創(chuàng)建了一個非常強大、可擴展的發(fā)布/訂閱實現(xiàn),同時利用了很多現(xiàn)有的AWS功能和少量的Java定制代碼。另外它還充分利用了現(xiàn)有AWS部署功能對負載均衡和容錯的支持。

作者簡介

Boris Lublinsky博士是Nokia的主要架構師,參與大數(shù)據(jù)、SOA、BPM和中間件實現(xiàn)的相關工作。Boris去Nokia前是Herzum軟件的主要架構師,負責為客戶設計大型、可伸縮的SOA系統(tǒng);在此之前,他是CNA保險的企業(yè)架構師,參與CNA集成和SOA策略的設計及實現(xiàn),構建應用框架,實現(xiàn)面向服務的架構。Boris在企業(yè)技術架構和軟件工程方面有二十五年多的經(jīng)驗。他是OASIS SOA RM委員會的活躍成員,和他人一起編著了《Applied SOA: Service-Oriented Architecture and Design Strategies》一書,另外他還寫了很多關于架構、編程、大數(shù)據(jù)、SOA和BPM的文章。

查看英文原文:基于AWS技術實現(xiàn)發(fā)布/訂閱服務

查看原文:基于AWS技術實現(xiàn)發(fā)布/訂閱服務

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號