大数据

根据地址解析省市区

gen_adcode_csv.py

import csv
from task_center.common.data_tools.mysql import MysqlTools

# Export every (adcode, name) pair known to the database into a CSV file.
# Connection handle obtained from MysqlTools for the key "1002".
dbc = MysqlTools.get_dbc("1002")

# Primary region names; six zeros are appended to the id to form the adcode.
sql = """
    SELECT concat(id, '000000') as adcode, city_name as name
    FROM sys_city
"""
# Alternative names of the regions.
other_sql = """
    SELECT adcode, other_name as name
    FROM sys_city_extend
"""

records = dbc.select_all(sql)
other_records = dbc.select_all(other_sql)
# Alternative names are appended after the primary names, in the same list.
records.extend(other_records)

headers = ['adcode', 'name']

# "utf-8-sig" makes Python emit a BOM at the start of the file.
with open("adcodes1.csv", mode='w', newline='', encoding="utf-8-sig") as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=headers)
    writer.writeheader()
    for row in records:
        writer.writerow(row)

addr_info.py

class AddrInfo(object):
    """A named administrative region identified by a 6-character adcode.

    Instance fields:
        name   -- the region name exactly as supplied.
        adcode -- the first six characters of the supplied adcode; these
                  encode the province / city / county levels.
        rank   -- RANK_PROVINCE, RANK_CITY or RANK_COUNTY, derived from the
                  trailing zeros of the 6-character adcode.
    """

    RANK_PROVINCE = 0
    RANK_CITY = 1
    RANK_COUNTY = 2

    def __init__(self, name, adcode) -> None:
        self.name = name
        # Only the leading six characters carry the three-level information.
        code = adcode[:6]
        self.adcode = code

        # Most specific suffix first: "....0000" is a province, "....00" a
        # city; anything else is treated as county level.
        suffix_ranks = (
            ("0000", AddrInfo.RANK_PROVINCE),
            ("00", AddrInfo.RANK_CITY),
        )
        for suffix, rank in suffix_ranks:
            if code.endswith(suffix):
                self.rank = rank
                break
        else:
            self.rank = AddrInfo.RANK_COUNTY

    def belong_to(self, other):
        """Return True when this region lies inside ``other``, judged by adcode.

        The significant prefix of ``other`` is 2, 4 or 6 characters long for
        province, city and county rank respectively.
        """
        significant_len = 2 * (other.rank + 1)
        parent_prefix = other.adcode[:significant_len]
        return self.adcode.startswith(parent_prefix)

matcher.py

import ahocorasick
import re


class MatchInfo:
    """One occurrence of a known region name inside a sentence.

    Instance fields:
        attr_infos   -- all candidate address objects that share the matched
                        text (several regions may have the same name).
        start_index  -- index of the first matched character.
        end_index    -- index of the last matched character.
        origin_value -- the matched text itself.
    """

    def __init__(self, attr_infos, start_index, end_index, origin_value) -> None:
        self.origin_value = origin_value
        self.start_index = start_index
        self.end_index = end_index
        self.attr_infos = attr_infos

    def get_match_addr(self, parent_addr, first_adcode=None):
        """Pick the candidate that best fits the surrounding context.

        * With a ``parent_addr``: return the first candidate whose
          ``belong_to(parent_addr)`` is true, or None when none qualifies.
        * Otherwise, with a ``first_adcode``: return the candidate carrying
          that adcode, falling back to the first candidate.
        * Otherwise: return the first candidate.
        """
        candidates = self.attr_infos
        if parent_addr:
            # Same-named regions are disambiguated through the parent region.
            for candidate in candidates:
                if candidate.belong_to(parent_addr):
                    return candidate
            return None
        if first_adcode:
            preferred = next(
                (candidate for candidate in candidates if candidate.adcode == first_adcode),
                None,
            )
            return preferred or candidates[0]
        return candidates[0]

    def get_rank(self):
        """Return the rank of the first candidate."""
        first = self.attr_infos[0]
        return first.rank

    def get_one_addr(self):
        """Return the first candidate."""
        first = self.attr_infos[0]
        return first

    def __repr__(self) -> str:
        template = "from {} to {} value {}"
        return template.format(self.start_index, self.end_index, self.origin_value)


