python-3.x, apache-spark, pyspark, teradata, joblib

PySpark with joblib giving me ambiguous results


I am trying to fetch data from Teradata:

select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3

NOTE: one or more of the SELECTs may fail, and that should not cause the whole union to fail.

from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                # a failing sub-query is simply skipped (pd.concat ignores None entries)
                return None

        # run all sub-queries in parallel threads
        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)

As you can see, the split_query_and_run_individually method splits the query on union all and then runs all of the sub-queries in parallel threads (n_jobs=10).
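
To make the splitting step concrete, this is roughly what re.split does to the union query from the top of the post (the ... are just the elided column lists):

import re

query = """select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3"""

# case-insensitive split on the separator: one element per SELECT
print(re.split('union all', query, flags=re.IGNORECASE))
# ['select ... from table1_1\n', '\nselect .. from table_2\n', '\nselect ... from table_3']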

But the problem I am facing is that the data comes back corrupted, like this:

n_jobs = 1
                               src_tbl  total_count   data_date
0     dsl_dim_mdm_....................           61  2022-03-17
1     dsl_agg_call....................      3992202  2022-03-27
2      dsl_call_ac....................       924719  2022-03-27
3      dsl_dim_acc....................         4762  2022-03-31
4                 ....................         6821  2022-03-31
5     dsl_dim_geo_....................      8610038  2022-04-05
6              dsl....................        67116  2022-03-31
7           dsl_rl....................      2087669  2022-04-06
8             dsl_....................          154  2022-04-01
9             dsl_....................        85630  2022-03-27
10  dsl_selling_da....................           53  2021-03-03
11  dsl_speaker_ev....................        17765  2022-03-31
12       dsl_speak....................        26269  2022-08-24
13   dsl_speaker_e....................         4202  2022-04-05
14              ds....................          268  2022-03-31
15      dsl_rltn_r....................       255794  2022-03-18
16     dsl_rltn_nr....................        12088  2022-03-18
17        dsl_rapp....................        81182  2022-01-01
18   dsl_dim_physi....................       109299  2022-03-31
19             dsl....................         4265  2022-02-01
20         dsl_fac....................       117978  2022-04-03
21      dsl_coachi....................          242  2022-03-31
22   dsl_speaker_e....................        16653  2022-03-31
23     dsl_dim_cal....................        17817  2099-12-31
24    dsl_rltn_nrt....................         3304  2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
                              src_tbl  total_count   data_date
0             dsl_sel................        85630  2022-03-27
1             dsl_sel................        85630  2022-03-27
2             dsl_sel................        85630  2022-03-27
3             dsl_sel................        85630  2022-03-27
4             dsl_sel................        85630  2022-03-27
5             dsl_sel................        85630  2022-03-27
6             dsl_sel................        85630  2022-03-27
7             dsl_sel................        85630  2022-03-27
8             dsl_sel................        85630  2022-03-27
9             dsl_sel................        85630  2022-03-27
10  dsl_speaker_event................        17765  2022-03-31
11   dsl_speaker_even................         4202  2022-04-05
12   dsl_speaker_even................         4202  2022-04-05
13              dsl_s................          268  2022-03-31
14        dsl_rapper_................        81182  2022-01-01
15        dsl_rapper_................        81182  2022-01-01
16     dsl_rltn_nrtl_................        12088  2022-03-18
17        dsl_rapper_................        81182  2022-01-01
18   dsl_dim_physicia................       109299  2022-03-31
19             dsl_cu................         4265  2022-02-01
20         dsl_fact_f................       117978  2022-04-03
21      dsl_coaching_................          242  2022-03-31
22   dsl_speaker_even................        16653  2022-03-31
23     dsl_dim_call_c................        17817  2099-12-31
24    dsl_rltn_nrtl_r................         3304  2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
                            src_tbl  total_count   data_date
0   dsl_dim_acc....................         4762  2022-03-31
1   dsl_dim_acc....................         4762  2022-03-31
2   dsl_dim_acc....................         4762  2022-03-31
3   dsl_dim_acc....................         4762  2022-03-31
4   dsl_dim_acc....................         4762  2022-03-31
5   dsl_dim_acc....................         4762  2022-03-31
6   dsl_dim_acc....................         4762  2022-03-31
7   dsl_dim_acc....................         4762  2022-03-31
8   dsl_dim_acc....................         4762  2022-03-31
9   dsl_dim_acc....................         4762  2022-03-31
10  dsl_dim_acc....................         4762  2022-03-31
11  dsl_dim_acc....................         4762  2022-03-31
12  dsl_dim_acc....................         4762  2022-03-31
13  dsl_dim_acc....................         4762  2022-03-31
14  dsl_dim_acc....................         4762  2022-03-31
15  dsl_dim_acc....................         4762  2022-03-31
16  dsl_dim_acc....................         4762  2022-03-31
17  dsl_dim_acc....................         4762  2022-03-31
18  dsl_dim_acc....................         4762  2022-03-31
19  dsl_dim_acc....................         4762  2022-03-31
20  dsl_dim_acc....................         4762  2022-03-31
21  dsl_dim_acc....................         4762  2022-03-31
22  dsl_dim_acc....................         4762  2022-03-31
23  dsl_dim_acc....................         4762  2022-03-31
24  dsl_dim_acc....................         4762  2022-03-31
25  dsl_dim_acc....................         4762  2022-03-31
-----------

As you can see, the result becomes more and more ambiguous as I increase the number of threads: the results from the individual queries overlap with each other.

I have also implemented the same class with the teradatasql library, and that version works just fine with n_jobs=-1 (a rough sketch of it is shown after the version list below). I suspect that self._reader.option('dbtable', f"({query}) as tbl").load() is getting messed up across threads. I also tried ThreadPoolExecutor, with a similar result. Does anyone know how to solve this issue?

Versions:
Python 3.6.8
Spark 2.4.0-cdh6.3.4
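
For reference, here is a rough sketch (not my exact code) of the teradatasql variant mentioned above; the connection parameters and the LDAP logon mechanism are placeholders:

import pandas as pd
import teradatasql
from joblib import Parallel, delayed

def run(query, host, user, password):
    try:
        # each thread opens its own DB-API connection, so nothing is shared between threads
        con = teradatasql.connect(host=host, user=user, password=password, logmech='LDAP')
        try:
            return pd.read_sql(query, con)
        finally:
            con.close()
    except Exception:
        # a failing sub-query is skipped, as in the Spark version
        return None

# dfs = Parallel(n_jobs=-1, prefer='threads')(delayed(run)(q, host, user, password) for q in queries)
# result = pd.concat(df for df in dfs if df is not None).reset_index(drop=True)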

Solution

  • Thanks to @pltc, here is one solution. It is, however, much slower than the teradatasql library with multithreading, even with the FAIR scheduler on:

    from .base import Base
    import re
    import pandas as pd
    from pyspark.sql import DataFrame
    from functools import reduce
    
    class TeradataWithSpark(Base):
        def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
            super().__init__(spark, host, port, database, username, password)
            self._reader = self._spark.read.format("jdbc") \
                    .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                    .option("user", self._username) \
                    .option("password", self._password) \
                    .option("driver", "com.teradata.jdbc.TeraDriver")
    
        def run_query(self, query, return_pandasDF=True):
            # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
            # if return_pandasDF:
            #     return spark_df.toPandas()
            # else:
            #     return spark_df
            # split the incoming query on 'union all' and run each piece on its own
            return self.split_query_and_run_individually(query, r'union all', return_pandasDF)
    
        def run_queries_and_union_all(self, queries, return_pandasDF=True):     
            dataframes = []
            for each_query in queries:
                try:
                    spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                    dataframes.append(spark_df)
                except Exception as e:
                    # simply ignoring the query
                    print(f'Error while reading the query {each_query}')
                
            concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
            if return_pandasDF:
                return concat_sparkDf.toPandas()
            else:
                return concat_sparkDf
    
        def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
            queries = re.split(separator, query, flags=re.IGNORECASE)
            return self.run_queries_and_union_all(queries, return_pandasDF)
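
    Illustrative usage (table names, column lists, and credentials are placeholders; Base is assumed to pass the Spark session and connection details through):

    td = TeradataWithSpark(spark, host='...', database='...', username='...', password='...')

    query = """
    select ... from table1_1
    union all
    select .. from table_2
    union all
    select ... from table_3
    """

    # runs each SELECT separately, then unions the resulting Spark DataFrames
    pdf = td.run_query(query)                         # pandas DataFrame
    sdf = td.run_query(query, return_pandasDF=False)  # Spark DataFrame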