# -*- coding: utf-8 -*-
#
# This file is part of gaffer. See the NOTICE for more information.
"""
The manager module is a core component of gaffer. A Manager is
responsible of maintaining processes and allows you to interract with
them.
Classes
=======
"""
from collections import deque, OrderedDict
from threading import RLock
import pyuv
from .events import EventEmitter
from .error import ProcessError, ProcessConflict, ProcessNotFound
from .pubsub import Topic
from .state import ProcessState, ProcessTracker
from .sync import increment
from .util import parse_signal_value
[docs]class Manager(object):
""" Manager - maintain process alive
A manager is responsible of maintaining process alive and manage
actions on them:
- increase/decrease the number of processes / process template
- start/stop processes
- add/remove process templates to manage
The design is pretty simple. The manager is running on the default
event loop and listening on events. Events are sent when a process
exit or from any method call. The control of a manager can be
extended by adding apps on startup. For example gaffer
provides an application allowing you to control processes via HTTP.
Running an application is done like this::
# initialize the application with the default loop
loop = pyuv.Loop.default_loop()
m = Manager(loop=loop)
# start the application
m.start(apps=[HttpHandler])
.... # do smth
        m.stop() # stop the controller
m.run() # run the event loop
.. note::
The loop can be omitted if the first thing you do is
launching a manager. The run function is here for convenience. You
can of course just run `loop.run()` instead
.. warning::
The manager should be stopped the last one to prevent any lock
in your application.
"""
    def __init__(self, loop=None):
        """Initialize the manager.

        :param loop: optional ``pyuv`` event loop to run on; defaults to
            the pyuv default loop.
        """
        # by default we run on the default loop
        self.loop = loop or pyuv.Loop.default_loop()
        # initialize the emitter
        self.events = EventEmitter(self.loop)
        # initialize the process tracker
        self._tracker = ProcessTracker(self.loop)
        # initialize some values
        self.mapps = []                 # applications started with the manager
        self.started = False            # True once start() has been called
        self._stop_ev = None
        self.max_process_id = 0         # last allocated internal process id
        self.processes = OrderedDict()
        self.running = OrderedDict()    # internal pid -> running process
        self._sessions = OrderedDict()  # sessionid -> {job name -> state}
        self._topics = {}               # topic name -> pubsub Topic
        self._updates = deque()
        self._signals = []
        # status: -1 not started, 0 running, 1 stopping, 2 restarting
        self.status = -1
        self.stop_cb = None             # fired once the manager is shut down
        self.restart_cb = None          # fired after a restart completed
        self._lock = RLock()
@property
def active(self):
return self.status == 0 and self.started
[docs] def start(self, apps=[]):
""" start the manager. """
self.mapps = apps
self._waker = pyuv.Async(self.loop, self._wakeup)
# start the process tracker
self._tracker.start()
# manage processes
self.events.subscribe('exit', self._on_exit)
# start contollers
for mapp in self.mapps:
mapp.start(self.loop, self)
self.started = True
self.status = 0
[docs] def run(self):
""" Convenience function to use in place of `loop.run()`
If the manager is not started it raises a `RuntimeError`.
Note: if you want to use separately the default loop for this
thread then just use the start function and run the loop somewhere
else.
"""
if not self.started:
raise RuntimeError("manager hasn't been started")
self.loop.run()
[docs] def stop(self, callback=None):
""" stop the manager. This function is threadsafe """
if not self.started:
return
if self.status == 1:
# someone already requested to stop the manager
return
# set the callback
self.stop_cb = callback
# update the status to stop and wake up the loop
self.status = 1
self._waker.send()
[docs] def restart(self, callback=None):
""" restart all processes in the manager. This function is
threadsafe """
if self.status == 2:
# a restart is already running
return
self.restart_cb = callback
self.status = 2
self._waker.send()
    def subscribe(self, topic):
        """Subscribe to an event ``topic``, lazily creating and starting
        the Topic on first use. Returns the channel object produced by
        ``Topic.subscribe()``."""
        if topic not in self._topics:
            self._topics[topic] = Topic(topic, self)
            self._topics[topic].start()
        return self._topics[topic].subscribe()
