文章
根据地址解析省市区
gen_adcode_csv.py
import csv
from task_center.common.data_tools.mysql import MysqlTools
# Export every region (adcode, name) pair from MySQL into a CSV file.
dbc = MysqlTools.get_dbc("1002")

# Primary names: the query appends '000000' to sys_city.id to form the adcode.
sql = """
SELECT concat(id, '000000') as adcode, city_name as name
FROM sys_city
"""
# Other (alternative) names of cities / regions.
other_sql = """
SELECT adcode, other_name as name
FROM sys_city_extend
"""

records = dbc.select_all(sql)
other_records = dbc.select_all(other_sql)
# Alternative names are appended after the primary names.
records.extend(other_records)

headers = ['adcode', 'name']
# NOTE(review): the parser's init_data() loads 'adcodes.csv' while this script
# writes 'adcodes1.csv' -- confirm whether the different name is intentional.
with open("adcodes1.csv", 'w', newline='', encoding="utf-8-sig") as out_file:
    writer = csv.DictWriter(out_file, fieldnames=headers)
    writer.writeheader()
    writer.writerows(records)
addr_info.py
class AddrInfo(object):
    """A named administrative region identified by a 6-character adcode.

    Attributes:
        name: Region name as passed to the constructor.
        adcode: First six characters of the adcode passed to the constructor.
        rank: One of RANK_PROVINCE, RANK_CITY or RANK_COUNTY, derived from
            the trailing zeros of ``adcode``.
    """

    RANK_PROVINCE = 0
    RANK_CITY = 1
    RANK_COUNTY = 2

    def __init__(self, name, adcode) -> None:
        """Store the name, truncate the adcode and derive the rank from it."""
        self.name = name
        # Only the first 6 characters encode province / city / county.
        self.adcode = adcode[:6]
        # Rank is the administrative level: 0 province, 1 city, 2 county.
        if not self.adcode.endswith("00"):
            level = AddrInfo.RANK_COUNTY
        elif self.adcode.endswith("0000"):
            level = AddrInfo.RANK_PROVINCE
        else:
            level = AddrInfo.RANK_CITY
        self.rank = level

    def belong_to(self, other):
        """Return True when this region lies inside ``other``, judged by adcode.

        The comparison uses the leading 2, 4 or 6 characters of ``other.adcode``
        for a province, city or county respectively.
        """
        significant = 2 * (other.rank + 1)
        parent_prefix = other.adcode[:significant]
        return self.adcode.startswith(parent_prefix)
matcher.py
import ahocorasick
import re
class MatchInfo:
    """One hit of a region name inside a sentence.

    Attributes:
        attr_infos: Candidate regions sharing the matched name.
        start_index: Index of the first matched character.
        end_index: Index of the last matched character.
        origin_value: The matched text.
    """

    def __init__(self, attr_infos, start_index, end_index, origin_value) -> None:
        self.attr_infos = attr_infos
        self.start_index = start_index
        self.end_index = end_index
        self.origin_value = origin_value

    def get_match_addr(self, parent_addr, first_adcode=None):
        """Pick the candidate that fits the surrounding context.

        Args:
            parent_addr: Previously resolved higher-level region, or a falsy
                value when there is none.
            first_adcode: Preferred adcode, consulted only when there is no
                parent region.

        Returns:
            With a parent: the first candidate whose ``belong_to(parent_addr)``
            is true, or None when no candidate qualifies. Without a parent:
            the candidate whose adcode equals ``first_adcode`` when given and
            found, otherwise the first candidate.
        """
        if parent_addr:
            # Same-named regions are checked one by one against the parent's
            # adcode to find the one that belongs to it.
            for candidate in self.attr_infos:
                if candidate.belong_to(parent_addr):
                    return candidate
            return None

        if first_adcode:
            preferred = next(
                (item for item in self.attr_infos if item.adcode == first_adcode),
                None,
            )
            if preferred:
                return preferred

        return self.attr_infos[0]

    def get_rank(self):
        """Return the rank of the first candidate."""
        first = self.attr_infos[0]
        return first.rank

    def get_one_addr(self):
        """Return the first candidate."""
        return self.attr_infos[0]

    def __repr__(self) -> str:
        template = "from {} to {} value {}"
        return template.format(self.start_index, self.end_index, self.origin_value)
