k-nearest neighbors algorithm in Python

Dr. Huidae Cho
Institute for Environmental and Spatial Analysis, University of North Georgia

1   Introduction

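Implement the k-nearest neighbors (k-NN) algorithm in Python and use it to classify the unclassified points of a shapefile: each unclassified point receives the class that wins the majority vote among its k nearest already-classified points.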

2   Python code

# to read/write shapefile
from osgeo import ogr
# to sort distances by index
import numpy as np
# to generate random numbers for tie handling
import random

def knn_classify(test_point, training_points, training_classes, k):
    """
    Classifies test_point using training_points and training_classes and
    returns the class with the majority vote.

    Only the first two coordinates (x, y) of each point are used, so points
    may be (x, y) or (x, y, z) sequences; the distance is the 2D Euclidean
    distance.

    test_point: coordinates of the point to classify
    training_points: coordinates of the already classified points
    training_classes: class of each training point, in the same order as
        training_points
    k: number of nearest neighbors that vote; if k is greater than the number
        of training points, all training points vote

    Returns the class with the most votes. If two or more classes tie for the
    most votes, one of them is picked with random.choice. Returns None if no
    vote is cast (k is less than 1 or there are no training points).

    The vote counts and the chosen class are printed.
    """
    x1, y1 = test_point[0], test_point[1]
    dists = [((x1 - point[0])**2 + (y1 - point[1])**2)**.5
             for point in training_points]
    # a stable sort keeps equally distant neighbors in their original order,
    # so the set of k nearest neighbors is deterministic
    idxs = np.argsort(dists, kind="stable")

    # never index past the available neighbors; a non-positive k selects none
    num_neighbors = max(0, min(k, len(idxs)))

    # count the votes in order of increasing distance
    votes = {}
    for idx in idxs[:num_neighbors]:
        neighbor_class = training_classes[idx]
        votes[neighbor_class] = votes.get(neighbor_class, 0) + 1

    print(votes)

    class_ = None

    if votes:
        max_count = max(votes.values())
        tie_classes = [cls for cls, count in votes.items()
                       if count == max_count]
        if len(tie_classes) > 1:
            # more than one class has the highest count; break the tie
            # randomly
            class_ = random.choice(tie_classes)
        else:
            class_ = tie_classes[0]

    print("Classified as", class_)

    return class_


def classify_shapefile(shapefile_path, class_field_name, k):
    """
    Classifies the unclassified features of a shapefile in place.

    Features whose class field is set are used as training data. Each feature
    whose class field is not set (None) is passed to knn_classify together
    with the training data and k, and the returned class is written back to
    the feature's class field in the shapefile. Features classified by this
    call are not added to the training data. Features without a geometry are
    skipped.

    shapefile_path: path to the shapefile, which is opened in update mode
    class_field_name: name of the field that stores the class
    k: number of nearest neighbors passed to knn_classify

    Raises RuntimeError if the shapefile cannot be opened for update or an
    updated feature cannot be written back, and KeyError if the layer has no
    field named class_field_name.
    """
    # open the shapefile in update mode; ogr.Open returns None on failure
    # when OGR exceptions are not enabled
    data_source = ogr.Open(shapefile_path, 1)
    if data_source is None:
        raise RuntimeError("Cannot open %s for update" % shapefile_path)
    # get the first layer
    lyr = data_source.GetLayer(0)

    # get the layer definition to find the field index of the Class field;
    # a dictionary stores field name and index pairs
    lyr_def = lyr.GetLayerDefn()
    field_idx = {
        lyr_def.GetFieldDefn(i).GetName(): i
        for i in range(lyr_def.GetFieldCount())
    }

    # now we can find the field index of the Class field
    class_field_idx = field_idx[class_field_name]

    # let's collect already classified features for training
    # geometries only
    training_points = []
    # classes only
    training_classes = []
    # features that still need a class
    unclassified_feats = []

    # read the layer once by iterating over it instead of assuming that
    # feature IDs run from 0 to the feature count minus one
    for feat in lyr:
        geom = feat.geometry()
        if geom is None:
            # no location to measure distances from
            continue
        class_ = feat.GetField(class_field_idx)
        if class_ is None:
            # the feature is unclassified; classify it once all training
            # data has been collected
            unclassified_feats.append(feat)
        else:
            # the feature is classified; keep its location and class
            training_points.append(geom.GetPoint())
            training_classes.append(class_)

    # let's classify unclassified features
    for feat in unclassified_feats:
        # get the location
        test_point = feat.geometry().GetPoint()
        # pass the location to knn_classify with the training data sets and k;
        # get the class with the majority vote
        class_ = knn_classify(test_point, training_points, training_classes, k)
        # set the newly classified label to the feature's Class field
        feat.SetField(class_field_name, class_)
        # pass the updated feature back to the layer and make sure it was
        # accepted
        if lyr.SetFeature(feat) != ogr.OGRERR_NONE:
            raise RuntimeError("Cannot update feature %d in %s"
                               % (feat.GetFID(), shapefile_path))


if __name__ == "__main__":
    # run the classification only when this file is executed as a script, so
    # that importing it does not prompt for input or modify the shapefile
    shapefile_path = "p:/courses/python/points.shp"
    class_field_name = "Class"
    # number of nearest neighbors, entered by the user
    k = int(input("k? "))

    classify_shapefile(shapefile_path, class_field_name, k)