I'm trying to find out whether network traffic log files can be analyzed with Neo4j. To that end, I am "tailing" 3 different log files from Bro IDS in real time using the "sh" library and importing the log records into Neo4j with py2neo, which turns out to be very slow. A CSV import won't work here, since the data arrives in real time.
An example: I am replaying a one-hour packet capture file with tcpreplay that contains almost 4,000,000 connections. I even replayed it at half speed, so after 2 hours I had about 4,000,000 log entries. Right now, 3.5 hours after the start of the analysis, I have imported just 289,691 graphs, each consisting of 5 nodes and 4 relationships. All in all, that is about 15% of the data in almost double the time.
I am using py2neo, and the code looks like the following (this is one of the graphs):
from sh import tail                             ## "sh" library for tailing the logs
from py2neo import Graph, Node, Relationship    ## py2neo v3

def create_conn_graph(connlog):

    [...]

    ## Start session
    graph = Graph(bolt=True, password="neo4j")
    tx = graph.begin()

    #############
    ##  Nodes  ##
    #############

    ## Connection node
    conn = Node("Connection", uid=connlog['uid'],
                ts=connlog['ts'],
                date=evt_date,
                time=evt_time,
                [...])

    ## Normalize Bro's "-" / "(empty)" placeholder values to "0"
    conn_properties = dict(conn)
    for key in conn_properties.keys():
        if conn[key] == "-" or conn[key] == "(empty)":
            conn[key] = "0"
    conn.update()
    tx.merge(conn, "Connection", "uid")

    ## IP nodes
    orig = Node("IP", ip=connlog['orig_h'])
    tx.merge(orig)

    resp = Node("IP", ip=connlog['resp_h'])
    tx.merge(resp)

    ## History node
    if connlog['history']:
        hist_flow = history_flow(connlog['history'])
        history_node = Node("History", history=connlog['history'], flow=hist_flow)
        tx.merge(history_node, "History", "history")

        ## (Connection)-[HAS_HISTORY]->(History)
        conn_hist = Relationship(conn, "HAS_HISTORY", history_node)
        tx.merge(conn_hist)

    ## Conn_State node
    conn_state = Node("Conn_State", state=connlog['conn_state'],
                      meaning=CONN_STATE[connlog['conn_state']])
    tx.merge(conn_state, "Conn_State", "conn_state")

    tx.commit()
    tx = graph.begin()

    #####################
    ##  Relationships  ##
    #####################

    ## (IP)-[STARTS_CONNECTION]->(Connection)
    orig_conn = Relationship(orig, "STARTS_CONNECTION", conn, port=connlog['orig_p'])
    tx.merge(orig_conn)

    ## (Connection)-[CONNECTS_TO]->(IP)
    conn_resp = Relationship(conn, "CONNECTS_TO", resp, port=connlog['resp_p'])
    tx.merge(conn_resp)

    ## (Connection)-[HAS_CONN_STATE]->(Conn_State)
    conn_connstate = Relationship(conn, "HAS_CONN_STATE", conn_state)
    tx.merge(conn_connstate)

    tx.commit()

    ## (Connection)-[PRODUCED]->(DNS|HTTP)
    if connlog['service'] == "dns":
        graph.run("MATCH (c:Connection {uid:{uid}}), (d:DNS {uid:{uid}}) \
                   MERGE (c)-[:PRODUCED]->(d)",
                  {"uid": connlog['uid']})

    if connlog['service'] == "http":
        graph.run("MATCH (c:Connection {uid:{uid}}), (d:HTTP {uid:{uid}}) \
                   MERGE (c)-[:PRODUCED]->(d)",
                  {"uid": connlog['uid']})

    return True

## End of create_conn_graph ########################################
if __name__ == "__main__":
    logentry = {}
    logfield = CONNLOG
    logline = []

    for line in tail("-F", LOG_DIR, _iter=True, _bg=True):
        entry = line.strip().split("\t")
        if line.startswith('#'):
            continue
        for i in range(len(logfield)):
            logentry[logfield[i]] = entry[i]
        create_conn_graph(logentry)
I have the following constraints and indexes:
graph.run("CREATE CONSTRAINT ON (c:Connection) ASSERT c.uid IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (i:IP) ASSERT i.ip IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (c:Conn_State) ASSERT c.conn_state IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (h:History) ASSERT h.history IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (host:Host) ASSERT host.host is UNIQUE")
graph.run("CREATE CONSTRAINT ON (q:QueryType) ASSERT q.type is UNIQUE")
graph.run("CREATE CONSTRAINT ON (qc:QueryClass) ASSERT qc.class is UNIQUE")
graph.run("CREATE CONSTRAINT ON (rc:ResponseCode) ASSERT rc.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ic:InfoCode) ASSERT ic.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ua:UserAgent) ASSERT ua.useragent is UNIQUE")
graph.run("CREATE CONSTRAINT ON (m:Method) ASSERT m.method is UNIQUE")
graph.run("CREATE CONSTRAINT ON (r:Referrer) ASSERT r.referrer is UNIQUE")
graph.run("CREATE INDEX ON :DNS(uid)")
graph.run("CREATE INDEX ON :Uri(uri)")
graph.run("CREATE INDEX ON :HTTP(uid)")
Maybe someone can give me a hint as to what I am doing wrong or where I made mistakes in the code? The split into multiple commits is due to transient errors I got while writing to Neo4j; with a higher number of smaller transactions, the errors went away.
Thanks in advance for any help.
I'm not sure what py2neo is doing under the hood; in my experience, the Python drivers are not the fastest ones.
I would probably go with plain Cypher statements, where you have full control over what happens.
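Just as a sketch (the statement below is mine, built from the property names in your question, not a drop-in replacement), a single parameterized call could merge one connection's nodes and relationships in one round trip instead of several py2neo merges and two commits:

    ## Sketch: one parameterized Cypher statement per log line (Neo4j 3.x {param} syntax)
    CREATE_CONN = """\
    MERGE (c:Connection {uid: {uid}}) SET c.ts = {ts}
    MERGE (o:IP {ip: {orig_h}})
    MERGE (r:IP {ip: {resp_h}})
    MERGE (s:Conn_State {state: {conn_state}})
    MERGE (o)-[:STARTS_CONNECTION {port: {orig_p}}]->(c)
    MERGE (c)-[:CONNECTS_TO {port: {resp_p}}]->(r)
    MERGE (c)-[:HAS_CONN_STATE]->(s)
    """

    graph.run(CREATE_CONN, connlog)   ## connlog's extra keys are simply ignored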
You also have some wrong/missing indexes. Please check that all your queries/operations use an index; otherwise they will result in full scans.
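For example, your Conn_State nodes store the state in a property called state, but the constraint asserts uniqueness of conn_state, so merging on Conn_State cannot use that index. The matching constraint would be:

    graph.run("CREATE CONSTRAINT ON (c:Conn_State) ASSERT c.state IS UNIQUE")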
I also suggest that you send a bit more data per transaction (e.g. 10k records).
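A rough sketch of that, reusing the tail/CONNLOG setup from your question (the batch size and the Cypher fragment are only illustrative and cover just part of your model): collect parsed lines in a list and ship each batch with UNWIND in a single statement.

    BATCH_SIZE = 10000   ## illustrative; tune for your workload
    batch = []

    for line in tail("-F", LOG_DIR, _iter=True, _bg=True):
        if line.startswith('#'):
            continue
        batch.append(dict(zip(CONNLOG, line.strip().split("\t"))))
        if len(batch) >= BATCH_SIZE:
            ## one round trip and one transaction for the whole batch
            graph.run("UNWIND {rows} AS row "
                      "MERGE (c:Connection {uid: row.uid}) "
                      "MERGE (o:IP {ip: row.orig_h}) "
                      "MERGE (r:IP {ip: row.resp_h}) "
                      "MERGE (o)-[:STARTS_CONNECTION]->(c) "
                      "MERGE (c)-[:CONNECTS_TO]->(r)",
                      {"rows": batch})
            batch = []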
It might also make sense to do some pre-processing per log batch, e.g. create the distinct IP nodes upfront per log segment rather than with each log line.
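For instance, continuing the batch sketch above, the distinct IPs of a batch can be computed in Python and merged in one pass before the per-row statements run:

    ## merge each distinct address once per batch instead of once per log line
    ips = {row['orig_h'] for row in batch} | {row['resp_h'] for row in batch}
    graph.run("UNWIND {ips} AS ip MERGE (i:IP {ip: ip})", {"ips": list(ips)})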
This might help you too: http://jexp.de/blog/2017/03/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher/