When randomly sampling records from a table, or tagging records with a sequence number, ROW_NUMBER() with an OVER (PARTITION BY ...) clause is the usual tool. For example, consider a table with a schema like:
CREATE TABLE IF NOT EXISTS A (
    c1 STRING,
    c2 STRING
)
PARTITIONED BY (
    c3 STRING
);
To randomly sample N records from each partition of A, the script usually looks like:
CREATE TABLE B AS
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY c3 ORDER BY RAND()) AS r
    FROM A
) t
WHERE r <= N
;
If A is small, say under a million rows, there is no significant efficiency issue. But when A is larger, and especially when the string values are very long, which is the case when storing image data as base64-encoded strings, a single row may carry a few MB, so millions of records can add up to several TB. At that scale the query becomes a problem: it may run for hours or even appear to hang.
After analysis, the bottleneck turns out to be the final reduce stage, where only a few instances are allocated, bounded by the number of distinct values of the partition column c3. For example, if c3 has 3 partitions, only 3 instances are allocated; for an unpartitioned table, only 1. With millions of large records, those few instances need far more memory than they have and cannot keep up.
To solve the issue, we must get more reducer instances involved. To make that possible, we rewrite the processing logic as follows. Assume c1 is the sample-ID column (e.g., a person identity) and c2 is the person's image as a base64 string.
CREATE TABLE B AS
WITH b AS (
    SELECT c1
    FROM (
        SELECT c1,
               ROW_NUMBER() OVER (PARTITION BY c3 ORDER BY RAND()) AS r
        FROM A
    ) t
    WHERE r <= N
)
SELECT A.*
FROM A
JOIN b
ON b.c1 = A.c1
;
With the logic reorganized this way, block b never loads the large value column c2, which makes the window stage fast. And the JOIN can be spread across many instances, no longer constrained by the number of c3 partitions. (Note this assumes c1 values are unique; duplicate c1 values would yield extra rows from the join.) In my case, processing a few million base64 image records, the original query ran for hours without completing; after the rewrite, the job finishes in a few minutes.