文章
数组包含关系筛选
需求:对姓名相同的记录,按联系方式集合的包含关系进行筛选——如果一条记录的联系方式集合是另一条记录联系方式集合的子集,则只保留联系方式最多的那条(父集)记录。
-- Raw input table: one row per (id, name, contact method). A record that has
-- several contact methods is stored as several rows sharing the same id.
-- IF EXISTS keeps the script runnable on a fresh database, where a bare
-- DROP TABLE would abort because the table does not exist yet.
DROP TABLE IF EXISTS contact_info;
CREATE TABLE contact_info (
    id           VARCHAR(255),
    name         STRING,
    contact_info STRING
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
-- List the tables in the current database (manual check after the DDL).
SHOW TABLES;
-- Sample data, all under the name 'bob':
--   id 1 -> {a, b, c}   id 2 -> {a}   id 3 -> {d}
--   id 4 -> {c, d}      id 5 -> {a, b}
-- The column list is spelled out so the insert does not depend on the
-- physical column order of the table.
INSERT INTO contact_info (id, name, contact_info)
VALUES
    ('1', 'bob', 'a'),
    ('1', 'bob', 'b'),
    ('1', 'bob', 'c'),
    ('2', 'bob', 'a'),
    ('3', 'bob', 'd'),
    ('4', 'bob', 'c'),
    ('4', 'bob', 'd'),
    ('5', 'bob', 'a'),
    ('5', 'bob', 'b');
-- Preview: fold the rows of each (id, name) into one sorted array of
-- distinct contact methods.
SELECT
    src.id,
    src.name,
    ARRAY_SORT(COLLECT_SET(src.contact_info)) AS contact_infos
FROM contact_info AS src
GROUP BY
    src.id,
    src.name
ORDER BY src.id;
-- Aggregated table: holds each record's contact methods as a single array
-- column instead of one row per contact method.
-- Dropped first so the whole script can be re-run; without this the CREATE
-- fails on the second run because only contact_info is dropped at the top.
DROP TABLE IF EXISTS contact_info_set;
CREATE TABLE contact_info_set (
    id           VARCHAR(255),
    name         STRING,
    contact_info ARRAY<STRING>
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
-- Materialize one row per (id, name) with its contact methods collapsed into
-- a sorted array of distinct values. Sorting makes equal sets compare as
-- identical arrays.
-- The target columns are listed explicitly. The ORDER BY on the source query
-- was removed: the order in which rows are fed to an INSERT does not determine
-- the order later queries return them in, so it added no value here.
INSERT INTO contact_info_set (id, name, contact_info)
SELECT
    src.id,
    src.name,
    ARRAY_SORT(COLLECT_SET(src.contact_info)) AS contact_infos
FROM contact_info AS src
GROUP BY
    src.id,
    src.name;
-- Inspect the aggregated rows; columns are listed in table-definition order.
SELECT
    id,
    name,
    contact_info
FROM contact_info_set;
-- Records ranked within each name by number of contact methods, largest
-- first: rn = 1 is the record with the most contact methods for that name.
-- Dropped first so the script can be re-run without a "table already exists"
-- failure.
DROP TABLE IF EXISTS ranked;
CREATE TABLE ranked (
    id           VARCHAR(255),
    name         STRING,
    contact_info ARRAY<STRING>,
    rn           BIGINT
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
-- Number the records of each name from the largest contact array to the
-- smallest. id is a tie-breaker: without it, records with equally sized
-- arrays get an arbitrary rn, so which of two identical contact sets is kept
-- downstream could change from run to run.
INSERT INTO ranked (id, name, contact_info, rn)
SELECT
    s.id,
    s.name,
    s.contact_info,
    ROW_NUMBER() OVER (
        PARTITION BY s.name
        ORDER BY SIZE(s.contact_info) DESC, s.id
    ) AS rn
FROM contact_info_set AS s;
-- Mapping from each record id to new_id, the id of the largest same-name
-- record whose contact array contains this record's contact array (the
-- record's own id when no such record exists).
-- Dropped first so the script can be re-run without a "table already exists"
-- failure.
DROP TABLE IF EXISTS id_mapping;
CREATE TABLE id_mapping (
    id     VARCHAR(255),
    new_id STRING
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
-- Fill id_mapping with exactly one row per record.
--
-- Each record (r1) is joined to every same-name record (r2) that ranks ahead
-- of it (more contact methods) and contains all of r1's contact methods. A
-- subset can have several such parents, so the join can return several rows
-- per r1. The previous version kept all of them, which wrote duplicate rows
-- into id_mapping and, through the later join, into result. Here the parents
-- are numbered by rank and only the best one (smallest rn, i.e. the largest
-- array) is kept.
--
-- Records with no parent survive the LEFT JOIN as a single row with a NULL
-- parent; COALESCE maps them to their own id.
INSERT INTO id_mapping (id, new_id)
SELECT
    candidates.id,
    COALESCE(candidates.parent_id, candidates.id) AS new_id
FROM (
    SELECT
        r1.id,
        r2.id AS parent_id,
        ROW_NUMBER() OVER (PARTITION BY r1.id ORDER BY r2.rn) AS pick
    FROM ranked AS r1           -- record being classified
    LEFT JOIN ranked AS r2      -- candidate parent record
        ON r1.name = r2.name
        -- only records ranked ahead of r1, i.e. with more contact methods
        AND r2.rn < r1.rn
        -- r2 contains r1: the intersection is as large as r1 itself
        AND SIZE(ARRAY_INTERSECT(r2.contact_info, r1.contact_info)) = SIZE(r1.contact_info)
) AS candidates
WHERE candidates.pick = 1;
-- Final output: every record with a delete flag and the id of the record it
-- is absorbed into.
-- Dropped first so the script can be re-run without a "table already exists"
-- failure.
DROP TABLE IF EXISTS result;
CREATE TABLE result (
    id           VARCHAR(255),
    name         STRING,
    contact_info ARRAY<STRING>,
    delete_flag  STRING,   -- '0' = keep, '1' = contained in another record
    new_id       STRING
) DUPLICATE KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
-- Attach the mapping to every ranked record. A record that maps to itself is
-- kept (delete_flag '0'); any other record is flagged for deletion ('1'),
-- including one with no mapping row, where the comparison is not true.
-- The flag is written as a string literal because delete_flag is a STRING
-- column; the stored values match what the implicit cast of 0/1 produced.
-- The target columns are listed explicitly so the insert does not depend on
-- the table's column order.
INSERT INTO result (id, name, contact_info, delete_flag, new_id)
SELECT
    r.id,
    r.name,
    r.contact_info,
    CASE
        WHEN r.id = m.new_id THEN '0'
        ELSE '1'
    END AS delete_flag,
    m.new_id
FROM ranked AS r
LEFT JOIN id_mapping AS m
    ON r.id = m.id;
-- Inspect the final output; columns are listed in table-definition order.
SELECT
    id,
    name,
    contact_info,
    delete_flag,
    new_id
FROM result;