[docs] def unsubscribe(self, topic, channel):
if topic not in self._topics:
return
self._topics[topic].unsubscribe(channel)
@property
def sessions(self):
return list(self._sessions)
[docs] def jobs(self, sessionid=None):
if not sessionid:
jobs = []
for sessionid in self._sessions:
session = self._sessions[sessionid]
jobs.extend(["%s.%s" % (sessionid, name) for name in session])
return jobs
else:
try:
session = self._sessions[sessionid]
except KeyError:
raise ProcessNotFound()
return ["%s.%s" % (sessionid, name) for name in session]
    def jobs_walk(self, callback, sessionid=None):
        """Invoke ``callback(manager, "session.name")`` for every job.

        When ``sessionid`` is given only that session is walked; an
        unknown session raises ``ProcessNotFound``.
        """
        with self._lock:
            if not sessionid:
                for sessionid in self._sessions:
                    for name in self._sessions[sessionid]:
                        callback(self, "%s.%s" % (sessionid, name))
            else:
                try:
                    session = self._sessions[sessionid]
                except KeyError:
                    raise ProcessNotFound()
                for name in session:
                    callback(self, "%s.%s" % (sessionid, name))
# ------------- process functions
    def load(self, config, sessionid=None, env=None, start=True):
        """ load a process config object.
        Args:
        - **config**: a ``process.ProcessConfig`` instance
        - **sessionid**: Some processes only make sense in certain contexts.
          this flag instructs gaffer to maintain this process in the sessionid
          context. A context can be for example an application. If no session
          is specified the config will be attached to the ``default`` session.
        - **env**: dict, None by default, if specified the config env variable
          will be updated with the env values.
        - **start**: bool, True by default; start the job right after
          loading it.
        Raises ``ProcessConflict`` if a config with the same name is
        already loaded in this session.
        """
        sessionid = self._sessionid(sessionid)
        with self._lock:
            if sessionid in self._sessions:
                # if the process already exists in this context raises a
                # conflict.
                if config.name in self._sessions[sessionid]:
                    raise ProcessConflict()
            else:
                # initialize this session
                self._sessions[sessionid] = OrderedDict()
            # create a new state for this config
            state = ProcessState(config, sessionid, env)
            self._sessions[sessionid][config.name] = state
        pname = "%s.%s" % (sessionid, config.name)
        self._publish("load", name=pname)
        if start:
            self.start_job(pname)
    def unload(self, name_or_process, sessionid=None):
        """ unload a process config. Removes the job from its session,
        publishes the ``unload``/``stop`` events and stops all of its
        processes. Raises ``ProcessNotFound`` if the session or the job
        doesn't exist. """
        sessionid = self._sessionid(sessionid)
        name = self._get_pname(name_or_process)
        pname = "%s.%s" % (sessionid, name)
        with self._lock:
            if sessionid not in self._sessions:
                raise ProcessNotFound()
            # get the state and remove it from the context
            session = self._sessions[sessionid]
            try:
                state = session.pop(name)
            except KeyError:
                raise ProcessNotFound()
            # drop the session entirely once its last job is gone
            if not session:
                try:
                    del self._sessions[sessionid]
                except KeyError:
                    pass
            # notify that we unload the process
            self._publish("unload", name=pname)
            # notify that we are stopping the process
            self._publish("stop", name=pname)
            self._publish("job.%s.stop" % pname, name=pname)
            # stop the process now.
            state.stopped = True
            self._stopall(state)
    def reload(self, name, sessionid=None):
        """ reload a process config. The number of processes is reset to
        the one in settings and all current processes are killed.
        ``name`` may be a job name string ("session.name") or an object
        carrying a ``name`` attribute. """
        if not sessionid:
            if hasattr(name, "name"):
                # a config-like object: use its name in the default session
                sessionid = 'default'
                name = getattr(name, 'name')
            else:
                sessionid, name = self._parse_name(name)
        else:
            name = self._get_pname(name)
        with self._lock:
            # reset the number of processes
            state = self._get_state(sessionid, name)
            state.reset()
            # kill all the processes and let gaffer manage asynchronously the
            # reload
            self._stopall(state)
            # manage processes
            self._manage_processes(state)
