App下載

關(guān)于Sentinel實(shí)現(xiàn)動(dòng)態(tài)配置的集群流控的詳細(xì)內(nèi)容

猿友 2021-07-21 10:36:12 瀏覽數(shù) (3492)
反饋

本篇文章將為您介紹了一下什么是集群流控,如何使用阿里的開源項(xiàng)目 Sentine l實(shí)現(xiàn)動(dòng)態(tài)配置的集群流控的詳細(xì)內(nèi)容,以下是詳情內(nèi)容,希望能對(duì)大家的學(xué)習(xí)有所幫助。

介紹

為什么要使用集群流控呢?

相對(duì)于單機(jī)流控而言,我們給每臺(tái)機(jī)器設(shè)置單機(jī)限流閾值,在理想情況下整個(gè)集群的限流閾值為機(jī)器數(shù)量??單機(jī)閾值。不過實(shí)際情況下流量到每臺(tái)機(jī)器可能會(huì)不均勻,會(huì)導(dǎo)致總量沒有到的情況下某些機(jī)器就開始限流。因此僅靠單機(jī)維度去限制的話會(huì)無(wú)法精確地限制總體流量。而集群流控可以精確地控制整個(gè)集群的調(diào)用總量,結(jié)合單機(jī)限流兜底,可以更好地發(fā)揮流量控制的效果。

基于單機(jī)流量不均的問題以及如何設(shè)置集群整體的QPS的問題,我們需要?jiǎng)?chuàng)建一種集群限流的模式,這時(shí)候我們很自然地就想到,可以找一個(gè) server 來專門統(tǒng)計(jì)總的調(diào)用量,其它的實(shí)例都與這臺(tái) server 通信來判斷是否可以調(diào)用。這就是最基礎(chǔ)的集群流控的方式。

原理

集群限流的原理很簡(jiǎn)單,和單機(jī)限流一樣,都需要對(duì) qps 等數(shù)據(jù)進(jìn)行統(tǒng)計(jì),區(qū)別就在于單機(jī)版是在每個(gè)實(shí)例中進(jìn)行統(tǒng)計(jì),而集群版是有一個(gè)專門的實(shí)例進(jìn)行統(tǒng)計(jì)。

這個(gè)專門的用來統(tǒng)計(jì)數(shù)據(jù)的稱為 Sentinel 的 token server,其他的實(shí)例作為 Sentinel 的 token client 會(huì)向 token server 去請(qǐng)求 token,如果能獲取到 token,則說明當(dāng)前的 qps 還未達(dá)到總的閾值,否則就說明已經(jīng)達(dá)到集群的總閾值,當(dāng)前實(shí)例需要被 block,如下圖所示:

20214984821292

和單機(jī)流控相比,集群流控中共有兩種身份:

  • Token Client:集群流控客戶端,用于向所屬 Token Server 通信請(qǐng)求 token。集群限流服務(wù)端會(huì)返回給客戶端結(jié)果,決定是否限流。
  • Token Server:即集群流控服務(wù)端,處理來自 Token Client 的請(qǐng)求,根據(jù)配置的集群規(guī)則判斷是否應(yīng)該發(fā)放 token(是否允許通過)。

而單機(jī)流控中只有一種身份,每個(gè) sentinel 都是一個(gè) token server。

注意,集群限流中的 token server 是單點(diǎn)的,一旦 token server 掛掉,那么集群限流就會(huì)退化成單機(jī)限流的模式。

Sentinel 集群流控支持限流規(guī)則和熱點(diǎn)規(guī)則兩種規(guī)則,并支持兩種形式的閾值計(jì)算方式:

  • 集群總體模式:即限制整個(gè)集群內(nèi)的某個(gè)資源的總體 qps 不超過此閾值。
  • 單機(jī)均攤模式:?jiǎn)螜C(jī)均攤模式下配置的閾值等同于單機(jī)能夠承受的限額,token server 會(huì)根據(jù)連接數(shù)來計(jì)算總的閾值(比如獨(dú)立模式下有 3 個(gè) client 連接到了 token server,然后配的單機(jī)均攤閾值為 10,則計(jì)算出的集群總量就為 30),按照計(jì)算出的總的閾值來進(jìn)行限制。這種方式根據(jù)當(dāng)前的連接數(shù)實(shí)時(shí)計(jì)算總的閾值,對(duì)于機(jī)器經(jīng)常進(jìn)行變更的環(huán)境非常適合。

