Start Typing !!! type the highlighted character. You can't scroll once you start typing.
filename

bidirectional_a_star.py

"""

https://en.wikipedia.org/wiki/Bidirectional_search

"""

import time

from math import sqrt

from typing import List, Tuple

# 1 for manhattan, 0 for euclidean

HEURISTIC = 0

grid = [

[0, 0, 0, 0, 0, 0, 0],

[0, 1, 0, 0, 0, 0, 0], # 0 are free path whereas 1's are obstacles

[0, 0, 0, 0, 0, 0, 0],

[0, 0, 1, 0, 0, 0, 0],

[1, 0, 1, 0, 0, 0, 0],

[0, 0, 0, 0, 0, 0, 0],

[0, 0, 0, 0, 1, 0, 0],

]

delta = [[-1, 0], [0, -1], [1, 0], [0, 1]] # up, left, down, right

class Node:

"""

>>> k = Node(0, 0, 4, 3, 0, None)

>>> k.calculate_heuristic()

5.0

>>> n = Node(1, 4, 3, 4, 2, None)

>>> n.calculate_heuristic()

2.0

>>> l = [k, n]

>>> n == l[0]

False

>>> l.sort()

>>> n == l[0]

True

"""

def __init__(self, pos_x, pos_y, goal_x, goal_y, g_cost, parent):

self.pos_x = pos_x

self.pos_y = pos_y

self.pos = (pos_y, pos_x)

self.goal_x = goal_x

self.goal_y = goal_y

self.g_cost = g_cost

self.parent = parent

self.h_cost = self.calculate_heuristic()

self.f_cost = self.g_cost + self.h_cost

def calculate_heuristic(self) -> float:

"""

Heuristic for the A*

"""

dy = self.pos_x - self.goal_x

dx = self.pos_y - self.goal_y

if HEURISTIC == 1:

return abs(dx) + abs(dy)

else:

return sqrt(dy ** 2 + dx ** 2)

def __lt__(self, other) -> bool:

return self.f_cost < other.f_cost

class AStar:

"""

>>> astar = AStar((0, 0), (len(grid) - 1, len(grid[0]) - 1))

>>> (astar.start.pos_y + delta[3][0], astar.start.pos_x + delta[3][1])

(0, 1)

>>> [x.pos for x in astar.get_successors(astar.start)]

[(1, 0), (0, 1)]

>>> (astar.start.pos_y + delta[2][0], astar.start.pos_x + delta[2][1])

(1, 0)

>>> astar.retrace_path(astar.start)

[(0, 0)]

>>> astar.search() # doctest: +NORMALIZE_WHITESPACE

[(0, 0), (1, 0), (2, 0), (2, 1), (2, 2), (2, 3), (3, 3),

(4, 3), (4, 4), (5, 4), (5, 5), (6, 5), (6, 6)]

"""

def __init__(self, start, goal):

self.start = Node(start[1], start[0], goal[1], goal[0], 0, None)

self.target = Node(goal[1], goal[0], goal[1], goal[0], 99999, None)

self.open_nodes = [self.start]

self.closed_nodes = []

self.reached = False

def search(self) -> List[Tuple[int]]:

while self.open_nodes:

# Open Nodes are sorted using __lt__

self.open_nodes.sort()

current_node = self.open_nodes.pop(0)

if current_node.pos == self.target.pos:

self.reached = True

return self.retrace_path(current_node)

self.closed_nodes.append(current_node)

successors = self.get_successors(current_node)

for child_node in successors:

if child_node in self.closed_nodes:

continue

if child_node not in self.open_nodes:

self.open_nodes.append(child_node)

else:

# retrieve the best current path

better_node = self.open_nodes.pop(self.open_nodes.index(child_node))

if child_node.g_cost < better_node.g_cost:

self.open_nodes.append(child_node)

else:

self.open_nodes.append(better_node)

if not (self.reached):

return [(self.start.pos)]

def get_successors(self, parent: Node) -> List[Node]:

"""

Returns a list of successors (both in the grid and free spaces)

"""

successors = []

for action in delta:

