Commit 1b76b18

Merge branch 'mainline' into mainline

andrewyhwang authored Jun 4, 2024
2 parents ebcd77b + 3508d6b commit 1b76b18
Showing 6 changed files with 387 additions and 40 deletions.
73 changes: 52 additions & 21 deletions integrations/kafka_connector/README.md

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions integrations/kafka_connector/pom.xml
@@ -124,6 +124,13 @@
<version>5.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
    <version>6.6.0</version>
</dependency>

</dependencies>

<build>
@@ -136,6 +143,11 @@
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
<!-- for debugging only -->
<!-- <addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.mypackage.MyClass</mainClass> -->
<!-- end for debugging -->
</manifest>
</archive>
</configuration>
@@ -137,7 +137,40 @@ public static ConfigDef conf() {
ConfigDef.Type.STRING,
"org.apache.kafka.common.serialization.StringSerializer",
ConfigDef.Importance.LOW,
"Serializer class for Value");
"Serializer class for Value")
/// INFLUXDB
.define(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.HIGH,
"LiveAnalytics Ingestion Enabled")
.define(TimestreamSinkConstants.INFLUXDB_ENABLE,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.HIGH,
"InfluxDB Ingestion Enabled")
.define(TimestreamSinkConstants.INFLUXDB_BUCKET,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB Target Bucket")
.define(TimestreamSinkConstants.INFLUXDB_URL,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB API URL")
.define(TimestreamSinkConstants.INFLUXDB_TOKEN,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB API Token")
.define(TimestreamSinkConstants.INFLUXDB_ORG,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB Organization")
////////////////////////////////
;
}
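For reference, a connector properties snippet exercising these new keys might look like the following sketch (the bucket, URL, org, and token values are placeholders, not from this commit):

timestream.target.liveanalytics=false
timestream.target.influxdb=true
timestream.influxdb.bucket=kafka-metrics
timestream.influxdb.url=http://localhost:8086
timestream.influxdb.org=my-org
timestream.influxdb.token=my-api-token

Note that both targets default to false, so leaving both unset trips the NO_INGESTION_TARGET path in TimestreamWriter.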

/**
@@ -166,6 +199,69 @@ public Region getAWSRegion() {
}
}

////// INFLUXDB Configs
public boolean isLiveAnalyticsEnabled() {
try {
return getBoolean(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public boolean isInfluxDBEnabled() {
try {
return getBoolean(TimestreamSinkConstants.INFLUXDB_ENABLE);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_ENABLE, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBBucket() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_BUCKET);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_BUCKET, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBUrl() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_URL);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_URL, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBToken() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_TOKEN);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_TOKEN, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBOrg() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_ORG);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_ORG, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}


////////////////////////////////////////////////////////

/**
*
* @return The maximum number of retry attempts for retryable errors.
@@ -128,4 +128,15 @@ public class TimestreamSinkConstants {
*/
public static final String CONST_DLQ_CLIENT_ID = "DLQPublisher";

public static final String LIVE_ANALYTICS_ENABLE = "timestream.target.liveanalytics";

public static final String INFLUXDB_ENABLE = "timestream.target.influxdb";

public static final String INFLUXDB_BUCKET = "timestream.influxdb.bucket";

public static final String INFLUXDB_URL = "timestream.influxdb.url";

public static final String INFLUXDB_TOKEN = "timestream.influxdb.token";

public static final String INFLUXDB_ORG = "timestream.influxdb.org";
}
@@ -120,4 +120,11 @@ public class TimestreamSinkErrorCodes {
*/
public static final String INVALID_MEASURE_VALUE ="invalid.measure.value";

/**
* Error code: no target engine specified (InfluxDB or LiveAnalytics)
*/
public static final String NO_INGESTION_TARGET = "No ingestion engine specified, enable either LiveAnalytics or InfluxDB";

//// INFLUX DB
// TODO: add detailed InfluxDB error codes
}
@@ -1,5 +1,6 @@
package software.amazon.timestream.utility;

import com.influxdb.client.WriteApiBlocking;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,6 +18,16 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

// InfluxDB:
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

/**
* Class that receives the non-empty Kafka messages as {@link SinkRecord}
* objects and writes to Timestream table as records
@@ -60,6 +71,18 @@ public class TimestreamWriter {
*/
private final DataModel schemaDefinition;

//// INFLUXDB
private final boolean liveAnalyticsEnabled;
private final boolean influxDBEnabled;
private final String influxDBBucket;
private final String influxDBUrl;
private final String influxDBToken;
private final String influxDBOrg;
private static InfluxDBClient influxDBClient = null;
private static WriteApiBlocking influxWriteApi = null;

////////////////////////

/**
*
* @param schemaDefinition table schema
@@ -73,6 +96,46 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo
this.skipDimension = config.isSkipEmptyDimensions();
this.skipMeasure = config.isSkipEmptyMeasures();
this.schemaDefinition = schemaDefinition;

// InfluxDB
this.liveAnalyticsEnabled = config.isLiveAnalyticsEnabled();
this.influxDBEnabled = config.isInfluxDBEnabled();
this.influxDBBucket = config.getInfluxDBBucket();
this.influxDBUrl = config.getInfluxDBUrl();
this.influxDBToken = config.getInfluxDBToken();
this.influxDBOrg = config.getInfluxDBOrg();

if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) {
    LOGGER.error("ERROR::TimeStreamWriter:: initialization failed on: [{}]", TimestreamSinkErrorCodes.NO_INGESTION_TARGET);
}

if (this.influxDBEnabled) {
    // influxDBClient and influxWriteApi are static, so they are assigned without "this."
    influxDBClient = getInfluxDBClient(
            this.influxDBUrl, this.influxDBToken, this.influxDBBucket, this.influxDBOrg
    );
    if (influxDBClient != null) {
        LOGGER.info("INFO::TimeStreamWriter:: influxDB client successfully connected: [{}] [{}] [{}]",
                this.influxDBUrl, this.influxDBBucket, influxDBOrg
        );
        influxWriteApi = getInfluxDBWriteApi(influxDBClient);
        if (influxWriteApi == null) {
            LOGGER.error("ERROR::TimeStreamWriter:: influxDB write API connection failed: [{}] [{}] [{}]",
                    this.influxDBUrl, this.influxDBBucket, influxDBOrg
            );
        } else {
            LOGGER.info("INFO::TimeStreamWriter:: influxDB write API successfully connected: [{}] [{}] [{}]",
                    this.influxDBUrl, this.influxDBBucket, influxDBOrg
            );
        }
    } else {
        LOGGER.error("ERROR::TimeStreamWriter:: getInfluxDBClient failed on: [{}]", this.influxDBUrl);
    }
}

/////////////////////////
}

/**
@@ -81,34 +144,62 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo
* @param sinkRecords List of incoming records from the source Kafka topic
*/
public List<RejectedRecord> writeRecords(final AWSServiceClientFactory clientFactory, final Collection<SinkRecord> sinkRecords) {

LOGGER.trace("Begin::TimeStreamWriter::writeRecords");
final List<RejectedRecord> rejectedRecords = new ArrayList<>();
final List<Record> records = getTimestreamRecordsFromSinkRecords(sinkRecords, rejectedRecords);
if (!records.isEmpty()) {
final int batchSize = records.size() / TimestreamSinkConstants.DEFAULT_BATCHSIZE + 1;
List<Record> batchRecords = null;
for (int currentBatch = 0; currentBatch < batchSize; currentBatch++) {
    if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) {
        // no target specified, cannot write, send records to DLQ
        batchRecords = getBatchRecords(records, currentBatch);
        if (batchRecords != null && !batchRecords.isEmpty()) {
            for (Record record : batchRecords) {
                RejectedRecord rejectedRecord = new RejectedRecord(record, TimestreamSinkErrorCodes.NO_INGESTION_TARGET);
                rejectedRecords.add(rejectedRecord);
            }
            LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Records have been rejected in the batch [{}], due to [{}]", currentBatch, TimestreamSinkErrorCodes.NO_INGESTION_TARGET);
        }
    } else {
        try {
            batchRecords = getBatchRecords(records, currentBatch);
            if (batchRecords != null && !batchRecords.isEmpty()) {
                if (this.liveAnalyticsEnabled) {
                    final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder()
                            .databaseName(databaseName)
                            .tableName(tableName)
                            .records(batchRecords)
                            .build();
                    final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest);
                    LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}]", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode());
                } else {
                    LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: LiveAnalytics disabled");
                }
                if (this.influxDBEnabled && influxWriteApi != null) {
                    LOGGER.info("INFO::TimeStreamWriter::writeRecords: InfluxDB writing {} records", batchRecords.size());
                    final ArrayList<Point> pointList = convertLiveAnalyticsRecord(batchRecords);
                    // enhance here
                    influxWriteApi.writePoints(pointList);
                    /* // writing one record at a time only
                    convertAndWriteLiveAnalyticsRecord(batchRecords);
                    */
                } else {
                    LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: InfluxDB disabled");
                }
            } else {
                LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}]", records.size());
            }
        } catch (RejectedRecordsException e) {
            LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}], due to [{}]", currentBatch, e.getLocalizedMessage());
            if (e.hasRejectedRecords()) {
                rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords));
            }
        } catch (SdkException e) {
            LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e);
        }
    }
}
}
@@ -373,4 +464,103 @@ private List<Record> getBatchRecords(final List<Record> allRecords, final int cu
}
return batch;
}

// InfluxDB enhancements

public static InfluxDBClient getInfluxDBClient(String url, String token, String bucket, String org) {
    if (influxDBClient != null) {
        return influxDBClient;
    }
    influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
    return influxDBClient;
}

public static WriteApiBlocking getInfluxDBWriteApi(InfluxDBClient client) {
    if (influxWriteApi == null) {
        influxWriteApi = client.getWriteApiBlocking();
    }
    return influxWriteApi;
}
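One gap worth flagging: the client is built lazily but never closed. Since InfluxDBClient implements AutoCloseable, a shutdown hook along these lines could release it; this helper is a sketch, not part of the commit:

// Hypothetical cleanup helper (not in the original commit); a SinkTask.stop()
// override could call this to release the connection.
public static synchronized void closeInfluxDBClient() {
    if (influxDBClient != null) {
        influxDBClient.close(); // shuts down the underlying HTTP client
        influxDBClient = null;
        influxWriteApi = null;
    }
}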

private ArrayList<Point> convertLiveAnalyticsRecord(List<Record> recordList) {
    ArrayList<Point> pointList = new ArrayList<>();
    for (final Record record : recordList) {
        // LOGGER.trace("Sink Record: {} ", record);

        List<Dimension> dimensions = record.dimensions();
        List<MeasureValue> measureValues = record.measureValues();
        String measureName = record.measureName();
        long time = Long.parseLong(record.time());

        Point point = Point
                .measurement(measureName)
                .time(time, WritePrecision.MS);

        // Timestream dimensions map to InfluxDB tags
        for (Dimension dimension : dimensions) {
            point = point.addTag(dimension.name(), dimension.value());
        }

        // Timestream measure values map to InfluxDB fields
        for (MeasureValue measureValue : measureValues) {
            String key = measureValue.name();
            String value = measureValue.value();
            MeasureValueType type = measureValue.type();

            switch (type) {
                case TIMESTAMP: // no timestamp field type in InfluxDB, use long for now
                case BIGINT:
                    point = point.addField(key, Long.parseLong(value));
                    break;
                case DOUBLE:
                    point = point.addField(key, Double.parseDouble(value));
                    break;
                case BOOLEAN:
                    point = point.addField(key, Boolean.parseBoolean(value));
                    break;
                case VARCHAR:
                    point = point.addField(key, value);
                    break;
            }
        }

        LOGGER.info("Complete::TimeStreamWriter::convertLiveAnalyticsRecord: line protocol [{}]", point.toLineProtocol());

        pointList.add(point);
    }
    return pointList;
}
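As a rough illustration of the mapping (sample values, not from the commit): a Timestream record with measure name cpu, dimension host=host1, a DOUBLE measure used_percent=23.4, and time 1714180752000 would come out of convertLiveAnalyticsRecord as the line protocol:

cpu,host=host1 used_percent=23.4 1714180752000

with the trailing timestamp interpreted at millisecond precision because the Point is built with WritePrecision.MS.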

private void convertAndWriteLiveAnalyticsRecord(List<Record> recordList) {
    for (final Record record : recordList) {
        // LOGGER.trace("Sink Record: {} ", record);
        // STATIC placeholder from InfluxDB example code;
        // replace with data from the SinkRecord
        Point point = Point
                .measurement("mem")
                .addTag("host", "host1")
                .addField("used_percent", 23.43234543)
                .time(1714180752000L, WritePrecision.MS);
        WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
        writeApi.writePoint(influxDBBucket, influxDBOrg, point);
    }
}

//////////////////

}
