Logger service using epoll - Part 1

Update: there is a continuation of this post.

Suppose we have a number of microservices and we need some way to easily handle logging from all of them.

Graylog might be a good choice, but if you don't need such a full-featured solution, a self-made logging service can serve you well.

Python's logging facility has almost everything we need for this: it provides SocketHandler, which sends log records to a network socket. The missing piece is the actual logging service that receives and saves those records. The Logging Cookbook contains a basic implementation of such a service, which is good for a quick start.
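
To make the wire format concrete, here is a minimal sketch of one round trip: the standard SocketHandler on the sending side and a tiny receiver that reads a single record. The helper names (recv_exact, receive_one_record) and the logger name are made up for this illustration, and the receiver is deliberately simpler than the Cookbook one. It assumes Python 3 and a trusted sender on localhost.

```python
import logging
import logging.handlers
import pickle
import socket
import struct
import threading


def recv_exact(conn, n):
    """Read exactly n bytes, or return None if the peer closed early."""
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            return None
        buf += chunk
    return buf


def receive_one_record(server_sock, out):
    """Accept one connection and decode a single pickled log record."""
    conn, _addr = server_sock.accept()
    try:
        header = recv_exact(conn, 4)
        if header is None:
            return
        # SocketHandler prefixes each pickle with a 4-byte big-endian length.
        (length,) = struct.unpack(">L", header)
        payload = recv_exact(conn, length)
        if payload is None:
            return
        # Unpickling is only acceptable because the sender is trusted here.
        out.append(logging.makeLogRecord(pickle.loads(payload)))
    finally:
        conn.close()


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
port = server.getsockname()[1]

received = []
receiver = threading.Thread(target=receive_one_record, args=(server, received))
receiver.start()

logger = logging.getLogger("demo.service")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
handler = logging.handlers.SocketHandler("127.0.0.1", port)
logger.addHandler(handler)
logger.info("hello from %s", "microservice")

receiver.join(timeout=5)
handler.close()
logger.removeHandler(handler)
server.close()
```

After this runs, received holds one LogRecord whose message has already been formatted by the sender, because the handler sends the rendered message and sets args to None.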

I experimented with curio as an engine for such a service. It worked pretty well and would certainly be a good choice for this task. The only thing that forced me to look for another solution was its Python 3 dependency: all the microservices I need the logging service for use Python 2, and I'd like the service to run on the very same Python.

So, after a few more experiments, I came up with an edge-triggered, epoll-based solution that is compatible with both Python 2 and Python 3. The code for the service is available here.

It took some time to understand the mechanics of epoll, and this article by Scot Doyle helped a lot. My local tests confirm that the service is pretty stable and handles incoming logging records properly.
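
For readers who have not used edge-triggered epoll before, the sketch below shows the general pattern. It is not the code from the gist. The names serve, extract_frames and would_block, as well as the on_frame and should_stop callbacks, are invented for this example. The key point is that with EPOLLET each readiness event is reported once, so the loop has to accept and read until the socket reports that it would block. Linux only.

```python
import errno
import select
import socket
import struct

HEADER = struct.Struct(">L")  # 4-byte big-endian length prefix


def extract_frames(buf):
    """Split complete length-prefixed frames off buf; return (frames, rest)."""
    frames = []
    while len(buf) >= HEADER.size:
        (length,) = HEADER.unpack(buf[:HEADER.size])
        end = HEADER.size + length
        if len(buf) < end:
            break  # frame not complete yet, wait for more data
        frames.append(buf[HEADER.size:end])
        buf = buf[end:]
    return frames, buf


def would_block(exc):
    return exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK)


def serve(listener, on_frame, should_stop):
    """Run an edge-triggered epoll loop until should_stop() returns True."""
    listener.setblocking(False)
    ep = select.epoll()
    ep.register(listener.fileno(), select.EPOLLIN | select.EPOLLET)
    conns, buffers = {}, {}
    try:
        while not should_stop():
            for fd, _events in ep.poll(0.1):
                if fd == listener.fileno():
                    # Edge-triggered: accept until the backlog is empty.
                    while True:
                        try:
                            conn, _addr = listener.accept()
                        except socket.error as exc:
                            if would_block(exc):
                                break
                            raise
                        conn.setblocking(False)
                        ep.register(conn.fileno(),
                                    select.EPOLLIN | select.EPOLLET)
                        conns[conn.fileno()] = conn
                        buffers[conn.fileno()] = b""
                    continue
                conn = conns.get(fd)
                if conn is None:
                    continue
                closed = False
                # Edge-triggered: read until the kernel buffer is drained.
                while True:
                    try:
                        chunk = conn.recv(4096)
                    except socket.error as exc:
                        closed = not would_block(exc)
                        break
                    if not chunk:
                        closed = True  # peer closed the connection
                        break
                    buffers[fd] += chunk
                frames, buffers[fd] = extract_frames(buffers[fd])
                for frame in frames:
                    on_frame(frame)
                if closed:
                    ep.unregister(fd)
                    conn.close()
                    del conns[fd], buffers[fd]
    finally:
        ep.close()
        for conn in conns.values():
            conn.close()
```

A real service would unpickle each frame and hand the record to a logger inside on_frame; here the callback just receives the raw bytes.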

The code in the gist is rather generic. I'm going to add some features, but they won't be published there, sorry for that:

  • use some magic number in message header as a simple measure to prevent unknown clients from spamming (even though the service will listen on localhost)
  • enforce message size limits
  • store logging records in database
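
The first two items could look roughly like the sketch below. Everything in it is an assumption made for illustration: the magic value, the 64 KiB limit, the 8-byte header layout and the names pack_frame, parse_header and BadFrame. Note that such a header differs from the plain 4-byte length prefix that SocketHandler sends, so both the handler and the service would have to agree on it. A magic number only filters out accidental or naive clients; it is not authentication.

```python
import struct

MAGIC = 0x4C4F4753             # hypothetical value, "LOGS" in ASCII
MAX_PAYLOAD = 64 * 1024        # hypothetical size limit in bytes
HEADER = struct.Struct(">LL")  # magic number, then payload length


class BadFrame(ValueError):
    """Raised when a header fails validation."""


def pack_frame(payload):
    """Sender side: prepend the magic number and the payload length."""
    if len(payload) > MAX_PAYLOAD:
        raise BadFrame("payload too large: %d bytes" % len(payload))
    return HEADER.pack(MAGIC, len(payload)) + payload


def parse_header(header_bytes):
    """Receiver side: validate the header and return the payload length."""
    if len(header_bytes) != HEADER.size:
        raise BadFrame("short header")
    magic, length = HEADER.unpack(header_bytes)
    if magic != MAGIC:
        raise BadFrame("unexpected magic number: 0x%08X" % magic)
    if length > MAX_PAYLOAD:
        raise BadFrame("declared payload too large: %d bytes" % length)
    return length
```

Checking the declared length before reading the payload lets the service drop a misbehaving connection without buffering an arbitrarily large message first.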

Another part of this story is SocketHandler, which has a feature I'd like to remove: its createSocket method implements an exponential back-off algorithm that I just don't need. During my tests, an intentionally broken connection to the logging service led to logging records being lost.
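
To show why records go missing, here is a small sketch of the retry schedule as I understand it from the standard library source, where the defaults are retryStart = 1.0, retryFactor = 2.0 and retryMax = 30.0. The function backoff_delays is a name made up for this example; it only reproduces the arithmetic, not the handler itself.

```python
def backoff_delays(failures, start=1.0, factor=2.0, maximum=30.0):
    """Return the wait applied after each consecutive failed connect."""
    delays = []
    period = None
    for _ in range(failures):
        if period is None:
            period = start  # first failure: wait `start` seconds
        else:
            period = min(period * factor, maximum)  # then grow up to the cap
        delays.append(period)
    return delays


print(backoff_delays(7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

While the handler is waiting for the next retry time it does not attempt to connect at all, so any record emitted in that window is dropped.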

To make the problem clear: suppose we have a server and a microservice. Everything works as expected and is in good shape. Some time later we need to update the code of the logging service and restart it. With SocketHandler from Python's standard library we would have to restart the microservice as well, because otherwise we would lose at least one or two logging records from it (due to the way this handler works).

I wanted a way to handle this situation automatically, without user intervention. So, below is a modified version of SocketHandler. Every time it needs to handle a record, it makes sure it is connected to the logging service (retrying up to retry_count times) so that the message gets delivered. It uses a small trick to detect the socket state and to re-establish the connection when needed.

# -*- coding: utf-8 -*-

from __future__ import absolute_import

import logging
import socket
import struct
import traceback
try:
    import cPickle as pickle
except ImportError:  # pragma: no cover
    import pickle


class SocketHandler(logging.Handler):
    """
    [!] modified version of python's logging.handlers.SocketHandler

    """

    def __init__(self, host, port):
        logging.Handler.__init__(self)
        self.host = host
        self.port = port
        self._sock = None
        self.closeOnError = 1
        self.retry_count = 3

    def makeSocket(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((self.host, self.port))
        except socket.error:
            # TODO sentry
            traceback.print_exc()
            s = None
        else:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        return s

    def _get_sock(self):
        if self._sock is None:
            self._sock = self.makeSocket()
        return self._sock

    def _set_sock(self, value):
        if value is None and self._sock:
            self._sock.close()
        self._sock = value

    sock = property(_get_sock, _set_sock)

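    def get_sock_state(self, s):
        # Linux-only: read struct tcp_info via the TCP_INFO socket option.
        # The first unpacked field is tcpi_state.
        # see http://stackoverflow.com/a/18189190
        fmt = 'B' * 7 + 'I' * 24
        sz = struct.calcsize(fmt)
        x = struct.unpack(
            fmt, s.getsockopt(socket.IPPROTO_TCP, socket.TCP_INFO, sz))
        return x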

    def send(self, s):
        # Try up to retry_count times; if every attempt fails, the record
        # is silently dropped.
        retries = self.retry_count
        while retries > 0:
            sock = self.sock
            retries -= 1
            if sock:
                state = self.get_sock_state(sock)
                # check tcp_info.tcpi_state value is TCP_ESTABLISHED
                if state[0] != 1:
                    self.sock = None
                    continue
                try:
                    if hasattr(sock, 'sendall'):
                        sock.sendall(s)
                    else:
                        sentsofar = 0
                        left = len(s)
                        while left > 0:
                            sent = sock.send(s[sentsofar:])
                            sentsofar = sentsofar + sent
                            left = left - sent
                    break
                except socket.error:
                    self.sock = None

    def makePickle(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record)  # noqa
            record.exc_info = None  # to avoid Unpickleable error
        d = dict(record.__dict__)
        d['msg'] = record.getMessage()
        d['args'] = None
        s = pickle.dumps(d, pickle.HIGHEST_PROTOCOL)
        if ei:
            record.exc_info = ei  # for next handler
        slen = struct.pack('>L', len(s))
        return slen + s

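    def handleError(self, record):
        # Check self._sock directly: reading the self.sock property here
        # would open a new connection just to close it again.
        if self.closeOnError and self._sock:
            self.sock = None
        else:
            logging.Handler.handleError(self, record)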

    def emit(self, record):
        try:
            s = self.makePickle(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            # TODO sentry
            traceback.print_exc()
            self.handleError(record)

    def close(self):
        self.acquire()
        try:
            # Use self._sock: the self.sock property would reconnect on read.
            if self._sock:
                self.sock = None
        finally:
            self.release()
        logging.Handler.close(self)
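
The socket state trick used in get_sock_state can be tried in isolation. The sketch below is a standalone demo under a few assumptions: it runs on Linux, tcp_state is a helper name invented here, and the "service restart" is simulated by closing the accepted connection. Only the first byte of struct tcp_info (tcpi_state) is inspected.

```python
import socket
import struct
import time

TCP_ESTABLISHED = 1  # value of TCP_ESTABLISHED in the Linux kernel headers


def tcp_state(sock):
    """Return tcpi_state, the first byte of Linux's struct tcp_info."""
    raw = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_INFO, 104)
    return struct.unpack("B", raw[:1])[0]


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
client = socket.create_connection(server.getsockname())
peer, _addr = server.accept()

state_connected = tcp_state(client)

# Simulate the logging service going away.
peer.close()
server.close()

# The FIN needs a moment to arrive, so poll briefly instead of assuming timing.
deadline = time.time() + 2.0
while tcp_state(client) == TCP_ESTABLISHED and time.time() < deadline:
    time.sleep(0.01)
state_after_close = tcp_state(client)

client.close()
print(state_connected == TCP_ESTABLISHED, state_after_close != TCP_ESTABLISHED)
```

Once the peer has closed its end, the client socket leaves the established state (typically moving to CLOSE_WAIT), which is the signal the handler uses to drop the socket and reconnect before sending.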

I'll put the logging service and this handler into production next month. Stay tuned.

Last modified: 2017-03-24 06:45:00 +00:00 UTC