Source code for gaffer.manager

# -*- coding: utf-8 -
#
# This file is part of gaffer. See the NOTICE for more information.
"""
The manager module is a core component of gaffer. A Manager is
responsible for maintaining processes and allows you to interact with
them.

Classes
=======

"""
from collections import deque
import copy
import operator
from threading import RLock

import pyuv
import six

try:
    from collections import OrderedDict
except ImportError:
    from .datastructures import OrderedDict

from .events import EventEmitter
from .process import Process
from .state import ProcessState, ProcessTracker
from .sync import increment

class Manager(object):
    """Manager - keep processes alive.

    A manager is responsible for keeping processes alive and for managing
    actions on them:

    - increase/decrease the number of processes / process template
    - start/stop processes
    - add/remove process templates to manage

    The design is pretty simple. The manager is running on the default
    event loop and listening on events. Events are sent when a process
    exits or from any method call. The control of a manager can be
    extended by adding apps on startup. For example gaffer provides an
    application allowing you to control processes via HTTP.

    Running an application is done like this::

        # initialize the application with the default loop
        loop = pyuv.Loop.default_loop()
        m = Manager(loop=loop)

        # start the application
        m.start(apps=[HttpHandler])

        .... # do smth

        m.stop() # stop the controller
        m.run() # run the event loop

    .. note::

        The loop can be omitted if the first thing you do is
        launching a manager. The run function is here for convenience.
        You can of course just run `loop.run()` instead.

    .. warning::

        The manager should be stopped the last one to prevent any lock
        in your application.
    """

    def __init__(self, loop=None):
        """Create a manager bound to *loop* (the default pyuv loop when
        omitted). Nothing is started until :meth:`start` is called."""
        # by default we run on the default loop
        self.loop = loop or pyuv.Loop.default_loop()

        # wakeup event used for internal signaling (see stop/restart)
        self._wakeup_ev = pyuv.Async(self.loop, self._on_wakeup)

        # initialize the emitter
        self._emitter = EventEmitter(self.loop)

        # initialize the process tracker
        self._tracker = ProcessTracker(self.loop)

        # initialize some values
        self.apps = None
        self.started = False
        self._stop_ev = None
        self.max_process_id = 0
        # process templates by name, in insertion order
        self.processes = OrderedDict()
        # running processes by internal process id
        self.running = OrderedDict()
        # group name -> list of template names
        self.groups = {}
        self.channel = deque()
        self._updates = deque()
        # pending "STOP" / "RESTART" requests consumed by _on_wakeup
        self._signals = []

        self.stopping = False
        self.stop_cb = None
        self.restart_cb = None
        self._lock = RLock()

    def start(self, apps=None):
        """Start the manager.

        - **apps**: optional list of applications; each one is started
          with ``app.start(loop, manager)``. Defaults to no application.
        """
        # a fresh list per call: a mutable default would be shared
        # between every manager started without arguments
        self.apps = apps if apps is not None else []

        # start the process tracker
        self._tracker.start()

        # manage processes
        self.subscribe('exit', self._on_exit)

        # start controllers
        for ctl in self.apps:
            ctl.start(self.loop, self)

        self.started = True

    def run(self):
        """Convenience function to use in place of `loop.run()`.

        If the manager is not started it raises a `RuntimeError`.

        Note: if you want to use separately the default loop for this
        thread then just use the start function and run the loop
        somewhere else.
        """
        if not self.started:
            raise RuntimeError("manager hasn't been started")
        self.loop.run()

    def stop(self, callback=None):
        """Stop the manager. This function is threadsafe.

        - **callback**: optional callable invoked as ``callback(manager)``
          once the manager has shut down.
        """
        self.stop_cb = callback
        self._signals.append("STOP")
        self.wakeup()

    def restart(self, callback=None):
        """Restart all processes in the manager. This function is
        threadsafe.

        - **callback**: optional callable invoked as ``callback(manager)``
          once everything has been restarted.
        """
        self.restart_cb = callback
        self._signals.append("RESTART")
        self.wakeup()

    def start_processes(self):
        """Start all processes."""
        # snapshot the names so the lock is not held while each template
        # is started (start_process takes the lock itself)
        with self._lock:
            names = list(self.processes)

        for name in names:
            self.start_process(name)

    def stop_processes(self):
        """Stop all processes in the manager."""
        with self._lock:
            for name in self.processes:
                self._stop_processes(name)

    def running_processes(self):
        """Return the running processes.

        The returned object is the manager's own ``running`` mapping
        (internal process id -> process), not a copy.
        """
        with self._lock:
            return self.running

    def processes_stats(self):
        """Iterator returning the stats of every process template."""
        # only hold the lock to snapshot the names: holding it across a
        # ``yield`` would keep it locked for as long as the consumer
        # keeps the generator suspended
        with self._lock:
            names = list(self.processes)

        for name in names:
            yield self.get_process_stats(name)

    def subscribe(self, evtype, listener):
        """Subscribe to the manager event *evtype*.

        'on' is an alias to this function.
        """
        self._emitter.subscribe(evtype, listener)
    on = subscribe

    def subscribe_once(self, evtype, listener):
        """Subscribe once to the manager event *evtype*.

        'once' is an alias to this function.
        """
        self._emitter.subscribe_once(evtype, listener)
    once = subscribe_once

    def unsubscribe(self, evtype, listener):
        """Unsubscribe from the event *evtype*."""
        self._emitter.unsubscribe(evtype, listener)

    def get_groups(self):
        """Return the list of group names."""
        with self._lock:
            return list(self.groups)

    def get_group(self, groupname):
        """Return a copy of the list of process names of this group.

        Raises `KeyError` if the group is unknown.
        """
        with self._lock:
            if groupname not in self.groups:
                raise KeyError('%r not found' % groupname)
            return copy.copy(self.groups[groupname])

    def remove_group(self, groupname):
        """Remove a group and all its processes. All processes are
        stopped.

        Raises `KeyError` if the group is unknown.
        """
        self._apply_group_func(groupname, self.remove_process)

        # finally remove the group
        with self._lock:
            self.groups.pop(groupname, None)

    def start_group(self, groupname):
        """Start all process templates of the group."""
        self._apply_group_func(groupname, self.start_process)

    def stop_group(self, groupname):
        """Stop all process templates of the group."""
        self._apply_group_func(groupname, self.stop_process)

    def restart_group(self, groupname):
        """Restart all processes in a group."""
        self._apply_group_func(groupname, self.restart_process)

    # ------------- process functions

    def add_process(self, name, cmd, **kwargs):
        """Add a process to the manager. All processes should be added
        using this function.

        - **name**: name of the process
        - **cmd**: program command, string
        - **args**: the arguments for the command to run. Can be a list
          or a string. If **args** is a string, it's split using
          :func:`shlex.split`. Defaults to None.
        - **env**: a mapping containing the environment variables the
          command will run with. Optional
        - **uid**: int or str, user id
        - **gid**: int or str, user group id
        - **cwd**: working dir
        - **detach**: the process is launched but won't be monitored and
          won't exit when the manager is stopped.
        - **shell**: boolean, run the script in a shell. (UNIX only)
        - **os_env**: boolean, pass the os environment to the program
        - **numprocesses**: int, the number of OS processes to launch for
          this description
        - **flapping**: a FlappingInfo instance, if flapping detection
          should be used. Flapping parameters are:

          - **attempts**: maximum number of attempts before we stop the
            process and set it to retry later
          - **window**: period in which we are testing the number of
            retries
          - **retry_in**: seconds, the time after which we restart the
            process and try to spawn them
          - **max_retry**: maximum number of retries before we give up
            and stop the process.
        - **redirect_output**: list of io to redirect (max 2). This is a
          list of custom labels to use for the redirection. Ex:
          ["a", "b"] will redirect stdout & stderr and stdout events
          will be labeled "a"
        - **redirect_input**: Boolean (False is the default). Set it if
          you want to be able to write to stdin.
        - **graceful_timeout**: graceful time before we send a SIGKILL
          to the process (which definitely kills it). By default 30s.
          This is the time we let a process exit cleanly.
        - **start**: boolean (True is the default), spawn the processes
          right away.

        Raises `KeyError` if a process with this name is already managed.
        """
        with self._lock:
            if name in self.processes:
                raise KeyError("a process named %r is already managed" % name)

            # 'start' is a manager flag, it must not reach the state
            start = kwargs.pop('start', True)

            # Grouped processes are prefixed by the name of the group
            # <groupname>:<name>
            if ":" in name:
                group = name.split(":", 1)[0]
                kwargs['group'] = group
            else:
                group = None

            state = ProcessState(name, cmd, **kwargs)
            self.processes[name] = state

            # register this name to the group
            if group is not None:
                self.groups.setdefault(group, []).append(name)

            self._publish("create", name=name)
            if start:
                self._publish("start", name=name)
                self._publish("proc.%s.start" % name, name=name)
                self._manage_processes(state)

    def update_process(self, name, cmd, **kwargs):
        """Update a process information.

        When a process is updated, all current processes are stopped,
        then the state is replaced and new processes with the new info
        are started.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)

            # 'start' is a manager flag, strip it before it reaches the
            # state (same as add_process)
            kwargs.pop('start', None)

            # keep the group binding derived from the "<group>:<name>"
            # prefix, as add_process does
            if ":" in name:
                kwargs['group'] = name.split(":", 1)[0]

            self._stop_processes(name)

            state = ProcessState(name, cmd, **kwargs)
            # NOTE(review): presumably redundant with the constructor;
            # kept from the original code -- confirm against ProcessState
            state.setup(name, cmd, **kwargs)

            # the new state must replace the old one, otherwise exit
            # events and later stop requests would still be routed to
            # the old, stopped state
            self.processes[name] = state

            self._publish("update", name=name)
            self._manage_processes(state)

    def start_process(self, name):
        """Start the processes of the template *name*.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%s not found" % name)

            state = self.processes[name]
            self._publish("start", name=name)
            self._publish("proc.%s.start" % name, name=name)
            self._manage_processes(state)

    def stop_process(self, name_or_id):
        """Stop a process by name or id.

        If a name is given all processes associated to this name will be
        removed and the process is marked as stopped. If the internal
        process id is given, only the process with this id will be
        stopped.
        """
        with self._lock:
            if isinstance(name_or_id, six.string_types):
                # stop all processes of the template name
                self._stop_processes(name_or_id)
            else:
                # stop a process by its internal pid
                self._stop_process(name_or_id)

    def restart_process(self, name):
        """Restart a process.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)
            self._restart_processes(self.processes[name])

    def remove_process(self, name):
        """Remove the process and its config from the manager.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)

            # stop all processes
            self._stop_processes(name)

            # remove it from the list
            state = self.processes.pop(name)

            # also remove it from the group if any
            if state.group is not None:
                members = self.groups.get(state.group)
                if members is not None and name in members:
                    members.remove(name)

            # notify others that this template has been deleted
            self._publish("delete", name=name)

    def manage_process(self, name):
        """Spawn or reap OS processes so the template *name* matches its
        configured number of processes.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)
            self._manage_processes(self.processes[name])

    def get_process(self, name_or_pid):
        """Return a running process (int id) or a process state (name).

        Raises `KeyError` if nothing matches.
        """
        with self._lock:
            if isinstance(name_or_pid, int):
                return self.running[name_or_pid]
            return self.processes[name_or_pid]

    def get_process_info(self, name):
        """Return a dict with the name, command and settings of the
        process template.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)

            state = self.processes[name]
            info = {"name": state.name, "cmd": state.cmd}
            info.update(state.settings)
            return info

    def get_process_status(self, name):
        """Return the process status::

            {
              "active": str,
              "running": int,
              "max_processes": int
            }

        - **active** can be *active* or *stopped*
        - **running**: the number of actually running OS processes using
          this template.
        - **max_processes**: the maximum number of processes that should
          run. It is normally the same as the **running** value.

        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)

            state = self.processes[name]
            return {
                "active": state.active,
                "running": len(state.running),
                "max_processes": state.numprocesses,
            }

    def get_process_stats(self, name_or_id):
        """Return process stats for a process template or a process id.

        Raises `KeyError` if nothing matches.
        """
        with self._lock:
            if isinstance(name_or_id, int):
                try:
                    return self.running[name_or_id].info
                except KeyError:
                    raise KeyError("%s not found" % name_or_id)

            if name_or_id not in self.processes:
                raise KeyError("%r not found" % name_or_id)
            return self.processes[name_or_id].stats()

    def monitor(self, name_or_id, listener):
        """Get stats changes on a process template or id.

        Raises `KeyError` if nothing matches.
        """
        with self._lock:
            try:
                if isinstance(name_or_id, int):
                    target = self.running[name_or_id]
                else:
                    target = self.processes[name_or_id]
            except KeyError:
                raise KeyError("%s not found" % name_or_id)
            return target.monitor(listener)

    def unmonitor(self, name_or_id, listener):
        """Stop getting stats changes on a process template or id.

        Raises `KeyError` if nothing matches.
        """
        with self._lock:
            try:
                if isinstance(name_or_id, int):
                    target = self.running[name_or_id]
                else:
                    target = self.processes[name_or_id]
            except KeyError:
                raise KeyError("%s not found" % name_or_id)
            return target.unmonitor(listener)

    def ttin(self, name, i=1):
        """Increase the number of system processes for a state. Change
        is handled once the event loop is idling.

        Returns the value returned by the state's ``ttin``.
        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)

            state = self.processes[name]
            ret = state.ttin(i)
            self._publish("update", name=name)
            self._manage_processes(state)
            return ret

    def ttou(self, name, i=1):
        """Decrease the number of system processes for a state. Change
        is handled once the event loop is idling.

        Returns the value returned by the state's ``ttou``.
        Raises `KeyError` if the process is unknown.
        """
        with self._lock:
            if name not in self.processes:
                raise KeyError("%r not found" % name)

            state = self.processes[name]
            ret = state.ttou(i)
            self._publish("update", name=name)
            self._manage_processes(state)
            return ret

    def send_signal(self, name_or_id, signum):
        """Send a signal to a process or all processes contained in a
        state. Unknown names or ids are silently ignored.
        """
        with self._lock:
            try:
                if isinstance(name_or_id, int):
                    targets = [self.running[name_or_id]]
                else:
                    targets = list(self.processes[name_or_id].running)
            except KeyError:
                # best effort: nothing to signal for an unknown target
                return

            for p in targets:
                p.kill(signum)

    # ------------- general purpose utilities

    def wakeup(self):
        """Wake the event loop up so pending signals get handled."""
        self._wakeup_ev.send()

    def get_process_state(self, name):
        """Return the state of the template *name*, or None if the
        template is unknown."""
        return self.processes.get(name)

    def get_process_id(self):
        """Generate a process id."""
        self.max_process_id = increment(self.max_process_id)
        return self.max_process_id

    # ------------- private functions

    def _shutdown(self):
        """Stop the applications and run the stop callback, if any."""
        with self._lock:
            # stop the applications (apps is None if start() was never
            # called)
            for ctl in self.apps or ():
                ctl.stop()

            # we are now stopped
            self.started = False

            # NOTE: closing every active loop handle through loop.walk()
            # was disabled in the original code.

            # if there is any stop callback, execute it
            if self.stop_cb is not None:
                self.stop_cb(self)
                self.stop_cb = None

    def _stop(self):
        """Stop every process, then shut down once the tracker is done."""
        # stop should be synchronous. We need to first stop the
        # processes and let the applications know about it. It is
        # actually done by setting on startup a timer waiting that all
        # processes have stopped to run. Then applications are stopped.
        self.stopping = True

        # stop all processes
        with self._lock:
            for name in self.processes:
                self._stop_processes(name)

        self._tracker.on_done(self._shutdown)

    def _restart(self):
        """Restart the applications, then every process template."""
        with self._lock:
            # on restart we first restart the applications (apps is None
            # if start() was never called)
            for app in self.apps or ():
                app.restart()

            # then we restart the processes
            for state in list(self.processes.values()):
                self._restart_processes(state)

            # if any callback has been set, run it
            if self.restart_cb is not None:
                self.restart_cb(self)
                self.restart_cb = None

    def _stop_processes(self, name):
        """Stop all processes in a template."""
        if name not in self.processes:
            return

        # get the template
        state = self.processes[name]
        if state.stopped:
            return
        state.stopped = True

        # notify others that all processes of the template are being
        # stopped.
        self._publish("stop", name=name)
        self._publish("proc.%s.stop" % name, name=name)

        # stop the flapping detection.
        if state.flapping_timer is not None:
            state.flapping_timer.stop()

        # iterate over queued processes until the state is drained
        while True:
            try:
                p = state.dequeue()
            except IndexError:
                break

            # notify others that the process is being stopped
            self._publish("stop_pid", name=p.name, pid=p.id, os_pid=p.pid)
            self._publish("proc.%s.stop_pid" % p.name, name=p.name,
                          pid=p.id, os_pid=p.pid)

            # remove the pid from the running processes
            self.running.pop(p.id, None)

            # stop the process
            p.stop()

            # track this process to make sure it's killed after the
            # graceful time
            self._tracker.check(p, state.graceful_timeout)

    def _stop_process(self, pid):
        """Stop a process by id."""
        if pid not in self.running:
            return

        # remove the process from the running processes
        p = self.running.pop(pid)
        state = self.processes[p.name]
        state.remove(p)

        # stop the process
        p.stop()

        # track this process to make sure it's killed after the
        # graceful time
        self._tracker.check(p, state.graceful_timeout)

        # notify others that the process is being stopped
        self._publish("stop_pid", name=p.name, pid=pid, os_pid=p.pid)
        self._publish("proc.%s.stop_pid" % p.name, name=p.name, pid=pid,
                      os_pid=p.pid)

    def _spawn_process(self, state):
        """Spawn a new process and add it to the state."""
        # get internal process id
        pid = self.get_process_id()

        # start process
        p = state.make_process(self.loop, pid, self._on_process_exit)
        p.spawn()

        # add the process to the running state
        state.queue(p)

        # we keep a list of all running processes by id here
        self.running[pid] = p

        self._publish("spawn", name=p.name, pid=pid, detached=p.detach,
                      os_pid=p.pid)
        self._publish("proc.%s.spawn" % p.name, name=p.name, pid=pid,
                      detached=p.detach, os_pid=p.pid)

    def _spawn_processes(self, state):
        """Spawn all missing processes for a state."""
        num_to_start = state.numprocesses - len(state.running)
        for _ in range(num_to_start):
            self._spawn_process(state)

    def _reap_processes(self, state):
        """Stop the processes exceeding the configured number."""
        if state.stopped:
            return

        diff = len(state.running) - state.numprocesses
        if diff <= 0:
            return

        for _ in range(diff):
            # remove the process from the running processes
            try:
                p = state.dequeue()
            except IndexError:
                return

            # remove the pid from the running processes
            self.running.pop(p.id, None)

            # stop the process
            p.stop()

            # track this process to make sure it's killed after the
            # graceful time
            self._tracker.check(p, state.graceful_timeout)

            # notify others that the process is being reaped
            self._publish("reap", name=p.name, pid=p.id, os_pid=p.pid)
            self._publish("proc.%s.reap" % p.name, name=p.name, pid=p.id,
                          os_pid=p.pid)

    def _manage_processes(self, state):
        """Spawn missing processes and reap extra ones; no-op when the
        state is stopped."""
        if state.stopped:
            return

        if len(state.running) < state.numprocesses:
            self._spawn_processes(state)

        self._reap_processes(state)

    def _restart_processes(self, state):
        """Spawn a full new set of processes, then reap the old ones."""
        # first launch new processes
        for _ in range(state.numprocesses):
            self._spawn_process(state)

        # then reap useless ones.
        self._manage_processes(state)

    def _check_flapping(self, state):
        """Return True when the state may be respawned, False when it is
        flapping (its processes are then stopped and, if allowed, a
        retry is scheduled)."""
        if not state.flapping:
            return True

        check_flapping, can_retry = state.check_flapping()
        if check_flapping:
            return True

        self._publish("flap", name=state.name)

        # stop the processes
        self._stop_processes(state.name)

        if can_retry:
            # if we can retry later then set a callback
            def flapping_cb(handle):
                # allows respawning
                state.stopped = False
                state.flapping_timer = None

                # restart processes
                self._restart_processes(state)

            t = pyuv.Timer(self.loop)
            t.start(flapping_cb, state.flapping.retry_in, 0.0)
            # stored under the attribute _stop_processes reads, so an
            # explicit stop cancels the pending retry
            state.flapping_timer = t

        return False

    def _publish(self, evtype, **ev):
        """Publish the event *evtype* with *ev* as details."""
        event = {"event": evtype}
        event.update(ev)
        self._emitter.publish(evtype, event)

    def _apply_group_func(self, groupname, func):
        """Call ``func(name)`` for every managed process of the group.

        Raises `KeyError` if the group is unknown.
        """
        # snapshot the members: func may alter the group while we
        # iterate (remove_process does)
        with self._lock:
            if groupname not in self.groups:
                raise KeyError('%r not found' % groupname)
            names = list(self.groups[groupname])

        for name in names:
            with self._lock:
                managed = name in self.processes

            # func is called without holding the lock
            if managed:
                func(name)

    # ------------- events handler

    def _on_wakeup(self, handle):
        """Handle the oldest pending signal, if any."""
        if not self._signals:
            return

        sig = self._signals.pop(0)
        if sig == "STOP":
            handle.close()
            self._stop()
        elif sig == "RESTART":
            self._restart()

    def _on_exit(self, evtype, msg):
        """Listener of the 'exit' event: respawn processes if needed."""
        with self._lock:
            state = self.get_process_state(msg['name'])
            if not state:
                return

            # eventually restart the process
            if not state.stopped:
                # manage the template, eventually restart a new one.
                if self._check_flapping(state):
                    self._manage_processes(state)

    def _on_process_exit(self, process, exit_status, term_signal):
        """Exit callback of a process: forget it and publish 'exit'."""
        with self._lock:
            # maybe uncheck this process from the tracker
            self._tracker.uncheck(process)

            # unexpected exit, remove the process from the list of
            # running processes.
            self.running.pop(process.id, None)

            state = self.get_process_state(process.name)
            if state:
                # remove the process from the state
                state.remove(process)

            # notify others that the process exited
            ev_details = dict(name=process.name, pid=process.id,
                              exit_status=exit_status,
                              term_signal=term_signal,
                              os_pid=process.pid)

            self._publish("exit", **ev_details)
            self._publish("proc.%s.exit" % process.name, **ev_details)

This Page