Search code examples
pythonpandashiveairflowgoogle-cloud-dataproc

How to extract the query result from a Hive job output logs using DataprocHiveOperator?


I am trying to build a data migration pipeline using Airflow, source being a Hive table on a Dataproc cluster and the destination is BigQuery. I'm using DataprocHiveOperator to get the schema as well as the data from source. This operator uses Dataproc REST API's internally to submit and execute the job on a Dataproc cluster we specify. The output would be written to a file Google cloud storage as part of job logs. I need only the result of the query from these logs.

As of now, I have modified the gcp_dataproc_hook.py code to return the output to the calling method by downloading the contents of the output file as string with the help of driverOutputResourceUri parameter. The return type of this output is a Pandas data frame (which can be changed to any other type as per our convenience). But this includes complete logs. I have to extract the query result from it.

Here is the code snippet I added in gcp_dataproc_hook.py to return the output logs of a submitted query:

    #download the output
    def getOutput(self,project, output_bucket,output_path):
        client = storage.Client(project=self.project_id)
        bucket = client.get_bucket(output_bucket)
        output_blob = ('/'.join(output_path)+"."+"000000000")
        return bucket.blob(output_blob).download_as_string()

    #get logs including query output
    def getQueryResult(self):
        result=self.job_ouput
        output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
        df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
        return df

Here is a sample query I'm trying to execute:

SHOW CREATE TABLE my_tbl;

The output log looks like this:

Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0            Connected to: Apache Hive (version 2.3.5)             
1                    Driver: Hive JDBC (version 2.3.5)             
2    Transaction isolation: TRANSACTION_REPEATABLE_...             
3    . . . . . . . . . . . . . . . . . . . . . . .>...             
4    |                   createtab_stmt            ...             
5    +---------------------------------------------...             
6    | CREATE TABLE `my_tbl`(       ...             
7    |   `col1` string,            ...             
8    |   `col2` bigint,                   ...             
9    |   `col3` string,                 ...                         
..                                                 ...                         
141  |   `coln` string)                 ...             
142  | ROW FORMAT SERDE                            ...             
143  |   'org.apache.hadoop.hive.ql.io.orc.OrcSerde...             
144  | STORED AS INPUTFORMAT                       ...             
145  |   'org.apache.hadoop.hive.ql.io.orc.OrcInput...             
146  | OUTPUTFORMAT                                ...             
147  |   'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...             
148  | LOCATION                                    ...             
149  |   'gs://my_hive_data_bucket/tmp/base_table/my_tbl...             
150  | TBLPROPERTIES (                             ...             
151  |   'transient_lastDdlTime'='1566842329')     ...             
152  +---------------------------------------------...             
153                  143 rows selected (0.154 seconds)             
154               Beeline version 2.3.5 by Apache Hive             
155  Closing: 0: jdbc:hive2://prod-metastore-test-c... 

The expected output should be like this:

CREATE TABLE `my_tbl`(
  `col1` string,
  `col2` bigint,
  `col3` string,
  ..
  `coln` string,  
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
  'transient_lastDdlTime'='1566842329')

Please suggest me a way I could get close to a solution.


Solution

  • In Dataproc, Hive queries use Beeline instead of the deprecated Hive CLI, which is why the formatting is different by default. Beeline typically will format human-readable output in the fancy border format instead of something more easily parseable.

    Fortunately, there are beeline options that can make the format pretty close to what the old Hive CLI did. You can simply create an initialization action that you add to the options when creating the Dataproc cluster specifying init_actions_uris in your Airflow operator. Create a file with the following contents:

    #!/bin/bash
    
    sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline
    

    And upload that file to GCS, like gs://some-gcs-bucket/beeline-legacyfmt.sh and set that GCS URI as an init action for Dataproc clusters. That will apply the command-line options needed to beeline by default. Then, any Dataproc Hive jobs you send will now output in "tsv2" and "silent" mode meaning no extraneous log statements, and output will be raw tsv.