Commit 438d9be1 authored by O.V.B Sewmina's avatar O.V.B Sewmina

Merge branch 'IT19186566' into 'master'

algorithm implemented

See merge request !4
parents 7864ded6 97ba837b
import networkx as nx
import pickle
from chart_studio import plotly as py
#import plotly.plotly as py
import random
from plotly.graph_objs import *
from plotly.offline import init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
map_10_dict = {
0: {'pos': (0.7798606835438107, 0.6922727646627362), 'connections': [7, 6, 5]},
1: {'pos': (0.7647837074641568, 0.3252670836724646), 'connections': [4, 3, 2]},
2: {'pos': (0.7155217893995438, 0.20026498027300055), 'connections': [4, 3, 1]},
3: {'pos': (0.7076566826610747, 0.3278339270610988), 'connections': [5, 4, 1, 2]},
4: {'pos': (0.8325506249953353, 0.02310946309985762), 'connections': [1, 2, 3]},
5: {'pos': (0.49016747075266875, 0.5464878695400415), 'connections': [7, 0, 3]},
6: {'pos': (0.8820353070895344, 0.6791919587749445), 'connections': [0]},
7: {'pos': (0.46247219371675075, 0.6258061621642713), 'connections': [0, 5]},
8: {'pos': (0.11622158839385677, 0.11236327488812581), 'connections': [9]},
9: {'pos': (0.1285377678230034, 0.3285840695698353), 'connections': [8]}
}
class Map:
def __init__(self, G):
self._graph = G
self.intersections = nx.get_node_attributes(G, "pos")
self.roads = [list(G[node]) for node in G.nodes()]
def save(self, filename):
with open(filename, 'wb') as f:
pickle.dump(self._graph, f)
def load_map_graph(map_dict):
G = nx.Graph()
for node in map_dict.keys():
G.add_node(node, pos=map_dict[node]['pos'])
for node in map_dict.keys():
for con_node in map_dict[node]['connections']:
G.add_edge(node, con_node)
return G
def load_map_10():
G = load_map_graph(map_10_dict)
return Map(G)
def load_map_40():
G = load_map_graph(map_40_dict)
return Map(G)
def show_map(M, start=None, goal=None, path=None):
G = M._graph
pos = nx.get_node_attributes(G, 'pos')
edge_trace = Scatter(
x=[],
y=[],
line=Line(width=0.5,color='#888'),
hoverinfo='none',
mode='lines')
for edge in G.edges():
x0, y0 = G.node[edge[0]]['pos']
x1, y1 = G.node[edge[1]]['pos']
edge_trace['x'] += [x0, x1, None]
edge_trace['y'] += [y0, y1, None]
node_trace = Scatter(
x=[],
y=[],
text=[],
mode='markers',
hoverinfo='text',
marker=Marker(
showscale=False,
# colorscale options
# 'Greys' | 'Greens' | 'Bluered' | 'Hot' | 'Picnic' | 'Portland' |
# Jet' | 'RdBu' | 'Blackbody' | 'Earth' | 'Electric' | 'YIOrRd' | 'YIGnBu'
colorscale='Hot',
reversescale=True,
color=[],
size=10,
colorbar=dict(
thickness=15,
title='Node Connections',
xanchor='left',
titleside='right'
),
line=dict(width=2)))
for node in G.nodes():
x, y = G.node[node]['pos']
node_trace['x'].append(x)
node_trace['y'].append(y)
for node, adjacencies in enumerate(G.adjacency_list()):
color = 0
if path and node in path:
color = 2
if node == start:
color = 3
elif node == goal:
color = 1
# node_trace['marker']['color'].append(len(adjacencies))
node_trace['marker']['color'].append(color)
node_info = "Intersection " + str(node)
node_trace['text'].append(node_info)
fig = Figure(data=Data([edge_trace, node_trace]),
layout=Layout(
title='<br>Network graph made with Python',
titlefont=dict(size=16),
showlegend=False,
hovermode='closest',
margin=dict(b=20,l=5,r=5,t=40),
xaxis=XAxis(showgrid=False, zeroline=False, showticklabels=False),
yaxis=YAxis(showgrid=False, zeroline=False, showticklabels=False)))
iplot(fig)
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from maps import Map, load_map_10, show_map\n",
"import math"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"map_10 = load_map_10()\n",
"show_map(map_10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"map_10.intersections\n",
"#map_10.intersections[0][0]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# this shows that intersection 0 connects to intersections 7, 6, and 5\n",
"map_10.roads[0] "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This shows the full connectivity of the map\n",
"map_10.roads"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# parameters in the function\n",
"show_map(map_10, start=7, goal=2, path=[7,5,5])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Planner\n",
"class PathPlanner():\n",
" \"\"\" PathPlanner Object \"\"\"\n",
" def __init__(self, M, start=None, goal=None):\n",
" \n",
" self.map = M\n",
" self.start= start\n",
" self.goal = goal\n",
" self.closedSet = self.create_closedSet() if goal != None and start != None else None\n",
" self.openSet = self.create_openSet() if goal != None and start != None else None\n",
" self.cameFrom = self.create_cameFrom() if goal != None and start != None else None\n",
" self.gScore = self.create_gScore() if goal != None and start != None else None\n",
" self.fScore = self.create_fScore() if goal != None and start != None else None\n",
" self.path = self.run_search() if self.map and self.start != None and self.goal != None else None\n",
" \n",
" def reconstruct_path(self, current):\n",
" total_path = [current]\n",
" while current in self.cameFrom.keys():\n",
" current = self.cameFrom[current]\n",
" total_path.append(current)\n",
" return total_path\n",
" \n",
" def _reset(self):\n",
" self.closedSet = None\n",
" self.openSet = None\n",
" self.cameFrom = None\n",
" self.gScore = None\n",
" self.fScore = None\n",
" self.path = self.run_search() if self.map and self.start and self.goal else None\n",
"\n",
" def run_search(self):\n",
" \"\"\" \"\"\"\n",
" if self.map == None:\n",
" raise(ValueError, \"Must create map before running search\")\n",
" if self.goal == None:\n",
" raise(ValueError, \"Must create goal node before running search\")\n",
" if self.start == None:\n",
" raise(ValueError, \"Must create start node before running search\")\n",
"\n",
" self.closedSet = self.closedSet if self.closedSet != None else self.create_closedSet()\n",
" self.openSet = self.openSet if self.openSet != None else self.create_openSet()\n",
" self.cameFrom = self.cameFrom if self.cameFrom != None else self.create_cameFrom()\n",
" self.gScore = self.gScore if self.gScore != None else self.create_gScore()\n",
" self.fScore = self.fScore if self.fScore != None else self.create_fScore()\n",
"\n",
" while not self.is_open_empty():\n",
" current = self.get_current_node()\n",
"\n",
" if current == self.goal:\n",
" self.path = [x for x in reversed(self.reconstruct_path(current))]\n",
" return self.path\n",
" else:\n",
" self.openSet.remove(current)\n",
" self.closedSet.add(current)\n",
"\n",
" for neighbor in self.get_neighbors(current):\n",
" if neighbor in self.closedSet:\n",
" continue # Ignore the neighbor which is already evaluated.\n",
"\n",
" if not neighbor in self.openSet: # Discover a new node\n",
" self.openSet.add(neighbor)\n",
" \n",
" # The distance from start to a neighbor\n",
" #the \"dist_between\" function may vary as per the solution requirements.\n",
" if self.get_tentative_gScore(current, neighbor) >= self.get_gScore(neighbor):\n",
" continue \n",
" \n",
" self.record_best_path_to(current, neighbor)\n",
" print(\"No Path Found\")\n",
" self.path = None\n",
" return False"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_closedSet(self):\n",
" \"\"\" Creates and returns a data structure suitable to hold the set of nodes already evaluated\"\"\"\n",
" # EXAMPLE: return a data structure suitable to hold the set of nodes already evaluated\n",
" return set()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_openSet(self):\n",
" if self.start != None:\n",
" # TODO: return a data structure suitable to hold the set of currently discovered nodes \n",
" # that are not evaluated yet. Make sure to include the start node.\n",
" openSet = set()\n",
" openSet.add(self.start)\n",
" return openSet\n",
" \n",
" raise(ValueError, \"Must create start node before creating an open set. Try running PathPlanner.set_start(start_node)\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_cameFrom(self):\n",
" # TODO: return a data structure that shows which node can most efficiently be reached from another,\n",
" # for each node. \n",
" cameFrom = {}\n",
" return cameFrom"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_gScore(self):\n",
" # TODO: return a data structure that holds the cost of getting from the start node to that node, for each node.\n",
" # for each node. The cost of going from start to start is zero. The rest of the node's values should \n",
" # be set to infinity.\n",
" if self.start != None:\n",
" gScore = {}\n",
" for node in self.map.intersections.keys():\n",
" if node == self.start:\n",
" gScore[node] = 0\n",
" else: gScore[node] = float('inf')\n",
" return gScore\n",
" raise(ValueError, \"Must create start node before creating gScore. Try running PathPlanner.set_start(start_node)\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_fScore(self):\n",
" # TODO: return a data structure that holds the total cost of getting from the start node to the goal\n",
" # by passing by that node, for each node. That value is partly known, partly heuristic.\n",
" # For the first node, that value is completely heuristic. The rest of the node's value should be \n",
" # set to infinity.\n",
" if self.start != None:\n",
" fScore = {}\n",
" for node in self.map.intersections.keys():\n",
" if node == self.start:\n",
" fScore[node] = self.heuristic_cost_estimate(self.start)\n",
" else: fScore[node] = float('inf')\n",
" return fScore\n",
" raise(ValueError, \"Must create start node before creating fScore. Try running PathPlanner.set_start(start_node)\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def set_map(self, M):\n",
" \"\"\"Method used to set map attribute \"\"\"\n",
" self._reset(self)\n",
" self.start = None\n",
" self.goal = None\n",
" # TODO: Set map to new value. \n",
" self.map = M"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def set_start(self, start):\n",
" \"\"\"Method used to set start attribute \"\"\"\n",
" self._reset(self)\n",
" # TODO: Set start value. Remember to remove goal, closedSet, openSet, cameFrom, gScore, fScore, \n",
" # and path attributes' values.\n",
" self.start = start\n",
" self.goal = None\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def set_goal(self, goal):\n",
" \"\"\"Method used to set goal attribute \"\"\"\n",
" self._reset(self)\n",
" # TODO: Set goal value. \n",
" self.goal = goal\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def is_open_empty(self):\n",
" \"\"\"returns True if the open set is empty. False otherwise. \"\"\"\n",
" # TODO: Return True if the open set is empty. False otherwise.\n",
" return not bool(self.openSet)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_current_node(self):\n",
" \"\"\" Returns the node in the open set with the lowest value of f(node).\"\"\"\n",
" # TODO: Return the node in the open set with the lowest value of f(node).\n",
" current = None\n",
" minim = float('inf')\n",
" for node in self.openSet:\n",
" if self.fScore[node] < minim:\n",
" minim = self.fScore[node]\n",
" current = node\n",
" return current \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_neighbors(self, node):\n",
" \"\"\"Returns the neighbors of a node\"\"\"\n",
" # TODO: Return the neighbors of a node\n",
" return set(self.map.roads[node]) \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Scores\n",
"\n",
"def get_gScore(self, node):\n",
" \"\"\"Returns the g Score of a node\"\"\"\n",
" # TODO: Return the g Score of a node\n",
" return self.gScore[node]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def distance(self, node_1, node_2):\n",
" \"\"\" Computes the Euclidean L2 Distance\"\"\"\n",
" # TODO: Compute and return the Euclidean L2 Distance\n",
" x1 = self.map.intersections[node_1][0]\n",
" y1 = self.map.intersections[node_1][1]\n",
" x2 = self.map.intersections[node_2][0]\n",
" y2 = self.map.intersections[node_2][1]\n",
" dist = math.sqrt( (x2-x1)**2 + (y2-y1)**2 )\n",
" return dist\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_tentative_gScore(self, current, neighbor):\n",
" \"\"\"Returns the tentative g Score of a node\"\"\"\n",
" # TODO: Return the g Score of the current node \n",
" # plus distance from the current node to it's neighbors\n",
" g_score_current = self.get_gScore(current)\n",
" dist_current_neighbor = self.distance(current,neighbor)\n",
" return g_score_current+dist_current_neighbor\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def heuristic_cost_estimate(self, node):\n",
" \"\"\" Returns the heuristic cost estimate of a node \"\"\"\n",
" # TODO: Return the heuristic cost estimate of a node\n",
" if self.goal != None:\n",
" heuristic_estimate = self.distance(node,self.goal)\n",
" return heuristic_estimate\n",
" raise(ValueError, \"Must create goal node before calculating huristic estimate. Try running PathPlanner.set_goal(goal_node)\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def calculate_fscore(self, node):\n",
" \"\"\"Calculate the f score of a node. \"\"\"\n",
" # TODO: Calculate and returns the f score of a node. \n",
" # REMEMBER F = G + H\n",
" f_score = self.get_gScore(node) + self.heuristic_cost_estimate(node)\n",
" return f_score\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def record_best_path_to(self, current, neighbor):\n",
" \"\"\"Record the best path to a node \"\"\"\n",
" # TODO: Record the best path to a node, by updating cameFrom, gScore, and fScore\n",
" self.cameFrom[neighbor] = current\n",
" self.gScore[neighbor] = self.get_tentative_gScore(current,neighbor)\n",
" self.fScore[neighbor] = self.gScore[neighbor] + self.heuristic_cost_estimate(neighbor)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Associates implemented functions with PathPlanner class\n",
"PathPlanner.create_closedSet = create_closedSet\n",
"PathPlanner.create_openSet = create_openSet\n",
"PathPlanner.create_cameFrom = create_cameFrom\n",
"PathPlanner.create_gScore = create_gScore\n",
"PathPlanner.create_fScore = create_fScore\n",
"PathPlanner.set_map = set_map\n",
"PathPlanner.set_start = set_start\n",
"PathPlanner.set_goal = set_goal\n",
"PathPlanner.is_open_empty = is_open_empty\n",
"PathPlanner.get_current_node = get_current_node\n",
"PathPlanner.get_neighbors = get_neighbors\n",
"PathPlanner.get_gScore = get_gScore\n",
"PathPlanner.distance = distance\n",
"PathPlanner.get_tentative_gScore = get_tentative_gScore\n",
"PathPlanner.heuristic_cost_estimate = heuristic_cost_estimate\n",
"PathPlanner.calculate_fscore = calculate_fscore\n",
"PathPlanner.record_best_path_to = record_best_path_to"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Visualize result\n",
"start = 7\n",
"goal = 5\n",
"\n",
"show_map(map_10, start=start, goal=goal, path=PathPlanner(map_10, start, goal).path)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment