App下載

使用Quarkus在Elasticsearch進(jìn)行響應(yīng)式方法,案例分享!干貨!

迪士尼在逃公主 2021-09-09 16:55:30 瀏覽數(shù) (2640)
反饋

我已經(jīng)實(shí)現(xiàn)了一項(xiàng)服務(wù),Quarkus作為主要框架,Elasticsearch作為數(shù)據(jù)存儲。在實(shí)現(xiàn)過程中,我萌生了寫一篇關(guān)于如何使用Elasticsearch 的 Java High Level REST Client以反應(yīng)式方式綁定 Quarkus 的想法。

開始對文章做筆記,將常用庫(otaibe-commons-quarkus-elasticsearch模塊)中常用的Elasticsearch相關(guān)代碼分離出來存放在Github中。然后,我花了幾個小時以 Quarkus 指南頁面中的方式組裝了一個簡單的示例項(xiàng)目(也在 Github 中)  。目前,那里缺少 Elasticsearch 指南。

讓我們繼續(xù)更詳細(xì)地解釋如何連接 Quarkus 和 Elasticsearch。

創(chuàng)建 Quarkus 項(xiàng)目

mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
    -DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \
    -DprojectArtifactId=otaibe-quarkus-elasticsearch-example \
    -DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \
    -Dpath="/fruits" \
    -Dextensions="resteasy-jackson,elasticsearch-rest-client"

Maven 設(shè)置

如您所見,Quarkus 中存在一個elasticsearch -rest-client  ;然而,這是一個 Elasticsearch Java 低級 REST 客戶端。如果我們想使用 Elasticsearch Java High Level REST Client,我們只需要將它作為依賴添加到pom.xml文件中:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>7.4.0</version>
</dependency>

    

請確保 Elasticsearch Java Low Level REST Client 的版本與 Elasticsearch Java High Level REST Client匹配。

由于我們以響應(yīng)式方式使用 Elasticsearch,因此我更喜歡使用 Project Reactor。我們必須在依賴管理部分添加 BOM:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bom</artifactId>
    <version>Dysprosium-SR2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

我們還必須添加 reactor-core 作為依賴項(xiàng):

<dependency>
    <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>

我已經(jīng)在一個庫中分離了公共代碼,所以我們應(yīng)該將這個庫添加到我們的示例項(xiàng)目中。為此,我們將使用Jitpack。這是一項(xiàng)很棒的服務(wù)。你只需要為 你的 Github 項(xiàng)目指出 正確的方法,它就會為它構(gòu)建一個工件。這是我使用它的方式:

<dependency>
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
    <artifactId>otaibe-commons-quarkus-core</artifactId>
    <version>elasticsearch-example.02</version>
</dependency>
<dependency>
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
    <artifactId>otaibe-commons-quarkus-elasticsearch</artifactId>
    <version>elasticsearch-example.02</version>
</dependency>
<dependency>
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
    <artifactId>otaibe-commons-quarkus-rest</artifactId>
    <version>elasticsearch-example.02</version>
</dependency>

通過 Docker 啟動 Elasticsearch

此外,我們應(yīng)該啟動 Elastisearch。最簡單的方法是通過 Docker 運(yùn)行它:

docker run -it --rm=true --name elasticsearch_quarkus_test \
    -p 11027:9200 -p 11028:9300 \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:7.4.0

連接到 Elasticsearch

讓我們從將我們的服務(wù)連接到 Elasticsearch 開始——示例項(xiàng)目中的實(shí)現(xiàn)很簡單——因此它將偵聽 Quarkus 啟動和關(guān)閉事件并初始化或終止連接:

package org.otaibe.quarkus.elasticsearch.example.service;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@ApplicationScoped
@Getter
@Setter
@Slf4j
public class ElasticsearchService extends AbstractElasticsearchService {
    public void init(@Observes StartupEvent event) {
        log.info("init started");
        super.init();
        log.info("init completed");
    }

    public void shutdown(@Observes ShutdownEvent event) {
        log.info("shutdown started");
        super.shutdown();
        log.info("shutdown completed");
    }
}