class Matcher:
    """Finds known region names in a sentence using an ahocorasick.Automaton."""

    # Special short forms, mainly for the ethnic-minority autonomous regions.
    special_abbre = {
        "内蒙古自治区": "内蒙古",
        "广西壮族自治区": "广西",
        "西藏自治区": "西藏",
        "新疆维吾尔自治区": "新疆",
        "宁夏回族自治区": "宁夏",
    }

    def __init__(self, stop_re):
        """Create an empty automaton.

        Args:
            stop_re: Regular expression whose matches are removed from a
                region name to obtain its abbreviation.
        """
        self.ac = ahocorasick.Automaton()
        self.stop_re = stop_re

    def _abbr_name(self, origin_name):
        """Return the abbreviation of ``origin_name``.

        The special-case table wins; otherwise every match of ``stop_re`` is
        stripped from the name.
        """
        special = Matcher.special_abbre.get(origin_name)
        if special:
            return special
        return re.sub(self.stop_re, '', origin_name)

    def _first_add_addr(self, addr_info):
        """Register a name seen for the first time, together with its abbreviation.

        Returns:
            The ``(abbreviation, candidate_list)`` tuple stored under the
            abbreviation.
        """
        short_name = self._abbr_name(addr_info.name)
        # The full name and its abbreviation share one candidate list.
        candidates = []
        self.ac.add_word(short_name, (short_name, candidates))
        self.ac.add_word(addr_info.name, (addr_info.name, candidates))
        return short_name, candidates

    def add_addr_info(self, addr_info):
        """Add a region to the matcher.

        County names may repeat, so the same name can be added several times.
        All regions sharing a name end up in one list (e.g. every 高新区), so a
        later search can compare each candidate's adcode with the city it
        should belong to.
        """
        entry = self.ac.get(addr_info.name, 0)
        if not entry:
            entry = self._first_add_addr(addr_info)
        entry[1].append(addr_info)

    def complete_add(self):
        """Finish the adding phase; no more regions are added afterwards."""
        self.ac.make_automaton()

    def _get(self, key):
        """Return the value stored in the automaton under ``key``."""
        return self.ac.get(key)

    def iter(self, sentence):
        """Yield a MatchInfo for each region name kept from ``sentence``.

        A match is held back for one step: when the next kept match starts at
        the same index (a full name following its abbreviation), only the
        later one is yielded; otherwise the held match is yielded and the new
        one takes its place.
        """
        pending = None
        last_start = None
        last_end = None
        for end_index, payload in self.ac.iter(sentence):
            matched_text, attr_infos = payload
            # start_index and end_index are both inclusive.
            start_index = end_index + 1 - len(matched_text)
            # For matches such as 冠县 and 县 that share an end index, the one
            # reported later (县) is dropped.
            if last_end is not None and end_index <= last_end:
                continue
            current = MatchInfo(attr_infos, start_index, end_index, matched_text)
            # A full name is matched twice: once as abbreviation, once in full.
            if pending is None:
                pending = current
            elif start_index == last_start:
                yield current
                pending = None
            else:
                yield pending
                pending = current
            last_start = start_index
            last_end = end_index
        if pending is not None:
            yield pending
parse.py
from typing import Iterator

