自学内容网 自学内容网

【车辆轨迹处理】python实现轨迹点的聚类(二)—— ST-DBSCAN算法


前言

  笔者在之前的研究中,尝试对车辆轨迹数据进行空间聚类,以期望发现车辆在行驶过程中的停留信息。在笔者之前的文章中,笔者使用了DBSCAN算法来做这一件事。
  然而,对于时序的车辆经纬度数据,DBSCAN有一个很大的问题——没有考虑数据中蕴含的时间信息!时间信息是时间序列数据与其他数据区别的重要特征。举个例子:在使用DBSCAN对车辆的经纬度进行聚类时,它仅仅是把那些空间相近的数据聚成一类,可是同一类中的数据可能相差很大。我们想要发现车辆的驻留行为,那些空间和时间都相近的轨迹点才能聚成一类,这才表示这辆车可能在某段时间因为某原因发生了停留。
  正因如此,很多年前的研究者就对DBSCAN进行改进,有了适合用作时间序列数据密度聚类的算法——ST-DBSCAN。STDBSCAN的具体算法本文不再赘述,基本流程和DBSCAN无异,只是在可达点寻找中加入了时间阈值作为限制条件。若想要了解详细算法,可自行互联网搜索。
  本文还是以如下格式车辆轨迹数据为例,实提供了ST-DBSCAN对车辆轨迹数据聚类并分析的方法:

collect_timeidlonlat
时间车辆标识经度纬度

  为了尽量去除噪声影响,车辆轨迹数据已经经过滤波平滑,平滑方法可见作者之前文章:https://blog.csdn.net/jgsecurity/article/details/140608431

一、单辆车轨迹的聚类与分析

  为了尽量与scikit-learn库中的使用方法相似,本文用类来实现STDBSCAN。class STDBSCAN的内容可以放在单独文件中作为模块导入,也可以同一文件中使用。

1.引入库

  使用了数学计算库numpy,数据分析库pandas,机器学习库scikit-learn,地理相关库shapelygeopy,绘图库matplotlib

import numpy as np
import pandas as pd
from datetime import timedelta
from shapely.geometry import MultiPoint
from geopy.distance import great_circle
from sklearn import metrics
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

2.class STDBSCAN实现

  在进行聚类之前,先使用shapelygeopy库实现了get_centermost_point函数。其输入数据cluster是列表类型,表示每一组聚类的点集。作用是在获得了每个聚类之后,计算出该聚类的中心点。

# 计算每个聚类的中心点
def get_centermost_point(cluster):
    # 计算整个点集合的质心点
    centroid = (MultiPoint(cluster).centroid.x, MultiPoint(cluster).centroid.y)
    # 取点集合中离质心点最近的点为中心点
    centermost_point = min(cluster, key=lambda point: great_circle(point, centroid).m)
    # 返回中心点
    return tuple(centermost_point)

  采用class来实现STDBSCAN。
  类有四个属性:spatial_threshold(距离阈值,单位)、temporal_threshold(时间阈值,单位分钟)、min_neighbors(邻域内最少点数)、labels_(聚类后的标签)。在__init__构造函数中为前三个参数设置了默认值。
  retrieve_neighbors(self, index_center, df)用于寻找给定一个核心点的所有可达邻居(在距离阈值和时间阈值内)。接受参数index_center(整数: 给定核心点的索引)和df(dataframe: 单个车辆的轨迹点数据集)。函数返回给定核心点所有可达邻居点的索引集合。
  fit(self, df)为实现ST-DBSCAN的聚类方法,用于接受某个车辆的轨迹点数据并完成STDBSCAN聚类。接受参数df(dataframe: 单个车辆的轨迹点数据集)。返回当前STDBSCAN类的实例本身。


