Skip to content

Commit

Permalink
fix int64 insertId in node mysql
Browse files Browse the repository at this point in the history
TODO: write test and fix the other target/db combinations
  • Loading branch information
andyli committed Sep 28, 2023
1 parent b58113a commit 440fdf5
Showing 1 changed file with 51 additions and 40 deletions.
91 changes: 51 additions & 40 deletions src/tink/sql/drivers/node/MySql.hx
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import tink.sql.Query;
import tink.sql.Info;
import tink.sql.Types;
import tink.sql.format.Sanitizer;
import tink.sql.format.SqlFormatter;
import tink.streams.Stream;
import tink.sql.format.MySqlFormatter;
import tink.sql.expr.ExprTyper;
import tink.sql.parse.ResultParser;

import #if haxe3 js.lib.Error #else js.Error #end as JsError;

using tink.CoreApi;
Expand All @@ -27,9 +27,8 @@ typedef NodeSettings = MySqlSettings & {
}

class MySql implements Driver {

public final type:Driver.DriverType = MySql;

final settings:NodeSettings;

public function new(settings) {
Expand All @@ -54,7 +53,7 @@ class MySql implements Driver {
supportBigNumbers: true,
bigNumberStrings: true,
});

// pool.on('acquire', function (connection) {
// js.Node.console.log('Connection ${connection.threadId} acquired');
// });
Expand All @@ -77,42 +76,37 @@ class MySqlConnectionPool<Db> implements Connection.ConnectionPool<Db> {
final pool:NativeConnectionPool;
final formatter:MySqlFormatter;
final parser:ResultParser<Db>;


public function new(info, pool) {
this.info = info;
this.pool = pool;
this.formatter = new MySqlFormatter();
this.parser = new ResultParser();
}



public function getFormatter()
return formatter;

public function execute<Result>(query:Query<Db, Result>):Result {
final cnx = getNativeConnection();
return new MySqlConnection(info, cnx, true).execute(query);
}

public function isolate():Pair<Connection<Db>, CallbackLink> {
final cnx = getNativeConnection();
return new Pair(
(new MySqlConnection(info, cnx, false):Connection<Db>),
(() -> cnx.handle(o -> switch o {
case Success(native): native.release();
case Failure(_): // nothing to do
}):CallbackLink)
);
return new Pair((new MySqlConnection(info, cnx, false) : Connection<Db>), (() -> cnx.handle(o -> switch o {
case Success(native): native.release();
case Failure(_): // nothing to do
}) : CallbackLink));
}

function getNativeConnection() {
return new Promise((resolve, reject) -> {
var cancelled = false;
pool.getConnection((err, cnx) -> {
if(cancelled)
if (cancelled)
cnx.release();
else if(err != null)
else if (err != null)
reject(Error.ofJsError(err));
else
resolve(cnx);
Expand All @@ -126,8 +120,8 @@ class MySqlConnectionPool<Db> implements Connection.ConnectionPool<Db> {
return new MySqlConnection(info, cnx, true).executeSql(sql);
}
}
class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {

class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
final info:DatabaseInfo;
final cnx:Promise<NativeConnection>;
final formatter:MySqlFormatter;
Expand All @@ -144,12 +138,12 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {

public function value(v:Any):String {
if (Std.is(v, Date))
return 'DATE_ADD(FROM_UNIXTIME(0), INTERVAL ${(v:Date).getTime()/1000} SECOND)';
return 'DATE_ADD(FROM_UNIXTIME(0), INTERVAL ${(v : Date).getTime() / 1000} SECOND)';

if (Int64.isInt64(v))
return Int64.toStr(v);

return NativeDriver.escape(if(Std.is(v, Bytes)) Buffer.hxFromBytes(v) else v);
return NativeDriver.escape(if (Std.is(v, Bytes)) Buffer.hxFromBytes(v) else v);
}

public function ident(s:String):String
Expand All @@ -162,10 +156,11 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
return Failure(Error.withData(error.message, error));

public function execute<Result>(query:Query<Db, Result>):Result {
inline function fetch<T>(): Promise<T> return run(queryOptions(query));
inline function fetch<T>():Promise<T>
return run(queryOptions(query));
return switch query {
case Select(_) | Union(_):
final parse:DynamicAccess<Any>->{} = parser.queryParser(query, formatter.isNested(query));
final parse:DynamicAccess<Any> -> {} = parser.queryParser(query, formatter.isNested(query));
stream(queryOptions(query)).map(parse);

case CallProcedure(_):
Expand All @@ -174,17 +169,28 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
final parse = parser.queryParser(query, formatter.isNested(query));
Stream.ofIterator({
hasNext: () -> iterator.hasNext(),
next: () -> parse(iterator.next())
next: () -> parse(iterator.next())
});
}));
case Transaction(_) | CreateTable(_, _) | DropTable(_) | AlterTable(_, _) | TruncateTable(_):
fetch().next(_ -> Noise);
case Insert(_):
fetch().next(res -> new Id(res.insertId));
case Insert(insert):
fetch().next(res -> {
var insertId:Dynamic = res.insertId;
final p:Promise<Dynamic> = switch (SqlFormatter.getAutoIncPrimaryKeyCol(insert.table)) {
case {type: DInt(Big, _, _, _)}:
// insertId is always a number even if bigNumberStrings is enabled
// https://github.com/mysqljs/mysql/issues/2460
new Id64(Int64.fromFloat(insertId));
case _:
new Id(insertId);
}
p;
});
case Update(_):
fetch().next(res -> {rowsAffected: (res.changedRows: Int)});
fetch().next(res -> {rowsAffected: (res.changedRows : Int)});
case Delete(_):
fetch().next(res -> {rowsAffected: (res.affectedRows: Int)});
fetch().next(res -> {rowsAffected: (res.affectedRows : Int)});
case ShowColumns(_):
fetch().next((res:Array<MysqlColumnInfo>) -> res.map(formatter.parseColumn));
case ShowIndex(_):
Expand All @@ -196,7 +202,7 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
return run({sql: sql});
}

function queryOptions(query:Query<Db, Dynamic>): QueryOptions {
function queryOptions(query:Query<Db, Dynamic>):QueryOptions {
final sql = formatter.format(query).toString(this);
#if sql_debug
trace(sql);
Expand All @@ -209,29 +215,32 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
}
}

function stream<T>(options: QueryOptions):Stream<T, Error> {
function stream<T>(options:QueryOptions):Stream<T, Error> {
return cnx.next(cnx -> {
final query = cnx.query(options);
Stream.ofNodeStream('query', query.stream({highWaterMark: 1024}), {onEnd: autoRelease ? cnx.release : null});
});
}

function run<T>(options: QueryOptions):Promise<T>
function run<T>(options:QueryOptions):Promise<T>
return cnx.next(cnx -> {
new Promise((resolve, reject) -> {
cnx.query(options, (err, res) -> {
if(autoRelease) cnx.release();
if (err != null) reject(Error.ofJsError(err));
else resolve(cast res);
if (autoRelease)
cnx.release();
if (err != null)
reject(Error.ofJsError(err));
else
resolve(cast res);
});
null; // irreversible, we always want to wait for the query to finish
});
});

function typeCast(field, next): Any {
function typeCast(field, next):Any {
return switch field.type {
case 'GEOMETRY':
switch (field.buffer(): Buffer) {
switch (field.buffer() : Buffer) {
case null: null;
case v: @:privateAccess new ResultParser().parseGeometryValue(v.hxToBytes());
}
Expand Down Expand Up @@ -275,6 +284,7 @@ private typedef NativeConfig = {
final ?flags:String;
final ?ssl:Any;
}

private typedef NativePoolConfig = NativeConfig & {
final ?acquireTimeout:Int;
final ?waitForConnections:Int;
Expand All @@ -291,16 +301,17 @@ private typedef QueryOptions = {
extern class NativeConnectionPool extends js.node.events.EventEmitter<NativeConnectionPool> {
function getConnection(cb:JsError->NativeConnection->Void):Void;
}

extern class NativeConnection {
@:overload(function (q: QueryOptions, cb:JsError->Dynamic->Void):Void {})
function query<Row>(q: QueryOptions):NativeQuery<Row>;
@:overload(function(q:QueryOptions, cb:JsError->Dynamic->Void):Void {})
function query<Row>(q:QueryOptions):NativeQuery<Row>;
function pause():Void;
function resume():Void;
function release():Void;
}

extern class NativeQuery<Row> extends EventEmitter<NativeQuery<Row>> {
function stream(?opt:{?highWaterMark:Int}):NativeStream;
}

extern class NativeStream extends Readable<NativeStream> {}

0 comments on commit 440fdf5

Please sign in to comment.