I have a Spark app which inserts data into Titan with Goblin, but it inserts duplicate vertices with the same name. The check `if not result:` does not prevent this, even though I am using the same session.
def savePartition(p):
    """Insert a ``Brand`` vertex for every brand in one Spark partition.

    For each row with a truthy ``'brand'`` value, the graph is queried for
    an existing ``Brand`` vertex with that name; a new vertex is added and
    flushed only when none is found.

    Args:
        p: Iterable of rows for this partition; each row is indexed with
            ``row['brand']``.

    Note:
        This check-then-insert only guards against duplicates seen through
        this session. Rows with the same brand processed concurrently in
        other partitions can presumably still race -- enforce uniqueness
        in the graph schema (e.g. a unique composite index) to rule that
        out.
    """
    print('savePartition', p)
    # Imports and the model class are kept inside the function, as in the
    # original, so everything the partition worker needs is created locally.
    from goblin import element, properties

    class Brand(element.Vertex):
        name = properties.Property(properties.String)

    import asyncio
    loop = asyncio.get_event_loop()
    from goblin.app import Goblin
    app = loop.run_until_complete(Goblin.open(loop))
    app.register(Brand)

    async def go(app):
        """Look up each brand and create the vertex when it is missing."""
        try:
            session = await app.session()
            for i in p:
                if not i['brand']:
                    continue
                traversal = session.traversal(Brand)
                result = await traversal.has(Brand.name, i['brand']).oneOrNone()
                if not result:
                    print(i)
                    brand = Brand()
                    brand.name = i['brand']
                    session.add(brand)
                    # flush() is a coroutine in Goblin; without ``await`` the
                    # pending vertex is never written, so the next lookup for
                    # the same name finds nothing and another vertex is queued.
                    await session.flush()
        finally:
            # Always release the connection, even if a lookup or write fails.
            await app.close()

    loop.run_until_complete(go(app))
# foreachPartition is an action that returns None, so its result is not
# assigned back to `rdd` (doing so would replace the RDD with None).
rdd.foreachPartition(savePartition)
How can I fix it? Thanks a lot.
I am not sure how this would work with Goblin, but if you want Titan to prevent duplicates based on a vertex property, you can use a Titan composite index and specify that it must be unique. For example, you could do the following:
// Define a unique composite index on the vertex property 'name'.
// Open a management handle on the graph for schema changes.
mgmt = graph.openManagement()
// Create the 'name' property key with String values.
// NOTE(review): presumably this fails if 'name' already exists; in that case
// fetch the existing key instead -- confirm against the Titan docs.
name = mgmt.makePropertyKey('name').dataType(String.class).make()
// Build a composite index named 'byNameUnique' over vertices, keyed on
// 'name' and marked unique.
mgmt.buildIndex('byNameUnique', Vertex.class).addKey(name).unique().buildCompositeIndex()
// Commit the schema changes made through the management handle.
mgmt.commit()
The above specifies that the `name` property on vertices must be unique.