class STDBSCAN(object):

    def __init__(self, spatial_threshold=500.0, temporal_threshold=30.0,
                 min_neighbors=6):

        self.spatial_threshold = spatial_threshold
        self.temporal_threshold = temporal_threshold
        self.min_neighbors = min_neighbors
        self.labels_ = []

    # 找到当前核心点的可达邻居
    def retrieve_neighbors(self, index_center, df):
        neigborhood = []

        # index_center为当前核心点索引,选取核心点对应的行数据
        center_point = df.loc[index_center]

        # 根据时间阈值筛选可达点
        min_time = center_point['collect_time'] - timedelta(minutes=self.temporal_threshold)
        max_time = center_point['collect_time'] + timedelta(minutes=self.temporal_threshold)
        df = df[(df['collect_time'] >= min_time) & (df['collect_time'] <= max_time)]

        # 根据距离阈值筛选可达点
        for index, point in df.iterrows():
            if index != index_center:
                distance = great_circle((center_point['lat'], center_point['lon']),
                                        (point['lat'], point['lon'])).m
                if distance <= self.spatial_threshold:
                    neigborhood.append(index)

        # 返回所有可达点的索引
        return neigborhood

    def fit(self, df):

        cluster_label = -1
        noise = -1
        unmarked = 777777
        stack = []

        # 丢弃原索引,保证新索引从0开始且连续
        df = df.reset_index(drop=True)
        # 初始化聚类标签
        self.labels_ = [unmarked] * df.shape[0]

        # 遍历所有轨迹点
        for index, point in df.iterrows():
            if self.labels_[index] == unmarked:
                neighborhood = self.retrieve_neighbors(index, df)

                if len(neighborhood) < self.min_neighbors:
                    self.labels_[index] = noise

                else:  # 发现核心点
                    # 为核心点分配新簇标签
                    cluster_label = cluster_label + 1
                    self.labels_[index] = cluster_label

                    # 为该核心点的所有可达邻居点分配当前簇标签
                    for neighbor_index in neighborhood:
                        self.labels_[neighbor_index] = cluster_label
                        # 邻居点入栈
                        stack.append(neighbor_index)

                    # 栈内点依次出栈
                    while len(stack) > 0:
                        current_point_index = stack.pop()
                        new_neighborhood = self.retrieve_neighbors(current_point_index, df)

                        # 若current_point_index点也是核心点
                        if len(new_neighborhood) >= self.min_neighbors:
                            for neighbor_index in new_neighborhood:
                                neighbor_cluster = self.labels_[neighbor_index]
                                if (neighbor_cluster != noise) & (neighbor_cluster == unmarked):
                                    self.labels_[neighbor_index] = cluster_label
                                    stack.append(neighbor_index)

        return self

3.进行聚类

  在进行聚类之前,先使用shapelygeopy库实现了get_centermost_point函数。其输入数据cluster是列表类型,表示每一组聚类的点集。作用是在获得了每个聚类之后,计算出该聚类的中心点。

# 计算每个聚类的中心点
def get_centermost_point(cluster):
    # 计算整个点集合的质心点
    centroid = (MultiPoint(cluster).centroid.x, MultiPoint(cluster).centroid.y)
    # 取点集合中离质心点最近的点为中心点
    centermost_point = min(cluster, key=lambda point: great_circle(point, centroid).m)
    # 返回中心点
    return tuple(centermost_point)

  对单辆车的聚类函数cluster_traj,其输入数据data是dataframe类型,表示一辆车的轨迹数据。假设STDBSCAN类与cluster_traj函数放在同一文件中,对输入数据data进行一些预处理得到coords后,只需使用stdb = STDBSCAN(spatial_threshold=400.0, temporal_threshold=40.0, min_neighbors=4).fit(coords)就能完成聚类。怎么样,是不是很像scikit-learn中的DBSCAN用法?


# STDBSCAN聚类
def cluster_traj(data):
    # 创建副本
    data = data.copy()
    # 提取dataframe中的经纬度列
    coords = data[['collect_time', 'smoothed_lat', 'smoothed_lon']].copy()
    # 重命名列
    coords.rename(columns={
        'smoothed_lat': 'lat',
        'smoothed_lon': 'lon'
    }, inplace=True)
    # 将时间列转换为pandas的datetime格式用于后续计算
    coords['collect_time'] = pd.to_datetime(coords['collect_time'].astype(str))

    # 指定参数进行聚类,参数需根据数据集调整
    stdb = STDBSCAN(spatial_threshold=400.0, temporal_threshold=40.0, min_neighbors=4).fit(coords)
    cluster_labels = stdb.labels_
    # 将 cluster_labels 转换为 Pandas Series 并确保索引一致
    cluster_labels = pd.Series(cluster_labels, index=coords.index)

    # 离群点的聚类标签为-1,其余数据聚成n类,标签为为0到n-1。num_clusters获得总共的聚类数n。
    num_clusters = len(set(cluster_labels) - set([-1]))
    print('Clustered ' + str(len(data)) + ' points to ' + str(num_clusters) + ' clusters')

    # 将聚类标签添加为新列 c_label2
    data['c_label2'] = cluster_labels
    # 聚类完成删除时间列并转换成数组,以便后续计算
    coords = coords.drop(columns=['collect_time']).values

    # # 可选:输出聚类当前车的聚类情况
    # clusters = pd.Series([coords[cluster_labels == n] for n in range(num_clusters)])
    # print(clusters)

    # 输计算噪声点占总点数的比例
    ratio = len(cluster_labels[cluster_labels[:] == -1]) / len(cluster_labels)
    print('噪声点占总点数的比例: ' + str(ratio))

    # 只有聚类数量>1时才能计算指标
    if num_clusters > 1:
        # 计算轮廓系数,作为聚类评价指标
        sc_score = metrics.silhouette_score(coords, cluster_labels)
        print('轮廓系数: ' + str(sc_score))
        # 计算DBI指标
        dbi_score = metrics.davies_bouldin_score(coords, cluster_labels)
        print('戴维斯-布尔丁指数: ' + str(dbi_score))

    print("\n")

