python高性能代码之多线程优化

python

以常见的端口扫描器为实例

端口扫描器的原理很简单,操作socket来判断连接状态确定主机端口的开放情况。

import socket 

def scan(port):

s = socket.socket()

if s.connect_ex((\'localhost\', port)) == 0:

print port, \'open\'

s.close()

if __name__ == \'__main__\':

map(scan,range(1,65536))

这是一个socket扫描器的基本代码。

但是如果直接运行会等待很长时间都没有反应,这是因为socket是阻塞的,到等待每个连接超时后才会进入下一个连接。

给这段代码加一个超时

s.settimeout(0.1)

完整的代码如下

import socket 

def scan(port):

s = socket.socket()

s = settimeont(0.1)

if s.connect_ex((\'localhost\', port)) == 0:

print port, \'open\'

s.close()

if __name__ == \'__main__\':

map(scan,range(1,65536))

本文的重点不在于扫描器功能部分。而重点在于代码质量的提升和优化从而提升代码的运行效率。

多线程版本:

import socket 

import threading

def scan(port):

s = socket.socket()

s.settimeout(0.1)

if s.connect_ex((\'localhost\', port)) == 0:

print port, \'open\'

s.close()

if __name__ == \'__main__\':

threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)]

map(lambda x:x.start(),threads)

 

 Run起来,速度确实快了不少,但是抛出了异常:thread.error: can\'t start new thread

这个进程开启了65535个线程,有两种可能,一种是超过最大线程数了,一种是超过最大socket句柄数了。在linux可以通过ulimit来修改。
如果不修改最大限制,怎么用多线程不报错呢?
加个queue,变成生产者-消费者模式,开固定线程。

多线程+队列版本:

import socket 

import threading

from Queue import Queue

def scan(port):

s = socket.socket()

s.settimeout(0.1)

if s.connect_ex((\'localhost\', port)) == 0:

print port, \'open\'

s.close()

def worker():

while not q.empty():

port = q.get()

try:

scan(port)

finally:

q.task_done()

if __name__ == \'__main__\':

q = Queue()

map(q.put,xrange(1,65535))

threads = [threading.Thread(target=worker) for i in xrange(500)]

map(lambda x:x.start(),threads)

q.join()

 

开500个线程,不停的从队列中取出任务来进行...

 

multiprocessing + 队列版本:

总不能开65535个进程吧?还是用生产者消费者模式

import socket 
import multiprocessing

def scan(port):

s = socket.socket()

s.settimeout(0.1)

if s.connect_ex((\'localhost\', port)) == 0:

print port, \'open\'

s.close()

def worker(q):

while not q.empty():

port = q.get()

try:

scan(port)

finally:

q.task_done()

if __name__ == \'__main__\':

q = multiprocessing.JoinableQueue()

map(q.put,xrange(1,65535))

jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)]

map(lambda x:x.start(),jobs)

 

注意这里把队列作为一个参数传入到worker中去,因为是process safe的queue,不然会报错。
还有用的是JoinableQueue(),顾名思义就是可以join()的。

gevent的spawn版本:

from gevent import monkey; monkey.patch_all(); 

import gevent

import socket

...

if __name__ == \'__main__\':

threads = [gevent.spawn(scan, i) for i in xrange(1,65536)]

gevent.joinall(threads)

 

注意monkey patch必须在被patch的东西之前import,不然会Exception KeyError.比如不能先import threading,再monkey patch.

gevent的Pool版本:

from gevent import monkey; monkey.patch_all(); 

import socket

from gevent.pool import Pool

...

if __name__ == \'__main__\':

pool = Pool(500)

pool.map(scan,xrange(1,65536))

pool.join()

 

concurrent.futures版本:

import socket 

from Queue import Queue

from concurrent.futures import ThreadPoolExecutor

...

if __name__ == \'__main__\':

q = Queue()

map(q.put,xrange(1,65536))

with ThreadPoolExecutor(max_workers=500) as executor:

for i in range(500):

executor.submit(worker,q)

 

以上是 python高性能代码之多线程优化 的全部内容, 来源链接: utcz.com/z/388240.html

回到顶部