Multi-Dendrix Logo

Source code for network_permutation_test

#!/usr/bin/python

# Load required modules
import sys, os, networkx as nx
from itertools import combinations, product

def parse_args(input_list=None):
    # Parse arguments
    import argparse
    class Args: pass
    args = Args()
    description = 'Calculates network permutation test for given pathway set '\
                  'on the given network.'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-n', '--network_edgelist', required=True,
                        help='PPI edgelist location.')
    parser.add_argument('-i', '--permuted_networks_dir', required=True,
                        help='Directory of permuted networks.')
    parser.add_argument('-p', '--pathway_set_file', required=True,
                        help='File containing input pathway set.')
    parser.add_argument('-o', '--output_file', default=None,
                        help='Name of output file.')
    parser.add_argument('-d', '--distance', default=False, action='store_true',
                        help='Flag average pairwise distance test.')
    parser.add_argument('-v', '--verbose', default=False, action='store_true',
                        help='Flag verbose mode.')

    # If called from the command line, parse command line args.
    if input_list: parser.parse_args(input_list, namespace=args)
    else: parser.parse_args(namespace=args)
    
    return args

[docs]def load_pathway_set(pathway_set_file): return [ l.rstrip().split("\t")[1:] for l in open(pathway_set_file) ]
[docs]def load_network(network_edgelist): return nx.read_edgelist(network_edgelist)
[docs]def remove_name_annotation(genes): return map(lambda g: g.replace("(A)", "").replace("(D)", ""), genes)
[docs]def interact(network, g1, g2): '''Returns true if g1 interacts with g2 in PPI''' try: return g2 in nx.neighbors(network, g1) except nx.exception.NetworkXError: return False
def combos(xs, n): return list(combinations(xs, n)) def prod(xs, ys): return list(product(xs, ys))
[docs]def pairs_within(pathways): '''Given a set of pathways, returns all {n \choose 2} pairs from within EACH pathway''' return reduce(lambda pairs, p: pairs + combos(p, 2), pathways, [])
[docs]def pairs_between(pathways): '''Given a set of pathways, returns all pairs from each distinct pair of pathways''' pairs_of_pathways = combos(pathways, 2) return reduce(lambda pairs, (p1, p2): pairs + prod(p1, p2), pairs_of_pathways, [])
[docs]def count_interactions(network, pairs): '''Given a PPI and a list of gene pairs, returns the number of genes that interact''' return sum( [int(interact(network, g1, g2)) for g1, g2 in pairs])
[docs]def num_interactions_in_pathway(network, pathway): best_count = 0 pathway = list( set( pathway ) ) k = len(pathway) pairs = [(pathway[i], pathway[j]) for i in range(k) for j in range(i+1, k)] return count_interactions(network, pairs)
[docs]def num_interaction_statistic(network, pathways): within_pairs = pairs_within(pathways) interactions_within = count_interactions(network, within_pairs) nu_within = float(interactions_within) / float(len(within_pairs)) between_pairs = pairs_between(pathways) interactions_between = count_interactions(network, between_pairs) nu_between = float( interactions_between ) / float(len(between_pairs)) return nu_within - nu_between
[docs]def num_interactions_network_test(pathways, G, Hs): # Remove name annotation for genes in the pathways pathways = [ list(set(remove_name_annotation(P))) for P in pathways ] # Calculate the number of interactions statistic in the original network # and the permuted networks. Higher statistics indicate stronger # pathway sets. stat = num_interaction_statistic(G, pathways) permuted_stats = [ num_interaction_statistic(H, pathways) for H in Hs ] count = len( [s for s in permuted_stats if s >= stat ] ) pval = float( count ) / float( len( Hs ) ) return stat, pval
[docs]def avg_pair_dist_of_pathway(network, pathway): min_dist = 1e100 pathway = list( set( pathway ) ) k = len(pathway) pairs = [(pathway[i], pathway[j]) for i in range(k) for j in range(i+1, k)] return float( sum_dist(network, pairs) ) / float( len(pairs ) )
[docs]def sum_dist(network, pairs): '''Given a PPI and a list of gene pairs, returns the sum of the shortest paths between each pair.''' return sum( [ dist(network, g1, g2) for g1, g2 in pairs ] )
[docs]def dist(network, g1, g2): '''Returns the length of the shortest path between g1 and g2 in PPI. If no path exists, returns 1e100''' try: return len(nx.shortest_path(network, g1, g2))-1 except (nx.NetworkXNoPath, nx.NetworkXError): return 1e100
[docs]def avg_pair_dist_ratio(network, pathways): # Ensure all genes are in the network if not all([ g in network.nodes() for P in pathways for g in P ]): return 1e100 # Enumerate pairs of genes within the same and between pathways within, between = pairs_within(pathways), pairs_between(pathways) within_dists = [ dist(network, g1, g2) for g1, g2 in within ] between_dists = [ dist(network, g1, g2) for g1, g2 in between ] # Ensure all genes in the same pathway are in the same component if not all([ d != 1e100 for d in within_dists ]): return 1e100 # Check if any genes in different pathways are not in the same component if not all([ d != 1e100 for d in between_dists ]): return 0 # If all checks out, return the ratio d_within = float(sum(within_dists)) / float(len(within)) d_between = float(sum(between_dists)) / float(len(between)) return d_within / d_between
[docs]def avg_pair_dist_network_test(pathways, G, Hs): # Remove name annotation for genes in the pathways pathways = [ list(set(remove_name_annotation(P))) for P in pathways ] # Calculate the average pairwise distance ratio in the original network # and the permuted networks. Lower ratios indicate stronger pathway sets. ratio = avg_pair_dist_ratio(G, pathways) permuted_ratios = [ avg_pair_dist_ratio(H, pathways) for H in Hs ] count = len( [r for r in permuted_ratios if r <= ratio ] ) pval = float( count ) / float( len( Hs ) ) return ratio, pval
[docs]def eval_pathways_by_dist(pathways, G, Hs): stats, pvals = [], [] for p in pathways: avg_pair_dist = avg_pair_dist_of_pathway(G, p) permuted_dists = [ avg_pair_dist_of_pathway(H, p) for H in Hs ] extreme = [ n for n in permuted_dists if n <= avg_pair_dist] pval = float( len(extreme) ) / float( len(Hs) ) stats.append( avg_pair_dist ) pvals.append( pval ) return zip(pathways, stats, pvals)
[docs]def eval_pathways_by_interactions(pathways, G, Hs): stats, pvals = [], [] for p in pathways: num_interactions = num_interactions_in_pathway(G, p) permuted_counts = [ num_interactions_in_pathway(H, p) for H in Hs ] extreme = [ n for n in permuted_counts if n >= num_interactions] pval = float( len(extreme) ) / float( len(Hs) ) stats.append( num_interactions ) pvals.append( pval ) return zip(pathways, stats, pvals)
[docs]def evaluate_pathway_set(pathways, G, Hs, distance=False): # Calculate test statistic and p-value if distance: test_name = 'Average_Pairwise_Distance_Ratio' statistic, pval = avg_pair_dist_network_test(pathways, G, Hs) pathway_results = eval_pathways_by_dist(pathways, G, Hs) else: test_name = 'Number_of_Interactions_Statistic' statistic, pval = num_interactions_network_test(pathways, G, Hs) pathway_results = eval_pathways_by_interactions(pathways, G, Hs) return test_name, statistic, pval, pathway_results
[docs]def run(args): # Load input if args.verbose: print 'Loading networks...' pathways = load_pathway_set(args.pathway_set_file) G = load_network(args.network_edgelist) Hs = [ load_network(args.permuted_networks_dir + "/" + fh) for fh in os.listdir(args.permuted_networks_dir) ] # Evaluate the pathway set if args.verbose: print 'Evaluating input pathway set...' results = evaluate_pathway_set(pathways, G, Hs, args.distance) test_name, statistic, pval, pathway_results = results # Output and return results if args.output_file: header = 'Testing\t' + test_name + '\tP-Value\n' output = [ '\t'.join(['Pathway set', str(statistic), str(pval)]) ] for pathway, stat, p in pathway_results: output.append( '\t'.join([', '.join(pathway), str(stat), str(p)]) ) open(args.output_file, 'w').write(header + '\n'.join(output)) if args.verbose: print 'Pathway set test' print '\t' + test_name + ':', statistic print '\tP-value:', pval print '\nIndividual pathways test' for p, stat, pval in pathway_results: print '\tGenes:', ', '.join(p) print '\tStatistic:', stat print '\tP-value:', pval, '\n' return statistic, pval, pathway_results
if __name__ == "__main__": run(parse_args())