部署方式

token server 有兩種部署方式:

一種是獨(dú)立部署,就是單獨(dú)啟動(dòng)一個(gè) token server 服務(wù)來處理 token client 的請(qǐng)求,如下圖所示:

20214984821293

如果獨(dú)立部署的 token server 服務(wù)掛掉的話,那其他的 token client 就會(huì)退化成本地流控的模式,也就是單機(jī)版的流控,所以這種方式的集群限流需要保證 token server 的高可用性。

一種是嵌入部署,即作為內(nèi)置的 token server 與服務(wù)在同一進(jìn)程中啟動(dòng)。在此模式下,集群中各個(gè)實(shí)例都是對(duì)等的,token server 和 client 可以隨時(shí)進(jìn)行轉(zhuǎn)變,如下圖所示:

20214984821294

嵌入式部署的模式中,如果 token server 服務(wù)掛掉的話,我們可以將另外一個(gè) token client 升級(jí)為token server來,當(dāng)然啦如果我們不想使用當(dāng)前的 token server 的話,也可以選擇另外一個(gè) token client 來承擔(dān)這個(gè)責(zé)任,并且將當(dāng)前 token server 切換為 token client。Sentinel 為我們提供了一個(gè) api 來進(jìn)行 token server 與 token client 的切換:

http://<ip>:<port>/setClusterMode?mode=<xxx>

其中 mode 為 0 代表 client,1 代表 server,-1 代表關(guān)閉。

PS:注意應(yīng)用端需要引入集群限流客戶端或服務(wù)端的相應(yīng)依賴。

集群限流控制臺(tái)

sentinel為用戶提供集群限流控制臺(tái)功能,能夠通過控制臺(tái)配置集群的限流規(guī)則以及配置集群的Server與Client。

集群限流客戶端

要想使用集群限流功能,必須引入集群限流 client 相關(guān)依賴:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-client-default</artifactId>
    <version>1.8.0</version>
</dependency>

集群限流服務(wù)端

要想使用集群限流服務(wù)端,必須引入集群限流 server 相關(guān)依賴:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-server-default</artifactId>
    <version>1.8.0</version>
</dependency>

我們結(jié)合server和client實(shí)現(xiàn)一個(gè)嵌入式模式。在pom中同時(shí)引入上面的兩個(gè)依賴,并配置sentinel控制臺(tái)地址,實(shí)現(xiàn)一個(gè)查詢訂單的接口。

pom

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-server-default</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-client-default</artifactId>
</dependency>

application.yml

server:
  port: 9091

spring:
  application:
    name: cloudalibaba-sentinel-clusterServer
  cloud:
    sentinel:
      transport:
        #配置sentinel dashboard地址
        dashboard: localhost:8080
        port: 8719 #默認(rèn)8719端口

OrderController

@RestController
public class OrderController {
    /**
     * 查詢訂單
     * @return
     */
    @GetMapping("/order/{id}")
    public CommonResult<Order> getOrder(@PathVariable("id") Long id){

        Order order = new Order(id, "212121");
        return CommonResult.success(order.toString());
    }
}

代碼示例如cloudalibaba-sentinel-cluster-embedded9091

修改VM options配置,啟動(dòng)三個(gè)不同端口的實(shí)例,即可。

-Dserver.port=9091 -Dproject.name=cloudalibaba-sentinel-clusterServer -Dcsp.sentinel.log.use.pid=true 
-Dserver.port=9092 -Dproject.name=cloudalibaba-sentinel-clusterServer -Dcsp.sentinel.log.use.pid=true 
-Dserver.port=9093 -Dproject.name=cloudalibaba-sentinel-clusterServer -Dcsp.sentinel.log.use.pid=true 

控制臺(tái)配置

登錄sentinel的控制臺(tái),并有訪問量后,我們就可以在 Sentinel上面看到集群流控,如下圖所示:

點(diǎn)擊添加Token Server。

20214984821295

從實(shí)例列表中選擇一個(gè)作為Server端,其他作為Client端,并選中到右側(cè)Client列表,配置token sever端的最大允許的QPS,用于對(duì) Token Server 的資源使用進(jìn)行限制,防止在嵌入模式下影響應(yīng)用本身。

20214984821296

配置完成之后的Token Server列表,如下圖所示

