# -*- coding: utf-8 -
#
# This file is part of gaffer. See the NOTICE for more information.
"""
The process module wrap a process and IO redirection
"""
from datetime import timedelta
import os
import signal
import shlex
import pyuv
import psutil
from psutil.error import AccessDenied, NoSuchProcess
import six
from .events import EventEmitter
from .util import (bytestring, getcwd, check_uid, check_gid,
bytes2human, substitute_env)
from .sync import atomic_read, increment, decrement
pyuv.Process.disable_stdio_inheritance()
[docs]def get_process_info(process=None, interval=0):
"""Return information about a process. (can be an pid or a Process object)
If process is None, will return the information about the current process.
"""
if process is None:
process = psutil.Process(os.getpid())
info = {}
try:
mem_info = process.get_memory_info()
info['mem_info1'] = bytes2human(mem_info[0])
info['mem_info2'] = bytes2human(mem_info[1])
except AccessDenied:
info['mem_info1'] = info['mem_info2'] = "N/A"
try:
info['cpu'] = process.get_cpu_percent(interval=interval)
except AccessDenied:
info['cpu'] = "N/A"
try:
info['mem'] = round(process.get_memory_percent(), 1)
except AccessDenied:
info['mem'] = "N/A"
try:
cpu_times = process.get_cpu_times()
ctime = timedelta(seconds=sum(cpu_times))
ctime = "%s:%s.%s" % (ctime.seconds // 60 % 60,
str((ctime.seconds % 60)).zfill(2),
str(ctime.microseconds)[:2])
except AccessDenied:
ctime = "N/A"
info['ctime'] = ctime
try:
info['pid'] = process.pid
except AccessDenied:
info['pid'] = 'N/A'
try:
info['username'] = process.username
except AccessDenied:
info['username'] = 'N/A'
try:
info['nice'] = process.nice
except AccessDenied:
info['nice'] = 'N/A'
except NoSuchProcess:
info['nice'] = 'Zombie'
try:
cmdline = os.path.basename(shlex.split(process.cmdline[0])[0])
except (AccessDenied, IndexError):
cmdline = "N/A"
info['cmdline'] = cmdline
info['children'] = []
for child in process.get_children():
info['children'].append(get_process_info(psutil.Process(child),
interval=interval))
return info
[docs]class RedirectIO(object):
pipes_count = 2
def __init__(self, loop, process, stdio=[]):
self.loop = loop
self.process = process
self._emitter = EventEmitter(loop)
self._stdio = []
self._channels = []
# create (channel, stdio) pairs
for label in stdio[:self.pipes_count]:
# io registered can any label, so it's easy to redirect
# stderr to stdout, just use the same label.
p = pyuv.Pipe(loop)
io = pyuv.StdIO(stream=p, flags=pyuv.UV_CREATE_PIPE | \
pyuv.UV_READABLE_PIPE | \
pyuv.UV_WRITABLE_PIPE)
setattr(p, 'label', label)
self._channels.append(p)
self._stdio.append(io)
# create remaining pipes
for _ in range(self.pipes_count - len(self._stdio)):
self._stdio.append(pyuv.StdIO(flags=pyuv.UV_IGNORE))
[docs] def start(self):
# start reading
for p in self._channels:
p.start_read(self._on_read)
@property
[docs] def stdio(self):
return self._stdio
[docs] def subscribe(self, label, listener):
self._emitter.subscribe(label, listener)
[docs] def unsubscribe(self, label, listener):
self._emitter.unsubscribe(label, listener)
[docs] def stop(self, all_events=False):
for p in self._channels:
if p.active:
p.close()
if all_events:
self._emitter.close()
def _on_read(self, handle, data, error):
if not data:
return
label = getattr(handle, 'label')
msg = dict(event=label, name=self.process.name,
pid=self.process.id, data=data)
self._emitter.publish(label, msg)
[docs]class RedirectStdin(object):
""" redirect stdin allows multiple sender to write to same pipe """
def __init__(self, loop, process):
self.loop = loop
self.process = process
self.channel = pyuv.Pipe(loop)
self.stdio = pyuv.StdIO(stream=self.channel,
flags=pyuv.UV_CREATE_PIPE | \
pyuv.UV_READABLE_PIPE | \
pyuv.UV_WRITABLE_PIPE )
self._emitter = EventEmitter(loop)
[docs] def start(self):
self._emitter.subscribe("WRITE", self._on_write)
self._emitter.subscribe("WRITELINES", self._on_writelines)
[docs] def write(self, data):
self._emitter.publish("WRITE", data)
[docs] def writelines(self, data):
self._emitter.publish("WRITELINES", data)
[docs] def stop(self, all_events=False):
if self.channel.active:
self.channel.close()
if all_events:
self._emitter.close()
def _on_write(self, evtype, data):
self.channel.write(data)
def _on_writelines(self, evtype, data):
self.channel.writelines(data)
def _on_read(self, handle, data, error):
if not data:
return
label = getattr(handle, 'label')
msg = dict(event=label, name=self.process.name,
pid=self.process.id, data=data)
self._emitter.publish(label, msg)
[docs]class Stream(RedirectStdin):
""" create custom stdio """
def __init__(self, loop, process, id):
super(Stream, self).__init__(loop, process)
self.id = id
[docs] def start(self):
super(Stream, self).start()
self.channel.start_read(self._on_read)
[docs] def subscribe(self, listener):
self._emitter.subscribe('READ', listener)
[docs] def unsubscribe(self, listener):
self._emitter.unsubscribe('READ', listener)
def _on_read(self, handle, data, error):
if not data:
return
msg = dict(event='READ', name=self.process.name,
pid=self.process.id, data=data)
self._emitter.publish('READ', msg)
[docs]class ProcessWatcher(object):
""" object to retrieve process stats """
def __init__(self, loop, pid):
self.loop = loop
self.pid = pid
self._process = psutil.Process(pid)
self._last_info = None
self.on_refresh_cb = None
self._active = 0
self._refcount = 0
self._emitter = EventEmitter(loop)
@property
[docs] def active(self):
return atomic_read(self._active) > 0
[docs] def subscribe(self, listener):
self._refcount = increment(self._refcount)
self._emitter.subscribe("stat", listener)
self._start()
[docs] def subscribe_once(self, listener):
self._emitter.subscribe_once("stat", listener)
[docs] def unsubscribe(self, listener):
self._emitter.unsubscribe(".", listener)
self._refcount = decrement(self._refcount)
if not atomic_read(self._refcount):
self.stop()
[docs] def stop(self, all_events=False):
if self.active:
self._active = decrement(self._active)
self._timer.stop()
if all_events:
self._emitter.close()
def _async_refresh(self, handle):
self._last_info = self.refresh()
self._emitter.publish("stat", self._last_info)
[docs] def refresh(self, interval=0):
return get_process_info(self._process, interval=interval)
def _start(self):
if not self.active:
self._timer = pyuv.Timer(self.loop)
# start the timer to refresh the informations
# 0.1 is the minimum interval to fetch cpu stats for this
# process.
self._timer.start(self._async_refresh, 0.1, 0.1)
self._active = increment(self._active)
[docs]class Process(object):
""" class wrapping a process
Args:
- **loop**: main application loop (a pyuv Loop instance)
- **name**: name of the process
- **cmd**: program command, string)
- **args**: the arguments for the command to run. Can be a list or
a string. If **args** is a string, it's splitted using
:func:`shlex.split`. Defaults to None.
- **env**: a mapping containing the environment variables the command
will run with. Optional
- **uid**: int or str, user id
- **gid**: int or st, user group id,
- **cwd**: working dir
- **detach**: the process is launched but won't be monitored and
won't exit when the manager is stopped.
- **shell**: boolean, run the script in a shell. (UNIX
only)
- **redirect_output**: list of io to redict (max 2) this is a list of custom
labels to use for the redirection. Ex: ["a", "b"] will
redirect stdoutt & stderr and stdout events will be labeled "a"
- **redirect_input**: Boolean (False is the default). Set it if
you want to be able to write to stdin.
- **custom_streams**: list of additional streams that should be created
and passed to process. This is a list of streams labels. They become
available through :attr:`streams` attribute.
- **custom_channels**: list of additional channels that should be passed to
process.
"""
def __init__(self, loop, id, name, cmd, group=None, args=None, env=None,
uid=None, gid=None, cwd=None, detach=False, shell=False,
redirect_output=[], redirect_input=False, custom_streams=[],
custom_channels=[], on_exit_cb=None):
self.loop = loop
self.id = id
self.name = name
self.group = group
self.cmd = cmd
self.env = env or {}
# set command
self.cmd = bytestring(cmd)
if args is not None:
if isinstance(args, six.string_types):
self.args = shlex.split(bytestring(args))
else:
self.args = [bytestring(arg) for arg in args]
else:
args_ = shlex.split(self.cmd)
if len(args_) == 1:
self.args = []
else:
self.cmd = args_[0]
self.args = args_[1:]
# replace envirnonnement variable in args
# $PORT for example will become the given env variable.
self.args = [substitute_env(arg, self.env) for arg in self.args]
if shell:
self.args = ['-c', self.cmd] + self.args
self.cmd = "sh"
self.uid = uid
if self.uid is not None:
self.uid = check_uid(uid)
self.gid = gid
if self.gid is not None:
self.gid = check_gid(gid)
self.cwd = cwd or getcwd()
self.redirect_output = redirect_output
self.redirect_input = redirect_input
self.custom_streams = custom_streams
self.custom_channels = custom_channels
self._redirect_io = None
self._redirect_in = None
self.streams = {}
self.detach = detach
self.on_exit_cb = on_exit_cb
self._process = None
self._pprocess = None
self._process_watcher = None
self._cached_pid = None
self.stopped = False
self.graceful_time = 0
self._setup_stdio()
def _setup_stdio(self):
# for now we ignore all stdin
if not self.redirect_input:
self._stdio = [pyuv.StdIO(flags=pyuv.UV_IGNORE)]
else:
self._redirect_in = RedirectStdin(self.loop, self)
self._stdio = [self._redirect_in.stdio]
self._redirect_io = RedirectIO(self.loop, self,
self.redirect_output)
self._stdio.extend(self._redirect_io.stdio)
# create custom streams,
for label in self.custom_streams:
stream = self.streams[label] = Stream(self.loop, self,
len(self._stdio))
self._stdio.append(stream.stdio)
# create containers for custom channels.
for channel in self.custom_channels:
self._stdio.append(pyuv.StdIO(stream=channel,
flags=pyuv.UV_INHERIT_STREAM))
[docs] def spawn(self):
""" spawn the process """
kwargs = dict(
file = self.cmd,
exit_callback = self._exit_cb,
args = self.args,
env = self.env,
cwd = self.cwd,
stdio = self._stdio)
flags = 0
if self.uid is not None:
kwargs['uid'] = self.uid
flags = pyuv.UV_PROCESS_SETUID
if self.gid is not None:
kwargs['gid'] = self.gid
flags = flags | pyuv.UV_PROCESS_SETGID
if self.detach:
flags = flags | pyuv.UV_PROCESS_DETACHED
self.running = True
self._process = pyuv.Process(self.loop)
# spawn the process
self._process.spawn(**kwargs)
self._running = True
self._cached_pid = self._process.pid
# start redirecting IO
self._redirect_io.start()
if self._redirect_in is not None:
self._redirect_in.start()
for stream in self.streams.values():
stream.start()
@property
[docs] def active(self):
return self._process.active
@property
[docs] def closed(self):
return self._process.closed
@property
[docs] def pid(self):
""" return the process pid """
if self._cached_pid is None:
self._cached_pid = self._process.pid
return self._cached_pid
@property
[docs] def info(self):
""" return the process info. If the process is monitored it
return the last informations stored asynchronously by the watcher"""
if not self._pprocess:
self._pprocess = psutil.Process(self.pid)
return get_process_info(self._pprocess, 0.1)
@property
[docs] def status(self):
""" return the process status """
if not self._pprocess:
self._pprocess = psutil.Process(self.pid)
return self._pprocess.status
def __lt__(self, other):
return (self.pid != other.pid and
self.graceful_time < other.graceful_time)
__cmp__ = __lt__
[docs] def monitor(self, listener=None):
""" start to monitor the process
Listener can be any callable and receive *("stat", process_info)*
"""
if not self._process_watcher:
self._process_watcher = ProcessWatcher(self.loop, self.pid)
self._process_watcher.subscribe(listener)
[docs] def unmonitor(self, listener):
""" stop monitoring this process.
listener is the callback passed to the monitor function
previously.
"""
if not self._process_watcher:
return
self._process_watcher.unsubscribe(listener)
[docs] def monitor_io(self, io_label, listener):
""" subscribe to registered IO events """
if not self._redirect_io:
raise IOError("%s not redirected" % self.name)
self._redirect_io.subscribe(io_label, listener)
[docs] def unmonitor_io(self, io_label, listener):
""" unsubscribe to the IO event """
if not self._redirect_io:
return
self._redirect_io.unsubscribe(io_label, listener)
[docs] def write(self, data):
""" send data to the process via stdin"""
if not self._redirect_in:
raise IOError("stdin not redirected")
self._redirect_in.write(data)
[docs] def writelines(self, data):
""" send data to the process via stdin"""
if not self._redirect_in:
raise IOError("stdin not redirected")
self._redirect_in.writelines(data)
[docs] def stop(self):
""" stop the process """
self.kill(signal.SIGTERM)
[docs] def kill(self, signum):
""" send a signal to the process """
if not self.active:
return
self._process.kill(signum)
[docs] def close(self):
self._process.close()
def _exit_cb(self, handle, exit_status, term_signal):
if self._redirect_io is not None:
self._redirect_io.stop(all_events=True)
if self._redirect_in is not None:
self._redirect_in.stop(all_events=True)
for custom_io in self.streams.values():
custom_io.stop(all_events=True)
if self._process_watcher is not None:
self._process_watcher.stop(all_events=True)
self._running = False
handle.close()
# handle the exit callback
if self.on_exit_cb is not None:
self.on_exit_cb(self, exit_status, term_signal)