
How to read and write to ElasticSearch with SparkR?


Beginner SparkR and Elasticsearch question here!

How do I write a sparkR dataframe or RDD to ElasticSearch with multiple nodes?

There is an R package for Elasticsearch (elastic), but its documentation says nothing about Hadoop or distributed DataFrames. When I try to use it on a SparkDataFrame, I get the following error:

install.packages("elastic", repos = "http://cran.us.r-project.org")
library(elastic)
df <- read.json('/hadoop/file/location')
connect(es_port = 9200, es_host = 'https://hostname.dev.company.com', es_user = 'username', es_pwd = 'password')
docs_bulk(df)

Error: no 'docs_bulk' method for class SparkDataFrame

If this were PySpark, I would use the rdd.saveAsNewAPIHadoopFile() function as shown here, but I can't find any information about an equivalent in SparkR from googling. Elasticsearch also has good documentation, but only for Scala and Java.

I'm sure there is something obvious I am missing; any guidance appreciated!


Solution

  • To connect your SparkR session to Elasticsearch, you need to make the connector jar and your ES configuration available to the session.

    1: Specify the jar (look up which version you need in the Elasticsearch documentation; the version below is for Spark 2.x, Scala 2.11 and ES 6.8.0)

    sparkPackages <- "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.0"
    

    2: Specify your cluster config in your sparkConfig. You can add other Elasticsearch settings here too (and, of course, additional Spark settings)

    sparkConfig <- list(es.nodes = "your_comma-separated_es_nodes",
                        es.port = "9200")
    
    3: Start a SparkR session

    sparkR.session(master="your_spark_master",
                   sparkPackages=sparkPackages,
                   sparkConfig=sparkConfig)

    4: Do some magic that results in a SparkDataFrame you want to save to ES

    5: Write your SparkDataFrame to ES:

    write.df(yourSparkDF, source = "org.elasticsearch.spark.sql",
             path = "your_ES_index_path")
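
Since the question also asks about reading and about multiple nodes, here is a minimal sketch that wraps the steps above in three small helpers. It has not been run against a live cluster. The helper names (es_spark_config, write_to_es, read_from_es) and the node names are made up for illustration. read.df and write.df are the regular SparkR functions, and mode = "append" is an assumption about how you want an existing index handled (the write.df default, "error", is meant to fail when the target already exists).

```r
# Sketch only: assumes SparkR is installed and the session was started with
# the elasticsearch-spark connector jar, as described in the steps above.

# Build the connector settings from a character vector of node names.
# es.nodes takes one comma-separated string, so several nodes are collapsed.
es_spark_config <- function(nodes, port = 9200L) {
  list(es.nodes = paste(nodes, collapse = ","),
       es.port  = as.character(port))
}

# Write a SparkDataFrame to an index through the connector's data source.
write_to_es <- function(df, index, mode = "append") {
  SparkR::write.df(df, path = index,
                   source = "org.elasticsearch.spark.sql", mode = mode)
}

# Read an index back as a SparkDataFrame through the same data source.
read_from_es <- function(index) {
  SparkR::read.df(path = index, source = "org.elasticsearch.spark.sql")
}

# Usage (needs a running Spark master and ES cluster; names are placeholders):
# cfg <- es_spark_config(c("es-node-1", "es-node-2"))
# SparkR::sparkR.session(master = "your_spark_master",
#                        sparkPackages = "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.0",
#                        sparkConfig = cfg)
# write_to_es(yourSparkDF, "your_ES_index_path")
# df_back <- read_from_es("your_ES_index_path")
```

The Spark calls sit inside functions, so the file can be sourced without a cluster; nothing talks to Spark or ES until the commented usage lines are run.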