20214984821297

使用控制臺(tái)配置token Server、token Client以及限流規(guī)則,有很多的缺點(diǎn):

1、限流規(guī)則,不能持久化,應(yīng)用重啟之后,規(guī)則丟失。

2、token Server 、token Client配置也會(huì)丟失。

官方推薦給集群限流服務(wù)端注冊(cè)動(dòng)態(tài)配置源來動(dòng)態(tài)地進(jìn)行配置。我們使用nacos作為配置中心,動(dòng)態(tài)配置客戶端與服務(wù)端屬性以及限流規(guī)則,實(shí)現(xiàn)動(dòng)態(tài)集群限流。

sentinel結(jié)合nacos實(shí)現(xiàn)集群限流

我們使用Nacos對(duì)cloudalibaba-sentinel-cluster-embedded9091進(jìn)行改造,實(shí)現(xiàn)動(dòng)態(tài)配置源來動(dòng)態(tài)進(jìn)行配置。

配置源注冊(cè)的相關(guān)邏輯可以置于 InitFunc 實(shí)現(xiàn)類中,并通過 SPI 注冊(cè),在 Sentinel 初始化時(shí)即可自動(dòng)進(jìn)行配置源加載監(jiān)聽。

嵌入模式部署

添加ClusterInitFunc類

public class ClusterInitFunc implements InitFunc {

    //應(yīng)用名稱
    private static final String APP_NAME = AppNameUtil.getAppName();

    //nacos集群地址
    private final String remoteAddress = "localhost:8848";

    //nacos配置的分組名稱
    private final String groupId = "SENTINEL_GROUP";

    //配置的dataId
    private final String flowDataId = APP_NAME + Constants.FLOW_POSTFIX;
    private final String paramDataId = APP_NAME + Constants.PARAM_FLOW_POSTFIX;
    private final String configDataId = APP_NAME + Constants.CLIENT_CONFIG_POSTFIX;
    private final String clusterMapDataId = APP_NAME + Constants.CLUSTER_MAP_POSTFIX;

    private static final String SEPARATOR = "@";


    @Override
    public void init() {
        // Register client dynamic rule data source.
        //動(dòng)態(tài)數(shù)據(jù)源的方式配置sentinel的流量控制和熱點(diǎn)參數(shù)限流的規(guī)則。
        initDynamicRuleProperty();

        // Register token client related data source.
        // Token client common config
        // 集群限流客戶端的配置屬性
        initClientConfigProperty();
        // Token client assign config (e.g. target token server) retrieved from assign map:
        //初始化Token客戶端
        initClientServerAssignProperty();

        // Register token server related data source.
        // Register dynamic rule data source supplier for token server:
        //集群的流控規(guī)則,比如限制整個(gè)集群的流控閥值,啟動(dòng)的時(shí)候需要添加-Dproject.name=項(xiàng)目名
        registerClusterRuleSupplier();
        // Token server transport config extracted from assign map:
        //初始化server的端口配置
        initServerTransportConfigProperty();

        // Init cluster state property for extracting mode from cluster map data source.
        //初始化集群中服務(wù)是客戶端還是服務(wù)端
        initStateProperty();
    }

