Tags: java, scala, apache-spark, apache-spark-sql, hive-metastore

How to use ExternalCatalog.listPartitions() with Java


I'm new to Java. I want to drop partitions of a Hive table, using SparkSession.sharedState().externalCatalog().listPartitions and SparkSession.sharedState().externalCatalog().dropPartitions.

I saw these methods used in Scala in How to truncate data and drop all partitions from a Hive table using Spark, but I can't understand how to call them from Java. This is part of an ETL process, and I want to understand how to deal with it in Java.

My code fails because I don't understand how to work with the data types involved and how to convert them to Java: what kind of objects the methods expect, and what data the API returns.

Example of my code:

ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
ec.listPartitions("need_schema", "need_table");

And it fails with:

method listPartitions in class org.apache.spark.sql.catalog.ExternalCatalog cannot be applied to given types.

I can't get past it because there is little information about the API (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExternalCatalog.html#listPartitions), my Java knowledge is limited, and all the examples I can find are written in Scala. Ultimately I need to convert this Scala code, which works, to Java:

def dropPartitions(spark: SparkSession, schema: String, table: String, needDate: String): Unit = {
  val cat = spark.sharedState.externalCatalog
  val partit = cat.listPartitions(schema, table).map(_.spec).map(t => t.get("partition_field")).flatten
  val filteredPartit = partit.filter(_ < needDate).map(x => Map("partition_field" -> x))
  cat.dropPartitions(
    schema,
    table,
    filteredPartit,
    ignoreIfNotExists = true,
    purge = false,
    retainData = false)
}

Please, if you know how to deal with this, can you help with these things:

  1. an example of Java code for a container of my own to hold the data from externalCatalog
  2. which data structures this API uses, and some theoretical source that explains how to use them from Java
  3. what the Scala line cat.listPartitions(schema, table).map(_.spec).map(t => t.get("partition_field")).flatten means? Thanks
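On question 3: each CatalogTablePartition has a spec, a Map[String, String] from partition column to value; t.get("partition_field") returns an Option, and .flatten keeps only the values that actually exist. The same shape can be sketched with Java streams over plain java.util.Maps (hypothetical data, no Spark involved; class and field names are mine):

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class FlattenDemo {
    // Rough java.util.stream analogue of the Scala chain
    // .map(_.spec).map(t => t.get("partition_field")).flatten
    public static List<String> extractPartitionValues(List<Map<String, String>> specs) {
        return specs.stream()
            .map(spec -> Optional.ofNullable(spec.get("partition_field"))) // Option-like lookup
            .flatMap(Optional::stream)                                     // .flatten: drop empty Options
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical partition specs, standing in for CatalogTablePartition.spec
        List<Map<String, String>> specs = List.of(
            Map.of("partition_field", "2021-07-01"),
            Map.of("other_field", "x"), // no partition_field: removed by the flatten step
            Map.of("partition_field", "2021-07-02"));
        System.out.println(extractPartitionValues(specs)); // [2021-07-01, 2021-07-02]
    }
}
```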

UPDATE Thank you very much for your feedback @jpg. I'll try it. I have a big ETL task whose goal is to write data into a dynamically partitioned table once a week. The business rule for this datamart is (sysdate - 90 days), and because of that I want to drop a range of partitions (by day) from the target table in a public-access schema. I have read that the right way to drop partitions is via externalCatalog. I have to use Java because of this project's historical tradition) and I'm trying to understand how to do this most efficiently. Some externalCatalog methods I can already print to the terminal with System.out.println(): externalCatalog.tableExists(), externalCatalog.listTables(), and the methods of externalCatalog.getTable. But I don't understand how to deal with externalCatalog.listPartitions.
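The (sysdate - 90 days) cutoff itself is plain java.time arithmetic, independent of Spark; a minimal sketch (class and method names are mine, and a fixed date is used for reproducibility where the real job would use LocalDate.now()):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class CutoffDate {
    // Compute the yyyy-MM-dd string N days before the given date
    // (the "sysdate - 90 days" business rule).
    public static String cutoff(LocalDate today, int days) {
        return today.minusDays(days).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    }

    public static void main(String[] args) {
        // Fixed date for a reproducible example
        System.out.println(cutoff(LocalDate.of(2021, 10, 1), 90)); // 2021-07-03
    }
}
```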

UPDATE 2 Hello everyone. I have made one step forward in my task: now I can print the list of partitions to the terminal:

ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
Seq<CatalogTablePartition> ctp = ec.listPartitions("schema", "table", Option.empty()); // works! passing null or omitting the parameter fails the program
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
for (CatalogTablePartition catalogTablePartition : catalogTablePartitions) {
    System.out.println(catalogTablePartition.toLinkedHashMap().get("Partition Values")); // returns the partition value, e.g. "Some([validation_date=2021-07-01])"
}

But now there is another problem. I can get the values out of this API as a Java List, but ec.dropPartitions wants a Seq<Map<String, String>> as its third parameter. I also can't filter the partitions in this form: ideally I want to filter the partition values that are less than a date parameter and then drop them. If anyone knows how to write the map step with this API, as in my Scala example, please help.
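The date filtering can happen entirely on the Java side, before anything is converted back to a Scala Seq; a sketch with hypothetical partition values (only the final JavaConverters step then needs to touch Scala types):

```java
import java.time.LocalDate;
import java.util.List;
import java.util.stream.Collectors;

public class FilterPartitions {
    // Keep only the partition date strings strictly before the cutoff;
    // these are the partitions to drop.
    public static List<String> partitionsToDrop(List<String> partitionDates, String cutoff) {
        LocalDate limit = LocalDate.parse(cutoff);
        return partitionDates.stream()
            .filter(p -> LocalDate.parse(p).isBefore(limit))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical values, standing in for what listPartitions returns
        List<String> partitions = List.of("2021-06-28", "2021-07-01", "2021-07-05");
        System.out.println(partitionsToDrop(partitions, "2021-07-01")); // [2021-06-28]
    }
}
```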


Solution

  • I solved it by myself. Maybe it'll help someone.

    public static void partitionDeleteLessDate(String db_name, String table_name, String date_less_delete) {
        ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
        Seq<CatalogTablePartition> ctp = ec.listPartitions(db_name, table_name, Option.empty());
        List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
        // Map here is scala.collection.immutable.Map: spec() returns the Scala partition spec
        List<Map<String, String>> allPartList = catalogTablePartitions.stream()
            .map(s -> s.spec())
            .collect(Collectors.toList());
        List<String> datePartDel = 
            allPartList.stream()
                .map(x -> x.get("partition_name").get()) // Scala Map.get returns an Option; .get() unwraps it
                .sorted()
                .collect(Collectors.toList());
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        LocalDate date = LocalDate.parse(date_less_delete, formatter);
        
        List<String> filteredDates = datePartDel.stream()
            .map(s -> LocalDate.parse(s, formatter))
            .filter(d -> d.isBefore(date))
            .map(d -> d.toString())
            .collect(Collectors.toList());
        
        for (String seeDate : filteredDates) {
            List<Map<String, String>> elem = allPartList.stream()
                .filter(x -> x.get("partition_name").get().equals(seeDate))
                .collect(Collectors.toList());
            Seq<Map<String, String>> seqElem = JavaConverters.asScalaIteratorConverter(elem.iterator()).asScala().toSeq();
            ec.dropPartitions(
                db_name
                , table_name
                , seqElem
                , true   // ignoreIfNotExists
                , false  // purge
                , false  // retainData
            );
        } 
    }