材料清单
- 操作系统: MacOS Catalina 14.5
- 数据库:[email protected]
- Py版本:Python3.7
数据库设计:
- 字段:
字段名称 字段类型 长度 ID int
0 Phone bigint
11 QQNumber bigint
11 -
索引:
名称 索引 索引类型 索引方法 QQNumber qqNumber
NORMAL BTREE Phone phone
NORMAL BTREE -
数据量:5亿条
- 查询时间:5秒内
- 图片
查询速度
Python源代码
# -- coding:UTF-8 –
import random
import time
import pymysql
conn = pymysql.connect(host='localhost', user='root', password='admin141', db='Social', charset='utf8')
cur = conn.cursor()
# number_to_chinese, 单位-数字
num_dict = {0: "零", 1: "一", 2: "二", 3: "三", 4: "四",
5: "五", 6: "六", 7: "七", 8: "八", 9: "九"}
unit_map = [["", "十", "百", "千"], ["万", "十万", "百万", "千万"],
["亿", "十亿", "百亿", "千亿"], ["兆", "十兆", "百兆", "千兆"]]
unit_step = ["万", "亿", "兆"]
class number_to_chinese():
"""
参考: https://github.com/tyong920/a2c
"""
def __init__(self):
self.result = ""
def number_to_str_10000(self, data_str):
"""一万以内的数转成大写"""
res = []
count = 0
# 倒转
str_rev = reversed(data_str) # seq -- 要转换的序列,可以是 tuple, string, list 或 range。返回一个反转的迭代器。
for i in str_rev:
if i is not "0":
count_cos = count // 4 # 行
count_col = count % 4 # 列
res.append(unit_map[count_cos][count_col])
res.append(num_dict[int(i)])
count += 1
else:
count += 1
if not res:
res.append("零")
elif res[-1] is not "零":
res.append("零")
# 再次倒序,这次变为正序了
res.reverse()
# 去掉"一十零"这样整数的“零”
if res[-1] is "零" and len(res) is not 1:
res.pop()
return "".join(res)
def number_to_str(self, data):
"""分段转化"""
assert type(data) == float or int
data_str = str(data)
len_data = len(str(data_str))
count_cos = len_data // 4 # 行
count_col = len_data - count_cos * 4 # 列
if count_col > 0: count_cos += 1
res = ""
for i in range(count_cos):
if i == 0:
data_in = data_str[-4:]
elif i == count_cos - 1 and count_col > 0:
data_in = data_str[:count_col]
else:
data_in = data_str[-(i + 1) * 4:-(i * 4)]
res_ = self.number_to_str_10000(data_in)
res = res_ + unit_map[i][0] + res
return res
def decimal_chinese(self, data):
assert type(data) == float or int
data_str = str(data)
if "." not in data_str:
res = self.number_to_str(data_str)
else:
data_str_split = data_str.split(".")
if len(data_str_split) is 2:
res_start = self.number_to_str(data_str_split[0])
res_end = "".join([num_dict[int(number)] for number in data_str_split[1]])
res = res_start + random.sample(["点", "."], 1)[0] + res_end
else:
res = str(data)
return res
def getDB(dataList):
sql = "insert into qq(qqNumber, phone) values (%s,%s)"
cur.executemany(sql, dataList)
conn.commit()
print("[+] 插入成功: " + str(len(dataList)) + "条")
writeLog("[+] 插入成功: " + str(len(dataList)) + "条")
def writeLog(lineTextLog):
with open("logs.txt", 'a+') as f:
f.write(lineTextLog)
def AddList(filePath):
dataList = []
lineCount = 0
AllCount = 0
try:
for line in open(filePath, "r"):
try:
lineCount = lineCount + 1
resultLine = line.split('-')
if len(dataList) == 1000000:
AllCount = AllCount + 1
getDB(dataList)
dataList.clear()
print("[+] 插入成功第" + str(AllCount) + "波数据,插入数据量为100万条")
print("[+] 总共插入(中文): " + num.decimal_chinese(lineCount))
print("[+] 总共插入(数字): " + num.decimal_chinese(lineCount))
print("[+] =============================")
writeLog("[+] 插入成功第" + str(AllCount) + "波数据,插入数据量为100万条")
writeLog("[+] 总共插入(中文): " + num.decimal_chinese(lineCount))
writeLog("[+] 总共插入(数字): " + num.decimal_chinese(lineCount))
writeLog("[+] =============================")
time.sleep(5)
dataList.append([resultLine[0], resultLine[1]])
except:
print("[!] 插入出错,第: " + str(AllCount) + "波数据")
print("[!] 插入出错,第: " + num.decimal_chinese(lineCount) + "条")
writeLog("[!] 插入出错,第: " + str(AllCount) + "波数据")
writeLog("[!] 插入出错,第: " + num.decimal_chinese(lineCount) + "条")
continue
except:
print("[!] 读取文件错误,读取第" + str(lineCount) + "条时出错")
writeLog("[!] 读取文件错误,读取第" + str(lineCount) + "条时出错")
if __name__ == '__main__':
num = number_to_chinese()
AddList("/Users/Apple/Downloads/tencentnew1.txt")
cur.close()
conn.close()