    private void initDynamicRuleProperty() {

        //流量控制的DataId分別是APP_NAME + Constants.FLOW_POSTFIX;熱點(diǎn)參數(shù)限流規(guī)則的DataId是APP_NAME + Constants.PARAM_FLOW_POSTFIX;

        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
            flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        FlowRuleManager.register2Property(ruleSource.getProperty());

        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
            paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

    private void initClientConfigProperty() {
        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
            configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
    }

    private void initServerTransportConfigProperty() {
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = new Gson().fromJson(source, new TypeToken<List<ClusterGroupEntity>>(){}.getType());
            return Optional.ofNullable(groupList)
                .flatMap(this::extractServerTransportConfig)
                .orElse(null);
        });
        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }

    private void registerClusterRuleSupplier() {
        // Register cluster flow rule property supplier which creates data source by namespace.
        // Flow rule dataId format: ${namespace}-flow-rules
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                namespace + Constants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
            return ds.getProperty();
        });
        // Register cluster parameter flow rule property supplier which creates data source by namespace.
        ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                namespace + Constants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
            return ds.getProperty();
        });
    }

    private void initClientServerAssignProperty() {
        // Cluster map format:
        // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","serverId":"112.12.88.68@8728","port":11111}]
        // serverId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = new Gson().fromJson(source, new TypeToken<List<ClusterGroupEntity>>(){}.getType());
            return Optional.ofNullable(groupList)
                .flatMap(this::extractClientAssignment)
                .orElse(null);
        });
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }

    private void initStateProperty() {
        // Cluster map format:
        // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","serverId":"112.12.88.68@8728","port":11111}]
        // serverId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
        ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = new Gson().fromJson(source, new TypeToken<List<ClusterGroupEntity>>(){}.getType());
            return Optional.ofNullable(groupList)
                .map(this::extractMode)
                .orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
        });
        ClusterStateManager.registerProperty(clusterModeDs.getProperty());
    }

    private int extractMode(List<ClusterGroupEntity> groupList) {
        // If any server group serverId matches current, then it's token server.
        if (groupList.stream().anyMatch(this::machineEqual)) {
            return ClusterStateManager.CLUSTER_SERVER;
        }
        // If current machine belongs to any of the token server group, then it's token client.
        // Otherwise it's unassigned, should be set to NOT_STARTED.
        boolean canBeClient = groupList.stream()
            .flatMap(e -> e.getClientSet().stream())
            .filter(Objects::nonNull)
            .anyMatch(e -> e.equals(getCurrentMachineId()));
        return canBeClient ? ClusterStateManager.CLUSTER_CLIENT : ClusterStateManager.CLUSTER_NOT_STARTED;
    }

    private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
        return groupList.stream()
            .filter(this::machineEqual)
            .findAny()
            .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
    }

    private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) {
        if (groupList.stream().anyMatch(this::machineEqual)) {
            return Optional.empty();
        }
        // Build client assign config from the client set of target server group.
        for (ClusterGroupEntity group : groupList) {
            if (group.getClientSet().contains(getCurrentMachineId())) {
                String ip = group.getIp();
                Integer port = group.getPort();
                return Optional.of(new ClusterClientAssignConfig(ip, port));
            }
        }
        return Optional.empty();
    }

    private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
        return getCurrentMachineId().equals(group.getServerId());
    }

    private String getCurrentMachineId() {
        // Note: this may not work well for container-based env.
        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort();
    }
}

在resources文件夾下創(chuàng)建META-INF/service,,然后創(chuàng)建一個(gè)叫做com.alibaba.csp.sentinel.init.InitFunc的文件,在文件中指名實(shí)現(xiàn)InitFunc接口的類全路徑,內(nèi)容如下:

com.liang.springcloud.alibaba.init.ClusterInitFunc

添加配置的解析類:

public class ClusterGroupEntity implements Serializable {

    private String serverId;
    private String ip;
    private Integer port;
    private Set<String> clientSet;

    public String getServerId() {
        return serverId;
    }

