Skip to content
This repository has been archived by the owner on May 18, 2020. It is now read-only.

Support for Elasticsearch 7. #87

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ class ElasticRestClient {
this.templatesDescription = templatesDescription;
}

void createIndices() {
void createIndices(boolean onlyOneTypePerIndex) {
waitForClusterYellow();
indicesDescription.getIndicesNames().forEach(this::createIndex);
indicesDescription.getIndicesNames().forEach((name) -> createIndex(name, onlyOneTypePerIndex));
}

void createIndex(String indexName) {
void createIndex(String indexName, boolean onlyOneTypePerIndex) {
if (!indexExists(indexName)) {
HttpPut request = new HttpPut(url("/" + indexName));
indicesDescription
.getIndexSettings(indexName)
.ifPresent(indexSettings -> setIndexSettingsAsEntity(request, indexSettings));
.ifPresent(indexSettings -> setIndexSettingsAsEntity(request, indexSettings, onlyOneTypePerIndex));
httpClient.execute(request, response -> {
if (response.getStatusLine().getStatusCode() != 200) {
String responseBody = readBodySafely(response);
Expand All @@ -69,8 +69,8 @@ void createIndex(String indexName) {
}
}

private void setIndexSettingsAsEntity(HttpPut request, IndexSettings indexSettings) {
request.setEntity(new StringEntity(indexSettings.toJson().toString(), APPLICATION_JSON));
private void setIndexSettingsAsEntity(HttpPut request, IndexSettings indexSettings, boolean onlyOneTypePerIndex) {
request.setEntity(new StringEntity(indexSettings.toJson(onlyOneTypePerIndex).toString(), APPLICATION_JSON));
}

private boolean indexExists(String indexName) {
Expand Down Expand Up @@ -135,11 +135,11 @@ void deleteIndex(String indexName) {
}
}

void bulkIndex(Collection<IndexRequest> indexRequests) {
void bulkIndex(Collection<IndexRequest> indexRequests, boolean onlyOneTypePerIndex) {
String bulkRequestBody = indexRequests.stream()
.flatMap(request ->
Stream.of(
indexMetadataJson(request.getIndexName(), request.getIndexType(), request.getId(), request.getRouting()),
indexMetadataJson(request.getIndexName(), request.getIndexType(), request.getId(), request.getRouting(), onlyOneTypePerIndex),
request.getJson()
)
)
Expand All @@ -149,14 +149,14 @@ void bulkIndex(Collection<IndexRequest> indexRequests) {
performBulkRequest(url("/_bulk"), bulkRequestBody);
}

private String indexMetadataJson(String indexName, String indexType, String id, String routing) {
private String indexMetadataJson(String indexName, String indexType, String id, String routing, boolean v7Format) {
StringJoiner joiner = new StringJoiner(",");

if(indexName != null) {
joiner.add("\"_index\": \"" + indexName + "\"");
}

if(indexType != null) {
if(indexType != null && !v7Format) {
joiner.add("\"_type\": \"" + indexType + "\"");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package pl.allegro.tech.embeddedelasticsearch;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.rauschig.jarchivelib.ArchiveFormat;
import org.rauschig.jarchivelib.Archiver;
import org.rauschig.jarchivelib.ArchiverFactory;
import org.rauschig.jarchivelib.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.embeddedelasticsearch.InstallationDescription.Plugin;
Expand Down Expand Up @@ -72,7 +75,14 @@ private void installElastic(Path downloadedTo) throws IOException {
}

private void unzip(Path downloadedTo, File destination) throws IOException {
Archiver archiver = ArchiverFactory.createArchiver("zip");
Archiver archiver;
if (downloadedTo.toString().endsWith(".zip")) {
archiver = ArchiverFactory.createArchiver(ArchiveFormat.ZIP);
} else if (downloadedTo.toString().endsWith((".tar.gz"))) {
archiver = ArchiverFactory.createArchiver(ArchiveFormat.TAR, CompressionType.GZIP);
} else {
throw new IOException("Archive format not recognized");
}
archiver.extract(downloadedTo.toFile(), destination);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void index(String indexName, String indexType, List<CharSequence> jsons)
* @param indexRequests document to be indexed along with metadata
*/
public void index(List<IndexRequest> indexRequests) {
elasticRestClient.bulkIndex(indexRequests);
elasticRestClient.bulkIndex(indexRequests, installationDescription.versionIs7x());
}

/**
Expand Down Expand Up @@ -191,7 +191,7 @@ public void deleteIndex(String indexName) {
* Create all indices
*/
public void createIndices() {
elasticRestClient.createIndices();
elasticRestClient.createIndices(installationDescription.versionIs7x());
}

/**
Expand All @@ -200,7 +200,7 @@ public void createIndices() {
* @param indexName index to create
*/
public void createIndex(String indexName) {
elasticRestClient.createIndex(indexName);
elasticRestClient.createIndex(indexName, installationDescription.versionIs7x());
}

public void createTemplates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,34 @@ public IndexSettings build() {
}

public ObjectNode toJson() {
return toJson(false);
}

public ObjectNode toJson(boolean v7Format) {
ObjectNode objectNode = new ObjectMapper().createObjectNode();
objectNode.set("settings", settings.orElse(OBJECT_MAPPER.createObjectNode()));
objectNode.set("aliases", aliases.orElse(OBJECT_MAPPER.createObjectNode()));
ObjectNode mappingsObject = prepareMappingsObject();
ObjectNode mappingsObject = prepareMappingsObject(v7Format);
objectNode.set("mappings", mappingsObject);
return objectNode;
}

private ObjectNode prepareMappingsObject() {
private ObjectNode prepareMappingsObject(boolean v7Format) {
if (v7Format) {
switch (types.size()) {
case 0:
return OBJECT_MAPPER.createObjectNode();
case 1:
JsonNode mapping = types.get(0).getMapping();
if (mapping == null) {
return OBJECT_MAPPER.createObjectNode();
}
return mapping.deepCopy();
default:
throw new RuntimeException("Elasticsearch v7 and above only allow one type per index");
}
}

ObjectNode mappingsObject = OBJECT_MAPPER.createObjectNode();
types.forEach(type -> mappingsObject.set(type.getType(), type.getMapping()));
return mappingsObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public String determineVersion() {
}

private String versionFromUrl(URL url) {
Pattern versionPattern = Pattern.compile("-([^/]*).zip");
Pattern versionPattern = Pattern.compile("-([^/]*?)(-(windows|linux|darwin)-x86_64)?.(zip|tar.gz)");
Matcher matcher = versionPattern.matcher(url.toString());
if (matcher.find()) {
return matcher.group(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pl.allegro.tech.embeddedelasticsearch;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -29,17 +31,39 @@ public URL resolveDownloadUrl() {
private URL urlFromVersion(String version) {
ElsDownloadUrl elsDownloadUrl = ElsDownloadUrl.getByVersion(version);
try {
return new URL(StringUtils.replace(elsDownloadUrl.downloadUrl, "{VERSION}", version));
return new URL(StringUtils.replaceEach(
elsDownloadUrl.downloadUrl,
new String[]{"{VERSION}", "{OS}", "{FILETYPE}"},
new String[]{version, getOS(), getExtension()}));
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}

private String getOS() {
if (SystemUtils.IS_OS_WINDOWS) {
return "windows";
} else if (SystemUtils.IS_OS_MAC) {
return "darwin";
} else if (SystemUtils.IS_OS_LINUX) {
return "linux";
}
throw new RuntimeException("OS " + SystemUtils.OS_NAME + " not supported by Elasticsearch");
}

private String getExtension() {
if (SystemUtils.IS_OS_WINDOWS) {
return "zip";
}
return "tar.gz";
}

private enum ElsDownloadUrl {
ELS_1x("1.", "https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-{VERSION}.zip"),
ELS_2x("2.", "https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/{VERSION}/elasticsearch-{VERSION}.zip"),
ELS_5x("5.", "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-{VERSION}.zip"),
ELS_6x("6.", ELS_5x.downloadUrl);
ELS_6x("6.", ELS_5x.downloadUrl),
ELS_7x("7.", "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-{VERSION}-{OS}-x86_64.{FILETYPE}");

String versionPrefix;
String downloadUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ boolean versionIs2x() {
return getVersion().startsWith("2.");
}

boolean versionIs7x() { return getVersion().startsWith("7."); }

boolean isCleanInstallationDirectoryOnStop() {
return cleanInstallationDirectoryOnStop;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ class InstallationSourceSpec extends Specification {

def "should construct valid url for version"() {
given:
final installationSource = new InstallFromVersion("5.0.0-alpha1")
final installationSource = new InstallFromVersion(version)
when:
final resolvedUrl = installationSource.resolveDownloadUrl()
then:
resolvedUrl != null
resolvedUrl.toString() == url
where:
version | url
"5.0.0-alpha1" | "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.0.0-alpha1.zip"
"6.7.1" | "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.7.1.zip"
"7.0.0" | "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.0.0-windows-x86_64.zip"
}

def "should extract properly version from normal url"() {
Expand All @@ -35,6 +40,7 @@ class InstallationSourceSpec extends Specification {
"http://elasticsearch-download.example.com/elasticsearch-4.0.0.zip" | "4.0.0"
"http://example.com/elasticsearch-10.0.0-SNAPSHOT-fix-branch-123.zip" | "10.0.0-SNAPSHOT-fix-branch-123"
"http://example.com/abc-5.0.0.zip" | "5.0.0"
"http://example.com/elastic-7.0.0-windows-x86_64.zip" | "7.0.0"
}

def "should throw exception when version is missing in url"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import static java.util.concurrent.TimeUnit.MINUTES
import static pl.allegro.tech.embeddedelasticsearch.PopularProperties.HTTP_PORT

class SynchronicitySpec extends Specification {
static final ELASTIC_VERSION = "2.2.0"
static final ELASTIC_VERSION = "7.0.0"
static final HTTP_PORT_VALUE = 9999

def "should not throw exception on starting embedded instance more than once"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import static java.util.concurrent.TimeUnit.MINUTES

class ValidationSpec extends Specification {

static final ELASTIC_VERSION = "2.2.0"
static final ELASTIC_DOWNLOAD_URL = new URL("https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.2.0/elasticsearch-2.2.0.zip")
static final ELASTIC_VERSION = "7.0.0"
static final ELASTIC_DOWNLOAD_URL = new URL("https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.0.0-linux-x86_64.tar.gz")

def "should throw exception on missing elastic version and download url"() {
when:
Expand Down
10 changes: 10 additions & 0 deletions es70-test/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
dependencies {
testCompile project(':test-base')

testCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '7.0.0'
testCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.0.0'
testCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '7.0.0'
testCompile group: 'org.locationtech.spatial4j', name: 'spatial4j', version: '0.6'
testCompile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.6.2'
testCompile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.6.2'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package pl.allegro.tech.embeddedelasticsearch

import org.apache.http.HttpHost
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder

import static java.util.concurrent.TimeUnit.MINUTES
import static pl.allegro.tech.embeddedelasticsearch.PopularProperties.HTTP_PORT
import static pl.allegro.tech.embeddedelasticsearch.SampleIndices.*
import static pl.allegro.tech.embeddedelasticsearch.SampleIndices.BOOKS_INDEX_7x
import static pl.allegro.tech.embeddedelasticsearch.SampleIndices.CARS_TEMPLATE_7x

class EmbeddedElasticSpec extends EmbeddedElasticCoreApiBaseSpec {

static final ELASTIC_VERSION = "7.0.0"
static final HTTP_PORT_VALUE = 9999

static EmbeddedElastic embeddedElastic = EmbeddedElastic.builder()
.withElasticVersion(ELASTIC_VERSION)
.withSetting(HTTP_PORT, HTTP_PORT_VALUE)
.withEsJavaOpts("-Xms128m -Xmx512m")
.withTemplate(CARS_TEMPLATE_NAME, CARS_TEMPLATE_7x)
.withIndex(CARS_INDEX_NAME)
.withIndex(BOOKS_INDEX_NAME, BOOKS_INDEX_7x)
.withStartTimeout(2, MINUTES)
.build()
.start()

static RestHighLevelClient client = createClient()

def setup() {
embeddedElastic.recreateIndices()
}

def cleanupSpec() {
client.close()
embeddedElastic.stop()
}

static RestHighLevelClient createClient() {
return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", HTTP_PORT_VALUE)))
}

@Override
List<String> fetchAllDocuments() {
fetchAllDocuments(CARS_INDEX_NAME) + fetchAllDocuments(BOOKS_INDEX_NAME)
}

@Override
List<String> fetchAllDocuments(String indexName) {
final searchRequest = new SearchRequest(indexName)
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));

client.search(searchRequest, RequestOptions.DEFAULT)
.hits.hits.toList()
.collect { it.sourceAsString }
}

@Override
List<String> fetchAllDocuments(String indexName, String typeName) {
fetchAllDocuments(indexName)
}

@Override
List<String> fetchAllDocuments(String indexName, String typeName, String routing) {
final searchRequest = new SearchRequest(indexName)
.routing(routing)
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))

client.search(searchRequest, RequestOptions.DEFAULT)
.hits.hits.toList()
.collect { it.sourceAsString }
}

@Override
List<String> searchByTerm(String indexName, String typeName, String fieldName, String value) {
final searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().query(QueryBuilders.termQuery(fieldName, value)));

client.search(searchRequest, RequestOptions.DEFAULT)
.hits.hits.toList()
.collect { it.sourceAsString }
}

@Override
String getById(String indexName, String typeName, String id) {
final getRequest = new GetRequest(indexName, id)
client.get(getRequest, RequestOptions.DEFAULT).sourceAsString
}
}
Loading