Source code for gaffer.httpclient

# -*- coding: utf-8 -
#
# This file is part of gaffer. See the NOTICE for more information.
"""
Gaffer provides you a simple Client to control a gaffer node via HTTP.

Example of usage::

    import pyuv

    from gaffer.httpclient import Server

    # initialize a loop
    loop = pyuv.Loop.default_loop()

    s = Server("http://localhost:5000", loop=loop)

    # add a process without starting it
    process = s.add_process("dummy", "/some/path/to/dummy/script", start=False)

    # start a process
    process.start()

    # increase the number of process by 2 (so 3 will run)
    process.add(2)

    # stop all processes
    process.stop()

    loop.run()

"""


import json
import signal

import pyuv
import six
from tornado import httpclient

from .events import EventEmitter
from .tornado_pyuv import IOLoop

if six.PY3:
    import urllib.parse
    quote = urllib.parse.quote
    quote_plus = urllib.parse.quote_plus
    unquote = urllib.parse.unquote
    urlencode = urllib.parse.urlencode
else:
    import urllib
    quote = urllib.quote
    quote_plus = urllib.quote_plus
    unquote = urllib.unquote
    urlencode = urllib.urlencode


[docs]class GafferNotFound(Exception): """ exception raised on HTTP 404 """
[docs]class GafferConflict(Exception): """ exption raised on HTTP 409 """
[docs]class HTTPClient(object): """A blocking HTTP client. This interface is provided for convenience and testing; most applications that are running an IOLoop will want to use `AsyncHTTPClient` instead. Typical usage looks like this:: http_client = httpclient.HTTPClient() try: response = http_client.fetch("http://www.friendpaste.com/") print response.body except httpclient.HTTPError as e: print("Error: %s" % e) """ def __init__(self, async_client_class=None, loop=None, **kwargs): self._io_loop = IOLoop(_loop=loop) if async_client_class is None: async_client_class = httpclient.AsyncHTTPClient self._async_client = async_client_class(self._io_loop, **kwargs) self._response = None self._closed = False def __del__(self): self.close()
[docs] def close(self): """Closes the HTTPClient, freeing any resources used.""" if not self._closed: self._async_client.close() self._io_loop.close(True) self._closed = True
[docs] def fetch(self, request, **kwargs): """Executes a request, returning an `HTTPResponse`. The request may be either a string URL or an `HTTPRequest` object. If it is a string, we construct an `HTTPRequest` using any additional kwargs: ``HTTPRequest(request, **kwargs)`` If an error occurs during the fetch, we raise an `HTTPError`. """ def callback(response): self._response = response self._io_loop.stop() self._async_client.fetch(request, callback, **kwargs) self._io_loop.start() response = self._response self._response = None response.rethrow() return response
[docs]class EventsourceClient(object): """ simple client to fetch Gaffer streams using the eventsource stream. Example of usage:: loop = pyuv.Loop.default_loop() def cb(event, data): print(data) # create a client url = http://localhost:5000/streams/1/stderr?feed=continuous' client = EventSourceClient(loop, url) # subscribe to the stderr event client.subscribe("stderr", cb) # start the client client.start() """ def __init__(self, loop, url, **kwargs): self.loop = loop self._io_loop = IOLoop(_loop=loop) self.url = url self._emitter = EventEmitter(self.loop) self.client = httpclient.AsyncHTTPClient(self._io_loop, **kwargs) self.active = False self.stopped = False
[docs] def start(self): self.active = True headers = {"Content-Type": "text/event-stream"} req = httpclient.HTTPRequest(url=self.url, method='GET', headers=headers, request_timeout=0, streaming_callback=self._on_stream) self.client.fetch(req, self._on_request) self._io_loop.start(False)
[docs] def subscribe(self, event, listener): self._emitter.subscribe(event, listener)
[docs] def unsubscribe(self, event, listener): self._emitter.unsubscribe(event, listener)
[docs] def subscribe_once(self, event, listener): self._emitter.subscribe_once(event, listener)
[docs] def render(self, event, data): return data
[docs] def stop(self): self.active = False #self._emitter.close() self.client.close() self._io_loop.stop() self._io_loop.close(True)
[docs] def run(self): self.loop.run()
def _on_request(self, response): self.stop() def _on_stream(self, message): if not message: return lines = [line for line in message.strip(b'\r\n').split(b"\r\n")] event = None data = [] for line in lines: f, val = line.split(b":", 1) if f == b"event": event = val.strip() elif f == b"data": data.append(val.strip()) if event is None: return event = event.decode('utf-8') data = self.render(event, b"\n".join(data).strip()) self._emitter.publish(event, data)
[docs]class Watcher(EventsourceClient): """ simple EventsourceClient wrapper that decode the JSON to a python object """
[docs] def render(self, event, data): return json.loads(data.decode('utf-8'))
[docs]class Server(object): """ Server, main object to connect to a gaffer node. Most of the calls are blocking. (but running in the loop) """ def __init__(self, uri, loop=None, **options): self.loop = loop or pyuv.Loop() self.uri = uri self.options = options self.client = HTTPClient(loop=loop)
[docs] def request(self, method, path, headers=None, body=None, **params): headers = headers or {} headers.update({"Accept": "application/json"}) url = make_uri(self.uri, path, **params) method = method.upper() if (body is None) and method in ("POST", "PATCH", "PUT"): body = "" try: resp = self.client.fetch(url, method=method, headers=headers, body=body, **self.options) except httpclient.HTTPError as e: if method != "HEAD": # only raise on non head method since we are using head to # check status and so on. if e.code == 404: raise GafferNotFound(self.json_body(e.response)) elif e.code == 409: raise GafferConflict(self.json_body(e.response)) else: raise else: if e.response is not None: resp = e.response else: raise return resp
[docs] def json_body(self, resp): return json.loads(resp.body.decode('utf-8'))
@property
[docs] def version(self): """ get gaffer version """ resp = self.request("get", "/") return self.json_body(resp)['version']
[docs] def processes(self): """ get list of registered processes """ resp = self.request("get", "/processes") return self.json_body(resp)
[docs] def running(self): """ get list of running processes by pid """ resp = self.request("get", "/processes", running="true") return self.json_body(resp)
[docs] def get_process(self, name_or_id): """ get a process by name or id. If id is given a ProcessId instance is returned in other cases a Process instance is returned. """ resp = self.request("get", "/processes/%s" % name_or_id) process = self.json_body(resp) if isinstance(name_or_id, int): return ProcessId(server=self, pid=name_or_id, process=process) return Process(server=self, process=process)
[docs] def is_process(self, name): """ is the process exists ? """ resp = self.request("head", "/processes/%s" % name) if resp.code == 200: return True return False
[docs] def save_process(self, name, cmd, **kwargs): """ save a process. Args: - **name**: name of the process - **cmd**: program command, string) - **args**: the arguments for the command to run. Can be a list or a string. If **args** is a string, it's splitted using :func:`shlex.split`. Defaults to None. - **env**: a mapping containing the environment variables the command will run with. Optional - **uid**: int or str, user id - **gid**: int or st, user group id, - **cwd**: working dir - **detach**: the process is launched but won't be monitored and won't exit when the manager is stopped. - **shell**: boolean, run the script in a shell. (UNIX only), - os_env: boolean, pass the os environment to the program - numprocesses: int the number of OS processes to launch for this description If `_force_update=True` is passed, the existing process template will be overwritten. """ if "_force_update" in kwargs: force = kwargs.pop("_force_update") process = { "name": name, "cmd": cmd } process.update(kwargs) body = json.dumps(process) headers = {"Content-Type": "application/json" } if force: if self.is_process(name): self.request("put", "/processes/%s" % name, body=body, headers=headers) else: self.request("post", "/processes", body=body, headers=headers) else: self.request("post", "/processes", body=body, headers=headers) return Process(server=self, process=process)
[docs] def add_process(self, name, cmd, **kwargs): """ add a process. Use the same arguments as in save_process. If a process with the same name is already registred a `GafferConflict` exception is raised. """ kwargs["_force_update"] = False return self.save_process(name, cmd, **kwargs)
[docs] def update_process(self, name, cmd, **kwargs): """ update a process. """ kwargs["_force_update"] = True return self.save_process(name, cmd, **kwargs)
[docs] def remove_process(self, name): """ Stop a process and remove it from the managed processes """ self.request("delete", "/processes/%s" % name) return True
[docs] def send_signal(self, name_or_id, num_or_str): """ Send a signal to the pid or the process name """ if isinstance(num_or_str, six.string_types): signame = num_or_str.upper() if not signame.startswith('SIG'): signame = "SIG%s" % signame try: signum = getattr(signal, signame) except AttributeError: raise ValueError("invalid signal name") else: signum = num_or_str self.request('post', "/processes/%s/_signal/%s" % (name_or_id, signum)) return True
[docs] def groups(self): """ return the list of all groups """ resp = self.request("get", "/groups") return self.json_body(resp)
[docs] def get_group(self, name): """ return the list of all process templates of this group """ resp = self.request("get", "/groups/%s" % name) return self.json_body(resp)
[docs] def start_group(self, name): """ start all process templates of the group """ self.request("post", "/groups/%s/_start" % name) return True
[docs] def stop_group(self, name): """ stop all process templates of the group """ self.request("post", "/groups/%s/_stop" % name) return True
[docs] def restart_group(self, name): """ restart all process templates of the group """ self.request("post", "/groups/%s/_restart" % name) return True
[docs] def remove_group(self, name): """ remove the group and all process templates of the group """ self.request("delete", "/groups/%s" % name) return True
[docs] def get_watcher(self, heartbeat="true"): """ return a watcher to listen on /watch """ url = make_uri(self.uri, '/watch', feed='eventsource', heartbeat=str(heartbeat)) return Watcher(self.loop, url, **self.options)
[docs]class ProcessId(object): """ Process Id object. It represent a pid """ def __init__(self, server, pid, process): self.server = server self.pid = pid if isinstance(process, dict): self.process = process else: self.process = server.get_process(process) def __str__(self): return str(self.pid) @property
[docs] def active(self): """ return True if the process is active """ resp = self.server.request("head", "/processes/%s" % self.pid) if resp.code == 200: return True return False
[docs] def stop(self): """ stop the process """ self.server.request("post", "/processes/%s/_stop" % self.pid) return True
[docs] def signal(self, num_or_str): """ Send a signal to the pid """ if isinstance(num_or_str, six.string_types): signame = num_or_str.upper() if not signame.startswith('SIG'): signame = "SIG%s" % signame try: signum = getattr(signal, signame) except AttributeError: raise ValueError("invalid signal name") else: signum = num_or_str self.server.request("post", "/processes/%s/_signal/%s" % (self.pid, signum)) return True
[docs]class Process(object): """ Process object. Represent a remote process state""" def __init__(self, server, process): self.server = server if isinstance(process, dict): self.process = process else: self.process = server.get_process(process) def __str__(self): return self.process.get('name') def __getattr__(self, key): if key in self.process: return self.process[key] try: return self.__dict__[key] except KeyError as e: raise AttributeError(str(e)) @property
[docs] def active(self): """ return True if the process is active """ status = self.status() return status['active']
@property
[docs] def running(self): """ return the number of processes running for this template """ status = self.status() return status['running']
@property
[docs] def numprocesses(self): """ return the maximum number of processes that can be launched for this template """ status = self.status() return status['max_processes']
@property
[docs] def pids(self): """ return a list of running pids """ resp = self.server.request("get", "/processes/%s/_pids" % self.process['name']) result = self.server.json_body(resp) return result['pids']
[docs] def info(self): """ return the process info dict """ return self.process
[docs] def status(self): """ Return the status :: { "active": true, "running": 1, "numprocesses": 1 } """ resp = self.server.request("get", "/status/%s" % self.process['name']) return self.server.json_body(resp)
[docs] def start(self): """ start the process if not started, spawn new processes """ self.server.request("post", "/processes/%s/_start" % self.process['name']) return True
[docs] def stop(self): """ stop the process """ self.server.request("post", "/processes/%s/_stop" % self.process['name']) return True
[docs] def restart(self): """ restart the process """ self.server.request("post", "/processes/%s/_restart" % self.process['name']) return True
[docs] def add(self, num=1): """ increase the number of processes for this template """ resp = self.server.request("post", "/processes/%s/_add/%s" % (self.process['name'], num)) obj = self.server.json_body(resp) return obj['numprocesses']
[docs] def sub(self, num=1): """ decrease the number of processes for this template """ resp = self.server.request("post", "/processes/%s/_sub/%s" % (self.process['name'], num)) obj = self.server.json_body(resp) return obj['numprocesses']
[docs] def stats(self): resp = self.server.request("get", "/stats/%s" % self.process['name']) return self.server.json_body(resp)
[docs] def signal(self, num_or_str): """ send a signal to all processes of this template """ if isinstance(num_or_str, six.string_types): signame = num_or_str.upper() if not signame.startswith('SIG'): signame = "SIG%s" % signame try: signum = getattr(signal, signame) except AttributeError: raise ValueError("invalid signal name") else: signum = num_or_str self.server.request("post", "/processes/%s/_signal/%s" % (self.process['name'], signum)) return True # ----------- helpers
[docs]def url_quote(s, charset='utf-8', safe='/:'): """URL encode a single string with a given encoding.""" if isinstance(s, six.text_type): s = s.encode(charset) elif not isinstance(s, str): s = str(s) return quote(s, safe=safe)
[docs]def url_encode(obj, charset="utf8", encode_keys=False): items = [] if isinstance(obj, dict): for k, v in list(obj.items()): items.append((k, v)) else: items = list(items) tmp = [] for k, v in items: if encode_keys: k = encode(k, charset) if not isinstance(v, (tuple, list)): v = [v] for v1 in v: if v1 is None: v1 = '' elif six.callable(v1): v1 = encode(v1(), charset) else: v1 = encode(v1, charset) tmp.append('%s=%s' % (quote(k), quote_plus(v1))) return '&'.join(tmp)
[docs]def encode(v, charset="utf8"): if isinstance(v, six.text_type): v = v.encode(charset) else: v = str(v) return v
[docs]def make_uri(base, *args, **kwargs): """Assemble a uri based on a base, any number of path segments, and query string parameters. """ # get encoding parameters charset = kwargs.pop("charset", "utf-8") safe = kwargs.pop("safe", "/:") encode_keys = kwargs.pop("encode_keys", True) base_trailing_slash = False if base and base.endswith("/"): base_trailing_slash = True base = base[:-1] retval = [base] # build the path _path = [] trailing_slash = False for s in args: if s is not None and isinstance(s, six.string_types): if len(s) > 1 and s.endswith('/'): trailing_slash = True else: trailing_slash = False _path.append(url_quote(s.strip('/'), charset, safe)) path_str ="" if _path: path_str = "/".join([''] + _path) if trailing_slash: path_str = path_str + "/" elif base_trailing_slash: path_str = path_str + "/" if path_str: retval.append(path_str) params_str = url_encode(kwargs, charset, encode_keys) if params_str: retval.extend(['?', params_str]) return ''.join(retval)

This Page