    public void setServerId(String serverId) {
        this.serverId = serverId;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Set<String> getClientSet() {
        return clientSet;
    }

    public void setClientSet(Set<String> clientSet) {
        this.clientSet = clientSet;
    }

    @Override
    public String toString() {
        return "ClusterGroupEntity{" +
                "serverId='" + serverId + ''' +
                ", ip='" + ip + ''' +
                ", port=" + port +
                ", clientSet=" + clientSet +
                '}';
    }
}

在Nacos中添加動(dòng)態(tài)規(guī)則配置,以及token server與token client的配置:

DataId:cloudalibaba-sentinel-clusterServer-flow-rules Group:SENTINEL_GROUP 配置內(nèi)容(json格式):

[
    {
        "resource" : "/order/{id}",     // 限流的資源名稱
        "grade" : 1,                         // 限流模式為:qps,線程數(shù)限流0,qps限流1
        "count" : 20,                        // 閾值為:20
        "clusterMode" :  true,               // 是否是集群模式,集群模式為:true
        "clusterConfig" : {
            "flowId" : 111,                  // 全局唯一id
            "thresholdType" : 1,             // 閾值模式為:全局閾值,0是單機(jī)均攤,1是全局閥值
            "fallbackToLocalWhenFail" : true // 在 client 連接失敗或通信失敗時(shí),是否退化到本地的限流模式
        }
    }
]

DataId:cloudalibaba-sentinel-clusterServer-cluster-client-config Group:SENTINEL_GROUP 配置內(nèi)容(json格式):

{
    "requestTimeout": 20
}

DataId:cloudalibaba-sentinel-clusterServer-cluster-map Group:SENTINEL_GROUP 配置內(nèi)容(json格式):

[{
	"clientSet": ["10.133.40.30@8721", "10.133.40.30@8722"],
	"ip": "10.133.40.30",
	"serverId": "10.133.40.30@8720",
	"port": 18730   //這個(gè)端口是token server通信的端口
}]

重新啟動(dòng)服務(wù),并訪問接口,我們可以看到流控規(guī)則與集群流控都自動(dòng)配置完成。我們需要測(cè)試,我們集群流控是否已經(jīng)生效。

不斷執(zhí)行以下命令:

ab -n 100 -c 50 http://localhost:9091/order/1
ab -n 100 -c 50 http://localhost:9092/order/3
ab -n 100 -c 50 http://localhost:9093/order/1

測(cè)試效果圖:

我們從實(shí)時(shí)監(jiān)控圖上可以看出,資源名為/order/{id},整個(gè)集群的QPS為20,跟我們的配置是一樣的。當(dāng)作為token server的機(jī)器掛掉后,集群限流會(huì)退化到 local 模式的限流,即在本地按照單機(jī)閾值執(zhí)行限流檢查。

20214984821298

Token Server 分配配置:

20214984821299

上面這張圖可以很好幫忙我們解釋嵌入模式的具體實(shí)現(xiàn)。通過配置信息解析,管理我們的token server與token client。

適用范圍:

嵌入模式適合某個(gè)應(yīng)用集群內(nèi)部的流控。由于隔離性不佳,token server會(huì)影響應(yīng)用本身,需要限制 token server 的總QPS。

獨(dú)立模式部署

獨(dú)立模式相對(duì)于嵌入模式而言就是將token server與應(yīng)用隔離,進(jìn)行獨(dú)立部署。將嵌入模式中token server和token client分離,分別進(jìn)行配置。我們只需要將 InitFunc 實(shí)現(xiàn)類進(jìn)行拆分。

token server的nacos配置

server的名稱空間配置,(集群的namespace或客戶端項(xiàng)目名)如下:

DataId:cluster-server-namespace-set Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

[
    "cloudalibaba-sentinel-cluster-client-alone"
]

server的通信端口配置,如下:

DataId:cluster-server-transport-config Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

{
 "idleSecods":600,
 "port": 18730
}

Token sever的流控限制配置,如下:

DataId:cluster-server-flow-config Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

{
    "exceedCount":1.0,
    "maxAllowedQps":20000,
    "namespace":"cloudalibaba-sentinel-cluster-client-alone"
}

token server的host地址與端口號(hào)配置,如下:

DataId: cluster-server-config Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

{
    "serverHost": "10.133.40.30",
    "serverPort": 18730
}

token server的InitFunc類:

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-01 10:01
 */
public class ClusterServerInitFunc implements InitFunc {

    //nacos集群地址
    private final String remoteAddress = "localhost:8848";
    //配置的分組名稱
    private final String groupId = "SENTINEL_ALONE_GROUP";

    //配置的dataId
    private final String namespaceSetDataId = "cluster-server-namespace-set";
    private final String serverTransportDataId = "cluster-server-transport-config";
    private final String serverFlowDataId = "cluster-server-flow-config";

    @Override
    public void init() {

        //監(jiān)聽特定namespace(集群的namespace或客戶端項(xiàng)目名)下的集群限流規(guī)則
        initPropertySupplier();
        // 設(shè)置tokenServer管轄的作用域(即管理哪些應(yīng)用)
        initTokenServerNameSpaces();

        // Server transport configuration data source.
        //Server端配置
        initServerTransportConfig();

        // 初始化最大qps
        initServerFlowConfig();

        //初始化服務(wù)器狀態(tài)
        initStateProperty();

    }

