Package rrlog :: Package contrib :: Module catarch
[hide private]
[frames] | no frames]

Source Code for Module rrlog.contrib.catarch

  1  #Copyright (c) 2007 
  2  #        Ruben Reifenberg, Germany, 07381. 
  3  #    All rights reserved. 
  4  # 
  5  #Redistribution and use in source and binary forms, with or without 
  6  #modification, are permitted provided that the following conditions 
  7  #are met: 
  8  #1. Redistributions of source code must retain the above copyright 
  9  #   notice, this list of conditions and the following disclaimer as 
 10  #   the first lines of this file unmodified. 
 11  #2. Redistributions in binary form must reproduce the above copyright 
 12  #   notice, this list of conditions and the following disclaimer in the 
 13  #   documentation and/or other materials provided with the distribution. 
 14  # 
 15  # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 
 16  # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 17  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 18  # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 
 19  # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 
 20  # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 
 21  # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
 22  # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 23  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 
 24  # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 
 25  # SUCH DAMAGE. 
 26   
 27  """ 
 28  @summary: Base classes and exceptions for DB/files archiving tools. 
 29  @author: Ruben Reifenberg 
 30  """ 
 31  import os 
 32  from rrlog.tool import traceToShortStr 
 33   
34 -class ArchiverError(Exception):
35 pass
36
37 -class ObservedLogType(Exception):
38 """ 39 the log to observe has an unknown (duck) type 40 """ 41 pass
42
43 -class _MsgJob(object):
44
45 - def __init__(self, job, trigger_id):
46 self._job = job 47 self.trigger_id = trigger_id
48
49 - def __getattr__(self, name):
50 return getattr(self._job, name)
51
52 - def copy_update(self, kwargs):
53 """ 54 @return: new instance with my init kwargs but updated with the given kwargs 55 """ 56 return self.__class__(self._job.copy_update(kwargs), self.trigger_id)
57 58 59 60
61 -class CatArchiver(object):
62 """ 63 Archives log lines of defined categories (i.e.rescues them from log rotation.) 64 @attention: Not thread safe 65 @attention: There is a maximum number of problems saved (see __init__ arguments). 66 """ 67
68 - def __init__( 69 self, 70 server, 71 minArchCount=10, 72 minClientArchCount=1, 73 cats=None, 74 maxCount=10000, 75 ):
76 """ 77 @param minArchCount: >=1,default: 10. This is the count of messages that are at least archived 78 along with any problem message (the problem message itself included.) 79 Note 1: When a problem is archived shortly after another, no messages are saved twice even if the "preceding messages" intervals overlap. 80 Note 2: The effective number of archived lines is limited by the cached jobs (job history) of the server. See LogServer parameters. 81 @param minClientArchCount: >=1, default: 1. This is analogous but addresses messages 82 with the same client id as the "trigger message". 83 The intention is that we often want to save more preceding messages that concern the "problematic" client only. 84 Note: Unlike with minArchCount, messages are archived twice when the "preceding messages" intervals overlap, i.e. when problems are logged shortly after each other. 85 @param cats: list of cat; I archive log entries when cat is in this list.Default is "E","I","S" 86 (These are speaking keys meaning: 87 Error,InternalError,SecurityIssue. 88 Adapt it to your convention.) 89 @param maxCount: max. problem count per archiver process. Subsequent problems are ignored. 90 Note that the size of the archive (and meta table) can be limited this way, but since the limit is per-process, 91 the max row count of the meta table can be number-of-processes * maxCount ! 92 (We may consider to make the meta table mandatory; then we always could query the table size there 93 just to ensure it does not grow too big. But it seems not worth the effort.) 94 """ 95 96 if cats is None: cats = ("E","I","S") 97 self._enabled = True 98 self._server = server 99 self._lastPPTID = 0 # last Per-Process Trigger ID 100 self.cats = cats 101 assert minArchCount >= 1, "at least 1 message must be archived, not %s"%(minArchCount) 102 self.minArchCount = minArchCount 103 self.minClientArchCount = minClientArchCount 104 self.maxCount = maxCount 105 self._lcmi = [-1,-1] # Last-written Complete Msgid Interval. Both indexes included.
106 107
108 - def _dictFromJob(self, job):
109 """ 110 row supports the dict interface 111 but unfortunately, sometimes we need a real dict 112 """ 113 return job 114 res = {} 115 for k,v in row.items(): 116 res[str(k)] = v #str because SQLAlchemy >=0.4.0 expects str (not unicode) 117 return res
118
119 - def _writeMeta(self,trigger_id,clientid,msgid):
120 """ 121 hook, override to write meta data (one line for each problem) 122 #todo: inconsistent col names (suffixed by both "id" and "_id") 123 """ 124 pass
125
126 - def _next_trigger_id(self, ospid):
127 """ 128 Alternative: Make the meta table mandatory and use an auto increment column for 129 db-unique ids. But support for some DBs is not clearified yet. 130 @rtype: int 131 @return: database-unique id of the archive-triggering message 132 Currently,composed of os-pid and Per-Process Trigger ID 133 """ 134 self._lastPPTID += 1 135 if self._lastPPTID >= self.maxCount: 136 self._enabled = False 137 raise ArchiverError("max per-process archive count (%d) reached"%(self.maxCount)) 138 return int("%d%d"%(ospid,self._lastPPTID))
139 140
141 - def observe(self, jobhist, writer):
142 """ 143 The log observer protocol 144 @param jobhist: list,len>=1,ordered recent jobs, [-1] is the lastest (i.e.current). 145 @param writer: unused. The jobhist only is used for archiving. 146 @raise ArchiverError: When my maxCount is reached 147 """ 148 if not self._enabled: return 149 currentJob = jobhist[-1] 150 trigger_id = self._next_trigger_id(ospid=currentJob.pid) 151 if currentJob.cat in self.cats: 152 self._writeMeta( 153 trigger_id=trigger_id, 154 clientid=currentJob.clientid, 155 msgid=currentJob.msgid, 156 ) 157 158 # write >=1 lines in problem table: 159 currentClientid = currentJob.clientid 160 lenJobhist = len(jobhist) 161 162 # index from where (including) to archive all jobs 163 allIndex = max(0,lenJobhist-self.minArchCount) 164 165 # index from where (including) to archive jobs with current client id 166 clientidIndex = 0 167 tmp = 0 # count msgs with right client id 168 for i in range(lenJobhist-1,-1,-1): 169 if jobhist[i].clientid==currentClientid: tmp+=1 170 if tmp>=self.minClientArchCount: 171 clientidIndex=i 172 break 173 lcmi_upper=self._lcmi[1] 174 lcmi_lower=self._lcmi[0] 175 for i in range(min(allIndex,clientidIndex),len(jobhist)): 176 job = jobhist[i] 177 if i>=allIndex or job.clientid==currentClientid: 178 # we are either in the range where all jobs are archived, 179 # or the job has the right (current) client id. 180 if job.msgid>lcmi_upper or job.msgid<lcmi_lower: 181 # if job is outside the lmci interval. 182 # otherwise we'd know it is already archived (with an older problem) 183 try: 184 self._server.logJob( 185 _MsgJob(job,trigger_id=trigger_id), 186 False, 187 ) 188 except Exception,e: 189 print "Error processing job:%s"%(self._dictFromJob(currentJob)) 190 print traceToShortStr(8) 191 raise 192 # raise Exception("%s, TMP %s"%(e, self._dictFromJob(currentJob))) 193 194 self._lcmi = (jobhist[allIndex].msgid,jobhist[-1].msgid) 195 return True
196