Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugs fixed and Enhancement #4

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public interface TridentElasticSearchMapper extends Serializable {
*/
public Map<String, Object> mapToData(TridentTuple tuple);

@Deprecated
public Settings mapToIndexSettings(TridentTuple tuple);

@SuppressWarnings("rawtypes")
@Deprecated
public Map mapToMappingSettings(TridentTuple tuple);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
//
package com.hmsonline.storm.elasticsearch.trident;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
Expand All @@ -23,7 +20,6 @@

import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
import backtype.storm.topology.FailedException;

import com.hmsonline.storm.elasticsearch.StormElasticSearchConstants;
import com.hmsonline.storm.elasticsearch.StormElasticSearchUtils;
Expand All @@ -39,7 +35,12 @@ public ElasticSearchState(Map config) {
LOGGER.debug("Initialize ElasticSearchState");
String clusterName = (String) config.get(StormElasticSearchConstants.ES_CLUSTER_NAME);
String host = (String) config.get(StormElasticSearchConstants.ES_HOST);
Integer port = (Integer) config.get(StormElasticSearchConstants.ES_PORT);
Integer port = 9300;
try {
port = Integer.parseInt(config.get(StormElasticSearchConstants.ES_PORT).toString());
} catch (Exception e) {
LOGGER.warn("Cannot get elastic search port from config file. Use default value: 9300");
}

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).build();
client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port));
Expand All @@ -64,20 +65,13 @@ public void commit(Long txid) {
public void createIndices(TridentElasticSearchMapper mapper, List<TridentTuple> tuples) {
BulkRequestBuilder bulkRequest = client.prepareBulk();

Set<String> existingIndex = new HashSet<String>();
for (TridentTuple tuple : tuples) {
String indexName = mapper.mapToIndex(tuple);
String type = mapper.mapToType(tuple);
String key = mapper.mapToKey(tuple);
Map<String, Object> data = mapper.mapToData(tuple);
String parentId = mapper.mapToParentId(tuple);

if (!existingIndex.contains(indexName)
&& !client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists()) {
createIndex(bulkRequest, indexName, mapper.mapToIndexSettings(tuple));
createMapping(bulkRequest, indexName, type, mapper.mapToMappingSettings(tuple));
existingIndex.add(indexName);
}
if (StringUtils.isBlank(parentId)) {
bulkRequest.add(client.prepareIndex(indexName, type, key).setSource(data));
} else {
Expand All @@ -89,32 +83,8 @@ public void createIndices(TridentElasticSearchMapper mapper, List<TridentTuple>
try {
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
// Index failed. Retry!
throw new FailedException("Cannot create index via ES: " + bulkResponse.buildFailureMessage());
}
} catch (ElasticSearchException e) {
StormElasticSearchUtils.handleElasticSearchException(getClass(), e);
}
}

private void createIndex(BulkRequestBuilder bulkRequest, String indexName, Settings indicesSettings) {
try {
if (indicesSettings != null) {
client.admin().indices().prepareCreate(indexName).setSettings(indicesSettings).execute().actionGet();
} else {
client.admin().indices().prepareCreate(indexName).execute().actionGet();
}
} catch (ElasticSearchException e) {
StormElasticSearchUtils.handleElasticSearchException(getClass(), e);
}
}

@SuppressWarnings("rawtypes")
private void createMapping(BulkRequestBuilder bulkRequest, String indexName, String indexType, Map json) {
try {
if (json != null) {
client.admin().indices().preparePutMapping(indexName).setType(indexType).setSource(json).execute()
.actionGet();
//TODO index failed. Figure out a better way to determine when to retry this message
LOGGER.error("Cannot execute bulk request: " + bulkResponse.buildFailureMessage());
}
} catch (ElasticSearchException e) {
StormElasticSearchUtils.handleElasticSearchException(getClass(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ private TridentElasticSearchMapper createMapper() {
json.put("message", "trying out Elastic Search");
Mockito.when(mapper.mapToData(Mockito.any(TridentTuple.class))).thenReturn(json);
Mockito.when(mapper.mapToParentId(Mockito.any(TridentTuple.class))).thenReturn(null);
Mockito.when(mapper.mapToIndexSettings(Mockito.any(TridentTuple.class))).thenReturn(null);
Mockito.when(mapper.mapToMappingSettings(Mockito.any(TridentTuple.class))).thenReturn(null);
return mapper;
}

Expand Down