class Matcher:
    """Multi-pattern matcher for region names, backed by an ahocorasick automaton.

    Every region is registered under its full name and under an abbreviated
    name; both keys store a ``(key_text, candidates)`` tuple and point at the
    same ``candidates`` list.

    NOTE(review): when two different full names reduce to the same
    abbreviation, ``_first_add_addr`` calls ``add_word`` for that abbreviation
    again with a fresh list — presumably replacing the earlier entry. Confirm
    against the pyahocorasick documentation and the real data.
    """

    # Special abbreviations, mainly for the ethnic-minority autonomous regions.
    special_abbre = {
        "内蒙古自治区": "内蒙古",
        "广西壮族自治区": "广西",
        "西藏自治区": "西藏",
        "新疆维吾尔自治区": "新疆",
        "宁夏回族自治区": "宁夏",
    }

    def __init__(self, stop_re):
        """Create an empty matcher.

        ``stop_re`` is the regular expression removed from a full name to
        obtain its abbreviation.
        """
        self.ac = ahocorasick.Automaton()
        self.stop_re = stop_re

    def _abbr_name(self, origin_name):
        """Return the abbreviation of ``origin_name``.

        The special table wins; otherwise whatever ``stop_re`` matches is
        stripped from the name.
        """
        special = Matcher.special_abbre.get(origin_name)
        if special:
            return special
        return re.sub(self.stop_re, '', origin_name)

    def _first_add_addr(self, addr_info):
        """Register a name seen for the first time; return its (abbr, list) tuple."""
        short_name = self._abbr_name(addr_info.name)
        # The full name and its abbreviation share one candidate list.
        candidates = []
        self.ac.add_word(short_name, (short_name, candidates))
        self.ac.add_word(addr_info.name, (addr_info.name, candidates))
        return short_name, candidates

    def add_addr_info(self, addr_info):
        """Add one address object to the matcher.

        Region names can repeat, so all regions with the same name are
        collected in one list; the caller later compares adcodes to decide
        which of them belongs to a given parent region.
        """
        entry = self.ac.get(addr_info.name, 0)
        if not entry:
            entry = self._first_add_addr(addr_info)
        entry[1].append(addr_info)

    def complete_add(self):
        """Finish the registration phase; no addresses are added afterwards."""
        self.ac.make_automaton()

    def _get(self, key):
        """Look up the entry stored for ``key`` in the automaton."""
        return self.ac.get(key)

    def iter(self, sentence):
        """Yield a ``MatchInfo`` for each region name found in ``sentence``.

        A match is held back for one step: if the next accepted match starts
        at the same position (abbreviation followed by its full name), only
        the later one is yielded; otherwise the held match is yielded and the
        new one is held back in turn.
        """
        pending = None
        last_start = None
        last_end = None
        for end_index, (original_value, attr_infos) in self.ac.iter(sentence):
            # Both start_index and end_index are inclusive.
            start_index = end_index - len(original_value) + 1
            # Several hits may end at the same position; only the first of
            # them is kept.
            if last_end is not None and end_index <= last_end:
                continue
            current = MatchInfo(attr_infos, start_index, end_index, original_value)
            if pending is None:
                pending = current
            elif start_index == last_start:
                # Same start as the held-back match: prefer this later one.
                yield current
                pending = None
            else:
                yield pending
                pending = current
            last_start = start_index
            last_end = end_index

        if pending is not None:
            yield pending

parse.py

from address.addr_info import AddrInfo
from address.matcher import Matcher
from data_tools.mysql import MysqlTools
from tools.decorator import singleton


