A First Look at gevent

0x00 Preface

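There are many coroutine libraries for Python, such as tornado and asyncio. They require dedicated syntax such as async/await, so ordinary (non-coroutine) code has to be rewritten before it can run as coroutines.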

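gevent is an unusual coroutine library: it can run ordinary, non-coroutine code as coroutines, which can improve performance. This article tries to analyze how it does that.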

0x01 Example

Consider the following code:

```python
import ctypes
import sys
import threading
import time


def gettid():
    """Return the OS-level ID of the calling thread."""
    # sys.platform is 'linux2' on Python 2 and 'linux' on Python 3
    if sys.platform.startswith('linux'):
        # 186 is the gettid syscall number on x86_64 Linux
        return ctypes.CDLL('libc.so.6').syscall(186)
    else:
        # Windows only: ctypes.windll does not exist on other platforms
        return ctypes.windll.kernel32.GetCurrentThreadId()


def thread_test(index):
    time0 = time.time()
    # For about one second, print the Python thread ident and the OS thread ID
    while time.time() - time0 < 1:
        print('I\'m thread %d: %d %d' % (index, threading.current_thread().ident, gettid()))
        time.sleep(0.1)


thread1 = threading.Thread(target=thread_test, args=(1,))
thread1.start()
thread2 = threading.Thread(target=thread_test, args=(2,))
thread2.start()
print('Main thread sleep')
time.sleep(2)
print('Main thread exit')
```

The output is as follows:

```
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread sleep
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
I'm thread 1: 140540774946560 32347
I'm thread 2: 140540766553856 32348
Main thread exit
```

Now add the following lines at the very top of this code:

```python
from gevent import monkey
monkey.patch_thread()
```

The output becomes:

```
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 1: 21069936 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
I'm thread 2: 14522208 31623
Main thread sleep
Main thread exit
```

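With gevent added, the output changes. The biggest difference is that both threads now report the same OS thread ID (31623, the last column), even though their Python-level idents still differ. In other words, the two "threads" actually run inside a single native thread.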
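The gettid() helper in the example depends on syscall number 186, which means gettid only on x86_64 Linux, and on ctypes.windll, which exists only on Windows. On Python 3.8 and later, the standard library exposes the OS-level thread ID directly through threading.get_native_id(). The sketch below repeats the ID check with it, assuming Python 3.8+; report is a hypothetical helper name, and a Barrier keeps both threads alive at the same time so neither ID can be recycled for the other.

```python
import threading


def report(index, results, barrier):
    # get_ident(): Python-level thread identifier.
    # get_native_id(): OS-assigned thread ID (Python 3.8+); on Linux this is
    # the value the gettid syscall returns.
    results[index] = (threading.get_ident(), threading.get_native_id())
    # Wait until both threads have recorded their IDs, so they overlap in time.
    barrier.wait()


results = {}
barrier = threading.Barrier(2)
threads = [threading.Thread(target=report, args=(i, results, barrier)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

for index, (ident, native_id) in sorted(results.items()):
    print("I'm thread %d: %d %d" % (index, ident, native_id))
```

Run without gevent, the two native IDs differ, just like the last column of the first output.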

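Another point worth noting: the position of the "Main thread sleep" line differs between the two runs. In the patched run it is printed only after both threads have finished, and each thread prints all of its lines before the other one starts. This follows from the patch: Thread.start() waits on an internal Event, which is now gevent's Event, so the main greenlet switches to the gevent hub and the newly spawned greenlet gets to run. Because only the thread modules were patched, time.sleep still blocks the whole OS thread without yielding, so each greenlet runs to completion before control returns to the main greenlet.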
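Sequential output is what cooperative scheduling on a single OS thread looks like when tasks never yield: a switch can only happen at an explicit yield point. The toy round-robin scheduler below illustrates this with plain generators; it is not gevent's hub, and run and worker are hypothetical names. A task that never yields runs to completion in one go, while a task that yields after each step interleaves with the others.

```python
from collections import deque


def worker(name, steps, log, cooperative):
    for i in range(steps):
        log.append('%s-%d' % (name, i))
        if cooperative:
            # Hand control back to the scheduler after every step.
            yield


def run(tasks):
    """Round-robin: resume each generator until it yields or finishes."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # task finished; drop it
        ready.append(task)  # task yielded; put it back in line


blocking_log = []
run([worker('t1', 3, blocking_log, False), worker('t2', 3, blocking_log, False)])
print(blocking_log)  # ['t1-0', 't1-1', 't1-2', 't2-0', 't2-1', 't2-2']

yielding_log = []
run([worker('t1', 3, yielding_log, True), worker('t2', 3, yielding_log, True)])
print(yielding_log)  # ['t1-0', 't2-0', 't1-1', 't2-1', 't1-2', 't2-2']
```

The non-cooperative worker plays the role of an unpatched time.sleep: it occupies the only OS thread until it is done.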

0x02 How It Works

Let's look at the implementation of patch_thread:

```python
def patch_thread(threading=True, _threading_local=True, Event=True, logging=True,
                 existing_locks=True,
                 _warnings=None):
    if threading:
        threading_mod = __import__('threading')
        # Capture the *real* current thread object before
        # we start returning DummyThread objects, for comparison
        # to the main thread.
        orig_current_thread = threading_mod.current_thread()
    else:
        threading_mod = None
        gevent_threading_mod = None
        orig_current_thread = None

    gevent_thread_mod, thread_mod = _patch_module('thread',
                                                  _warnings=_warnings, _notify_did_subscribers=False)

    if threading:
        gevent_threading_mod, _ = _patch_module('threading',
                                                _warnings=_warnings, _notify_did_subscribers=False)

        if Event:
            from gevent.event import Event
            patch_item(threading_mod, 'Event', Event)
            # Python 2 had `Event` as a function returning
            # the private class `_Event`. Some code may be relying
            # on that.
            if hasattr(threading_mod, '_Event'):
                patch_item(threading_mod, '_Event', Event)

        if existing_locks:
            _patch_existing_locks(threading_mod)

        if logging and 'logging' in sys.modules:
            logging = __import__('logging')
            patch_item(logging, '_lock', threading_mod.RLock())
            for wr in logging._handlerList:
                # In py26, these are actual handlers, not weakrefs
                handler = wr() if callable(wr) else wr
                if handler is None:
                    continue
                if not hasattr(handler, 'lock'):
                    raise TypeError("Unknown/unsupported handler %r" % handler)
                handler.lock = threading_mod.RLock()

    if _threading_local:
        _threading_local = __import__('_threading_local')
        from gevent.local import local
        patch_item(_threading_local, 'local', local)

    def make_join_func(thread, thread_greenlet):
        from gevent.hub import sleep
        from time import time

        def join(timeout=None):
            end = None
            if threading_mod.current_thread() is thread:
                raise RuntimeError("Cannot join current thread")
            if thread_greenlet is not None and thread_greenlet.dead:
                return
            if not thread.is_alive():
                return

            if timeout:
                end = time() + timeout

            while thread.is_alive():
                if end is not None and time() > end:
                    return
                sleep(0.01)
        return join

    if threading:
        from gevent.threading import main_native_thread

        for thread in threading_mod._active.values():
            if thread == main_native_thread():
                continue
            thread.join = make_join_func(thread, None)

    if sys.version_info[:2] >= (3, 4):

        # Issue 18808 changes the nature of Thread.join() to use
        # locks. This means that a greenlet spawned in the main thread
        # (which is already running) cannot wait for the main thread---it
        # hangs forever. We patch around this if possible. See also
        # gevent.threading.
        greenlet = __import__('greenlet')

        if orig_current_thread == threading_mod.main_thread():
            main_thread = threading_mod.main_thread()
            _greenlet = main_thread._greenlet = greenlet.getcurrent()

            main_thread.join = make_join_func(main_thread, _greenlet)

            # Patch up the ident of the main thread to match. This
            # matters if threading was imported before monkey-patching
            # thread
            oldid = main_thread.ident
            main_thread._ident = threading_mod.get_ident()
            if oldid in threading_mod._active:
                threading_mod._active[main_thread.ident] = threading_mod._active[oldid]
            if oldid != main_thread.ident:
                del threading_mod._active[oldid]
        else:
            _queue_warning("Monkey-patching not on the main thread; "
                           "threading.main_thread().join() will hang from a greenlet",
                           _warnings)

    from gevent import events
    _notify_patch(events.GeventDidPatchModuleEvent('thread', gevent_thread_mod, thread_mod))
    _notify_patch(events.GeventDidPatchModuleEvent('threading', gevent_threading_mod, threading_mod))
```

First, the variable orig_current_thread stores the current (real) thread object; then _patch_module is called to patch the thread module and then the threading module.
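patch_thread also replaces join() on threads that already exist with a function built by make_join_func. Instead of blocking on a lock, that function polls thread.is_alive() with a short sleep until the thread ends or the timeout expires. Below is a stdlib-only sketch of the polling idea on a real thread. polling_join is a hypothetical name; unlike gevent's version it returns a bool, and it calls time.sleep where gevent calls gevent.hub.sleep so that other greenlets can run while waiting.

```python
import threading
import time


def polling_join(thread, timeout=None, interval=0.01):
    """Wait for `thread` by polling is_alive(); return True if it finished."""
    if threading.current_thread() is thread:
        raise RuntimeError("Cannot join current thread")
    end = None if timeout is None else time.time() + timeout
    while thread.is_alive():
        if end is not None and time.time() > end:
            return False  # timed out while the thread is still running
        time.sleep(interval)  # gevent's version sleeps cooperatively here
    return True


t = threading.Thread(target=time.sleep, args=(0.2,))
t.start()
first = polling_join(t, timeout=0.05)
print(first)   # False: the thread is still sleeping
second = polling_join(t)
print(second)  # True: waited until the thread exited
```

Polling is cruder than waiting on a lock, but it never blocks the OS thread for longer than one interval, which is what a greenlet-based program needs.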

The relevant parts of the _patch_module implementation are:

```python
def __call_module_hook(gevent_module, name, module, items, _warnings):
    # This function can raise DoNotPatch on 'will'

    def warn(message):
        _queue_warning(message, _warnings)

    func_name = '_gevent_' + name + '_monkey_patch'
    try:
        func = getattr(gevent_module, func_name)
    except AttributeError:
        func = lambda *args: None

    func(module, items, warn)


def patch_item(module, attr, newitem):
    olditem = getattr(module, attr, _NONE)
    if olditem is not _NONE:
        saved.setdefault(module.__name__, {}).setdefault(attr, olditem)
    setattr(module, attr, newitem)


def patch_module(target_module, source_module, items=None,
                 _warnings=None,
                 _notify_did_subscribers=True):
    """
    patch_module(target_module, source_module, items=None)

    Replace attributes in *target_module* with the attributes of the
    same name in *source_module*.

    The *source_module* can provide some attributes to customize the process:

    * ``__implements__`` is a list of attribute names to copy; if not present,
      the *items* keyword argument is mandatory.
    * ``_gevent_will_monkey_patch(target_module, items, warn, **kwargs)``
    * ``_gevent_did_monkey_patch(target_module, items, warn, **kwargs)``
      These two functions in the *source_module* are called *if* they exist,
      before and after copying attributes, respectively. The "will" function
      may modify *items*. The value of *warn* is a function that should be called
      with a single string argument to issue a warning to the user. If the "will"
      function raises :exc:`gevent.events.DoNotPatch`, no patching will be done. These functions
      are called before any event subscribers or plugins.

    :keyword list items: A list of attribute names to replace. If
       not given, this will be taken from the *source_module* ``__implements__``
       attribute.
    :return: A true value if patching was done, a false value if patching was canceled.

    .. versionadded:: 1.3b1
    """
    from gevent import events

    if items is None:
        items = getattr(source_module, '__implements__', None)
        if items is None:
            raise AttributeError('%r does not have __implements__' % source_module)

    try:
        __call_module_hook(source_module, 'will', target_module, items, _warnings)
        _notify_patch(
            events.GeventWillPatchModuleEvent(target_module.__name__, source_module,
                                              target_module, items),
            _warnings)
    except events.DoNotPatch:
        return False

    for attr in items:
        patch_item(target_module, attr, getattr(source_module, attr))

    __call_module_hook(source_module, 'did', target_module, items, _warnings)

    if _notify_did_subscribers:
        # We allow turning off the broadcast of the 'did' event for the benefit
        # of our internal functions which need to do additional work (besides copying
        # attributes) before their patch can be considered complete.
        _notify_patch(
            events.GeventDidPatchModuleEvent(target_module.__name__, source_module,
                                             target_module)
        )

    return True


def _patch_module(name, items=None, _warnings=None, _notify_did_subscribers=True):

    gevent_module = getattr(__import__('gevent.' + name), name)
    module_name = getattr(gevent_module, '__target__', name)
    target_module = __import__(module_name)

    patch_module(target_module, gevent_module, items=items,
                 _warnings=_warnings,
                 _notify_did_subscribers=_notify_did_subscribers)

    return gevent_module, target_module
```

Two pieces of logic are key here:

  • __call_module_hook runs the _gevent_will_monkey_patch and _gevent_did_monkey_patch functions defined in the gevent (source) module, if they exist.

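  • patch_item replaces each specified attribute of the original module with the attribute of the same name from the corresponding gevent module, after saving the original.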

In other words, the actual patching is done by patch_item, and which objects get patched is determined by the __implements__ list in the gevent module.
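The core mechanism is: for every name listed in __implements__, remember the original attribute and then setattr the replacement. The sketch below is a simplified, self-contained re-implementation of that idea (hooks and event notification omitted); saved, patch_item and patch_module mirror gevent's names but are not gevent's code, and target and source are throwaway modules made up for the example.

```python
import types

saved = {}  # module name -> {attribute name: original object}


def patch_item(module, attr, newitem):
    # Remember only the first original value, so patching twice keeps the real one.
    if hasattr(module, attr):
        saved.setdefault(module.__name__, {}).setdefault(attr, getattr(module, attr))
    setattr(module, attr, newitem)


def patch_module(target_module, source_module, items=None):
    # Default to the names the source module says it implements.
    if items is None:
        items = getattr(source_module, '__implements__', None)
        if items is None:
            raise AttributeError('%r does not have __implements__' % source_module)
    for attr in items:
        patch_item(target_module, attr, getattr(source_module, attr))
    return True


# Throwaway stand-ins for a stdlib module and its gevent counterpart.
target = types.ModuleType('target')
target.get_ident = lambda: 'native ident'
target.untouched = 'kept'

source = types.ModuleType('source')
source.__implements__ = ['get_ident']
source.get_ident = lambda: 'greenlet ident'

early = target.get_ident  # like `from target import get_ident` before patching

patch_module(target, source)
print(target.get_ident())              # greenlet ident
print(target.untouched)                # kept (not listed in __implements__)
print(saved['target']['get_ident']())  # native ident
print(early())                         # native ident: bound before the patch
```

The last line shows a consequence of patching with setattr: only the module attribute changes, while references that other code copied out earlier keep the original object. This is why gevent recommends monkey-patching as early as possible, before other modules are imported.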

0x03 Patching the threading Module

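For the threading module, the corresponding gevent module is gevent.threading. It implements _gevent_will_monkey_patch to do some preparation before the patch is applied: if the current greenlet's ident is not yet a key in threading._active, it re-registers the main thread under that ident, so that threading.current_thread() still finds the MainThread instead of creating a DummyThread.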

```python
import threading as __threading__

def _gevent_will_monkey_patch(native_module, items, warn): # pylint:disable=unused-argument
    # Make sure the MainThread can be found by our current greenlet ID,
    # otherwise we get a new DummyThread, which cannot be joined.
    # Fixes tests in test_threading_2 under PyPy.
    main_thread = main_native_thread()
    if __threading__.current_thread() != main_thread:
        warn("Monkey-patching outside the main native thread. Some APIs "
             "will not be available. Expect a KeyError to be printed at shutdown.")
        return

    if _get_ident() not in __threading__._active:
        main_id = main_thread.ident
        del __threading__._active[main_id]
        main_thread._ident = main_thread._Thread__ident = _get_ident()
        __threading__._active[_get_ident()] = main_thread
```
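patch_module finds this function by name: __call_module_hook builds '_gevent_' + name + '_monkey_patch', looks it up with getattr and falls back to a no-op. According to the patch_module docstring, a "will" hook may also raise gevent.events.DoNotPatch to cancel patching; the gevent.threading hook above only warns and returns, so patching still proceeds. The stdlib sketch below shows the lookup-and-veto pattern. DoNotPatch here is a local stand-in class, and call_module_hook, guarded_patch and refuse are hypothetical names.

```python
import types


class DoNotPatch(Exception):
    """Local stand-in for gevent.events.DoNotPatch."""


def call_module_hook(source_module, name, target_module, items, messages):
    # Look up e.g. _gevent_will_monkey_patch; a missing hook becomes a no-op.
    func = getattr(source_module, '_gevent_' + name + '_monkey_patch',
                   lambda *args: None)
    func(target_module, items, messages.append)  # the hook's warn() collects text


def guarded_patch(target_module, source_module, items, messages):
    try:
        call_module_hook(source_module, 'will', target_module, items, messages)
    except DoNotPatch:
        return False  # the "will" hook vetoed the patch
    for attr in items:
        setattr(target_module, attr, getattr(source_module, attr))
    call_module_hook(source_module, 'did', target_module, items, messages)
    return True


def refuse(native_module, items, warn):
    # Hypothetical "will" hook that warns and then cancels patching.
    warn('not on the main thread; refusing to patch')
    raise DoNotPatch()


target = types.ModuleType('target')
target.sleep = 'native sleep'
source = types.ModuleType('source')
source.sleep = 'cooperative sleep'

messages = []
first = guarded_patch(target, source, ['sleep'], messages)
print(first, target.sleep)  # True cooperative sleep

target.sleep = 'native sleep'
source._gevent_will_monkey_patch = refuse
second = guarded_patch(target, source, ['sleep'], messages)
print(second, target.sleep)  # False native sleep
print(messages)              # ['not on the main thread; refusing to patch']
```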

The list of attributes that gevent.threading patches is:

```python
__implements__ = [
    'local',
    '_start_new_thread',
    '_allocate_lock',
    'Lock',
    '_get_ident',
    '_sleep',
    '_DummyThread',
]
```

while in gevent.thread it is:

```python
__implements__ = [
    'allocate_lock',
    'get_ident',
    'exit',
    'LockType',
    'stack_size',
    'start_new_thread',
    '_local'
]
```

Now let's see what the threading module does when it starts a thread:

```
+------------------------+
|                        |
| threading.Thread.start |
|                        |
+-----------+------------+
            |
            v
+-----------+-----------------+
|                             |
| threading._start_new_thread |
|                             |
+-----------+-----------------+
            |
            v
+-----------+-------------+
|                         |
| thread.start_new_thread |
|                         |
+-------------------------+
```

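Both threading._start_new_thread and thread.start_new_thread above appear in gevent's __implements__ lists, which means both functions are replaced by gevent. In gevent.threading, _start_new_thread is imported directly from gevent.thread: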

```python
from gevent.thread import start_new_thread as _start_new_thread
```

Since gevent.threading._start_new_thread is just gevent.thread.start_new_thread under another name, the two are the same function:

```python
def start_new_thread(function, args=(), kwargs=None):
    if kwargs is not None:
        greenlet = Greenlet.spawn(function, *args, **kwargs)
    else:
        greenlet = Greenlet.spawn(function, *args)
    return get_ident(greenlet)
```

start_new_thread函数可以看出,启动线程逻辑已经被替换成了greenlet的实现,这也是为什么使用了patch_thread后,真实线程id相同的原因。

0x04 Summary

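gevent uses runtime patching (monkey-patching) to turn non-coroutine libraries into coroutine-based ones, which can improve a program's performance with very few code changes. However, as the thread example above shows, a patched program may behave differently from the original, so it should be used with care.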