[docs] def update(self, config, sessionid=None, env=None, start=False):
""" update a process config. All processes are killed """
sessionid = self._sessionid(sessionid)
with self._lock:
state = self._get_state(sessionid, config.name)
state.update(config, env=env)
if start:
# make sure we unstop the process
state.stop = False
# kill all the processes and let gaffer manage asynchronously the
# reload. If the process is not stopped then it will start
self._stopall(state)
[docs] def get(self, name):
""" get a job config """
sessionid, name = self._parse_name(name)
with self._lock:
state = self._get_state(sessionid, name)
return state.config
    def start_job(self, name):
        """ Start a job from which the config have been previously loaded.
        Clears the job's stopped flag, resets its process count, publishes
        the ``start`` events and spawns the processes. """
        sessionid, name = self._parse_name(name)
        pname = "%s.%s" % (sessionid, name)
        with self._lock:
            state = self._get_state(sessionid, name)
            # make sure we unstop the process
            state.stopped = False
            # reset the number of processes
            state.reset()
            # notify that we are starting the process
            self._publish("start", name=pname)
            self._publish("job.%s.start" % pname, name=pname)
            # manage processes
            self._manage_processes(state)
    def stop_job(self, name):
        """ stop a job. All processes of this job are stopped and won't be
        restarted by the manager """
        sessionid, name = self._parse_name(name)
        pname = "%s.%s" % (sessionid, name)
        with self._lock:
            state = self._get_state(sessionid, name)
            # put the number to 0
            state.numprocesses = 0
            # flag the state to stop
            state.stopped = True
            # notify that we are stopping the process
            self._publish("stop", name=pname)
            self._publish("job.%s.stop" % pname, name=pname)
            self._stopall(state)
[docs] def commit(self, name, graceful_timeout=0, env=None):
""" Like ``scale(1) but the process won't be kept alived at the end.
It is also not handled uring scaling or reaping. """
sessionid, name = self._parse_name(name)
pname = "%s.%s" % (sessionid, name)
with self._lock:
state = self._get_state(sessionid, name)
# commit the job and return the pid
return self._commit_process(state,
graceful_timeout=graceful_timeout, env=env)
    def scale(self, name, n):
        """ Scale the number of processes for a job. By using this
        function you can increase, decrease or set the number of processes
        in a template. Change is handled once the event loop is idling.
        ``n`` can be a positive or negative integer. It can also be a
        string containing the operation to do. For example::
            m.scale("sometemplate", 1)    # increase of 1
            m.scale("sometemplate", -1)   # decrease of 1
            m.scale("sometemplate", "+1") # increase of 1
            m.scale("sometemplate", "-1") # decrease of 1
            m.scale("sometemplate", "=1") # set the number of processes to 1
        Returns whatever the state's incr/decr call returns.
        Raises ``ValueError`` for a string not starting with =, + or -.
        """
        sessionid, name = self._parse_name(name)
        pname = "%s.%s" % (sessionid, name)
        # find the operation to do
        if isinstance(n, int):
            # a plain integer: the sign selects the operation
            if n > 0:
                op = "+"
            else:
                op = "-"
            n = abs(n)
        else:
            if n.isdigit():
                # a bare digit string means "increase by n"
                op = "+"
                n = int(n)
            else:
                op = n[0]
                if op not in ("=", "+", "-"):
                    raise ValueError("bad_operation")
                n = int(n[1:])
        with self._lock:
            state = self._get_state(sessionid, name)
            # scale
            if op == "=":
                # absolute target: move from the current count to n
                curr = state.numprocesses
                if curr > n:
                    ret = state.decr(curr - n)
                else:
                    ret = state.incr(n - curr)
            elif op == "+":
                ret = state.incr(n)
            else:
                ret = state.decr(n)
            self._publish("update", name=pname)
            self._manage_processes(state)
            return ret
    def info(self, name):
        """ get job infos: name, activity flag, process counts, the
        internal pids and the (serializable) job config. """
        sessionid, name = self._parse_name(name)
        pname = "%s.%s" % (sessionid, name)
        with self._lock:
            state = self._get_state(sessionid, name)
            # "running" counts both the keepalived and the committed
            # (one-shot) processes
            processes = list(state.running)
            processes.extend(list(state.running_out))
            info = {"name": pname,
                    "active": state.active,
                    "running": len(processes),
                    "running_out": len(state.running_out),
                    "max_processes": state.numprocesses,
                    "processes": [p.pid for p in processes]}
            # get config
            config = state.config.to_dict()
            # remove custom channels because they can't be serialized
            config.pop('custom_channels', None)
            # add config to the info
            info['config'] = config
            return info
[docs] def stats(self, name):
""" return job stats
"""
sessionid, name = self._parse_name(name)
pname = "%s.%s" % (sessionid, name)
with self._lock:
state = self._get_state(sessionid, name)
processes = list(state.running)
processes.extend(list(state.running_out))
stats = []
lmem = []
lcpu = []
for p in processes:
pstats = p.stats
pstats['pid'] = p.pid
pstats['os_pid'] = p.os_pid
stats.append(pstats)
lmem.append(pstats['mem'])
lcpu.append(pstats['cpu'])
if 'N/A' in lmem or not lmem:
mem, max_mem, min_mem = "N/A"
else:
max_mem = max(lmem)
min_mem = min(lmem)
mem = sum(lmem)
if 'N/A' in lcpu or not lcpu:
cpu, max_cpu, min_cpu = "N/A"
else:
max_cpu = max(lcpu)
min_cpu = min(lcpu)
cpu = sum(lcpu)
ret = dict(name=pname, stats=stats, mem=mem,
max_mem=max_mem, min_mem=min_mem, cpu=cpu,
max_cpu=max_cpu, min_cpu=min_cpu)
return ret
    def get_process(self, pid):
        """ get an OS process by ID. A process is a ``gaffer.Process`` instance
        attached to a process state that you can use. Raises
        ``ProcessNotFound`` for an unknown pid.
        """
        with self._lock:
            return self._get_pid(pid)
    def stop_process(self, pid):
        """ stop a process identified by its internal ``pid``. The process
        is removed from the running registry and from its state, then
        stopped; the tracker makes sure it is killed once its graceful
        timeout expires. Raises ``ProcessNotFound`` for an unknown pid. """
        with self._lock:
            # remove the job from the running jobs
            try:
                p = self.running.pop(pid)
            except KeyError:
                raise ProcessNotFound()
            # remove the process from the state
            sessionid, name = self._parse_name(p.name)
            state = self._get_state(sessionid, name)
            # if the process is marked once it means the job has been
            # committed and the process shouldn't be restarted
            if p.once:
                state.running_out.remove(p)
            else:
                state.remove(p)
            # notify we stop this pid
            self._publish("stop_process", pid=p.pid, name=p.name)
            # then stop the process
            p.stop()
            # track this process to make sure it's killed after the
            # graceful time
            graceful_timeout = p.graceful_timeout or state.graceful_timeout
            self._tracker.check(p, graceful_timeout)
    def stopall(self, name):
        """ stop all processes of a job. Processes are just exiting and will
        be restarted by the manager (the job itself stays loaded and
        active). """
        sessionid, name = self._parse_name(name)
        with self._lock:
            state = self._get_state(sessionid, name)
            # kill all the processes.
            self._stopall(state)
    def kill(self, pid, sig):
        """ send a signal to a process. ``sig`` may be a signal number or
        a name understood by ``parse_signal_value``. """
        signum = parse_signal_value(sig)
        with self._lock:
            p = self._get_pid(pid)
            # notify we signal this process
            self._publish("proc.%s.kill" % p.pid, pid=p.pid, name=p.name)
            # effectively send the signal
            p.kill(signum)
    def send(self, pid, lines, stream=None):
        """ send some data to a process' input stream.

        ``stream`` defaults to stdin; any other name must exist in the
        process' ``streams`` mapping or a
        ``ProcessError(404, "stream_not_found")`` is raised. ``lines``
        may be a single chunk or a list of chunks.
        """
        with self._lock:
            p = self._get_pid(pid)
            # find the stream we need to write to
            if not stream or stream == "stdin":
                target = p
            else:
                if stream in p.streams:
                    target = p.streams[stream]
                else:
                    raise ProcessError(404, "stream_not_found")
            # finally write to the stream
            if isinstance(lines, list):
                target.writelines(lines)
            else:
                target.write(lines)
    def killall(self, name, sig):
        """ send a signal to all processes of a job (both keepalived and
        committed ones), then reconcile the process count. """
        signum = parse_signal_value(sig)
        sessionid, name = self._parse_name(name)
        pname = "%s.%s" % (sessionid, name)
        with self._lock:
            state = self._get_state(sessionid, name)
            self._publish("job.%s.kill" % pname, name=pname, signum=signum)
            processes = list(state.running)
            processes.extend(list(state.running_out))
            for p in processes:
                # notify we signal this process
                self._publish("proc.%s.kill" % p.pid, pid=p.pid, name=p.name)
                # effectively send the signal
                p.kill(signum)
            self._manage_processes(state)
[docs] def walk(self, callback, name=None):
with self._lock:
if not name:
processes = [p for pid, p in self.running.items()]
else:
sessionid, name = self._parse_name(name)
state = self._get_state(sessionid, name)
processes = state.running
for p in processes:
callback(self, p)
[docs] def list(self, name=None):
with self._lock:
if not name:
processes = [p for pid, p in self.running.items()]
else:
sessionid, name = self._parse_name(name)
state = self._get_state(sessionid, name)
processes = state.running
return list(processes)
[docs] def pids(self, name=None):
return [p.pid for p in self.list(name=name)]
    def manage(self, name):
        """Force a reconciliation pass for a job: spawn missing processes
        and reap any excess ones."""
        sessionid, name = self._parse_name(name)
        with self._lock:
            state = self._get_state(sessionid, name)
            self._manage_processes(state)
    def monitor(self, listener, name):
        """ get stats changes on a process template or id: attach
        ``listener`` to every running process of the job
        """
        sessionid, name = self._parse_name(name)
        with self._lock:
            state = self._get_state(sessionid, name)
            for p in state.running:
                p.monitor(listener)
    def unmonitor(self, listener, name):
        """ stop receiving stats changes for a process template or id:
        detach ``listener`` from every running process of the job
        """
        sessionid, name = self._parse_name(name)
        with self._lock:
            state = self._get_state(sessionid, name)
            for p in state.running:
                p.unmonitor(listener)
# ------------- general purpose utilities
    def wakeup(self):
        """Wake the event loop up; safe to call from any thread (pyuv
        Async handles are the thread-safe wakeup primitive)."""
        self._waker.send()
    def get_process_id(self):
        """ generate a new internal process id (monotonically increasing;
        the bump goes through ``increment``). """
        self.max_process_id = increment(self.max_process_id)
        return self.max_process_id
    def _get_locked_state(self, name):
        """ utility function to get a state from a full name while holding
        the lock; generally used for debug
        """
        sessionid, name = self._parse_name(name)
        with self._lock:
            return self._get_state(sessionid, name)
def _sessionid(self, session=None):
if not session:
return "default"
return session
def _get_pname(self, name_or_process):
if hasattr(name_or_process, "name"):
return name_or_process.name
else:
return name_or_process
def _parse_name(self, name):
if "." in name:
sessionid, name = name.split(".", 1)
elif "/" in name:
sessionid, name = name.split("/", 1)
else:
sessionid = "default"
return sessionid, name
def _get_state(self, sessionid, name):
if sessionid not in self._sessions:
raise ProcessNotFound()
session = self._sessions[sessionid]
if name not in session:
raise ProcessNotFound()
return session[name]
    def _get_pid(self, pid):
        """Return the running process registered under internal ``pid``,
        raising ``ProcessNotFound`` when it isn't running."""
        try:
            return self.running[pid]
        except KeyError:
            raise ProcessNotFound()
