1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 """
28 @summary: Base classes and exceptions for DB/files archiving tools.
29 @author: Ruben Reifenberg
30 """
31 import os
32 from rrlog.tool import traceToShortStr
33
36
38 """
39 the log to observe has an unknown (duck) type
40 """
41 pass
42
44
46 self._job = job
47 self.trigger_id = trigger_id
48
50 return getattr(self._job, name)
51
53 """
54 @return: new instance with my init kwargs but updated with the given kwargs
55 """
56 return self.__class__(self._job.copy_update(kwargs), self.trigger_id)
57
58
59
60
62 """
63 Archives log lines of defined categories (i.e. rescues them from log rotation).
64 @attention: Not thread safe
65 @attention: There is a maximum number of problems saved (see __init__ arguments).
66 """
67
def __init__(
    self,
    server,
    minArchCount=10,
    minClientArchCount=1,
    cats=None,
    maxCount=10000,
):
    """
    Store the archiving configuration and reset the per-process counters.

    @param server: stored as-is; its logJob() is called for each archived job.
    @param minArchCount: >=1, default: 10. This is the count of messages that are at least archived
        along with any problem message (the problem message itself included.)
        Note 1: When a problem is archived shortly after another, no messages are saved twice
        even if the "preceding messages" intervals overlap.
        Note 2: The effective number of archived lines is limited by the cached jobs (job history)
        of the server. See LogServer parameters.
    @param minClientArchCount: >=1, default: 1. This is analogous but addresses messages
        with the same client id as the "trigger message".
        The intention is that we often want to save more preceding messages that concern
        the "problematic" client only.
        Note: Unlike with minArchCount, messages are archived twice when the "preceding messages"
        intervals overlap, i.e. when problems are logged shortly after each other.
    @param cats: list of cat; I archive log entries when cat is in this list. Default is "E","I","S"
        (These are speaking keys meaning:
        Error, InternalError, SecurityIssue.
        Adapt it to your convention.)
    @param maxCount: max. problem count per archiver process. Subsequent problems are ignored.
        Note that the size of the archive (and meta table) can be limited this way, but since
        the limit is per-process, the max row count of the meta table can be
        number-of-processes * maxCount !
        (We may consider to make the meta table mandatory; then we always could query the table
        size there just to ensure it does not grow too big. But it seems not worth the effort.)
    @raise AssertionError: when minArchCount is not >= 1
    """
    # Raised explicitly rather than through an `assert` statement, so the
    # check is still performed under `python -O`. Exception type and message
    # are the same as before, so existing callers are unaffected.
    if not (minArchCount >= 1):
        raise AssertionError(
            "at least 1 message must be archived, not %s"%(minArchCount)
        )

    if cats is None:
        cats = ("E", "I", "S")

    self._enabled = True  # switched off once maxCount is exceeded
    self._server = server
    self._lastPPTID = 0  # last per-process trigger id handed out
    self.cats = cats
    self.minArchCount = minArchCount
    self.minClientArchCount = minClientArchCount
    self.maxCount = maxCount
    # [lowest, highest] msgid of the most recently archived window;
    # -1/-1 means nothing was archived yet
    self._lcmi = [-1, -1]
106
107
def _dictFromJob(self, job):
    """
    Return the representation of a job that is used for diagnostics output.

    The job is handed back unchanged. The original note says a job supports
    the dict interface although sometimes a real dict is needed; the
    conversion into a plain dict with str keys that used to follow the
    return statement could never run (and referenced an undefined name),
    so it has been removed.

    @param job: the job to represent
    @return: the very same job object
    """
    return job
118
125
def _next_trigger_id(self, ospid):
    """
    Hand out the id for the next archive-triggering message.

    Alternative: Make the meta table mandatory and use an auto increment column for
    db-unique ids. But support for some DBs is not clarified yet.

    @param ospid: process id that becomes the leading part of the id
    @rtype: int
    @return: database-unique id of the archive-triggering message.
        Currently composed of the os-pid followed by the Per-Process Trigger ID.
    @raise ArchiverError: when more than maxCount ids were requested;
        the archiver is disabled before raising
    """
    self._lastPPTID += 1
    # Strictly greater: ids 1..maxCount are valid, so exactly maxCount
    # problems can be archived before the archiver switches itself off.
    if self._lastPPTID > self.maxCount:
        self._enabled = False
        raise ArchiverError("max per-process archive count (%d) reached"%(self.maxCount))
    # NOTE(review): plain decimal concatenation is ambiguous (pid 1 + id 23
    # and pid 12 + id 3 both yield 123). Left unchanged because ids of this
    # form may already be stored; confirm whether that matters in practice.
    return int("%d%d"%(ospid, self._lastPPTID))
139
140
def observe(self, jobhist, writer):
    """
    The log observer protocol: when the newest job belongs to one of my
    categories, write its meta record and archive the recent job history.

    @param jobhist: list, len>=1, ordered recent jobs, [-1] is the latest (i.e. current).
    @param writer: unused. The jobhist only is used for archiving.
    @return: True when the current job triggered archiving, None otherwise
    @raise ArchiverError: When my maxCount is reached
    """
    if not self._enabled:
        return
    currentJob = jobhist[-1]
    # NOTE(review): the original indentation is not recoverable here; this
    # assumes a message outside my categories returns None, like the
    # disabled case above - confirm against the observer protocol.
    if currentJob.cat not in self.cats:
        return

    # The trigger id is requested only for a real trigger. Requesting it for
    # every observed message would let ordinary traffic use up maxCount.
    trigger_id = self._next_trigger_id(ospid=currentJob.pid)
    self._writeMeta(
        trigger_id=trigger_id,
        clientid=currentJob.clientid,
        msgid=currentJob.msgid,
    )

    currentClientid = currentJob.clientid
    lenJobhist = len(jobhist)

    # First index of the "any client" window: the last minArchCount jobs.
    allIndex = max(0, lenJobhist - self.minArchCount)

    # First index of the "same client" window: walk backwards until
    # minClientArchCount jobs of the current client were seen
    # (stays 0 when the history holds fewer of them).
    clientidIndex = 0
    seen = 0
    for i in range(lenJobhist - 1, -1, -1):
        if jobhist[i].clientid == currentClientid:
            seen += 1
            if seen >= self.minClientArchCount:
                clientidIndex = i
                break

    lcmi_lower = self._lcmi[0]
    lcmi_upper = self._lcmi[1]
    for i in range(min(allIndex, clientidIndex), lenJobhist):
        job = jobhist[i]
        # Before allIndex only jobs of the current client are of interest.
        if not (i >= allIndex or job.clientid == currentClientid):
            continue
        # Skip jobs whose msgid lies inside the interval remembered from
        # the previous archiving run.
        if job.msgid > lcmi_upper or job.msgid < lcmi_lower:
            try:
                self._server.logJob(
                    _MsgJob(job, trigger_id=trigger_id),
                    False,
                )
            except Exception:
                # Diagnostics only; the error is re-raised unchanged.
                # The message shows the triggering job, as before.
                print("Error processing job:%s"%(self._dictFromJob(currentJob)))
                print(traceToShortStr(8))
                raise

    # Remember the msgid interval of the "any client" window just handled.
    self._lcmi = (jobhist[allIndex].msgid, jobhist[-1].msgid)
    return True
196