Python多线程Request问题
我在实现一个简单的需求,验证邮箱是否已经注册:向目标服务器发送一个请求后,如果返回204是未注册,返回409是已注册。待检测的邮箱存放在txt文件中,读取后作为列表,遍历执行。我使用 requests 实现,并使用了代理,但是一个一个发送请求太慢,请问如何多线程并发执行?
- 如何使用多线程
- 对于待检测的邮箱,如何避免多线程同时检测一个邮箱,或者重复检测,希望能做到,每个线程都能检测到未检测的邮箱,已检测的邮箱不会被读取,应该是使用队列实现?
下面的是代码,部分信息打码。
import json
import time

import requests
class Validator(object):
    """Check whether an email address is already registered on the target service.

    The endpoint URL, headers and query parameters are blank in this listing
    and must be filled in before the class can be used.
    """

    def __init__(self):
        # Headers and query parameters sent with every check (blank here).
        self.headers = {
        }
        self.params = (
        )

    @staticmethod
    def _build_payload(name):
        """Return the JSON request body for the address ``name``."""
        # json.dumps escapes quotes and backslashes; plain '%s' interpolation
        # produced invalid JSON for addresses containing such characters.
        return json.dumps({"emailAddress": name}, separators=(",", ":"))

    def run(self, name, proxies):
        """POST one address to the endpoint and describe the outcome.

        Args:
            name: The email address to check.
            proxies: Proxy mapping passed straight to ``requests.post``.

        Returns:
            A result line containing the address, a verdict derived from the
            HTTP status (204, 409 or anything else) and the status code.

        Exceptions raised by ``requests.post`` (for example timeouts) are not
        caught here; the caller is expected to handle them.
        """
        data = self._build_payload(name)
        url = ''
        response = requests.post(
            url=url,
            headers=self.headers,
            params=self.params,
            data=data,
            timeout=9,
            proxies=proxies,
        )
        status = response.status_code
        if status == 204:
            result = '{} 未注册 HTTP返回:{}'.format(name, status)
        elif status == 409:
            result = '{} 已注册 HTTP返回:{}'.format(name, status)
        else:
            result = '{} 检测异常 HTTP返回:{}'.format(name, status)
        return result
# Read the accounts to be checked as a list
def get_list(path='unvalidated.txt'):
    """Return the addresses stored in ``path``, one per line.

    Args:
        path: Text file holding one address per line. Defaults to the
            original hard-coded ``unvalidated.txt``.

    Returns:
        A list of addresses with surrounding whitespace removed. Blank lines
        are skipped so that no request is sent for an empty address.
    """
    with open(path, 'r') as f:
        stripped = (line.strip() for line in f)
        return [email for email in stripped if email]
# Fetch a proxy
def get_proxy(retry=3, timeout=10):
    """Ask the proxy API for a proxy and return it as a ``requests`` proxy mapping.

    Args:
        retry: Number of extra attempts after the first one fails.
        timeout: Seconds to wait for the proxy API before treating the
            attempt as failed.

    Returns:
        A mapping for the ``proxies`` argument of ``requests``, or ``None``
        when every attempt failed.
    """
    for _ in range(retry + 1):
        try:
            # Without a timeout a stalled proxy API would block forever.
            res = requests.get("", timeout=timeout)
            dic_info = json.loads(res.text)
        except (requests.exceptions.RequestException, ValueError):
            # Network failure or a non-JSON body counts as a failed attempt.
            time.sleep(0.5)
            continue
        if dic_info['code'] != '0':
            time.sleep(0.5)
            continue
        data = dic_info['obj']
        ip = data[0]["ip"]
        port = data[0]["port"]
        ip_port = 'http://{}:{}'.format(ip, port)
        # requests selects a proxy by URL scheme; with only an 'http' entry,
        # requests to https:// URLs would bypass the proxy entirely.
        return {'http': ip_port, 'https': ip_port}
    return None
if __name__=='__main__':
    app = Validator()
    proxies = get_proxy()
    # Check the addresses one after another
    for email in get_list():
        # Timestamp recorded for this check; used in both success and error lines
        validating_time = time.strftime("%Y-%m-%d %H:%M:%S")
        try:
            outcome = app.run(email, proxies)
        except requests.exceptions.RequestException as e:
            # A failed request is logged instead of stopping the loop
            info = '检测时间:{} {} 检测错误 错误信息:{}'.format(validating_time, email, e)
        else:
            info = '检测时间:{} {}'.format(validating_time, outcome)
        print(info)
        # Append the line to the log file
        with open('validate_log.txt', 'a+') as f:
            f.write(info + '\n')
        # Delay between two checks
        time.sleep(0.5)
回答:
from concurrent.futures import ThreadPoolExecutor, as_completed

# Shared worker pool used by main(); 4 threads
pool = ThreadPoolExecutor(4)

# Addresses to check; fill in before running
EMAILS = [
    # some emails here
]
def check(email):
    """
    Placeholder for the registration check of a single email.

    Intended contract: if the email is registered, return None, else return
    the email. The body is left for the reader to implement.
    """
    # do something
def save(email):
    """
    Placeholder that persists one email.

    main() calls it with every non-None value returned by check(). The body
    is left for the reader to implement.
    """
    # do something
def main():
    """Run check() for every entry of EMAILS on the pool and save the non-None results."""
    pending = [pool.submit(check, address) for address in EMAILS]
    # Results are handled in completion order, not submission order
    for finished in as_completed(pending):
        outcome = finished.result()
        if outcome is not None:
            save(outcome)


if __name__ == '__main__':
    main()
自己再改改吧
回答:
可以将请求与处理过程,扔到线程池中执行。
处理结果最好是append到一个列表中,最后全部线程执行结束后再处理这个列表。
因为多线程下文件写操作是不安全的
回答:
我按照 @听完这一年 的答案修改了代码,已经完成了主体功能。在50-100个线程的时候,完成的速度是以往的10倍不止,但是我发现,每次检测结果的数量都比待检测的邮箱数量要少,原因是部分请求出错(例如timeout),导致线程被挂起。我尝试使用try,问题依旧存在,是不是要写在main()的循环体中呢?如果写在循环体中,如何传递email参数来得知是哪个帐号检测错误?万分感谢解答。
import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Shared worker pool used by main(); 50 threads
pool = ThreadPoolExecutor(50)
def get_email(path='unvalidated.txt'):
    """Return the addresses stored in ``path``, one per line.

    Args:
        path: Text file holding one address per line. Defaults to the
            original hard-coded ``unvalidated.txt``.

    Returns:
        A list of addresses with surrounding whitespace removed. Blank lines
        are skipped so that no request is sent for an empty address.
    """
    with open(path, 'r') as f:
        stripped = (line.strip() for line in f)
        return [email for email in stripped if email]
def get_proxy(retry=3, timeout=10):
    """Ask the proxy API for a proxy and return it as a ``requests`` proxy mapping.

    Args:
        retry: Number of extra attempts after the first one fails.
        timeout: Seconds to wait for the proxy API before treating the
            attempt as failed.

    Returns:
        A mapping for the ``proxies`` argument of ``requests``, or ``None``
        when every attempt failed.
    """
    for _ in range(retry + 1):
        try:
            # Without a timeout a stalled proxy API would block forever.
            res = requests.get("", timeout=timeout)
            dic_info = json.loads(res.text)
        except (requests.exceptions.RequestException, ValueError):
            # Network failure or a non-JSON body counts as a failed attempt.
            time.sleep(0.5)
            continue
        if dic_info['code'] != '0':
            time.sleep(0.5)
            continue
        data = dic_info['obj']
        ip = data[0]["ip"]
        port = data[0]["port"]
        ip_port = 'http://{}:{}'.format(ip, port)
        # requests selects a proxy by URL scheme; with only an 'http' entry,
        # requests to https:// URLs would bypass the proxy entirely.
        return {'http': ip_port, 'https': ip_port}
    return None
