Kafka Topics
This page contains information about the organisation of the FASTEN Kafka cluster and the formats of the messages that we write to it.
- Kafka runs on a 3-node cluster consisting of the machines `delft`, `samos` and `goteborg`, on port `9092`. Any client can connect to any of the machines. Zookeeper (a requirement for Kafka) runs on the same machines on port `2181`.
- Kafka is configured with 2-way replication: each topic partition is replicated on 2 random machines.
- All content in the FASTEN Kafka topics is formatted as JSON.
- All FASTEN topics are prefixed with `fasten.` (i.e., they match `fasten.*`). Do not touch other topics on the same cluster.
- The number of partitions dictates how many consumers in a consumer group can read from a topic in parallel. For topics that need high parallelism downstream (e.g., call graph generation), consider at least 30 partitions.
- Too many partitions mean that Kafka will spend too much time rebalancing consumer groups if a processing node fails. To avoid cascading failures, consider increasing `max.poll.interval.ms` in your consumer group configuration: a consumer that does not poll again within this interval is considered failed, which triggers yet another rebalance. Also, size the number of partitions to the parallelism you actually need rather than over-provisioning.
- Avoid replication for ephemeral topics, i.e., one-off topics, debug topics, etc.
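A minimal sketch of a consumer configuration that reflects the notes above. It assumes the hostnames `delft`, `samos` and `goteborg` resolve as written from the client's network; the group id, the 15-minute poll interval and the helper names are hypothetical. The keys are standard Kafka consumer property names, so the dict can be handed to clients that accept them directly (for example `confluent_kafka.Consumer`).

```python
# Hypothetical helpers for connecting to the FASTEN cluster.
FASTEN_BROKERS = ["delft", "samos", "goteborg"]  # assumed to resolve as written
KAFKA_PORT = 9092
TOPIC_PREFIX = "fasten."


def bootstrap_servers(hosts=FASTEN_BROKERS, port=KAFKA_PORT):
    """Any broker is a valid entry point; listing all three tolerates one being down."""
    return ",".join(f"{host}:{port}" for host in hosts)


def consumer_config(group_id, max_poll_interval_ms=900_000):
    """Return standard Kafka consumer properties as a plain dict.

    max.poll.interval.ms is raised above the Kafka default (300000 ms) so that slow
    processing steps are less likely to be mistaken for a failed consumer.
    The 15-minute value is illustrative only.
    """
    return {
        "bootstrap.servers": bootstrap_servers(),
        "group.id": group_id,
        "max.poll.interval.ms": max_poll_interval_ms,
    }


def check_output_topic(topic):
    """Guard for topics a plug-in writes to: refuse anything outside `fasten.`."""
    if not topic.startswith(TOPIC_PREFIX):
        raise ValueError(f"{topic!r} is not a FASTEN topic")
    return topic
```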
Processing in FASTEN is triggered by new data arriving on a Kafka topic. The input topics are therefore at the beginning of the processing chain.
Contains information about Maven artefact co-ordinates, collected by the libraries.io project, version 1.6.0.
{
"groupId": "org.glassfish.jersey.ext.rx", // Maven group id
"artifactId": "jersey-rx-client-rxjava2", // Maven artifact id
"version": "2.26-b07", // Version
"date": 1498824450 // Publication timestamp (on Maven)
}
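A minimal sketch of reading one of these messages. It assumes the messages on the topic are plain JSON (the `//` annotations above are documentation only) and that `date` is a Unix timestamp in seconds, which its magnitude suggests. Because `date` appears both as a number and as a string elsewhere on this page, the sketch converts it with `int()`. `parse_coordinate` is a hypothetical helper.

```python
import json
from datetime import datetime, timezone


def parse_coordinate(raw):
    """Return ('groupId:artifactId:version', publication time in UTC) for one message."""
    msg = json.loads(raw)
    coordinate = f"{msg['groupId']}:{msg['artifactId']}:{msg['version']}"
    # `date` is assumed to be seconds since the epoch; int() also accepts string values.
    published = datetime.fromtimestamp(int(msg["date"]), tz=timezone.utc)
    return coordinate, published


raw = ('{"groupId": "org.glassfish.jersey.ext.rx", '
       '"artifactId": "jersey-rx-client-rxjava2", '
       '"version": "2.26-b07", "date": 1498824450}')
coordinate, published = parse_coordinate(raw)
print(coordinate, published.isoformat())
```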
Contains information about Maven artefact co-ordinates, collected by the FASTEN Maven crawler.
{
"groupId": "com.worldpay.api.client", // Maven group id
"artifactId": "worldpay-client-common", // Maven artifact id
"version": "0.0.5", // Version
"date": 1446840840, // Publication timestamp (on Maven)
"url": "https://repo1.maven.org/[...]/worldpay-client-common-0.0.15.pom" // URL of the POM file
}
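The `url` field points at the artefact's POM file. A sketch of how a consumer could recompute or sanity-check it from the coordinates, assuming the standard Maven repository layout and Maven Central's `https://repo1.maven.org/maven2` base; `pom_url` is a hypothetical helper. Such a check would flag the mismatch in the example above, where `version` is `0.0.5` but the file name ends in `-0.0.15.pom`.

```python
def pom_url(group_id, artifact_id, version, repo="https://repo1.maven.org/maven2"):
    """Build a POM URL following the standard Maven repository layout:
    <repo>/<groupId with '.' replaced by '/'>/<artifactId>/<version>/<artifactId>-<version>.pom
    """
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.pom"


msg = {"groupId": "com.worldpay.api.client",
       "artifactId": "worldpay-client-common",
       "version": "0.0.5"}
print(pom_url(msg["groupId"], msg["artifactId"], msg["version"]))
```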
FASTEN plug-ins process data from the input topics (above) or from other plug-ins and produce 2 outputs:
- `fasten.<plugin-name>.out`: normal output
- `fasten.<plugin-name>.err`: output in case an error occurred
To enable plug-ins to exchange information efficiently, all plug-ins use the message formats below for their `.out` and `.err` topics, respectively. The actual plug-in output must be contained in the `payload` field of `.out` messages; error information goes in the `err` field of `.err` messages.
The `input` field contains a copy of the input message that led to the particular output. This helps debugging and enables traceability across the processing chain. As input messages can be very big (e.g., call graphs), the processing plug-in must judiciously shave the contents of the `input` field to minimise the size of the total message, without losing any traceability information.
Output (`.out`) message format:
{
"plugin_name": "PluginName",
"plugin_version": "0.0.1",
"input": {...},
"created_at": 234,
"payload": {...}
}
Error (`.err`) message format:
{
"plugin_name": "PluginName",
"plugin_version": "0.0.1",
"input": {...},
"created_at": 123,
"err": {...}
}
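A minimal sketch of a plug-in wrapping its result in these envelopes and choosing the `.out` or `.err` topic. The list of fields kept when shaving `input` is a hypothetical choice made for illustration; a real plug-in would pick fields per input topic (for instance, a Repo Cloner input nests its identifying fields under `payload`). `shave_input` and `build_record` are not part of FASTEN.

```python
import json
import time

# Hypothetical whitelist of small, identifying fields kept when shaving `input`.
TRACE_KEYS = ("groupId", "artifactId", "version", "product", "forge", "date", "url")


def shave_input(message):
    """Keep only identifying fields, dropping bulky ones such as `cha` or `graph`."""
    return {key: message[key] for key in TRACE_KEYS if key in message}


def build_record(plugin_name, plugin_version, input_message, payload=None, err=None):
    """Return (topic, JSON text) for one plug-in result.

    Exactly one of `payload` or `err` must be given; it decides whether the record
    goes to fasten.<plugin-name>.out or fasten.<plugin-name>.err.
    """
    if (payload is None) == (err is None):
        raise ValueError("pass exactly one of payload or err")
    record = {
        "plugin_name": plugin_name,
        "plugin_version": plugin_version,
        "input": shave_input(input_message),
        "created_at": int(time.time()),  # Unix time in seconds (see note below)
    }
    if err is None:
        record["payload"] = payload
        topic = f"fasten.{plugin_name}.out"
    else:
        record["err"] = err
        topic = f"fasten.{plugin_name}.err"
    return topic, json.dumps(record)
```

The examples on this page mix seconds (e.g. `1595238867`) and milliseconds (e.g. `1594820036478`) in `created_at`; the sketch uses seconds as an arbitrary choice, so consumers may want to accept both.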
Input: `fasten.mvn.full.rnd` or `fasten.mvn.pkg`
Description: The POM Analyzer plug-in consumes Maven co-ordinates, analyzes the corresponding `pom.xml` file, extracts all the relevant information from it (`dependencies`, `dependencyManagement`, repository URL, commit tag and Maven sources link), saves this information in the Metadata database and produces it to the Kafka topic `fasten.POMAnalyzer.out`.
Example output message:
{
"input": {
"artifactId": "junit",
"groupId": "junit",
"version": "4.12"
},
"plugin_version":"0.0.1",
"payload":{
"artifactId": "junit",
"groupId": "junit",
"version": "4.12",
"dependendencyData": {
"dependencyManagement":{
"dependencies":[]
},
"dependencies":[
{
"versionConstraints":[
{
"isUpperHardRequirement":false,
"isLowerHardRequirement":false,
"upperBound":"1.3",
"lowerBound":"1.3"
}
],
"groupId":"org.hamcrest",
"scope":"",
"classifier":"",
"artifactId":"hamcrest-core",
"exclusions":[],
"optional":false,
"type":""
}
]
},
"repoUrl": "http://github.com/junit-team/junit/tree/master",
"commitTag": "",
"sourcesUrl": "https://repo.maven.apache.org/maven2/junit/junit/4.12/junit-4.12-sources.jar",
"packagingType": "jar",
"projectName": "JUnit",
"date": 1594820036478
},
"created_at": 1594820036478,
"plugin_name": "POMAnalyzer"
}
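A sketch of a downstream consumer listing the direct dependencies from a `fasten.POMAnalyzer.out` message. The example above spells the key `dependendencyData`, while the Repo Cloner section below refers to `dependencyData`, so the hypothetical helper accepts either spelling. It returns the raw bounds and scope as found in the message, without interpreting them.

```python
def direct_dependencies(message):
    """Return (groupId, artifactId, scope, [(lowerBound, upperBound), ...]) tuples
    for the direct dependencies in a fasten.POMAnalyzer.out message."""
    payload = message["payload"]
    # Accept both spellings of the dependency-data key seen on this page.
    data = payload.get("dependencyData") or payload.get("dependendencyData") or {}
    result = []
    for dep in data.get("dependencies", []):
        bounds = [(c["lowerBound"], c["upperBound"]) for c in dep["versionConstraints"]]
        result.append((dep["groupId"], dep["artifactId"], dep["scope"], bounds))
    return result
```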
Example error message
{
"input":{
"date":"1569806220",
"groupId":"com.yahoo.vespa",
"artifactId":"bundle-plugin",
"version":"7.112.6",
"url":"https://repo1.maven.org/maven2/com/yahoo/vespa/bundle-plugin/7.112.6/bundle-plugin-7.112.6.pom"
},
"plugin_version":"0.0.1",
"err":{
"msg":"SQL [insert into \"public\".\"packages\" (\"package_name\", \"forge\", \"project_name\", \"repository\", \"created_at\") values (?, ?, ?, ?, cast(? as timestamp)) on conflict on constraint \"unique_package_forge\" do update set \"project_name\" = \"excluded\".\"project_name\", \"repository\" = \"excluded\".\"repository\", \"created_at\" = \"excluded\".\"created_at\" returning \"public\".\"packages\".\"id\"]; ERROR: deadlock detected\n Detail: Process 30868 waits for ShareLock on transaction 2399178; blocked by process 67968.\nProcess 67968 waits for ExclusiveLock on tuple (645,172) of relation 24015 of database 24012; blocked by process 65504.\nProcess 65504 waits for ShareLock on transaction 2399170; blocked by process 30868.\n Hint: See server log for query details.\n Where: while inserting index tuple (837,70) in relation \"packages\"",
"stacktrace":[
"org.jooq_3.12.3.POSTGRES.debug(Unknown Source)",
"org.jooq.impl.Tools.translate(Tools.java:2717)",
"org.jooq.impl.DefaultExecuteContext.sqlException(DefaultExecuteContext.java:755)",
"org.jooq.impl.AbstractQuery.execute(AbstractQuery.java:382)",
"org.jooq.impl.InsertImpl.fetchOne(InsertImpl.java:1059)",
"eu.fasten.core.data.metadatadb.MetadataDao.insertPackage(MetadataDao.java:80)",
"eu.fasten.analyzer.pomanalyzer.POMAnalyzerPlugin$POMAnalyzer.saveToDatabase(POMAnalyzerPlugin.java:167)",
"eu.fasten.analyzer.pomanalyzer.POMAnalyzerPlugin$POMAnalyzer.lambda$consume$0(POMAnalyzerPlugin.java:116)",
"org.jooq.impl.DefaultDSLContext$3.run(DefaultDSLContext.java:608)",
"org.jooq.impl.DefaultDSLContext$3.run(DefaultDSLContext.java:605)",
"org.jooq.impl.DefaultDSLContext.lambda$transactionResult0$0(DefaultDSLContext.java:536)",
"org.jooq.impl.Tools$12$1.block(Tools.java:4843)",
"java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)",
"org.jooq.impl.Tools$12.get(Tools.java:4840)",
"org.jooq.impl.DefaultDSLContext.transactionResult0(DefaultDSLContext.java:588)",
"org.jooq.impl.DefaultDSLContext.transactionResult(DefaultDSLContext.java:505)",
"org.jooq.impl.DefaultDSLContext.transaction(DefaultDSLContext.java:605)",
"eu.fasten.analyzer.pomanalyzer.POMAnalyzerPlugin$POMAnalyzer.consume(POMAnalyzerPlugin.java:112)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.handleConsuming(FastenKafkaPlugin.java:153)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.run(FastenKafkaPlugin.java:105)",
"java.base/java.lang.Thread.run(Thread.java:834)"
],
"error":"DataAccessException"
},
"created_at":1595238867,
"plugin_name":"POMAnalyzer"
}
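The `err` object carries a message (`msg`), a Java stack trace (`stacktrace`) and the exception class (`error`). A sketch of a hypothetical `.err` consumer that separates transient failures, such as the PostgreSQL deadlock above, from errors worth reporting. Treating deadlocks as retryable is an assumption of this sketch, not documented FASTEN behaviour, and a real consumer would cap the number of retries.

```python
import json

# Substrings treated as transient failures; an illustrative assumption based on the
# deadlock errors shown on this page.
RETRYABLE_MARKERS = ("deadlock detected",)


def triage(err_record_text):
    """Return ('retry', input) for transient database errors, else ('report', summary)."""
    record = json.loads(err_record_text)
    err = record["err"]
    msg = err.get("msg", "")
    if err.get("error") == "DataAccessException" and any(m in msg for m in RETRYABLE_MARKERS):
        return "retry", record["input"]
    # The first stack-trace entry is the innermost frame, where the exception was thrown.
    top_frame = err["stacktrace"][0] if err.get("stacktrace") else None
    return "report", {"plugin": record["plugin_name"], "error": err.get("error"), "at": top_frame}
```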
Input: `fasten.mvn.full.rnd` or `fasten.mvn.pkg`
Description: The OPAL plug-in consumes Maven co-ordinates and produces call graphs using the OPAL call graph generator.
Example output message (`payload` truncated; the full format is described on a separate page)
{
"input":{
"date":1476224484,
"groupId":"com.twitter",
"artifactId":"inject-core_2.11",
"version":"2.5.0"
},
"plugin_version":"0.0.1",
"payload":{
"product":"com.twitter:inject-core_2.11",
"nodes":478,
"forge":"mvn",
"generator":"OPAL",
"version":"2.5.0",
"cha":{
"externalTypes":{
"/com.google.inject/AbstractModule":{
"methods":{
"444":{
"metadata":{},
"uri":"/com.google.inject/AbstractModule.install(Module)%2Fjava.lang%2FVoidType"
}
},
"superInterfaces":[],
"sourceFile":"",
"superClasses":[]
}
}
}
},
"created_at": 1594820036478,
"plugin_name": "OPAL"
}
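Method URIs in the payload are percent-encoded (e.g. `%2F` for `/`, `%3Cinit%3E` for `<init>`). A sketch that walks the external types of an OPAL payload and decodes their method URIs with the standard library's `unquote`. It assumes the truncated layout shown above, where each method is an object with a `uri` field; the Metadata plug-in input further down uses a flatter `cha` layout. `external_methods` is a hypothetical helper.

```python
from urllib.parse import unquote


def external_methods(message):
    """Return (type, method id, decoded URI) for every external-type method in an OPAL payload."""
    cha = message["payload"]["cha"]
    result = []
    for type_name, type_info in cha.get("externalTypes", {}).items():
        for method_id, method in type_info["methods"].items():
            # Percent-decoding turns '%2Fjava.lang%2FVoidType' into '/java.lang/VoidType'.
            result.append((type_name, method_id, unquote(method["uri"])))
    return result
```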
Example error message
{
"input":{
"date":1411660344,
"groupId":"org.commonjava.aprox",
"artifactId":"aprox-db",
"version":"0.16.0"
},
"plugin_version":"0.0.1",
"err":{
"msg":"https://repo.maven.apache.org/maven2/org/commonjava/aprox/aprox-db/0.16.0/aprox-db-0.16.0.jar",
"stacktrace":[
"java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1915)",
"java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1515)",
"java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:250)",
"java.base/java.net.URL.openStream(URL.java:1140)",
"eu.fasten.analyzer.javacgopalv3.data.MavenCoordinate$MavenResolver.httpGetToFile(MavenCoordinate.java:192)",
"eu.fasten.analyzer.javacgopalv3.data.MavenCoordinate$MavenResolver.downloadJar(MavenCoordinate.java:172)",
"eu.fasten.analyzer.javacgopalv3.data.PartialCallGraph.createExtendedRevisionCallGraph(PartialCallGraph.java:98)",
"eu.fasten.analyzer.javacgopalv3.OPALV3Plugin$OPALV3.generateCallGraph(OPALV3Plugin.java:105)",
"eu.fasten.analyzer.javacgopalv3.OPALV3Plugin$OPALV3.consume(OPALV3Plugin.java:67)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.handleConsuming(FastenKafkaPlugin.java:153)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.run(FastenKafkaPlugin.java:105)",
"java.base/java.lang.Thread.run(Thread.java:834)"
],
"error":"FileNotFoundException"
},
"created_at":1594818677,
"plugin_name":"OPAL"
}
Input: `fasten.POMAnalyzer.out`
Description: The Repo Cloner plug-in consumes a repository URL, clones the repository into the provided directory (building a directory hierarchy inside it), and produces the path to the directory containing the cloned repository.
Example output message (truncated, because only `repoUrl` is needed and `dependencyData` is not relevant)
{
"input":{
"artifactId": "junit",
"groupId": "junit",
"version": "4.12",
"dependendencyData": {
...
},
"repoUrl": "http://github.com/junit-team/junit/tree/master",
"commitTag": ""
},
"plugin_version":"0.0.1",
"payload":{
"artifactId": "junit",
"groupId": "junit",
"version": "4.12",
"repoPath": "/j/junit/junit"
},
"created_at": 1594820036478,
"plugin_name": "RepoCloner"
}
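Not every `repoUrl` is directly cloneable: the junit example points at a GitHub web page with a `/tree/master` suffix. A hypothetical normalisation step, not necessarily what the Repo Cloner does, that turns GitHub browse URLs into an HTTPS clone URL plus an optional branch and passes other forms (such as scp-like SSH URLs) through unchanged. Branch names containing `/` are not handled by this sketch.

```python
import re

# GitHub browse/clone URLs such as http://github.com/<owner>/<repo>/tree/<branch>.
_GITHUB = re.compile(
    r"^https?://(?:www\.)?github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/tree/([^/]+))?/?$"
)


def clone_target(repo_url):
    """Return (clone_url, branch or None) for a repoUrl from fasten.POMAnalyzer.out."""
    match = _GITHUB.match(repo_url)
    if not match:
        return repo_url, None  # unknown form: leave it to git as-is
    owner, repo, branch = match.groups()
    return f"https://github.com/{owner}/{repo}.git", branch
```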
Example error message
{
"input":{
"payload":{
"repoUrl":"git@github.com:knowm/XChart.git",
"groupId":"org.knowm.xchart",
"artifactId":"xchart-parent"
}
},
"plugin_version":"0.0.1",
"err":{
"msg":"git@github.com:knowm/XChart.git: remote hung up unexpectedly",
"stacktrace":[
"org.eclipse.jgit.api.FetchCommand.call(FetchCommand.java:222)",
"org.eclipse.jgit.api.CloneCommand.fetch(CloneCommand.java:292)",
"org.eclipse.jgit.api.CloneCommand.call(CloneCommand.java:176)",
"eu.fasten.analyzer.repoclonerplugin.utils.GitCloner.cloneRepo(GitCloner.java:58)",
"eu.fasten.analyzer.repoclonerplugin.RepoClonerPlugin$RepoCloner.cloneRepo(RepoClonerPlugin.java:146)",
"eu.fasten.analyzer.repoclonerplugin.RepoClonerPlugin$RepoCloner.consume(RepoClonerPlugin.java:126)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.handleConsuming(FastenKafkaPlugin.java:153)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.run(FastenKafkaPlugin.java:105)",
"java.base/java.lang.Thread.run(Thread.java:834)"
],
"error":"TransportException"
},
"created_at":1595502407,
"plugin_name":"RepoCloner"
}
Input: `fasten.OPAL.out` or `fasten.OPALV3.out`
Description: The Metadata plug-in consumes JSON objects in the (Extended) RevisionCallGraph format from its input topic, populates the metadata database with the consumed data, and produces a GID graph to the `fasten.MetadataDBExtension.out` topic.
Example output message (truncated)
{
"input":{
"product":"io.github.jlike.jlike",
"forge":"mvn",
"generator":"OPAL",
"depset":[
[
{
"product":"com.fasterxml.jackson.datatype.jackson-datatype-hibernate5",
"forge":"mvn",
"constraints":[
"[*]"
]
},
...
]
],
"version":"1.1.0",
"cha":{
"/io.github.jlike.domain.util/JSR310DateConverters$LocalDateTimeToDateConverter":{
"methods":{
"245":"/io.github.jlike.domain.util/JSR310DateConverters$LocalDateTimeToDateConverter.%3Cinit%3E()%2Fjava.lang%2FVoidType",
...
},
"superInterfaces":[
"/org.springframework.core.convert.converter/Converter"
],
"sourceFile":"JSR310DateConverters.java",
"superClasses":[
"/java.lang/Object"
]
},
...
},
"graph":{
"internalCalls":[
[
270,
142
],
...
[
34,
31
]
],
"externalCalls":[
[
"161",
"///java.lang/Object.Object()VoidType",
{
"invokespecial":"1"
}
],
...
]
},
"timestamp":1489597878
},
"plugin_version":"0.0.1",
"payload":{
"product":"io.github.jlike.jlike",
"nodes":[
340250637,
...
17826
],
"edges":[
[
340250534,
340250531
],
...
[
340250491,
17826
]
],
"index":198949,
"version":"1.1.0",
"numInternalNodes":286
},
"created_at":1595254611308,
"plugin_name":"MetadataDBExtension"
}
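Each `externalCalls` entry in the input above is a triple of source id, target URI and a map from JVM invocation instruction to a number serialised as a string. A sketch that totals these per instruction, assuming the number counts call sites that use that instruction; `invocation_counts` is a hypothetical helper.

```python
from collections import Counter


def invocation_counts(revision_call_graph):
    """Sum the per-instruction counts over all externalCalls of a call graph object."""
    totals = Counter()
    for _source, _target, sites in revision_call_graph["graph"]["externalCalls"]:
        for instruction, count in sites.items():
            totals[instruction] += int(count)  # counts are strings in the examples
    return totals
```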
Example error message (truncated)
{
"input":{
"product":"com.yammer.tenacity.tenacity-core",
"forge":"mvn",
"generator":"OPAL",
"depset":[
[
{
"product":"io.dropwizard.dropwizard-core",
"forge":"mvn",
"constraints":[
"[*]"
]
},
...
]
],
"version":"1.1.0",
"cha":{
"/com.yammer.tenacity.core.core/TenacityPredicates":{
"methods":{
"232":"/com.yammer.tenacity.core.core/TenacityPredicates.TenacityPredicates()%2Fjava.lang%2FVoidType",
"233":"/com.yammer.tenacity.core.core/TenacityPredicates.isEqualTo(%2Fcom.yammer.tenacity.core.properties%2FTenacityPropertyKey)%2Fcom.google.common.base%2FPredicate"
},
"superInterfaces":[
],
"sourceFile":"TenacityPredicates.java",
"superClasses":[
"/java.lang/Object"
]
},
...
"/com.yammer.tenacity.core.bundle/TenacityBundleBuilder":{
"methods":{
"110":"/com.yammer.tenacity.core.bundle/TenacityBundleBuilder.TenacityBundleBuilder()%2Fjava.lang%2FVoidType",
...
"118":"/com.yammer.tenacity.core.bundle/TenacityBundleBuilder.withCircuitBreakerHealthCheck()TenacityBundleBuilder"
},
"superInterfaces":[
],
"sourceFile":"TenacityBundleBuilder.java",
"superClasses":[
"/java.lang/Object"
]
}
},
"graph":{
"internalCalls":[
[
274,
273
],
...
[
364,
42
]
],
"externalCalls":[
[
"0",
"///com.netflix.hystrix.exception/HystrixRuntimeException$FailureType.ordinal()%2Fjava.lang%2FIntegerType",
{
"invokevirtual":"6"
}
],
...
[
"181",
"///com.google.common.base/Optional.fromNullable(%2Fjava.lang%2FObject)Optional",
{
"invokestatic":"1"
}
]
]
},
"timestamp":1490125728
},
"plugin_version":"0.0.1",
"err":{
"msg":"SQL [insert into \"public\".\"callables\" (\"module_id\", \"fasten_uri\", \"is_internal_call\", \"created_at\", \"metadata\") values (?, ?, ?, cast(? as timestamp), cast(? as jsonb)) on conflict on constraint \"unique_uri_call\" do update set \"module_id\" = \"excluded\".\"module_id\", \"created_at\" = \"excluded\".\"created_at\", \"metadata\" = \"public\".\"callables\".\"metadata\" || \"excluded\".\"metadata\" returning \"public\".\"callables\".\"id\"]; ERROR: deadlock detected\n Detail: Process 66386 waits for ShareLock on transaction 2403435; blocked by process 68351.\nProcess 68351 waits for ShareLock on transaction 2403434; blocked by process 66386.\n Hint: See server log for query details.\n Where: while inserting index tuple (3425223,15) in relation \"callables\"",
"stacktrace":[
"org.jooq_3.12.3.POSTGRES.debug(Unknown Source)",
"org.jooq.impl.Tools.translate(Tools.java:2717)",
"org.jooq.impl.DefaultExecuteContext.sqlException(DefaultExecuteContext.java:755)",
"org.jooq.impl.AbstractQuery.execute(AbstractQuery.java:382)",
"org.jooq.impl.InsertImpl.fetchOne(InsertImpl.java:1059)",
"eu.fasten.analyzer.metadataplugin.db.MetadataDao.insertCallable(MetadataDao.java:481)",
"eu.fasten.analyzer.metadataplugin.MetadataDatabasePlugin$MetadataDBExtension.saveToDatabase(MetadataDatabasePlugin.java:251)",
"eu.fasten.analyzer.metadataplugin.MetadataDatabasePlugin$MetadataDBExtension.lambda$consume$0(MetadataDatabasePlugin.java:136)",
"org.jooq.impl.DefaultDSLContext$3.run(DefaultDSLContext.java:608)",
"org.jooq.impl.DefaultDSLContext$3.run(DefaultDSLContext.java:605)",
"org.jooq.impl.DefaultDSLContext.lambda$transactionResult0$0(DefaultDSLContext.java:536)",
"org.jooq.impl.Tools$12$1.block(Tools.java:4843)",
"java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)",
"org.jooq.impl.Tools$12.get(Tools.java:4840)",
"org.jooq.impl.DefaultDSLContext.transactionResult0(DefaultDSLContext.java:588)",
"org.jooq.impl.DefaultDSLContext.transactionResult(DefaultDSLContext.java:505)",
"org.jooq.impl.DefaultDSLContext.transaction(DefaultDSLContext.java:605)",
"eu.fasten.analyzer.metadataplugin.MetadataDatabasePlugin$MetadataDBExtension.consume(MetadataDatabasePlugin.java:132)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.handleConsuming(FastenKafkaPlugin.java:153)",
"eu.fasten.server.plugins.kafka.FastenKafkaPlugin.run(FastenKafkaPlugin.java:105)",
"java.base/java.lang.Thread.run(Thread.java:834)"
],
"error":"DataAccessException"
},
"created_at":1595254102,
"plugin_name":"MetadataDBExtension"
}
Input: `fasten.MetadataDBExtension.out`
Description: The Graph plug-in consumes the GID graphs produced by the Metadata plug-in and populates the graph database (RocksDB) with the consumed data.
Example output message (truncated)
{
"input":{
"product":"org.wso2.carbon.apimgt.org.wso2.carbon.apimgt.api",
"nodes":[
317953184,
...
25223
],
"edges":[
[
317953063,
317952568
],
...
[
317951606,
25223
]
],
"index":184323,
"version":"6.5.213",
"numInternalNodes":1699
},
"plugin_version":"0.0.1",
"payload":"",
"created_at":1594921676150,
"plugin_name":"GraphDBExtension"
}
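A GID graph message lists `nodes` and `edges` as pairs of global ids. A sketch of turning one into an adjacency list before storage; it does not show the RocksDB side. It assumes every edge endpoint should appear in `nodes` and sets aside edges that violate this instead of storing them. `adjacency` is a hypothetical helper.

```python
from collections import defaultdict


def adjacency(gid_graph):
    """Return ({source GID: [target GIDs]}, [edges with an endpoint missing from nodes])."""
    known = set(gid_graph["nodes"])
    successors = defaultdict(list)
    dangling = []
    for source, target in gid_graph["edges"]:
        if source in known and target in known:
            successors[source].append(target)
        else:
            dangling.append((source, target))
    return dict(successors), dangling
```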
Input: `cf_pypi_releases`
Description: Consumes PyPI packaging information and produces unique package-version tuples along with their metadata.
Example output message (truncated)
{
"product": "static-frame",
"version": "0.1.2",
"version_timestamp": 1525912010,
"requires_dist": [
{
"forge": "PyPI",
"product": "numpy",
"constraints": [
"[1.16.5..]"
]
},
...
]
}
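A sketch of the de-duplication step, assuming the incoming release records already carry `product` and `version` keys (the format of `cf_pypi_releases` is not shown here). It keeps the first record seen for each pair; note that the `seen` set grows without bound on a long-running stream. `unique_releases` is a hypothetical helper.

```python
def unique_releases(releases):
    """Yield each (product, version) pair once, keeping the first record seen."""
    seen = set()
    for release in releases:
        key = (release["product"], release["version"])
        if key in seen:
            continue
        seen.add(key)
        yield release
```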
Input: `fasten.pypi_filter.out`
Description: Consumes package-version tuples along with metadata, downloads their source code and generates their call graphs using PyCG.
Example output message (truncated)
{
"payload": {
"product": "static-frame",
"forge": "PyPI",
"generator": "PyCG",
"depset": [
...
],
"version": "0.2.2",
"timestamp": "1548267898",
"modules": {
"/static_frame/": {
"sourceFile": "static_frame/__init__.py",
"namespaces": {
"0": {
"namespace": "/static_frame/",
"metadata": {
"first": 1,
"last": 49
}
},
...
}
},
...
},
"cha": {
"249": [242, 200],
...
},
"graph": {
"internalCalls": [
[469, 468],
...
],
"externalCalls": [
[467, "//numpy//numpy.reshape"],
...
]
},
"metadata": {
"loc": 7058,
"time_elapsed": 0.89,
"max_rss": 36568,
"num_files": 12
}
},
"plugin_name": "PyCG",
"plugin_version": "0.0.1",
"input": {...},
"created_at": 1595963242
}
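A sketch that groups PyCG external calls by the dependency they target. It assumes external targets have the shape `//<product>/<path>`, as the example `//numpy//numpy.reshape` suggests, and skips anything else rather than guessing. `external_calls_by_product` is a hypothetical helper.

```python
from collections import defaultdict


def external_calls_by_product(pycg_message):
    """Return {product: [(source id, path), ...]} for a PyCG output message."""
    grouped = defaultdict(list)
    for source, target in pycg_message["payload"]["graph"]["externalCalls"]:
        if not target.startswith("//"):
            continue  # not in the assumed shape
        # '//numpy//numpy.reshape' -> product 'numpy', path '/numpy.reshape'
        product, _, path = target[2:].partition("/")
        grouped[product].append((source, path))
    return dict(grouped)
```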
Example error message (truncated)
{
"plugin_name": "PyCG",
"plugin_version": "0.0.1",
"input": {...},
"created_at": 1595963614,
"err": {
"phase": "decompress",
"message": ...
}
}