使用协程和线程时的吞吐量差异
几天前,我问了一个关于如何帮助我设计构建多个HTTP请求的范例的问题
这是场景。 我希望有一个多生产者,多消费者体系。 我的制作人员抓取并抓取几个网站,并将其找到的链接添加到队列中。 由于我要抓取多个网站,因此我希望有多个生产者/抓取工具。
消费者/员工馈送此队列,向这些链接发出TCP / UDP请求,并将结果保存到我的Django数据库。 由于每个队列项目是完全独立的,因此我还想拥有多个工作人员。
人们建议使用一个协程库来完成这个即Gevent或Eventlet。 从未使用过协程,我发现即使编程范例与线程范例类似,但只有一个线程正在执行,但在阻塞调用时(例如I / O调用),堆栈会切换到内存中,而另一个绿色线程接管直到它遇到某种阻塞的I / O调用。 希望我得到这个权利? 以下是我的一篇SO帖子的代码:
import gevent
from gevent.queue import *
import time
import random
q = JoinableQueue()
workers = []
producers = []
def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
def worker(wid):
while True:
item = q.get()
try:
print "Got item %s" % item
do_work(wid, item)
finally:
print "No more items"
q.task_done()
def producer():
while True:
item = random.randint(1, 11)
if item == 10:
print "Signal Received"
return
else:
print "Added item %s" % item
q.put(item)
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
# This doesn't work.
for j in range(2):
producers.append(gevent.spawn(producer))
# Uncommenting this makes this script work.
# producer()
q.join()
这很好,因为sleep
呼叫阻塞呼叫,当发生sleep
事件时,另一个绿色线程接管。 这比顺序执行要快得多。 正如你所看到的,我没有任何代码在我的程序中故意让一个线程执行到另一个线程。 我没有看到这是如何适应上面的情况,因为我想让所有线程同时执行。
所有的工作都很好,但是我感觉我使用Gevent / Eventlets获得的吞吐量高于原始的顺序运行程序,但是大大低于使用真实线程可以实现的吞吐量。
如果我要使用线程机制重新实现我的程序,那么我的每个生产者和消费者都可以同时工作,而无需像协同程序那样交换堆栈。
这应该使用线程重新实现吗? 我的设计错了吗? 我没有看到使用协程的真正好处。
也许我的概念很浑浊,但这就是我已经同化的东西。 任何对我的范例和概念的帮助或澄清都会很棒。
谢谢
正如你所看到的,我没有任何代码在我的程序中故意让一个线程执行到另一个线程。 我没有看到这是如何适应上面的情况,因为我想让所有线程同时执行。
有一个操作系统线程,但有几个greenlet。 在你的情况下, gevent.sleep()
允许工作人员同时执行。 如果使用修补的urllib2
与gevent
配合gevent
(通过调用gevent.monkey.patch_*()
),阻塞IO调用(如urllib2.urlopen(url).read()
执行相同的操作。
另请参阅关于协程和并发的好奇课程,以了解代码在单线程环境中如何同时工作。
为了比较gevent,线程,多处理之间的吞吐量差异,您可以编写与所有aproaches兼容的代码:
#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes
if concurrency_impl == 'gevent':
import gevent.monkey; gevent.monkey.patch_all()
import logging
import time
import random
from itertools import count, islice
info = logging.info
if concurrency_impl in ['gevent', 'threading']:
from Queue import Queue as JoinableQueue
from threading import Thread
if concurrency_impl == 'multiprocessing':
from multiprocessing import Process as Thread, JoinableQueue
对于所有并发实现,脚本的其余部分是相同的:
def do_work(wid, value):
time.sleep(random.randint(0,2))
info("%d Task %s done" % (wid, value))
def worker(wid, q):
while True:
item = q.get()
try:
info("%d Got item %s" % (wid, item))
do_work(wid, item)
finally:
q.task_done()
info("%d Done item %s" % (wid, item))
def producer(pid, q):
for item in iter(lambda: random.randint(1, 11), 10):
time.sleep(.1) # simulate a green blocking call that yields control
info("%d Added item %s" % (pid, item))
q.put(item)
info("%d Signal Received" % (pid,))
不要在模块级执行代码将其放在main()
:
def main():
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(process)d %(message)s")
q = JoinableQueue()
it = count(1)
producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
for t in producers+workers:
t.daemon = True
t.start()
for t in producers: t.join() # put items in the queue
q.join() # wait while it is empty
# exit main thread (daemon workers die at this point)
if __name__=="__main__":
main()
当你有很多(绿色)线程时,gevent非常棒。 我用数千测试过它,它工作得很好。 你必须确保你使用的所有库都用于抓取和保存到数据库。 afaik如果他们使用Python的套接字,gevent注入应该工作。 用C编写的扩展(例如mysqldb)将会阻止,而你需要使用绿色的等价物。
如果你使用gevent,你可以主要消除队列,为每个任务产生新的(绿色)线程,线程的代码和db.save(web.get(address))
一样简单。 当db或web块中的某些库时,gevent会照顾抢占。 只要你的任务适合记忆,它就会工作。
在这种情况下,您的问题不在于程序速度(即选择gevent或线程),而是网络IO吞吐量。 这是(应该是)决定程序运行速度的瓶颈。
Gevent是确保这是瓶颈的好方法,而不是程序的体系结构。
这是你想要的那种过程:
import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all
patch_all() # Patch urllib2, etc
def worker(work_queue, output_queue):
for work_unit in work_queue:
finished = do_work(work_unit)
output_queue.put(finished)
work_queue.task_done()
def producer(input_queue, work_queue):
for url in input_queue:
url_list = crawl(url)
for work in url_list:
work_queue.put(work)
input_queue.task_done()
def do_work(work):
gevent.sleep(0) # Actually proces link here
return work
def crawl(url):
gevent.sleep(0)
return list(url) # Actually process url here
input = JoinableQueue()
work = JoinableQueue()
output = Queue()
workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]
list_of_urls = ['foo', 'bar']
for url in list_of_urls:
input.put(url)
# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'
# We now have output!
print 'output:'
for message in output:
print message
# Or if you'd like, you could use the output as it comes!
你不需要等待输入和工作队列完成,我刚刚在这里证明了这一点。
链接地址: http://www.djcxy.com/p/53185.html上一篇: Throughput differences when using coroutines vs threading