Package Ganga :: Package Utility :: Package AMGAServerTools :: Module Commands
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Utility.AMGAServerTools.Commands

  1  #!/bin/env python 
  2  #---------------------------------------------------------------------------- 
  3  # Name:         Commands.py 
  4  # Purpose:      Thread wrapper for the system commands. 
  5  # 
  6  # Author:       Alexander Soroko 
  7  # 
  8  # Created:      05/02/2003 
  9  #---------------------------------------------------------------------------- 
 10   
 11  import os 
 12  import sys 
 13  import time 
 14  import threading 
 15  import popen2 
 16  import signal 
 17  import PipeReader 
 18   
 19  MIN_TIMEOUT = 0.01 
 20   
 21  if sys.platform == 'win32': 
 22      try: 
 23          import win32api 
 24          import win32pipe 
 25          import win32file 
 26          import win32process 
 27          import win32security 
 28          import win32con 
 29          import win32pdh 
 30          import win32pdhutil 
 31          import msvcrt 
 32      except ImportError: 
 33          WIN_EXT = 0 
 34      else: 
 35          WIN_EXT = 1 
 36          MIN_TIMEOUT = 0.1 # on Win32 IsRunning command is slow!  
 37       
 38  ################################################################################ 
39 -class winPopen:
40 """Class to emulate popen2.Popen3 object on Windows"""
41 - def __init__(self, cmd, capturestderr = False, bufsize = None):
42 try: 43 if WIN_EXT: 44 # security attributes for pipes 45 sAttrs = win32security.SECURITY_ATTRIBUTES() 46 sAttrs.bInheritHandle = 1 47 48 # create pipes 49 hStdin_r, self.hStdin_w = win32pipe.CreatePipe(sAttrs, 0) 50 self.hStdout_r, hStdout_w = win32pipe.CreatePipe(sAttrs, 0) 51 if capturestderr: 52 self.hStderr_r, hStderr_w = win32pipe.CreatePipe(sAttrs, 0) 53 54 # set the info structure for the new process. 55 StartupInfo = win32process.STARTUPINFO() 56 StartupInfo.hStdInput = hStdin_r 57 StartupInfo.hStdOutput = hStdout_w 58 if capturestderr: 59 StartupInfo.hStdError = hStderr_w 60 else: 61 StartupInfo.hStdError = hStdout_w 62 StartupInfo.dwFlags = win32process.STARTF_USESTDHANDLES 63 StartupInfo.dwFlags = (StartupInfo.dwFlags| 64 win32process.STARTF_USESHOWWINDOW) 65 StartupInfo.wShowWindow = win32con.SW_HIDE 66 67 # Create new output read handles and the input write handle. Set 68 # the inheritance properties to FALSE. Otherwise, the child inherits 69 # the these handles; resulting in non-closeable handles to the pipes 70 # being created. 71 pid = win32api.GetCurrentProcess() 72 pp = ['hStdin_w','hStdout_r'] 73 if capturestderr: 74 pp.append('hStderr_r') 75 for hh in pp: 76 handle = getattr(self, hh) 77 tmp = win32api.DuplicateHandle( 78 pid, 79 handle, 80 pid, 81 0, 82 0, # non-inheritable!! 83 win32con.DUPLICATE_SAME_ACCESS) 84 # Close the inhertible version of the handle 85 win32file.CloseHandle(handle) 86 setattr(self, hh, tmp) 87 88 # start the process. 89 hProcess, hThread, dwPid, dwTid = win32process.CreateProcess( 90 None, # program 91 cmd, # command line 92 None, # process security attributes 93 None, # thread attributes 94 1, # inherit handles, or USESTDHANDLES won't work. 95 # creation flags. Don't access the console. 96 0, # Don't need anything here. 97 # If you're in a GUI app, you should use 98 # CREATE_NEW_CONSOLE here, or any subprocesses 99 # might fall victim to the problem described in: 100 # KB article: Q156755, cmd.exe requires 101 # an NT console in order to perform redirection.. 102 # win32process.CREATE_NEW_CONSOLE, 103 None, # no new environment 104 None, # current directory (stay where we are) 105 StartupInfo) 106 # normally, we would save the pid etc. here... 107 self._hProcess = hProcess 108 self._hThread = hThread 109 self._dwTid = dwTid 110 self.pid = dwPid 111 112 # Child is launched. Close the parents copy of those pipe handles 113 # that only the child should have open. 114 # You need to make sure that no handles to the write end of the 115 # output pipe are maintained in this process or else the pipe will 116 # not close when the child process exits and the ReadFile will hang. 117 if capturestderr: 118 win32file.CloseHandle(hStderr_w) 119 win32file.CloseHandle(hStdout_w) 120 win32file.CloseHandle(hStdin_r) 121 122 cfd_fun = msvcrt.open_osfhandle 123 md = os.O_TEXT 124 self.tochild = os.fdopen(cfd_fun(self.hStdin_w, md),"w") 125 self.fromchild = os.fdopen(cfd_fun(self.hStdout_r, md),"r") 126 if capturestderr: 127 self.childerr = os.fdopen(cfd_fun(self.hStderr_r, md),"r") 128 129 else: 130 raise Exception("Error using Windows extensions") 131 132 except: 133 self._hProcess = None 134 self._hThread = None 135 self._dwTid = None 136 self.pid = None 137 if capturestderr: 138 pfactory = popen2.popen3 139 else: 140 pfactory = popen2.popen4 141 if bufsize: 142 pipes = pfactory(cmd, bufsize) 143 else: 144 pipes = pfactory(cmd) 145 if capturestderr: 146 (self.fromchild, self.tochild, self.childerr) = pipes 147 else: 148 (self.fromchild, self.tochild) = pipes
149 150 #---------------------------------------------------------------------------
151 - def poll(self):
152 if self.pid: 153 if IsRunning(self.pid): 154 return -1 155 elif WIN_EXT and self._hProcess: 156 try: 157 return win32process.GetExitCodeProcess(self._hProcess) 158 except: 159 return
160 161 #---------------------------------------------------------------------------
162 - def wait(self):
163 while 1: 164 status = self.poll() 165 if status == -1: 166 time.sleep(MIN_TIMEOUT) 167 else: 168 return status
169 170 ################################################################################ 171 # Wrapper for command
172 -class Command:
173 """Class to submit a command to the operative system""" 174
175 - def __init__(self, cmd, std_in = None, timeout = 30, 176 pipesize=0, blocksize=1024, capturestderr = False):
177 """ 178 cmd = command to be executed. 179 std_in = input for the command. 180 timeout = timeout between blocks of the command output (in seconds). 181 pipesize = the size (in blocks) of the queue used to buffer 182 the blocks read. 183 blocksize = the maximum block size for a raw read. 184 capturestderr if True tells to merge std_out and std_err of the command. 185 """ 186 187 # initialization 188 self._cmd = cmd 189 if std_in: 190 self._std_in = std_in 191 else: 192 self._std_in = [] 193 self._timeout = timeout 194 self._capturestderr = capturestderr 195 196 if sys.platform == 'win32': 197 self._pipe_obj = winPopen(self._cmd, capturestderr) 198 else: 199 if capturestderr: 200 self._pipe_obj = popen2.Popen3(self._cmd, 1) 201 else: 202 self._pipe_obj = popen2.Popen4(self._cmd) 203 204 if capturestderr: 205 out_pipes = [self._pipe_obj.fromchild, self._pipe_obj.childerr] 206 else: 207 out_pipes = [self._pipe_obj.fromchild] 208 pipe_in = self._pipe_obj.tochild 209 self._pid = self._pipe_obj.pid 210 211 # write std_in 212 try: 213 try: 214 for line in self._std_in: 215 if line and line[-1] != '\n': 216 line += '\n' 217 pipe_in.write(line) 218 finally: 219 pipe_in.close() 220 except: 221 pass 222 223 # start reading output 224 self._nbpipes = [] 225 for pipe in out_pipes: 226 self._nbpipes.append(PipeReader.PipeReader(pipe, timeout, 227 pipesize, blocksize))
228 229 #---------------------------------------------------------------------------
230 - def __del__(self):
231 self.finalize()
232 233 #---------------------------------------------------------------------------
234 - def submit(self):
235 """Legacy method. Deprecated.""" 236 pass
237 238 #---------------------------------------------------------------------------
239 - def getInput(self):
240 """Return list of input strings.""" 241 return self._std_in
242 243 #---------------------------------------------------------------------------
244 - def getStatus(self):
245 return self._pipe_obj.poll()
246 247 #---------------------------------------------------------------------------
248 - def isRunning(self):
249 """Shows is command running or not. 250 If there is no way to establish this (win32 without extensions) 251 always returns True.""" 252 if self.getStatus() in [-1, None]: 253 return 1 254 else: 255 return 0
256 257 #---------------------------------------------------------------------------
258 - def _readlines(self, output):
259 lines = [] 260 rest = output 261 while rest: 262 end = rest.find('\n') + 1 263 if end > 0: 264 lines.append(rest[:end]) 265 rest = rest[end:] 266 else: 267 lines.append(rest) 268 rest = '' 269 return lines
270 271 #---------------------------------------------------------------------------
272 - def readOutput(self, pipe_ind, maxblocks=0, timeout=None):
273 """Read no more than maxblocks from out pipe i. 274 i = 0 std_out (or std_out + std_err) 275 i = 1 std_err. 276 If maxblocks = 0 (default) read till the end of data or timeout 277 between blocks arrival""" 278 pobj = self._nbpipes[pipe_ind] 279 data = pobj.read(maxblocks, timeout, condition = self.isRunning) 280 return self._readlines(data)
281 282 #---------------------------------------------------------------------------
283 - def finalize(self):
284 """Tries to kill command process, 285 close pipes and stop threads. 286 Can block on win32 without win extensions.""" 287 if self.isRunning(): 288 if self._pid: 289 if not Kill(self._pid): 290 print "Warning! Command process is still running" 291 for pipe in self._nbpipes: 292 pipe.stop() 293 try: 294 pipe.rfile.close() 295 except: 296 pass
297 298 #---------------------------------------------------------------------------
299 - def getOutput(self, maxblocks=0):
300 """getOutput([maxblocks]) 301 --> [std_out and std_err lines], 302 if capturestderr = True 303 --> ([std_out lines], [std_err lines]), 304 if capturestderr = False""" 305 output = [] 306 for i in range(len(self._nbpipes)): 307 try: 308 outlines = self.readOutput(i, maxblocks) 309 except: 310 output.append([]) 311 else: 312 output.append(outlines) 313 314 # try to close files and stop threads 315 self.finalize() 316 317 if self._capturestderr: 318 return tuple(output) 319 320 return output[0]
321 322 #---------------------------------------------------------------------------
323 - def getStatusOutput(self, maxblocks=0):
324 """getStatusOutput([maxblocks]) --> (status, getOutput())""" 325 output = self.getOutput(maxblocks) 326 status = self.getStatus() 327 return (status, output)
328 329 330 ################################################################################ 331 # Command to kill a process
332 -def Kill(pid, exitCode = 0):
333 """Wrapper for os.kill() to kill a process 334 Kill(pid [, exitCode]) --> status. 335 exitCode is relevant only for win32 platform""" 336 try: 337 if sys.platform == 'win32': 338 if WIN_EXT: 339 try: 340 h = win32api.OpenProcess(win32con.PROCESS_TERMINATE,0,pid) 341 win32api.TerminateProcess(h, exitCode) 342 finally: 343 win32api.CloseHandle(h) 344 else: 345 raise NotImplementedError() 346 else: 347 os.kill(pid, signal.SIGTERM) 348 except: 349 return 0 350 else: 351 return 1
352 353 ################################################################################ 354 # returns performance attributes on windows for all processes
355 -def winAllProcesses(object = "Process", 356 format = None, 357 machine = None, 358 bRefresh = 1):
359 """Return a tuple of a list of process attributes and a 360 list with the requested attributes for all processes. 361 Run only on win32 with windows extensions. 362 """ 363 if not format: 364 format = win32pdh.PDH_FMT_LONG 365 366 if bRefresh: # PDH docs say this is how you do a refresh. 367 win32pdh.EnumObjects(None, machine, 0, 1) 368 369 counter = "ID Process" 370 items, instances = win32pdh.EnumObjectItems(None,None,object, -1) 371 # Track multiple instances. 372 instance_dict = {} 373 for instance in instances: 374 try: 375 instance_dict[instance] += 1 376 except KeyError: 377 instance_dict[instance] = 0 378 379 items = [counter] + items[:5] 380 all_pr_attr = [] 381 get_attr = win32pdhutil.GetPerformanceAttributes 382 for instance, max_instances in instance_dict.items(): 383 for inum in xrange(max_instances+1): 384 try: 385 attr_list = [] 386 for item in items: 387 attr_list.append(get_attr(object, item, instance, 388 inum, format, machine)) 389 except: 390 continue 391 all_pr_attr.append(attr_list) 392 393 return (items, all_pr_attr)
394 395 ################################################################################
396 -def ListAllProcesses():
397 """List attributes for all running processes.""" 398 try: 399 if sys.platform == 'win32': 400 if WIN_EXT: 401 items, attr = winAllProcesses() 402 items_str = ' \t'.join(map(lambda x: str(x), items))+'\n' 403 return (items_str, attr) 404 else: 405 uid = os.getuid() 406 cmd_line = 'ps -U ' + str(uid) + ' -flw' 407 command = Command(cmd_line) 408 status, output = command.getStatusOutput() 409 if status == 0 and len(output) > 1: 410 attr = [] 411 for line in output[1:]: 412 stat_fields = line.split() 413 attr.append(stat_fields) 414 return (output[0], attr) 415 except: 416 pass 417 return ('', [])
418 419 ################################################################################
420 -def GetProcessAttributes(pid):
421 proc_list = ListAllProcesses()[1] 422 for attr in proc_list: 423 if sys.platform == 'win32': 424 if len(attr) > 0 and attr[0] == pid: 425 break 426 else: 427 if len(attr) > 3 and attr[3] == str(pid): 428 break 429 else: 430 return [] 431 return attr
432 433 ################################################################################ 434 # Command to get process status
435 -def IsRunning(pid):
436 """IsRunning(pid) --> status 437 pid = pricess ID. 438 status = True or False""" 439 attr = GetProcessAttributes(pid) 440 if sys.platform == 'win32': 441 if len(attr) > 0 and attr[0] == pid: 442 return 1 443 else: 444 if len(attr) > 3 and attr[3] == str(pid): 445 if attr[1] != 'Z': 446 return 1 447 return 0
448 449 ################################################################################ 450 # helper function
451 -def submitCmd(cmd_line, std_in = [], timeout = 120.0):
452 command = Command(cmd_line, std_in, timeout) 453 status, output = command.getStatusOutput() 454 if status: 455 # status not in [0, None] 456 executed = 0 457 else: 458 executed = 1 459 return (executed, output)
460 461 ################################################################################ 462