Diala Abul-Khail
Diala's Blog

AI Traffic system with Python

Pygame Simulation

Diala Abul-Khail
·Apr 27, 2022·

Problem domain

A data sample we took from Greater Amman Municipality confirmed the heavy traffic we face every day, especially at peak hours. The problem is partly caused by each traffic light having a fixed green time, which wastes time and creates unnecessary congestion. It can be reduced by generating a dynamic green time that tracks the traffic flow in each street over a specific period. So the goal of our project was to build a prediction model for an intersection of 4 traffic lights that generates a custom green time for each light, depending on the traffic flow for each hour of the day, to help people save time and avoid congestion as much as possible.

Overview

In this blog I share, from scratch, the steps of building a Python project with the Pygame library: a traffic system simulation in which the traffic lights predict their green time duration from the number of vehicles on each street of the intersection.

About Data

The data came from previous civil studies of the Al-Bashiti intersection in Amman, and included the total number of vehicles for each hour on Mondays, Tuesdays and Wednesdays of August 2019.

Preparing data

The data was provided in an Excel sheet. We had to clean and prepare it, then convert it to a CSV file.

To prepare the data for the model, we merged the lanes of the same leg and found each leg's percentage of the traffic flow. We then calculated the green time of each leg as its percentage of the flow applied to the full intersection cycle (120 seconds).
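The calculation can be sketched in a few lines of Python. The vehicle counts below are made-up numbers for illustration, not values from the Al-Bashiti data, and `green_times` is a hypothetical helper name. The sketch also ignores yellow time, which is a simplifying assumption.

```python
CYCLE_SECONDS = 120  # full intersection cycle, as stated in the text


def green_times(leg_counts, cycle=CYCLE_SECONDS):
    """Split the cycle between legs in proportion to their traffic flow."""
    total = sum(leg_counts.values())
    return {leg: cycle * count / total for leg, count in leg_counts.items()}


# Hypothetical hourly counts per leg (lanes of the same leg already merged).
counts = {"leg1": 450, "leg2": 50, "leg3": 440, "leg4": 60}
times = green_times(counts)
for leg, seconds in times.items():
    share = counts[leg] / sum(counts.values())
    print(f"{leg}: {share:.0%} of flow -> {seconds:.1f} s green")
```

With these counts the busiest leg carries 45% of the flow and receives 54 of the 120 seconds.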


Machine Learning

Building a model

Using the Linear Regression machine learning algorithm, we built a model that predicts the green time from the vehicle flow percentage.

  • After defining the dependent and independent attributes, we decided that the best approach was to divide the cycle among the street legs according to each leg's flow percentage for each hour.

  • first step is to import the needed dependencies:

    import numpy as np 
    import pandas as pd 
    import matplotlib.pyplot as plt
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split
    import pickle
    

pickle lets us save the model to a file so that we can use it in our code.

  • Create a dataframe from the CSV file using pandas, define the dependent and independent attributes, and split them into train and test sets:

    df = pd.read_csv("csv file path")
    X = df['percentage'].values      # independent attribute: flow percentage
    Y = df['greentime percentage']   # dependent attribute: green time
    x_train, x_test, y_train, y_test = train_test_split(X, Y, train_size=0.80, test_size=0.20, random_state=100)
    
  • Initialize the model and fit the training data:

model = LinearRegression()
model.fit(x_train.reshape(-1,1),y_train)
  • Find the coefficient and the intercept with the y-axis, which define the red regression line:
model.coef_,model.intercept_
  • Predict on the test data and compute the training and test accuracy (model.score returns the R² score):
y_predict= model.predict(x_test.reshape(-1,1))

train_accuracy= model.score(x_train.reshape(-1,1),y_train)
test_accuracy= model.score(x_test.reshape(-1,1),y_test)
  • Save the model to a file using pickle:
with open('model_pickle','wb') as file:
    pickle.dump(model,file)
  • Save the model to a file using joblib:
import joblib

# same file name that the simulation loads later
joblib.dump(model, 'model_joblib')

This file is the link between the model and the simulation: it is loaded later in the Pygame code.

  • Plot the relationship between traffic volume and green time, together with the fitted line; the title shows the training accuracy:
%matplotlib inline
plt.figure(figsize=(14,7))
plt.scatter(X,Y)
plt.xlabel("Number of cars (percentage)")
plt.ylabel("Green time of Lanes #1 & #2")
plt.title(f"Train Accuracy= {train_accuracy}")
# least-squares line through the data, drawn in red
m, b = np.polyfit(X,Y, 1)
plt.plot(X, m*X + b, color='red')

train_accuracy.png
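For a single feature, LinearRegression fits an ordinary least-squares line, and model.score returns the coefficient of determination R². The sketch below computes both by hand on made-up numbers, only to show what coef_, intercept_ and the accuracy values represent. `fit_line` and `r_squared` are hypothetical helper names, not part of the project.

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = m*x + b with one feature."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    m = sxy / sxx
    b = mean_y - m * mean_x
    return m, b


def r_squared(xs, ys, m, b):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(ys) / len(ys)
    ss_res = sum((y - (m * x + b)) ** 2 for x, y in zip(xs, ys))
    ss_tot = sum((y - mean_y) ** 2 for y in ys)
    return 1 - ss_res / ss_tot


# Made-up flow shares and green times that lie on the line y = 120 * x.
flow = [0.10, 0.20, 0.30, 0.40]
green = [12.0, 24.0, 36.0, 48.0]
m, b = fit_line(flow, green)
score = r_squared(flow, green, m, b)
print(m, b, score)
```

Because the sample points lie on one line, the slope comes out at about 120, the intercept at about 0 and the score at about 1. Real data is noisier, which is why the train and test accuracy are worth comparing.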


Pygame

What is Pygame?

Pygame is a Python library used for creating multimedia applications like games, art, music, sound, videos and many other projects. In this project, Pygame was used to generate a simulation of a signalized traffic intersection.

Diala.png

Basic structure

  • The first step is to import all required libraries, and import the linear regression model as well:
import os
import random
import sys
import threading
import time

import joblib
import numpy as np
import pygame
  • Start with the prediction model variable and then load the model:
# set prediction model for green time signal on / off
prediction_model_mode = True

mj = joblib.load("./ai_traffic_system/model_joblib")
  • Define a function that returns the green time prediction as an integer:
def rl_ml_model_timer(flow):
    flow_percentile = np.array(flow).reshape(-1, 1)
    green_time_predict = np.ceil(mj.predict(flow_percentile))
    return green_time_predict.astype(int)
  • Define another function to set the limits of green time:
def ml_model_timer(flow):
    flow_percentile = np.array(flow).reshape(-1, 1)
    # predict() returns a one-element array for a single flow value
    green_time_predict = int(np.ceil(mj.predict(flow_percentile)[0]))
    # print("i predicted : ", green_time_predict)
    # keep the green time within bounds
    if green_time_predict < 3:
        green_time_predict = 2
    elif green_time_predict > 7:
        green_time_predict = 6
    return green_time_predict
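Bounding a predicted value can also be written with min and max. The sketch below is a generic clamp, not the exact rule used in ml_model_timer (which maps values below 3 to 2 and values above 7 to 6). The bounds and the name `clamp_green_time` are assumptions for illustration.

```python
import math


def clamp_green_time(predicted, lowest=2, highest=6):
    """Round a predicted green time up and keep it within [lowest, highest]."""
    seconds = math.ceil(predicted)
    return max(lowest, min(highest, seconds))


for raw in (0.4, 3.2, 9.7):
    print(raw, "->", clamp_green_time(raw))
```

A single expression like this keeps the lower and upper bound next to each other, which makes it easier to see the range that the simulation will actually use.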
  • Create constants that will help us simulate the vehicles' movement and the traffic lights' timings:
# Default traffic light timers (seconds).
defaultGreen = {0: 20, 1: 20, 2: 20, 3: 20}
defaultYellow = 2
defaultRed = 120

signals = []
noOfSignals = 4
currentGreen = 0   # index of the signal that is currently green
currentYellow = 0  # 1 while the current signal is yellow, otherwise 0
speed = 1.62

nextGreen = (currentGreen + 1) % noOfSignals
# 0 -> 1 -> 2 -> 3 -> 0 -> 1 ...

The car speed was calculated from the real-life speed on that street, which is 40 km/h.
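The modulo expression in the constants makes the green signal wrap around from the last traffic light back to the first. A minimal sketch of that rotation, where `next_signal` is a hypothetical helper name:

```python
signal_count = 4  # four traffic lights, as in the constants


def next_signal(current, count=signal_count):
    """Return the index of the signal that turns green after `current`."""
    return (current + 1) % count


order = [0]
for _ in range(5):
    order.append(next_signal(order[-1]))
print(order)
```

Starting from signal 0, the sequence visits 1, 2 and 3 and then returns to 0, so no special case is needed for the last signal.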


  • We started working on the coordinates. We used Microsoft Paint to locate, in pixels, the coordinates of the signals, timers and stop lines:
x = {
    "right": [0, 0, 0],
    "down": [542, 563, 637],
    "left": [1400, 1400, 1400],
    "up": [680, 723, 819],
}
y = {
    "right": [380, 410, 465],
    "down": [0, 0, 0],
    "left": [258, 315, 365],
    "up": [800, 800, 800],
}

vehicles = {
    "right": {0: [], 1: [], 2: [], "crossed": 0},
    "down": {0: [], 1: [], 2: [], "crossed": 0},
    "left": {0: [], 1: [], 2: [], "crossed": 0},
    "up": {0: [], 1: [], 2: [], "crossed": 0},
}
# assigning intersection legs starting from the right hand and CCW 
directionNumbers = {0:'right' , 1:'down', 2:'left', 3:'up'}

# Coordinates of signal image, timer, and vehicle count
signalCoods = [(400, 430), (690, 110), (970, 305), (690, 610)]
signalTimerCoods = [(450, 480), (760, 160), (1020, 355), (740, 660)]

# Coordinates of stop lines
stopLines = {"right": 350, "down": 197, "left": 1050, "up": 603}
defaultStop = {"right": 340, "down": 187, "left": 1060, "up": 610}

# gap between vehicles
moving_gap = 25
stoping_gap = 25
vehiclesTurned = {
    "right": {1: [], 2: []},
    "down": {1: [], 2: []},
    "left": {1: [], 2: []},
    "up": {1: [], 2: []},
}
vehiclesNotTurned = {
    "right": {1: [], 2: []},
    "down": {1: [], 2: []},
    "left": {1: [], 2: []},
    "up": {1: [], 2: []},
}

rotationAngle = 3
mid = {
    "right": {"x": 560, "y": 465},
    "down": {"x": 560, "y": 310},
    "left": {"x": 860, "y": 310},
    "up": {"x": 815, "y": 495},
}

count_Leg1 = 0
count_Leg2 = 0
count_Leg3 = 0
count_Leg4 = 0

total_flow_count = 1
totalflowcoods = (10, 110)

  • Then we used the pygame.init() function to initialize Pygame. This function calls the separate init() functions of all the included pygame modules. Since these modules are abstractions for specific hardware, this startup step is essential to allow you to work with the same code on Linux, Windows, and Mac:
pygame.init()

simulation = pygame.sprite.Group()
# Sprite is used for rendering.
# Group is an object that holds a group of Sprite objects.

Classes

The TrafficSignal class is used to create the 4 traffic lights, each with 4 attributes:

class TrafficSignal:
    def __init__(self, red, yellow, green):
        self.red = red
        self.yellow = yellow
        self.green = green
        # self.signalText -> to display a timer on each signal.
        self.signalText = ' '

Vehicle class is used to generate vehicle objects to be used in the simulation:

Attributes of Vehicle class:

  • lane : Represents the lane that the vehicle moves on.
  • direction: Represents the direction in text format.
  • x: Represents the current x-coordinate of the vehicle (location).
  • y: Represents the current y-coordinate of the vehicle (location).
  • crossed: Represents whether the vehicle has crossed the signal or not.
  • index: Represents the relative position of the vehicle among the vehicles moving in the same direction and lane. It is used to work out the stopping coordinate: either the default stop or a position calculated from the vehicles ahead.
  • direction_number: Represents the direction by number (right takes value of 0 and going CCW).
  • image: Represents the image to be rendered.

Methods of Vehicle class

  • render(): To render -display- the image on screen.
  • move(): To control the movement of the vehicle according to the traffic light and the vehicles ahead.
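The role of index in finding a stopping coordinate can be sketched without Pygame. The snippet assumes vehicles travelling to the right queue one behind another and all have the same length. The vehicle length of 40 pixels and the helper name `stop_position` are hypothetical, while the default stop coordinate and the gap reuse the constants defined earlier.

```python
DEFAULT_STOP_RIGHT = 340  # defaultStop["right"] from the constants
STOPPING_GAP = 25         # stoping_gap from the constants
VEHICLE_LENGTH = 40       # hypothetical sprite length in pixels


def stop_position(index):
    """x-coordinate where the vehicle at queue position `index` should stop.

    Index 0 is the first vehicle in its lane and stops at the default
    stop coordinate; each later vehicle stops one length plus one gap
    further back.
    """
    return DEFAULT_STOP_RIGHT - index * (VEHICLE_LENGTH + STOPPING_GAP)


positions = [stop_position(i) for i in range(3)]
print(positions)
```

For the other directions the same idea applies with the sign or the axis changed, since vehicles moving left or up queue in the opposite direction or along y.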

steps of move method here


class Vehicle(pygame.sprite.Sprite):
    # vehicleClass selects the image file; the "car" default is an assumption
    def __init__(self, lane, direction_number, direction, will_turn, vehicleClass="car"):
        pygame.sprite.Sprite.__init__(self)
        self.lane = lane
        self.direction_number = direction_number
        self.direction = direction
        self.x = x[direction][lane]
        self.y = y[direction][lane]
        self.crossed = 0
        self.willTurn = will_turn
        self.turned = 0
        self.rotateAngle = 0

        vehicles[direction][lane].append(self)
        self.index = len(vehicles[direction][lane]) - 1
        self.crossedIndex = 0
        path = "ai_traffic_system/images/" + direction + "/" + vehicleClass + ".png"
        self.originalImage = pygame.image.load(path)
        self.image = pygame.image.load(path)
        # register the vehicle so the main loop can draw and move it
        simulation.add(self)

The lane and direction together determine the starting coordinates of the vehicle. The direction is the key into the predefined x and y dictionaries, and the lane is the index into the list stored under that key. For example, if you create an object n = Vehicle(2, 1, 'down', 0), where 2 is the lane and 'down' is the direction, printing n.x gives 637 and printing n.y gives 0.
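The lookup can be tried on its own, without creating a Vehicle. The dictionaries below are copied from the constants defined earlier; `spawn_point` is a hypothetical helper name used only for this sketch.

```python
# Spawn coordinates copied from the x and y dictionaries defined earlier.
x = {"right": [0, 0, 0], "down": [542, 563, 637],
     "left": [1400, 1400, 1400], "up": [680, 723, 819]}
y = {"right": [380, 410, 465], "down": [0, 0, 0],
     "left": [258, 315, 365], "up": [800, 800, 800]}


def spawn_point(direction, lane):
    """Look up where a vehicle on `lane` of `direction` enters the screen."""
    return x[direction][lane], y[direction][lane]


print(spawn_point("down", 2))
print(spawn_point("left", 1))
```

Vehicles heading down start at the top edge (y = 0) and vehicles heading left start at the right edge (x = 1400), which matches the 1400 by 800 screen size.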

The Vehicle class inherits from pygame.sprite.Sprite, the base class for visible game objects. Derived classes usually override Sprite.update() and assign the Sprite.image and Sprite.rect attributes. The initializer accepts any number of Group instances to add the sprite to. When subclassing Sprite, make sure to call the base initializer, pygame.sprite.Sprite.__init__(self), before adding the Sprite to Groups.


Main is the class responsible for running the game:

class Main:
    # initialization
    thread1 = threading.Thread(name="initialization", target=initialize, args=())  
    thread1.daemon = True
    thread1.start()

    # Colours
    black = (0, 0, 0)
    white = (255, 255, 255)

    # Screensize
    screenWidth = 1400
    screenHeight = 800
    screenSize = (screenWidth, screenHeight)

    # Setting background image (image of intersection)
    background = pygame.image.load("ai_traffic_system/images/intersection2.png")
    screen = pygame.display.set_mode(screenSize)
    pygame.display.set_caption("SIMULATION")

    # Loading signal images and font
    redSignal = pygame.image.load("ai_traffic_system/images/signals/red.png")
    yellowSignal = pygame.image.load("ai_traffic_system/images/signals/yellow.png")
    greenSignal = pygame.image.load("ai_traffic_system/images/signals/green.png")
    font = pygame.font.Font(None, 30)

    thread2 = threading.Thread(name="generateVehicles", target=generateVehicles, args=())
    thread2.daemon = True
    thread2.start()

    thread5 = threading.Thread(name="simTime", target=simTime, args=())
    thread5.daemon = True
    thread5.start()

    while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                showStats()
                sys.exit()

        # display background in simulation
        screen.blit(background, (0, 0))
        for i in range(0, noOfSignals):
            # display signal and set timer according to current status: green, yellow, red
            if i == currentGreen:
                if currentYellow == 1:
                    signals[i].signalText = signals[i].yellow
                    screen.blit(yellowSignal, signalCoods[i])

                else:
                    signals[i].signalText = signals[i].green
                    screen.blit(greenSignal, signalCoods[i])

            else:
                # show the red countdown only during its last 10 seconds
                if signals[i].red <= 10:
                    signals[i].signalText = signals[i].red

                else:
                    signals[i].signalText = '---'
                screen.blit(redSignal, signalCoods[i])
        signalTexts = ['', '', '', '']

        # display signal timer
        for i in range(0, noOfSignals):
            signalTexts[i] = font.render(str(signals[i].signalText), True, white, black)
            screen.blit(signalTexts[i], signalTimerCoods[i])

        # display the vehicles
        for vehicle in simulation:
            screen.blit(vehicle.image, [vehicle.x, vehicle.y])
            vehicle.move()
        pygame.display.update()
Main()

Functions

  • initialize() -> this function initializes the traffic lights with the predicted values:
def initialize():
    if prediction_model_mode:
        L1_percen = count_Leg1 / total_flow_count
        L2_percen = count_Leg2 / total_flow_count
        L3_percen = count_Leg3 / total_flow_count
        L4_percen = count_Leg4 / total_flow_count

        T1_predict = ml_model_timer(L1_percen)
        T2_predict = ml_model_timer(L2_percen)
        T3_predict = ml_model_timer(L3_percen)
        T4_predict = ml_model_timer(L4_percen)

        ts1 = TrafficSignal(0, defaultYellow, T1_predict)
        signals.append(ts1)
        tts1 = ts1.red + ts1.yellow + ts1.green

        ts2 = TrafficSignal(tts1, defaultYellow, T2_predict)
        signals.append(ts2)
        tts2 = tts1 + ts2.yellow + ts2.green

        ts3 = TrafficSignal(tts2, defaultYellow, T3_predict)
        signals.append(ts3)
        tts3 = tts2 + ts3.yellow + ts3.green

        ts4 = TrafficSignal(tts3, defaultYellow, T4_predict)
        signals.append(ts4)

    else:
        ts1 = TrafficSignal(0, defaultYellow, defaultGreen[0])
        signals.append(ts1)
        ts2 = TrafficSignal(ts1.yellow + ts1.green, defaultYellow, defaultGreen[1])
        signals.append(ts2)
        ts3 = TrafficSignal(defaultRed, defaultYellow, defaultGreen[2])
        signals.append(ts3)
        ts4 = TrafficSignal(defaultRed, defaultYellow, defaultGreen[3])
        signals.append(ts4)
    repeat()
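The bookkeeping in the prediction branch gives each signal a red time equal to the combined yellow and green time of the signals before it. A minimal sketch of that accumulation, using made-up green times and the hypothetical helper `initial_red_times`:

```python
DEFAULT_YELLOW = 2  # seconds, as in the constants


def initial_red_times(green_times, yellow=DEFAULT_YELLOW):
    """Red time of each signal = total yellow + green of the signals before it."""
    reds = []
    elapsed = 0
    for green in green_times:
        reds.append(elapsed)
        elapsed += yellow + green
    return reds


# Hypothetical predicted green times for the four legs.
reds = initial_red_times([6, 2, 5, 3])
print(reds)
```

The first signal starts green, so its red time is 0, and every later signal waits for all the phases scheduled ahead of it.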
  • printStatus() -> this function prints the red, yellow and green timers of the 4 signals:
def printStatus():
    for i in range(0, 4):
        if signals[i] != None:
            if i == currentGreen:
                if currentYellow == 0:
                    print(
                        " GREEN TS",
                        i + 1,
                        "-> r:",
                        signals[i].red,
                        "-> y:",
                        signals[i].yellow,
                        "-> g:",
                        signals[i].green,
                    )
                else:
                    print(
                        "YELLOW TS",
                        i + 1,
                        "-> r:",
                        signals[i].red,
                        "-> y:",
                        signals[i].yellow,
                        "-> g:",
                        signals[i].green,
                    )
            else:
                print(
                    "   RED TS",
                    i + 1,
                    "-> r:",
                    signals[i].red,
                    "-> y:",
                    signals[i].yellow,
                    "-> g:",
                    signals[i].green,
                )
    print()
  • repeat() -> this function runs one green and yellow phase, then switches to the next traffic light and repeats:
def repeat():
    global currentGreen, currentYellow, nextGreen
    while (signals[currentGreen].green > 0):
        printStatus()
        updateValues()
        time.sleep(1)

    currentYellow = 1

    for i in range(0, 3):
        for vehicle in vehicles[directionNumbers[currentGreen]][i]:
            vehicle.stop = defaultStop[directionNumbers[currentGreen]]

    while (signals[currentGreen].yellow > 0):
        printStatus()
        updateValues()
        time.sleep(1)

    currentYellow = 0

    if prediction_model_mode:
        signals[0].green = ml_model_timer(count_Leg1 / total_flow_count)
        signals[1].green = ml_model_timer(count_Leg2 / total_flow_count)
        signals[2].green = ml_model_timer(count_Leg3 / total_flow_count)
        signals[3].green = ml_model_timer(count_Leg4 / total_flow_count)
    else:
        signals[currentGreen].green = defaultGreen[currentGreen]

    signals[currentGreen].yellow = defaultYellow
    signals[currentGreen].red = defaultRed

    # set next signal as green signal
    currentGreen = nextGreen
    # set next green signal
    nextGreen = (currentGreen + 1) % noOfSignals
    # set the red time of the next-to-next signal as (yellow time + green time) of the next signal
    signals[nextGreen].red = (signals[currentGreen].yellow + signals[currentGreen].green)

    repeat()
  • updateValues() -> this function updates values of signal timers after every second
def updateValues():
    for i in range(0, noOfSignals):
        if i == currentGreen:
            if currentYellow == 0:
                signals[i].green -= 1
            else:
                signals[i].yellow -= 1
        else:
            signals[i].red -= 1
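  • generateVehicles() -> this function is used for generating vehicle objects
def generateVehicles():
    # the leg counters are module-level variables that are updated here
    global count_Leg1, count_Leg2, count_Leg3, count_Leg4, total_flow_count

    while True: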
        vehicle_type = random.choice(allowedVehicleTypesList)
        lane_number = random.randint(1, 2)
        will_turn = 0

        if lane_number == 1:
            temp = random.randint(0, 99)
            if temp < 35:
                will_turn = 1
        elif lane_number == 2:
            temp = random.randint(0, 99)
            if temp < 35:
                will_turn = 1

        temp = random.randint(0, 100)
        direction_number = 0

        dist = [5, 11, 56, 101]
        if temp < dist[0]:
            direction_number = 1  # north to south (Down)
            count_Leg2 += 1

        elif temp < dist[1]:
            direction_number = 3  # south to north (Up)
            count_Leg4 += 1

        elif temp < dist[2]:
            direction_number = 0  # west to east (Right)
            count_Leg1 += 1

        elif temp < dist[3]:
            direction_number = 2  # east to west  (Left)
            count_Leg3 += 1

        Vehicle(lane_number,direction_number,directionNumbers[direction_number],will_turn)
        time.sleep(1.25)

        total_flow_count += 1
        print("Total flow count: ", total_flow_count)
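The dist list in generateVehicles() holds cumulative thresholds, so the width of each band sets how often a direction is chosen. The sketch below counts the draws that fall into each band. `pick_direction` and `DIRECTION_FOR_BAND` are hypothetical names that restate the if/elif chain.

```python
from collections import Counter

DIST = [5, 11, 56, 101]            # cumulative thresholds from generateVehicles()
DIRECTION_FOR_BAND = [1, 3, 0, 2]  # down, up, right, left


def pick_direction(temp):
    """Map a random draw to a direction number using the thresholds."""
    for threshold, direction in zip(DIST, DIRECTION_FOR_BAND):
        if temp < threshold:
            return direction
    raise ValueError("draw outside the expected 0..100 range")


# random.randint(0, 100) can return 101 distinct values; count each band.
shares = Counter(pick_direction(t) for t in range(101))
print(dict(shares))
```

Out of the 101 possible draws, 45 go to each of the right and left directions, while down and up receive 5 and 6, so most of the generated traffic moves along the east-west street.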
  • showStats() -> this function prints how many vehicles crossed in each direction and the total time
def showStats():
    totalVehicles = 0
    print("Direction-wise Vehicle crossed Counts of Lanes#")
    for i in range(0, 4):
        if signals[i] != None:
            print("Direction", i + 1, ":", vehicles[directionNumbers[i]]["crossed"])
            totalVehicles += vehicles[directionNumbers[i]]["crossed"]
    print("Total vehicles passed: ", totalVehicles)
    print("Total time: ", timeElapsed)
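  • simTime() -> this function counts the elapsed seconds and stops the simulation when the simulation time is reached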
def simTime():

    global timeElapsed, simulationTime
    while True:
        timeElapsed += 1
        time.sleep(1)
        if timeElapsed == simulationTime:
            showStats()
            os._exit(1)

Demonstration of the simulation

Demo


Team members of the project


Tools

Pygame guide

Kaggle Linear Regression model

Data sample

Project Readings Link

GitHub project code


References

Solving Traffic with A.I.

TensorFlow Tutorial AI traffic system

Tensorflow

AI traffic system simulation

Intelligent traffic lights

Traffic light detection using tensorflow

Traffic management using AI

 