大数据

数组包含关系筛选

需求:对姓名相同的记录,比较各自的联系方式集合;若一条记录的联系方式集合被另一条记录的联系方式集合包含(即为其子集),则只保留联系方式最多的那条记录。

-- Source table: one row per (id, name, single contact value).
-- IF EXISTS lets the script run on a fresh database, where a plain
-- DROP TABLE raises an error because the table is not there yet.
DROP TABLE IF EXISTS contact_info;
CREATE TABLE contact_info (
    id varchar(255),
    name string,
    contact_info string
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

-- List the tables to confirm the creation succeeded.
SHOW TABLES;

-- Sample data: every row belongs to 'bob'; ids 1-5 carry the contact sets
-- {a,b,c}, {a}, {d}, {c,d} and {a,b} respectively.
-- The explicit column list keeps the statement independent of the table's
-- physical column order.
INSERT INTO contact_info (id, name, contact_info)
VALUES ('1', 'bob', 'a'),
       ('1', 'bob', 'b'),
       ('1', 'bob', 'c'),
       ('2', 'bob', 'a'),
       ('3', 'bob', 'd'),
       ('4', 'bob', 'c'),
       ('4', 'bob', 'd'),
       ('5', 'bob', 'a'),
       ('5', 'bob', 'b');

-- Preview: collapse each (id, name) group into a single sorted,
-- de-duplicated array of its contact values.
SELECT
    src.id,
    src.name,
    ARRAY_SORT(COLLECT_SET(src.contact_info)) AS contact_infos
FROM contact_info AS src
GROUP BY
    src.id,
    src.name
ORDER BY
    src.id;

-- Staging table: one row per (id, name) holding that record's contacts as an array.
-- Dropped first, like contact_info, so the script can be re-run: without the
-- drop a second run fails on CREATE and the following INSERT appends
-- duplicate rows to the old table.
DROP TABLE IF EXISTS contact_info_set;
CREATE TABLE contact_info_set (
    id varchar(255),
    name string,
    contact_info array<string>
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

-- Fill the staging table with one sorted, de-duplicated contact array per (id, name).
-- COLLECT_SET removes duplicate contacts; ARRAY_SORT gives a stable element order.
-- No ORDER BY: row order of an INSERT ... SELECT is not preserved by the
-- target table, so sorting here would only cost time.
INSERT INTO contact_info_set (id, name, contact_info)
SELECT id,
       name,
       ARRAY_SORT(COLLECT_SET(contact_info)) AS contact_info
FROM contact_info
GROUP BY id, name;

-- Inspect the staging table. Columns are listed explicitly and rows are
-- ordered by id so the output is stable between runs.
SELECT id,
       name,
       contact_info
FROM contact_info_set
ORDER BY id;

-- Ranked contact sets: within each name, records are numbered by the number
-- of contacts in descending order, so rn = 1 is the largest set.
-- Dropped first so the script can be re-run without failing on CREATE or
-- appending a second copy of the rows.
DROP TABLE IF EXISTS ranked;
CREATE TABLE ranked (
    id varchar(255),
    name string,
    contact_info array<string>,
    rn BIGINT
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

-- Number the records of each name from the largest contact set to the smallest.
-- id is the tie-breaker: ordering by size alone leaves the rn of equally
-- sized sets arbitrary, so which of two equal sets counts as "ahead"
-- (and therefore survives) could change from run to run.
INSERT INTO ranked (id, name, contact_info, rn)
SELECT id,
       name,
       contact_info,
       ROW_NUMBER() OVER (PARTITION BY name ORDER BY size(contact_info) DESC, id) AS rn
FROM contact_info_set
;

-- Mapping from each record id to new_id, the id of the largest contact set
-- that contains the record's contacts.
-- Dropped first so the script can be re-run without failing on CREATE or
-- appending a second copy of the rows.
DROP TABLE IF EXISTS id_mapping;
CREATE TABLE id_mapping (
    id varchar(255),
    new_id string
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

-- For every record, store the id of the largest contact set (same name) that
-- contains it; a record contained in no set ranked ahead of it maps to itself.
-- DISTINCT is required: a record with several containing sets produces one
-- joined row per candidate, and the window function repeats the same answer
-- on each of those rows instead of collapsing them. Without it the table
-- holds duplicate (id, new_id) rows that multiply rows in later joins on id.
INSERT INTO id_mapping (id, new_id)
SELECT DISTINCT
       best.id,
       COALESCE(best.new_id, best.id) AS new_id
FROM (
         SELECT r1.id,
                -- Among all containing candidates, take the one with the smallest rn,
                -- i.e. the one with the most contacts.
                first_value(r2.id) over (PARTITION BY r1.id ORDER BY r2.rn ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS new_id
         FROM ranked r1 -- record being classified
                  LEFT JOIN ranked r2 -- candidate containing record
                            ON r1.name = r2.name
                                AND r2.rn < r1.rn -- only candidates ranked ahead, i.e. with at least as many contacts
                                -- r2 contains r1 when the intersection is as large as r1 itself
                                -- (relies on the arrays being duplicate-free)
                                AND size(array_intersect(r2.contact_info, r1.contact_info)) = size(r1.contact_info)
) best
;


-- Final output: every ranked record with a delete flag and the id of the
-- record it maps to.
-- Dropped first so the script can be re-run without failing on CREATE or
-- appending a second copy of the rows.
DROP TABLE IF EXISTS result;
CREATE TABLE result (
    id varchar(255),
    name string,
    contact_info array<string>,
    delete_flag STRING,
    new_id STRING
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

-- Flag every record: delete_flag is '0' when the record maps to itself (keep)
-- and '1' otherwise (covered by the record named in new_id, or no mapping row).
-- The flag is written as a string literal because delete_flag is a STRING
-- column; the explicit column list keeps the statement independent of the
-- table's physical column order.
INSERT INTO result (id, name, contact_info, delete_flag, new_id)
SELECT r.id,
       r.name,
       r.contact_info,
       CASE WHEN r.id = m.new_id THEN '0' ELSE '1' END AS delete_flag,
       m.new_id
FROM ranked r
    LEFT JOIN id_mapping m ON r.id = m.id;

-- Inspect the final result. Columns are listed explicitly and rows are
-- ordered by id so the output is stable between runs.
SELECT id,
       name,
       contact_info,
       delete_flag,
       new_id
FROM result
ORDER BY id;