
Counting current-week records from multiple tables in PostgreSQL and Talend


I have multiple tables in PostgreSQL. Say table A has columns A, B, C, refresh_date and fiscal week, and table B has columns D, E, B, fiscal week and refresh_date. I want to find the total count of records in table A for the current week, and the total for column E in table B for the current week. I am using Talend to load data from tables A and B, both of which are in PostgreSQL. Also, if column E is zero for the current week, the job should send an e-mail to me. I want to write this generically: besides tables A and B, there will be many more tables handled the same way. How can I do this in Talend and PostgreSQL?


Solution

  • A few months ago I built something similar to what you need: it executes an arbitrary query, parses the result set and stores it, denormalized, in a database table. Note that I use the enterprise edition of Talend, which has a neat feature called Dynamic Schema: https://help.talend.com/pages/viewpage.action?pageId=190513179

    So where should we start? My typical query looks like this:

    select pk1 as rcr_grpby_pk1, pk2 as rcr_grpby_pk2, 
           count(*) as cnt, sum(amount) as sum_amount
    from mySchema.myTable
    group by pk1, pk2
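
Applied to the question, each table (A, B and any later ones) gets one query in this shape. A minimal sketch of generating those queries from a small per-table definition follows. It assumes, hypothetically, that the current week is determined from refresh_date with PostgreSQL's date_trunc('week', ...), which starts the week on Monday; if your fiscal week follows a different calendar, filter on the fiscal week column instead. The names TableCheck, buildQuery, public, table_a, table_b, column_e and fiscal_week are illustrative, not from the original job.

```java
import java.util.List;

public class CurrentWeekQueries {

    // Hypothetical per-table definition: which table to check and which measures to compute.
    public record TableCheck(String schema, String table, String groupByColumn, List<String> measures) {}

    // Builds a query following the rcr_grpby_ alias convention, restricted to the current week.
    // Assumption: refresh_date identifies the week; date_trunc('week', ...) truncates to Monday.
    // Identifiers are concatenated, so they must come from your own configuration, never user input.
    public static String buildQuery(TableCheck check) {
        StringBuilder sql = new StringBuilder("select ")
            .append(check.groupByColumn()).append(" as rcr_grpby_").append(check.groupByColumn());
        for (String measure : check.measures()) {
            sql.append(", ").append(measure);
        }
        sql.append(" from ").append(check.schema()).append('.').append(check.table())
           .append(" where refresh_date >= date_trunc('week', current_date)")
           .append(" and refresh_date < date_trunc('week', current_date) + interval '1 week'")
           .append(" group by ").append(check.groupByColumn());
        return sql.toString();
    }

    public static void main(String[] args) {
        List<TableCheck> checks = List.of(
            new TableCheck("public", "table_a", "fiscal_week", List.of("count(*) as cnt")),
            new TableCheck("public", "table_b", "fiscal_week", List.of("sum(column_e) as sum_e")));
        for (TableCheck check : checks) {
            System.out.println(buildQuery(check));
        }
    }
}
```

Adding a new table then means adding one TableCheck entry rather than a new subjob.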
    

    The SELECT query can be anything and contain any number of columns, as long as the group-by columns are aliased with the rcr_grpby_ prefix that the tJavaFlex code looks for. We execute it and store the results in a table like this:

    --------------------------------------------------------
    |  schema  |  table  |  pk     |measure_name| value_n  |
    --------------------------------------------------------
    | mySchema | myTable | foo2015 | cnt        | 1234     |
    --------------------------------------------------------
    | mySchema | myTable | foo2015 | sum_amount | 987.65   |
    --------------------------------------------------------
    | mySchema | myTable | bar2014 | cnt        | 4321     |
    --------------------------------------------------------
    | mySchema | myTable | bar2014 | sum_amount | 567.89   |
    --------------------------------------------------------
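
The denormalization step itself does not depend on Talend. A minimal plain-Java sketch of it follows, in which an ordered Map from column alias to value stands in for Talend's Dynamic record (MeasureRow and unpivot are hypothetical names). It turns one result row into one output row per measure and concatenates the rcr_grpby_ values into the pk, as the tJavaFlex code does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Unpivot {

    // One row of the denormalized target table.
    public record MeasureRow(String schema, String table, String pk, String measureName, Object value) {}

    private static final String GROUP_BY_PREFIX = "RCR_GRPBY_";

    // Turns one result-set row (alias -> value, in select order) into one row per measure.
    public static List<MeasureRow> unpivot(String schema, String table, Map<String, Object> row) {
        // First pass: concatenate the group-by values into one key, e.g. "foo" + 2015 -> "foo2015".
        StringBuilder pk = new StringBuilder();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (e.getKey().toUpperCase().startsWith(GROUP_BY_PREFIX)) {
                pk.append(e.getValue());
            }
        }
        // Second pass: every other column is a measure and becomes its own output row.
        List<MeasureRow> out = new ArrayList<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (!e.getKey().toUpperCase().startsWith(GROUP_BY_PREFIX)) {
                out.add(new MeasureRow(schema, table, pk.toString(), e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the select order, like the column order of a result set.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("rcr_grpby_pk1", "foo");
        row.put("rcr_grpby_pk2", 2015);
        row.put("cnt", 1234L);
        row.put("sum_amount", 987.65);
        unpivot("mySchema", "myTable", row).forEach(System.out::println);
    }
}
```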
    

    We distinguish three basic types: text, numbers and dates.

    You can probably work out how to write and store these SQL queries yourself, along with an ID that is passed to Talend and stored in the target table, so you can see which query generated each result.
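
Because every stored result carries the ID of the query that produced it, the e-mail condition from the question becomes a lookup on the target table. A minimal sketch follows, assuming hypothetical names (StoredResult, needsAlert, query ID 2 for table B and the measure name SUM_E). It treats a missing result as zero, because a grouped query with a current-week WHERE filter returns no rows at all when the week is empty. The mail itself could then be sent by a tSendMail component triggered when the check is true; that wiring is not shown.

```java
import java.math.BigDecimal;
import java.util.List;

public class ZeroCheck {

    // Hypothetical shape of one row read back from the denormalized target table.
    public record StoredResult(int queryId, String measureName, BigDecimal value) {}

    // True when no row exists for the query/measure, or any matching row is null or zero.
    public static boolean needsAlert(List<StoredResult> results, int queryId, String measureName) {
        boolean found = false;
        for (StoredResult r : results) {
            if (r.queryId() == queryId && r.measureName().equalsIgnoreCase(measureName)) {
                found = true;
                // compareTo ignores scale, so 0 and 0.00 both count as zero.
                if (r.value() == null || r.value().compareTo(BigDecimal.ZERO) == 0) {
                    return true;
                }
            }
        }
        // No row at all means the current week had no data for this query.
        return !found;
    }

    public static void main(String[] args) {
        List<StoredResult> results = List.of(
            new StoredResult(1, "CNT", new BigDecimal("1234")),
            new StoredResult(2, "SUM_E", new BigDecimal("0.00")));
        System.out.println(needsAlert(results, 2, "SUM_E")); // true: the sum is zero
        System.out.println(needsAlert(results, 1, "CNT"));   // false: the count is 1234
    }
}
```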

    Now the processing piece: tFlowToIterate -> tJavaFlex -> tLogRow.

    I've placed everything into a joblet because we use it to reconcile data between different databases (e.g. Oracle and Postgres). Joblet content:

    [screenshot: joblet content]

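    tJavaFlex has an output schema with these columns (as used by the code below): grp_id, job_id, table_test_id, group_by_columns, measure_column_name, result_n (Float), result_v (String) and result_d (Date).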

    tJavaFlex content is like this:

    begin:

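    // Dynamic column "line" of the incoming row "input", put into globalMap by tFlowToIterate
    Dynamic record = ((Dynamic)globalMap.get("input.line"));
    
    // First pass: concatenate all RCR_GRPBY_ values into one key (e.g. "foo" + "2015" -> "foo2015")
    String group_by_columns = "";
    
    for(int i = 0 ; i < record.getColumnCount() ; i++) {
        DynamicMetadata meta = record.getColumnMetadata(i);
        if(meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){
            group_by_columns += String.valueOf(record.getColumnValue(i));
        }
    }
    // Second pass: every other column is a measure. This for loop and if are closed
    // in the end code, so the main code runs once per measure column.
    for(int i = 0 ; i < record.getColumnCount() ; i++) {
        DynamicMetadata meta = record.getColumnMetadata(i);
        if( !meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){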
    

    main:

    out.grp_id = context.grp_id;
    out.job_id = context.job_id;
    out.table_test_id = context.table_test_id;
    
    out.group_by_columns = group_by_columns;
    out.measure_column_name = meta.getDbName().toUpperCase();
    
    // At most one of the three result columns is filled, depending on the column type
    out.result_n = /* Float */ null;
    out.result_v = /* String */ null;
    out.result_d = /* Date */ null;
    
    Object value = record.getColumnValue(i);
    String type = meta.getType();
    
    if (value == null) {
        // NULL measure: leave all three result columns null (not an unhandled type)
    } else if (type.equals("id_String")) {
        out.result_v = String.valueOf(value);
    } else if (type.equals("id_Date")) {
        out.result_d = (java.util.Date) value;
    } else if (type.equals("id_Integer") || type.equals("id_Long")
            || type.equals("id_Float") || type.equals("id_Double")) {
        out.result_n = Float.valueOf(String.valueOf(value));
    } else if (type.equals("id_BigDecimal")) {
        out.result_n = new java.math.BigDecimal(String.valueOf(value)).floatValue();
    } else {
        // Should not happen
        System.out.println("\n Unhandled type: " + type);
    }
    

    end:

        } // if
    } //for
    

    PS: I know Float is a poor choice for storing numbers, but I haven't had time to rework it, and it still gives acceptable results.
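
If the Float column does get reworked, one option is to keep numbers as BigDecimal end to end. A minimal plain-Java sketch of the same three-way split (text, number, date) follows. It runs outside Talend, so Java types stand in for Talend's id_* type names. ValueSlots and split are hypothetical names, and the target column would need a numeric type rather than a float type.

```java
import java.math.BigDecimal;
import java.util.Date;

public class ValueSlots {

    // At most one slot is filled; all three stay null for a null input.
    public record Slots(String text, BigDecimal number, Date date) {}

    public static Slots split(Object value) {
        if (value == null) {
            return new Slots(null, null, null);
        }
        if (value instanceof String s) {
            return new Slots(s, null, null);
        }
        if (value instanceof Date d) { // also covers java.sql.Date and Timestamp
            return new Slots(null, null, d);
        }
        if (value instanceof BigDecimal bd) {
            return new Slots(null, bd, null);
        }
        if (value instanceof Integer || value instanceof Long) {
            return new Slots(null, BigDecimal.valueOf(((Number) value).longValue()), null);
        }
        if (value instanceof Double d) {
            // valueOf(double) goes through Double.toString, so 987.65 stays 987.65
            return new Slots(null, BigDecimal.valueOf(d), null);
        }
        if (value instanceof Float f) {
            // Float.toString avoids the widening noise of float -> double; NaN/Infinity throw
            return new Slots(null, new BigDecimal(Float.toString(f)), null);
        }
        throw new IllegalArgumentException("Unhandled type: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(split("abc"));
        System.out.println(split(987.65f));
        System.out.println(split(new Date(0L)));
    }
}
```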