Kafka Topics

Georgios Gousios edited this page Jul 21, 2020 · 38 revisions

This page contains information about the organisation of the FASTEN Kafka cluster.

General information

  • Kafka runs on a 3-node cluster consisting of the machines delft, samos and goteborg, on port 9092. A client can connect to any of the machines. ZooKeeper (a requirement for Kafka) runs on the same machines on port 2181.
  • Kafka is configured with 2-way replication: each topic partition is replicated on 2 random machines.
  • All content in the FASTEN Kafka topics is formatted as JSON.
  • All FASTEN topic names start with the prefix fasten. (i.e., they match fasten.*). Do not touch other topics on the same cluster.
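
As a minimal sketch of the conventions above, the snippet below builds a bootstrap server string from the three machine names and checks the fasten. prefix before a topic is touched. The short host names are taken verbatim from this page and may need to be fully qualified in practice; both helper names are hypothetical and not part of any FASTEN library.

```python
# Machine names and port as listed on this page. Assumption: the short host
# names resolve from the client; otherwise use fully qualified names.
KAFKA_HOSTS = ["delft", "samos", "goteborg"]
KAFKA_PORT = 9092
FASTEN_PREFIX = "fasten."


def bootstrap_servers(hosts=KAFKA_HOSTS, port=KAFKA_PORT):
    """Return a comma-separated bootstrap string listing every broker."""
    return ",".join(f"{host}:{port}" for host in hosts)


def is_fasten_topic(topic):
    """True if the topic name carries the FASTEN prefix plus a non-empty rest."""
    return topic.startswith(FASTEN_PREFIX) and len(topic) > len(FASTEN_PREFIX)
```

A client could call is_fasten_topic before creating, deleting or reconfiguring a topic, so that topics owned by other projects on the cluster are left alone.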

Things to consider when creating topics

  • The number of partitions dictates how many consumers can read from a topic in parallel. For topics that need high parallelism downstream (e.g., call graph generation), consider at least 30 partitions.

  • Too many partitions make Kafka spend too much time rebalancing consumer groups when a processing node fails. To avoid cascading failures, consider increasing max.poll.interval.ms in your consumer configuration, and choose the number of partitions wisely.

  • Avoid replication for ephemeral topics, such as one-off or debug topics.
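
The trade-off above can be sketched as follows. Within one consumer group, Kafka assigns each partition to at most one consumer, so the partition count caps the useful parallelism. The group name and the 15-minute poll interval are illustrative assumptions, not FASTEN settings; only the key names group.id and max.poll.interval.ms are standard Kafka consumer properties.

```python
def active_consumers(partitions, consumers):
    """Consumers in one group that actually receive partitions."""
    return min(partitions, consumers)


def idle_consumers(partitions, consumers):
    """Consumers in one group that are left without any partition."""
    return max(0, consumers - partitions)


# Consumer properties for a slow, long-running plug-in (illustrative values).
consumer_config = {
    "group.id": "fasten-example-group",  # hypothetical group name
    # Kafka's default is 300000 ms (5 minutes); a larger value gives slow
    # message handlers more time before the group rebalances.
    "max.poll.interval.ms": 900_000,
}
```

With 30 partitions, a group of 40 consumers leaves 10 of them idle, while a group of 8 consumers keeps all 8 busy.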

Input topics

Processing in FASTEN is triggered by new data arriving on a Kafka topic. The input topics are therefore at the beginning of the processing chain.

fasten.mvn.full.rnd

Contains information about Maven artefact co-ordinates, collected by the libraries.io project, version 1.6.0.

{
  "groupId": "org.glassfish.jersey.ext.rx",   // Maven group id
  "artifactId": "jersey-rx-client-rxjava2",   // Maven artifact id
  "version": "2.26-b07",                      // Version
  "date": 1498824450                          // Publication timestamp (on Maven)
}

fasten.mvn.pkg

Contains information about Maven artefact co-ordinates, collected by the FASTEN Maven crawler.

{
  "groupId": "com.worldpay.api.client",    // Maven group id
  "artifactId": "worldpay-client-common",  // Maven artifact id
  "version": "0.0.5",                      // Version
  "date": 1446840840,                      // Publication timestamp (on Maven)
  "url": "https://repo1.maven.org/[...]/worldpay-client-common-0.0.15.pom" // URL of the POM file
}
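
A consumer of either input topic might parse these records along the following lines. This is a sketch under two assumptions: the // comments in the examples above are annotations only (real messages being plain JSON), and date is expressed in seconds since the Unix epoch. The class and function names are hypothetical.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class MavenCoordinate:
    group_id: str
    artifact_id: str
    version: str
    published: datetime
    url: Optional[str] = None  # only present in fasten.mvn.pkg records

    @property
    def coordinate(self) -> str:
        """The usual groupId:artifactId:version notation."""
        return f"{self.group_id}:{self.artifact_id}:{self.version}"


def parse_coordinate(raw: str) -> MavenCoordinate:
    """Parse one JSON record from fasten.mvn.full.rnd or fasten.mvn.pkg."""
    msg = json.loads(raw)
    return MavenCoordinate(
        group_id=msg["groupId"],
        artifact_id=msg["artifactId"],
        version=msg["version"],
        # Assumption: "date" is seconds since the Unix epoch.
        published=datetime.fromtimestamp(msg["date"], tz=timezone.utc),
        url=msg.get("url"),
    )
```

Treating url as optional lets the same parser handle both topics, since only fasten.mvn.pkg carries the POM location.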

Plugin topics

FASTEN plug-ins process data from input topics (👆) or from other plug-ins, and produce two outputs:

  • fasten.<plugin-name>.out: Normal output
  • fasten.<plugin-name>.err: Output in case an error occurred
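
The naming scheme above can be captured in a small helper. This is a sketch only; the function name is hypothetical.

```python
def plugin_topics(plugin_name):
    """Return the normal-output and error topic names for a plug-in."""
    return {
        "out": f"fasten.{plugin_name}.out",
        "err": f"fasten.{plugin_name}.err",
    }
```

For the OPALV3 plug-in described later on this page, this yields fasten.OPALV3.out and fasten.OPALV3.err.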

To enable plug-ins to exchange information efficiently, all plug-ins use specific message formats (👇) for their .out and .err topics, respectively. The actual plug-in output must be contained in the payload field, and the error details in the err field.

The input field contains a copy of the input message that led to the particular output. This helps debugging and enables traceability across the production chain. As input messages can be very big (e.g., call graphs), the processing plug-in must judiciously trim the contents of the input field to minimise the total message size without losing any traceability information.
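
One possible way to trim the input field is a key whitelist, sketched below. Which keys count as traceability information is a per-plug-in decision; the TRACE_KEYS tuple here is a hypothetical choice based on the Maven co-ordinate fields shown on this page.

```python
# Hypothetical whitelist: the fields that identify a Maven artefact.
TRACE_KEYS = ("groupId", "artifactId", "version", "date")


def trim_input(input_msg, keep=TRACE_KEYS):
    """Copy only the whitelisted keys, dropping bulky fields such as call graphs."""
    return {key: input_msg[key] for key in keep if key in input_msg}
```

The whitelist keeps the message small regardless of how large the dropped fields are, at the cost of having to extend it whenever a new identifying field is introduced.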

Generic output message format

{
    "plugin_name": "PluginName", 
    "plugin_version": "0.0.1",
    "input": {...},
    "created_at": 234, 
    "payload": {...}
}

Generic error message format

{
    "plugin_name": "PluginName",
    "plugin_version": "0.0.1",
    "input": {...},
    "created_at": 123,
    "err": {...}
}
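
The two formats above differ only in their last field, so a plug-in could build both from a shared envelope. The sketch below is one way to do that; the function names are hypothetical. The page does not state the unit of created_at, so the default of milliseconds since the epoch is an assumption; pass the value explicitly to use a different convention.

```python
import json
import time


def _envelope(plugin_name, plugin_version, input_msg, created_at=None):
    """Fields shared by the output and error formats."""
    if created_at is None:
        # Assumption: milliseconds since the Unix epoch.
        created_at = int(time.time() * 1000)
    return {
        "plugin_name": plugin_name,
        "plugin_version": plugin_version,
        "input": input_msg,
        "created_at": created_at,
    }


def output_message(plugin_name, plugin_version, input_msg, payload, created_at=None):
    """Serialise a message for the plug-in's .out topic."""
    msg = _envelope(plugin_name, plugin_version, input_msg, created_at)
    msg["payload"] = payload
    return json.dumps(msg)


def error_message(plugin_name, plugin_version, input_msg, err, created_at=None):
    """Serialise a message for the plug-in's .err topic."""
    msg = _envelope(plugin_name, plugin_version, input_msg, created_at)
    msg["err"] = err
    return json.dumps(msg)
```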

fasten.OPALV3.*

Input: fasten.mvn.full.rnd or fasten.mvn.pkg

Description: The OPAL v3 plug-in consumes Maven co-ordinates and produces call graphs using the OPAL call graph generator.

Example output message (payload truncated; the full format description is available here)

{
  "input":{
    "date":1476224484,
    "groupId":"com.twitter",
    "artifactId":"inject-core_2.11",
    "version":"2.5.0"
  },
  "plugin_version":"0.0.1",
  "payload":{
    "product":"com.twitter:inject-core_2.11",
    "nodes":478,
    "forge":"mvn",
    "generator":"OPAL",
    "version":"2.5.0",
    "cha":{
      "externalTypes":{
        "/com.google.inject/AbstractModule":{
          "methods":{
            "444":{
              "metadata":{},
              "uri":"/com.google.inject/AbstractModule.install(Module)%2Fjava.lang%2FVoidType"
            }
          },
          "superInterfaces":[],
          "sourceFile":"",
          "superClasses":[]
        }
      }
    }
  },
  "created_at": 1594820036478,
  "plugin_name": "OPALV3"
}
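
A downstream consumer might walk the cha.externalTypes structure shown above to list the external methods a package refers to. The sketch below relies only on the fields visible in the truncated example; the full format may contain more, and the function name is hypothetical.

```python
def external_method_uris(message):
    """Collect the URIs of all methods listed under payload.cha.externalTypes."""
    external_types = (
        message.get("payload", {}).get("cha", {}).get("externalTypes", {})
    )
    uris = []
    for type_info in external_types.values():
        for method in type_info.get("methods", {}).values():
            uris.append(method["uri"])
    return uris
```

Using .get with empty defaults means a message whose payload lacks a cha section yields an empty list instead of raising an exception.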

Example error message

{
  "input":{
    "date":1411660344,
    "groupId":"org.commonjava.aprox",
    "artifactId":"aprox-db",
    "version":"0.16.0"
  },
  "plugin_version":"0.0.1",
  "err":{
    "msg":"https://repo.maven.apache.org/maven2/org/commonjava/aprox/aprox-db/0.16.0/aprox-db-0.16.0.jar",
    "stacktrace":[
      "java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1915)",
      "java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1515)",
      "java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:250)",
      "java.base/java.net.URL.openStream(URL.java:1140)",
      "eu.fasten.analyzer.javacgopalv3.data.MavenCoordinate$MavenResolver.httpGetToFile(MavenCoordinate.java:192)",
      "eu.fasten.analyzer.javacgopalv3.data.MavenCoordinate$MavenResolver.downloadJar(MavenCoordinate.java:172)",
      "eu.fasten.analyzer.javacgopalv3.data.PartialCallGraph.createExtendedRevisionCallGraph(PartialCallGraph.java:98)",
      "eu.fasten.analyzer.javacgopalv3.OPALV3Plugin$OPALV3.generateCallGraph(OPALV3Plugin.java:105)",
      "eu.fasten.analyzer.javacgopalv3.OPALV3Plugin$OPALV3.consume(OPALV3Plugin.java:67)",
      "eu.fasten.server.plugins.kafka.FastenKafkaPlugin.handleConsuming(FastenKafkaPlugin.java:153)",
      "eu.fasten.server.plugins.kafka.FastenKafkaPlugin.run(FastenKafkaPlugin.java:105)",
      "java.base/java.lang.Thread.run(Thread.java:834)"
    ],
    "error":"FileNotFoundException"
  },
  "created_at":1594818677,
  "plugin_name":"OPALV3"
}
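
When triaging messages from an .err topic, it can help to reduce each one to a short summary. The sketch below assumes the err object has the msg, stacktrace and error fields seen in the example above, and that FASTEN frames start with the package prefix eu.fasten.; the function name is hypothetical.

```python
def summarise_error(message, own_prefix="eu.fasten."):
    """Reduce an error message to its co-ordinate, error type and first FASTEN frame."""
    err = message["err"]
    frames = err.get("stacktrace", [])
    # First stack frame that belongs to FASTEN code rather than the JDK.
    first_own = next((f for f in frames if f.startswith(own_prefix)), None)
    coords = message["input"]
    return {
        "coordinate": ":".join(
            str(coords[key]) for key in ("groupId", "artifactId", "version")
        ),
        "error": err.get("error"),
        "msg": err.get("msg"),
        "first_fasten_frame": first_own,
    }
```

For the example above, the summary would point at MavenCoordinate$MavenResolver.httpGetToFile as the first FASTEN frame, with FileNotFoundException as the error type.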