Commit c55fc2cf by Tomasz Ciesielczyk

thread leak fix

parent 6ecaf64f
......@@ -14,6 +14,7 @@ import psnc.m2dc.rtm.pcm.algorithm.Solver
import psnc.m2dc.rtm.pcm.algorithm.Utils
import psnc.m2dc.rtm.properties.RTMProperties
import psnc.m2dc.rtm.properties.keys.Keys
import psnc.m2dc.rtm.rpm.ManagementExecutor
import psnc.m2dc.rtm.rpm.RPMApi
import psnc.m2dc.rtm.utils.loadJsonFile
import psnc.m2dc.rtm.utils.logDebug
......@@ -57,8 +58,13 @@ class RTMApi private constructor() {
}
/**
* function used for tracking thread leak
*/
private fun javaState(): String {
val sb =StringBuilder()
val sb = StringBuilder()
Thread.getAllStackTraces().keys.forEach {
sb.append("${it.name}(${it.id}): ${it.isAlive} \n")
}
......@@ -67,9 +73,7 @@ class RTMApi private constructor() {
fun checkConf() {
log.info("Start rest service check!")
printConsole(javaState(),false)
ServerInitializer.check()
printConsole(javaState(),false)
val nodes = DAO.Resource.getNodes()
nodes.toJson(RTMProperties.parsePath("${RTMProperties.basePath}/check_node_list.json").toString())
var actionList = ArrayList<ResourceAction>()
......@@ -77,6 +81,7 @@ printConsole(javaState(),false)
nodes.values.forEach { node ->
Utils.getPropertyAction(node.id, 0.0, null)?.let { actionList.addAll(it) }
}
actionList.toJson(RTMProperties.parsePath("${RTMProperties.basePath}/check_reset_actions.json").toString())
actionList.forEach { action ->
RPMApi.instance().startAction(action).let { result ->
......@@ -86,7 +91,6 @@ printConsole(javaState(),false)
actionList.toJson(RTMProperties.parsePath("${RTMProperties.basePath}/check_reset_actions_result.json").toString())
printConsole(javaState(),false)
Thread.sleep(90000)
RPMApi.instance().getPowerUsageList().toJson(RTMProperties.parsePath("${RTMProperties.basePath}/check_power_usage.json").toString())
//init solvers in order to dump their state
......@@ -107,7 +111,6 @@ printConsole(javaState(),false)
}
}
actionList.toJson(RTMProperties.parsePath("${RTMProperties.basePath}/check_reset_actions_result_100.json").toString())
printConsole(javaState(),false)
Thread.sleep(90000)
limits = PCMApi.instance().checkLimits()
state = CurrentState(limits.currentPowerUsage)
......@@ -115,7 +118,12 @@ printConsole(javaState(),false)
RPMApi.instance().getPowerUsageList().toJson(RTMProperties.parsePath("${RTMProperties.basePath}/check_power_usage_100.json").toString())
// ESMAlgorithm(state)
printConsole("End conf check ")
printConsole(javaState(),false)
if (ManagementExecutor.countRunning() > 0) {
ManagementExecutor.listRunning().toJson(true)
Thread.sleep(30000)
}
ManagementExecutor.stop()
}
......@@ -147,6 +155,11 @@ printConsole(javaState(),false)
}
}
if (ManagementExecutor.countRunning() > 0) {
ManagementExecutor.listRunning().toJson(true)
Thread.sleep(30000)
}
ManagementExecutor.stop()
}
......
......@@ -11,38 +11,29 @@ import psnc.m2dc.rtm.pcm.PCMApi
import psnc.m2dc.rtm.properties.keys.Keys
import psnc.m2dc.rtm.rest.Handlers
import psnc.m2dc.rtm.rest.auth.RESTAuth
import psnc.m2dc.rtm.utils.*
import psnc.m2dc.rtm.utils.Timer
import psnc.m2dc.rtm.utils.logError
import psnc.m2dc.rtm.utils.logInfo
import psnc.m2dc.rtm.utils.setWatch
import spark.Spark
/**
* Server's initialization methods
*/
object ServerInitializer {
private fun javaState(): String {
val sb =StringBuilder()
Thread.getAllStackTraces().keys.forEach {
sb.append("${it.name}(${it.id}): ${it.isAlive} \n")
}
return sb.toString()
}
private fun init() {
try {
logInfo("version: $APP_VERSION")
loadConfig()// -> tu zglasza null pointer
loadResources()
printConsole("x1: "+javaState(),false)
PCMApi.init()
printConsole("x2.2: "+javaState(),false)
ESMApi.init()
printConsole("x2: "+javaState(),false)
setDefaultPriorities()
printConsole("x3: "+javaState(),false)
LightStorage.saveStorage()
printConsole("x4: "+javaState(),false)
// StorageKeys.POWER_LIMIT = Values.powerLimit(440.0)
// val a =PCMApi.instance().predictPCMActions()
// a.generateScripts().toJson(true)
......
package psnc.m2dc.rtm.commands.java
val APP_VERSION = "1.0.548_47218"
\ No newline at end of file
val APP_VERSION = "1.0.548_54626"
\ No newline at end of file
package psnc.m2dc.rtm.client
const val APP_VERSION = "1.1.548_47218"
\ No newline at end of file
const val APP_VERSION = "1.1.548_54626"
\ No newline at end of file
......@@ -52,7 +52,8 @@ class RPMApiImpl : RPMApi {
override fun startAction(result: ResourceActionResult): ResourceActionResult {
printConsole("startaction: " + (Exception().stackTrace.toJson()))
val worker = ManagementWorkerImpl(result.resourceId + "_" + result.properties[ResourceAction.PROPERTY_ACTION_NAME], { actionTask(result) })
val worker = ManagementWorkerImpl(result.resourceId + "_" + result.powerAction.name, { actionTask(result) })
ManagementExecutor.executeTask(worker, TaskExecutor.ExecutionMode.WAIT)
return result
}
......
......@@ -165,11 +165,8 @@ abstract class TaskExecutor {
// return this.currentlyExecuting.containsKey(id)
// }
// fun printCurrent(){
// this.currentlyExecuting.keys.forEach {
// printConsole("xxx: ${it}")
// }
// }
fun countRunning() = this.currentlyExecuting.size
fun listRunning() = this.currentlyExecuting.keys
fun getTasks(id: String) = this.currentlyExecuting[id]
private fun <T> removeTask(task: ManagementWorker<T>) {
......@@ -248,8 +245,16 @@ abstract class TaskExecutor {
}
}
fun stop() {
this.executor.shutdown()
fun stop(force: Boolean = false) {
if (this.executor.isShutdown) {
logWarn("Task Executor is shutdown!")
return
}
if (force) {
this.executor.shutdownNow()
} else
this.executor.shutdown()
}
}
\ No newline at end of file
......@@ -15,8 +15,10 @@ import java.io.File
import java.io.IOException
import java.io.InputStreamReader
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
private val MAP_TYPE = object : TypeToken<HashMap<String, HashMap<String, Any>>>() {
}.type
......@@ -242,6 +244,40 @@ open class SSHApiImpl private constructor(val resource: Resource) : SSHApi {
}
fun getOutput(cmd : SSHCommand,process: Process,stdOut : StringBuilder,stdErr:StringBuilder): Pair<Any, Any> {
val newFixedThreadPool = Executors.newFixedThreadPool(2)
val output = newFixedThreadPool.submit {
var reader = BufferedReader(InputStreamReader(process.inputStream))
var line = reader.readLine()
while (line != null) {
stdOut.append(line).append("\n")
line = reader.readLine()
}
stdOut.toString()
}
val error = newFixedThreadPool.submit {
var reader = BufferedReader(InputStreamReader(process.errorStream))
var line = reader.readLine()
while (line != null) {
stdErr.append(line).append("\n")
line = reader.readLine()
}
stdErr.toString()
}
newFixedThreadPool.shutdown()
// process.waitFor();
if (!process.waitFor(SSH_COMMAND_TIMEOUT, TimeUnit.MILLISECONDS)) {
println("Destroy")
logWarn("SSH Command timeout: ${cmd.build().toJson()}" )
process.destroy()
}
return Pair(output.get(), error.get())
}
private fun runCommand(cmd: SSHCommand): String {
val stdOut = StringBuilder()
......@@ -254,26 +290,36 @@ open class SSHApiImpl private constructor(val resource: Resource) : SSHApi {
// printConsole("running ssh cmd2: " + cmd.toString())
p = Runtime.getRuntime().exec(array)
printConsole("pwait for : ${cmd.host}")
p.waitFor(SSH_COMMAND_TIMEOUT,TimeUnit.MILLISECONDS)
printConsole("pwaited for : ${cmd.host}")
var reader = BufferedReader(InputStreamReader(p.inputStream))
// printConsole("pwait for : ${cmd.host}")
this.getOutput(cmd,p,stdOut,stdErr).toJson(true)
// p.waitFor(SSH_COMMAND_TIMEOUT, TimeUnit.MILLISECONDS)
var line = reader.readLine()
while (line != null) {
stdOut.append(line).append("\n")
line = reader.readLine()
}
printConsole("pwait for read : ${cmd.host}")
reader = BufferedReader(InputStreamReader(p.errorStream))
line = reader.readLine()
while (line != null) {
stdErr.append(line).append("\n")
line = reader.readLine()
}
printConsole("pwait for read2 : ${cmd.host}")
// printConsole("pwaited for : ${cmd.host}")
// var reader = BufferedReader(InputStreamReader(p.inputStream))
//
// if (cmd.host == "10.12.10.18") {
// printConsole("${cmd.host} before readline: ")
// }
// var line = reader.readLine()
// if (cmd.host == "10.12.10.18") {
// printConsole("${cmd.host} readline: $line")
// }
// while (line != null) {
// if (cmd.host == "10.12.10.18") {
// printConsole("${cmd.host} readline: $line")
// }
// stdOut.append(line).append("\n")
// line = reader.readLine()
// }
// printConsole("pwait for read : ${cmd.host}")
// reader = BufferedReader(InputStreamReader(p.errorStream))
//
// line = reader.readLine()
// while (line != null) {
// stdErr.append(line).append("\n")
// line = reader.readLine()
// }
// printConsole("pwait for read2 : ${cmd.host}")
if (Keys.Default.debug.get()) {
stdOut.toString()
.let { if (it.length > 250) it.substring(0, 200) else it }
......@@ -292,11 +338,11 @@ open class SSHApiImpl private constructor(val resource: Resource) : SSHApi {
throw Exception(e)
}
log.debug(stdOut.toString())
printConsole("output for ${cmd.host} :$stdOut")
// printConsole("output for ${cmd.host} :$stdOut")
return if (stdOut.length > 1)
stdOut.toString().trim().let { printConsole("ssh result: ${it.take(256).replace("\n"," ")}"); it }
stdOut.toString().trim().let { printConsole("ssh result: ${it.take(256).replace("\n", " ")}"); it }
else {
stdErr.toString().trim().let { printConsole("ssh err ${cmd.build().toJson() } :$it"); it }
stdErr.toString().trim().let { printConsole("ssh err ${cmd.build().toJson()} :$it"); it }
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment