Skip to content

Commit

Permalink
[INLONG-11352][SDK] Add dirty data collection sdk (#11354)
Browse files Browse the repository at this point in the history
Co-authored-by: AloysZhang <[email protected]>
Co-authored-by: Charles Zhang <[email protected]>
  • Loading branch information
3 people authored Oct 21, 2024
1 parent 899aeab commit 8c7d31d
Show file tree
Hide file tree
Showing 13 changed files with 1,093 additions and 0 deletions.
53 changes: 53 additions & 0 deletions inlong-sdk/dirty-data-sdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## Overview

This SDK is used to collect dirty data and store it in a designated storage location.

## Features

### Independent SDK

Independent SDK, not dependent on platform specific libraries (such as Flink), can be used by Agent,
DataProxy, Sort modules.

### Scalable multiple data storage options

Dirty data can be stored in various different storage locations (currently only supports sending to
DataProxy).

## Usage

### Create DirtyDataCollector object

```java
Map<String, String> configMap = new ConcurrentHashMap<>();
// Enable dirty data collection
configMap.put(DIRTY_COLLECT_ENABLE, "true");
// If ignore error messages during dirty data collection
configMap.put(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, "true");
// The storage where dirty data will be stored currently only supports 'inlong',
// which means sending the data to DataSroxy
configMap.put(DIRTY_SIDE_OUTPUT_CONNECTOR, "inlong");
// The labels of dirty side-output, format is 'key1=value1&key2=value2'
configMap.put(DIRTY_SIDE_OUTPUT_LABELS, "key1=value1&key2=value2");
// The log tag of dirty side-output, it supports variable replace like '${variable}'.
configMap.put(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
Configure config = new Configure(configMap);

DirtyDataCollector collecter = new DirtyDataCollector();
collector.open(config);
```

### Collect dirty data

```java
// In fact, the dirty data we encounter is often parsed incorrectly,
// so we use byte [] as the format for dirty data.
byte[] dirtyData = "xxxxxxxxxyyyyyyyyyyyyyy".getBytes(StandardCharsets.UTF_8);
// Here, incorrect types can be marked, such as missing fields, type errors, or unknown errors, etc.
String dirtyType = "Undefined";
// Details of errors can be passed here.
Throwable error = new Throwable();
collector.invoke(dirtyData, dirtyType, error);
```

|
96 changes: 96 additions & 0 deletions inlong-sdk/dirty-data-sdk/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-sdk</artifactId>
<version>2.1.0-SNAPSHOT</version>
</parent>
<artifactId>dirty-data-sdk</artifactId>
<name>Apache InLong - Dirty Data SDK</name>
<properties>
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sdk-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>dataproxy-sdk</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.2.0</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${plugin.compile.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>${plugin.deploy.version}</version>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dirtydata;

/**
* Connector base option constant
*/
public final class Constants {

public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable";

public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = "dirty.side-output.connector";

public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS = "dirty.side-output.ignore-errors";

/**
* The labels of dirty side-output, format is 'key1=value1&key2=value2'
* it supports variable replace like '${variable}'
* There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE]
* are currently supported,
* and the support of other variables is determined by the connector.
*/
public static final String DIRTY_SIDE_OUTPUT_LABELS = "dirty.side-output.labels";

/**
* The log tag of dirty side-output, it supports variable replace like '${variable}'.
* There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
* and the support of other variables is determined by the connector.
*/
public static final String DIRTY_SIDE_OUTPUT_LOG_TAG = "dirty.side-output.log-tag";

/**
* It is used for 'inlong.metric.labels' or 'sink.dirty.labels'
*/
public static final String DELIMITER = "&";

/**
* The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels'
*/
public static final String KEY_VALUE_DELIMITER = "=";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dirtydata;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
* Dirty data base class, it is a wrapper of dirty data
*/
public class DirtyData {

private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";

private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";

private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/**
* The identifier of dirty data, it will be used for filename generation of file dirty sink,
* topic generation of mq dirty sink, tablename generation of database, etc,
* and it supports variable replace like '${variable}'.
* There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
* and the support of other variables is determined by the connector.
*/
private final String identifier;
/**
* The labels of the dirty data, it will be written to store system of dirty
*/
private final String labels;
/**
* The log tag of dirty data, it is only used to format log as follows:
* [${logTag}] ${labels} ${data}
*/
private final String logTag;
/**
* Dirty type
*/
private final String dirtyType;
/**
* Dirty describe message, it is the cause of dirty data
*/
private final String dirtyMessage;
/**
* The real dirty data
*/
private final byte[] data;

public DirtyData(byte[] data, String identifier, String labels,
String logTag, String dirtyType, String dirtyMessage) {
this.data = data;
this.dirtyType = dirtyType;
this.dirtyMessage = dirtyMessage;
Map<String, String> paramMap = genParamMap();
this.labels = PatternReplaceUtils.replace(labels, paramMap);
this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
this.identifier = PatternReplaceUtils.replace(identifier, paramMap);

}

public static Builder builder() {
return new Builder();
}

private Map<String, String> genParamMap() {
Map<String, String> paramMap = new HashMap<>();
paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now()));
paramMap.put(DIRTY_TYPE_KEY, dirtyType);
paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
return paramMap;
}

public String getLabels() {
return labels;
}

public String getLogTag() {
return logTag;
}

public byte[] getData() {
return data;
}

public String getDirtyType() {
return dirtyType;
}

public String getIdentifier() {
return identifier;
}

public static class Builder {

private String identifier;
private String labels;
private String logTag;
private String dirtyType = "UNDEFINED";
private String dirtyMessage;
private byte[] data;

public Builder setDirtyType(String dirtyType) {
this.dirtyType = dirtyType;
return this;
}

public Builder setLabels(String labels) {
this.labels = labels;
return this;
}

public Builder setData(byte[] data) {
this.data = data;
return this;
}

public Builder setLogTag(String logTag) {
this.logTag = logTag;
return this;
}

public Builder setDirtyMessage(String dirtyMessage) {
this.dirtyMessage = dirtyMessage;
return this;
}

public DirtyData build() {
return new DirtyData(data, identifier, labels, logTag, dirtyType, dirtyMessage);
}
}
}
Loading

0 comments on commit 8c7d31d

Please sign in to comment.