連接到 Elasticsearch 的實(shí)際工作是在AbstractElasticsearchService 中完成的:

public abstract class AbstractElasticsearchService {
    @ConfigProperty(name = "service.elastic-search.hosts")
    String[] hosts;
    @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")
    Optional<Integer> numThreads;
    private RestHighLevelClient restClient;
    private Sniffer sniffer;
    @PostConstruct
    public void init() {
        log.info("init started");
        List<HttpHost> httpHosts = Arrays.stream(hosts)
                .map(s -> StringUtils.split(s, ':'))
                .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))
                .collect(Collectors.toList());
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
        getNumThreads().ifPresent(integer ->
                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(
                        IOReactorConfig
                                .custom()
                                .setIoThreadCount(integer)
                                .build())
                ));
        restClient = new RestHighLevelClient(builder);
        sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();
        log.info("init completed");
    }
}

如您所見,此處的連接是按照Elasticsearch 文檔 中建議的方式完成的。我的實(shí)現(xiàn)取決于兩個配置屬性:

屬性文件:

service.elastic-search.hosts=localhost:11027

這是從 Docker 啟動后的 Elasticsearch 連接字符串。第二個可選屬性是:屬性文件

service.elastic-search.num-threads

這是客戶端所需的線程數(shù)。

創(chuàng)建 POJO

現(xiàn)在,讓我們創(chuàng)建域?qū)ο螅?a >Fruit):

package org.otaibe.quarkus.elasticsearch.example.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class Fruit {
    public static final String ID = "id";
    public static final String EXT_REF_ID = "ext_ref_id";
    public static final String NAME = "name";
    public static final String DESCRIPTION = "description";
    public static final String VERSION = "version";
    @JsonProperty(ID)
    public String id;
    @JsonProperty(EXT_REF_ID)
    public String extRefId;
    @JsonProperty(NAME)
    public String name;
    @JsonProperty(DESCRIPTION)
    public String description;
    @JsonProperty(VERSION)
    public Long version;
}

創(chuàng)建和實(shí)現(xiàn) DAO

創(chuàng)建索引

讓我們創(chuàng)建 FruitDaoImpl。它是一個高級類,用于填充 AbstractElasticsearchReactiveDaoImplementation 并實(shí)現(xiàn)所需的業(yè)務(wù)邏輯。這里的另一個重要部分是為 Fruit 類創(chuàng)建索引:

@Override
protected Mono<Boolean> createIndex() {
    CreateIndexRequest request = new CreateIndexRequest(getTableName());
    Map<String, Object> mapping = new HashMap();
    Map<String, Object> propsMapping = new HashMap<>();
    propsMapping.put(Fruit.ID, getKeywordTextAnalizer());
    propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());
    propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));
    propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));
    propsMapping.put(Fruit.VERSION, getLongFieldType());
    mapping.put(PROPERTIES, propsMapping);
    request.mapping(mapping);
    return createIndex(request);
}

對 Elasticsearch 的真正創(chuàng)建索引調(diào)用是在父類 ( AbstractElasticsearchReactiveDaoImplementation ) 中實(shí)現(xiàn)的:

protected Mono<Boolean> createIndex(CreateIndexRequest request) {
    return Flux.<Boolean>create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
        @Override
        public void onResponse(CreateIndexResponse createIndexResponse) {
            log.info("CreateIndexResponse: {}", createIndexResponse);
            fluxSink.next(createIndexResponse.isAcknowledged());
            fluxSink.complete();
        }
        @Override
        public void onFailure(Exception e) {
            log.error("unable to create index", e);
            fluxSink.error(new RuntimeException(e));
        }
    }))
            .next();
}

玩轉(zhuǎn) DAO

大多數(shù) CRUD 操作在AbstractElasticsearchReactiveDaoImplementation中實(shí)現(xiàn) 。

它有  save、   update、  findById和  deleteById 公共方法。它也有findByExactMatch和 findByMatch保護(hù)方法。FindBy*當(dāng)需要填充業(yè)務(wù)邏輯時,這些 方法在后代類中非常有用。

