自学内容网 自学内容网

Python应用指南:高德拥堵延时指数

随着城市化进程的加快,交通拥堵问题日益严重,成为影响城市居民生活质量的重要因素之一。为了科学评估和管理交通拥堵,各种交通拥堵指数应运而生。其中,高德地图提供的“拥堵延时指数”因其数据丰富、实时性强和应用广泛而备受关注。本篇文章我们将视角聚焦于高德拥堵延时指数的定义、计算方法、应用场景以及其在城市交通管理中的重要作用。

高德地图交通大数据链接:上海 城市实时交通详情

1. 高德拥堵延时指数的定义

高德拥堵延时指数(Traffic Congestion Delay Index, TCDI)是一种衡量城市交通拥堵程度的综合指标。它通过比较实际行车时间和自由流状态下的行车时间来量化交通拥堵的程度。具体来说,拥堵延时指数反映了在拥堵状态下,车辆通过某路段所需的时间相对于自由流状态下所需时间的增加比例。

2. 高德拥堵延时指数的计算方法

高德拥堵延时指数的计算公式如下:

拥堵延时指数:实际旅行时间与自由流(畅通)状态下旅行时间的比值

  • 实际行驶时间(Actual Travel Time, TaTa​):在实际交通条件下,车辆通过某路段所需的时间。
  • 自由流行驶时间(Free Flow Travel Time, TfTf​):在无交通干扰的情况下,车辆通过该路段所需的时间。

例如,假设某路段的自由流行驶时间为10分钟,实际行驶时间为20分钟,则拥堵延时指数为:

拥堵延时指数=20/10=2.0

这意味着在拥堵状态下,车辆通过该路段所需的时间是自由流状态下所需时间的2倍。

3. 高德拥堵延时指数的分级标准

为了更直观地理解和应用拥堵延时指数,高德地图将其分为几个等级:

  • 1.0 - 1.4:畅通
  • 1.5 - 1.9:缓行
  • 2.0 - 3.9:拥堵
  • 4.0 以上:严重拥堵

其他指标释义可参考高德发布的2023年度中国主要城市交通分析报告附录A名词解释部分:

2023年度中国主要城市交通分析报告

先讲一下方法思路,一共三个步骤;

方法思路

  1. 通过高德驾车路径规划查询路段的通行时间、速度等指标(允许计算多条道路)
  2. 每5分钟记录一次数据
  3. 输出csv的结果表

高德驾车路径规划的个人开发者账号的日配额5000次/日, 高德的拥堵延时指数则是每5分钟自动刷新,我们可以通过驾车规划来计算路段的拥堵延时指数,理论上我们可以实现对一条或者多条道路的全天监测,这取决于我们账号的额度,基于这个思路,我们就阔以通过python脚本来实现自动化的流程;

我们来基于图示来猜测一下计算逻辑,就是计算通过计算该段路的实际旅行时间与自由流(畅通)状态下旅行时间的比值,来得到的数值,那我们只要把驾车路径调整成需要观测道路的行驶路径,即可监测该条路的拥堵延时指数和平均速度这些指标;

完整代码#运行环境Python 3.11

import requests
import json
import math
import time
import csv
import os
import pandas as pd
import matplotlib.pyplot as plt
import geopandas as gpd
from shapely.geometry import Point, LineString

# 设置中文支持
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False    # 用来正常显示负号

# 常量定义
pi = 3.14159265358979324
ee = 0.00669342162296594323
a = 6378245.0

def out_of_china(lng, lat):
    """
    判断是否在国内,不在国内不做偏移
    :param lng: 经度
    :param lat: 纬度
    :return: 是否在国内
    """
    return not (73.66 < lng < 135.05 and 3.86 < lat < 53.55)

def transformlat(lng, lat):
    ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
    ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0
    ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0
    return ret

def transformlng(lng, lat):
    ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
    ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0
    ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0
    return ret

def gcj02towgs84(lng, lat):
    """
    GCJ02(火星坐标系)转GPS84
    :param lng:火星坐标系的经度
    :param lat:火星坐标系纬度
    :return:
    """
    if out_of_china(lng, lat):
        return lng, lat
    dlat = transformlat(lng - 105.0, lat - 35.0)
    dlng = transformlng(lng - 105.0, lat - 35.0)
    radlat = lat / 180.0 * pi
    magic = math.sin(radlat)
    magic = 1 - ee * magic * magic
    sqrtmagic = math.sqrt(magic)
    dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)
    dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)
    mglat = lat + dlat
    mglng = lng + dlng
    return [lng * 2 - mglng, lat * 2 - mglat]

