Background: whether the proxy IPs come from your own crawler or from a paid proxy service, they are never entirely stable. The script below re-validates them continuously to work out which IPs are still usable.

#!/usr/bin/env python2.7
# coding:utf8
import os, time, sys
# import mymodul
# from lxml import etree
import re
import random
import requests


# Validate proxy availability; the check is intended to be re-run every 10 minutes.
def __extract_ipaddress(text_content):
    result_list = []
    for line in text_content.split('\n'):
        # Pull the IPv4 address out of each line.
        m = re.search(r"((?:(?:25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(?:25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d))))", line)
        ip, port = '', ''
        if m:
            ip = m.group(0)
            # Pull the port out of the rest of the line, after the IP.
            m = re.search(r"\D(\d{2,5})\D*", line[line.find(ip) + len(ip):])
            if m:
                port = m.group(1)
                result_list.append((ip, port))
    return result_list


# Validate a proxy by fetching a yellow-pages test page through it.
def test_poxy((_ip, _port), test_url='http://www.yp.net.cn/schinese//about/AboutCnYPol.asp'):
    try:
        session = requests.session()
        session.proxies = {'http': '{}:{}'.format(_ip, _port)}
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:33.0) Gecko/20100101 Firefox/33.0'}
        resp = session.get(test_url, timeout=10, headers=headers)
    except BaseException:
        # Any failure (timeout, refused connection, broken proxy) marks the proxy as unusable.
        return (_ip, _port, -1, -1)
    return (_ip, _port, resp.elapsed.seconds, len(resp.content))


if __name__ == '__main__':
    from multiprocessing.dummy import Pool as ThreadPool
    # Validate the IPs in Proxy.txt and write the usable ones to proxy_ip.txt.
    with open('/root/scrit/Proxy.txt', 'r') as f, open('/root/scrit/proxy_ip.txt', 'wb') as fout:
        ip_list = __extract_ipaddress(f.read())
        pool = ThreadPool(30)              # check 30 proxies concurrently
        result = pool.map(test_poxy, ip_list)
        pool.close()
        pool.join()
        result = sorted(result, key=lambda d: d[3], reverse=True)
        result = set(result)               # drop duplicate (ip, port) entries
        for item in result:
            # A response body larger than 7000 bytes means the test page came back intact.
            if int(item[3]) > 7000:
                fout.write('{}:{}\n'.format(item[0], item[1]))
    # If nothing passed the check, fetch a fresh batch of proxies; otherwise exit.
    data = open('/root/scrit/proxy_ip.txt').read()
    if not data:
        get_IP()   # not defined in this script; expected to be supplied by another module
    else:
        sys.exit(0)
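
To get a feel for what __extract_ipaddress() accepts, here is a small illustration (the sample lines are made up and are not real proxies): any line that contains an IPv4 address followed somewhere by a 2 to 5 digit number is picked up as an (ip, port) pair, whatever separator sits between them.

# Hypothetical sample input, run inside the same module for illustration.
sample = "115.28.101.1:8888\n183.207.228.8 port 80\nno proxy on this line\n"
print __extract_ipaddress(sample)
# -> [('115.28.101.1', '8888'), ('183.207.228.8', '80')]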
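
test_poxy() can also be called by hand on a single pair when debugging a particular proxy; a minimal sketch, where the address is just a placeholder and not a working proxy:

# Check one proxy manually (placeholder address for illustration).
ip, port, seconds, size = test_poxy(('115.28.101.1', '8888'))
if size > 7000:
    print 'usable: {}:{} ({}s, {} bytes)'.format(ip, port, seconds, size)
else:
    print 'unusable: request failed or the page came back truncated'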
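
The header comment says the check should run every 10 minutes, but the script above only does a single pass. A minimal sketch of a periodic driver, assuming the one-pass validation is wrapped in a hypothetical check_once() helper (not part of the original script); a cron entry would work just as well.

import time

def run_forever(interval_seconds=600):
    # check_once() is a hypothetical wrapper around the one-pass validation above.
    while True:
        check_once()
        time.sleep(interval_seconds)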