4.单辆车的聚类评价

  需要注意的是,每辆车聚类之后,还计算了噪声比、轮廓系数(SC)、戴维斯-布尔丁指数(DBI)来评价聚类效果。其中SC指标越接近1,聚类效果越好;DBI指标越小,聚类效果越好。

  除此之外,还可以使用matplotlib库通过绘制散点图的方式,来肉眼观察这辆车的聚类效果,只需在cluster_traj函数中的return语句前插入下列代码(matplotlib绘制的散点图用于实验时判断聚类效果来调整参数,若要绘制更美观的图,可考虑使用folium库在地图上绘制轨迹点):

    # 获得每个聚类的中心点
    centermost_points = clusters.map(get_centermost_point)
    # 将各个聚类的中心点存入rep_points
    lats, lons = zip(*centermost_points)
    rep_points = pd.DataFrame({'lon': lons, 'lat': lats})
    # 绘制散点图
    colors = list(mcolors.TABLEAU_COLORS.values())  # 使用Tableau颜色作为聚类颜色
    noise_color = 'black'  # 离群点颜色
    fig, ax = plt.subplots(figsize=(12, 8))

    for i, cluster in enumerate(clusters):
        if i == len(colors):  # 如果聚类数超过颜色数,循环使用颜色
            color = colors[i % len(colors)]
        else:
            color = colors[i]

        ax.scatter(cluster[:, 1], cluster[:, 0], s=30, c=color, marker='o', label='Cluster ' + str(i))

    # 绘制离群点
    noise_points = coords[cluster_labels == -1]
    ax.scatter(noise_points[:, 1], noise_points[:, 0], s=20, c=noise_color, marker='x', label='Noise points')

    ax.scatter(rep_points['lon'], rep_points['lat'], c='red', marker='*', s=100, label='Cluster Centers')

    ax.set_title('DBSCAN Clustering of Trajectory Data')
    ax.set_xlabel('Longitude')
    ax.set_ylabel('Latitude')
    ax.legend()

    plt.show()

二、整个数据集多辆车聚类

  本人的数据集中包含多辆车的轨迹数据,这些数据统一存储一个CSV文件中,并且已经按照id和collect_time数据升序排序。

1.聚类

  使用groupby的方式对车辆按id分组,每组分别调用cluster_traj即可。

    #假设已经读入数据df
    clustered_data = pd.DataFrame()

    # 按车辆id分组,对每辆车的数据进行聚类
    grouped = df.groupby('id')
    for name, group in grouped:
        print('车辆id:' + name + '  轨迹点数:' + str(len(group)))
        clustered_group = cluster_traj(group)
        clustered_data = pd.concat([clustered_data, clustered_group], ignore_index=True)

2.整体评价

  可以在函数外设置两个全局变量列表sc_scores和dbi_scores存储每辆车的评价指标。

# 全局变量用于存储指标
sc_scores = []
dbi_scores = []

  对cluser_traj函数中的计算轮廓系数部分添加sc_scores.append(sc_score)和dbi_scores.append(dbi_score)两行代码。即计算每辆车的评价指标的同时,将其加入外部的列表中。

if num_clusters > 1:
        # 计算轮廓系数,作为聚类评价指标
        sc_score = metrics.silhouette_score(coords, cluster_labels)
        print('轮廓系数: ' + str(sc_score))
        sc_scores.append(sc_score)
        # 计算DBI指标
        dbi_score = metrics.davies_bouldin_score(coords, cluster_labels)
        print('戴维斯-布尔丁指数: ' + str(dbi_score))
        dbi_scores.append(dbi_score)

  通过sc_scores和dbi_scores两个列表的分析,例如求均值、中位数、画图查看分布等方式,可以评价整个数据聚类效果的好坏。


原文地址:https://blog.csdn.net/jgsecurity/article/details/140697540

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!