@singleton()
class ParseAddress(object):
    """Split a free-text address into province / city / county parts.

    On construction the region table is loaded from MySQL and indexed twice:
    ``addr_dict`` maps a full adcode to its ``AddrInfo`` and ``matcher`` finds
    region names inside a sentence.

    The class is wrapped by the project's ``singleton`` decorator (presumably
    so that the table is loaded only once — confirm in tools.decorator).
    """

    # NOTE(review): the three name keys below are all the empty string, so
    # they address the SAME dict entry: each later assignment overwrites the
    # earlier one, and a record ends up with a single "" key holding the name
    # of the most specific level that was written. Confirm whether distinct
    # keys were intended before relying on that entry.
    _PROVINCE = ""
    _CITY = ""
    _COUNTY = ""
    _ADDR = "address"
    _ADCODE = "adcode"

    def __init__(self):
        self.addr_dict, self.matcher = self.init_data_with_mysql()

    def _build_index(self, records, stop_key):
        """Build ``(adcode -> AddrInfo, Matcher)`` from rows with "adcode"/"name".

        Every row is registered in the matcher, so alternative names are still
        recognised in text. In the adcode map the FIRST row of an adcode wins:
        alternative-name rows are appended after the primary rows and must not
        replace the primary name returned by ``adcode_name``.
        """
        ad_map = {}
        matcher = Matcher(stop_key)
        for record_dict in records:
            addr_info = AddrInfo(
                name=record_dict["name"],
                adcode=record_dict["adcode"])
            ad_map.setdefault(record_dict["adcode"], addr_info)
            matcher.add_addr_info(addr_info)
        matcher.complete_add()
        print("初始化数据完成.")
        return ad_map, matcher

    def init_data(self, stop_key="([省市]|特别行政区|自治区)$"):
        """Load the region table from the packaged ``adcodes.csv`` resource.

        :param stop_key: regex stripped from a full name to get its abbreviation.
        :return: tuple ``(ad_map, matcher)``.
        """
        from pkg_resources import resource_stream
        from io import TextIOWrapper
        import csv
        with resource_stream('task_center.common.address', 'adcodes.csv') as csv_stream:
            text = TextIOWrapper(csv_stream, encoding='utf-8-sig')
            # The reader is lazy, so it has to be consumed while the stream
            # is still open.
            return self._build_index(csv.DictReader(text), stop_key)

    def init_data_with_mysql(self, stop_key="([省市]|特别行政区|自治区)$"):
        """Load the region table from MySQL.

        :param stop_key: regex stripped from a full name to get its abbreviation.
        :return: tuple ``(ad_map, matcher)``.
        """
        dbc = MysqlTools.get_dbc("1002")
        sql = """
            SELECT concat(id, '000000') as adcode, city_name as name
            FROM sys_city
        """
        records = dbc.select_all(sql)
        # Alternative names of the regions, appended after the primary names.
        other_sql = """
            SELECT adcode, other_name as name
            FROM sys_city_extend
        """
        other_records = dbc.select_all(other_sql)
        records.extend(other_records)
        return self._build_index(records, stop_key)

    def fill_adcode(self, adcode):
        """Right-pad ``adcode`` with zeros to a width of 12 characters."""
        return '{:0<12s}'.format(adcode)

    def adcode_name(self, part_adcode: str):
        """Return the name stored for ``part_adcode``, or None when unknown.

        The partial code is padded to 12 characters before the lookup.
        """
        addr = self.addr_dict.get(self.fill_adcode(part_adcode))
        return None if addr is None else addr.name

    def update_res_by_adcode(self, res: dict, adcode: str):
        """Fill ``res`` in place with the levels encoded in ``adcode``.

        "province" and "city" receive codes: the 2- or 4-character prefix is
        padded to 12 characters and the last six are dropped again. The name
        lookups are stored under the ``_PROVINCE`` / ``_CITY`` / ``_COUNTY``
        keys.
        """
        # The province part is present at every level.
        res[self._PROVINCE] = self.adcode_name(adcode[:2])
        res["province"] = self.fill_adcode(adcode[:2])[:-6]
        if adcode.endswith("0000"):
            # Province-level code: nothing more to add.
            return

        res[self._CITY] = self.adcode_name(adcode[:4])
        res["city"] = self.fill_adcode(adcode[:4])[:-6]
        # County level only when the code does not end with "00".
        if not adcode.endswith("00"):
            res[self._COUNTY] = self.adcode_name(adcode)

    def empty_record(self):
        """Return a fresh record whose fields are all None."""
        empty = {self._PROVINCE: None, self._CITY: None, self._COUNTY: None, self._ADDR: None, self._ADCODE: None}
        return empty

    def extract_addrs(self, sentence, umap):
        """Yield a record dict for every address found in ``sentence``.

        This is a generator. One record is yielded for each county-level
        match, and one final record is always yielded at the end. That final
        record is populated only when no county was matched at all and a
        province or city was; its adcode field is then None. In every other
        case the final record is empty.

        :param sentence: text to analyse; anything that is not a non-empty
            string produces a single empty record.
        :param umap: mapping from matched text to the adcode to prefer when
            no parent region restricts the choice.
        """
        # A non-string (including None) or an empty string has nothing to parse.
        if not isinstance(sentence, str) or not sentence:
            yield self.empty_record()
            return

        # Match from larger regions down to smaller ones.
        res = self.empty_record()
        last_info = None
        adcode = None
        truncate_index = -1
        with_county_rank = False
        # When several regions share the matched name, match_info.attr_infos
        # holds all of them.
        for match_info in self.matcher.iter(sentence):
            # Preferred adcode when no province/city limits the choice.
            first_adcode = umap.get(match_info.origin_value)
            cur_addr = match_info.get_match_addr(last_info, first_adcode)
            if cur_addr:
                last_info = cur_addr
                adcode = cur_addr.adcode
                truncate_index = match_info.end_index
                # A county-level match completes one address.
                if cur_addr.rank == AddrInfo.RANK_COUNTY:
                    self.update_res_by_adcode(res, adcode)
                    res[self._ADDR] = sentence[truncate_index + 1:]
                    res[self._ADCODE] = adcode
                    yield res
                    # Start over for a possible further address.
                    res = self.empty_record()
                    last_info = None
                    adcode = None
                    truncate_index = -1
                    with_county_rank = True
        if adcode is None:
            yield res
            return

        if not with_county_rank:
            self.update_res_by_adcode(res, adcode)
            res[self._ADDR] = sentence[truncate_index + 1:]
            res[self._ADCODE] = None
        yield res

    def parse(self, sentence):
        """Return the first record that ``extract_addrs`` yields for ``sentence``."""
        return next(self.extract_addrs(sentence, umap={}))