diff --git a/src/tink/sql/drivers/node/MySql.hx b/src/tink/sql/drivers/node/MySql.hx index a58af98..9bc5914 100755 --- a/src/tink/sql/drivers/node/MySql.hx +++ b/src/tink/sql/drivers/node/MySql.hx @@ -12,11 +12,11 @@ import tink.sql.Query; import tink.sql.Info; import tink.sql.Types; import tink.sql.format.Sanitizer; +import tink.sql.format.SqlFormatter; import tink.streams.Stream; import tink.sql.format.MySqlFormatter; import tink.sql.expr.ExprTyper; import tink.sql.parse.ResultParser; - import #if haxe3 js.lib.Error #else js.Error #end as JsError; using tink.CoreApi; @@ -27,9 +27,8 @@ typedef NodeSettings = MySqlSettings & { } class MySql implements Driver { - public final type:Driver.DriverType = MySql; - + final settings:NodeSettings; public function new(settings) { @@ -54,7 +53,7 @@ class MySql implements Driver { supportBigNumbers: true, bigNumberStrings: true, }); - + // pool.on('acquire', function (connection) { // js.Node.console.log('Connection ${connection.threadId} acquired'); // }); @@ -77,7 +76,6 @@ class MySqlConnectionPool implements Connection.ConnectionPool { final pool:NativeConnectionPool; final formatter:MySqlFormatter; final parser:ResultParser; - public function new(info, pool) { this.info = info; @@ -85,34 +83,30 @@ class MySqlConnectionPool implements Connection.ConnectionPool { this.formatter = new MySqlFormatter(); this.parser = new ResultParser(); } - - + public function getFormatter() return formatter; - + public function execute(query:Query):Result { final cnx = getNativeConnection(); return new MySqlConnection(info, cnx, true).execute(query); } - + public function isolate():Pair, CallbackLink> { final cnx = getNativeConnection(); - return new Pair( - (new MySqlConnection(info, cnx, false):Connection), - (() -> cnx.handle(o -> switch o { - case Success(native): native.release(); - case Failure(_): // nothing to do - }):CallbackLink) - ); + return new Pair((new MySqlConnection(info, cnx, false) : Connection), (() -> cnx.handle(o -> switch o { + case Success(native): native.release(); + case Failure(_): // nothing to do + }) : CallbackLink)); } - + function getNativeConnection() { return new Promise((resolve, reject) -> { var cancelled = false; pool.getConnection((err, cnx) -> { - if(cancelled) + if (cancelled) cnx.release(); - else if(err != null) + else if (err != null) reject(Error.ofJsError(err)); else resolve(cnx); @@ -126,8 +120,8 @@ class MySqlConnectionPool implements Connection.ConnectionPool { return new MySqlConnection(info, cnx, true).executeSql(sql); } } -class MySqlConnection implements Connection implements Sanitizer { +class MySqlConnection implements Connection implements Sanitizer { final info:DatabaseInfo; final cnx:Promise; final formatter:MySqlFormatter; @@ -144,12 +138,12 @@ class MySqlConnection implements Connection implements Sanitizer { public function value(v:Any):String { if (Std.is(v, Date)) - return 'DATE_ADD(FROM_UNIXTIME(0), INTERVAL ${(v:Date).getTime()/1000} SECOND)'; + return 'DATE_ADD(FROM_UNIXTIME(0), INTERVAL ${(v : Date).getTime() / 1000} SECOND)'; if (Int64.isInt64(v)) return Int64.toStr(v); - return NativeDriver.escape(if(Std.is(v, Bytes)) Buffer.hxFromBytes(v) else v); + return NativeDriver.escape(if (Std.is(v, Bytes)) Buffer.hxFromBytes(v) else v); } public function ident(s:String):String @@ -162,10 +156,11 @@ class MySqlConnection implements Connection implements Sanitizer { return Failure(Error.withData(error.message, error)); public function execute(query:Query):Result { - inline function fetch(): Promise return run(queryOptions(query)); + inline function fetch():Promise + return run(queryOptions(query)); return switch query { case Select(_) | Union(_): - final parse:DynamicAccess->{} = parser.queryParser(query, formatter.isNested(query)); + final parse:DynamicAccess -> {} = parser.queryParser(query, formatter.isNested(query)); stream(queryOptions(query)).map(parse); case CallProcedure(_): @@ -174,17 +169,28 @@ class MySqlConnection implements Connection implements Sanitizer { final parse = parser.queryParser(query, formatter.isNested(query)); Stream.ofIterator({ hasNext: () -> iterator.hasNext(), - next: () -> parse(iterator.next()) + next: () -> parse(iterator.next()) }); })); case Transaction(_) | CreateTable(_, _) | DropTable(_) | AlterTable(_, _) | TruncateTable(_): fetch().next(_ -> Noise); - case Insert(_): - fetch().next(res -> new Id(res.insertId)); + case Insert(insert): + fetch().next(res -> { + var insertId:Dynamic = res.insertId; + final p:Promise = switch (SqlFormatter.getAutoIncPrimaryKeyCol(insert.table)) { + case {type: DInt(Big, _, _, _)}: + // insertId is always a number even if bigNumberStrings is enabled + // https://github.com/mysqljs/mysql/issues/2460 + new Id64(Int64.fromFloat(insertId)); + case _: + new Id(insertId); + } + p; + }); case Update(_): - fetch().next(res -> {rowsAffected: (res.changedRows: Int)}); + fetch().next(res -> {rowsAffected: (res.changedRows : Int)}); case Delete(_): - fetch().next(res -> {rowsAffected: (res.affectedRows: Int)}); + fetch().next(res -> {rowsAffected: (res.affectedRows : Int)}); case ShowColumns(_): fetch().next((res:Array) -> res.map(formatter.parseColumn)); case ShowIndex(_): @@ -196,7 +202,7 @@ class MySqlConnection implements Connection implements Sanitizer { return run({sql: sql}); } - function queryOptions(query:Query): QueryOptions { + function queryOptions(query:Query):QueryOptions { final sql = formatter.format(query).toString(this); #if sql_debug trace(sql); @@ -209,29 +215,32 @@ class MySqlConnection implements Connection implements Sanitizer { } } - function stream(options: QueryOptions):Stream { + function stream(options:QueryOptions):Stream { return cnx.next(cnx -> { final query = cnx.query(options); Stream.ofNodeStream('query', query.stream({highWaterMark: 1024}), {onEnd: autoRelease ? cnx.release : null}); }); } - function run(options: QueryOptions):Promise + function run(options:QueryOptions):Promise return cnx.next(cnx -> { new Promise((resolve, reject) -> { cnx.query(options, (err, res) -> { - if(autoRelease) cnx.release(); - if (err != null) reject(Error.ofJsError(err)); - else resolve(cast res); + if (autoRelease) + cnx.release(); + if (err != null) + reject(Error.ofJsError(err)); + else + resolve(cast res); }); null; // irreversible, we always want to wait for the query to finish }); }); - function typeCast(field, next): Any { + function typeCast(field, next):Any { return switch field.type { case 'GEOMETRY': - switch (field.buffer(): Buffer) { + switch (field.buffer() : Buffer) { case null: null; case v: @:privateAccess new ResultParser().parseGeometryValue(v.hxToBytes()); } @@ -275,6 +284,7 @@ private typedef NativeConfig = { final ?flags:String; final ?ssl:Any; } + private typedef NativePoolConfig = NativeConfig & { final ?acquireTimeout:Int; final ?waitForConnections:Int; @@ -291,16 +301,17 @@ private typedef QueryOptions = { extern class NativeConnectionPool extends js.node.events.EventEmitter { function getConnection(cb:JsError->NativeConnection->Void):Void; } + extern class NativeConnection { - @:overload(function (q: QueryOptions, cb:JsError->Dynamic->Void):Void {}) - function query(q: QueryOptions):NativeQuery; + @:overload(function(q:QueryOptions, cb:JsError->Dynamic->Void):Void {}) + function query(q:QueryOptions):NativeQuery; function pause():Void; function resume():Void; function release():Void; } + extern class NativeQuery extends EventEmitter> { function stream(?opt:{?highWaterMark:Int}):NativeStream; } extern class NativeStream extends Readable {} -