import numpy as np
from skimage.morphology import skeletonize
from scipy.signal import convolve2d
from PVBM.helpers.tortuosity import compute_tortuosity
from PVBM.helpers.perimeter import compute_perimeter_
from PVBM.helpers.branching_angle import compute_angles_dictionary
#from PVBM.helpers.far import far
from PVBM.GraphRegularisation.GraphRegularisation import TreeReg
from PVBM.GraphCRE.GraphCentralRetinalEquivalent import Tree
[docs]
class CREVBMs:
"""A class that can perform geometrical biomarker computation for a fundus image.
"""
def extract_subgraphs(self,graphs, x_c, y_c):
"""
Extract B, the a graph where each of the disconnected subgraph is labeled differently and D which contains the euclidian distance graph between each
point in the graph and the optic disc.
:param graphs: Original blood vessel segmentation graph
:type graphs: array
:param x_c: x axis of the optic disc center
:type x_c: int
:param y_c: y axis of the optic disc center
:type y_c: int
:return: B,D
:rtype: tuple
"""
B = np.zeros_like(graphs)
D = np.zeros_like(graphs)
n = 1
for i in range(graphs.shape[0]):
for j in range(graphs.shape[1]):
if B[i, j] == 0 and graphs[i, j] == 1:
self.recursive_subgraph(graphs, B, D, i, j, n, x_c, y_c)
n += 1
return B, D
def recursive_subgraph(self,A, B, D, i, j, n, x_c, y_c):
"""
Recursively extract the value within B and D.
:param A: Original blood vessel segmentation graph
:type A: array
:param B: A graph where each of the disconnected subgraph is labeled differentl, which is initialized by a zeros matrix and recursively built
:type B: array
:param D: Euclidian Distance graph between each point in A and the optic disc center (x_c,y_c), which is initialized by a zeros matrix and recursively built
:type D: array
:param i: Current x axis location within the graph
:type i: int
:param j: Current y axis location within the graph
:type j: int
:param n: Current number of point distance since the optic disc
:type n: int
:param x_c: x axis of the optic disc center
:type x_c: int
:param y_c: y axis of the optic disc center
:type y_c: int
:return: B,D
:rtype: tuple
"""
up = (i - 1, j)
down = (i + 1, j)
left = (i, j - 1)
right = (i, j + 1)
up_left = (i - 1, j - 1)
up_right = (i - 1, j + 1)
down_left = (i + 1, j - 1)
down_right = (i + 1, j + 1)
points = [up, down, left, right, up_left, up_right, down_left, down_right]
for point in points:
if point[0] >= 0 and point[0] < B.shape[0] and point[1] < B.shape[1] and point[1] >= 0:
if A[point] == 1:
B[point] = n
A[point] = 0
D[point] = ((y_c - point[0]) ** 2 + (x_c - point[1]) ** 2) ** 0.5
self.recursive_subgraph(A, B, D, point[0], point[1], n, x_c, y_c)
def central_equivalent(self, l,formula):
"""
Recursively compute the central retinal equivalent given a list of blood vessel measurement and a formula to
:param l: list of blood vessel measurements
:type l: List
:param formula: One of the following formulas: PVBM.CentralRetinalAnalysis.crae_hubbard, PVBM.CentralRetinalAnalysis.crve_hubbard, PVBM.CentralRetinalAnalysis.crae_knudtson or PVBM.CentralRetinalAnalysis.crve_knudtson
:type formula: python function
:return: the computed central retinal equivalent VBM
:rtype: float
"""
l = np.sort(l[:])
new_l = []
pivot =None
if len(l) == 1:
return l[0]
if len(l)%2 !=0:
idx_pivot = len(l)//2
pivot = l[idx_pivot]
for i in range(len(l)//2):
new_l.append(formula(l[i],l[-i-1]))
if pivot != None:
new_l.append(pivot)
return self.central_equivalent(new_l,formula)
def crae_hubbard(self,w1, w2):
"""
Recursive formulas of the Central Retinal Arteriolar Equivalent according to Hubbard et al.
:param w1: w1 vessel measurement
:type w1: float
:param w2: w2 vessel measurement
:type w2: float
:return: the root vessel of w1 and w2 estimation
:rtype: float
"""
return (0.87 * w1 ** 2 + 1.01 * w2 ** 2 - 0.22 * w1 * w2 - 10.76) ** 0.5
def crve_hubbard(self,w1, w2):
"""
Recursive formulas of the Central Retinal Venular Equivalent according to Hubbard et al.
:param w1: w1 vessel measurement
:type w1: float
:param w2: w2 vessel measurement
:type w2: float
:return: the root vessel of w1 and w2 estimation
:rtype: float
"""
return (0.72 * w1 ** 2 + 0.91 * w2 ** 2 + 450.05) ** 0.5
def crae_knudtson(self,w1, w2):
"""
Recursive formulas of the Central Retinal Arteriolar Equivalent according to Knudtson et al.
:param w1: w1 vessel measurement
:type w1: float
:param w2: w2 vessel measurement
:type w2: float
:return: the root vessel of w1 and w2 estimation
:rtype: float
"""
return 0.88 * (w1 ** 2 + w2 ** 2) ** 0.5
def crve_knudtson(self,w1, w2):
"""
Recursive formulas of the Central Retinal Venular Equivalent according to Knudtson et al.
:param w1: w1 vessel measurement
:type w1: float
:param w2: w2 vessel measurement
:type w2: float
:return: the root vessel of w1 and w2 estimation
:rtype: float
"""
return 0.95 * (w1 ** 2 + w2 ** 2) ** 0.5
[docs]
def apply_roi(self, segmentation, skeleton, zones_ABC):
"""
Apply a region of interest (ROI) mask to the segmentation and skeleton images.
:param segmentation: The segmentation image containing binary values within {0, 1}.
:type segmentation: np.array
:param skeleton: The skeleton image containing binary values within {0, 1}.
:type skeleton: np.array
:param zones_ABC: A mask image used to exclude specific zones, where the second channel defines the exclusion areas.
:type zones_ABC: np.array
:return: A tuple containing:
- The modified segmentation image with the ROI applied.
- The modified skeleton image with the ROI applied.
:rtype: Tuple[np.array, np.array]
"""
zone_A_ = zones_ABC[:, :, 1] / 255
zone_B_ = zones_ABC[:, :, 0] / 255
zone_C_ = zones_ABC[:, :, 2] / 255
roi = (zone_C_ - zone_B_)
segmentation_roi = (segmentation * roi)
skeleton_roi = (skeleton * roi)
return segmentation_roi, skeleton_roi
[docs]
def compute_central_retinal_equivalents(self,blood_vessel, skeleton, xc,yc, radius, artery = True, Toplot = False):
"""
Compute the CRAE or CRVE equivalent for a given blood vessel graph.
:param blood_vessel: blood_vessel segmentation containing binary values within {0,1}
:type blood_vessel: np.array
:param skeleton: blood_vessel segmentation skeleton containing binary values within {0,1}
:type skeleton: np.array
:param xc: x axis of the optic disc center
:type xc: int
:param yc: y axis of the optic disc center
:type yc: int
:param radius: radius in pixel of the optic disc
:type radius: int
:param artery: Flag to decide if to use CRAE or CRVE formulas (artery to True means CRAE, and to False means CRVE)
:type artery: Bool
:param Toplot: Flag to decide if to store the visualisation element. (Setting it to true use more RAM)
:type Toplot: Bool
:return: A tuple containing:
- A result dictionnary (Dict): Dictionnary containing the computer CRE (-1 if it has failed)
- plotable_list (List): A summary that contains the topology information required to plot the visualisation (really useful when Toplot is True). Return None if the computation has failed.
:rtype: Tuple[Dict, List]
"""
radius_zone_C = int(3 * radius)
## Extract the distances graphs
B, D = self.extract_subgraphs(graphs=skeleton.copy(), x_c=xc, y_c=yc)
## Extract the starting points list by navigating through the skeleton graph
starting_points = np.zeros((skeleton.shape[0], skeleton.shape[1]), dtype=float)
for i in set(list(B.reshape(-1))) - {0}:
mask = B == i
if mask.sum() >= 50:
min_index = (D * mask + (1 - mask) * 1e10).argmin()
min_coordinates = np.unravel_index(min_index, D.shape)
# print(((min_coordinates[0] - yc)**2 + (min_coordinates[1] - xc)**2)**0.5)
if ((min_coordinates[0] - yc) ** 2 + (min_coordinates[1] - xc) ** 2) ** 0.5 < 20 + 2 * radius:
starting_points[min_coordinates[0], min_coordinates[1]] = 1
starting_point_list = np.argwhere(starting_points == 1)
### Cleaning the skeleton graph irregularities
B = np.zeros((blood_vessel.shape[0], blood_vessel.shape[1]))
tree_reg_list = []
plot = np.zeros((blood_vessel.shape[0], blood_vessel.shape[1]))
for idx_start in starting_point_list:
tree = TreeReg(idx_start[0], idx_start[1])
tree.recursive_reg(skeleton.copy(), idx_start[0], idx_start[1], 0, tree, plot)
tree_reg_list.append(tree)
plot_list = []
for tree_reg in tree_reg_list:
plot = np.zeros((blood_vessel.shape[0], blood_vessel.shape[1]))
tree_reg.print_reg(tree_reg, plot)
plot_list.append(plot.copy())
skoustideB_reg = np.sum(np.array(plot_list), axis=0)
B = np.zeros_like(skeleton)
D = np.zeros_like(skeleton)
plotted_tmp = None if Toplot is False else np.zeros((blood_vessel.shape[0], blood_vessel.shape[1]))
#####Initialising a dictionary that will be used to store the topology of th graph
dico = {}
####Navigating through the graph to fill the the topology dico
for idx_start in starting_point_list:
i, j = idx_start
tree = Tree((idx_start[0],idx_start[1]),(idx_start[0],idx_start[1]))
tree.iterative_CRE(skeleton.copy(), B, D, idx_start[0], idx_start[1], 1, None, None, None, None, None,
None, None, None, None, None, None, None, plotted_tmp, i, j, tree, blood_vessel, xc, yc,
radius_zone_C)
dico[(idx_start[0], idx_start[1])] = tree
tree_list = list(dico.values())
####Second graph navigation to check consistency of the topology dico (parent vessel is supposed to be bigger than its children).
final_list = []
for i in range(len(tree_list)):
tmp = tree_list[i].print_correct(tree_list[i], max_d=0)
if type(tmp[2]) is not int:
if tmp[0].startpoint != tmp[2].startpoint and np.mean(tmp[0].diameter_list) > np.mean(
tmp[2].diameter_list) + 2:
print('Potential error', i)
idx_start = tmp[0].startpoint
i, j = idx_start
tree = Tree((idx_start[0], idx_start[1]), (idx_start[0], idx_start[1]))
tree.iterative_CRE(skoustideB_reg.copy(), B, D, idx_start[0], idx_start[1], 1, None, None, None,
None, None, None, None, None, None, None, None, None, plotted_tmp, i, j, tree, blood_vessel,
xc, yc, radius_zone_C)
final_list.append(tree) # .remove_children())
else:
final_list.append(tmp[2]) # .remove_children())
else:
final_list.append(tmp[0]) # .remove_children())
### Aggregating the topology dico results
plotable_list = []
for tree in final_list:
tree.plotable_show_tree(plotable_list, blood_vessel.shape, Toplot = Toplot)
### Compute and return the CRE VBMs
if len(plotable_list) != 0:
blood_vessel_lista = []
blood_vessel_lista.append(plotable_list[0]['Mean diameter'])
for j in range(1, len(plotable_list)):
blood_vessel_lista.append(plotable_list[j]['Mean diameter'])
if artery:
craek = self.central_equivalent(np.sort(blood_vessel_lista)[-6:], self.crae_knudtson)
craeh = self.central_equivalent(np.sort(blood_vessel_lista)[-6:], self.crae_hubbard)
return ({"craek": craek, "craeh":craeh}, plotable_list)
else:
crvek = self.central_equivalent(np.sort(blood_vessel_lista)[-6:], self.crve_knudtson)
crveh = self.central_equivalent(np.sort(blood_vessel_lista)[-6:], self.crve_hubbard)
return ({"crvek": crvek, "crveh": crveh}, plotable_list)
else:
if artery:
return ({"craek": -1, "craeh": -1}, None)
else:
return ({"crvek": -1, "crveh": -1}, None)
if __name__ == "__main__":
import numpy as np
from skimage.morphology import skeletonize
from PIL import Image
center = (460,689)
radius = 110
blood_vessel_segmentation_path = '/Users/jonathanfhima/Desktop/test123/segmentation/seg.png'
segmentation = np.array(Image.open(blood_vessel_segmentation_path)) / 255 # Open the segmentation
segmentation = segmentation[:,:,2]
skeleton = skeletonize(segmentation) * 1
creVBM = CREVBMs() # Instanciate a geometrical VBM object
#roi = '/Users/jonathanfhima/Library/Containers/13E15484-8207-4BD5-BEA5-CB0FAD85FCF7/Data/Documents/test123/ROI/DR_2_ICDR.jpg'
#roi = np.array(Image.open(roi))
zones_ABC = '/Users/jonathanfhima/Desktop/test123/zones_ABC/zones.jpg'
zones_ABC = np.array(Image.open(zones_ABC))
segmentation_roi, skeleton_roi = creVBM.apply_roi(
segmentation=segmentation,
skeleton=skeleton,
zones_ABC=zones_ABC,
)
vbms, visual = creVBM.compute_central_retinal_equivalents(
blood_vessel=segmentation_roi,
skeleton=skeleton_roi,
xc=center[0],
yc=center[1],
radius=radius
)