    private  void initPropertySupplier(){

        // Register cluster flow rule property supplier which creates data source by namespace.
        // Flow rule dataId format: ${namespace}-flow-rules
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                    namespace + Constants.FLOW_POSTFIX,
                    source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
            return ds.getProperty();
        });
        // Register cluster parameter flow rule property supplier which creates data source by namespace.
        ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                    namespace + Constants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
            return ds.getProperty();
        });

    }


    private void initTokenServerNameSpaces(){
        // Server namespace set (scope) data source.
        ReadableDataSource<String, Set<String>> namespaceDs = new NacosDataSource<>(remoteAddress, groupId,
                namespaceSetDataId, source -> JSON.parseObject(source, new TypeReference<Set<String>>() {}));
        ClusterServerConfigManager.registerNamespaceSetProperty(namespaceDs.getProperty());
    }

    private void initServerTransportConfig(){
        // Server transport configuration data source.
        ReadableDataSource<String, ServerTransportConfig> transportConfigDs = new NacosDataSource<>(remoteAddress,
                groupId, serverTransportDataId,
                source -> JSON.parseObject(source, new TypeReference<ServerTransportConfig>() {}));
        ClusterServerConfigManager.registerServerTransportProperty(transportConfigDs.getProperty());
    }


    private void initServerFlowConfig(){

        // Server namespace set (scope) data source.
        ReadableDataSource<String, ServerFlowConfig> serverFlowConfig = new NacosDataSource<>(remoteAddress, groupId,
                serverFlowDataId, source -> JSON.parseObject(source, new TypeReference<ServerFlowConfig>() {}));

        ClusterServerConfigManager.registerGlobalServerFlowProperty(serverFlowConfig.getProperty());
    }

    private void initStateProperty() {
        ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);

    }
}

token client的nacos配置

客戶端請(qǐng)求超時(shí)配置,如下:

DataId:cluster-client-config Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

{
    "requestTimeout": 20
}

流控限流配置,如下:

DataId: cloudalibaba-sentinel-cluster-client-alone-flow-rules Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

[
    {
        "resource" : "/order/{id}",     // 限流的資源名稱
        "grade" : 1,                         // 限流模式為:qps
        "count" : 30,                        // 閾值為:30
        "clusterMode" :  true,               // 集群模式為:true
        "clusterConfig" : {
            "flowId" : 111,                  // 全局唯一id
            "thresholdType" : 1,             // 閾值模式為:全局閾值
            "fallbackToLocalWhenFail" : true // 在 client 連接失敗或通信失敗時(shí),是否退化到本地的限流模式
        }
    }
]

熱點(diǎn)限流配置,如下:

DataId:cloudalibaba-sentinel-cluster-client-alone-param-rules Group:SENTINEL_ALONE_GROUP 配置內(nèi)容(json格式):

[
    {
        "resource" : "order",          // 限流的資源名稱
        "paramIdx" : 1,                      //參數(shù)索引
        "grade" : 1,                         // 限流模式為:qps
        "count" : 10,                        // 閾值為:10
        "clusterMode" :  true,               // 集群模式為:true
        "clusterConfig" : {
            "flowId" : 121,                  // 全局唯一id
            "thresholdType" : 1,             // 閾值模式為:全局閾值
            "fallbackToLocalWhenFail" : true // 在 client 連接失敗或通信失敗時(shí),是否退化到本地的限流模式
        },
        "paramFlowItemList":[      //索引為1的參數(shù)值為hot時(shí),接口閾值為50,其他值均為10
            {
                object: "hot",
                count: 50,
                classType: "java.lang.String"
            }
        ]
    }
]

Token client的InitFunc類:

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-01 17:47
 */
public class ClusterClientInitFunc implements InitFunc {

    //項(xiàng)目名稱
    private static final String APP_NAME = AppNameUtil.getAppName();
    //nacos集群地址
    private final String remoteAddress = "localhost:8848";
    //nacos配置的分組名稱
    private final String groupId = "SENTINEL_ALONE_GROUP";

    //項(xiàng)目名稱 + Constants的配置名稱,組成配置的dataID
    private final String flowDataId = APP_NAME + Constants.FLOW_POSTFIX;
    private final String paramDataId = APP_NAME + Constants.PARAM_FLOW_POSTFIX;
    private final String configDataId = "cluster-client-config";
    private final String serverDataId =  "cluster-server-config";


    @Override
    public void init() throws Exception {

        // Register client dynamic rule data source.
        //客戶端,動(dòng)態(tài)數(shù)據(jù)源的方式配置sentinel的流量控制和熱點(diǎn)參數(shù)限流的規(guī)則。
        initDynamicRuleProperty();

        // Register token client related data source.
        // Token client common config
        // 集群限流客戶端的配置屬性
        initClientConfigProperty();
        // Token client assign config (e.g. target token server) retrieved from assign map:
        //初始化Token客戶端
        initClientServerAssignProperty();

        //初始化客戶端狀態(tài)
        initStateProperty();
    }

