From fe4bc46d48ea8e6fe73459e7e7a018c429b9f16d Mon Sep 17 00:00:00 2001
From: Ghislain Fourny
Date: Fri, 6 Dec 2019 17:22:34 +0100
Subject: [PATCH 1/6] Fix count in let clauses.

---
 .../java/sparksoniq/spark/DataFrameUtils.java      | 50 +++++++++++++++++++
 .../flowr/GroupByClauseSparkIterator.java          | 13 +++--
 .../flowr/LetClauseSparkIterator.java              | 13 +++--
 .../sparksoniq/spark/udf/LetClauseUDF.java         | 29 ++++++++---
 4 files changed, 89 insertions(+), 16 deletions(-)

diff --git a/src/main/java/sparksoniq/spark/DataFrameUtils.java b/src/main/java/sparksoniq/spark/DataFrameUtils.java
index 9a032f56b5..425bbca8e2 100644
--- a/src/main/java/sparksoniq/spark/DataFrameUtils.java
+++ b/src/main/java/sparksoniq/spark/DataFrameUtils.java
@@ -159,6 +159,56 @@ public static List<String> getColumnNames(
         return result;
     }
 
+    /**
+     * @param inputSchema the schema specifying the columns available to the query
+     * @param duplicateVariableIndex index of a variable column to skip, or -1 to keep all columns
+     * @param dependencies the variable dependencies restricting which columns are returned
+     * @return list of SQL column names for variables whose materialized (binary) sequences are needed
+     */
+    public static List<String> getBinaryColumnNames(
+            StructType inputSchema,
+            int duplicateVariableIndex,
+            Map<String, DynamicContext.VariableDependency> dependencies
+    ) {
+        List<String> result = new ArrayList<String>();
+        String[] columnNames = inputSchema.fieldNames();
+        for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
+            if (columnIndex == duplicateVariableIndex) {
+                continue;
+            }
+            String var = columnNames[columnIndex];
+            if (dependencies == null || (dependencies.containsKey(var) && !dependencies.get(var).equals(DynamicContext.VariableDependency.COUNT))) {
+                result.add(columnNames[columnIndex]);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @param inputSchema the schema specifying the columns available to the query
+     * @param duplicateVariableIndex index of a variable column to skip, or -1 to keep all columns
+     * @param dependencies the variable dependencies restricting which columns are returned
+     * @return list of SQL column names for variables for which only a precomputed count (Long) is stored
+     */
+    public static List<String> getLongColumnNames(
+            StructType inputSchema,
+            int duplicateVariableIndex,
+            Map<String, DynamicContext.VariableDependency> dependencies
+    ) {
+        List<String> result = new ArrayList<String>();
+        String[] columnNames = inputSchema.fieldNames();
+        for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
+            if (columnIndex == duplicateVariableIndex) {
+                continue;
+            }
+            String var = columnNames[columnIndex];
+            if (dependencies != null && dependencies.containsKey(var) && dependencies.get(var).equals(DynamicContext.VariableDependency.COUNT)) {
+                result.add(columnNames[columnIndex]);
+            }
+        }
+        return result;
+    }
+
     /**
      * @param inputSchema schema specifies the columns to be used in the query
      * @return list of SQL column names in the schema
diff --git a/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java b/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
index 02e79f39e1..430b8fa864 100644
--- a/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
+++ b/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
@@ -244,26 +244,29 @@ public Dataset<Row> getDataFrame(
         int duplicateVariableIndex = columnNames.indexOf(newVariableName);
 
         List<String> allColumns = DataFrameUtils.getColumnNames(inputSchema, duplicateVariableIndex, null);
-        List<String> UDFcolumns = DataFrameUtils.getColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFbinarycolumns = DataFrameUtils.getBinaryColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFlongcolumns = DataFrameUtils.getLongColumnNames(inputSchema, -1, _dependencies);
 
         df.sparkSession()
            .udf()
            .register(
                "letClauseUDF",
-                new LetClauseUDF(newVariableExpression, UDFcolumns),
+                new LetClauseUDF(newVariableExpression, UDFbinarycolumns, UDFlongcolumns),
                DataTypes.BinaryType
            );
 
         String selectSQL = DataFrameUtils.getSQL(allColumns, true);
-        String udfSQL = DataFrameUtils.getSQL(UDFcolumns, false);
+        String udfBinarySQL = DataFrameUtils.getSQL(UDFbinarycolumns, false);
+        String udfLongSQL = DataFrameUtils.getSQL(UDFlongcolumns, false);
 
         df.createOrReplaceTempView("input");
         df = df.sparkSession()
             .sql(
                 String.format(
-                    "select %s letClauseUDF(array(%s)) as `%s` from input",
+                    "select %s letClauseUDF(array(%s), array(%s)) as `%s` from input",
                     selectSQL,
-                    udfSQL,
+                    udfBinarySQL,
+                    udfLongSQL,
                     newVariableName
                 )
             );
diff --git a/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java b/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
index 7cb864d63d..a73c09c207 100644
--- a/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
+++ b/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
@@ -163,26 +163,29 @@ public Dataset<Row> getDataFrame(
         int duplicateVariableIndex = Arrays.asList(inputSchema.fieldNames()).indexOf(_variableName);
 
         List<String> allColumns = DataFrameUtils.getColumnNames(inputSchema, duplicateVariableIndex, null);
-        List<String> UDFcolumns = DataFrameUtils.getColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFbinarycolumns = DataFrameUtils.getBinaryColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFlongcolumns = DataFrameUtils.getLongColumnNames(inputSchema, -1, _dependencies);
 
         df.sparkSession()
             .udf()
             .register(
                 "letClauseUDF",
-                new LetClauseUDF(_expression, UDFcolumns),
+                new LetClauseUDF(_expression, UDFbinarycolumns, UDFlongcolumns),
                 DataTypes.BinaryType
             );
 
         String selectSQL = DataFrameUtils.getSQL(allColumns, true);
-        String udfSQL = DataFrameUtils.getSQL(UDFcolumns, false);
+        String udfBinarySQL = DataFrameUtils.getSQL(UDFbinarycolumns, false);
+        String udfLongSQL = DataFrameUtils.getSQL(UDFlongcolumns, false);
 
         df.createOrReplaceTempView("input");
         df = df.sparkSession()
             .sql(
                 String.format(
-                    "select %s letClauseUDF(array(%s)) as `%s` from input",
+                    "select %s letClauseUDF(array(%s), array(%s)) as `%s` from input",
                     selectSQL,
-                    udfSQL,
+                    udfBinarySQL,
+                    udfLongSQL,
                     _variableName
                 )
             );
diff --git a/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java b/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
index 6c17c2ec38..b551e29957 100644
--- a/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
+++ b/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
@@ -21,6 +21,7 @@
 package sparksoniq.spark.udf;
 
 import org.apache.spark.sql.api.java.UDF1;
+import org.apache.spark.sql.api.java.UDF2;
 import org.rumbledb.api.Item;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -28,6 +29,7 @@
 import com.esotericsoftware.kryo.io.Output;
 
 import scala.collection.mutable.WrappedArray;
+import sparksoniq.jsoniq.item.ItemFactory;
 import sparksoniq.jsoniq.runtime.iterator.RuntimeIterator;
 import sparksoniq.semantics.DynamicContext;
 import sparksoniq.spark.DataFrameUtils;
@@ -36,12 +38,14 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class LetClauseUDF implements UDF1<WrappedArray<byte[]>, byte[]> {
+public class LetClauseUDF implements UDF2<WrappedArray<byte[]>, WrappedArray<Long>, byte[]> {
 
     private static final long serialVersionUID = 1L;
     private RuntimeIterator _expression;
-    List<String> _columnNames;
+    List<String> _binaryColumnNames;
+    List<String> _longColumnNames;
+    List<String> _allColumnNames;
 
     private List<List<Item>> _deserializedParams;
     private DynamicContext _context;
@@ -53,7 +57,8 @@ public class LetClauseUDF implements UDF1<WrappedArray<byte[]>, byte[]> {
     public LetClauseUDF(
         RuntimeIterator expression,
-        List<String> columnNames
+        List<String> binaryColumnNames,
+        List<String> longColumnNames
     ) {
         _expression = expression;
@@ -67,19 +72,31 @@ public LetClauseUDF(
         _output = new Output(128, -1);
         _input = new Input();
 
-        _columnNames = columnNames;
+        _binaryColumnNames = binaryColumnNames;
+        _longColumnNames = longColumnNames;
+        _allColumnNames = new ArrayList<>(_binaryColumnNames);
+        _allColumnNames.addAll(_longColumnNames);
     }
 
     @Override
-    public byte[] call(WrappedArray<byte[]> wrappedParameters) {
+    public byte[] call(WrappedArray<byte[]> wrappedParameters, WrappedArray<Long> wrappedParametersLong) {
         _deserializedParams.clear();
         _context.removeAllVariables();
         _nextResult.clear();
 
         DataFrameUtils.deserializeWrappedParameters(wrappedParameters, _deserializedParams, _kryo, _input);
 
-        DataFrameUtils.prepareDynamicContext(_context, _columnNames, _deserializedParams);
+        Object[] longParams = (Object[]) wrappedParametersLong.array();
+        for (int i = 0; i < wrappedParametersLong.size(); ++i) {
+            List<Item> longItemList = new ArrayList<>();
+            for (Object longParam : longParams) {
+                longItemList.add(ItemFactory.getInstance().createIntegerItem((int) ((Long) longParam).longValue()));
+            }
+            _deserializedParams.add(longItemList);
+        }
+
+        DataFrameUtils.prepareDynamicContext(_context, _allColumnNames, _deserializedParams);
 
         // apply expression in the dynamic context
         _expression.open(_context);

From fd703d1549e5becd9da3633c3413eb89201c5961 Mon Sep 17 00:00:00 2001
From: Ghislain Fourny
Date: Fri, 6 Dec 2019 17:50:08 +0100
Subject: [PATCH 2/6] Fix.

---
 .../java/sparksoniq/spark/DataFrameUtils.java | 29 +++++++++++++++++--
 .../sparksoniq/spark/udf/LetClauseUDF.java    | 24 ++++++++-------
 2 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/src/main/java/sparksoniq/spark/DataFrameUtils.java b/src/main/java/sparksoniq/spark/DataFrameUtils.java
index 425bbca8e2..37d76c1148 100644
--- a/src/main/java/sparksoniq/spark/DataFrameUtils.java
+++ b/src/main/java/sparksoniq/spark/DataFrameUtils.java
@@ -177,7 +177,11 @@ public static List<String> getBinaryColumnNames(
             continue;
         }
         String var = columnNames[columnIndex];
-        if (dependencies == null || (dependencies.containsKey(var) && !dependencies.get(var).equals(DynamicContext.VariableDependency.COUNT))) {
+        if (
+            dependencies == null
+                || (dependencies.containsKey(var)
+                    && !dependencies.get(var).equals(DynamicContext.VariableDependency.COUNT))
+        ) {
             result.add(columnNames[columnIndex]);
         }
     }
@@ -202,7 +206,11 @@ public static List<String> getLongColumnNames(
             continue;
         }
         String var = columnNames[columnIndex];
-        if (dependencies != null && dependencies.containsKey(var) && dependencies.get(var).equals(DynamicContext.VariableDependency.COUNT)) {
+        if (
+            dependencies != null
+                && dependencies.containsKey(var)
+                && dependencies.get(var).equals(DynamicContext.VariableDependency.COUNT)
+        ) {
             result.add(columnNames[columnIndex]);
         }
     }
@@ -230,6 +238,23 @@ public static void prepareDynamicContext(
         }
     }
 
+    public static void prepareDynamicContext(
+        DynamicContext context,
+        List<String> binaryColumnNames,
+        List<String> longColumnNames,
+        List<List<Item>> deserializedParams,
+        List<Item> longParams
+    ) {
+        // prepare dynamic context
+        for (int columnIndex = 0; columnIndex < binaryColumnNames.size(); columnIndex++) {
+            context.addVariableValue(binaryColumnNames.get(columnIndex), deserializedParams.get(columnIndex));
+        }
+        // prepare dynamic context
+        for (int columnIndex = 0; columnIndex < longColumnNames.size(); columnIndex++) {
+            context.addVariableCount(longColumnNames.get(columnIndex), longParams.get(columnIndex));
+        }
+    }
+
     /**
      * @param columnNames schema specifies the columns to be used in the query
      * @param trailingComma boolean field to have a trailing comma
diff --git a/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java b/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
index b551e29957..e62c13cf2a 100644
--- a/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
+++ b/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
@@ -45,9 +45,9 @@ public class LetClauseUDF implements UDF2<WrappedArray<byte[]>, WrappedArray<Long>, byte[]> {
     private RuntimeIterator _expression;
     List<String> _binaryColumnNames;
     List<String> _longColumnNames;
-    List<String> _allColumnNames;
 
     private List<List<Item>> _deserializedParams;
+    private List<Item> _longParams;
     private DynamicContext _context;
     private List<Item> _nextResult;
@@ -63,6 +63,7 @@ public LetClauseUDF(
         _expression = expression;
 
         _deserializedParams = new ArrayList<>();
+        _longParams = new ArrayList<>();
         _context = new DynamicContext();
         _nextResult = new ArrayList<>();
@@ -74,29 +75,32 @@ public LetClauseUDF(
 
         _binaryColumnNames = binaryColumnNames;
         _longColumnNames = longColumnNames;
-        _allColumnNames = new ArrayList<>(_binaryColumnNames);
-        _allColumnNames.addAll(_longColumnNames);
     }
 
     @Override
     public byte[] call(WrappedArray<byte[]> wrappedParameters, WrappedArray<Long> wrappedParametersLong) {
         _deserializedParams.clear();
+        _longParams.clear();
         _context.removeAllVariables();
         _nextResult.clear();
 
         DataFrameUtils.deserializeWrappedParameters(wrappedParameters, _deserializedParams, _kryo, _input);
 
         Object[] longParams = (Object[]) wrappedParametersLong.array();
-        for (int i = 0; i < wrappedParametersLong.size(); ++i) {
-            List<Item> longItemList = new ArrayList<>();
-            for (Object longParam : longParams) {
-                longItemList.add(ItemFactory.getInstance().createIntegerItem((int) ((Long) longParam).longValue()));
-            }
-            _deserializedParams.add(longItemList);
+        for (Object longParam : longParams) {
+            System.out.println("Found " + ((Long) longParam).intValue());
+            Item count = ItemFactory.getInstance().createIntegerItem(((Long) longParam).intValue());
+            _longParams.add(count);
         }
 
-        DataFrameUtils.prepareDynamicContext(_context, _allColumnNames, _deserializedParams);
+        DataFrameUtils.prepareDynamicContext(
+            _context,
+            _binaryColumnNames,
+            _longColumnNames,
+            _deserializedParams,
+            _longParams
+        );
 
         // apply expression in the dynamic context
         _expression.open(_context);

From a77d4bb10e4733febe60e62579646a524d9ad09a Mon Sep 17 00:00:00 2001
From: Ghislain Fourny
Date: Mon, 9 Dec 2019 10:08:33 +0100
Subject: [PATCH 3/6] Add test and clean up.
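
The new test exercises a group-by query where the let clause only needs
count($i), so the grouped sequence is never materialized and only a Long
count column travels through the DataFrame. As a minimal sketch of the
resulting dynamic-context bindings (variable names and values illustrative,
not part of the patch):

    // Sketch under the above assumptions: a materialized variable is bound
    // with its full item sequence, a count-only variable with just its count.
    context.addVariableValue("x", deserializedItems);
    context.addVariableCount("i", ItemFactory.getInstance().createIntegerItem(2));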
---
 src/main/java/sparksoniq/spark/DataFrameUtils.java   | 8 ++++----
 src/main/java/sparksoniq/spark/udf/LetClauseUDF.java | 3 ++-
 .../runtime-spark/DataFrames/GroupbyClause12.jq      | 6 ++++++
 3 files changed, 12 insertions(+), 5 deletions(-)
 create mode 100644 src/main/resources/test_files/runtime-spark/DataFrames/GroupbyClause12.jq

diff --git a/src/main/java/sparksoniq/spark/DataFrameUtils.java b/src/main/java/sparksoniq/spark/DataFrameUtils.java
index 37d76c1148..7050bfa6e1 100644
--- a/src/main/java/sparksoniq/spark/DataFrameUtils.java
+++ b/src/main/java/sparksoniq/spark/DataFrameUtils.java
@@ -241,17 +241,17 @@ public static void prepareDynamicContext(
     public static void prepareDynamicContext(
         DynamicContext context,
         List<String> binaryColumnNames,
-        List<String> longColumnNames,
+        List<String> countColumnNames,
         List<List<Item>> deserializedParams,
-        List<Item> longParams
+        List<Item> counts
     ) {
         // prepare dynamic context
         for (int columnIndex = 0; columnIndex < binaryColumnNames.size(); columnIndex++) {
             context.addVariableValue(binaryColumnNames.get(columnIndex), deserializedParams.get(columnIndex));
         }
         // prepare dynamic context
-        for (int columnIndex = 0; columnIndex < longColumnNames.size(); columnIndex++) {
-            context.addVariableCount(longColumnNames.get(columnIndex), longParams.get(columnIndex));
+        for (int columnIndex = 0; columnIndex < countColumnNames.size(); columnIndex++) {
+            context.addVariableCount(countColumnNames.get(columnIndex), counts.get(columnIndex));
         }
     }

diff --git a/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java b/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
index e62c13cf2a..9553dd60a6 100644
--- a/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
+++ b/src/main/java/sparksoniq/spark/udf/LetClauseUDF.java
@@ -87,9 +87,10 @@ public byte[] call(WrappedArray<byte[]> wrappedParameters, WrappedArray<Long> wrappedParametersLong) {
 
         DataFrameUtils.deserializeWrappedParameters(wrappedParameters, _deserializedParams, _kryo, _input);
 
+        // Long parameters correspond to pre-computed counts, when a materialization of the
+        // actual sequence was avoided upfront.
         Object[] longParams = (Object[]) wrappedParametersLong.array();
         for (Object longParam : longParams) {
-            System.out.println("Found " + ((Long) longParam).intValue());
             Item count = ItemFactory.getInstance().createIntegerItem(((Long) longParam).intValue());
             _longParams.add(count);
         }

diff --git a/src/main/resources/test_files/runtime-spark/DataFrames/GroupbyClause12.jq b/src/main/resources/test_files/runtime-spark/DataFrames/GroupbyClause12.jq
new file mode 100644
index 0000000000..a06b020c2c
--- /dev/null
+++ b/src/main/resources/test_files/runtime-spark/DataFrames/GroupbyClause12.jq
@@ -0,0 +1,6 @@
+(:JIQS: ShouldRun; Output="(2, 1, 1, 1)" :)
+for $i in json-file("./src/main/resources/queries/conf-ex.json")
+group by $y := $i.country, $t := $i.target
+let $c := count($i)
+order by $c descending
+return $c
\ No newline at end of file

From d32946f46bf66a5a0740ae1cb2cdeaeb92d02d3f Mon Sep 17 00:00:00 2001
From: Ghislain Fourny
Date: Mon, 9 Dec 2019 10:10:20 +0100
Subject: [PATCH 4/6] Rename function.
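
getBinaryColumnNames becomes getColumnNamesExceptPrecomputedCounts, and
getLongColumnNames becomes getPrecomputedCountColumnNames. A sketch of how
the two selectors are meant to be used together, mirroring the call sites
updated below (schema and dependency map illustrative):

    // For a non-null dependency map, the two lists split the depended-on
    // columns: each such variable lands in exactly one list, and a column the
    // expression does not depend on at all is returned by neither.
    List<String> valueColumns =
        DataFrameUtils.getColumnNamesExceptPrecomputedCounts(inputSchema, -1, _dependencies);
    List<String> countColumns =
        DataFrameUtils.getPrecomputedCountColumnNames(inputSchema, -1, _dependencies);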
---
 src/main/java/sparksoniq/spark/DataFrameUtils.java            | 4 ++--
 .../spark/iterator/flowr/GroupByClauseSparkIterator.java      | 4 ++--
 .../spark/iterator/flowr/LetClauseSparkIterator.java          | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/main/java/sparksoniq/spark/DataFrameUtils.java b/src/main/java/sparksoniq/spark/DataFrameUtils.java
index 7050bfa6e1..19032c4415 100644
--- a/src/main/java/sparksoniq/spark/DataFrameUtils.java
+++ b/src/main/java/sparksoniq/spark/DataFrameUtils.java
@@ -165,7 +165,7 @@ public static List<String> getColumnNames(
      * @param dependencies the variable dependencies restricting which columns are returned
      * @return list of SQL column names for variables whose materialized (binary) sequences are needed
      */
-    public static List<String> getBinaryColumnNames(
+    public static List<String> getColumnNamesExceptPrecomputedCounts(
         StructType inputSchema,
         int duplicateVariableIndex,
         Map<String, DynamicContext.VariableDependency> dependencies
@@ -194,7 +194,7 @@ public static List<String> getColumnNamesExceptPrecomputedCounts(
      * @param dependencies the variable dependencies restricting which columns are returned
      * @return list of SQL column names for variables for which only a precomputed count (Long) is stored
      */
-    public static List<String> getLongColumnNames(
+    public static List<String> getPrecomputedCountColumnNames(
         StructType inputSchema,
         int duplicateVariableIndex,
         Map<String, DynamicContext.VariableDependency> dependencies
diff --git a/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java b/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
index 430b8fa864..0220de367b 100644
--- a/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
+++ b/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
@@ -244,8 +244,8 @@ public Dataset<Row> getDataFrame(
         int duplicateVariableIndex = columnNames.indexOf(newVariableName);
 
         List<String> allColumns = DataFrameUtils.getColumnNames(inputSchema, duplicateVariableIndex, null);
-        List<String> UDFbinarycolumns = DataFrameUtils.getBinaryColumnNames(inputSchema, -1, _dependencies);
-        List<String> UDFlongcolumns = DataFrameUtils.getLongColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFbinarycolumns = DataFrameUtils.getColumnNamesExceptPrecomputedCounts(inputSchema, -1, _dependencies);
+        List<String> UDFlongcolumns = DataFrameUtils.getPrecomputedCountColumnNames(inputSchema, -1, _dependencies);
 
         df.sparkSession()
             .udf()
diff --git a/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java b/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
index a73c09c207..981a8251ed 100644
--- a/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
+++ b/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
@@ -163,8 +163,8 @@ public Dataset<Row> getDataFrame(
         int duplicateVariableIndex = Arrays.asList(inputSchema.fieldNames()).indexOf(_variableName);
 
         List<String> allColumns = DataFrameUtils.getColumnNames(inputSchema, duplicateVariableIndex, null);
-        List<String> UDFbinarycolumns = DataFrameUtils.getBinaryColumnNames(inputSchema, -1, _dependencies);
-        List<String> UDFlongcolumns = DataFrameUtils.getLongColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFbinarycolumns = DataFrameUtils.getColumnNamesExceptPrecomputedCounts(inputSchema, -1, _dependencies);
+        List<String> UDFlongcolumns = DataFrameUtils.getPrecomputedCountColumnNames(inputSchema, -1, _dependencies);
 
         df.sparkSession()
             .udf()

From 1975583ca2ff06fac6cbba757726e45177615d5f Mon Sep 17 00:00:00 2001
From: Ghislain Fourny
Date: Mon, 9 Dec 2019 10:12:51 +0100
Subject: [PATCH 5/6] Spotless.
---
 .../iterator/flowr/GroupByClauseSparkIterator.java   | 12 ++++++++++--
 .../spark/iterator/flowr/LetClauseSparkIterator.java |  6 +++++-
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java b/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
index 0220de367b..23843f0cbb 100644
--- a/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
+++ b/src/main/java/sparksoniq/spark/iterator/flowr/GroupByClauseSparkIterator.java
@@ -244,8 +244,16 @@ public Dataset<Row> getDataFrame(
         int duplicateVariableIndex = columnNames.indexOf(newVariableName);
 
         List<String> allColumns = DataFrameUtils.getColumnNames(inputSchema, duplicateVariableIndex, null);
-        List<String> UDFbinarycolumns = DataFrameUtils.getColumnNamesExceptPrecomputedCounts(inputSchema, -1, _dependencies);
-        List<String> UDFlongcolumns = DataFrameUtils.getPrecomputedCountColumnNames(inputSchema, -1, _dependencies);
+        List<String> UDFbinarycolumns = DataFrameUtils.getColumnNamesExceptPrecomputedCounts(
+            inputSchema,
+            -1,
+            _dependencies
+        );
+        List<String> UDFlongcolumns = DataFrameUtils.getPrecomputedCountColumnNames(
+            inputSchema,
+            -1,
+            _dependencies
+        );
 
         df.sparkSession()
             .udf()
diff --git a/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java b/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
index 981a8251ed..27387c7f54 100644
--- a/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
+++ b/src/main/java/sparksoniq/spark/iterator/flowr/LetClauseSparkIterator.java
@@ -163,7 +163,11 @@ public Dataset<Row> getDataFrame(
         int duplicateVariableIndex = Arrays.asList(inputSchema.fieldNames()).indexOf(_variableName);
 
         List<String> allColumns = DataFrameUtils.getColumnNames(inputSchema, duplicateVariableIndex, null);
-        List<String> UDFbinarycolumns = DataFrameUtils.getColumnNamesExceptPrecomputedCounts(inputSchema, -1, _dependencies);
+        List<String> UDFbinarycolumns = DataFrameUtils.getColumnNamesExceptPrecomputedCounts(
+            inputSchema,
+            -1,
+            _dependencies
+        );
         List<String> UDFlongcolumns = DataFrameUtils.getPrecomputedCountColumnNames(inputSchema, -1, _dependencies);
 
         df.sparkSession()

From bd43bd30459c01ded4b569b223250f4eb2ddc6bd Mon Sep 17 00:00:00 2001
From: canberker
Date: Mon, 9 Dec 2019 11:11:27 +0100
Subject: [PATCH 6/6] Cleanup code and redundant comments

---
 .../java/sparksoniq/spark/DataFrameUtils.java | 36 ++++++-------
 1 file changed, 11 insertions(+), 25 deletions(-)

diff --git a/src/main/java/sparksoniq/spark/DataFrameUtils.java b/src/main/java/sparksoniq/spark/DataFrameUtils.java
index 19032c4415..85f2344164 100644
--- a/src/main/java/sparksoniq/spark/DataFrameUtils.java
+++ b/src/main/java/sparksoniq/spark/DataFrameUtils.java
@@ -72,19 +72,9 @@
 public class DataFrameUtils {
 
-    private static ThreadLocal<byte[]> lastBytesCache = new ThreadLocal<byte[]>() {
-        @Override
-        protected byte[] initialValue() {
-            return null;
-        }
-    };
+    private static ThreadLocal<byte[]> lastBytesCache = ThreadLocal.withInitial(() -> null);
 
-    private static ThreadLocal<List<Item>> lastObjectItemCache = new ThreadLocal<List<Item>>() {
-        @Override
-        protected List<Item> initialValue() {
-            return null;
-        }
-    };
+    private static ThreadLocal<List<Item>> lastObjectItemCache = ThreadLocal.withInitial(() -> null);
 
     public static void registerKryoClassesKryo(Kryo kryo) {
         kryo.register(Item.class);
@@ -145,7 +135,7 @@ public static List<String> getColumnNames(
         int duplicateVariableIndex,
         Map<String, DynamicContext.VariableDependency> dependencies
     ) {
-        List<String> result = new ArrayList<String>();
+        List<String> result = new ArrayList<>();
         String[] columnNames = inputSchema.fieldNames();
         for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
             if (columnIndex == duplicateVariableIndex) {
@@ -170,7 +160,7 @@ public static List<String> getColumnNamesExceptPrecomputedCounts(
         int duplicateVariableIndex,
         Map<String, DynamicContext.VariableDependency> dependencies
     ) {
-        List<String> result = new ArrayList<String>();
+        List<String> result = new ArrayList<>();
         String[] columnNames = inputSchema.fieldNames();
         for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
             if (columnIndex == duplicateVariableIndex) {
@@ -199,7 +189,7 @@ public static List<String> getPrecomputedCountColumnNames(
         int duplicateVariableIndex,
         Map<String, DynamicContext.VariableDependency> dependencies
     ) {
-        List<String> result = new ArrayList<String>();
+        List<String> result = new ArrayList<>();
         String[] columnNames = inputSchema.fieldNames();
         for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
             if (columnIndex == duplicateVariableIndex) {
@@ -232,7 +222,6 @@ public static void prepareDynamicContext(
         List<String> columnNames,
         List<List<Item>> deserializedParams
     ) {
-        // prepare dynamic context
         for (int columnIndex = 0; columnIndex < columnNames.size(); columnIndex++) {
             context.addVariableValue(columnNames.get(columnIndex), deserializedParams.get(columnIndex));
         }
@@ -245,11 +234,9 @@ public static void prepareDynamicContext(
         List<List<Item>> deserializedParams,
         List<Item> counts
     ) {
-        // prepare dynamic context
         for (int columnIndex = 0; columnIndex < binaryColumnNames.size(); columnIndex++) {
             context.addVariableValue(binaryColumnNames.get(columnIndex), deserializedParams.get(columnIndex));
         }
-        // prepare dynamic context
         for (int columnIndex = 0; columnIndex < countColumnNames.size(); columnIndex++) {
             context.addVariableCount(countColumnNames.get(columnIndex), counts.get(columnIndex));
         }
     }
@@ -349,12 +336,11 @@ public static String getGroupbyProjectSQL(
         return queryColumnString.toString();
     }
 
-    public static Object deserializeByteArray(byte[] toDeserialize, Kryo kryo, Input input) {
+    private static Object deserializeByteArray(byte[] toDeserialize, Kryo kryo, Input input) {
         byte[] bytes = lastBytesCache.get();
         if (bytes != null) {
             if (Arrays.equals(bytes, toDeserialize)) {
-                List<Item> deserializedParam = lastObjectItemCache.get();
-                return deserializedParam;
+                return lastObjectItemCache.get();
             }
         }
         input.setBuffer(toDeserialize);
@@ -400,7 +386,7 @@ public static Row reserializeRowWithNewData(
     public static List<Item> deserializeRowField(Row row, int columnIndex, Kryo kryo, Input input) {
         Object o = row.get(columnIndex);
         if (o instanceof Long) {
-            List<Item> result = new ArrayList<Item>(1);
+            List<Item> result = new ArrayList<>(1);
             result.add(ItemFactory.getInstance().createIntegerItem(((Long) o).intValue()));
             return result;
         } else {
@@ -451,12 +437,12 @@ public static Dataset<Row> zipWithIndex(Dataset<Row> df, Long offset, String ind
             .collect();
         Row[] partitionOffsetsArray = ((Row[]) partitionOffsetsObject);
         Map<Integer, Long> partitionOffsets = new HashMap<>();
-        for (int i = 0; i < partitionOffsetsArray.length; i++) {
-            partitionOffsets.put(partitionOffsetsArray[i].getInt(0), partitionOffsetsArray[i].getLong(1));
+        for (Row row : partitionOffsetsArray) {
+            partitionOffsets.put(row.getInt(0), row.getLong(1));
         }
 
         UserDefinedFunction getPartitionOffset = udf(
-            (partitionId) -> partitionOffsets.get((Integer) partitionId),
+            (partitionId) -> partitionOffsets.get(partitionId),
             DataTypes.LongType
         );