業(yè)務(wù)查找方法在FruitDaoImpl 類中實(shí)現(xiàn) :

public Flux<Fruit> findByExternalRefId(String value) {
    return findByMatch(Fruit.EXT_REF_ID, value);
}
public Flux<Fruit> findByName(String value) {
    return findByMatch(Fruit.NAME, value);
}
public Flux<Fruit> findByDescription(String value) {
    return findByMatch(Fruit.NAME, value);
}
public Flux<Fruit> findByNameOrDescription(String value) {
    Map<String, Object> query = new HashMap<>();
    query.put(Fruit.NAME, value);
    query.put(Fruit.DESCRIPTION, value);
    return findByMatch(query);
}

在Service類中封裝 DAO

FruitDaoImpl 封裝在 FruitService 中

@ApplicationScoped
@Getter
@Setter
@Slf4j
public class FruitService {
    @Inject
    FruitDaoImpl dao;
    public Mono<Fruit> save(Fruit entity) {
        return getDao().save(entity);
    }
    public Mono<Fruit> findById(Fruit entity) {
        return getDao().findById(entity);
    }

    public Mono<Fruit> findById(String id) {
        return Mono.just(Fruit.of(id, null, null, null, null))

                .flatMap(entity -> findById(entity));
    }
    public Flux<Fruit> findByExternalRefId(String value) {
        return getDao().findByExternalRefId(value);
    }
    public Flux<Fruit> findByName(String value) {
        return getDao().findByName(value);
    }
    public Flux<Fruit> findByDescription(String value) {
        return getDao().findByDescription(value);
    }
    public Flux<Fruit> findByNameOrDescription(String value) {
        return getDao().findByNameOrDescription(value);
    }
    public Mono<Boolean> delete(Fruit entity) {
        return Mono.just(entity.getId())
                .filter(s -> StringUtils.isNotBlank(s))
                .flatMap(s -> getDao().deleteById(entity))
                .defaultIfEmpty(false);
    }
}

測試 FruitService

該 FruitServiceTests 寫入,以測試基本功能。它還用于確保 Fruit 類字段被正確索引并且全文搜索按預(yù)期工作:

@Test
public void manageFruitTest() {
    Fruit apple = getTestUtils().createApple();
    Fruit apple1 = getFruitService().save(apple).block();
    Assertions.assertNotNull(apple1.getId());
    Assertions.assertTrue(apple1.getVersion() > 0);
    log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));
    List<Fruit> fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();
    Assertions.assertTrue(fruitList.size() > 0);
    List<Fruit> fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();
    Assertions.assertTrue(fruitList1.size() > 0);
    //Ensure that the full text search is working - it is 'Apples' in description
    List<Fruit> fruitList2 = getFruitService().findByDescription("apple").collectList().block();
    Assertions.assertTrue(fruitList2.size() > 0);
    //Ensure that the full text search is working - it is 'Apple' in name
    List<Fruit> fruitList3 = getFruitService().findByName("apples").collectList().block();
    Assertions.assertTrue(fruitList3.size() > 0);
    Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();
    Assertions.assertTrue(deleteAppleResult);
}
        

添加 REST 端點(diǎn)

因?yàn)檫@是一個示例項(xiàng)目,完整的 CRUD 功能不會作為 REST 端點(diǎn)添加。只有save和 findById被添加為 REST 端點(diǎn)。它們被添加到 FruitResource 中。那里的方法返回 CompletionStage<Response>,這確保我們的應(yīng)用程序中不會有阻塞的線程。

測試 REST 端點(diǎn)

 添加FruitResourceTest以測試 RESTendpoints:

package org.otaibe.quarkus.elasticsearch.example.web.controller;
import io.quarkus.test.junit.QuarkusTest;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.otaibe.commons.quarkus.core.utils.JsonUtils;
import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;
import org.otaibe.quarkus.elasticsearch.example.service.FruitService;
import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;
import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Optional;
import static io.restassured.RestAssured.given;
@QuarkusTest
@Getter(value = AccessLevel.PROTECTED)
@Slf4j
public class FruitResourceTest {
    @ConfigProperty(name = "service.http.host")
    Optional<URI> host;
    @Inject
    TestUtils testUtils;
    @Inject
    JsonUtils jsonUtils;
    @Inject
    FruitService service;
    @Test
    public void restEndpointsTest() {
        log.info("restEndpointsTest start");
        Fruit apple = getTestUtils().createApple();
        Fruit savedApple = given()
                .when()
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
                .body(apple)
                .post(getUri(FruitResource.ROOT_PATH))
                .then()
                .statusCode(200)
                .extract()
                .as(Fruit.class);
        String id = savedApple.getId();
        Assertions.assertTrue(StringUtils.isNotBlank(id));
        URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)
                .path(id)
                .build();
        Fruit foundApple = given()
                .when().get(getUri(findByIdPath.getPath()).getPath())
                .then()
                .statusCode(200)
                .extract()
                .as(Fruit.class);
        Assertions.assertEquals(savedApple, foundApple);
        Boolean deleteResult = getService().delete(foundApple).block();
        Assertions.assertTrue(deleteResult);
        given()
                .when().get(findByIdPath.getPath())
                .then()
                .statusCode(Response.Status.NOT_FOUND.getStatusCode()) ;
        log.info("restEndpointsTest end");
    }
    private URI getUri(String path) {
        return getUriBuilder(path)
                .build();
    }
    private UriBuilder getUriBuilder(String path) {
        return getHost()
                .map(uri -> UriBuilder.fromUri(uri))
                .map(uriBuilder -> uriBuilder.path(path))
                .orElse(UriBuilder
                        .fromPath(path)
                );
    }
}
            

構(gòu)建本地可執(zhí)行文件

在構(gòu)建本機(jī)可執(zhí)行文件之前,我們必須注冊我們的 Fruit 域?qū)ο?。這樣做的原因是我們的 FruitResource 返回 CompletionStage<Response>,因此,應(yīng)用程序的實(shí)際返回類型是未知的,因此我們必須顯式注冊它以進(jìn)行反射。在 Quarkus 中至少有兩種方法可以做到這一點(diǎn):

  1. 通過 @RegisterForReflection 注釋。
  2. 通過 反射-config.json。

我個人更喜歡第二種方法,因?yàn)槟缘念惪赡茉诘谌綆熘?,并且不可能?nbsp;@RegisterForReflection 放在 那里。

現(xiàn)在,  reflection-config.json 看起來像這樣:

[
  {
    "name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",
    "allDeclaredConstructors" : true,
    "allPublicConstructors" : true,
    "allDeclaredMethods" : true,
    "allPublicMethods" : true,
    "allDeclaredFields" : true,
    "allPublicFields" : true
  }
]

下一步是讓 Quarkus 知道 reflection-config.json 文件。您應(yīng)該將此行添加到pom.xml文件中的 native配置文件中:

<quarkus.native.additional-build-args>-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json</quarkus.native.additional-build-args>

您現(xiàn)在可以構(gòu)建您的本機(jī)應(yīng)用程序:

mvn clean package -Pnative

并啟動它:

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner

該服務(wù)將在http://localhost:11025上可用,因?yàn)檫@是application.properties 中明確指定的端口。

quarkus.http.port=11025

測試本機(jī)構(gòu)建

該 FruitResourceTest 預(yù)計(jì)以下可選屬性:

屬性文件:

service.http.host

如果存在,測試請求將命中指定的主機(jī)。如果您啟動本機(jī)可執(zhí)行文件:

shell:

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner

并使用以下代碼執(zhí)行測試/構(gòu)建:

shell:

mvn package -D %test.service.http .host = http://localhost:11025

測試將針對本機(jī)構(gòu)建運(yùn)行。

結(jié)論

我驚喜地發(fā)現(xiàn) Elasticsearch 與 Quarkus 一起開箱即用,可以編譯為本地代碼,結(jié)合通過Project Reactor 的反應(yīng)式實(shí)現(xiàn) ,將使應(yīng)用程序的占用空間幾乎微不足道。



0 人點(diǎn)贊