    private void initDynamicRuleProperty() {

        //流量控制的DataId分別是APP_NAME + Constants.FLOW_POSTFIX;熱點(diǎn)參數(shù)限流規(guī)則的DataId是APP_NAME + Constants.PARAM_FLOW_POSTFIX;

        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
                flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        FlowRuleManager.register2Property(ruleSource.getProperty());

        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
                paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

    private void initClientConfigProperty() {
        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
                configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
    }

    private void initClientServerAssignProperty() {
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
                serverDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientAssignConfig>() {}));
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }

    private void initStateProperty() {
        ClusterStateManager.applyState(ClusterStateManager.CLUSTER_CLIENT);

    }
}

核心的代碼與配置,如上所示,其他代碼,可以訪問:

<module>cloudalibaba-sentinel-cluster-server-alone9092</module>
<module>cloudalibaba-sentinel-cluster-client-alone9093</module>

測(cè)試:

啟動(dòng)cloudalibaba-sentinel-cluster-server-alone9092,我們啟動(dòng)兩個(gè)實(shí)例,模擬集群(可以啟動(dòng)多個(gè)):

-Dserver.port=9092 -Dcsp.sentinel.log.use.pid=true
-Dserver.port=9094 -Dcsp.sentinel.log.use.pid=true

啟動(dòng)cloudalibaba-sentinel-cluster-client-alone9093,我們啟動(dòng)1個(gè)實(shí)例,模擬server(實(shí)現(xiàn)master選舉之后,可以啟動(dòng)多個(gè)):

-Dserver.port=9093 -Dcsp.sentinel.log.use.pid=true

不斷執(zhí)行以下命令,進(jìn)行接口訪問測(cè)試:

ab -n 100 -c 50 http://localhost:9092/order/1
ab -n 100 -c 50 http://localhost:9094/order/3

我們從實(shí)時(shí)監(jiān)控圖上可以看出,資源名為/order/{id},整個(gè)集群的QPS為30,跟我們的配置是一樣的。當(dāng)作為token server的機(jī)器掛掉后,集群限流會(huì)退化到 local 模式的限流,即在本地按照單機(jī)閾值執(zhí)行限流檢查。

202149848212100

熱點(diǎn)限流已經(jīng)為大家實(shí)現(xiàn)了,大家可以自行測(cè)試,比較簡(jiǎn)單,不再累述。

ab -n 100 -c 50  http://localhost:9092/hot_order/1/hot
ab -n 100 -c 50  http://localhost:9094/hot_order/1/hot

ab -n 100 -c 50  http://localhost:9092/hot_order/1/nothot
ab -n 100 -c 50  http://localhost:9094/hot_order/1/nothot

其它

若在生產(chǎn)環(huán)境使用集群限流,管控端還需要關(guān)注以下的問題:

  • Token Server 自動(dòng)管理、調(diào)度(分配/選舉 Token Server)
  • Token Server 高可用,在某個(gè) server 不可用時(shí)自動(dòng) failover 到其它機(jī)器

 總結(jié)

集群流控,有兩種模式,嵌入模式和獨(dú)立模式,個(gè)人不建議在業(yè)務(wù)系統(tǒng)使用集群流控,集群流控可以在網(wǎng)關(guān)層做,業(yè)務(wù)層的話可以使用單機(jī)流控,相對(duì)來說簡(jiǎn)單好上手。token server目前存在單點(diǎn)問題,需要個(gè)人實(shí)現(xiàn)master選舉,并修改 cluster-server-config的IP即可。

代碼示例

本文示例讀者可以通過查看下面?zhèn)}庫(kù)中的項(xiàng)目,如下所示:

<module>cloudalibaba-sentinel-cluster</module>

Github:https://github.com/jiuqiyuliang/SpringCloud-Learning

到此這篇關(guān)于 Sentinel 實(shí)現(xiàn)動(dòng)態(tài)配置的集群流控的文章就介紹到這了,想要了解更多相關(guān) Sentinel 項(xiàng)目的其他應(yīng)用和集群流控的其他內(nèi)容請(qǐng)搜索W3Cschool以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持!


0 人點(diǎn)贊