Skip to content

support pymongo connecting in background #51

@chuckliu1979

Description

@chuckliu1979

hi,

The pymongo documentation in https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/mongo_client.py says:

> Starting with version 3.0 the MongoClient constructor no longer blocks while connecting
> to the server or servers, and it no longer raises ConnectionFailure if they are unavailable.
> Instead, the constructor returns immediately and launches the connection process on background
> threads.


So the log4mongo handler could continue initializing even when MongoDB is not running. I made a patch for this:
> import threading
4,8c15
< try:
<     from pymongo import MongoClient as Connection
< except ImportError:
<     from pymongo import Connection
< 
---
> from pymongo import MongoClient
10,18c17,19
< from pymongo.errors import OperationFailure, PyMongoError
< import pymongo
< if pymongo.version_tuple[0] >= 3:
<     from pymongo.errors import ServerSelectionTimeoutError
<     write_method = 'insert_one'
<     write_many_method = 'insert_many'
< else:
<     write_method = 'save'
<     write_many_method = 'insert'
---
> from pymongo.errors import ConfigurationError, OperationFailure, ServerSelectionTimeoutError
> 
> # pylint: disable=pointless-string-statement,invalid-name
46a48
> # pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals
49,50c51
<     DEFAULT_PROPERTIES = logging.LogRecord(
<         '', '', '', '', '', '', '', '').__dict__.keys()
---
>     DEFAULT_PROPERTIES = logging.LogRecord("", "", "", "", "", "", "", "").__dict__.keys()  # type: ignore
56,65c57,66
<             'timestamp': dt.datetime.utcnow(),
<             'level': record.levelname,
<             'thread': record.thread,
<             'threadName': record.threadName,
<             'message': record.getMessage(),
<             'loggerName': record.name,
<             'fileName': record.pathname,
<             'module': record.module,
<             'method': record.funcName,
<             'lineNumber': record.lineno
---
>             "timestamp": dt.datetime.utcnow(),
>             "level": record.levelname,
>             "thread": record.thread,
>             "threadName": record.threadName,
>             "message": record.getMessage(),
>             "loggerName": record.name,
>             "fileName": record.pathname,
>             "module": record.module,
>             "method": record.funcName,
>             "lineNumber": record.lineno,
69,75c70,72
<             document.update({
<                 'exception': {
<                     'message': str(record.exc_info[1]),
<                     'code': 0,
<                     'stackTrace': self.formatException(record.exc_info)
<                 }
<             })
---
>             document.update(
>                 {"exception": {"message": str(record.exc_info[1]), "code": 0, "stackTrace": self.formatException(record.exc_info)}}
>             )
78,79c75
<             contextual_extra = set(record.__dict__).difference(
<                 set(self.DEFAULT_PROPERTIES))
---
>             contextual_extra = set(record.__dict__).difference(set(self.DEFAULT_PROPERTIES))
87,92c83,101
< 
<     def __init__(self, level=logging.NOTSET, host='localhost', port=27017,
<                  database_name='logs', collection='logs',
<                  username=None, password=None, authentication_db='admin',
<                  fail_silently=False, formatter=None, capped=False,
<                  capped_max=1000, capped_size=1000000, reuse=True, **kwargs):
---
>     def __init__(
>         self,
>         level=logging.NOTSET,
>         host="localhost",
>         port=27017,
>         database_name="logs",
>         collection="logs",
>         username=None,
>         password=None,
>         authentication_db="admin",
>         fail_silently=False,
>         formatter=None,
>         capped=False,
>         capped_max=1000,
>         capped_size=1000000,
>         reuse=True,
>         ttl=3600,
>         **kwargs,
>     ):
122a132,133
>         self.ttl = ttl
>         self.ttl_index = ""
127c138
<         global _connection
---
>         global _connection  # pylint: disable=global-statement
131,150c142,148
<             if pymongo.version_tuple[0] < 3:
<                 try:
<                     self.connection = Connection(host=self.host,
<                                                  port=self.port, **kwargs)
<                 # pymongo >= 3.0 does not raise this error
<                 except PyMongoError:
<                     if self.fail_silently:
<                         return
<                     else:
<                         raise
<             else:
<                 self.connection = Connection(host=self.host, port=self.port,
<                                              **kwargs)
<                 try:
<                     self.connection.is_primary
<                 except ServerSelectionTimeoutError:
<                     if self.fail_silently:
<                         return
<                     else:
<                         raise
---
>             self.connection = MongoClient(host=self.host, port=self.port, **kwargs)
>             try:
>                 self.connection.is_primary
>             except ServerSelectionTimeoutError:
>                 if self.fail_silently:
>                     return
>                 raise
151a150
>         self._setup()
153c152,153
<         self.db = self.connection[self.database_name]
---
>     def _setup(self):
>         self.db = self.connection[self.database_name]  # type: ignore
155,157c155,156
<             auth_db = self.connection[self.authentication_database_name]
<             self.authenticated = auth_db.authenticate(self.username,
<                                                       self.password)
---
>             auth_db = self.connection[self.authentication_database_name]  # type: ignore
>             self.authenticated = auth_db.authenticate(self.username, self.password)
164,166c163
<                 self.collection = Collection(self.db, self.collection_name,
<                                              capped=True, max=self.capped_max,
<                                              size=self.capped_size)
---
>                 self.collection = Collection(self.db, self.collection_name, capped=True, max=self.capped_max, size=self.capped_size)
172a170,178
>     def _ensure_log4mongo_index(self):
>         if not self.ttl_index and self.ttl > 0:
>             try:
>                 self.ttl_index = self.collection.create_index(  # type: ignore
>                     "timestamp", expireAfterSeconds=self.ttl, background=True
>                 )
>             except (TypeError, ConfigurationError, ServerSelectionTimeoutError):
>                 pass
> 
178c184
<             self.db.logout()
---
>             self.db.logout()  # type: ignore
183a190,197
> 
>         if self.collection is None:
>             try:
>                 self._setup()
>             except ServerSelectionTimeoutError:
>                 pass
>         self._ensure_log4mongo_index()
> 
186,187c200,201
<                 getattr(self.collection, write_method)(self.format(record))
<             except Exception:
---
>                 self.collection.insert_one(self.format(record))
>             except Exception:  # pylint: disable=broad-except
191c205
<     def __exit__(self, type, value, traceback):
---
>     def __exit__(self, type, value, traceback):  # pylint: disable=redefined-builtin
196,203c210,231
< 
<     def __init__(self, level=logging.NOTSET, host='localhost', port=27017,
<                  database_name='logs', collection='logs',
<                  username=None, password=None, authentication_db='admin',
<                  fail_silently=False, formatter=None, capped=False,
<                  capped_max=1000, capped_size=1000000, reuse=True,
<                  buffer_size=100, buffer_periodical_flush_timing=5.0,
<                  buffer_early_flush_level=logging.CRITICAL, **kwargs):
---
>     def __init__(
>         self,
>         level=logging.NOTSET,
>         host="localhost",
>         port=27017,
>         database_name="logs",
>         collection="logs",
>         username=None,
>         password=None,
>         authentication_db="admin",
>         fail_silently=False,
>         formatter=None,
>         capped=False,
>         capped_max=1000,
>         capped_size=1000000,
>         reuse=True,
>         ttl=3600,
>         buffer_size=100,
>         buffer_periodical_flush_timing=5.0,
>         buffer_early_flush_level=logging.CRITICAL,
>         **kwargs,
>     ):
219,222c247,265
<         MongoHandler.__init__(self, level=level, host=host, port=port, database_name=database_name, collection=collection,
<                               username=username, password=password, authentication_db=authentication_db,
<                               fail_silently=fail_silently, formatter=formatter, capped=capped, capped_max=capped_max,
<                               capped_size=capped_size, reuse=reuse, **kwargs)
---
>         MongoHandler.__init__(
>             self,
>             level=level,
>             host=host,
>             port=port,
>             database_name=database_name,
>             collection=collection,
>             username=username,
>             password=password,
>             authentication_db=authentication_db,
>             fail_silently=fail_silently,
>             formatter=formatter,
>             capped=capped,
>             capped_max=capped_max,
>             capped_size=capped_size,
>             reuse=reuse,
>             ttl=ttl,
>             **kwargs,
>         )
227c270
<         self.last_record = None #kept for handling the error on flush
---
>         self.last_record = None  # kept for handling the error on flush
230c273,274
<         self._buffer_lock = None
---
>         self.buffer_lock = threading.RLock()
> 
237,238c281
<             import atexit
<             atexit.register(self.destroy)
---
>             import atexit  # pylint: disable=import-outside-toplevel
240,241c283
<             import threading
<             self._buffer_lock = threading.RLock()
---
>             atexit.register(self.destroy)
258c300,302
<             self._timer_stopper, self.buffer_timer_thread = call_repeatedly(self.buffer_periodical_flush_timing, self.flush_to_mongo)
---
>             self._timer_stopper, self.buffer_timer_thread = call_repeatedly(
>                 self.buffer_periodical_flush_timing, self.flush_to_mongo
>             )
262,263c306,315
< 
<         self.add_to_buffer(record)
---
>         if self.collection is None:
>             try:
>                 MongoHandler._setup(self)
>             except ServerSelectionTimeoutError:
>                 pass
>         MongoHandler._ensure_log4mongo_index(self)
> 
>         with self.buffer_lock:
>             self.last_record = record
>             self.buffer.append(self.format(record))
267,287d318
<         return
< 
<     def buffer_lock_acquire(self):
<         """Acquire lock on buffer (only if periodical flush is set)."""
<         if self._buffer_lock:
<             self._buffer_lock.acquire()
< 
<     def buffer_lock_release(self):
<         """Release lock on buffer (only if periodical flush is set)."""
<         if self._buffer_lock:
<             self._buffer_lock.release()
< 
<     def add_to_buffer(self, record):
<         """Add a formatted record to buffer."""
< 
<         self.buffer_lock_acquire()
< 
<         self.last_record = record
<         self.buffer.append(self.format(record))
< 
<         self.buffer_lock_release()
292,302c323,329
<             self.buffer_lock_acquire()
<             try:
< 
<                 getattr(self.collection, write_many_method)(self.buffer)
<                 self.empty_buffer()
< 
<             except Exception as e:
<                 if not self.fail_silently:
<                     self.handleError(self.last_record) #handling the error on flush
<             finally:
<                 self.buffer_lock_release()
---
>             with self.buffer_lock:
>                 try:
>                     self.collection.insert_many(self.buffer)
>                     self.empty_buffer()
>                 except Exception:  # pylint: disable=broad-except
>                     if not self.fail_silently:
>                         self.handleError(self.last_record)  # type: ignore
315,316d341
< 
< 

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions