Skip to content
This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Commit

Permalink
clear cancelled tasks once handled in TaskCancelHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
Harish Butani committed Dec 24, 2016
1 parent 3745ef7 commit aed1c08
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/scala/org/sparklinedata/druid/DruidRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ object TaskCancelHandler extends Logging {
while(true) {
Thread.sleep(secs5)
log.debug(s"cancelThread woke up")
var canceledTasks : Seq[String] = Seq()
taskMap.foreach{t =>
val (queryId, (req, cancellableHolder, taskContext)) = t
log.debug(s"checking task stageid=${taskContext.stageId()}, " +
Expand All @@ -463,12 +464,14 @@ object TaskCancelHandler extends Logging {
cancellableHolder.wasCancelTriggered = true
req.cancel()
log.info("aborted http request for query {}: {}", Array[Any](queryId, req))
canceledTasks = canceledTasks :+ queryId
} catch {
case e : Throwable => log.warn("failed to abort http request: {}", req)
}
}

}
canceledTasks.foreach(t => clearQueryId(t))
}

}
Expand Down

0 comments on commit aed1c08

Please sign in to comment.