Elasticsearch: Make Elasticsearch job to ignore service(s) spans (jaegertracing#92)

Signed-off-by: Mohammed Ammer <[email protected]>
mohammedalics committed Jun 11, 2020
1 parent 6333604 commit cc2a1a2
Showing 2 changed files with 33 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -80,6 +80,7 @@ Elasticsearch is used when `STORAGE=elasticsearch`.
* `ES_INDEX_PREFIX`: index prefix of Jaeger indices. By default unset.
* `ES_TIME_RANGE`: How far in the past the job should look for spans; the maximum and default is `24h`.
Any value accepted by [date-math](https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math) can be used here, but the anchor is always `now`.
* `ES_SERVICES_IGNORE`: Comma-separated names of the services that will be ignored by the job (e.g. `serviceX,serviceY`).

Example usage:
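
A rough sketch of running the job with the new variable (the image name and the companion variables here are assumptions for illustration; only `ES_SERVICES_IGNORE` is introduced by this change):

```sh
docker run \
  --env STORAGE=elasticsearch \
  --env ES_NODES=http://localhost:9200 \
  --env ES_SERVICES_IGNORE=serviceX,serviceY \
  jaegertracing/spark-dependencies
```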

36 changes: 32 additions & 4 deletions ElasticsearchDependenciesJob.java
@@ -29,6 +29,8 @@
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -60,6 +62,7 @@ public static final class Builder {
Boolean nodesWanOnly = Boolean.parseBoolean(Utils.getEnv("ES_NODES_WAN_ONLY", "false"));
String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null);
String spanRange = Utils.getEnv("ES_TIME_RANGE", "24h");
String servicesIgnore = Utils.getEnv("ES_SERVICES_IGNORE", null);

final Map<String, String> sparkProperties = new LinkedHashMap<>();

@@ -124,6 +127,12 @@ public Builder spanRange(String spanRange) {
return this;
}

/** Comma-separated names of services to ignore. Defaults to empty. */
public Builder servicesIgnore(String servicesIgnore) {
this.servicesIgnore = servicesIgnore;
return this;
}

/** Day to process dependencies for. Defaults to today. */
public Builder day(LocalDate day) {
this.day = day.atStartOfDay(ZoneOffset.UTC);
@@ -166,6 +175,7 @@ private static String getSystemPropertyAsFileResource(String key) {
private final SparkConf conf;
private final String indexPrefix;
private final String spanRange;
private final String servicesIgnore;

ElasticsearchDependenciesJob(Builder builder) {
this.day = builder.day;
@@ -195,6 +205,8 @@ private static String getSystemPropertyAsFileResource(String key) {
}
this.indexPrefix = builder.indexPrefix;
this.spanRange = builder.spanRange;
this.servicesIgnore = builder.servicesIgnore;

}

/**
@@ -228,10 +240,7 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
String spanIndex = spanIndices[i];
String depIndex = depIndices[i];
log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanIndex, depIndex);
- // Send raw query to ES to select only the docs / spans we want to consider for this job
- // This doesn't change the default behavior as the daily indexes only contain up to 24h of data
- String esQuery = String.format("{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange);
- JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery)
+ JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery())
.map(new ElasticTupleToSpan())
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);
@@ -254,6 +263,25 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
}
}

/**
* Creates the Elasticsearch query used by the job to retrieve spans.
* @return the Elasticsearch query as a JSON string.
*/
private String esQuery() {
// Send raw query to ES to select only the docs / spans we want to consider for this job
// This doesn't change the default behavior as the daily indexes only contain up to 24h of data
String esMustQuery = String.format("{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange);
String esMustNotQuery = "";
if (servicesIgnore != null) {
esMustNotQuery = Stream.of(servicesIgnore.split(","))
.filter(serviceName -> !serviceName.trim().isEmpty())
.map(serviceName -> String.format("{\"match_phrase\":{\"process.serviceName\":{\"query\":\"%s\"}}}", serviceName.trim()))
.collect(Collectors.joining(","));
}
return String.format("{\"bool\":{\"must_not\":[%s],\"must\":[%s]}}", esMustNotQuery, esMustQuery);
}

private EsMajorVersion getEsVersion() {
RestClient client = new RestClient(new SparkSettings(conf));
try {
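
To make `esQuery()` concrete: for `ES_SERVICES_IGNORE=serviceX,serviceY` and the default `ES_TIME_RANGE` of `24h`, the method builds the following query (pretty-printed here; the code emits it as a single line). Spans whose `process.serviceName` matches a `must_not` clause are excluded before dependency links are derived:

```json
{
  "bool": {
    "must_not": [
      { "match_phrase": { "process.serviceName": { "query": "serviceX" } } },
      { "match_phrase": { "process.serviceName": { "query": "serviceY" } } }
    ],
    "must": [
      { "range": { "startTimeMillis": { "gte": "now-24h" } } }
    ]
  }
}
```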

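A minimal Java sketch of wiring the new option through the builder; `builder()` and `build()` are assumed entry points (only the `spanRange`, `servicesIgnore`, and `day` setters are visible in this diff), and the service names are placeholders:

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

class ServicesIgnoreExample {
  public static void main(String[] args) {
    // builder() and build() are assumptions; the setters are those shown in the diff.
    ElasticsearchDependenciesJob job = ElasticsearchDependenciesJob.builder()
        .spanRange("24h")                    // same default as ES_TIME_RANGE
        .servicesIgnore("serviceX,serviceY") // spans from these services are skipped
        .day(LocalDate.now(ZoneOffset.UTC))  // compute dependencies for today's index
        .build();
    // The job is then run exactly as before; ignored services simply never enter
    // the dependency aggregation.
  }
}
```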