from address.addr_info import AddrInfo
from address.matcher import Matcher
from data_tools.mysql import MysqlTools
from tools.decorator import singleton
@singleton()
class ParseAddress(object):
    """Resolve free-text addresses into province / city / county records.

    Construction loads the region table from MySQL into:

    * ``addr_dict`` -- maps each record's adcode to its AddrInfo;
    * ``matcher`` -- a Matcher holding every region name.
    """

    # Keys of a result record. These are runtime dictionary keys.
    _PROVINCE = "省"
    _CITY = "市"
    _COUNTY = "区"
    _ADDR = "address"
    _ADCODE = "adcode"

    def __init__(self):
        self.addr_dict, self.matcher = self.init_data_with_mysql()

    def _build_index(self, records, stop_key):
        """Build the adcode lookup table and the name matcher from records.

        Args:
            records: Iterable of mappings with ``"name"`` and ``"adcode"`` keys.
            stop_key: Regular expression handed to Matcher to derive
                abbreviations.

        Returns:
            ``(ad_map, matcher)``.
        """
        ad_map = {}
        matcher = Matcher(stop_key)
        for record in records:
            addr_info = AddrInfo(name=record["name"], adcode=record["adcode"])
            # The first record seen for an adcode wins, so rows appended later
            # (the "other name" rows) do not replace the name already stored
            # for that adcode. Every name still goes into the matcher.
            ad_map.setdefault(record["adcode"], addr_info)
            matcher.add_addr_info(addr_info)
        matcher.complete_add()
        print("初始化数据完成.")
        return ad_map, matcher

    def init_data(self, stop_key="([省市]|特别行政区|自治区)$"):
        """Load regions from the packaged ``adcodes.csv`` resource.

        Returns:
            ``(ad_map, matcher)`` as produced by ``_build_index``.
        """
        from pkg_resources import resource_stream
        with resource_stream('task_center.common.address', 'adcodes.csv') as csv_stream:
            from io import TextIOWrapper
            import csv
            text = TextIOWrapper(csv_stream, encoding='utf-8-sig')
            # Rows are consumed while the resource stream is still open.
            return self._build_index(csv.DictReader(text), stop_key)

    def init_data_with_mysql(self, stop_key="([省市]|特别行政区|自治区)$"):
        """Load regions and their other names from MySQL.

        Returns:
            ``(ad_map, matcher)`` as produced by ``_build_index``.
        """
        dbc = MysqlTools.get_dbc("1002")
        sql = """
SELECT concat(id, '000000') as adcode, city_name as name
FROM sys_city
"""
        records = dbc.select_all(sql)
        # Other names of cities / regions.
        other_sql = """
SELECT adcode, other_name as name
FROM sys_city_extend
"""
        other_records = dbc.select_all(other_sql)
        records.extend(other_records)
        return self._build_index(records, stop_key)

    def fill_adcode(self, adcode):
        """Right-pad ``adcode`` with zeros to 12 characters."""
        return adcode.ljust(12, "0")

    def adcode_name(self, part_adcode: str):
        """Return the name stored for a partial adcode, or None if unknown.

        The partial code is padded to 12 characters before the lookup.
        """
        addr = self.addr_dict.get(self.fill_adcode(part_adcode))
        return None if addr is None else addr.name

    def update_res_by_adcode(self, res: dict, adcode: str):
        """Fill ``res`` in place with the names and codes implied by ``adcode``.

        The province is always written; the city is written unless the code
        ends with "0000"; the county name is written unless the code ends
        with "00". ``res["province"]`` and ``res["city"]`` receive the
        zero-padded code with its last six characters removed.
        """
        # Province.
        res[self._PROVINCE] = self.adcode_name(adcode[:2])
        res["province"] = self.fill_adcode(adcode[:2])[:-6]
        if adcode.endswith("0000"):
            return
        # City.
        res[self._CITY] = self.adcode_name(adcode[:4])
        res["city"] = self.fill_adcode(adcode[:4])[:-6]
        # County.
        if not adcode.endswith("00"):
            res[self._COUNTY] = self.adcode_name(adcode)

    def empty_record(self):
        """Return a fresh record with every standard key set to None."""
        return {
            self._PROVINCE: None,
            self._CITY: None,
            self._COUNTY: None,
            self._ADDR: None,
            self._ADCODE: None,
        }

    def extract_addrs(self, sentence, umap) -> Iterator[dict]:
        """Yield a record for every address found in ``sentence``.

        Args:
            sentence: Text to scan. Anything that is not a non-empty string
                produces a single empty record.
            umap: Mapping from matched text to the adcode to prefer when no
                higher-level region constrains the choice.

        Yields:
            One record per county-level match. When no county-level match
            exists, exactly one record is yielded: the province/city found
            so far, or an empty record when nothing matched. Nothing further
            is yielded after the last county-level record.
        """
        # Empty input.
        if not isinstance(sentence, str) or not sentence:
            yield self.empty_record()
            return

        # Match from larger regions down to smaller ones.
        res = self.empty_record()
        last_info = None
        adcode = None
        truncate_index = -1
        with_county_rank = False
        # When several regions share the matched name, match_info carries all
        # of them as candidates.
        for match_info in self.matcher.iter(sentence):
            # The adcode to prefer when no province/city restricts the choice.
            first_adcode = umap.get(match_info.origin_value)
            cur_addr = match_info.get_match_addr(last_info, first_adcode)
            if not cur_addr:
                continue
            last_info = cur_addr
            adcode = cur_addr.adcode
            truncate_index = match_info.end_index
            # A county-level match completes one address.
            if cur_addr.rank == AddrInfo.RANK_COUNTY:
                self.update_res_by_adcode(res, adcode)
                res[self._ADDR] = sentence[truncate_index + 1:]
                res[self._ADCODE] = adcode
                yield res
                # Start over for the next address in the sentence.
                res = self.empty_record()
                last_info = None
                adcode = None
                truncate_index = -1
                with_county_rank = True

        if with_county_rank:
            # At least one complete address was produced; do not append a
            # trailing partial or all-None record.
            return

        if adcode is not None:
            self.update_res_by_adcode(res, adcode)
            res[self._ADDR] = sentence[truncate_index + 1:]
            # The adcode field stays None unless a county was matched.
            res[self._ADCODE] = None
        yield res

    def parse(self, sentence):
        """Return the first record extracted from ``sentence``."""
        return next(self.extract_addrs(sentence, umap={}))