Search code examples
python-3.xnetworkxdirected-acyclic-graphslongest-path

NetworkX: Find longest path in DAG returning all ties for max


I'm having trouble figuring out how to update the networkx dag_find_longest_path() algorithm to return "N" for ties instead of returning the first max edge found, or returning a list of all edges that are tied for max weight.

I first created a DAG from pandas dataframe that contains an edgelist like the following subset:

edge1        edge2          weight
115252161:T  115252162:A     1.0
115252162:A  115252163:G     1.0
115252163:G  115252164:C     3.0
115252164:C  115252165:A     5.5
115252165:A  115252166:C     5.5
115252162:T  115252163:G     1.0
115252166:C  115252167:A     7.5
115252167:A  115252168:A     7.5
115252168:A  115252169:A     6.5
115252165:A  115252166:G     0.5

Then I use the following code to topologically sort the graph and then find the longest path according to the weights of the edges:

 G = nx.from_pandas_edgelist(edge_df, source="edge1", 
                                target="edge2", 
                                edge_attr=['weight'], 
                                create_using=nx.OrderedDiGraph())

longest_path = pd.DataFrame(nx.dag_longest_path(G))

This works great, except when there are ties for max weighted edge, it returns the first max edge found, and instead I would like it to just return an "N" representing "Null". So currently, the output is:

115252161   T
115252162   A
115252163   G
115252164   C
115252165   A
115252166   C

But what I actually need is:

115252161   T
115252162   N (or [A,T] )
115252163   G
115252164   C
115252165   A
115252166   C

The algorithm for finding the longest path is:

def dag_longest_path(G):

    dist = {}  # stores [node, distance] pair
    for node in nx.topological_sort(G):
        # pairs of dist,node for all incoming edges
        pairs = [(dist[v][0] + 1, v) for v in G.pred[node]]
        if pairs:
            dist[node] = max(pairs)
        else:
            dist[node] = (0, node)
    node, (length, _) = max(dist.items(), key=lambda x: x[1])
    path = []
    while length > 0:
        path.append(node)
        length, node = dist[node]
    return list(reversed(path))

Copy-pastable definition of G.

import pandas as pd
import networkx as nx
import numpy as np

edge_df = pd.read_csv(
    pd.compat.StringIO(
        """edge1        edge2          weight
115252161:T  115252162:A     1.0
115252162:A  115252163:G     1.0
115252163:G  115252164:C     3.0
115252164:C  115252165:A     5.5
115252165:A  115252166:C     5.5
115252162:T  115252163:G     1.0
115252166:C  115252167:A     7.5
115252167:A  115252168:A     7.5
115252168:A  115252169:A     6.5
115252165:A  115252166:G     0.5"""
    ),
    sep=r" +",
)


G = nx.from_pandas_edgelist(
    edge_df,
    source="edge1",
    target="edge2",
    edge_attr=["weight"],
    create_using=nx.OrderedDiGraph(),
)

longest_path = pd.DataFrame(nx.dag_longest_path(G))

Solution

  • I ended up just modeling the behavior in a defaultdict counter object.

    from collections import defaultdict, Counter
    

    I modified my edgelist to a tuple of (position, nucleotide, weight):

    test = [(112,"A",23.0), (113, "T", 27), (112, "T", 12.0), (113, "A", 27), (112,"A", 1.0)]
    

    Then used defaultdict(counter) to quickly sum occurences of each nucleotide at each position:

    nucs = defaultdict(Counter)
    
    for key, nuc, weight in test:
        nucs[key][nuc] += weight
    

    And then looped through the dictionary to pull out all nucleotides that equal the max value:

    for key, nuc in nucs.items():
        seq_list = []
        max_nuc = []
        max_val = max(nuc.values())
        for x, y in nuc.items():
            if y == max_val:
                max_nuc.append(x)
    
        if len(max_nuc) != 1:
            max_nuc = "N"
        else:
            max_nuc = ''.join(max_nuc)
    
        seq_list.append(max_nuc)
        sequence = ''.join(seq_list)
    

    This returns the final sequence for the nucleotide with the max value found, and returns N in the position of a tie:

    TNGCACAAATGCTGAAAGCTGTACCATANCTGTCTGGTCTTGGCTGAGGTTTCAATGAATGGAATCCCGTAACTCTTGGCCAGTTCGTGGGCTTGTTTTGTATCAACTGTCCTTGTTGGCAAATCACACTTGTTTCCCACTAGCACCAT
    

    However, the question bothered me, so I ended up using node attributes in networkx as a means to flag each node as being a tie or not. Now, when a node is returned in the longest path, I can then check the "tie" attribute and replace the node name with "N" if it has been flagged:

    def get_path(self, edge_df):
    
        G = nx.from_pandas_edgelist(
            edge_df,
            source="edge1",
            target="edge2",
            edge_attr=["weight"],
            create_using=nx.OrderedDiGraph()
        )
    
        # flag all nodes as not having a tie
        nx.set_node_attributes(G, "tie", False)
    
        # check nodes for ties
        for node in G.nodes:
    
            # create list of all edges for each node
            edges = G.in_edges(node, data=True)
    
            # if there are multiple edges
            if len(edges) > 1:
    
                # find max weight
                max_weight = max([edge[2]['weight'] for edge in edges])
    
                tie_check = []
                for edge in edges:
                    # pull out all edges that match max weight
                    if edge[2]["weight"] == max_weight:
                        tie_check.append(edge)
    
                # check if there are ties       
                if len(tie_check) > 1:
                    for x in tie_check:
    
                        # flag node as being a tie
                        G.node[x[0]]["tie"] = True
    
        # return longest path
        longest_path = nx.dag_longest_path(G)     
    
        return longest_path