Python实现号码归属地查询功能
作者:AKAD5 发布时间:2023-09-20 13:31:40
标签:Python,号码,归属地,查询
使用场景
对手机号码进行地域分析,需要查询归属地;
问题描述
针对数据集比较大的情况,通过脚本来处理,使用多线程的方法来加快查询速度
pool = multiprocessing.Pool(processes=pool_count)
for i in data_cut(data,pool_count):
data_log_list.append(pool.apply_async(main, (i,)))
pool.close()
pool.join()
解决方案
创建一个 pool 进程池,然后通过data_cut将数据读取并且等分成数据组,设置好pool_count进程数量就可以开始,每个数据组独立查询然后将结果汇总给push_log进行最终处理,保存为csv文件。
# 电话号码归属地查询
import os
import sys
import time
import json
import warnings
import pandas as pd
import multiprocessing
from phone import Phone
warnings.filterwarnings("ignore")
path = os.path.abspath(".")
def data_cut(data_list,data_cut=4):
#将任务拆分,建议拆分数为CPU核心数,默认为4
#分组数据,分组间隔
data_all=[]
if data_cut > len(data_list):
data_cut = len(data_list)
data_cut_num = int((len(data_list)+1)/data_cut)
for i in range(1,data_cut+1):
if i < data_cut:
data_1=data_list[data_cut_num*(i-1):data_cut_num*i]
else:
data_1=data_list[data_cut_num*(i-1):]
data_all.append(data_1)
return data_all
def push_log(data_log_list,file_name):
data_all = []
data_list = [i.get() for i in data_log_list]
for i in data_list:
for j in i:
data_all.append(j)
data_all = pd.DataFrame(data_all)
data_all.to_csv(path + "/phone_{}.csv".format(file_name),index=False,encoding='gbk')
print('成功查询:',data_all.shape[0])
def main(data):
resp = []
for i in data:
try:
if type(Phone().find(i)) == dict:
resp.append(Phone().find(i))
except:
pass
return resp
if __name__ == '__main__':
start_time= time.time()
file_name = name = sys.argv[1]
data = pd.read_table(path + "/{}".format(file_name),header=None)
data=list(data[0])
pool_count = 12
data_log_list = []
pool = multiprocessing.Pool(processes=pool_count)
for i in data_cut(data,pool_count):
data_log_list.append(pool.apply_async(main, (i,)))
pool.close()
pool.join()
push_log(data_log_list,file_name)
print(time.time()-start_time)
方法补充
除了上文的方法,小编还为大家整理了一些其他Python号码归属地查询的方法,需要的可以参考一下
方法一:
import requests
def get_callerloc(phone):
url = f"https://www.qvdv.com/tools/qvdv-api-mobile.html?f=json&mobile={phone}"
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36',
}
response = requests.request("GET", url, headers=headers).json()
return response["message"]
if __name__ == '__main__':
phone = input("请输入手机号(查询归属地):")
res = get_callerloc(phone)
print("手机号码:"+res["mobile"])
print("归属地:"+res["province"])
print("运营商:"+res["supplier"])
截图
方法二
GUI
from tkinter import *
from tkinter import ttk
from tkinter import messagebox
from Get_Attr import Get_Infos
import re
import threading
class App:
def __init__(self):
self.root=Tk()
self.root.title('手机号码归属地查询-v1.0')
self.root.resizable(0,0)
width=410
height=390
left=(self.root.winfo_screenwidth()-width)/2
top=(self.root.winfo_screenheight()-height)/2
self.root.geometry('%dx%d+%d+%d'%(width,height,left,top))
self.create_widet()
self.set_widget()
self.place_widget()
self.root.mainloop()
def create_widet(self):
self.l1=ttk.Label(self.root)
self.e1=ttk.Entry(self.root)
self.b1=ttk.Button(self.root)
self.lf=ttk.LabelFrame(self.root)
self.l2=ttk.Label(self.lf)
self.e2=ttk.Entry(self.lf)
self.l3=ttk.Label(self.lf)
self.e3=ttk.Entry(self.lf)
self.l4=ttk.Label(self.lf)
self.e4=ttk.Entry(self.lf)
self.l5=ttk.Label(self.lf)
self.e5=ttk.Entry(self.lf)
self.l6=ttk.Label(self.lf)
self.e6=ttk.Entry(self.lf)
self.l7=ttk.Label(self.lf)
self.e7=ttk.Entry(self.lf)
self.b1.config(command=lambda:self.thread_it(self.search_infos))
def set_widget(self):
self.e2_var=StringVar()
self.e3_var=StringVar()
self.e4_var=StringVar()
self.e5_var=StringVar()
self.e6_var=StringVar()
self.e7_var=StringVar()
self.l1.config(text='请输入手机号:')
self.b1.config(text='查询')
self.lf.config(text='查询结果')
self.l2.config(text='手机号码:')
self.l3.config(text='所属省份:')
self.l4.config(text='所属城市:')
self.l5.config(text='区 号:')
self.l6.config(text='邮 编:')
self.l7.config(text='类 型:')
#将字符串变量绑定Entry组件
self.e2.config(textvariable=self.e2_var)
self.e3.config(textvariable=self.e3_var)
self.e4.config(textvariable=self.e4_var)
self.e5.config(textvariable=self.e5_var)
self.e6.config(textvariable=self.e6_var)
self.e7.config(textvariable=self.e7_var)
self.root.bind('<Escape>',self.escape)
self.root.bind('<Return>',self.do_search)
def place_widget(self):
self.l1.place(x=30,y=20)
self.e1.place(x=130,y=20)
self.b1.place(x=290,y=20)
self.lf.place(x=30,y=60,width=350,height=300)
self.l2.place(x=60,y=10)
self.e2.place(x=150,y=10)
self.l3.place(x=60,y=50)
self.e3.place(x=150,y=50)
self.l4.place(x=60,y=90)
self.e4.place(x=150,y=90)
self.l5.place(x=60,y=130)
self.e5.place(x=150,y=130)
self.l6.place(x=60,y=170)
self.e6.place(x=150,y=170)
self.l7.place(x=60,y=210)
self.e7.place(x=150,y=210)
def search_infos(self):
pn=self.e1.get()
#判断输入类型,必须为11位数字
if re.match('\d{11}',pn):
result=Get_Infos().get_infos(pn)
self.e2_var.set(pn)
self.e3_var.set(result['province'])
self.e4_var.set(result['city'])
self.e5_var.set(result['areacode'])
self.e6_var.set(result['zip'])
self.e7_var.set(result['company'])
else:
messagebox.showwarning('警告','输入有误,请检查!')
#使用线程防止UI界面卡死
def thread_it(self,func,*args):
t=threading.Thread(target=func,args=args)
t.setDaemon(True)
t.start()
def escape(self,event):
self.root.destroy()
def do_search(self,event):
self.thread_it(self.search_infos())
if __name__ == '__main__':
a=App()
Get_Attr
import json
import requests
from urllib.parse import urlencode
class Get_Infos():
def __init__(self):
self.url='http://apis.juhe.cn/mobile/get?'
self.headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
def get_infos(self,phone_num):
params={
'phone':phone_num,
'key':'7a2b367a62fa24108b1f27ed4c84c97a',
'dtype':''
}
r=requests.get(self.url+urlencode(params),headers=self.headers)
_json=json.loads(r.text)
if _json.get('resultcode')=='200':
result=_json.get('result')
item={}
item['province']=result.get('province')
item['city']=result.get('city')
item['areacode']=result.get('areacode')
item['zip']=result.get('zip')
item['company']=result.get('company')
return item
else:
return False
来源:https://blog.csdn.net/weixin_43457027/article/details/128468098


