I have a long list of edge IDs (about 12 billion) that I want to remove from my Titan graph, which is stored on an HBase backend.
How can I do it quickly and efficiently?
I tried removing the edges via Gremlin, but that is too slow for this number of edges.
Is it possible to perform Delete commands directly on HBase? If so, how? (In particular, how do I assemble the key to delete?)
Thanks
After two days of research, I came up with a solution.
The goal: given a very large collection of edge IDs (as strings), remove those edges from the graph.
The implementation has to support removing billions of edges, so it must be efficient in both memory and time.
Using Titan directly is ruled out, since Titan instantiates many objects that are redundant here: we don't want to load the edges, we just want to remove them from HBase.
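The memory requirement comes down to never materializing the whole ID collection: only one chunk of IDs, and its Deletes, is held in memory at a time. The code that follows gets this from Guava's `Iterators.partition`. As a dependency-free illustration of the same idea, here is a sketch of a hypothetical `ChunkedIterator` (not part of Titan, HBase or Guava). With 12 billion IDs and chunks of 100,000, that works out to 120,000 batch calls to HBase, each carrying 200,000 Deletes (two per edge).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper: streams a source iterator in fixed-size chunks, similar in spirit to Guava's Iterators.partition. */
public final class ChunkedIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int chunkSize;

    public ChunkedIterator(Iterator<T> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.source = source;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    /** Pulls at most chunkSize elements; only this chunk is held in memory, never the whole source. */
    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> chunk = new ArrayList<>(chunkSize);
        while (chunk.size() < chunkSize && source.hasNext()) {
            chunk.add(source.next());
        }
        return chunk;
    }

    public static void main(String[] args) {
        Iterator<String> ids = Arrays.asList("a", "b", "c", "d", "e").iterator();
        ChunkedIterator<String> chunks = new ChunkedIterator<>(ids, 2);
        while (chunks.hasNext()) {
            System.out.println(chunks.next()); // [a, b], then [c, d], then [e]
        }
    }
}
```

The last chunk is simply shorter, so no padding or special casing is needed when the total is not a multiple of the chunk size.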
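```java
// Package names as in Titan 1.0 / HBase 1.x; adjust them to the versions you use
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.thinkaurelius.titan.diskstorage.WriteBuffer;
import com.thinkaurelius.titan.diskstorage.util.WriteByteBuffer;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.database.idhandling.IDHandler;
import com.thinkaurelius.titan.graphdb.database.idhandling.VariableLong;
import com.thinkaurelius.titan.graphdb.idmanagement.IDManager;
import com.thinkaurelius.titan.graphdb.relations.RelationIdentifier;
import com.thinkaurelius.titan.util.stats.NumberUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class EdgeDeleter { // hypothetical class name

    private static final Logger logger = LoggerFactory.getLogger(EdgeDeleter.class);

    /**
     * Deletes the given edge IDs, processing them in chunks of 100,000
     * @param edgeIds Iterator over the edge IDs to delete
     * @throws IOException if the HBase connection cannot be opened or closed
     */
    public static void deleteEdges(Iterator<String> edgeIds) throws IOException {
        // The partition bits must match the graph's configuration (the default max-partitions is assumed here)
        IDManager idManager = new IDManager(NumberUtil.getPowerOf2(GraphDatabaseConfiguration.CLUSTER_MAX_PARTITIONS.getDefaultValue()));
        byte[] columnFamilyName = "e".getBytes(); // 'e' is your edgestore column-family name
        long deletionTimestamp = System.currentTimeMillis();
        // HBase is contacted once per chunk of 100,000 edge IDs, i.e. 200,000 Deletes, since each edge
        // is stored twice: as OUT on its out-vertex row and as IN on its in-vertex row
        int chunkSize = 100000;
        String tableName = "YOUR-HBASE-TABLE";
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "YOUR-ZOOKEEPER-HOSTNAME");
        List<Delete> deletions = Lists.newArrayListWithCapacity(2 * chunkSize);
        // try-with-resources closes the table and the connection when we are done
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Iterators.partition(edgeIds, chunkSize)
                    .forEachRemaining(edgeIdsChunk -> deleteEdgesChunk(edgeIdsChunk, deletions, table, idManager,
                            columnFamilyName, deletionTimestamp));
        }
    }

    /**
     * Given a chunk of edge IDs and a reusable list of Delete objects (cleared on entrance),
     * creates two Delete objects for each edge (one for IN and one for OUT)
     * and deletes them via the given Table instance
     */
    public static void deleteEdgesChunk(List<String> edgeIds, List<Delete> deletions, Table table, IDManager idManager,
                                        byte[] columnFamilyName, long deletionTimestamp) {
        deletions.clear();
        for (String edgeId : edgeIds) {
            RelationIdentifier identifier = RelationIdentifier.parse(edgeId);
            // IN copy: stored on the in-vertex row, pointing at the out-vertex
            deletions.add(createEdgeDelete(idManager, columnFamilyName, deletionTimestamp, identifier.getRelationId(),
                    identifier.getTypeId(), identifier.getInVertexId(), identifier.getOutVertexId(),
                    IDHandler.DirectionID.EDGE_IN_DIR));
            // OUT copy: stored on the out-vertex row, pointing at the in-vertex
            deletions.add(createEdgeDelete(idManager, columnFamilyName, deletionTimestamp, identifier.getRelationId(),
                    identifier.getTypeId(), identifier.getOutVertexId(), identifier.getInVertexId(),
                    IDHandler.DirectionID.EDGE_OUT_DIR));
        }
        try {
            table.delete(deletions);
        } catch (IOException e) {
            logger.error("Failed to delete a chunk", e);
        }
    }

    /**
     * Creates an HBase Delete object for a specific edge
     * @return HBase Delete object to be used against HBase
     */
    private static Delete createEdgeDelete(IDManager idManager, byte[] columnFamilyName, long deletionTimestamp,
                                           long relationId, long typeId, long vertexId, long otherVertexId,
                                           IDHandler.DirectionID directionID) {
        byte[] vertexKey = idManager.getKey(vertexId).getBytes(0, 8); // Row key: 8 bytes, the size of a long
        byte[] edgeQualifier = makeQualifier(relationId, otherVertexId, directionID, typeId);
        // addColumns (plural) removes every version with a timestamp <= deletionTimestamp, whereas
        // addColumn (singular) only removes a version whose timestamp equals deletionTimestamp exactly.
        // Assumes the cells were written with millisecond timestamps.
        return new Delete(vertexKey)
                .addColumns(columnFamilyName, edgeQualifier, deletionTimestamp);
    }

    /**
     * Cell qualifier for a specific edge: relation type and direction,
     * followed by the other vertex ID and the relation ID
     */
    private static byte[] makeQualifier(long relationId, long otherVertexId, IDHandler.DirectionID directionID, long typeId) {
        WriteBuffer out = new WriteByteBuffer(32); // Initial capacity of 32 bytes, feel free to increase
        IDHandler.writeRelationType(out, typeId, directionID, false); // false: not a system (invisible) type
        VariableLong.writePositiveBackward(out, otherVertexId);
        VariableLong.writePositiveBackward(out, relationId);
        return out.getStaticBuffer().getBytes(0, out.getPosition());
    }
}
```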
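The `RelationIdentifier.parse` call is what makes this possible without loading anything: an edge ID string already contains the relation ID, both vertex IDs and the edge-label (type) ID. As far as I can tell from Titan 1.0's `RelationIdentifier.toString`, the format is four base-36 numbers joined by `-`, in the order relationId-outVertexId-typeId-inVertexId; treat that as an assumption. The dependency-free sketch below decodes that assumed format and lists the two cells (row vertex, other vertex, direction) each edge occupies, which is exactly why two Deletes are built per edge. The class, its names and the sample ID are hypothetical; real code should keep using `RelationIdentifier.parse`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of what an edge ID string carries; not Titan code. */
public class EdgeIdSketch {
    // Assumed to match the alphabet of Titan's base-36 LongEncoding
    private static final String BASE_SYMBOLS = "0123456789abcdefghijklmnopqrstuvwxyz";

    /** Decodes one base-36 token into a long. */
    public static long decodeBase36(String token) {
        long value = 0;
        for (char c : token.toCharArray()) {
            int digit = BASE_SYMBOLS.indexOf(c);
            if (digit < 0) {
                throw new IllegalArgumentException("Invalid base-36 symbol: " + c);
            }
            value = value * 36 + digit;
        }
        return value;
    }

    /** One stored copy of an edge: the row it lives in and the vertex it points to. */
    public static final class EdgeCell {
        public final long rowVertexId;
        public final long otherVertexId;
        public final long relationId;
        public final long typeId;
        public final String direction;

        EdgeCell(long rowVertexId, long otherVertexId, long relationId, long typeId, String direction) {
            this.rowVertexId = rowVertexId;
            this.otherVertexId = otherVertexId;
            this.relationId = relationId;
            this.typeId = typeId;
            this.direction = direction;
        }
    }

    /** Splits an edge ID (assumed format: relationId-outVertexId-typeId-inVertexId) into its two cells. */
    public static List<EdgeCell> cellsFor(String edgeId) {
        String[] parts = edgeId.split("-");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 dash-separated parts: " + edgeId);
        }
        long relationId = decodeBase36(parts[0]);
        long outVertexId = decodeBase36(parts[1]);
        long typeId = decodeBase36(parts[2]);
        long inVertexId = decodeBase36(parts[3]);
        List<EdgeCell> cells = new ArrayList<>(2);
        cells.add(new EdgeCell(outVertexId, inVertexId, relationId, typeId, "OUT"));
        cells.add(new EdgeCell(inVertexId, outVertexId, relationId, typeId, "IN"));
        return cells;
    }

    public static void main(String[] args) {
        for (EdgeCell cell : cellsFor("4r-6i0-2dx-3bc")) {
            System.out.println(cell.direction + " row=" + cell.rowVertexId + " other=" + cell.otherVertexId
                    + " relation=" + cell.relationId + " type=" + cell.typeId);
        }
    }
}
```

For the sample ID this prints an OUT cell on row 8424 pointing at 4296 and an IN cell on row 4296 pointing at 8424, both with relation 171 and type 3093: the same vertex pair, swapped.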
Keep in mind that I do not handle system types: I assume that all the given edge IDs belong to user-defined edges.
Using this implementation I was able to remove 20 million edges in about 2 minutes.
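For the 12 billion edges in the question, a rough linear extrapolation from that measurement gives about 72,000 seconds, i.e. roughly 20 hours. This assumes throughput stays constant at the measured rate in a single process, which was not measured at that scale. The arithmetic, with a hypothetical `DeletionEstimate` class:

```java
/** Back-of-envelope estimate, assuming throughput scales linearly from the measured run. */
public class DeletionEstimate {
    /** Seconds needed for totalEdges at the rate measuredEdges / measuredSeconds. */
    public static long estimateSeconds(long measuredEdges, long measuredSeconds, long totalEdges) {
        // Multiply before dividing to stay in exact integer arithmetic; 12e9 * 120 fits easily in a long
        return totalEdges * measuredSeconds / measuredEdges;
    }

    public static void main(String[] args) {
        long seconds = estimateSeconds(20_000_000L, 2 * 60, 12_000_000_000L);
        System.out.println(seconds + " s = " + seconds / 3600.0 + " h"); // 72000 s = 20.0 h
    }
}
```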