Join two dfs and cut the second df's field by the first df's field values during the join?


I have two dfs.

The first contains start and end positions, like:

id  start  end
1   4      8
2   2      6
2   5      7

And the second df maps id to string:

id  string
1   my beautiful data
2   lorem ipsum

Joining "as is" and then cutting the string to the needed positions fails with out of memory. The first df has about 1 million (1kk) entries and the second about 10 entries, but every string is about 100 MB.

So I am wondering whether it is possible to cut the string during the join, so that every string is only a few chars long, which is an acceptable size.

Result should be like:

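id | start | end | seq
1  | 4     | 8   | beaut
2  | 2     | 6   | orem
2  | 5     | 7   | m i

(start and end are 1-based and inclusive, so the seq in the second row is "orem " with a trailing space.)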


Thanks a lot!

UPD:

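Currently I just do a 1-to-1 join and then take the substring, like:

df1 = df1.join(df2, "id")

# substr takes (startPos, length), so convert the inclusive end to a length
df1.withColumn("substring", df1["string"].substr(df1.start, df1.end - df1.start + 1)).show()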
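
One detail worth checking in the snippet above: in PySpark, Column.substr(startPos, length) takes a 1-based start position and a length, not an end position. Below is a minimal pure-Python sketch of the conversion, assuming start and end are 1-based and inclusive (as rows 1 and 3 of the expected result suggest). The helpers spark_substr and cut are hypothetical names that only mimic the semantics for positive start positions; they are not part of any Spark API.

```python
def spark_substr(s: str, start_pos: int, length: int) -> str:
    """Mimic Column.substr(startPos, length) for start_pos >= 1 (1-based start)."""
    return s[start_pos - 1 : start_pos - 1 + length]


def cut(s: str, start: int, end: int) -> str:
    """Cut s from start to end, both 1-based and inclusive."""
    return spark_substr(s, start, end - start + 1)


# Sample data from the question: (id, start, end) rows and id -> string.
rows = [(1, 4, 8), (2, 2, 6), (2, 5, 7)]
strings = {1: "my beautiful data", 2: "lorem ipsum"}

for row_id, start, end in rows:
    # repr() makes leading/trailing spaces in the cut visible.
    print(row_id, start, end, repr(cut(strings[row_id], start, end)))
```

Passing end directly as the second argument would instead return end characters starting at start, which is usually longer than intended.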

But the join fails with OOM. Here are the optimized logical plan and the physical plan (I tried broadcast and repartition); in any case, such a size for the intermediate result is unacceptable. Separately, these dfs are shown and processed without errors:

exonsDF = exons_raw.join(dnaDF, "seq_region_id")

exonsDF.explain("cost")

== Optimized Logical Plan ==
Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148], Statistics(sizeInBytes=12.3 PiB)
+- Join Inner, (seq_region_id#63L = cast(seq_region_id#147 as bigint)), Statistics(sizeInBytes=14.4 PiB)
   :- Repartition 10, true, Statistics(sizeInBytes=13.6 MiB)
   :  +- Filter isnotnull(seq_region_id#63L), Statistics(sizeInBytes=13.6 MiB)
   :     +- Relation [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] orc, Statistics(sizeInBytes=13.6 MiB)
   +- Repartition 10000, true, Statistics(sizeInBytes=1083.8 MiB)
      +- Filter isnotnull(seq_region_id#147), Statistics(sizeInBytes=1083.8 MiB)
         +- Relation [seq_region_id#147,sequence#148] orc, Statistics(sizeInBytes=1083.8 MiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148]
   +- SortMergeJoin [seq_region_id#63L], [cast(seq_region_id#147 as bigint)], Inner
      :- Sort [seq_region_id#63L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(seq_region_id#63L, 200), ENSURE_REQUIREMENTS, [plan_id=335]
      :     +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=330]
      :        +- Filter isnotnull(seq_region_id#63L)
      :           +- FileScan orc [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] Batched: true, DataFilters: [isnotnull(seq_region_id#63L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/19tmp/exons], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<exon_id:bigint,seq_region_id:bigint,seq_region_start:bigint,seq_region_end:bigint,seq_regi...
      +- Sort [cast(seq_region_id#147 as bigint) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(cast(seq_region_id#147 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=336]
            +- Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NUM, [plan_id=331]
               +- Filter isnotnull(seq_region_id#147)
                  +- FileScan orc [seq_region_id#147,sequence#148] Batched: true, DataFilters: [isnotnull(seq_region_id#147)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/6tmp/sequence], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<seq_region_id:string,sequence:string>


UPD2, with the new plan:


== Optimized Logical Plan ==
Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148], Statistics(sizeInBytes=12.3 PiB)
+- Join Inner, (seq_region_id#63L = cast(seq_region_id#147 as bigint)), Statistics(sizeInBytes=14.4 PiB)
   :- RepartitionByExpression [seq_region_id#63L], Statistics(sizeInBytes=13.6 MiB)
   :  +- Filter isnotnull(seq_region_id#63L), Statistics(sizeInBytes=13.6 MiB)
   :     +- Relation [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] orc, Statistics(sizeInBytes=13.6 MiB)
   +- RepartitionByExpression [cast(seq_region_id#147 as bigint)], Statistics(sizeInBytes=1083.8 MiB)
      +- Filter isnotnull(seq_region_id#147), Statistics(sizeInBytes=1083.8 MiB)
         +- Relation [seq_region_id#147,sequence#148] orc, Statistics(sizeInBytes=1083.8 MiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148]
   +- SortMergeJoin [seq_region_id#63L], [cast(seq_region_id#147 as bigint)], Inner
      :- Sort [seq_region_id#63L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(seq_region_id#63L, 200), REPARTITION_BY_COL, [plan_id=330]
      :     +- Filter isnotnull(seq_region_id#63L)
      :        +- FileScan orc [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] Batched: true, DataFilters: [isnotnull(seq_region_id#63L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/0tmp/exons], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<exon_id:bigint,seq_region_id:bigint,seq_region_start:bigint,seq_region_end:bigint,seq_regi...
      +- Sort [cast(seq_region_id#147 as bigint) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(cast(seq_region_id#147 as bigint), 200), REPARTITION_BY_COL, [plan_id=331]
            +- Filter isnotnull(seq_region_id#147)
               +- FileScan orc [seq_region_id#147,sequence#148] Batched: true, DataFilters: [isnotnull(seq_region_id#147)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/0tmp/sequence], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<seq_region_id:string,sequence:string>

Solution

  • Rather than repartitioning by a number, try repartitioning by the seq_region_id field:

    exon = exon.repartition(exon['seq_region_id'])
    sequence = sequence.repartition(sequence['seq_region_id'].cast("bigint"))
    

    The cast is necessary because the plan applies

    cast(seq_region_id#147 as bigint)

    to sequence, which stores seq_region_id as a string, while exon stores it as bigint on disk. That may be on purpose, but given the names and this example it looks like a mistake; correcting the type on disk would also speed things up.

    Given that you are on Spark 3.5, a sort-merge join is the most efficient way to handle this, and Spark will choose it for you.

    Lastly, make sure to drop "sequence" after the substring operation. This lets Spark know the column is not required in the final output, and it also stands a chance of lowering memory usage.
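
    Putting these points together, here is a minimal PySpark sketch, not a tested solution for this dataset. It assumes the column names from the plan (seq_region_id, seq_region_start, seq_region_end, sequence) and that start and end are 1-based and inclusive. The function name join_and_cut and the output column name seq are hypothetical.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pyspark.sql import DataFrame


def join_and_cut(exon: DataFrame, sequence: DataFrame) -> DataFrame:
    """Join exon to sequence on seq_region_id and keep only the cut substring."""
    # Imported here so this module loads even where PySpark is not installed.
    from pyspark.sql import functions as F

    # Align the key types first: the plan shows sequence.seq_region_id is a string.
    sequence = sequence.withColumn(
        "seq_region_id", F.col("seq_region_id").cast("bigint")
    )

    # Repartition both sides by the join key rather than by a fixed number.
    exon = exon.repartition("seq_region_id")
    sequence = sequence.repartition("seq_region_id")

    # Column.substr takes (startPos, length); both are cast to int explicitly.
    # Assumption: seq_region_start/end are 1-based and inclusive.
    start = F.col("seq_region_start").cast("int")
    length = (F.col("seq_region_end") - F.col("seq_region_start") + 1).cast("int")

    return (
        exon.join(sequence, "seq_region_id")
        .withColumn("seq", F.col("sequence").substr(start, length))
        .drop("sequence")  # the full string is not needed in the output
    )
```

    Calling explain("cost") on the returned DataFrame before running any action is one way to check whether the final projection still carries the full sequence column.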