I have been working with GraphFrames and am now using aggregateMessages. The vertex schema is:
|-- id: long (nullable = false)
|-- company: string (nullable = true)
|-- money: integer (nullable = false)
|-- memoryLearned: map (nullable = true)
|    |-- key: string
|    |-- value: integer (valueContainsNull = false)
If I try this:
...
def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
  memory + 10
}
...
val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("id"))
val aggregates = gx
  .aggregateMessages
  .sendToSrc(msgToSrc)
  .agg(sum(AM.msg).as("aggMess"))
aggregates.show()
It works! But I need to get the keys and values from memoryLearned, so I thought this would work:
...
def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
  for((k,v) <- memory)
    ...
}
...
val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("memoryLearned"))
val aggregates = gx
  .aggregateMessages
  .sendToSrc(msgToSrc)
  .agg(myUDFA(AM.msg).as("aggMess"))
aggregates.show()
I got this error: "value filter is not a member of org.apache.spark.sql.Column"
I tried to find out how to cast or access the MapType, but I only found functions like explode that work on a DataFrame, and I don't have a DataFrame, only a single column.
If I use memory.getItem("aKeyFromMap") instead of the for(...) loop, I get the correct value from the map.
I have also tried creating an auxiliary DataFrame inside createMessage (one row and one column) so that I could use DataFrame functions, but it fails when I call .withColumn("newColumn", memory).
I am blocked. Any ideas?
Thanks a lot! Regards
If you want to iterate over a MapType Column and you don't know the keys up front, you have to use a UDF or another operation on the external type (like map):
import org.apache.spark.sql.functions.udf
def createMessage = udf( (memory: Map[String, Integer]) => {
  for( (k,v) <- memory )
    ...
} )
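To make the skeleton above concrete, here is a minimal sketch that assumes the message should be the sum of all values in the map. That choice is only an illustration, since the question does not say what the message should contain. The names MemoryMessage, sumValues and sumValuesUdf are hypothetical. Because memoryLearned is nullable, the function guards against a null map:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf

object MemoryMessage {
  // Hypothetical message logic (an assumption, not from the question):
  // add up every value stored in the map. A null map counts as 0.
  def sumValues(memory: Map[String, Int]): Int = {
    if (memory == null) {
      0
    } else {
      var total = 0
      // Inside the UDF, memory is an ordinary Scala Map, so the for loop compiles.
      for ((_, v) <- memory) total += v
      total
    }
  }

  // Wrap the plain function so it can be applied to a MapType column.
  val sumValuesUdf = udf(sumValues _)

  // Same signature as in the question: Column in, Column out.
  def createMessage(memory: Column): Column = sumValuesUdf(memory)
}
```

With this in place, MemoryMessage.createMessage(AM.dst("memoryLearned")) should yield an integer Column that can be passed to sendToSrc and aggregated with sum(AM.msg), as in the first snippet of the question.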
You get the error "value filter is not a member of org.apache.spark.sql.Column" because for comprehensions are syntactic sugar for map / flatMap / filter, and Column defines none of those methods.
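The desugaring can be seen on a plain Scala Map, without Spark. The sketch below uses a hypothetical object ForDesugar with made-up sample data. The second expression is only roughly what the compiler generates for the first, but it shows why the receiver needs filter-like and map-like methods:

```scala
object ForDesugar {
  // Sample data, invented for illustration.
  val memory: Map[String, Int] = Map("a" -> 1, "b" -> 2)

  // A for comprehension with a tuple pattern on the left of the arrow.
  val viaFor: Map[String, Int] =
    for ((k, v) <- memory) yield (k, v + 10)

  // Roughly the method calls it is rewritten into. A Map has withFilter and
  // map, so this compiles; a Spark Column has neither, hence the error.
  val viaCalls: Map[String, Int] =
    memory
      .withFilter { case (_, _) => true }
      .map { case (k, v) => (k, v + 10) }
}
```

Both values hold the same map, which is why the for loop works once the data is an ordinary Scala collection inside a UDF.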