k-nearest neighbors algorithm in Python
Institute for Environmental and Spatial Analysis, University of North Georgia
Contents
1 Introduction
Implement the k-nearest neighbors algorithm.
2 Python code
# to read/write shapefile
from osgeo import ogr
# to sort distances by index
import numpy as np
# to generate random numbers for tie handling
import random
def knn_classify(test_point, training_points, training_classes, k):
    """
    Classifies test_point by majority vote among its k nearest training
    points and returns the winning class.

    Distances are planar Euclidean distances computed from the first two
    coordinates (x, y) of each point; any further coordinates are ignored,
    so both (x, y) and (x, y, z) points are accepted.

    Parameters:
        test_point: sequence whose first two items are the x and y of the
            point to classify.
        training_points: sequence of points in the same form.
        training_classes: class labels; training_classes[i] is the label of
            training_points[i].
        k: number of nearest neighbors that vote. A k larger than the number
            of training points is clamped to that number.

    Returns:
        The class with the most votes. A tie between several classes is
        broken with random.choice. None is returned when nobody votes
        (no training points, or k < 1).

    Side effects:
        Prints the vote tally and the chosen class.
    """
    # only x and y take part in the distance
    x1, y1 = test_point[0], test_point[1]
    dists = [((x1 - p[0])**2 + (y1 - p[1])**2)**.5 for p in training_points]

    # a stable sort keeps equidistant neighbors in training order, so the
    # same input always selects the same neighbors
    idxs = np.argsort(dists, kind="stable")

    # never ask for more neighbors than exist; a negative k must not turn
    # into a from-the-end slice, so clamp at zero as well
    num_neighbors = max(0, min(k, len(idxs)))

    # tally one vote per selected neighbor
    votes = {}
    for idx in idxs[:num_neighbors]:
        label = training_classes[idx]
        votes[label] = votes.get(label, 0) + 1
    print(votes)

    class_ = None
    if votes:
        max_count = max(votes.values())
        # every class that reached the top count, in first-vote order
        tie_classes = [cls for cls, count in votes.items()
                       if count == max_count]
        if len(tie_classes) > 1:
            # several classes share the top count; pick one at random
            class_ = random.choice(tie_classes)
        else:
            class_ = tie_classes[0]

    print("Classified as", class_)
    return class_
def classify_shapefile(shapefile_path, class_field_name, k):
    """
    Classifies the unclassified features of a shapefile in place.

    Features of the first layer whose class field already holds a value
    serve as training data. Every feature whose class field is null is
    passed to knn_classify together with the training data and k, and the
    returned class is written back to that feature's class field.

    Parameters:
        shapefile_path: path of the shapefile; it is opened in update mode.
        class_field_name: name of the attribute field that holds the class.
        k: number of nearest neighbors, passed through to knn_classify.

    Raises:
        RuntimeError: if the shapefile cannot be opened for update.
        KeyError: if the layer has no field named class_field_name.
    """
    # open the shapefile in update mode (1) so features can be written back
    datasource = ogr.Open(shapefile_path, 1)
    if datasource is None:
        # unless OGR exceptions are enabled, a failed open returns None
        raise RuntimeError("Cannot open %s for update" % shapefile_path)

    lyr = None
    feat = None
    try:
        # get the first layer
        lyr = datasource.GetLayer(0)
        # map every field name to its index to find the class field
        lyr_def = lyr.GetLayerDefn()
        field_idx = {
            lyr_def.GetFieldDefn(i).GetName(): i
            for i in range(lyr_def.GetFieldCount())
        }
        class_field_idx = field_idx[class_field_name]

        # one pass over the layer: classified features become training
        # data, unclassified ones are remembered by feature ID
        training_points = []    # geometries only
        training_classes = []   # classes only
        unclassified = []       # (feature ID, location) pairs
        lyr.ResetReading()
        for feat in lyr:
            class_ = feat.GetField(class_field_idx)
            point = feat.geometry().GetPoint()
            if class_ is not None:
                training_points.append(point)
                training_classes.append(class_)
            else:
                unclassified.append((feat.GetFID(), point))

        # classify after reading has finished so that writes do not
        # interleave with the sequential read of the layer
        for fid, test_point in unclassified:
            # get the class with the majority vote
            class_ = knn_classify(test_point, training_points,
                                  training_classes, k)
            feat = lyr.GetFeature(fid)
            # set the newly classified label to the feature's class field
            feat.SetField(class_field_name, class_)
            # pass the updated feature back to the layer
            lyr.SetFeature(feat)
    finally:
        # drop the OGR references so the datasource is closed and pending
        # edits are flushed even when an error interrupted the update
        feat = None
        lyr = None
        datasource = None
# settings for this run: the point shapefile and its class field
shapefile_path, class_field_name = "p:/courses/python/points.shp", "Class"
# ask the user how many neighbors should vote
k = int(input("k? "))
# classify every unclassified feature of the shapefile
classify_shapefile(shapefile_path, class_field_name, k)