Skip to content

Commit

Permalink
Closes #1426: Run IIS experiments by relying on spark 3.4 version
Browse files Browse the repository at this point in the history
WIP.

Replacing scala source code in iis-common module with java-based counterpart. Simplifying the code, aligning other classes with changes in avro read/write code.
  • Loading branch information
marekhorst committed May 8, 2024
1 parent f92e590 commit 92e1e51
Show file tree
Hide file tree
Showing 21 changed files with 198 additions and 453 deletions.
34 changes: 0 additions & 34 deletions iis-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@
<artifactId>hadoop-common</artifactId>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
Expand Down Expand Up @@ -169,29 +162,6 @@

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Plugin that generates Java classes from Avro schemas -->
<plugin>
<groupId>org.apache.avro</groupId>
Expand Down Expand Up @@ -257,8 +227,4 @@
</plugins>
</build>

<properties>
<scala.version>2.12.14</scala.version>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package eu.dnetlib.iis.common.spark.avro;

import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;

/**
* Support for reading avro datastores as dataframes.
*
* @author mhorst
*
*/
public class AvroDataFrameReader implements Serializable {

private static final long serialVersionUID = 4858427693578954728L;

private final SparkSession sparkSession;

/**
* Default constructor accepting spark session as parameter.
* @param sparkSession spark session
*/
public AvroDataFrameReader(SparkSession sparkSession) {
this.sparkSession = sparkSession;
}

/**
* @param path Path to the data store
* @param avroSchema Avro schema of the records
* @return DataFrame with data read from given path
*/
public Dataset<Row> read(String path, Schema avroSchema) {
Dataset<Row> in = sparkSession.read().format("avro").option("avroSchema", avroSchema.toString()).load(path);
return sparkSession.createDataFrame(in.rdd(), (StructType) SchemaConverters.toSqlType(avroSchema).dataType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package eu.dnetlib.iis.common.spark.avro;

import java.io.Serializable;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Support for dataframes of avro types.
*
* @author mhorst
*/
public class AvroDataFrameSupport implements Serializable {

private static final long serialVersionUID = -3980871922050483460L;

private AvroDataFrameSupport() {
}

/**
* @param <T> type of elements
* @param dataFrame seq with elements for the dataframe
* @param clazz class of objects in the dataset
* @return Dataset of objects corresponding to records in the given dataframe
*/
public static <T extends SpecificRecordBase> Dataset<T> toDS(final Dataset<Row> dataFrame, final Class<T> clazz) {
final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return (Dataset<T>) dataFrame.toJSON().map((MapFunction<String, T>) json -> (T) mapper.readValue(json, clazz),
Encoders.kryo((Class<T>) clazz));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package eu.dnetlib.iis.common.spark.avro;

import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
* Support for writing dataframes of avro types.
*
* @author mhorst
*/
public class AvroDataFrameWriter implements Serializable {

private static final long serialVersionUID = 7842491849433906246L;

private final Dataset<Row> dataFrame;

/**
* Default constructor accepting DataFrame.
*
* @param dataFrame DataFrame of avro type
*/
public AvroDataFrameWriter(Dataset<Row> dataFrame) {
this.dataFrame = dataFrame;
}

/**
* Writes a dataframe as avro datastore using avro schema.
* @param path path to the data store
* @param avroSchema Avro schema of the records
*/
public void write(String path, Schema avroSchema) {
dataFrame.write().format("avro").option("avroSchema", avroSchema.toString())
.option("compression", "uncompressed").save(path);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package eu.dnetlib.iis.common.spark.avro;

import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

/**
* Support for reading avro datastores as datasets.
*
* @author mhorst
*/
public class AvroDatasetReader implements Serializable {

private static final long serialVersionUID = 4858427693578954728L;

private final SparkSession sparkSession;

/**
* Default constructor accepting spark session as parameter.
* @param sparkSession spark session
*/
public AvroDatasetReader(SparkSession sparkSession) {
this.sparkSession = sparkSession;
}

/**
* Reads avro datastore as Spark dataset using avro schema and kryo encoder.
*
* NOTE: due to inability to use bean-based encoder for avro types this method uses kryo encoder;
* for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects.
*
* @param <T> type of objects in the dataset
* @param path path to the data store
* @param avroSchema Avro schema of the records
* @param clazz class of objects in the dataset
* @return Dataset with data read from given path
*/
public <T extends SpecificRecordBase> Dataset<T> read(String path, Schema avroSchema, Class<T> clazz) {
return AvroDataFrameSupport.toDS(new AvroDataFrameReader(sparkSession).read(path, avroSchema), clazz);
}
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 92e1e51

Please sign in to comment.