pos_x = parent.pos_x + action[1]

pos_y = parent.pos_y + action[0]

if not (0 <= pos_x <= len(grid[0]) - 1 and 0 <= pos_y <= len(grid) - 1):

continue

if grid[pos_y][pos_x] != 0:

continue

successors.append(

Node(

pos_x,

pos_y,

self.target.pos_y,

self.target.pos_x,

parent.g_cost + 1,

parent,

)

)

return successors

def retrace_path(self, node: Node) -> List[Tuple[int]]:

"""

Retrace the path from parents to parents until start node

"""

current_node = node

path = []

while current_node is not None:

path.append((current_node.pos_y, current_node.pos_x))

current_node = current_node.parent

path.reverse()

return path

class BidirectionalAStar:

"""

>>> bd_astar = BidirectionalAStar((0, 0), (len(grid) - 1, len(grid[0]) - 1))

>>> bd_astar.fwd_astar.start.pos == bd_astar.bwd_astar.target.pos

True

>>> bd_astar.retrace_bidirectional_path(bd_astar.fwd_astar.start,

... bd_astar.bwd_astar.start)

[(0, 0)]

>>> bd_astar.search() # doctest: +NORMALIZE_WHITESPACE

[(0, 0), (0, 1), (0, 2), (1, 2), (1, 3), (2, 3), (2, 4),

(2, 5), (3, 5), (4, 5), (5, 5), (5, 6), (6, 6)]

"""

def __init__(self, start, goal):

self.fwd_astar = AStar(start, goal)

self.bwd_astar = AStar(goal, start)

self.reached = False

def search(self) -> List[Tuple[int]]:

while self.fwd_astar.open_nodes or self.bwd_astar.open_nodes:

self.fwd_astar.open_nodes.sort()

self.bwd_astar.open_nodes.sort()

current_fwd_node = self.fwd_astar.open_nodes.pop(0)

current_bwd_node = self.bwd_astar.open_nodes.pop(0)

if current_bwd_node.pos == current_fwd_node.pos:

self.reached = True

return self.retrace_bidirectional_path(

current_fwd_node, current_bwd_node

)

self.fwd_astar.closed_nodes.append(current_fwd_node)

self.bwd_astar.closed_nodes.append(current_bwd_node)

self.fwd_astar.target = current_bwd_node

self.bwd_astar.target = current_fwd_node

successors = {

self.fwd_astar: self.fwd_astar.get_successors(current_fwd_node),

self.bwd_astar: self.bwd_astar.get_successors(current_bwd_node),

}

for astar in [self.fwd_astar, self.bwd_astar]:

for child_node in successors[astar]:

if child_node in astar.closed_nodes:

continue

if child_node not in astar.open_nodes:

astar.open_nodes.append(child_node)

else:

# retrieve the best current path

better_node = astar.open_nodes.pop(

astar.open_nodes.index(child_node)

)

if child_node.g_cost < better_node.g_cost:

astar.open_nodes.append(child_node)

else:

astar.open_nodes.append(better_node)

if not self.reached:

return [self.fwd_astar.start.pos]

def retrace_bidirectional_path(

self, fwd_node: Node, bwd_node: Node

) -> List[Tuple[int]]:

fwd_path = self.fwd_astar.retrace_path(fwd_node)

bwd_path = self.bwd_astar.retrace_path(bwd_node)

bwd_path.pop()

bwd_path.reverse()

path = fwd_path + bwd_path

return path

if __name__ == "__main__":

# all coordinates are given in format [y,x]

import doctest

doctest.testmod()

init = (0, 0)

goal = (len(grid) - 1, len(grid[0]) - 1)

for elem in grid:

print(elem)

start_time = time.time()

a_star = AStar(init, goal)

path = a_star.search()

end_time = time.time() - start_time

print(f"AStar execution time = {end_time:f} seconds")

bd_start_time = time.time()

bidir_astar = BidirectionalAStar(init, goal)

path = bidir_astar.search()

bd_end_time = time.time() - bd_start_time

print(f"BidirectionalAStar execution time = {bd_end_time:f} seconds")