# ------------- general private functions
    def _shutdown(self):
        """Finalize a stop: halt the tracker and the applications, mark
        the manager stopped and fire the pending stop callback."""
        with self._lock:
            self._tracker.stop()
            # stop the applications.
            for ctl in self.mapps:
                ctl.stop()
            # we are now stopped
            self.started = False
            # close all handles
            #def walk_cb(h):
            #    if h.active:
            #        h.close()
            #self.loop.walk(walk_cb)
            # if there is any stop callback, execute it
            if self.stop_cb is not None:
                self.stop_cb(self)
                self.stop_cb = None
    def _stop(self):
        """Stop every loaded job, then shut the manager down once the
        tracker reports that all processes have exited."""
        # stop should be synchronous. We need to first stop the
        # processes and let the applications know about it. It is
        # actually done by setting on startup a timer waiting that all
        # processes have stopped to run. Then applications are stopped.
        # NOTE(review): self.stopping is not initialized in __init__ and
        # is not read anywhere in this file — confirm it is used elsewhere.
        self.stopping = True
        # stop all processes
        with self._lock:
            for sid in self._sessions:
                for name, state in self._sessions[sid].items():
                    if not state.stopped:
                        state.stopped = True
                        self._stopall(state)
            # once every tracked process is gone, finish the shutdown
            self._tracker.on_done(self._shutdown)
    def _restart(self):
        """Restart the applications, then every loaded job; invoked from
        _wakeup() when a restart was requested."""
        with self._lock:
            # on restart we first restart the applications
            for app in self.mapps:
                app.restart()
            # then we restart the sessions
            for sid in self._sessions:
                session = self._sessions[sid]
                for name in session:
                    self._restart_processes(session[name])
            # if any callback has been set, run it
            if self.restart_cb is not None:
                self.restart_cb(self)
                self.restart_cb = None
            # back to the normal running status
            self.status = 0
# ------------- process type private functions
    def _stop_group(self, state, group):
        """Drain ``group`` (a deque of processes of ``state``), stopping
        each process still registered as running and arming the kill
        tracker with its graceful timeout."""
        while True:
            try:
                p = group.popleft()
            except IndexError:
                # group is empty
                break
            if p.pid not in self.running:
                continue
            self.running.pop(p.pid)
            # notify we stop this pid
            self._publish("stop_process", pid=p.pid, name=p.name)
            # stop the process
            p.stop()
            # track this process to make sure it's killed after the
            # graceful time
            graceful_timeout = p.graceful_timeout or state.graceful_timeout
            self._tracker.check(p, graceful_timeout)
    def _stopall(self, state):
        """ stop all processes of a job """
        # stop the flapping detection before killing the process to prevent
        # any race condition
        if state.flapping_timer is not None:
            state.flapping_timer.stop()
        # kill all keepalived processes
        if state.running:
            self._stop_group(state, state.running)
        # kill all other processes (those which have been committed)
        if state.running_out:
            self._stop_group(state, state.running_out)
        # if the job isn't stopped, restart the flapping detection
        if not state.stopped and state.flapping_timer is not None:
            state.flapping_timer.start()
# ------------- functions that manage the process
    def _commit_process(self, state, graceful_timeout=10.0, env=None):
        """ like spawn but doesn't keep the process associated to the state.
        It should die at the end. Returns the internal pid so the caller
        can interact with the one-shot process. """
        # get internal process id
        pid = self.get_process_id()
        # start process
        p = state.make_process(self.loop, pid, self._on_process_exit)
        p.spawn(once=True, graceful_timeout=graceful_timeout, env=env)
        # add the pid to external processes in the state
        state.running_out.append(p)
        # we keep a list of all running process by id here
        self.running[pid] = p
        # notify
        self._publish("spawn", name=p.name, pid=pid, os_pid=p.os_pid)
        # on commit we return the pid now, so someone will be able to use it.
        return pid
    def _spawn_process(self, state):
        """ spawn a new process and add it to the state """
        # get internal process id
        pid = self.get_process_id()
        # start process
        p = state.make_process(self.loop, pid, self._on_process_exit)
        p.spawn()
        # add the process to the running state
        state.queue(p)
        # we keep a list of all running process by id here
        self.running[pid] = p
        # notify both the global and the per-job spawn topics
        self._publish("spawn", name=p.name, pid=pid, os_pid=p.os_pid)
        self._publish("job.%s.spawn" % p.name, name=p.name, pid=pid,
                os_pid=p.os_pid)