猜你喜欢
- 爬取TOP500的音乐信息,包括排名情况、歌曲名、歌曲时间。网页版酷狗不能手动翻页进行下一步的浏览,仔细观察第一页的URL:http://w
- MATLAB转格式,需要使用spm package(https://github.com/neurodebian/spm12).%%img2
- 1、先说恢复误删单元格的操作场景:不小心把某个cell给cut了,或者删除了单元格(前提不要关闭notebook窗口)。解决方法: 先按Es
- 如何根据二维数组中的某一行或者某一列排序?假设data是一个numpy.array类型的二维数组,可以利用numpy中的argsort函数进
- 前记上一遍文章《Python中Async语法协程的实现》介绍了Python是如何以生成器来实现协程的以及Python Asyncio通过Fu
- 前言Go 1.3 的sync包中加入一个新特性:Pool。这个类设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。type
- Python时间戳操作很多,每次用点时候总是去查,查的麻烦,现在自己也好好归纳一下。我现在刚好有个需求需要获取当天零点时间戳,但是网上查的大
- 使用timer来统计asp页面程序的运行时间。实例代码和说明见下:<%Dim varInitial_TimevarIniti
- 姓名的翻译: 英语是名(First name)在前,姓(Last name)在后。中文地址的翻译:如果你英语水平不高,填表时只要国家名用英语
- CategoricalDtype自定义排序当我们的透视表生成完毕后,有很多情况下需要我们对某列或某行值进行排序。排序有很多种方法。例如sor
- 当服务器必须提供与两个或更多个网络或网络子网的连接时,典型的方案是使用多宿主计算机。此计算机通常位于外围网络(也称为 DMZ、外围安全区域或
- 目录1、Unittest为Python内嵌的测试框架,不需要特殊配置2、编写规范总结1、Unittest为Python内嵌的测试框架,不需要
- InstrRev描述:返回某字符串在另一个字符串中出现的从结尾计起的位置。语法:InstrRev(string1, string2
- 数据库操作当中,当数据库对象列表不只有一个普通的元素——objectname时,你将要使用objectowner.objectname来引用
- 上一小节讲解了行高与单行纯文字的垂直居中,而如果行内含有图片和文字,在浏览器内浏览时,读者可以发现文字和图片在垂直方向并不是沿中线居中,而是
- 1 监听启动activity 信息命令adb shell logcat | grep START 可以查看apk包名和Activity名字=
- 阅读上一篇教程:WEB2.0网页制作标准教程(9)第一个CSS布局实例如果我们想在3列布局的最后加一行页脚,放版权之类的信息。就遇到必须对齐
- 数据类型:float — 浮点数可以精确到小数点后面15位int — 整型可以无限 * ool — 非零为true,零为falselist —
- $("select").change(function(){ var n = $(t
- 先来看个例子:#-*- coding:utf8 -*-s = u'中文截取's.decode('utf8')