# Copyright (c) 2007 Ruben Reifenberg
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
@summary: General-purpose tools that are not specific to logging
@author: Ruben Reifenberg
"""
import time
class ListRotator(object):
    """
    Cycles endlessly through a given list, starting with its first element.

    Unlike a plain iterator, it starts over from the beginning after the
    last element has been returned.

    @ivar i: index of last returned element (-1 before the first next() call)
    """

    def __init__(self, list):
        """
        :param list: len >=1
            The list needs to stay constant (behaviour is not defined if it
            is changed later).
        """
        self.i = -1
        self.list = list

    def next(self):
        """
        Advance to the following element, wrapping around to the first
        element after the last one.

        Not threadsafe.
        :returns: next list element, rotating
        :raises IndexError: if list has length 0
        """
        candidate = self.i + 1
        if candidate >= len(self.list):
            # Ran past the end: start over at the first element.
            candidate = 0
        self.i = candidate
        return self.list[candidate]

    def len(self):
        """
        :returns: number of elements in the wrapped list
        """
        return len(self.list)
[docs]def _cutcount(l, max):
"""
How many elements to cut away from something with len l,
when we want restlen+len(decimal-digit-of-cut-count) < max?
:rtype: int
:returns: count of chars to cut away (cutcount) (can be > l when max is too low)
"""
if l<=max: return 0
else:
cdl=1 #cutcount digit len (e.g. 3 for 100..999)
cut=10 #always 10^cdl
while True:
if l-cut+cdl<max:
return l-max+cdl
cut=cut*10
cdl+=1
def lu2a(unistr, max=50, errorstr="~~~"):
    """
    Limited Unicode To Ascii (limited: the result length is limited)

    Intended for ASCII-logging of untrusted (e.g. user input) unicode strings.
    The length limitation is intended for storage in length-limited ASCII
    database fields.

    :rtype: ASCII str or None
    :returns: The argument, but with all non-ascii chars replaced by a
        backslash escape as produced by the "backslashreplace" codec error
        handler (e.g. "\\xe4" or "\\u20ac").
        If N chars (of the result string) are thrown away, "[N+]" is appended
        at the end. None is returned for a None argument.
    :param unistr: Python Unicode or None
    :param max: >=3, length limit of the RESULT STRING (default 50)
    :param errorstr: Returned in case max is too low to return the regular
        result.
    """
    assert max >= 3
    if unistr is None:
        return None
    res = unistr.encode("ascii", "backslashreplace")
    if not isinstance(res, str):
        # encode() yields bytes on Python 3; convert back to the native str
        # type so that slicing and %-formatting below produce text instead of
        # a bytes repr. On Python 2 the result already is a str (no-op).
        res = res.decode("ascii")
    if len(res) > max:
        cutcount = _cutcount(len(res), max - 3)  # 3 because "[+]"
        if cutcount <= len(res):
            res = "%s[%d+]" % (res[:-cutcount], cutcount)
        else:
            # max is too low to display even the count of cut chars
            res = errorstr
    return res
def lu2a_de(unistr, max=50, errorstr="~~~"):
    """
    Limited Unicode To Ascii with readable replacements for German umlauts.

    Works like lu2a, but additionally the German umlauts and the sharp s are
    replaced by ae, oe, ue, AE, OE, UE, ss to make the result more readable
    (with the drawback of information loss, of course). All other non-ascii
    characters are replaced by a backslash escape ("backslashreplace").

    Note: If unistr contains non-ascii characters, this function gets slower
    because it then does a Python-level pass over each character.

    @see: lu2a
    :rtype: ASCII str or None
    :returns: the converted string; if N chars (of the result string) are
        thrown away, "[N+]" is appended at the end. None for a None argument.
    :param unistr: Python Unicode or None
    :param max: >=3, length limit of the RESULT STRING (default 50)
    :param errorstr: Returned in case max is too low to return the regular
        result.
    """
    assert max >= 3
    if unistr is None:
        return None
    try:
        # fastest if possible
        res = unistr.encode("ascii")
    except UnicodeError:
        # Slower path. This wants to be a side effect free library, so no
        # callback handler is registered in codecs globally; the umlauts are
        # substituted first and everything else is escaped in one encode call.
        umlauts = {
            u"\u00e4": "ae",
            u"\u00f6": "oe",
            u"\u00fc": "ue",
            u"\u00c4": "AE",
            u"\u00d6": "OE",
            u"\u00dc": "UE",
            u"\u00df": "ss",
        }
        readable = u"".join([umlauts.get(char, char) for char in unistr])
        res = readable.encode("ascii", "backslashreplace")
    if not isinstance(res, str):
        # encode() yields bytes on Python 3; convert back to the native str
        # type. On Python 2 the result already is a str (no-op).
        res = res.decode("ascii")
    if len(res) > max:
        cutcount = _cutcount(len(res), max - 3)  # 3 because "[+]"
        if cutcount <= len(res):
            res = "%s[%d+]" % (res[:-cutcount], cutcount)
        else:
            # max is too low to display even the count of cut chars
            res = errorstr
    return res
def mStrftime(dt, formatStr):
    """
    Format a datetime with strftime, supporting a milliseconds extension.

    :param dt: datetime.datetime
    :param formatStr: strftime format string for dt
        with an extension: %3N is milliseconds, always written with three
        digits (zero-padded, e.g. "007")
    :returns: str, made by dt.strftime
    """
    # Floor division keeps the value an int on every Python version, and the
    # zero padding keeps e.g. 7 ms from being rendered as ".7".
    millis = "%03d" % (dt.microsecond // 1000)
    return dt.strftime(formatStr.replace("%3N", millis))
def tm_strftime(format, t, ms):
    """
    Format a time tuple with time.strftime, supporting a milliseconds
    extension.

    :param format: strftime format string
        with an extension: %3N is milliseconds, written with at least three
        digits (zero-padded, e.g. "007")
    :param t: 9-tuple, as returned by time.localtime()
    :param ms: milliseconds as int (0..999)
    :returns: str, made by time.strftime
    """
    # zfill pads on the left with zeros so that e.g. 7 ms is not rendered
    # as ".7".
    format = format.replace("%3N", str(ms).zfill(3))
    return time.strftime(format, t)
def traceToShortStr(maxLines=3,exc_info=None,use_cr=True):
    """
    Build a short description of an exception and the end of its traceback.

    The result starts with the exception value, followed by up to maxLines
    traceback entries (as formatted by traceback.format_list), innermost
    frame first.

    :param maxLines: maximum number of traceback entries to include
    :param exc_info: as given by sys.exc_info(). If None, it is obtained by calling sys.exc_info
    :param use_cr: If True, "\\n" is used between the path lines. Otherwise, "<" will separate the lines.
    :returns: short str describing the current ex.stacktrace (end-first),
        "" if there is no traceback available or maxLines is 0.
    """
    import sys,traceback
    if exc_info is None:
        exc_info = sys.exc_info()
    value = exc_info[1]
    exc_trace = exc_info[2]
    sep = "\n" if use_cr else "<"
    entries = traceback.format_list(traceback.extract_tb(exc_trace))
    # Collect the entries beginning with the end of the traceback.
    picked = []
    for position, entry in enumerate(reversed(entries)):
        if position == maxLines:
            break
        picked.append(entry)
    if not picked:
        # maxLines==0 or no exception traceback available
        return ""
    return "%s%s%s" % (value, sep, sep.join(picked))
class StrIndenter(object):
    """
    Creates a string made of repeated identical tokens, one per depth level
    above a configurable offset.
    Intended for output indentation.
    """

    def __init__(self, token=" ", offset=0):
        """
        :param token: str. The resulting string will consist of these tokens.
        :param offset: depth for which the resulting string is empty
        """
        self.token = token
        self.tara(offset)

    def tara(self, depth, tara=0):
        """
        Set the string/indentation length to 0 for the given depth.

        :param depth: depth that shall yield an empty string from now on
        :param tara: is added to depth (passing depth+tara as depth instead
            of an explicit tara has the same result)
        """
        self._offset = depth + tara

    def __call__(self, depth):
        """
        :param depth: depth to create the string for
        :returns: string, consisting of the token repeated once per level of
            depth above the stored offset
        """
        repetitions = depth - self._offset
        return self.token * repetitions
def mail_smtp(server,serverpw,to_address,from_address,loginuser,subject,content,charset="latin-1"):
    """
    Send subject/content as a text mail via SMTP, as latin-1 by default.

    All parameters are strings; subject/content may be unicode or str.
    Failures to connect, log in, send or quit are not raised but reported on
    sys.stderr. Once a connection has been established, it is always closed
    again, even if the login fails.

    NOTE(review): no starttls() call is made before login, so the credentials
    are sent however smtplib.SMTP transmits them on a plain connection --
    confirm that this is acceptable for the servers in use.

    :returns: None
    :param server: SMTP server name, e.g. "mail.gmx.net"
    :param serverpw: SMTP server password
    :param to_address: recipient address
    :param from_address: sender address
    :param loginuser: user name for the SMTP login
    :param subject: mail subject
    :param content: mail body text
    :param charset: charset used for the body (default "latin-1")
    """
    import sys
    import smtplib
    import socket
    try:
        from email.mime.text import MIMEText
    except ImportError:
        # very old Python versions only know the legacy module name
        from email.MIMEText import MIMEText

    try:
        connection = smtplib.SMTP(server)
        #connection.set_debuglevel(1)
    except socket.gaierror:
        e, f = sys.exc_info()[:2]
        sys.stderr.write("ERROR:%s,%s \n%s"%(e,f, traceToShortStr()))
        sys.stderr.write("(Network down? No DNS resolution?)")
        return
    except Exception:
        e, f = sys.exc_info()[:2]
        sys.stderr.write("ERROR:%s,%s \n%s"%(e,f, traceToShortStr()))
        return

    try:
        try:
            connection.login(loginuser, serverpw)
        except Exception:
            e, f = sys.exc_info()[:2]
            sys.stderr.write("ERROR login:%s,%s\n%s"%(e,f, traceToShortStr()))
            return  # the finally clause below still closes the connection

        mail = MIMEText(content,_charset=charset)
        mail["From"] = from_address
        mail["To"] = to_address
        mail["Subject"] = subject
        try:
            connection.sendmail(from_address, (to_address,), mail.as_string())
        except Exception:
            e, f = sys.exc_info()[:2]
            sys.stderr.write("ERROR sendmail:%s,%s"%(e,f))
            # Report the server name, not the SMTP connection object.
            details = u"ERROR sendmail; server=%s, from=%s, to=%s"%(server, from_address, to_address)
            details = details.encode("ascii","backslashreplace")
            if not isinstance(details, str):
                # encode() yields bytes on Python 3, but stderr expects text
                details = details.decode("ascii")
            sys.stderr.write(details)
    finally:
        # Always release the connection, whatever happened above.
        try:
            connection.quit()
        except Exception:
            e, f = sys.exc_info()[:2]
            sys.stderr.write("ERROR quit:%s,%s\n"%(e,f))