def _spawn_processes(self, state):
""" spawn all processes for a state """
num_to_start = state.numprocesses - len(state.running)
for i in range(num_to_start):
self._spawn_process(state)
    def _reap_processes(self, state):
        """Stop the processes exceeding the job's target count (oldest
        first via ``state.dequeue()``), publishing the reap topics for
        each one. Stopped jobs are left alone."""
        if state.stopped:
            return
        diff = len(state.running) - state.numprocesses
        if diff > 0:
            for i in range(diff):
                # remove the process from the running processes
                try:
                    p = state.dequeue()
                except IndexError:
                    return
                # remove the pid from the running processes
                if p.pid in self.running:
                    self.running.pop(p.pid)
                # stop the process
                p.stop()
                # track this process to make sure it's killed after the
                # graceful time
                self._tracker.check(p, state.graceful_timeout)
                # notify others that the process is being reaped
                self._publish("reap", name=p.name, pid=p.pid, os_pid=p.os_pid)
                self._publish("job.%s.reap" % p.name, name=p.name, pid=p.pid,
                        os_pid=p.os_pid)
                self._publish("proc.%s.reap" % p.pid,
                        name=p.name, pid=p.pid, os_pid=p.os_pid)
def _manage_processes(self, state):
if state.stopped:
return
if len(state.running) < state.numprocesses:
self._spawn_processes(state)
self._reap_processes(state)
def _restart_processes(self, state):
# first launch new processes
for i in range(state.numprocesses):
self._spawn_process(state)
# then reap useless one.
self._manage_processes(state)
    def _check_flapping(self, state):
        """Return True when the job may be (re)spawned, False when it is
        flapping and has been stopped.

        When flapping is detected the processes are stopped; if the
        flapping policy allows a retry, a one-shot pyuv timer re-enables
        and restarts the job after ``retry_in`` seconds.
        """
        if not state.flapping:
            # no flapping policy configured for this job
            return True
        check_flapping, can_retry = state.check_flapping()
        if not check_flapping:
            self._publish("flap", name=state.name)
            # stop the processes
            if not state.stopped:
                state.stopped = True
                self._stopall(state)
            if can_retry:
                # if we can retry later then set a callback
                def flapping_cb(handle):
                    # allows respawning
                    state.stopped = False
                    # NOTE(review): this writes state._flapping_timer while
                    # _stopall() reads state.flapping_timer — confirm both
                    # names reach the same attribute on ProcessState.
                    state._flapping_timer = None
                    # restart processes
                    self._restart_processes(state)
                # set a callback
                t = pyuv.Timer(self.loop)
                t.start(flapping_cb, state.flapping.retry_in, 0.0)
                state._flapping_timer = t
            return False
        return True
def _publish(self, evtype, **ev):
event = {"event": evtype }
event.update(ev)
self.events.publish(evtype, event)
# ------------- events handler
    def _wakeup(self, handle):
        """pyuv Async callback: dispatch the requested state change.
        status 1 -> shut down (the waker handle is closed first),
        status 2 -> restart."""
        if self.status == 1:
            handle.close()
            self._stop()
        elif self.status == 2:
            self._restart()
    def _on_exit(self, evtype, msg):
        """'exit' event handler: respawn the job's processes when the job
        is still active and the exited process wasn't a one-shot."""
        sessionid, name = self._parse_name(msg['name'])
        once = msg.get('once', False)
        with self._lock:
            try:
                state = self._get_state(sessionid, name)
            except ProcessNotFound:
                # race condition, we already removed this process
                return
            # eventually restart the process
            if not state.stopped and not once:
                # manage the template, eventually restart a new one.
                if self._check_flapping(state):
                    self._manage_processes(state)
    def _on_process_exit(self, process, exit_status, term_signal):
        """Process exit callback: deregister the process everywhere and
        publish the 'exit' events (which may trigger a respawn through
        _on_exit)."""
        with self._lock:
            # maybe uncheck this process from the tracker
            self._tracker.uncheck(process)
            # unexpected exit, remove the process from the list of
            # running processes.
            if process.pid in self.running:
                self.running.pop(process.pid)
            sessionid, name = self._parse_name(process.name)
            try:
                state = self._get_state(sessionid, name)
                # remove the process from the state if needed
                if process.once:
                    state.running_out.remove(process)
                else:
                    state.remove(process)
            except (ProcessNotFound, KeyError):
                # the job was already unloaded — nothing left to clean up
                pass
            # notify others that the process exited
            ev_details = dict(name=process.name, pid=process.pid,
                    exit_status=exit_status, term_signal=term_signal,
                    os_pid=process.os_pid, once=process.once)
            self._publish("exit", **ev_details)
            self._publish("job.%s.exit" % process.name, **ev_details)