def check(email, proxies):
    """Check one address and return a result line for it.

    Args:
        email: The address to check.
        proxies: Proxy mapping passed straight to ``requests.post``.

    Returns:
        A string made of a verdict and the address. The verdict depends on
        the HTTP status (204, 409 or anything else), or reports a connection
        failure when the request itself raised.
    """
    headers = {
    }
    params = (
    )
    url = '1'
    # json.dumps escapes quotes and backslashes that '%s' interpolation would not
    data = json.dumps({"emailAddress": email})
    try:
        # The request must sit inside the try block: it is the call that raises
        # on timeouts and connection errors. Outside it, the exception escaped
        # the worker and the address never reached the log.
        response = requests.post(
            url=url,
            params=params,
            headers=headers,
            data=data,
            proxies=proxies,
            timeout=8,
        )
    except requests.exceptions.RequestException:
        result = '连接失败-{}'.format(email)
    else:
        if response.status_code == 204:
            result = '未注册-{}'.format(email)
        elif response.status_code == 409:
            result = '已注册-{}'.format(email)
        else:
            result = '连接错误-{}'.format(email)
    print(result)
    return result
def save(result):
    """Append ``result`` as one line to ``validate_log.txt`` in the working directory."""
    with open('validate_log.txt', 'a+') as f:
        # A real newline; the escaped '\\n' wrote a literal backslash and 'n'
        f.write(result + '\n')
def main():
    """Check every address in EMAILS on the thread pool and log one line per address.

    Each future is mapped back to the address it was submitted for, so a
    failure can be attributed to that address.
    """
    futures = {pool.submit(check, email, proxies): email for email in EMAILS}
    for task in as_completed(futures):
        email = futures[task]
        try:
            check_result = task.result()
        except Exception as exc:
            # result() re-raises whatever check() raised. Logging it here keeps
            # the loop going so the remaining addresses are still recorded.
            check_result = '检测错误-{} 错误信息:{}'.format(email, exc)
        save(check_result)
if __name__ == '__main__':
    # main() and the workers read these two names as module-level globals
    proxies = get_proxy()
    EMAILS = get_email()
    main()
回答:
我觉得可以考虑用协程
import httpx
import asyncio
# Addresses still waiting to be checked; task() pops entries from this list
location_email_list = ['1@qq.com', '2@163.com']
# Addresses for which the server answered 409
registered_email_list = []
# Addresses for which the server answered 204
not_registered_email_list = []
# Addresses for which the server answered with any other status
error_email_list = []
# Network errors are recorded in error_email_list instead of aborting the worker
async def task(client):
    """Worker coroutine: pop addresses from location_email_list until it is empty.

    Args:
        client: The client whose ``get`` coroutine performs the query.

    Each popped address is appended to exactly one result list: 204 goes to
    not_registered_email_list, 409 to registered_email_list, and any other
    status or an ``httpx.HTTPError`` goes to error_email_list.
    """
    while location_email_list:
        email: str = location_email_list.pop()
        try:
            response = await client.get('query_url', params={'email': email})
        except httpx.HTTPError:
            # Without this handler one failed request would end the worker and
            # the already-popped address would be lost.
            error_email_list.append(email)
            continue
        code: int = response.status_code
        if code == 204:
            not_registered_email_list.append(email)
        elif code == 409:
            registered_email_list.append(email)
        else:
            error_email_list.append(email)
async def main():
    """Run four task() workers concurrently over one shared AsyncClient."""
    # httpx proxy keys are URL patterns ("http://"), not bare scheme names;
    # plain "http" / "https" keys are rejected.
    # NOTE(review): recent httpx releases replace the `proxies` argument with
    # `proxy` / `mounts`; confirm against the installed version.
    proxies = {
        "http://": "http://127.0.0.1:3080",
        "https://": "http://127.0.0.1:3081",
    }
    async with httpx.AsyncClient(proxies=proxies) as client:
        await asyncio.gather(*(task(client) for _ in range(4)))


asyncio.run(main())
以上是 Python多线程Request问题 的全部内容, 来源链接: utcz.com/a/165182.html