Source code for mgtoolkit.library
from properties import resources
from exception import MetagraphException
from numpy import matrix
import copy
import math
[docs]def singleton(cls):
"""A helper function to ease implementing singletons.
This should be used as a decorator to the
class that should be a singleton.
:param cls: class that should be a singleton
:return: singleton instance of the class
"""
instances = {}
def get_instance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return get_instance
[docs]class Triple(object):
""" Captures a set of co-inputs, co-outputs and edges between two metagraph elements.
"""
def __init__(self, coinputs, cooutputs, edges):
if edges is None:
raise MetagraphException('edges', resources['value_null'])
self.coinputs = coinputs
self.cooutputs = cooutputs
self.edges = edges
[docs] def coinputs(self):
""" The co-inputs of the Triple object
:return: set
"""
return self.coinputs
[docs] def cooutputs(self):
""" The co-outputs of the Triple object
:return: set
"""
return self.cooutputs
def __repr__(self):
if isinstance(self.edges, list):
edge_desc = [repr(edge) for edge in self.edges]
else:
edge_desc = [repr(self.edges)]
full_desc = ''
for desc in edge_desc:
if full_desc == '':
full_desc = desc
else:
full_desc += ', ' + desc
return 'Triple(%s, %s, %s)' % (self.coinputs, self.cooutputs, full_desc)
def __eq__(self, other):
if other is None:
return False
if not isinstance(other,Triple):
return False
return (self.coinputs == other.coinputs and
self.cooutputs == other.cooutputs and
len(self.edges) == len(other.edges) and
self.edges == other.edges)
[docs]class Node(object):
""" Represents a metagraph node.
"""
def __init__(self, element_set):
if element_set is None or len(element_set) == 0:
raise MetagraphException('element_set', resources['value_null'])
if not isinstance(element_set, set):
raise MetagraphException('element_set', resources['format_invalid'])
self.element_set = element_set
[docs] def get_element_set(self):
""" Returns the node elements
:return: set
"""
return self.element_set
def __repr__(self):
return 'Node(%s)' % self.element_set
[docs]class Edge(object):
""" Represents a metagraph edge.
"""
def __init__(self, invertex, outvertex, attributes=None, label=None):
if invertex is None or len(invertex) == 0:
raise MetagraphException('invertex', resources['value_null'])
if outvertex is None or len(outvertex) == 0:
raise MetagraphException('outvertex', resources['value_null'])
if not isinstance(invertex, set):
raise MetagraphException('invertex', resources['format_invalid'])
if not isinstance(outvertex, set):
raise MetagraphException('outvertex', resources['format_invalid'])
self.invertex = invertex
self.outvertex = outvertex
self.attributes = attributes
self.label = label
# include attributes as part if invertex
if attributes is not None:
invertex = list(self.invertex)
for attribute in attributes:
if attribute not in invertex:
invertex.append(attribute)
self.invertex = set(invertex)
def __repr__(self):
return 'Edge(%s, %s)' % (self.invertex, self.outvertex)
[docs] def invertex(self):
""" Returns the invertex of the edge.
:return: set
"""
return self.invertex
[docs] def outvertex(self):
""" Returns the outvertex of the edge.
:return: set
"""
return self.outvertex
def __eq__(self, other):
if other is None:
return False
if not isinstance(other, Edge):
return False
return (self.invertex == other.invertex and
self.outvertex == other.outvertex and
self.attributes == other.attributes)
[docs]class Metapath(object):
""" Represents a metapath between a source and a target node in a metagraph.
"""
def __init__(self, source, target, edge_list):
if source is None or len(source) == 0:
raise MetagraphException('source', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('target', resources['value_null'])
self.source = source
self.target = target
self.edge_list = edge_list
[docs] def source(self):
""" Returns the source of the metapath.
:return: set
"""
return self.source
[docs] def target(self):
""" Returns the target of the metapath.
:return: set
"""
return self.target
[docs] def edge_list(self):
""" Returns the list of edges of the metapath.
:return: set
"""
return self.edge_list
def __repr__(self):
edge_desc = [repr(edge) for edge in self.edge_list]
full_desc = 'source: %s, target: %s' % (self.source, self.target)
for desc in edge_desc:
if full_desc == '':
full_desc = desc
else:
full_desc += ", " + desc
return 'Metapath({ %s })' % full_desc
[docs] def dominates(self, metapath):
"""Checks whether current metapath dominates that provided.
:param metapath: Metapath object
:return: boolean
"""
if metapath is None:
raise MetagraphException('metapath', resources['value_null'])
input1 = self.source # B
input2 = metapath.source # B'
output1 = self.target # C
output2 = metapath.target # C'
if input1.issubset(input2) and output2.issubset(output1): # B <= B', C' <= C
return True
return False
[docs]class Metagraph(object):
""" Represents a metagraph.
"""
def __init__(self, generator_set):
if generator_set is None or len(generator_set) == 0:
raise MetagraphException('generator_set', resources['value_null'])
if not isinstance(generator_set, set):
raise MetagraphException('generator_set', resources['format_invalid'])
self.nodes = []
self.edges = []
self.generating_set = generator_set
self.a_star = None
[docs] def add_node(self, node):
""" Adds a node to the metagraph.
:param node: Node object
:return: None
"""
if not isinstance(node, Node):
raise MetagraphException('node', resources['format_invalid'])
# nodes cant be null or empty
if node is None:
raise MetagraphException('node', resources['value_null'])
# each element in node must be in the generating set
not_found = [element not in self.generating_set for element in node.element_set]
if True in not_found:
raise MetagraphException('node', resources['range_invalid'])
if not MetagraphHelper().is_node_in_list(node, self.nodes):
self.nodes.append(node)
[docs] def remove_node(self, node):
""" Removes a specified node from the metagraph.
:param node: Node object
:return: None
"""
if not isinstance(node, Node):
raise MetagraphException('node', resources['format_invalid'])
# nodes cant be null or empty
if node is None:
raise MetagraphException('node', resources['value_null'])
if not MetagraphHelper().is_node_in_list(node, self.nodes):
raise MetagraphException('node', resources['value_not_found'])
self.nodes.remove(node)
[docs] def add_nodes_from(self, nodes_list):
""" Adds nodes from the given list to the metagraph.
:param nodes_list: list of Node objects
:return: None
"""
if nodes_list is None or len(nodes_list) == 0:
raise MetagraphException('nodes_list', resources['value_null'])
for node in nodes_list:
if not isinstance(node, Node):
raise MetagraphException('nodes_list', resources['format_invalid'])
for node in nodes_list:
if not MetagraphHelper().is_node_in_list(node, self.nodes):
self.nodes.append(node)
[docs] def remove_nodes_from(self, nodes_list):
""" Removes nodes from the given list from the metagraph.
:param nodes_list: list of Node objects
:return: None
"""
if nodes_list is None or len(nodes_list) == 0:
raise MetagraphException('nodes_list', resources['value_null'])
for node in nodes_list:
if not isinstance(node, set):
raise MetagraphException('nodes_list', resources['format_invalid'])
if not MetagraphHelper().is_node_in_list(node, self.nodes):
raise MetagraphException('nodes_list', resources['value_not_found'])
for node in nodes_list:
self.nodes.remove(node)
[docs] def add_edge(self, edge):
""" Adds the given edge to the metagraph.
:param edge: Edge object
:return: None
"""
if not isinstance(edge, Edge):
raise MetagraphException('edge', resources['format_invalid'])
# add to list of nodes first
node1 = Node(edge.invertex)
node2 = Node(edge.outvertex)
if not MetagraphHelper().is_node_in_list(node1, self.nodes):
self.nodes.append(node1)
if not MetagraphHelper().is_node_in_list(node2, self.nodes):
self.nodes.append(node2)
#..then edges
if not MetagraphHelper().is_edge_in_list(edge, self.edges):
self.edges.append(edge)
[docs] def remove_edge(self, edge):
""" Removes the given edge from the metagraph.
:param edge: Edge object
:return:None
"""
if not isinstance(edge, Edge):
raise MetagraphException('edge', resources['format_invalid'])
# remove edge
if edge in self.edges:
self.edges.remove(edge)
[docs] def add_edges_from(self, edge_list):
""" Adds the given list of edges to the metagraph.
:param edge_list: list of Edge objects
:return: None
"""
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
for edge in edge_list:
if not isinstance(edge, Edge):
raise MetagraphException('edge', resources['format_invalid'])
for edge in edge_list:
node1 = Node(edge.invertex)
node2 = Node(edge.outvertex)
if not MetagraphHelper().is_node_in_list(node1, self.nodes):
self.nodes.append(node1)
if not MetagraphHelper().is_node_in_list(node2, self.nodes):
self.nodes.append(node2)
if edge not in self.edges:
self.edges.append(edge)
[docs] def remove_edges_from(self, edge_list):
""" Removes edges from the given list from the metagraph.
:param edge_list: list of Edge objects
:return: None
"""
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
for edge in edge_list:
if not isinstance(edge, Edge):
raise MetagraphException('edge', resources['format_invalid'])
for edge in edge_list:
if MetagraphHelper().is_edge_in_list(edge, self.edges):
self.edges.remove(edge)
[docs] def nodes(self):
""" Returns a list of metagraph nodes.
:return: list of Node objects
"""
return self.nodes
[docs] def edges(self):
""" Returns a list of metagraph edges.
:return: list of Edge objects.
"""
return self.edges
[docs] def get_edges(self, invertex, outvertex):
""" Retrieves all edges between a given invertex and outvertex.
:param invertex: set
:param outvertex: set
:return: list of Edge objects.
"""
if invertex is None:
raise MetagraphException('invertex', resources['value_null'])
if outvertex is None:
raise MetagraphException('outvertex', resources['value_null'])
result = []
if len(self.edges) > 0:
for edge in self.edges:
if (invertex.issubset(edge.invertex)) and (outvertex.issubset(edge.outvertex)) and (edge not in result):
result.append(edge)
return result
[docs] @staticmethod
def get_coinputs(edge, x_i):
""" Returns the set of co-inputs for element x_i in the given edge.
:param edge: Edge object
:param x_i: invertex element
:return: set
"""
coinputs = None
all_inputs = edge.invertex
if x_i in list(all_inputs):
coinputs = list(all_inputs)
coinputs.remove(x_i)
if coinputs is not None and len(coinputs) > 0:
return set(coinputs)
return None
[docs] @staticmethod
def get_cooutputs(edge, x_j):
""" Returns the set of co-outputs for element x_j in the given edge.
:param edge: Edge object
:param x_j: outvertex element
:return: set
"""
cooutputs = None
all_outputs = edge.outvertex
if x_j in list(all_outputs):
cooutputs = list(all_outputs)
cooutputs.remove(x_j)
if cooutputs is not None and len(cooutputs) > 0:
return set(cooutputs)
return None
[docs] def adjacency_matrix_old(self):
""" Returns the adjacency matrix of the metagraph.
:return: numpy.matrix
"""
# get matrix size
size = len(self.generating_set)
adj_matrix = MetagraphHelper().get_null_matrix(size, size)
# one triple for each edge e connecting x_i to x_j
for i in range(size):
for j in range(size):
x_i = list(self.generating_set)[i]
x_j = list(self.generating_set)[j]
# multiple edges may exist between x_i and x_j
edges = self.get_edges({x_i}, {x_j})
if len(edges) > 0:
triples_list = []
for edge in edges:
coinputs = self.get_coinputs(edge, x_i)
cooutputs = self.get_cooutputs(edge, x_j)
triple = Triple(coinputs, cooutputs, edge)
if triple not in triples_list:
triples_list.append(triple)
adj_matrix[i][j] = triples_list
# return adj_matrix
# noinspection PyCallingNonCallable
return matrix(adj_matrix)
[docs] def adjacency_matrix(self):
""" Returns the adjacency matrix of the metagraph.
:return: numpy.matrix
"""
# get matrix size
size = len(self.generating_set)
adj_matrix = MetagraphHelper().get_null_matrix(size, size)
# create lookup table
count=1
triples_lookup=dict()
for edge in self.edges:
for elt1 in edge.invertex:
for elt2 in edge.outvertex:
coinputs = self.get_coinputs(edge, elt1)
cooutputs = self.get_cooutputs(edge, elt2)
triple = Triple(coinputs, cooutputs, edge)
if (elt1,elt2) not in triples_lookup:
triples_lookup[(elt1,elt2)] = []
triples_lookup[(elt1,elt2)].append(triple)
count+=1
count=1
gen_elts = list(self.generating_set)
for i in range(size):
for j in range(size):
x_i = gen_elts[i]
x_j = gen_elts[j]
try:
adj_matrix[i][j] = triples_lookup[(x_i,x_j)]
except BaseException,e:
pass
# noinspection PyCallingNonCallable
return matrix(adj_matrix)
[docs] def equivalent(self, metagraph2):
"""Checks if current metagraph is equivalent to the metagraph provided.
:param metagraph2: Metagraph object
:return: boolean
"""
if metagraph2 is None:
raise MetagraphException('metagraph2', resources['value_null'])
if self.dominates(metagraph2) and metagraph2.dominates(self):
return True
return False
[docs] def add_metagraph(self, metagraph2):
""" Adds the given metagraph to current and returns the composed result.
:param metagraph2: Metagraph object
:return: Metagraph object
"""
if metagraph2 is None:
raise MetagraphException('metagraph2', resources['value_null'])
generating_set1 = self.generating_set
generating_set2 = metagraph2.generating_set
if generating_set2 is None or len(generating_set2) == 0:
raise MetagraphException('metagraph2.generating_set', resources['value_null'])
# check if the generating sets of the matrices overlap (otherwise no sense in combining metagraphs)
#intersection=generating_set1.intersection(generating_set2)
#if intersection==None:
# raise MetagraphException('generating_sets', resources['no_overlap'])
if len(generating_set1.difference(generating_set2)) == 0 and \
len(generating_set2.difference(generating_set1)) == 0:
# generating sets are identical..simply add edges
# size = len(generating_set1)
for edge in metagraph2.edges:
if edge not in self.edges:
self.add_edge(edge)
else:
# generating sets overlap but are different...combine generating sets and then add edges
# combined_generating_set = generating_set1.union(generating_set2)
for edge in metagraph2.edges:
if edge not in self.edges:
self.add_edge(edge)
return self
[docs] def multiply_metagraph(self, metagraph2):
""" Multiplies the metagraph with that provided and returns the result.
:param metagraph2: Metagraph object
:return: Metagraph object
"""
if metagraph2 is None:
raise MetagraphException('metagraph2', resources['value_null'])
generating_set1 = self.generating_set
generating_set2 = metagraph2.generating_set
if generating_set2 is None or len(generating_set2) == 0:
raise MetagraphException('metagraph2.generator_set', resources['value_null'])
# check generating sets are identical
if not(len(generating_set1.difference(generating_set2)) == 0 and
len(generating_set2.difference(generating_set1)) == 0):
raise MetagraphException('generator_sets', resources['not_identical'])
adjacency_matrix1 = self.adjacency_matrix().tolist()
adjacency_matrix2 = metagraph2.adjacency_matrix().tolist()
size = len(generating_set1)
resultant_adjacency_matrix = MetagraphHelper().get_null_matrix(size, size)
for i in range(size):
for j in range(size):
resultant_adjacency_matrix[i][j] = MetagraphHelper().multiply_components(
adjacency_matrix1, adjacency_matrix2, generating_set1, i, j, size)
# extract new edge list
new_edge_list = MetagraphHelper().get_edges_in_matrix(resultant_adjacency_matrix, self.generating_set)
# clear current edge list and append new
self.edges = []
if len(new_edge_list) > 0:
self.add_edges_from(new_edge_list)
return self
[docs] def get_closure(self):
""" Returns the closure matrix (i.e., A*) of the metagraph.
:return: numpy.matrix
"""
adjacency_matrix = self.adjacency_matrix().tolist()
i = 0
size = len(self.generating_set)
a = dict()
a[i] = adjacency_matrix
a_star = adjacency_matrix
for i in range(size):
#print(' iteration %s --------------'%i)
a[i+1] = MetagraphHelper().multiply_adjacency_matrices(a[i],
self.generating_set,
adjacency_matrix,
self.generating_set)
#print('multiply_adjacency_matrices complete')
a_star = MetagraphHelper().add_adjacency_matrices(a_star,
self.generating_set,
a[i+1],
self.generating_set)
#print('add_adjacency_matrices complete')
if a[i+1] == a[i]:
break
# noinspection PyCallingNonCallable
return matrix(a_star)
[docs] def get_all_metapaths_from(self, source, target):
""" Retrieves all metapaths between given source and target in the metagraph.
:param source: set
:param target: set
:return: list of Metapath objects
"""
if source is None or len(source) == 0:
raise MetagraphException('source', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('target', resources['value_null'])
# check subset
if not source.intersection(self.generating_set) == source:
raise MetagraphException('source', resources['not_a_subset'])
if not target.intersection(self.generating_set) == target:
raise MetagraphException('target', resources['not_a_subset'])
# compute A* first
if self.a_star is None:
#print('computing closure..')
self.a_star = self.get_closure().tolist()
#print('closure computation- %s'%(time.time()- start))
metapaths = []
all_applicable_input_rows = []
for x_i in source:
index = list(self.generating_set).index(x_i)
if index not in all_applicable_input_rows:
all_applicable_input_rows.append(index)
cumulative_output_global = []
cumulative_edges_global = []
for i in all_applicable_input_rows:
mp_exist_for_row=False
cumulative_output_local = []
cumulative_edges_local = []
for x_j in target:
j = list(self.generating_set).index(x_j)
if self.a_star[i][j] is not None:
mp_exist_for_row = True
# x_j is already an output
cumulative_output_local.append(x_j)
triples = MetagraphHelper().get_triples(self.a_star[i][j])
for triple in triples:
# retain cooutputs
output = triple.cooutputs
if output is not None:
cumulative_output_local += output
if output is not None:
cumulative_output_global += output
#... and edges
if isinstance(triple.edges, Edge):
edges = MetagraphHelper().get_edge_list([triple.edges])
else:
edges = MetagraphHelper().get_edge_list(triple.edges)
for edge in edges:
if edge not in cumulative_edges_local:
cumulative_edges_local.append(edge)
if edge not in cumulative_edges_global:
cumulative_edges_global.append(edge)
if not mp_exist_for_row:
continue
# check if cumulative outputs form a cover for the target
if set(target).issubset(set(cumulative_output_local)):
if set(cumulative_edges_local) not in metapaths:
metapaths.append(set(cumulative_edges_local))
elif set(target).issubset(set(cumulative_edges_global)):
if set(cumulative_edges_global) not in metapaths:
metapaths.append(set(cumulative_edges_global))
else:
break
if len(metapaths)>0:
#result=[]
#for path in metapaths:
# mp = Metapath(source, target, list(path))
# result.append(mp)
#return result
valid_metapaths = []
processed_edge_lists=[]
from itertools import combinations
for metapath in metapaths:
if len(metapath)>25:
continue
all_subsets = sum(map(lambda r: list(combinations(list(metapath), r)), range(1, len(list(metapath))+1)), [])
for path in all_subsets:
if len(path) <= len(metapath): # metapaths
edge_list2 = self.get_edge_list2(path)
if len(processed_edge_lists)>0:
if MetagraphHelper().is_edge_list_included(edge_list2,processed_edge_lists):
continue
mp = Metapath(source, target, edge_list2)
if self.is_metapath(mp):
valid_metapaths.append(mp)
return valid_metapaths
return None
def get_all_metapaths_from200(self, source, target):
if source is None or len(source) == 0:
raise MetagraphException('source', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('target', resources['value_null'])
# check subset
if not source.intersection(self.generating_set) == source:
raise MetagraphException('source', resources['not_a_subset'])
if not target.intersection(self.generating_set) == target:
raise MetagraphException('target', resources['not_a_subset'])
# compute A* first
if self.a_star is None:
print('computing closure..')
self.a_star = self.get_closure().tolist()
print('find mps')
metapaths = []
all_applicable_input_rows = []
for x_i in source:
index = list(self.generating_set).index(x_i)
if index not in all_applicable_input_rows:
all_applicable_input_rows.append(index)
for x_j in target:
mp_exists=False
j = list(self.generating_set).index(x_j)
for i in all_applicable_input_rows:
if self.a_star[i][j] is not None:
mp_exists=True
break
if not mp_exists:
return None
metapaths=[]
for x_j in target:
j = list(self.generating_set).index(x_j)
triples_set=set()
for i in all_applicable_input_rows:
triples = MetagraphHelper().get_triples(self.a_star[i][j])
triples_set = triples_set.union(set(triples))
if MetagraphHelper().forms_cover(triples_set, target, x_j):
metapath = MetagraphHelper().get_metapath_from_triples(source, target, triples_set)
metapaths.append(metapath)
for i in all_applicable_input_rows:
triples_set=set()
for x_j in target:
j = list(self.generating_set).index(x_j)
triples = MetagraphHelper().get_triples(self.a_star[i][j])
triples_set = triples_set.union(set(triples))
if MetagraphHelper().forms_cover(triples_set, target, x_j):
metapath = MetagraphHelper().get_metapath_from_triples(source, target, triples_set)
metapaths.append(metapath)
return metapaths
def get_edge_list(self, path):
result=set()
for elt in path:
result = result.union(elt)
return list(result)
def get_edge_list2(self, path):
result=set()
for elt in path:
result = result.union({elt})
return list(result)
[docs] def is_metapath(self, metapath_candidate):
""" Checks if the given candidate is a metapath.
:param metapath_candidate: Metapath object
:return: boolean
"""
if metapath_candidate is None:
raise MetagraphException('metapath_candidate', resources['value_null'])
all_inputs=[]
all_outputs=[]
for edge in metapath_candidate.edge_list:
for input_elt in list(edge.invertex):
if input_elt not in all_inputs:
all_inputs.append(input_elt)
for output_elt in list(edge.outvertex):
if output_elt not in all_outputs:
all_outputs.append(output_elt)
# now check input and output sets
if (set(all_inputs).difference(set(all_outputs)).issubset(metapath_candidate.source)) and \
set(metapath_candidate.target).issubset(all_outputs):
return True
return False
[docs] def is_edge_dominant_metapath(self, metapath):
""" Checks if the given metapath is an edge-dominant metapath.
:param metapath: Metapath object
:return: boolean
"""
if metapath is None:
raise MetagraphException('metapath', resources['value_null'])
from itertools import combinations
# check input metapath is valid
if not self.is_metapath(metapath):
return False
all_subsets = sum(map(lambda r: list(combinations(metapath.edge_list, r)),
range(1, len(metapath.edge_list)+1)), [])
# if one proper subset is a metapath then not edge dominant
for path in all_subsets:
# must be a proper subset
if len(path) < len(metapath.edge_list):
mp = Metapath(metapath.source, metapath.target, list(path))
if self.is_metapath(mp):
return False
return True
[docs] def is_input_dominant_metapath(self, metapath):
""" Checks if the given metapath is an input-dominant metapath.
:param metapath: Metapath object
:return: boolean
"""
if metapath is None:
raise MetagraphException('metapath', resources['value_null'])
from itertools import combinations
# check input metapath is valid
if not self.is_metapath(metapath):
return False
# get all proper subsets of subset1
all_subsets = sum(map(lambda r: list(combinations(metapath.source, r)), range(1, len(metapath.source)+1)), [])
# if one proper subset has a metapath to subset2 then not input dominant
for subset in all_subsets:
# must be proper subset
if len(subset) < len(metapath.source):
if isinstance(subset, tuple):
subset = set(list(subset))
metapath1 = self.get_all_metapaths_from(subset, metapath.target)
if metapath1 is not None and len(metapath1) > 0:
#print('source: %s, target: %s'%(subset, metapath.target))
return False
return True
[docs] def is_dominant_metapath(self, metapath):
""" Checks if the given metapath is a dominant metapath.
:param metapath: Metapath object
:return: boolean
"""
if metapath is None:
raise MetagraphException('metapath', resources['value_null'])
# check input metapath is valid
if not self.is_metapath(metapath):
return False
if (self.is_edge_dominant_metapath(metapath) and
self.is_input_dominant_metapath(metapath)):
return True
return False
[docs] def is_redundant_edge(self, edge, metapath, source, target):
""" Checks if the given edge is redundant for the given metapath.
:param edge: Edge object
:param metapath: Metapath object
:param source: set
:param target: set
:return: boolean
"""
if edge is None:
raise MetagraphException('edge', resources['value_null'])
if metapath is None:
raise MetagraphException('metapath', resources['value_null'])
if source is None:
raise MetagraphException('source', resources['value_null'])
if target is None:
raise MetagraphException('target', resources['value_null'])
# check input metapath is valid
if not self.is_metapath(metapath):
raise MetagraphException('metapath', resources['arguments_invalid'])
from itertools import combinations
all_subsets = sum(map(lambda r: list(combinations(target, r)), range(1, len(target)+1)), [])
# get all metapaths from subset1 to proper subsets of subset2
for subset in all_subsets:
if len(subset) < len(target):
metapaths = self.get_all_metapaths_from(source, target)
# check if edges is in every metapath found
if metapaths is not None and len(metapaths) > 0:
for mp in metapaths:
if not MetagraphHelper().is_edge_in_list(edge, mp.edge_list):
# redundant
return True
# non redundant
return False
return False
[docs] def is_cutset(self, edge_list, source, target):
""" Checks if an edge list is a cutset between a given source and target.
:param edge_list: list of Edge objects
:param source: set
:param target: set
:return: boolean
"""
if edge_list is None:
raise MetagraphException('edges', resources['value_null'])
if source is None:
raise MetagraphException('source', resources['value_null'])
if target is None:
raise MetagraphException('target', resources['value_null'])
# get all metapaths
metapaths = self.get_all_metapaths_from(source, target)
if metapaths is not None and len(metapaths)>0:
for metapath in metapaths:
updated = MetagraphHelper().remove_edge_list(edge_list, metapath.edge_list)
mpc = Metapath(source, target, updated)
if self.is_metapath(mpc):
# not a cutset
return False
# is a cutset
return True
return False
[docs] def get_minimal_cutset(self, source, target):
""" Retrieves the minimal cutset between a given source and target.
:param source: set
:param target: set
:return: list of Edge objects
"""
if source is None:
raise MetagraphException('source', resources['value_null'])
if target is None:
raise MetagraphException('target', resources['value_null'])
metapaths = self.get_all_metapaths_from(source, target)
if metapaths is None or len(metapaths) == 0:
return None
cutsets = []
from itertools import combinations
# noinspection PyTypeChecker
for metapath in metapaths:
all_combinations = sum(map(lambda r: list(combinations(metapath.edge_list, r)),
range(1, len(metapath.edge_list)+1)), [])
for combination in all_combinations:
if self.is_cutset(list(combination), source, target) and list(combination) not in cutsets:
cutsets.append(list(combination))
# return smallest cutset
if len(cutsets) > 0:
smallest = cutsets[0]
for cutset in cutsets:
if len(cutset) < len(smallest):
smallest = cutset
return smallest
return None
[docs] def is_bridge(self, edge_list, source, target):
""" Checks if a given edge list forms a bridge between a source and a target.
:param edge_list: list of Edge objects
:param source: set
:param target: set
:return: boolean
"""
if edge_list is None:
raise MetagraphException('edge_list', resources['value_null'])
if source is None:
raise MetagraphException('source', resources['value_null'])
if target is None:
raise MetagraphException('target', resources['value_null'])
if len(edge_list)!=1:
return False
if not isinstance(edge_list, list):
raise MetagraphException('edge_list', resources['arguments_invalid'])
return self.is_cutset(edge_list, source, target)
[docs] def get_projection(self, generator_subset):
""" Gets the metagraph projection for a subset of the generating set.
:param generator_subset: set
:return: Metagraph object
"""
if generator_subset is None:
raise MetagraphException('generator_subset', resources['value_null'])
# step1. reduce A* by removing unwanted rows, cols
applicable_rows_and_cols = []
for x_i in generator_subset:
index = list(self.generating_set).index(x_i)
if index not in applicable_rows_and_cols:
applicable_rows_and_cols.append(index)
a_star = self.get_closure().tolist()
# sort list
applicable_rows_and_cols = sorted(applicable_rows_and_cols)
size = len(generator_subset)
a_star_new = MetagraphHelper().get_null_matrix(size, size)
m = 0
for i in applicable_rows_and_cols:
n = 0
for j in applicable_rows_and_cols:
a_star_new[m][n] = a_star[i][j]
n += 1
m += 1
# step2. create list L from edges in E (not E') s.t. V_e is a subset of X'
edge_list1 = []
all_triples = []
k = len(applicable_rows_and_cols)
for i in range(k):
for j in range(k):
if a_star_new[i][j] is not None:
triples = MetagraphHelper().get_triples(a_star_new[i][j])
for triple in triples:
if isinstance(triple.edges, Edge):
new_triple = Triple(triple.coinputs, triple.cooutputs, [triple.edges])
if not MetagraphHelper().is_triple_in_list(new_triple,all_triples):
all_triples.append(new_triple)
edges = MetagraphHelper().extract_edge_list([triple.edges])
else:
if not MetagraphHelper().is_triple_in_list(triple,all_triples):
all_triples.append(triple)
edges = MetagraphHelper().extract_edge_list(triple.edges)
# select edges with invertices in generator_subset
for edge in edges:
if edge.invertex.issubset(set(generator_subset)) and ([edge] not in edge_list1):
edge_list1.append([edge])
# step3. find combinations of triples s.t. union(CI_t_i)\ union(CO_t_i) is a subset of generator_subset
from itertools import combinations
all_combinations = sum(map(lambda r: list(combinations(all_triples, r)), range(1, len(all_triples)+1)), [])
for combination in all_combinations:
coinput = MetagraphHelper().get_coinputs_from_triples(combination)
cooutput = MetagraphHelper().get_cooutputs_from_triples(combination)
diff = set(coinput).difference(set(cooutput))
if diff.issubset(set(generator_subset)):
# add edges in combination to L
edges2 = MetagraphHelper().get_edges_from_triple_list(list(combination))
included = MetagraphHelper().is_edge_list_included_recursive(edges2, edge_list1)
if not included:
edge_list1.append(edges2)
# step4. construct L0 from L
triples_list_l0 = []
for element in edge_list1:
all_inputs = MetagraphHelper().get_netinputs(element)
all_outputs = MetagraphHelper().get_netoutputs(element)
net_inputs = list(set(all_inputs).difference(all_outputs))
net_outputs = all_outputs
new_triple = Triple(set(net_inputs), set(net_outputs), element)
if new_triple not in triples_list_l0:
triples_list_l0.append(new_triple)
# step5. reduce L0
to_eliminate = []
for i in triples_list_l0:
for j in triples_list_l0:
if i != j:
# check if i is subsumed by j
edges_i = MetagraphHelper().get_edge_list(i.edges)
edges_j = MetagraphHelper().get_edge_list(j.edges)
outputs_i = i.cooutputs
outputs_j = j.coinputs
# check j's edges are a subset of i's edges
subset = True
for edge in edges_j:
if edge not in edges_i:
subset = False
break
inclusive = True
if subset:
# edges form a subset..check outputs are inclusive
outputs_j_in_x = []
for output in outputs_j:
if (output in generator_subset) and (output not in outputs_j_in_x):
outputs_j_in_x.append(output)
outputs_i_in_x = []
for output in outputs_i:
if (output in generator_subset) and (output not in outputs_i_in_x):
outputs_i_in_x.append(output)
for output in outputs_i_in_x:
if output not in outputs_j_in_x:
inclusive = False
break
if inclusive and (i not in to_eliminate):
to_eliminate.append(i)
break
for elt in to_eliminate:
triples_list_l0.remove(elt)
to_drop = []
for i in triples_list_l0:
for j in triples_list_l0:
if i != j:
inputs_i = i.coinputs
inputs_j = j.coinputs
outputs_i = i.cooutputs
outputs_j = j.cooutputs
# check if input and output of j are a subset of i
input_subset = True
for input_elt in inputs_j:
if input_elt not in inputs_i:
input_subset = False
break
if input_subset:
output_subset = True
for output in outputs_j:
if output not in outputs_i:
output_subset = False
break
if output_subset:
for elt in j.cooutputs:
i.cooutputs.remove(elt)
if (i.cooutputs is None or len(i.cooutputs) == 0) and (i not in to_drop):
to_drop.append(i)
for item in to_drop:
triples_list_l0.remove(item)
#step6. merge triples based on identical inputs and outputs
triples_to_merge = dict()
index = 0
for triple1 in triples_list_l0:
for triple2 in triples_list_l0:
if triple1 != triple2:
# merge if same input and output
if triple1.coinputs == triple2.coinputs and triple1.cooutputs == triple2.cooutputs:
if index not in triples_to_merge:
triples_to_merge[index] = []
if triple2 not in triples_to_merge[index]:
triples_to_merge[index].append(triple2)
index += 1
post_merge_triples = dict()
for index, triples_list in triples_to_merge.iteritems():
triple1 = triples_to_merge[index]
if triple1 in triples_list_l0:
triples_list_l0.remove(triple1)
for triple2 in triples_list:
if triple2 in triples_list_l0:
triples_list_l0.remove(triple2)
if index not in post_merge_triples:
post_merge_triples[index] = Triple(triple1.coinputs, triple1.cooutputs,
[triple1.edges, triple2.edges])
else:
post_merge_triples[index].edges.append(triple2.edges)
triples_list_l0.append(post_merge_triples[index])
#step7. and triples with identical inputs only
triples_to_merge = dict()
index = 0
for triple1 in triples_list_l0:
for triple2 in triples_list_l0:
if triple1 != triple2:
# merge if same input
if triple1.coinputs == triple2.coinputs:
if index not in triples_to_merge:
triples_to_merge[index] = []
if triple2 not in triples_to_merge[index]:
triples_to_merge[index].append(triple2)
index += 1
triple_list_l0_copy = copy.copy(triples_list_l0)
post_merge_triples = dict()
for index, triples_list in triples_to_merge.iteritems():
triple1 = triple_list_l0_copy[index]
if triple1 in triples_list_l0:
triples_list_l0.remove(triple1)
for triple2 in triples_list:
if triple2 in triples_list_l0:
triples_list_l0.remove(triple2)
if index not in post_merge_triples:
edges_to_merge = list(triple1.edges)
for elt in list(triple2.edges):
if elt not in edges_to_merge:
edges_to_merge.append(elt)
post_merge_triples[index] = Triple(triple1.coinputs, triple1.cooutputs.union(triple2.cooutputs),
edges_to_merge)
else:
edges_to_merge = list(post_merge_triples[index].edges)
for elt in list(triple2.edges):
if elt not in edges_to_merge:
edges_to_merge.append(elt)
post_merge_triples[index] = Triple(post_merge_triples[index].coinputs,
post_merge_triples[index].cooutputs.union(triple2.cooutputs),
edges_to_merge) # triple1
triples_list_l0.append(post_merge_triples[index])
temp_list = []
for triple in triples_list_l0:
# remove all inputs and outputs that are not in generator_subset
valid_inputs = triple.coinputs.intersection(set(generator_subset))
valid_outputs = triple.cooutputs.intersection(set(generator_subset))
new_triple = Triple(valid_inputs, valid_outputs, triple.edges)
if new_triple not in temp_list:
temp_list.append(new_triple)
# remove any tuples with zero input or output
final_triples_list = []
for triple in temp_list:
if triple.coinputs is not None and triple.cooutputs is not None and \
len(triple.coinputs) > 0 and len(triple.cooutputs) > 0 and \
(triple not in final_triples_list):
final_triples_list.append(triple)
edge_list = []
for triple in final_triples_list:
edge = Edge(triple.coinputs, triple.cooutputs, None, repr(triple.edges))
if edge not in edge_list:
edge_list.append(edge)
if edge_list is None or len(edge_list) == 0:
return None
mg = Metagraph(set(generator_subset))
mg.add_edges_from(edge_list)
return mg
[docs] def incidence_matrix(self):
""" Gets the metagraph projection for a subset of the generating set.
:return: numpy.matrix
"""
rows = len(self.generating_set)
cols = len(self.edges)
incidence_matrix = MetagraphHelper().get_null_matrix(rows, cols)
for i in range(rows):
x_i = list(self.generating_set)[i]
for j in range(cols):
e_j = self.edges[j]
if x_i in e_j.invertex:
incidence_matrix[i][j] = -1
elif x_i in e_j.outvertex:
incidence_matrix[i][j] = 1
else:
incidence_matrix[i][j] = None
# noinspection PyCallingNonCallable
return matrix(incidence_matrix)
[docs] def get_inverse(self):
""" Gets the inverse metagraph.
:return: Metagraph object
"""
incidence_m = self.incidence_matrix().tolist()
edge_list = []
# step1: extract indices
col_index = 0
for i in range(len(incidence_m[0])):
negative_item_indices = []
positive_item_indices = []
column = [row[col_index] for row in incidence_m]
row_index = 0
for item1 in column:
if item1 == -1:
# get all elements with +1 across the row
row = incidence_m[row_index]
eligible = []
local_index = 0
for item2 in row:
if item2 == 1 and local_index not in eligible:
eligible.append(local_index)
local_index += 1
if len(eligible) == 0:
row_index += 1 # debug dr check this
continue
# TODO: how do we handle multiple occurrences of +1?
# keep a track of -1 and + 1 indices
if (row_index, col_index) not in negative_item_indices:
negative_item_indices.append((row_index, col_index))
for local_index in eligible:
if (row_index, local_index) not in positive_item_indices:
positive_item_indices.append((row_index, local_index))
row_index += 1
# construct edges from indices
invertex = []
outvertex = []
edge_label = None
for negative_item_index in negative_item_indices:
if repr(self.edges[negative_item_index[1]]) not in outvertex:
outvertex.append(repr(self.edges[negative_item_index[1]]))
for positive_item_index in positive_item_indices:
if repr(self.edges[positive_item_index[1]]) not in invertex:
invertex.append(repr(self.edges[positive_item_index[1]]))
# generate label
if edge_label is None:
edge_label = '<%s,%s>' % (list(self.generating_set)[positive_item_index[0]],
repr(self.edges[positive_item_index[1]]))
else:
edge_label += ', <%s,%s>' % (list(self.generating_set)[positive_item_index[0]],
repr(self.edges[positive_item_index[1]]))
if invertex is not None and outvertex is not None and len(invertex) > 0 and len(outvertex) > 0:
edge = Edge(set(invertex), set(outvertex), None, edge_label)
if edge not in edge_list:
edge_list.append(edge)
col_index += 1
# compress the edges
compressed_edges = []
for edge1 in edge_list:
compressed = False
for edge2 in edge_list:
if edge1 != edge2:
if edge1.invertex == edge2.invertex and edge1.label == edge2.label:
new_edge = Edge(edge1.invertex, edge1.outvertex.union(edge2.outvertex), None, edge1.label)
if not MetagraphHelper().is_edge_in_list(new_edge, compressed_edges):
compressed_edges.append(new_edge)
compressed = True
if not compressed and (not MetagraphHelper().is_edge_in_list(edge1, compressed_edges)):
compressed_edges.append(edge1)
# add links to alpha and beta
row_index = 0
occurrences = lambda s, lst: (y for y, e in enumerate(lst) if e == s)
for row in incidence_m:
if row.__contains__(-1) and (not row.__contains__(1)):
col_indices = list(occurrences(-1, row))
for col_index in col_indices:
label = '<%s, alpha>' % (list(self.generating_set)[row_index])
new_edge = Edge({'alpha'}, {repr(self.edges[col_index])}, None, label)
if not MetagraphHelper().is_edge_in_list(new_edge, compressed_edges):
compressed_edges.append(new_edge)
elif row.__contains__(1) and (not row.__contains__(-1)):
col_indices = list(occurrences(1, row))
for col_index in col_indices:
label = '<%s, %s>' % (list(self.generating_set)[row_index], repr(self.edges[col_index]))
new_edge = Edge({repr(self.edges[col_index])}, {'beta'}, None, label)
if not MetagraphHelper().is_edge_in_list(new_edge, compressed_edges):
compressed_edges.append(new_edge)
row_index += 1
mg = Metagraph(MetagraphHelper().get_generating_set(compressed_edges))
mg.add_edges_from(compressed_edges)
return mg
[docs] def get_efm(self, generator_subset):
""" Gets the element-flow metagraph.
:param generator_subset: set
:return: Metagraph object
"""
if generator_subset is None or len(generator_subset) == 0:
raise MetagraphException('generator_subset', resources['value_null'])
incidence_m = self.incidence_matrix().tolist()
excluded = self.generating_set.difference(generator_subset)
# compute G1 and G2
applicable_rows = []
for x_i in generator_subset:
index = list(self.generating_set).index(x_i)
if index not in applicable_rows:
applicable_rows.append(index)
# sort list
applicable_rows = sorted(applicable_rows)
inapplicable_rows = sorted(set(range(len(self.generating_set))).difference(applicable_rows))
rows1 = len(generator_subset)
cols1 = len(incidence_m[0])
rows2 = len(excluded)
g1 = MetagraphHelper().get_null_matrix(rows1, cols1)
g2 = MetagraphHelper().get_null_matrix(rows2, cols1)
m = 0
for i in applicable_rows:
n = 0
for j in range(cols1):
g1[m][n] = incidence_m[i][j]
n += 1
m += 1
m = 0
for i in inapplicable_rows:
n = 0
for j in range(cols1):
g2[m][n] = incidence_m[i][j]
n += 1
m += 1
g1_t = MetagraphHelper().transpose_matrix(g1)
mult_r = MetagraphHelper().custom_multiply_matrices(g2, g1_t, self.edges)
row_index = 0
edge_list = []
lookup = dict()
for row in mult_r:
col_index = 0
invertices = []
outvertices = []
for elt in row:
if len(elt) > 0 and isinstance(list(elt)[0], tuple):
extracted = list(elt)[0]
if 1 in extracted:
invertex = []
local_indices = [row.index(elt2) for elt2 in row if elt.issubset(elt2)]
for local_index in local_indices:
value = list(self.generating_set)[applicable_rows[local_index]]
if value not in invertex:
invertex.append(value)
if set(invertex) not in invertices:
invertices.append(set(invertex))
if repr(invertex) not in lookup:
lookup[repr(invertex)] = extracted[1]
elif -1 in extracted:
outvertex = []
local_indices = [row.index(elt2) for elt2 in row if elt.issubset(elt2)]
for local_index in local_indices:
value = list(self.generating_set)[applicable_rows[local_index]]
if value not in outvertex:
outvertex.append(value)
if set(outvertex) not in outvertices:
outvertices.append(set(outvertex))
if repr(outvertex) not in lookup:
lookup[repr(outvertex)] = extracted[1]
col_index += 1
# combine the invertices and outvertices
for invertex in invertices:
for outvertex in outvertices:
# create flow composition
label = '%s <%s; %s>' % (list(self.generating_set)[inapplicable_rows[row_index]],
lookup[repr(list(invertex))], lookup[repr(list(outvertex))])
edge = Edge(invertex, outvertex, None, label)
if not MetagraphHelper().is_edge_in_list(edge, edge_list):
edge_list.append(edge)
row_index += 1
# combine edges
final_edge_list = []
for edge1 in edge_list:
match = False
for edge2 in edge_list:
if edge1 != edge2:
if edge1.invertex == edge2.invertex and edge1.outvertex == edge1.outvertex:
match = True
comp1 = MetagraphHelper().extract_edge_label_components(edge1.label)
comp2 = MetagraphHelper().extract_edge_label_components(edge2.label)
combined = (comp1[0].union(comp2[0]), comp1[1].union(comp2[1]), comp1[2].union(comp2[2]))
label = '%s <%s; %s>' % (list(combined[0]), list(combined[1]), list(combined[2]))
combined_edge = Edge(edge1.invertex, edge1.outvertex, None, label)
if not MetagraphHelper().is_edge_in_list(combined_edge, final_edge_list):
final_edge_list.append(combined_edge)
if not match:
if not MetagraphHelper().is_edge_in_list(edge1, final_edge_list):
final_edge_list.append(edge1)
if len(final_edge_list) > 0:
gen_set = MetagraphHelper().get_generating_set(final_edge_list)
mg = Metagraph(gen_set)
mg.add_edges_from(final_edge_list)
return mg
return None
[docs] def dominates(self, metagraph2):
"""Checks if the metagraph dominates that provided.
:param metagraph2: Metagraph object
:return: boolean
"""
if metagraph2 is None:
raise MetagraphException('metagraph2', resources['value_null'])
#adjacency_matrix = self.adjacency_matrix().tolist()
#all_metapaths1 = []
from itertools import combinations
all_sources1 = sum(map(lambda r: list(combinations(self.generating_set, r)),
range(1, len(self.generating_set))), [])
all_targets1 = copy.copy(all_sources1)
all_sources2 = sum(map(lambda r: list(combinations(metagraph2.generating_set, r)),
range(1, len(metagraph2.generating_set))), [])
all_targets2 = copy.copy(all_sources2)
all_metapaths1 = []
i = 1
#s1 = len(all_sources1)
#t1 = len(all_targets1)
#s2 = len(all_sources2)
#t2 = len(all_targets2)
for source in all_sources1:
for target in all_targets1:
if source != target:
mp = self.get_all_metapaths_from(set(source), set(target))
if mp is not None and len(mp) > 0 and (mp not in all_metapaths1):
all_metapaths1.append(mp)
#print(i)
i += 1
all_metapaths2 = []
for source in all_sources2:
for target in all_targets2:
if source != target:
mp = self.get_all_metapaths_from(set(source), set(target))
if mp is not None and len(mp) > 0 and (mp not in all_metapaths2):
all_metapaths2.append(mp)
for mp1 in all_metapaths2:
dominated = False
for mp2 in all_metapaths1:
if mp2.dominates(mp1):
dominated = True
break
if not dominated:
return False
return True
def __repr__(self):
edge_desc = [repr(edge) for edge in self.edges]
full_desc = ''
for desc in edge_desc:
if full_desc == '':
full_desc = desc
else:
full_desc += ", " + desc
desc = '%s(%s)' % (str(type(self)), full_desc)
desc = desc.replace('\\', '')
return desc
[docs]class ConditionalMetagraph(Metagraph):
""" Represents a conditional metagraph that is instantiated using a set of variables and a set of propositions.
"""
def __init__(self, variables_set, propositions_set):
if not isinstance(variables_set, set):
raise MetagraphException('variable_set', resources['format_invalid'])
if not isinstance(propositions_set, set):
raise MetagraphException('propositions_set', resources['format_invalid'])
if len(variables_set.intersection(propositions_set)) > 0:
raise MetagraphException('variables_and_propositions', resources['partition_invalid'])
self.nodes = []
self.edges = []
self.mg = None
self.variables_set = variables_set
self.propositions_set = propositions_set
self.generating_set = variables_set.union(propositions_set)
super(ConditionalMetagraph, self).__init__(self.generating_set)
[docs] def add_edges_from(self, edge_list):
""" Adds the given list of edges to the conditional metagraph.
:param edge_list: list of Edge objects
:return: None
"""
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
for edge in edge_list:
if not isinstance(edge, Edge):
raise MetagraphException('edge', resources['format_invalid'])
if len(edge.invertex.union(edge.outvertex)) == 0:
raise MetagraphException('edge', resources['value_null'])
# if outvertex contains a proposition, the outvertex cannot contain any other element
for proposition in self.propositions_set:
if proposition in edge.outvertex:
if len(edge.outvertex) > 1:
raise MetagraphException('edge', resources['value_invalid'])
for edge in edge_list:
node1 = Node(edge.invertex)
node2 = Node(edge.outvertex)
if not MetagraphHelper().is_node_in_list(node1, self.nodes):
self.nodes.append(node1)
if not MetagraphHelper().is_node_in_list(node2, self.nodes):
self.nodes.append(node2)
if edge not in self.edges:
self.edges.append(edge)
[docs] def get_context(self, true_propositions, false_propositions):
"""Retrieves the context metagraph for the given true and false propositions.
:param true_propositions: set
:param false_propositions: set
:return: ConditionalMetagraph object
"""
if true_propositions is None or len(true_propositions) == 0:
raise MetagraphException('true_propositions', resources['value_null'])
if false_propositions is None or len(false_propositions) == 0:
raise MetagraphException('false_propositions', resources['value_null'])
for proposition in true_propositions:
if proposition not in self.propositions_set:
raise MetagraphException('true_propositions', resources['range_invalid'])
for proposition in false_propositions:
if proposition not in self.propositions_set:
raise MetagraphException('false_propositions', resources['range_invalid'])
edges_to_remove = []
edges_copy = copy.copy(self.edges)
for edge in edges_copy:
for proposition in list(true_propositions):
if proposition in edge.invertex:
edge.invertex.difference({proposition})
# remove if this results in an invertex that is null
if len(edge.invertex) == 0 and edge not in edges_to_remove:
edges_to_remove.append(edge)
if proposition in edge.outvertex:
edge.outvertex.difference({proposition})
# remove if this results in an outvertex that is null
if len(edge.outvertex) == 0 and edge not in edges_to_remove:
edges_to_remove.append(edge)
for proposition in list(false_propositions):
if proposition in edge.invertex or proposition in edge.outvertex:
# remove edge
if edge not in edges_to_remove:
edges_to_remove.append(edge)
# create new conditional metagraph describing context
context = ConditionalMetagraph(self.variables_set, self.propositions_set)
for edge in edges_to_remove:
if edge in edges_copy:
edges_copy.remove(edge)
context.add_edges_from(edges_copy)
return context
[docs] def get_projection(self, variables_subset):
""" Gets the conditional metagraph projection for a subset of its variable set.
:param variables_subset: set
:return: Metagraph object
"""
if variables_subset is None or len(variables_subset) == 0:
raise MetagraphException('variables_subset', resources['value_null'])
subset = variables_subset.union(self.propositions_set)
generator_set = self.variables_set.union(self.propositions_set)
mg = Metagraph(generator_set)
mg.add_edges_from(self.edges)
return mg.get_projection(subset)
[docs] def get_all_metapaths_from(self, source, target, prop_subset=None):
""" Retrieves all metapaths between given source and target in the conditional metagraph.
:param source: set
:param target: set
:return: list of Metapath objects
"""
if source is None or len(source) == 0:
raise MetagraphException('subset1', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('subset2', resources['value_null'])
if not source.issubset(self.generating_set):
raise MetagraphException('subset1', resources['not_a_subset'])
if not target.issubset(self.generating_set):
raise MetagraphException('subset2', resources['not_a_subset'])
generator_set = self.variables_set.union(self.propositions_set)
if self.mg is None:
self.mg = Metagraph(generator_set)
self.mg.add_edges_from(self.edges)
if prop_subset is not None:
return self.mg.get_all_metapaths_from(source.union(prop_subset), target)
else:
return self.mg.get_all_metapaths_from(source.union(self.propositions_set), target)
[docs] def get_all_metapaths(self):
""" Retrieves all metapaths in the conditional metagraph.
:return: List of Metapath objects
"""
from itertools import combinations
all_subsets=sum(map(lambda r: list(combinations(self.nodes, r)), range(1, len(self.nodes)+1)), [])
all_metapaths = []
for subset1 in all_subsets:
for subset2 in all_subsets:
if subset1 != subset2:
if len(subset1)>1:
element_set=set()
for node in subset1:
element_set = element_set.union(node.element_set)
node1 = Node(element_set)
else:
node1=subset1[0]
if len(subset2)>1:
element_set=set()
for node in subset2:
element_set = element_set.union(node.element_set)
node2 = Node(element_set)
else:
node2=subset2[0]
# TODO: can source and target in a metapath overlap?
if not MetagraphHelper().nodes_overlap([node1], [node2]):
source = MetagraphHelper().get_element_set([node1])
target = MetagraphHelper().get_element_set([node2])
mps = self.get_all_metapaths_from(source, target)
if mps is None or len(mps) == 0:
continue
# noinspection PyTypeChecker
for mp in mps:
if mp not in all_metapaths:
all_metapaths.append(mp)
return all_metapaths
[docs] def has_conflicts(self, metapath):
"""Checks whether the given metapath has any conflicts.
:param metapath: Metapath object
:return: boolean
"""
invertices = set()
intersec = set()
edges = metapath.edge_list
for edge in edges:
invertices = invertices.union(edge.invertex)
if len(intersec)==0:
intersec = set(edge.attributes)
else:
intersec = intersec.intersection(set(edge.attributes))
potential_conflicts_set = invertices.intersection(self.propositions_set)
if self.edge_attributes_conflict(potential_conflicts_set, intersec):
return True
return False
[docs] def has_redundancies(self, metapath):
""" Checks if given metapath has redundancies.
:param metapath: Metapath object
:return: boolean
"""
# check input metapath is valid
if not self.is_metapath(metapath):
raise MetagraphException('metapath', resources['arguments_invalid'])
return not self.is_dominant_metapath(metapath)
[docs] def edge_attributes_conflict(self, potential_conflicts_set, intersecting_attr_set):
""" Checks if given edge attributes conflict.
:param potential_conflicts_set: set
:return: boolean
"""
if potential_conflicts_set is None:
raise MetagraphException('potential_conflicts_set', resources['value_null'])
# currently checks if actions conflict
# extend later to include active times etc
actions = self.get_actions(potential_conflicts_set)
malware_sigs = self.get_malware_sigs(potential_conflicts_set)
sig_present = self.get_sig_present(potential_conflicts_set)
# TODO: extend to support more attributes
if len(intersecting_attr_set)>0:
# check if diff response actions are applied to same malware sig list
if len(actions)>1 and len(malware_sigs)>1 and len(sig_present)==1:
intersection = set(malware_sigs[0])
for sig_list in malware_sigs:
intersection = intersection.intersection(set(sig_list))
if len(intersection)>0:
return True
# check if same response action is applied to different sig lists
if len(malware_sigs)>1 and len(actions)==1 and len(sig_present)==1:
return True
if ('allow' in actions or 'permit' in actions) and 'deny' in actions:
return True
return False
[docs] @staticmethod
def get_actions(attributes):
""" Filters the given list of attributes and returns a list of action-attribute values.
:param attributes: list
:return: list of strings
"""
if attributes is None:
raise MetagraphException('attributes', resources['value_null'])
actions = []
for attribute in attributes:
if 'action' in attribute:
value = attribute.replace('action=', '')
if value not in actions:
actions.append(value)
return actions
@staticmethod
def get_malware_sigs(attributes):
if attributes is None:
raise MetagraphException('attributes', resources['value_null'])
sigs = []
for attribute in attributes:
if 'malware_sigs' in attribute:
value = attribute.replace('malware_sigs=', '')
value = value.replace('[','')
value = value.replace(']','')
value = value.split(',')
if value not in sigs:
sigs.append(value)
return sigs
@staticmethod
def get_sig_present(attributes):
if attributes is None:
raise MetagraphException('attributes', resources['value_null'])
present = []
for attribute in attributes:
if 'sig_present' in attribute:
value = attribute.replace('sig_present=', '')
if value not in present:
present.append(value)
return present
[docs] def is_connected(self, source, target, logical_expressions, interpretations):
"""Checks if subset1 is connected to subset2.
:param source: set
:param target: set
:param logical_expressions: list of strings
:param interpretations: lists of tuples
:return: boolean
"""
if source is None or len(source) == 0:
raise MetagraphException('source', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('target', resources['value_null'])
if logical_expressions is None or len(logical_expressions) == 0:
raise MetagraphException('logical_expressions', resources['value_null'])
if interpretations is None or len(interpretations) == 0:
raise MetagraphException('interpretations', resources['value_null'])
if not source.issubset(self.variables_set):
raise MetagraphException('source', resources['not_a_subset'])
if not target.issubset(self.variables_set):
raise MetagraphException('target', resources['not_a_subset'])
# check expressions are over X_p
for logical_expression in logical_expressions:
logical_expression_copy = copy.copy(logical_expression)
logical_expression_copy = logical_expression_copy.replace('.', ' ')
logical_expression_copy = logical_expression_copy.replace('|', ' ')
logical_expression_copy = logical_expression_copy.replace('!', ' ')
logical_expression_copy = logical_expression_copy.replace('(', '')
logical_expression_copy.replace(')', '')
items = logical_expression_copy.split(' ')
for item in items:
item = item.replace(' ', '')
if item != '' and item not in self.propositions_set:
raise MetagraphException('logical_expression', resources['arguments_invalid'])
# check metapath exists for at least one interpretation
for interpretation in interpretations:
true_propositions = []
false_propositions = []
for tuple_elt in interpretation:
if tuple_elt[0] not in self.propositions_set:
raise MetagraphException('interpretations', resources['arguments_invalid'])
if tuple_elt[1] and tuple_elt[0] not in true_propositions:
true_propositions.append(tuple_elt[0])
elif tuple_elt[0] not in true_propositions:
false_propositions.append(tuple_elt[0])
# compute context metagraph
context = self.get_context(true_propositions, false_propositions)
metapaths = context.get_all_metapaths_from(source, target)
if metapaths is not None and len(metapaths) >= 1:
return True
return False
[docs] def is_fully_connected(self, source, target, logical_expressions, interpretations):
"""Checks if subset1 is fully connected to subset2.
:param source: set
:param target: set
:param logical_expressions: list of strings
:param interpretations: lists of tuples
:return: boolean
"""
if source is None or len(source) == 0:
raise MetagraphException('source', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('target', resources['value_null'])
if logical_expressions is None or len(logical_expressions) == 0:
raise MetagraphException('logical_expressions', resources['value_null'])
if interpretations is None or len(interpretations) == 0:
raise MetagraphException('interpretations', resources['value_null'])
if not source.issubset(self.variables_set):
raise MetagraphException('source', resources['not_a_subset'])
if not target.issubset(self.variables_set):
raise MetagraphException('target', resources['not_a_subset'])
# check expressions are over X_p
for logical_expression in logical_expressions:
logical_expression_copy = copy.copy(logical_expression)
logical_expression_copy = logical_expression_copy.replace('.', ' ')
logical_expression_copy = logical_expression_copy.replace('|', ' ')
logical_expression_copy = logical_expression_copy.replace('!', ' ')
logical_expression_copy = logical_expression_copy.replace('(', '')
logical_expression_copy.replace(')', '')
items = logical_expression_copy.split(' ')
for item in items:
item = item.replace(' ', '')
if item != '' and item not in self.propositions_set:
raise MetagraphException('logical_expression', resources['arguments_invalid'])
# check metapath exists for every interpretation
for interpretation in interpretations:
true_propositions = []
false_propositions = []
for tuple_elt in interpretation:
if tuple_elt[0] not in self.propositions_set:
raise MetagraphException('interpretations', resources['arguments_invalid'])
if tuple_elt[1] and tuple_elt[0] not in true_propositions:
true_propositions.append(tuple_elt[0])
elif tuple_elt[0] not in true_propositions:
false_propositions.append(tuple_elt[0])
# compute context metagraph
context = self.get_context(true_propositions, false_propositions)
metapaths = context.get_all_metapaths_from(source, target)
if not(metapaths is not None and len(metapaths) >= 1):
return False
return True
[docs] def is_redundantly_connected(self, source, target, logical_expressions, interpretations):
"""Checks if subset1 is non-redundantly connected to subset2.
:param source: set
:param target: set
:param logical_expressions: list of strings
:param interpretations: lists of tuples
:return: boolean
"""
if source is None or len(source) == 0:
raise MetagraphException('source', resources['value_null'])
if target is None or len(target) == 0:
raise MetagraphException('target', resources['value_null'])
if logical_expressions is None or len(logical_expressions) == 0:
raise MetagraphException('logical_expressions', resources['value_null'])
if interpretations is None or len(interpretations) == 0:
raise MetagraphException('interpretations', resources['value_null'])
if not source.issubset(self.variables_set):
raise MetagraphException('source', resources['not_a_subset'])
if not target.issubset(self.variables_set):
raise MetagraphException('target', resources['not_a_subset'])
# check expressions are over X_p
for logical_expression in logical_expressions:
logical_expression_copy = copy.copy(logical_expression)
logical_expression_copy = logical_expression_copy.replace('.', ' ')
logical_expression_copy = logical_expression_copy.replace('|', ' ')
logical_expression_copy = logical_expression_copy.replace('!', ' ')
logical_expression_copy = logical_expression_copy.replace('(', '')
logical_expression_copy.replace(')', '')
items = logical_expression_copy.split(' ')
for item in items:
item = item.replace(' ', '')
if item != '' and item not in self.propositions_set:
raise MetagraphException('logical_expression', resources['arguments_invalid'])
# check metapath exists for every interpretation
for interpretation in interpretations:
true_propositions = []
false_propositions = []
for tuple_elt in interpretation:
if tuple_elt[0] not in self.propositions_set:
raise MetagraphException('interpretations', resources['arguments_invalid'])
if tuple_elt[1] and tuple_elt[0] not in true_propositions:
true_propositions.append(tuple_elt[0])
elif tuple_elt[0] not in true_propositions:
false_propositions.append(tuple_elt[0])
# compute context metagraph
context = self.get_context(true_propositions, false_propositions)
metapaths = context.get_all_metapaths_from(source, target)
if metapaths is not None and len(metapaths) > 1:
return False
return True
[docs] def is_non_redundant(self, logical_expressions, interpretations):
""" Checks if a conditional metagraph is non redundant.
:param logical_expressions: list of strings
:param interpretations: lists of tuples
:return: boolean
"""
if logical_expressions is None or len(logical_expressions) == 0:
raise MetagraphException('logical_expressions', resources['value_null'])
if interpretations is None or len(interpretations) == 0:
raise MetagraphException('interpretations', resources['value_null'])
# check expressions are over X_p
for logical_expression in logical_expressions:
logical_expression_copy = copy.copy(logical_expression)
logical_expression_copy = logical_expression_copy.replace('.', ' ')
logical_expression_copy = logical_expression_copy.replace('|', ' ')
logical_expression_copy = logical_expression_copy.replace('!', ' ')
logical_expression_copy = logical_expression_copy.replace('(', '')
logical_expression_copy.replace(')', '')
items = logical_expression_copy.split(' ')
for item in items:
item = item.replace(' ', '')
if item != '' and item not in self.propositions_set:
raise MetagraphException('logical_expression', resources['arguments_invalid'])
# check metapath exists for at least one interpretation
for interpretation in interpretations:
true_propositions = []
false_propositions = []
for tuple_elt in interpretation:
if tuple_elt[0] not in self.propositions_set:
raise MetagraphException('interpretations', resources['arguments_invalid'])
if tuple_elt[1] and tuple_elt[0] not in true_propositions:
true_propositions.append(tuple_elt[0])
elif tuple_elt[0] not in true_propositions:
false_propositions.append(tuple_elt[0])
# compute context metagraph
context = self.get_context(true_propositions, false_propositions)
for x in self.variables_set:
edge_list = []
for edge in context.edges:
if x in edge.outvertex and edge not in edge_list:
edge_list.append(edge)
if len(edge_list) > 1:
return False
return True
def __repr__(self):
edge_desc = [repr(edge) for edge in self.edges]
full_desc = ''
for desc in edge_desc:
if full_desc == '':
full_desc = desc
else:
full_desc += ', ' + desc
desc = '%s(%s)' % (str(type(self)), full_desc)
desc = desc.replace('\\', '')
return desc
# noinspection PyShadowingNames,PyShadowingNames
@singleton
class MetagraphHelper:
""" Helper class that facilitates metagraph operations.
"""
def __init__(self):
pass
def add_adjacency_matrices(self, adjacency_matrix1, generator_set1, adjacency_matrix2, generator_set2):
""" Adds the two adjacency matrices provided and returns a combined matrix.
:param adjacency_matrix1: numpy.matrix
:param generator_set1: set
:param adjacency_matrix2: numpy.matrix
:param generator_set2: set
:return: numpy.matrix
"""
if adjacency_matrix1 is None:
raise MetagraphException('adjacency_matrix1', resources['value_null'])
if adjacency_matrix2 is None:
raise MetagraphException('adjacency_matrix2', resources['value_null'])
if generator_set1 is None:
raise MetagraphException('generator_set1', resources['value_null'])
if generator_set2 is None:
raise MetagraphException('generator_set2', resources['value_null'])
# check if the generating sets of the matrices overlap (otherwise no sense in combining metagraphs)
intersection = generator_set1.intersection(generator_set2)
if intersection is None:
raise MetagraphException('generator_sets', resources['no_overlap'])
#combined_adjacency_matrix = None
if len(generator_set1.difference(generator_set2)) == 0 and len(generator_set2.difference(generator_set1)) == 0:
# generating sets are identical..use adjacency matrices as is
size = len(generator_set1)
combined_adjacency_matrix = MetagraphHelper().get_null_matrix(size, size)
for i in range(size):
for j in range(size):
# take the union
if adjacency_matrix1[i][j] is None:
combined_adjacency_matrix[i][j] = adjacency_matrix2[i][j]
elif adjacency_matrix2[i][j] is None:
combined_adjacency_matrix[i][j] = adjacency_matrix1[i][j]
else:
temp = list()
temp.append(adjacency_matrix1[i][j])
temp.append(adjacency_matrix2[i][j])
combined_adjacency_matrix[i][j] = temp
else:
# generating sets overlap but are different...need to redefine adjacency matrices before adding them
combined_generating_set = generator_set1.union(generator_set2)
mg1 = Metagraph(combined_generating_set)
# add all metagraph1 edges
edge_list1 = self.get_edges_in_matrix(adjacency_matrix1, generator_set1)
for edge in edge_list1:
mg1.add_edge(edge)
modified_adjacency_matrix1 = mg1.adjacency_matrix().tolist()
mg2 = Metagraph(combined_generating_set)
# add all metagraph2 edges
edge_list2 = self.get_edges_in_matrix(adjacency_matrix2, generator_set2)
for edge in edge_list2:
mg2.add_edge(edge)
modified_adjacency_matrix2 = mg2.adjacency_matrix().tolist()
#combined_mg = Metagraph(combined_generating_set)
size = len(combined_generating_set)
combined_adjacency_matrix = MetagraphHelper().get_null_matrix(size, size)
for i in range(size):
for j in range(size):
# take the union
if modified_adjacency_matrix1[i][j] is None:
combined_adjacency_matrix[i][j] = modified_adjacency_matrix2[i][j]
elif modified_adjacency_matrix2[i][j] is None:
combined_adjacency_matrix[i][j] = modified_adjacency_matrix1[i][j]
else:
temp = modified_adjacency_matrix1[i][j]
for triple in modified_adjacency_matrix2[i][j]:
if not triple in modified_adjacency_matrix1[i][j]:
temp.append(triple)
combined_adjacency_matrix[i][j] = temp
return combined_adjacency_matrix
def get_triples(self, nested_triples_list):
""" Returns a list of non-nested Triple objects.
:param nested_triples_list: list of nested Triple objects
:return: list of Triple objects
"""
if nested_triples_list is None or len(nested_triples_list) == 0:
raise MetagraphException('triples_list', resources['value_null'])
result = []
if isinstance(nested_triples_list, list):
for elt in nested_triples_list:
if isinstance(elt, Triple):
result.append(elt)
else:
temp = self.get_triples(elt)
for item in temp:
result.append(item)
return result
def forms_cover(self,triples_set, target, x_j):
cumulative_output = {x_j}
for triple in triples_set:
# retain cooutputs
cumulative_output = cumulative_output.union(triple.cooutputs)
return target.issubset(cumulative_output)
def get_metapath_from_triples(self, source, target, triples_set):
edges=set()
for triple in triples_set:
edges = edges.union(triple.edges)
return Metapath(source,target,edges)
def multiply_adjacency_matrices(self, adjacency_matrix1, generator_set1, adjacency_matrix2, generator_set2):
""" Multiplies the two adjacency matrices provided and returns the result.
:param adjacency_matrix1: numpy.matrix
:param generator_set1: set
:param adjacency_matrix2: numpy.matrix
:param generator_set2: set
:return: numpy.matrix
"""
if adjacency_matrix1 is None:
raise MetagraphException('adjacency_matrix1', resources['value_null'])
if adjacency_matrix2 is None:
raise MetagraphException('adjacency_matrix2', resources['value_null'])
if generator_set1 is None:
raise MetagraphException('generator_set1', resources['value_null'])
if generator_set2 is None:
raise MetagraphException('generator_set2', resources['value_null'])
# check generating sets are identical
if not(len(generator_set1.difference(generator_set2)) == 0 and
len(generator_set2.difference(generator_set1)) == 0):
raise MetagraphException('generator_sets', resources['not_identical'])
size = len(generator_set1)
resultant_adjacency_matrix = MetagraphHelper().get_null_matrix(size, size)
for i in range(size):
for j in range(size):
resultant_adjacency_matrix[i][j] = self.multiply_components(adjacency_matrix1,
adjacency_matrix2,
generator_set1, i,
j, size)
#print('multiply_components')
return resultant_adjacency_matrix
def multiply_components(self, adjacency_matrix1, adjacency_matrix2, generator_set1, i, j, size):
""" Multiplies elements of two adjacency matrices.
:param adjacency_matrix1: numpy.matrix
:param adjacency_matrix2: numpy.matrix
:param generator_set1: set
:param i: int
:param j: int
:param size: int
:return: list of Triple objects.
"""
if adjacency_matrix1 is None:
raise MetagraphException('adjacency_matrix1', resources['value_null'])
if adjacency_matrix2 is None:
raise MetagraphException('adjacency_matrix2', resources['value_null'])
if generator_set1 is None or len(generator_set1) == 0:
raise MetagraphException('generator_set1', resources['value_null'])
result = []
# computes the outermost loop (ie., k=1...K where K is the size of each input matrix)
for k in range(size):
a_ik = adjacency_matrix1[i][k]
b_kj = adjacency_matrix2[k][j]
#print('multiply_triple_lists')
temp = self.multiply_triple_lists(a_ik, b_kj, list(generator_set1)[i],
list(generator_set1)[j], list(generator_set1)[k])
if temp is not None:
#print('len(temp): %s'%len(temp))
for triple in temp:
if not MetagraphHelper().is_triple_in_list(triple, result):
result.append(triple)
#if triple not in result: result.append(triple)
if len(result) == 0:
return None
return result
def multiply_components(self, adjacency_matrix1, adjacency_matrix2, generator_set1, i, j, size):
""" Multiplies elements of two adjacency matrices.
:param adjacency_matrix1: numpy.matrix
:param adjacency_matrix2: numpy.matrix
:param generator_set1: set
:param i: int
:param j: int
:param size: int
:return: list of Triple objects.
"""
if adjacency_matrix1 is None:
raise MetagraphException('adjacency_matrix1', resources['value_null'])
if adjacency_matrix2 is None:
raise MetagraphException('adjacency_matrix2', resources['value_null'])
if generator_set1 is None or len(generator_set1) == 0:
raise MetagraphException('generator_set1', resources['value_null'])
result = []
# computes the outermost loop (ie., k=1...K where K is the size of each input matrix)
for k in range(size):
a_ik = adjacency_matrix1[i][k]
b_kj = adjacency_matrix2[k][j]
temp = self.multiply_triple_lists(a_ik, b_kj, list(generator_set1)[i],
list(generator_set1)[j], list(generator_set1)[k])
if temp is not None and len(temp)>0:
result+=temp
if len(result) == 0:
return None
return list(set(result))
def multiply_triple_lists(self, triple_list1, triple_list2, x_i, x_j, x_k):
""" Multiplies two list of Triple objects and returns the result.
:param triple_list1: list of Triple objects
:param triple_list2: list of Triple objects
:param x_i: generator set element
:param x_j: generator set element
:param x_k: generator set element
:return: list of Triple objects
"""
if triple_list1 is None or triple_list2 is None:
return None
triples_list=[]
# computes the middle loop (ie., n=1...N where N is the size of triple_list1
for triple1 in triple_list1:
# computes the innermost loop (ie., m=1...M where M is the size of triple_list2
for triple2 in triple_list2:
temp = self.multiply_triples(triple1, triple2, x_i, x_j, x_k)
if temp is not None:
triples_list.append(temp)
if len(triples_list)>0:
return list(set(triples_list))
return []
@staticmethod
def multiply_triples(triple1, triple2, x_i, x_j, x_k):
""" Multiplies two Triple objects and returns the result.
:param triple1: Triple object
:param triple2: Triple object
:param x_i: generator set element
:param x_j: generator set element
:param x_k: generator set element
:return: Triple object
"""
if triple1 is None or triple2 is None:
return None
# compute alpha(R)
alpha_r = triple2.coinputs
if triple2.coinputs is None:
alpha_r = triple1.coinputs
elif triple1.coinputs is not None:
alpha_r = triple1.coinputs.union(triple2.coinputs)
if alpha_r is not None and triple1.cooutputs is not None:
alpha_r = alpha_r.difference(({x_i}).union(triple1.cooutputs))
elif alpha_r is not None:
alpha_r = alpha_r.difference(({x_i}))
# compute beta(R)
beta_r = triple2.cooutputs
if triple2.cooutputs is None:
beta_r = triple1.cooutputs
elif triple1.cooutputs is not None:
beta_r = triple1.cooutputs.union(triple2.cooutputs)
if beta_r is not None:
beta_r = beta_r.union({x_k})
beta_r = beta_r.difference({x_j})
else:
beta_r = {x_k}
beta_r = beta_r.difference(({x_j}))
# compute gamma(R)
truncated = []
if triple1.edges not in truncated:
if isinstance(triple1.edges, Edge):
truncated.append(triple1.edges)
else:
if isinstance(triple1.edges, list):
truncated = copy.copy(triple1.edges)
if triple2.edges not in truncated:
if isinstance(triple2.edges, Edge):
truncated.append(triple2.edges)
else:
truncated.append = copy.copy(triple2.edges)
gamma_r = truncated
return Triple(alpha_r, beta_r, gamma_r)
@staticmethod
def get_null_matrix(rows, cols):
""" Returns a null matrix of dimension rows x cols.
:param rows: int
:param cols: int
:return: list
"""
psi = None
result = []
for i in range(rows):
# noinspection PyUnusedLocal
item = [psi for j in range(cols)]
result.append(item)
return result
@staticmethod
def get_edges_in_matrix(adjacency_matrix, generator_set):
""" Returns the list of edges in the provided adjacency matrix.
:param adjacency_matrix: numpy.matrix
:param generator_set: set
:return: list of Edge objects
"""
if adjacency_matrix is None:
raise MetagraphException('adjacency_matrix', resources['value_null'])
if generator_set is None or len(generator_set) == 0:
raise MetagraphException('generator_set', resources['value_null'])
size = len(generator_set)
edge_list = []
for i in range(size):
for j in range(size):
if adjacency_matrix[i][j] is not None:
# list of triples
triples_list = adjacency_matrix[i][j]
for triple in triples_list:
# gamma_R describes the edge
if triple[2] not in edge_list:
edge_list.append(triple[2])
return edge_list
def get_edge_list(self, nested_edges):
""" Returns a non-nested list of edges.
:param nested_edges: nested list of Edge objects
:return: non-nested list of Edge objects.
"""
edge_list = []
if nested_edges is None or len(nested_edges) == 0:
return edge_list
for element in nested_edges:
if isinstance(element, list):
temp = self.get_edge_list(element)
for edge in temp:
if edge not in edge_list:
edge_list.append(edge)
elif isinstance(element, Edge):
if element not in edge_list:
edge_list.append(element)
return edge_list
def remove_edge_list(self, edges_to_remove, target_edge_list):
""" Removes a given set of edges from a target set.
:param edges_to_remove: a list of edges to remove
:param target_edge_list: a list of target edges
:return: a list of Edge objects.
"""
updated = []
for edge in target_edge_list:
if not self.is_edge_in_list(edge, edges_to_remove):
updated.append(edge)
return updated
def get_edges_from_triple_list(self, nested_triples):
""" Returns the edges present in a nested list of Triple objects.
:param nested_triples: nested list of Triple objects
:return: non-nested list of Triple objects
"""
result = []
if nested_triples is None:
return result
if isinstance(nested_triples, Triple):
return nested_triples.edges
elif isinstance(nested_triples, list):
for element in nested_triples:
temp = self.get_edges_from_triple_list(element)
if isinstance(temp, list):
for elt in temp:
if isinstance(elt, Edge) and (elt not in result):
result.append(elt)
elif isinstance(elt, list):
for elt2 in elt:
if elt2 not in result:
result.append(elt2)
elif isinstance(temp, Edge) and (temp not in result):
result.append(temp)
return result
def is_triple_in_list(self, triple, triples_list):
""" Checks whether a particular Triple object is in a given list of Triples.
:param triple: Triple object
:param triples_list: list of Triple object
:return: boolean
"""
if triple==None:
raise MetagraphException('triple', resources['value_null'])
if triples_list==None:
raise MetagraphException('triples_list', resources['value_null'])
result=False
for element in triples_list:
if self.are_triples_equal(triple, element):
result= True
break
return result
def is_edge_in_list(self, edge, nested_edges):
""" Checks whether a particular edge is in the nested list of edges.
:param edge: Edge object
:param nested_edges: nested list of Edge objects
:return: boolean
"""
if edge is None:
raise MetagraphException('edge', resources['value_null'])
if nested_edges is None:
raise MetagraphException('nested_edges', resources['value_null'])
result = False
if isinstance(nested_edges, list):
for element in nested_edges:
result = self.is_edge_in_list(edge, element)
if result:
break
elif isinstance(nested_edges, Edge):
if self.are_edges_equal(edge, nested_edges):
result = True
return result
def is_node_in_list(self, node, node_list):
""" Checks if a particular node is in the given list of nodes.
:param node: Node object
:param node_list: list of Node objects
:return: boolean
"""
if node is None:
raise MetagraphException('node', resources['value_null'])
if node_list is None:
raise MetagraphException('node_list', resources['value_null'])
result = False
if isinstance(node_list, list):
for element in node_list:
result = self.is_node_in_list(node, element)
if result:
break
elif isinstance(node_list, Node):
if self.are_nodes_equal(node, node_list):
result = True
return result
def are_triples_equal(self, triple1, triple2):
""" Checks if the two given triples are equal.
:param triple1: Triple object
:param triple2: Triple object
:return: boolean
"""
if triple1 is None:
raise MetagraphException('triple1', resources['value_null'])
if triple2 is None:
raise MetagraphException('triple2', resources['value_null'])
if not isinstance(triple1, Triple):
raise MetagraphException('triple1', resources['format_invalid'])
if not isinstance(triple2, Triple):
raise MetagraphException('triple2', resources['format_invalid'])
edge_list_match = True
for edge in triple1.edges:
if not self.is_edge_in_list(edge, triple2.edges):
edge_list_match = False
break
for edge in triple2.edges:
if not self.is_edge_in_list(edge, triple1.edges):
edge_list_match = False
break
return (triple1.coinputs == triple2.coinputs and
triple1.cooutputs == triple2.cooutputs and
len(triple1.edges) == len(triple2.edges) and
edge_list_match)
@staticmethod
def are_edges_equal(edge1, edge2):
""" Checks if the two given edges are equal.
:param edge1: Edge object
:param edge2: Edge object
:return: boolean
"""
if edge1 is None:
raise MetagraphException('edge1', resources['value_null'])
if edge2 is None:
raise MetagraphException('edge2', resources['value_null'])
if not isinstance(edge1, Edge):
raise MetagraphException('edge1', resources['format_invalid'])
if not isinstance(edge2, Edge):
raise MetagraphException('edge2', resources['format_invalid'])
if edge1.attributes is not None and edge2.attributes is not None:
return (edge1.invertex == edge2.invertex and
edge1.outvertex == edge2.outvertex and
set(edge1.attributes) == set(edge2.attributes) and
edge1.label == edge2.label)
else:
return (edge1.invertex == edge2.invertex and
edge1.outvertex == edge2.outvertex and
edge1.label == edge2.label)
@staticmethod
def are_nodes_equal(node1, node2):
""" Checks if two given nodes are equal.
:param node1: Node object
:param node2: Node object
:return: boolean
"""
if node1 is None:
raise MetagraphException('node1', resources['value_null'])
if node2 is None:
raise MetagraphException('node2', resources['value_null'])
if not isinstance(node1, Node):
raise MetagraphException('node1', resources['format_invalid'])
if not isinstance(node2, Node):
raise MetagraphException('node2', resources['format_invalid'])
return node1.element_set == node2.element_set
@staticmethod
def is_edge_list_included_recursive(edges, reference_edge_list):
""" Checks if an edge list is included in the reference edge list.
:param edges: list of Edge objects
:param reference_edge_list: reference lists of Edge objects
:return: boolean
"""
if edges is None or len(edges) == 0:
raise MetagraphException('edges', resources['value_null'])
if reference_edge_list is None or len(reference_edge_list) == 0:
raise MetagraphException('reference_edge_list', resources['value_null'])
for ref_edges in reference_edge_list:
inclusive_list = True
for edge1 in edges:
if not MetagraphHelper().is_edge_in_list(edge1, ref_edges):
#match=False
#for edge2 in ref_edges:
# if edge1.invertex==edge2.invertex and edge1.outvertex==edge2.outvertex:
# match=True
# break
#if not match:
inclusive_list = False
break
if inclusive_list and len(edges) == len(ref_edges):
return True
return False
@staticmethod
def is_edge_list_included(edge_list1, edge_list2):
""" Checks if an edge list is included in edge_list2.
:param edge_list1: first list of Edge objects
:param edge_list2: second list of Edge objects
:return: boolean
"""
if edge_list1 is None or len(edge_list1) == 0:
raise MetagraphException('edge_list1', resources['value_null'])
if edge_list2 is None or len(edge_list2) == 0:
raise MetagraphException('edge_list2', resources['value_null'])
return set(edge_list1).issubset(set(edge_list2))
def get_netinputs(self, edge_list):
""" Retrieves a list of net inputs corresponding to the given edge list.
:param edge_list: list of Edge objects
:return: list
"""
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
all_inputs = []
for edge in edge_list:
if isinstance(edge, Edge):
for input_elt in edge.invertex:
if input_elt not in all_inputs:
all_inputs.append(input_elt)
elif isinstance(edge, list):
temp = self.get_netinputs(edge)
for item in temp:
if item not in all_inputs:
all_inputs.append(item)
return all_inputs
def get_netoutputs(self, edge_list):
""" Retrieves a list of net outputs corresponding to the given edge list.
:param edge_list: list of Edge objects
:return: list
"""
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
all_outputs = []
for edge in edge_list:
if isinstance(edge, Edge):
for output in edge.outvertex:
if output not in all_outputs:
all_outputs.append(output)
elif isinstance(edge, list):
temp = self.get_netoutputs(edge)
for item in temp:
if item not in all_outputs:
all_outputs.append(item)
return all_outputs
@staticmethod
def get_coinputs_from_triples(triples_list):
""" Retrieves a list of co-inputs corresponding to the given triples list.
:param triples_list: list of Triple objects
:return: list
"""
if triples_list is None or len(triples_list) == 0:
raise MetagraphException('triples_list', resources['value_null'])
all_coinputs = []
for triple in triples_list:
if triple.coinputs is not None:
for coinput in triple.coinputs:
if coinput not in all_coinputs:
all_coinputs.append(coinput)
return all_coinputs
@staticmethod
def get_cooutputs_from_triples(triples_list):
""" Retrieves a list of co-outputs corresponding to the given triples list.
:param triples_list: list of Triple objects
:return: list
"""
if triples_list is None or len(triples_list) == 0:
raise MetagraphException('triples_list', resources['value_null'])
all_cooutputs = []
for triple in triples_list:
if triple.cooutputs is not None:
for cooutput in triple.cooutputs:
if cooutput not in all_cooutputs:
all_cooutputs.append(cooutput)
return all_cooutputs
def extract_edge_list(self, nested_edge_list):
""" Retrieves a non-nested edge list from the given nested list.
:param nested_edge_list: nested list of Edge objects
:return: non-nested list of Edge objects.
"""
if nested_edge_list is None or len(nested_edge_list) == 0:
raise MetagraphException('nested_edge_list', resources['value_null'])
result = []
for element in nested_edge_list:
if isinstance(element, list):
temp = self.extract_edge_list(element)
for item in temp:
result.append(item)
elif isinstance(element, Edge):
result.append(element)
return result
def node_lists_overlap(self, nodes_list1, nodes_list2):
""" Checks if two lists of nodes overlap.
:param nodes_list1: list of Node objects
:param nodes_list2: list of Node objects
:return: boolean
"""
for node1 in nodes_list1:
if self.is_node_in_list(node1, nodes_list2):
return True
return False
def generate_visualisation(self, edge_list, file_path):
try:
clusters = dict()
edges = []
index = 0
for edge in edge_list:
inv=None
outv=None
if len(list(edge.invertex))>1:
# is a cluster
if edge.invertex not in clusters.values():
# create new
clusters[index] = edge.invertex
inv = index
index+=1
else:
# use existing
inv = clusters.values().index(edge.invertex)
else:
# indiv node
inv = list(edge.invertex)[0]
inv = inv.strip()
inv = inv.replace(' ','_')
inv = inv.replace('-','_')
inv = inv.replace('"','')
inv = inv.replace(';','')
if len(list(edge.outvertex))>1:
# a cluster
if edge.outvertex not in clusters.values():
# create new
clusters[index] = edge.outvertex
outv=index
index+=1
else:
# use existing
outv = clusters.values().index(edge.outvertex)
else:
# indiv node
outv = list(edge.outvertex)[0]
outv = outv.strip()
outv = outv.replace(' ','_')
outv = outv.replace('-','_')
outv = outv.replace('"','')
outv = outv.replace(';','')
if (inv, outv) not in edges:
edges.append((inv, outv))
dot_output=[]
dot_output.append('digraph G { \n')
dot_output.append('compound=true; \n')
# clusters first
for index, content in clusters.iteritems():
dot_output.append('subgraph cluster%s { \n'%index)
for elt in list(content):
#if '2202' in elt:
# print('test')
elt = elt.strip()
elt = elt.replace(' ','_')
elt = elt.replace('-','_')
elt = elt.replace('"','')
elt = elt.replace(';','')
dot_output.append('%s; \n'%elt)
dot_output.append('} \n')
# add all edges
for edge in edges:
inv = None
outv=None
inv_cluster=None
outv_cluster=None
if isinstance(edge[0],int):
# invertex is a cluster
inv_cluster='cluster%s'%edge[0]
inv = clusters[edge[0]]
else:
# invertex is an individual node
inv = edge[0]
if isinstance(edge[1],int):
# outvertex is a cluster
outv_cluster='cluster%s'%edge[0]
outv = clusters[edge[1]]
else:
# outvertex is an individual node
outv = edge[1]
if isinstance(inv,set) and isinstance(outv,set):
# inv, outv both clusters
a = list(inv)[0]
b = list(outv)[0]
a = a.strip()
a = a.replace(' ','_')
a = a.replace('-','_')
a = a.replace('"','')
a = a.replace(';','')
b = b.strip()
b = b.replace(' ','_')
b = b.replace('-','_')
b = b.replace('"','')
b = b.replace(';','')
dot_output.append('%s -> %s [ltail=%s,lhead=%s]; \n'%(a,b,inv_cluster,outv_cluster))
elif isinstance(inv,set) and isinstance(outv,str):
# inv is cluster, outv is string
a = list(inv)[0]
a = a.strip()
a = a.replace(' ','_')
a = a.replace('-','_')
a = a.replace('"','')
a = a.replace(';','')
dot_output.append('%s -> %s [ltail=%s]; \n'%(a,outv,inv_cluster))
elif isinstance(inv,str) and isinstance(outv,set):
# inv is string, outv is cluster
b = list(outv)[0]
b = b.strip()
b = b.replace(' ','_')
b = b.replace('-','_')
b = b.replace('"','')
b = b.replace(';','')
dot_output.append('%s -> %s [lhead=%s]; \n'%(inv,b,outv_cluster))
else:
# inv, outv both string
dot_output.append('%s -> %s; \n'%(inv,outv))
# write output to .dot file
dot_output.append('} \n')
dot_file_text=''
for line in dot_output:
dot_file_text +=line
#write policy file
dot_file=open(file_path,'w')
dot_file.write(dot_file_text)
dot_file.close()
except BaseException, e:
print('generate_visualisation:: Error- %s'%e)
@staticmethod
def nodes_overlap(nodes_list1, nodes_list2):
""" Checks if two lists of nodes overlap.
:param nodes_list1: list of Node objects
:param nodes_list2: list of Node objects
:return: boolean
"""
for node1 in nodes_list1:
for node2 in nodes_list2:
intersection = node1.get_element_set().intersection(node2.get_element_set())
if intersection is not None and len(intersection) > 0:
return True
return False
@staticmethod
def get_generating_set(edge_list):
""" Retrieves the generating set of the metagraph from its edge list.
:param edge_list: list of Edge objects
:return: set
"""
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
generating_set = []
for edge in edge_list:
for input_elt in edge.invertex:
if input_elt not in generating_set:
generating_set.append(input_elt)
for output in edge.outvertex:
if output not in generating_set:
generating_set.append(output)
return set(generating_set)
@staticmethod
def get_element_set(nodes_list):
""" Retrieves the set of elements within a given list of nodes
:param nodes_list: list of Node objects
:return: set
"""
if nodes_list is not None and len(nodes_list) > 0:
result = set()
for node in nodes_list:
result = result.union(node.get_element_set())
return result
return set()
@staticmethod
def transpose_matrix(matrix):
""" Computes the transpose matrix of given matrix
:param matrix: 2D array
:return: 2D array
"""
if matrix is None:
raise MetagraphException('matrix', resources['value_null'])
#rows = len(matrix)
cols = len(matrix[0])
result = []
for j in range(cols):
column = [row[j] for row in matrix]
result.append(column)
return result
@staticmethod
def custom_multiply_matrices(matrix1, matrix2, edge_list):
"""Multiplies the Triple lists of two matrices
:param matrix1: 2D array
:param matrix2: 2D array
:param edge_list: list of Edge objects
:return: 2D array
"""
if matrix1 is None:
raise MetagraphException('matrix1', resources['value_null'])
if matrix2 is None:
raise MetagraphException('matrix2', resources['value_null'])
if edge_list is None or len(edge_list) == 0:
raise MetagraphException('edge_list', resources['value_null'])
matrix1_cols = len(matrix1[0])
matrix2_rows = len(matrix2)
if matrix1_cols != matrix2_rows:
raise MetagraphException('matrix1, matrix2', resources['structures_incompatible'])
result = MetagraphHelper().get_null_matrix(len(matrix1), len(matrix2[0]))
for i in range(len(matrix1)):
for j in range(len(matrix2[0])):
intermediate_result = set()
for k in range(len(matrix1[0])):
a_ik = matrix1[i][k]
b_kj = matrix2[k][j]
temp = MetagraphHelper().custom_add_matrix_elements(k, a_ik, b_kj, edge_list)
intermediate_result = intermediate_result.union(temp)
result[i][j] = intermediate_result
return result
@staticmethod
def custom_add_matrix_elements(k, a_ik, b_kj, y):
"""Custom addition of matrix elements.
:param k: int
:param a_ik: int
:param b_kj: int
:param y: list
:return: set
"""
if len(y) < k+1:
raise MetagraphException('k', resources['value_out_of_bounds'])
if a_ik == 1 and b_kj == -1:
return {(1, y[k])}
elif a_ik == -1 and b_kj == -1:
return {(-1, y[k])}
else:
return set()
@staticmethod
def extract_edge_label_components(label):
"""Extracts components of an edge label.
:param label: string
:return: string tuple
"""
if label is None or label == '':
raise MetagraphException('label', resources['value_null'])
label = label.replace('>', '')
items = label.split('<')
if len(items) < 2:
raise MetagraphException('label', resources['format_invalid'])
r_ij = {items[0]}
tuples = items[1].split(';')
if len(tuples) < 2:
raise MetagraphException('label', resources['format_invalid'])
t_a = {tuples[0]}
t_b = {tuples[1]}
# noinspection PyRedundantParentheses
return (r_ij, t_a, t_b)
@staticmethod
def get_pre_requisites_list(pre_requisites_desc):
result=[]
if pre_requisites_desc=='NA':
return result
items= pre_requisites_desc.split(' or ')
for item in items:
item = item.replace('(','')
item = item.replace(')','')
sub_items = item.split(' and ')
if len(sub_items)>1 and sub_items not in result:
result.append(sub_items)
elif item not in result:
result.append(item)
return result