-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparquet.ts
118 lines (108 loc) · 3.19 KB
/
parquet.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import type { ParquetType, FieldDefinition, SchemaDefinition, WriterOptions } from "@dsnp/parquetjs";
import type { createSBBFParams } from "@dsnp/parquetjs/dist/lib/bloomFilterIO/bloomFilterWriter";
import type { DSNPParquetSchema, DSNPParquetType, ParquetColumn } from "./types/dsnp-parquet.js";
/**
 * Names of every column type Parquetjs supports, kept as one flat list so the
 * lookup set below can be built straight from it.
 */
const supportedTypeNames: readonly string[] = [
  "BOOLEAN", "INT32", "INT64", "INT96", "FLOAT", "DOUBLE",
  "BYTE_ARRAY", "FIXED_LEN_BYTE_ARRAY",
  "UTF8", "MAP", "LIST", "ENUM", "DECIMAL", "DATE",
  "TIME_MILLIS", "TIME_MICROS", "TIMESTAMP_MILLIS", "TIMESTAMP_MICROS",
  "UINT_8", "UINT_16", "UINT_32", "UINT_64",
  "INT_8", "INT_16", "INT_32", "INT_64",
  "JSON", "BSON", "INTERVAL",
];
/**
 * Set of all supported Parquetjs type names, used for membership checks.
 */
const supportedTypes = new Set<string>(supportedTypeNames);
/**
 * Type guard: reports whether `incoming` is one of the names in `supportedTypes`,
 * narrowing it to `ParquetType` when it is.
 *
 * @param incoming - Candidate type name (compared exactly, case-sensitive).
 * @returns `true` when the name is present in `supportedTypes`.
 */
const isColumnTypeSupported = (incoming: string): incoming is ParquetType => supportedTypes.has(incoming);
/**
 * Raised for any part of a DSNP schema that cannot be converted.
 *
 * The supplied detail is prefixed with "Unsupported DSNP schema: " and the
 * error's `name` is set to the class name.
 */
export class UnsupportedDSNPSchemaError extends Error {
  /**
   * @param msg - Detail describing what was unsupported.
   */
  constructor(msg: string) {
    super(`Unsupported DSNP schema: ${msg}`);
    this.name = "UnsupportedDSNPSchemaError";
  }
}
/**
 * Maps a DSNP column type onto the type name used in a ParquetJs field definition.
 *
 * String types:
 * - `"STRING"` becomes `"UTF8"`.
 * - Any name accepted by `isColumnTypeSupported` is returned unchanged.
 *
 * Object types:
 * - `{ INTEGER }` becomes `INT_<bit_width>` when `sign` is truthy, otherwise `UINT_<bit_width>`.
 * - `{ TIMESTAMP }` becomes `TIMESTAMP_<unit>` and `{ TIME }` becomes `TIME_<unit>`, but only
 *   when `is_adjusted_to_utc` is truthy and the unit is not `"NANOS"`.
 *
 * @param columnType - The DSNP column type, either a type name or a parameterized type object.
 * @returns The corresponding ParquetJs type name.
 * @throws UnsupportedDSNPSchemaError when no mapping exists. The message carries the type name,
 * or the JSON form of a parameterized type, so the offending column type can be identified.
 */
const convertColumnType = (columnType: DSNPParquetType): FieldDefinition["type"] => {
  if (typeof columnType === "string") {
    if (columnType === "STRING") return "UTF8";
    if (isColumnTypeSupported(columnType)) return columnType;
    throw new UnsupportedDSNPSchemaError(columnType.toString());
  }
  // ParquetJs uses the old format still, so not all options are available
  if ("INTEGER" in columnType) {
    return `${columnType.INTEGER.sign ? "" : "U"}INT_${columnType.INTEGER.bit_width}` as ParquetType;
  }
  if ("TIMESTAMP" in columnType && columnType.TIMESTAMP.is_adjusted_to_utc && columnType.TIMESTAMP.unit !== "NANOS") {
    return `TIMESTAMP_${columnType.TIMESTAMP.unit}` as ParquetType;
  }
  if ("TIME" in columnType && columnType.TIME.is_adjusted_to_utc && columnType.TIME.unit !== "NANOS") {
    return `TIME_${columnType.TIME.unit}` as ParquetType;
  }
  // columnType is an object here; its default toString() would only produce
  // "[object Object]", so serialize it to keep the error message meaningful.
  throw new UnsupportedDSNPSchemaError(JSON.stringify(columnType));
};
/**
 * Builds the Parquet field definition for a single DSNP column.
 *
 * The column type is translated through `convertColumnType`, the compression
 * setting is carried over unchanged, and statistics are always turned off.
 *
 * @param column - The DSNP column description.
 * @returns The field definition for that column.
 */
const fromColumn = (column: ParquetColumn): FieldDefinition => {
  const { column_type: dsnpType, compression } = column;
  return {
    type: convertColumnType(dsnpType),
    compression,
    statistics: false,
  };
};
/**
 * Turns a DSNP Parquet schema into a Parquet schema definition keyed by column name.
 *
 * Each column is converted with `fromColumn`; if two columns share a name, the
 * later one replaces the earlier one.
 *
 * @param dsnpSchema - The list of DSNP column descriptions.
 * @returns An object mapping each column name to its field definition.
 */
const toSchema = (dsnpSchema: DSNPParquetSchema): SchemaDefinition =>
  dsnpSchema.reduce<SchemaDefinition>((fields, column) => {
    fields[column.name] = fromColumn(column);
    return fields;
  }, {});
/**
 * Converts a DSNP Parquet Schema (dsnp.org) into what a writer needs.
 *
 * Because DSNP schemas let each column opt in to a bloom filter, the writer
 * options are returned alongside the schema definition.
 *
 * @param dsnpSchema - The list of DSNP column descriptions.
 * @returns A tuple of the schema definition and the writer options; `bloomFilters`
 * holds one `{ column }` entry for every column whose `bloom_filter` is truthy.
 */
export const fromDSNPSchema = (dsnpSchema: DSNPParquetSchema): [SchemaDefinition, WriterOptions] => {
  const schema = toSchema(dsnpSchema);
  const bloomFilters = dsnpSchema
    .filter((column) => column.bloom_filter)
    .map((column): createSBBFParams => ({ column: column.name }));
  return [schema, { bloomFilters }];
};