Python多线程Request问题

Python多线程Request问题

我在实现一个简单的需求,验证邮箱是否已经注册,向目标服务器发送一个请求后,如果返回204是未注册,返回409是已注册,待检测的邮箱存放在txt文件中,读取后做为列表,遍历执行,我使用request实现,并使用了代理,但是一个一个发送请求太慢,请问如何多线程并发执行?

  1. 如何使用多线程
  2. 对于待检测的邮箱,如何避免多线程同时检测一个邮箱,或者重复检测,希望能做到,每个线程都能检测到未检测的邮箱,已检测的邮箱不会被读取,应该是使用队列实现?

下面的是代码,部分信息打码。

import requests

import time

import json

class Validator(object):

def __init__(self):

self.headers = {

}

self.params = (

)

def run(self,name,proxies):

data = '{"emailAddress":"%s"}' %name

url = ''

response = requests.post(url=url,headers=self.headers,params=self.params,data=data,timeout=9,proxies=proxies)

#print(response.status_code)

if response.status_code == 204:

result = '{} 未注册 HTTP返回:{}'.format(name, response.status_code)

elif response.status_code == 409:

result = '{} 已注册 HTTP返回:{}'.format(name, response.status_code)

else:

result = '{} 检测异常 HTTP返回:{}'.format(name, response.status_code)

#print(result)

return result

# 以列表的形式读取待查询的帐号

def get_list():

with open ('unvalidated.txt', 'r') as f:

email_list = [i.strip() for i in f.readlines()]

return email_list

# 返回代理

def get_proxy(retry=3):

start = 0

while start <= retry:

res = requests.get("")

dic_info = res.text

dic_info = json.loads(dic_info)

status_code = dic_info['code']

if status_code != '0':

start += 1

time.sleep(0.5)

continue

data = dic_info['obj']

ip = data[0]["ip"]

port = data[0]["port"]

ip_port = 'http://{}:{}'.format(ip,port)

return {'http': ip_port}

return None

if __name__=='__main__':

app = Validator()

proxies = get_proxy()

# 循环查询

for email in get_list():

# 使用try,避免错误导致被挂起

try:

#proxies = get_proxy()

# 定义验证时间

validating_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))

# 定义打印格式

info = '检测时间:{} {}'.format(validating_time, app.run(email,proxies))

print(info)

# 写入日志文件

with open('validate_log.txt', 'a+') as f:

f.write(info + '\n')

# 设置查询延迟

time.sleep(0.5)

except requests.exceptions.RequestException as e:

info = '检测时间:{} {} 检测错误 错误信息:{}'.format(validating_time, email, e)

print(info)

# 写入日志文件

with open('validate_log.txt', 'a+') as f:

f.write(info + '\n')

time.sleep(0.5)


回答:

from concurrent.futures import ThreadPoolExecutor, as_completed

pool = ThreadPoolExecutor(4) # 4 threads

EMAILS = [

# some emails here

]

def check(email):

"""

if the email is registered, return None, else return the email

"""

# do something

def save(email):

"""

save the email

"""

# do something

def main():

tasks = [pool.submit(check, email) for email in EMAILS]

for task in as_completed(tasks):

check_result = task.result()

if check_result is None:

continue

save(check_result)

if __name__ == '__main__':

main()

自己再改改吧


回答:

可以将请求与处理过程,扔到线程池中执行。
处理结果最好是append到一个列表中,最后全部线程执行结束后再处理这个列表。
因为多线程下文件写操作是不安全的


回答:

我按照 @听完这一年 的答案,修改了代码,已经完成了主体功能,在50-100个线程的时候,完成的速度是以往的10倍不止,但是我发现,每次检测结果都比待检测见过要少,原因是因为一些原因(例如timeout)等,出错,导致线程被挂起,我尝试使用try,问题依旧存在,是不是要写在main()的循环提中呢?如果写在循环体中,如何传递email参数来得知是哪个帐号检测错误?万分感谢解答。

import requests

import json

import time

from concurrent.futures import ThreadPoolExecutor, as\_completed

pool = ThreadPoolExecutor(50)

def get\_email():

with open ('unvalidated.txt', 'r') as f:

email\_list \= \[i.strip() for i in f.readlines()\]

return email\_list

def get\_proxy(retry\=3):

start \= 0

while start <= retry:

res \= requests.get("")

dic\_info \= res.text

dic\_info \= json.loads(dic\_info)

status\_code \= dic\_info\['code'\]

if status\_code != '0':

start += 1

time.sleep(0.5)

continue

data \= dic\_info\['obj'\]

ip \= data\[0\]\["ip"\]

port \= data\[0\]\["port"\]

ip\_port \= 'http://{}:{}'.format(ip,port)

return {'http': ip\_port}

return None

def check(email,proxies):

headers \= {

}

params \= (

)

url \= '1'

data \= '{"emailAddress": "%s"}' % email

response \= requests.post(url\=url,params\=params,headers\=headers,data\=data,proxies\=proxies,timeout\=8)

try:

if response.status\_code \== 204:

print('未注册-{}'.format(email))

return '未注册-{}'.format(email)

elif response.status\_code \== 409:

print('已注册-{}'.format(email))

return '已注册-{}'.format(email)

else:

print('连接错误-{}'.format(email))

return '连接错误-{}'.format(email)

except requests.exceptions.RequestException:

print('连接失败-{}'.format(email))

return '连接失败-{}'.format(email)

def save(result):

with open('validate\_log.txt', 'a+') as f:

f.write(result + '\\n')

def main():

tasks \= \[pool.submit(check, email, proxies) for email in EMAILS\]

for task in as\_completed(tasks):

check\_result \= task.result()

save(check\_result)

if \_\_name\_\_ \== '\_\_main\_\_':

proxies \= get\_proxy()

EMAILS \= get\_email()

main()


回答:

我觉得可以考虑用协程

python">import httpx

import asyncio

location_email_list = ['1@qq.com', '2@163.com']

registered_email_list = []

not_registered_email_list = []

error_email_list = []

# 不考虑网络错误

async def task(client):

while len(location_email_list)>0:

email:str = location_email_list.pop()

response = await client.get('query_url', params={'email':email})

code:int = response.status_code

if code==204:

not_registered_email_list.append(email)

elif code==409:

registered_email_list.append(email)

else:

error_email_list.append(email)

async def main():

proxies = {

"http": "http://127.0.0.1:3080",

"https": "http://127.0.0.1:3081",

}

async with httpx.AsyncClient(proxies=proxies) as client:

task_list = [task(client), task(client), task(client), task(client), ]

await asyncio.gather(*task_list)

asyncio.run(main())

以上是 Python多线程Request问题 的全部内容, 来源链接: utcz.com/a/165182.html

回到顶部