Source code for rrlog.server.socketserver

# Copyright (c) 2007 Ruben Reifenberg
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.


"""
Besides the thread(s) reading the socket, there is a single worker thread which feeds the log server.
To stop the threaded server process (Ctrl-C prbably won't work) 
use the specified module variables to regularly end it without log line loss, or kill the process.
"""


import sys
import logging.handlers
import SocketServer
import struct
import threading
import collections
import socket
import datetime
import time

from rrlog.globalconst import remoteloads,warn
from rrlog import globalconst

maxlen_jobdataq = 100000
socketreceiver = None # .abort=True in the LogRecordSocketReceiver to exit
rrlog_server = None
jobdataq = collections.deque() # alt: Queue.Queue
processq_stop = False # to exit the worker thread
counter = 0


[docs]def _i_am_orphan(): # siehe rrlog.mail.py, wenn wirklich genutzt, kann sie zu rrlog.tool.i_am_orphan_thread werden current = threading.currentThread() for thread in threading.enumerate(): if not thread.isDaemon() and (thread is not current): return False return True
[docs]def processq(): """ Loop which forever pops the oldest (left) element from the jobdataq and calls the log server. The global variable processq_stop=True ends the loop, but not before the jobdataq is found empty. """ while True: try: pickled = jobdataq.popleft() except IndexError: if processq_stop or _i_am_orphan(): return else: try: jobdata = remoteloads(pickled) except Exception,e: # no exit. there may be a process sending erroneously to me warn("serialization protocol error: deserialize jobdata failed; a job is skipped (%s)"%e) else: if "ping" == jobdata: # Someone wants to know whether I'm alive pass else: rrlog_server.log(jobdata)
[docs]class LogRecordStreamHandler(SocketServer.StreamRequestHandler): """Handler for a streaming logging request. This basically logs the record using whatever logging policy is configured locally. """
[docs] def handle(self): """ Handle multiple requests - each expected to be a 4-byte length, followed by the LogRecord in pickle format. Logs the record according to whatever policy is configured locally. """ #raise ValueError("merkt der Client das?") nein, der Client merkt nix, und blockiert auch nicht. overfull_warned = False # i've already emitted an overfull warning while True: chunk = self.connection.recv(4) if len(chunk) < 4: break slen = struct.unpack(">L", chunk)[0] try: chunk = self.connection.recv(slen) except MemoryError: # already seen when connecting an XMLRPC client by mistake ... sys.stderr.write("corrupt data, did you use the right socket client ?") raise while len(chunk) < slen: chunk = chunk + self.connection.recv(slen - len(chunk)) # obj = self.unPickle(chunk) # self.rrlog_server.log(obj) if len(jobdataq) <= maxlen_jobdataq: # deque of python2.6 has maxlen but that throws away oldest elements jobdataq.append(chunk) elif not overfull_warned: # this is not exact with multiple threads (e.g.ThreadedTCPServer). # But without harm because, when getting near the queue size limit, arbitrary jobs will be skipped in any case. warn("skipping job because queue len > %s. Subsequent warnings of that type are disabled."%maxlen_jobdataq) # may appear >1 times with >1 threads overfull_warned = True # there is also SocketServer.ThreadingTCPServer # but should a log really load multiple cpu cores ? # moreover, that would require the rotation to be threadsafe
tcpservercls = SocketServer.TCPServer
[docs]class LogRecordSocketReceiver(tcpservercls): """simple TCP socket-based logging receiver suitable for testing. """ allow_reuse_address = 1
[docs] def __init__(self, host, ports, handler=LogRecordStreamHandler): for port in ports: try: tcpservercls.__init__(self, (host, port), handler) except socket.error,e: warn("port %s not available:%s"%(port, e)) port = None if not port: raise self.abort = 0 self.timeout = 1 self.logname = None self.host = host self.port = port
[docs] def serve_until_stopped(self): import select abort = 0 while not abort: rd, wr, ex = select.select( [self.socket.fileno()], [], [], self.timeout ) if rd: self.handle_request() abort = self.abort
[docs]def startServer(logServer, host="localhost", ports=(globalconst.DEFAULTPORT_SOCKET,)): """ Run the given logServer as an xmlrpc server (forever). :param ports: sequence of portnumbers, at least one number. The first port available is used. Multiple ports is for development, where sometimes ports may remain blocked. In production, better use a single port only, for best control over which server/client pairs are married. """ global socketreceiver global rrlog_server rrlog_server = logServer socketreceiver = LogRecordSocketReceiver(host, ports) # print("About to start TCP server...") import os t = threading.Thread(target=processq) t.start() print "%s:log server ready. Available at host,port: %s.Pid=%s, thread.ident=%s"%( datetime.datetime.now(), str( (socketreceiver.host, socketreceiver.port)), os.getpid(), t.ident ) socketreceiver.serve_until_stopped()