def coordinates(c):
    lng, lat = c.split(',')
    lng, lat = float(lng), float(lat)
    wlng, wlat = gcj02towgs84(lng, lat)
    return wlng, wlat

def driving_planning(api_key, routes):
    results = []
    all_route_coordinates = []

    for i, (from_location, to_location) in enumerate(routes):
        url = 'https://restapi.amap.com/v3/direction/driving'
        parameters = {
            'key': api_key,
            'origin': from_location,
            'destination': to_location,
            'strategy': '11',
            'output': 'json'
        }

        response = requests.get(url, params=parameters)
        data = json.loads(response.text)

        if data['status'] == '1':
            route = data['route']
            paths = route['paths']
            if paths:
                path = paths[0]
                distance = int(path['distance']) / 1000  # 转换为公里
                duration = int(path['duration']) / 60  # 转换为分钟
                steps = path['steps']

                route_coordinates = []

                for step in steps:
                    polyline = step['polyline']
                    # 转换坐标
                    coordinates_list = [coordinates(c) for c in polyline.split(';')]
                    route_coordinates.extend(coordinates_list)

                # 计算平均速度(km/h)
                average_speed = distance / (duration / 60)

                results.append({
                    'route': f"路线 {i + 1}",
                    '总出行距离': distance,
                    '总出行时间': duration,
                    '平均速度': average_speed
                })

                print(f"路线 {i + 1}:")
                print(f"  总出行距离: {distance:.2f}公里")
                print(f"  总出行时间: {duration:.2f}分钟")
                print(f"  平均速度: {average_speed:.2f}公里/小时")

                all_route_coordinates.append(route_coordinates)
            else:
                print(f"未找到路径 {i + 1}")
        else:
            print(f"路线规划失败 {i + 1}: {data['info']}")

    return results, all_route_coordinates

def export_to_csv(results, filename):
    file_exists = os.path.exists(filename)

    with open(filename, mode='a', newline='', encoding='ANSI') as file:
        writer = csv.writer(file)
        if not file_exists:
            writer.writerow(['时间', '路线', '总出行距离(公里)', '总出行时间(分钟)', '平均速度(公里/小时)'])

        current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        for result in results:
            writer.writerow([current_time, result['route'], f"{result['总出行距离']:.2f}", f"{result['总出行时间']:.2f}", f"{result['平均速度']:.2f}"])

def plot_routes(all_route_coordinates):
    fig, ax = plt.subplots(figsize=(10, 10))

    for i, route_coordinates in enumerate(all_route_coordinates):
        route_line = LineString(route_coordinates)
        gdf = gpd.GeoDataFrame(index=[0], geometry=[route_line])
        gdf.plot(ax=ax, color=f'C{i}', label=f"路线 {i + 1}")

    ax.set_xlabel('经度')
    ax.set_ylabel('纬度')
    ax.legend()
    plt.title('路线规划图')
    plt.show()

def main():
    api_key = '你的key'  # 替换为你的高德地图API密钥

    # 定义多个起终点对
    routes = [
        ('121.535949,31.213957', '121.572702,31.215328'),  # 示例起终点
        ('121.564392,31.204669', '121.557429,31.231008'),  # 示例起终点

        # 可以继续添加更多的起终点对
    ]

    first_run = True  # 标志变量,用于判断是否是第一次运行

    while True:
        results, all_route_coordinates = driving_planning(api_key, routes)
        export_to_csv(results, '拥堵延时指数.csv')
        print("数据已导出到CSV文件")

        if first_run:
            plot_routes(all_route_coordinates)
            first_run = False  # 第一次运行后设置标志为False

        time.sleep(300)  # 每五分钟运行一次

if __name__ == '__main__':
    main()

这里我们以上海的花木路和芳甸路作为研究对象;

结果输出为csv的形式,便于我们后续的计算与分析;

这里主观界定凌晨的2:00-4:00点路段的通行时间均值作为自由流(畅通)状态下旅行时间,基于此来计算的拥堵延时指数,同时我们把监测时间延长到24小时,那么我们就可以看到整条道路的早晚高峰分布情况,平均道路运行速度等直观的路况数据;

文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。


原文地址:https://blog.csdn.net/weixin_45812624/article/details/143947797

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!