# Source code for gaffer.util
# -*- coding: utf-8 -
#
# This file is part of gaffer. See the NOTICE for more information.
import grp
import os
import pwd
import resource
import socket
import string
import time
import six
def bytestring(s):
    """Return *s* as a native byte string where that matters.

    On Python 3 the value is handed back untouched.  On Python 2 a
    ``unicode`` object is encoded to UTF-8; anything else is returned as is.
    """
    # ``unicode`` only exists on Python 2; the ``six.PY3`` test short-circuits
    # so the name is never looked up on Python 3.
    if not six.PY3 and isinstance(s, unicode):  # noqa: F821
        return s.encode('utf-8')
    return s
# Unit suffixes in increasing order of magnitude (each step is a factor of 2**10).
_SYMBOLS = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')

# Descriptor ceiling used when the OS reports no finite RLIMIT_NOFILE limit.
MAXFD = 1024

# Path the standard streams are redirected to when daemonizing; falls back to
# the literal "/dev/null" when ``os`` does not expose ``devnull``.
REDIRECT_TO = getattr(os, "devnull", "/dev/null")
try:
    from setproctitle import setproctitle

    def setproctitle_(title):
        """Set the process title to *title* via the ``setproctitle`` package."""
        setproctitle(title)
except ImportError:
    # ``setproctitle`` is an optional dependency.  Provide no-op stand-ins
    # under BOTH names so that callers of either ``setproctitle`` or
    # ``setproctitle_`` keep working when the package is not installed.
    def setproctitle(_title):
        """No-op replacement used when ``setproctitle`` is unavailable."""
        return

    def setproctitle_(title):
        """No-op wrapper mirroring the real ``setproctitle_`` signature."""
        return setproctitle(title)
def getcwd():
    """Return the current working directory, preferring ``$PWD``.

    ``$PWD`` is returned when it is set and refers to the same directory as
    ``os.getcwd()`` (same inode and device).  This keeps the path the user
    navigated through, e.g. via a symlink.  In every other case (``$PWD``
    unset, stale, or pointing elsewhere) the value of ``os.getcwd()`` is
    returned.
    """
    real_cwd = os.getcwd()
    try:
        env_cwd = os.environ['PWD']
        env_stat = os.stat(env_cwd)
        real_stat = os.stat(real_cwd)
    except (KeyError, OSError, ValueError):
        # PWD missing, not stat-able, or not a usable path: ignore it.
        return real_cwd

    # stat results expose the inode/device as ``st_ino`` / ``st_dev``.
    same_dir = (env_stat.st_ino == real_stat.st_ino and
                env_stat.st_dev == real_stat.st_dev)
    return env_cwd if same_dir else real_cwd
def check_uid(val):
    """Return a uid, given a user value.

    *val* may be an integer uid or a user name.

    - An integer is looked up with ``pwd.getpwuid`` and returned unchanged
      when it exists.
    - A string is looked up with ``pwd.getpwnam`` and the matching uid is
      returned.

    Raises ``ValueError`` when the uid or user name is unknown, and
    ``TypeError`` when *val* is neither an integer nor a string.
    """
    if isinstance(val, six.integer_types):
        try:
            pwd.getpwuid(val)
        except (KeyError, OverflowError):
            raise ValueError("%r isn't a valid user id" % val)
        return val

    # ``six.string_types`` also accepts ``unicode`` names on Python 2,
    # matching the use of ``six.integer_types`` above.
    if not isinstance(val, six.string_types):
        raise TypeError(val)

    try:
        return pwd.getpwnam(val).pw_uid
    except KeyError:
        raise ValueError("%r isn't a valid user val" % val)
def check_gid(val):
    """Return a gid, given a group value.

    *val* may be an integer gid or a group name.

    - An integer is looked up with ``grp.getgrgid`` and returned unchanged
      when it exists.
    - A string is looked up with ``grp.getgrnam`` and the matching gid is
      returned.

    Raises ``ValueError`` when the gid or group name is unknown, and
    ``TypeError`` when *val* is neither an integer nor a string.
    """
    # Use the ``six`` type tuples, like ``check_uid`` does, so that Python 2
    # ``long`` gids and ``unicode`` group names are accepted too.
    if isinstance(val, six.integer_types):
        try:
            grp.getgrgid(val)
        except (KeyError, OverflowError):
            raise ValueError("No such group: %r" % val)
        return val

    if not isinstance(val, six.string_types):
        raise TypeError(val)

    try:
        return grp.getgrnam(val).gr_gid
    except KeyError:
        raise ValueError("No such group: %r" % val)
def bytes2human(n):
    """Translate a byte count into a short human-readable string.

    The largest unit from ``_SYMBOLS`` (K = 2**10, M = 2**20, ...) that does
    not exceed *n* is chosen and the value is floored to a whole number of
    that unit, e.g. ``10240 -> '10K'``.  Values below 1K are rendered with a
    ``B`` suffix.

    Raises ``TypeError`` when *n* is not an integer.
    """
    if not isinstance(n, six.integer_types):
        raise TypeError(n)

    # Walk the units from largest to smallest; symbol i is 2 ** ((i + 1) * 10).
    for exponent in range(len(_SYMBOLS), 0, -1):
        unit = 1 << (exponent * 10)
        if n >= unit:
            # Integer floor division stays exact for arbitrarily large
            # values, unlike a round-trip through ``float``.
            return '%s%s' % (n // unit, _SYMBOLS[exponent - 1])
    return "%sB" % n
def parse_address(netloc, default_port=8000):
    """Parse a listen/connect address string.

    - ``"unix:<path>"`` returns ``<path>`` (a string) for a UNIX socket.
    - Anything else returns a ``(host, port)`` tuple.  The host is
      lower-cased; an IPv6 host may be written in brackets
      (``"[::1]:8000"``); an empty string maps to host ``"0.0.0.0"``.
      When no port is given, *default_port* is used.

    Raises ``RuntimeError`` when the port part is not made of digits.
    """
    unix_prefix = "unix:"
    if netloc.startswith(unix_prefix):
        # Strip only the leading prefix so that paths which themselves
        # contain "unix:" are returned intact.
        return netloc[len(unix_prefix):]

    # get host
    if '[' in netloc and ']' in netloc:
        host = netloc.split(']')[0][1:].lower()
    elif ':' in netloc:
        host = netloc.split(':')[0].lower()
    elif netloc == "":
        host = "0.0.0.0"
    else:
        host = netloc.lower()

    # get port: look only at what follows a closing bracket, if any, so the
    # colons inside a bracketed IPv6 literal are not taken for a port.
    remainder = netloc.split(']')[-1]
    if ":" in remainder:
        port = remainder.split(':', 1)[1]
        if not port.isdigit():
            raise RuntimeError("%r is not a valid port number." % port)
        port = int(port)
    else:
        port = default_port
    return (host, port)
def is_ipv6(addr):
    """Return True when *addr* is a valid textual IPv6 address.

    The check is delegated to ``socket.inet_pton``; any string it rejects
    yields False.
    """
    try:
        socket.inet_pton(socket.AF_INET6, addr)
    except (socket.error, ValueError):
        # socket.error: not a valid address.
        # ValueError: the string cannot be passed to the C call at all
        # (e.g. it contains an embedded NUL), so it is not an address either.
        return False
    return True
def get_maxfd():
    """Return the upper bound to use when iterating over file descriptors.

    This is the hard ``RLIMIT_NOFILE`` limit, or ``MAXFD`` when that limit is
    reported as ``RLIM_INFINITY``.
    """
    _soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    return MAXFD if hard_limit == resource.RLIM_INFINITY else hard_limit
try:
    from os import closerange
except ImportError:
    def closerange(fd_low, fd_high):  # NOQA
        """Close every descriptor in ``[fd_low, fd_high)``, ignoring errors.

        Pure-Python stand-in used when ``os.closerange`` cannot be imported.
        """
        for descriptor in range(fd_low, fd_high):
            try:
                os.close(descriptor)
            except OSError:
                # The descriptor was not open in the first place; skip it.
                continue
# http://www.svbug.com/documentation/comp.unix.programmer-FAQ/faq_2.html#SEC16
def daemonize():
    """Standard daemonization of the current process (double fork).

    Forks twice with a ``setsid()`` in between, resets the umask to 0,
    closes every descriptor in ``[0, get_maxfd())`` and re-opens the three
    standard streams on ``REDIRECT_TO``.  Each parent process exits
    immediately via ``os._exit(0)``; only the final child returns.
    """
    # First fork: the parent (non-zero pid) exits, the child carries on.
    if os.fork() != 0:
        os._exit(0)

    # Start a new session for the child.
    os.setsid()

    # Second fork: again only the child survives.
    if os.fork() != 0:
        os._exit(0)

    os.umask(0)

    # Drop every descriptor that may have been inherited.
    closerange(0, get_maxfd())

    # With everything closed, this open is expected to land on descriptor 0;
    # it is then duplicated onto stdout (1) and stderr (2).
    os.open(REDIRECT_TO, os.O_RDWR)
    os.dup2(0, 1)
    os.dup2(0, 2)
def nanotime(s=None):
    """Convert seconds to nanoseconds.

    - ``s is None``: the current time (``time.time()``) scaled to
      nanoseconds is returned, as a float.
    - ``s`` is a float: it is scaled first and then truncated to an integer,
      so the fractional part of the seconds is kept (``1.5 -> 1500000000``).
    - anything else: ``int(s)`` is scaled, as before.
    """
    nanos_per_second = 1000000000
    if s is None:
        return time.time() * nanos_per_second
    if isinstance(s, float):
        # Scale before truncating; ``int(s)`` alone would discard the
        # sub-second part of the value.
        return int(s * nanos_per_second)
    return int(s) * nanos_per_second
def from_nanotime(n):
    """Convert a nanosecond value back to seconds (divides by 1e9)."""
    seconds = n / 1.0e9
    return seconds
def substitute_env(s, env):
    """Expand ``$name`` / ``${name}`` placeholders in *s* using *env*.

    Uses ``string.Template.substitute`` to perform the expansion.
    """
    template = string.Template(s)
    return template.substitute(env)