Enforcing Driver Breaks After Departure Time in Google OR-Tools VRP

Thana B. - Oct 3 - - Dev Community

In the world of logistics and transportation, planning efficient routes for vehicles is crucial. However, equally important is ensuring that drivers adhere to regulations, such as taking mandatory breaks after certain hours of driving. In this blog post, we'll explore how to enforce break constraints after a vehicle departs from the depot using Google OR-Tools in a Capacitated Vehicle Routing Problem with Time Windows and Breaks (CVRPTWB).

We'll walk through a Python example where we define an ActiveTime dimension that tracks the time since each vehicle departed from the depot. By leveraging this dimension, we'll set up mandatory breaks for drivers after specific hours of active driving.


Problem Overview

We are tackling a Capacitated Vehicle Routing Problem with Time Windows and Breaks (CVRPTWB). In this problem, we need to plan routes for vehicles that have:

  • Capacity constraints: Each vehicle has a maximum load capacity.
  • Time windows: Each customer must be served within a specific time frame.
  • Driver breaks: Drivers must take breaks after certain hours of driving since they departed from the depot.

Our goal is to minimize the total distance traveled while adhering to these constraints.


Setting Up the Environment

First, ensure you have Google OR-Tools installed:

pip install ortools
Enter fullscreen mode Exit fullscreen mode

Now, let's dive into the code.


Code Walkthrough

Importing Necessary Libraries

from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
import random
import pandas as pd
Enter fullscreen mode Exit fullscreen mode

Creating the Data Model

We define a function create_data_model() to generate our problem's data, including locations, demands, and time windows.

def create_data_model():
    """Stores the data for the problem."""
    data = {}
    ...
    return data
Enter fullscreen mode Exit fullscreen mode

Key parameters:

  • Number of orders: data["num_orders"]
  • Number of vehicles: data["num_vehicles"]
  • Vehicle capacities: data["vehicle_capacities"]
  • Time windows: data["time_windows"]
  • Service time per demand unit: data["time_per_demand_unit"]

Note: We use random generation for locations, demands, and time windows to simulate real-world variability. Due to this randomness, sometimes the problem may not have a feasible solution. If that happens, you might need to run the code multiple times.


Initializing the Routing Model

We create the routing index manager and the routing model.

manager = pywrapcp.RoutingIndexManager(
    data["num_orders"] + 1, data["num_vehicles"], data["depot"]
)
routing = pywrapcp.RoutingModel(manager)
parameters = pywrapcp.DefaultRoutingSearchParameters()

parameters.first_solution_strategy = (
    routing_enums_pb2.FirstSolutionStrategy.PARALLEL_CHEAPEST_INSERTION
)
Enter fullscreen mode Exit fullscreen mode

Defining Distance and Demand Callbacks

We define the Manhattan distance and demand callbacks to calculate distances between nodes and demands at each node.

def manhattan_distance(x1, y1, x2, y2):
    return abs(x1 - x2) + abs(y1 - y2)

def transit_callback(i, j):
    ...

def demand_callback(i):
    ...
Enter fullscreen mode Exit fullscreen mode

We register these callbacks with the routing model.

transit_index = routing.RegisterTransitCallback(transit_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_index)

demand_index = routing.RegisterUnaryTransitCallback(demand_callback)
routing.AddDimensionWithVehicleCapacity(
    demand_index, 0, data["vehicle_capacities"], True, "Capacity"
)
Enter fullscreen mode Exit fullscreen mode

Setting Service Times and Time Windows

We calculate service times based on demands and add the time dimension with time windows.

service_times = [
    data["time_per_demand_unit"] * data["demands"][manager.IndexToNode(i)]
    for i in range(routing.Size())
]

def time_callback(i, j):
    ...

time_index = routing.RegisterTransitCallback(time_callback)
routing.AddDimension(time_index, data["horizon"], data["horizon"], False, "Time")
time_dimension = routing.GetDimensionOrDie("Time")
Enter fullscreen mode Exit fullscreen mode

We then add time window constraints for each customer and the depot.

for order in range(1, manager.GetNumberOfNodes()):
    start, end = data["time_windows"][order]
    index = manager.NodeToIndex(order)
    time_dimension.CumulVar(index).SetRange(start, end)

for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    time_dimension.CumulVar(index).SetRange(
        data["time_windows"][data["depot"]][0], data["time_windows"][data["depot"]][1]
    )
Enter fullscreen mode Exit fullscreen mode

Key Part 1: Defining the ActiveTime Dimension

To enforce breaks after a certain amount of active driving time since departing from the depot, we introduce an ActiveTime dimension.

Defining the ActiveTime Callback

def active_time_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return (
        int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / data["speed"])
        + service_times[manager.IndexToNode(i)]
    )
Enter fullscreen mode Exit fullscreen mode

This callback calculates the transit time between nodes, including the service time at the current node.

Adding the ActiveTime Dimension

We register the active_time_callback and add the ActiveTime dimension. Crucially, we set fix_start_cumul_to_zero to True, ensuring that the active time for each vehicle starts from zero at the depot.

active_time_index = routing.RegisterTransitCallback(active_time_callback)
routing.AddDimensionWithVehicleTransits(
    [active_time_index]*data["num_vehicles"], 0, data["horizon"], True, "ActiveTime"
)
active_time_dimension = routing.GetDimensionOrDie("ActiveTime")
Enter fullscreen mode Exit fullscreen mode

Explanation:

  • AddDimensionWithVehicleTransits allows us to add a dimension that can have different transit callbacks for each vehicle. In our case, we use the same callback for all vehicles.
  • The fourth argument True is for fix_start_cumul_to_zero, which fixes the cumulative value at the start of each vehicle's route to zero. This is essential for accurately tracking the active time since departing from the depot.

Minimizing Time Variables

We also add time minimization constraints to help the solver find better solutions.

for i in range(routing.Size()):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(i))
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(i))
for j in range(data["num_vehicles"]):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.Start(j)))
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.End(j)))
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(routing.Start(j)))
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(routing.End(j)))
Enter fullscreen mode Exit fullscreen mode

Key Part 2: Adding Vehicle Breaks Based on ActiveTime

Now that we have our ActiveTime dimension, we can set up breaks that occur after certain hours of active driving.

Defining Break Data

We define the break intervals that we want to enforce. For example, drivers must take a 30-minute break between 6-8 hours and another between 14-16 hours of active driving.

break_data = [
    # [start_min, start_max, duration] (in seconds)
    [6 * 3600, 8 * 3600, 1 * 1800],  # Morning break
    [14 * 3600, 16 * 3600, 1 * 1800],  # Afternoon break
]
Enter fullscreen mode Exit fullscreen mode

Creating and Enforcing Break Intervals

We create break intervals for each vehicle and enforce that they must be taken.

solver = routing.solver()
breaks = {}
for vehicle in range(data["num_vehicles"]):
    breaks[vehicle] = []
    for i, data_break in enumerate(break_data):
        break_interval = solver.FixedDurationIntervalVar(
            data_break[0],
            data_break[1],
            data_break[2],
            False,  # is_optional set to False
            f"Vehicle {vehicle}'s Break {i}",
        )
        breaks[vehicle].append(break_interval)

    # Apply break constraints
    solver.Add(breaks[vehicle][0].PerformedExpr() == 1)
    solver.Add(breaks[vehicle][1].PerformedExpr() == 1)

    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) < breaks[vehicle][0].StartExpr())
Enter fullscreen mode Exit fullscreen mode

Linking Breaks to ActiveTime Dimension

This is the crucial step: We set the breaks to be based on the ActiveTime dimension rather than the default time dimension.

# Set breaks based on ActiveTime dimension
active_time_dimension.SetBreakIntervalsOfVehicle(
    breaks[vehicle], vehicle, service_times
)
Enter fullscreen mode Exit fullscreen mode

Explanation:

  • By calling SetBreakIntervalsOfVehicle with our active_time_dimension, we ensure that breaks are scheduled based on the cumulative active time since the vehicle departed from the depot.
  • The service_times parameter accounts for the time spent servicing customers.

Optional: Forcing Longer Working Hours for Testing

Note: In our example, to test the breaks effectively, we add constraints to ensure vehicles work longer hours. This is for experimentation purposes and can be removed in actual applications.

# Please delete this when you apply to your problem
# This is added for this experiment to ensure results with long hours of working so we can test our breaks
for vehicle in range(data["num_vehicles"]):
    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) <= 1 * 3600)
    index = routing.End(vehicle)
    solver.Add(time_dimension.CumulVar(index) >= 23 * 3600)
Enter fullscreen mode Exit fullscreen mode

Allowing Orders to Be Skipped with Penalties

To provide flexibility in routing, we add penalties for skipping orders. This helps the solver make decisions that balance total distance with the need to serve all customers.

kPenalty = 10000000  # Penalty for skipping an order
for order in range(1, manager.GetNumberOfNodes()):
    routing.AddDisjunction([manager.NodeToIndex(order)], kPenalty)
Enter fullscreen mode Exit fullscreen mode

Solving the Problem

We set up the solver and solve the problem.

solution = routing.SolveWithParameters(parameters)
Enter fullscreen mode Exit fullscreen mode

Extracting and Printing the Solution

Finally, we extract the solution and print out the routes, including the break schedules.

Printing the Break Plans

print("==== BREAK PLAN ====")
for vehicle_id in range(data["num_vehicles"]):
    vehicle_breaks = breaks[vehicle_id]
    for break_interval in vehicle_breaks:
        if solution.PerformedValue(break_interval):
            print(
                f"{break_interval.Name()} Start({break_interval.StartExpr()}) "
                f"Duration({break_interval.DurationExpr()}) "
                f"End({break_interval.EndExpr()})"
            )
Enter fullscreen mode Exit fullscreen mode

Printing the Route Plans

print("\n\n==== ROUTE PLAN ====")
total_distance = 0
total_load = 0
total_time = 0
capacity_dimension = routing.GetDimensionOrDie("Capacity")
time_dimension = routing.GetDimensionOrDie("Time")

for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    plan_output = f"Route for vehicle {vehicle_id}:\n"
    route_distance = 0
    route_load = 0
    while not routing.IsEnd(index):
        ...
    print(plan_output)
    total_distance += route_distance
    total_load += route_load
    total_time += solution.Max(time_var)

print(f"Total Distance of all routes: {total_distance} units")
print(f"Total Load of all routes: {total_load}")
print(f"Total Time of all routes: {round(total_time/3600, 3)} hours")
Enter fullscreen mode Exit fullscreen mode

Full Code

For completeness, here's the full code with comments:

# Capacitated Vehicle Routing Problem with Time Windows and Breaks (CVRPTWB)

from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
import random
import pandas as pd

# Data Model
def create_data_model():
    """Stores the data for the problem."""
    data = {}
    # Problem parameters corresponding to the mathematical model
    data["num_orders"] = 20  # |C| = 20 customer nodes
    data["num_vehicles"] = 3  # |V| = 3 vehicles
    data["depot"] = 0  # Depot node
    data["vehicle_capacities"] = [40] * data["num_vehicles"]  # Q = 40 for each vehicle
    data["speed"] = 100  # Vehicle speed (used in time calculations)

    # Time parameters
    data["time_per_demand_unit"] = 100  # Service time per demand unit (s_i = 100 * d_i)
    data["horizon"] = 24 * 3600  # H = 24 hours in seconds

    num_locations = data["num_orders"] + 1  # Total nodes |N| = |C| + 1 (depot)

    # Random seed for reproducibility
    use_deterministic_seed = False
    if use_deterministic_seed:
        random.seed(0)
    else:
        random.seed(None)

    # Randomly generate locations (x_i, y_i for each node i)
    kXMax = 100000
    kYMax = 100000
    data["locations"] = [
        (random.randint(0, kXMax), random.randint(0, kYMax))
        for _ in range(num_locations)
    ]

    # Random demands (d_i for each customer i)
    data["demands"] = [0] + [random.randint(1, 10) for _ in range(data["num_orders"])]

    # Time windows ([a_i, b_i] for each node i)
    data["time_windows"] = []
    kTWDuration = 5 * 3600  # 5 hours in seconds
    for _ in range(num_locations):
        start = random.randint(0, data["horizon"] - kTWDuration)
        data["time_windows"].append((start, start + kTWDuration))

    return data


# Instantiate the data problem
data = create_data_model()

# Initialize routing and search parameters
manager = pywrapcp.RoutingIndexManager(
    data["num_orders"] + 1, data["num_vehicles"], data["depot"]
)
routing = pywrapcp.RoutingModel(manager)
parameters = pywrapcp.DefaultRoutingSearchParameters()

# Set first solution strategy (not in mathematical model, but affects solution quality)
parameters.first_solution_strategy = (
    routing_enums_pb2.FirstSolutionStrategy.PARALLEL_CHEAPEST_INSERTION
)


# Define the Manhattan distance function (c_ij in the model)
def manhattan_distance(x1, y1, x2, y2):
    return abs(x1 - x2) + abs(y1 - y2)


# Callback for distance calculation (implements c_ij)
def transit_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1])


transit_index = routing.RegisterTransitCallback(transit_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_index)


# Adding capacity dimension constraints (Constraint 3 in the model)
def demand_callback(i):
    return data["demands"][manager.IndexToNode(i)]


demand_index = routing.RegisterUnaryTransitCallback(demand_callback)
routing.AddDimensionWithVehicleCapacity(
    demand_index, 0, data["vehicle_capacities"], True, "Capacity"
)


# Set service times (s_i in the model)
service_times = [
    data["time_per_demand_unit"] * data["demands"][manager.IndexToNode(i)]
    for i in range(routing.Size())
]


def time_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return (
        int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / data["speed"])
        + service_times[manager.IndexToNode(i)]
    )


time_index = routing.RegisterTransitCallback(time_callback)
routing.AddDimension(time_index, data["horizon"], data["horizon"], False, "Time")
time_dimension = routing.GetDimensionOrDie("Time")


# === Key Part 1: Defining ActiveTime Dimension === #
def active_time_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return (
        int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / data["speed"])
        + service_times[manager.IndexToNode(i)]
    )

active_time_index = routing.RegisterTransitCallback(active_time_callback)
routing.AddDimensionWithVehicleTransits(
    [active_time_index]*data["num_vehicles"], 0, data["horizon"], True, "ActiveTime"
)
active_time_dimension = routing.GetDimensionOrDie("ActiveTime")
# ================================================ #


# Adding time windows (Constraint 4 in the model)
for order in range(1, manager.GetNumberOfNodes()):
    start, end = data["time_windows"][order]
    index = manager.NodeToIndex(order)
    time_dimension.CumulVar(index).SetRange(start, end)

for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    active_time_dimension.CumulVar(index).SetValue(0)

# Add time window constraints for each vehicle start node.
depot_idx = data["depot"]
for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    time_dimension.CumulVar(index).SetRange(
        data["time_windows"][depot_idx][0], data["time_windows"][depot_idx][1]
    )

# Minimizing time variables (part of the objective function)
for i in range(routing.Size()):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(i))
for j in range(data["num_vehicles"]):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.Start(j)))
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.End(j)))

# Minimizing ActiveTime variables
for i in range(routing.Size()):
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(i))
for j in range(data["num_vehicles"]):
    routing.AddVariableMinimizedByFinalizer(
        active_time_dimension.CumulVar(routing.Start(j))
    )
    routing.AddVariableMinimizedByFinalizer(
        active_time_dimension.CumulVar(routing.End(j))
    )


# === Key Part 2: Adding Vehicle Breaks === #
solver = routing.solver()


break_data = [
    # [start_min, start_max, duration] (in seconds)
    [6 * 3600, 8 * 3600, 1 * 1800],  # Morning break
    [14 * 3600, 16 * 3600, 1 * 1800],  # Afternoon break
]

# Optional: Forcing long working hours for testing
# Please delete this when you apply to your problem
# This is added for this experiment to ensure results with long hours of working so we can test our breaks
for vehicle in range(data["num_vehicles"]):
    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) <= 1 * 3600)
    index = routing.End(vehicle)
    solver.Add(time_dimension.CumulVar(index) >= 23 * 3600)

breaks = {}
for vehicle in range(data["num_vehicles"]):
    # Declare a list of break intervals for each vehicle
    breaks[vehicle] = []
    for i, data_break in enumerate(break_data):
        # Declare each possible break interval for the current vehicle
        break_interval = solver.FixedDurationIntervalVar(
            data_break[0],
            data_break[1],
            data_break[2],
            False,  # is_optional set to False
            f"Vehicle {vehicle}'s Break {i}",
        )
        breaks[vehicle].append(break_interval)

    # Apply break constraints
    solver.Add(breaks[vehicle][0].PerformedExpr() == 1)
    solver.Add(breaks[vehicle][1].PerformedExpr() == 1)

    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) < breaks[vehicle][0].StartExpr())

    # Set breaks based on ActiveTime dimension
    active_time_dimension.SetBreakIntervalsOfVehicle(
        breaks[vehicle], vehicle, service_times
    )
# ========================================== #


# Adding penalty costs to allow skipping orders
kPenalty = 10000000  # Penalty for skipping an order
for order in range(1, manager.GetNumberOfNodes()):
    routing.AddDisjunction([manager.NodeToIndex(order)], kPenalty)

# Solve the problem
solution = routing.SolveWithParameters(parameters)

print("==== BREAK PLAN ====")
for vehicle_id in range(data["num_vehicles"]):
    # Get the break intervals for the current vehicle
    vehicle_breaks = breaks[vehicle_id]
    # Check each break interval of the vehicle
    for break_interval in vehicle_breaks:
        # Find whether the break interval was performed or not from the solution
        if solution.PerformedValue(break_interval):
            print(
                f"{break_interval.Name()} Start({break_interval.StartExpr()}) "
                f"Duration({break_interval.DurationExpr()}) "
                f"End({break_interval.EndExpr()})"
            )


print("\n\n==== ROUTE PLAN ====")
total_distance = 0
total_load = 0
total_time = 0
capacity_dimension = routing.GetDimensionOrDie("Capacity")
time_dimension = routing.GetDimensionOrDie("Time")

for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    plan_output = f"Route for vehicle {vehicle_id}:\n"
    route_distance = 0
    route_load = 0
    while not routing.IsEnd(index):
        node_index = manager.IndexToNode(index)
        load_var = capacity_dimension.CumulVar(index)
        time_var = time_dimension.CumulVar(index)
        route_load = solution.Value(load_var)
        time_min = solution.Min(time_var)
        time_max = solution.Max(time_var)

        plan_output += f" {node_index} Load({route_load}) Time({round(time_min/3600, 3)},{round(time_max/3600, 3)}) -> "

        previous_index = index
        index = solution.Value(routing.NextVar(index))
        route_distance += routing.GetArcCostForVehicle(
            previous_index, index, vehicle_id
        )

    node_index = manager.IndexToNode(index)
    time_var = time_dimension.CumulVar(index)
    time_min = solution.Min(time_var)
    time_max = solution.Max(time_var)
    load_var = capacity_dimension.CumulVar(index)
    route_load = solution.Value(load_var)

    plan_output += (
        f" {node_index} Time({round(time_min/3600, 3)},{round(time_max/3600, 3)})\n"
    )
    plan_output += f"Distance of the route: {route_distance} units\n"
    plan_output += f"Load of the route: {route_load}\n"
    print(plan_output)
    total_distance += route_distance
    total_load += route_load
    total_time += solution.Max(time_var)


print(f"Total Distance of all routes: {total_distance} units")
print(f"Total Load of all routes: {total_load}")
print(f"Total Time of all routes: {round(total_time/3600, 3)} hours")
Enter fullscreen mode Exit fullscreen mode

Validate Result

I wrote additional code to confirm breaks as below:

import pandas as pd

def seconds_to_hhmm(seconds):
    # Convert seconds to HH:MM format
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    return f"{hours}:{minutes:02d}"


events = []
for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)

    break_remark = ""
    is_start = True

    while not routing.IsEnd(index):
        next_index = solution.Value(routing.NextVar(index))
        node_index = manager.IndexToNode(index)
        next_node_index = manager.IndexToNode(next_index)

        time_var = time_dimension.CumulVar(index)
        arrival_time = solution.Min(time_var)  # in seconds

        if is_start:
            depot_arrival_time = arrival_time
            is_start = False

        time_var = time_dimension.CumulVar(next_index)
        actual_next_arrival_time = solution.Min(time_var)  # in seconds

        service_time = service_times[node_index]  # in seconds
        start_wait = arrival_time + service_time
        expected_next_arrival_time = arrival_time + time_callback(index, next_index)
        transit_duration = expected_next_arrival_time - service_time - arrival_time
        waiting_duration = actual_next_arrival_time - expected_next_arrival_time
        start_transit = start_wait + waiting_duration
        if routing.IsStart(index):
            events.append(
                (
                    vehicle_id,
                    node_index,
                    "start_route",
                    arrival_time,
                    seconds_to_hhmm(arrival_time),
                    0,
                    seconds_to_hhmm(arrival_time - depot_arrival_time),
                    "",
                )
            )
        else:
            events.append(
                (
                    vehicle_id,
                    node_index,
                    "arrival_time",
                    arrival_time,
                    seconds_to_hhmm(arrival_time),
                    0,
                    seconds_to_hhmm(arrival_time - depot_arrival_time),
                    "",
                )
            )

        events.append(
            (
                vehicle_id,
                node_index,
                "start_transit",
                start_transit,
                seconds_to_hhmm(start_transit),
                seconds_to_hhmm(transit_duration),
                seconds_to_hhmm(start_transit - depot_arrival_time),
                "",
            )
        )

        index = next_index
        if waiting_duration > 0:
            end_wait = start_wait + waiting_duration

            for break_id, break_info in enumerate(break_data):
                overlap_start = max(start_wait - depot_arrival_time, break_info[0])
                overlap_end = min(
                    end_wait - depot_arrival_time, break_info[1] + break_info[2]
                )
                overlap_duration = overlap_end - overlap_start

                if overlap_duration > break_info[2]:
                    this_remark = f"break {break_id} "
                    break_remark =  break_remark + this_remark

            events.append(
                (
                    vehicle_id,
                    next_node_index,
                    "start_wait (idle + break)",
                    start_wait,
                    seconds_to_hhmm(start_wait),
                    seconds_to_hhmm(waiting_duration),
                    seconds_to_hhmm(start_wait - depot_arrival_time),
                    break_remark,
                )
            )

        break_remark = ""
        if routing.IsEnd(next_index):
            events.append(
                (
                    vehicle_id,
                    next_node_index,
                    "end_route",
                    actual_next_arrival_time,
                    seconds_to_hhmm(actual_next_arrival_time),
                    0,
                    seconds_to_hhmm(actual_next_arrival_time - depot_arrival_time),
                    "",
                )
            )

df_result = pd.DataFrame(
    events,
    columns=[
        "vehicle_id",
        "node",
        "event",
        "timestamp",
        "time",
        "duration",
        "since_start",
        "remark",
    ],
).sort_values(["vehicle_id", "timestamp"])
df_result = df_result[
    [
        "vehicle_id",
        "node",
        "event",
        "timestamp",
        "time",
        "since_start",
        "duration",
        "remark",
    ]
].copy()
Enter fullscreen mode Exit fullscreen mode

Here is an example output:

Image description


Conclusion

By introducing an ActiveTime dimension and linking driver breaks to this dimension, we can effectively enforce breaks after specific hours of driving since departing from the depot in Google OR-Tools. This approach ensures compliance with regulations and enhances driver safety.

Remember that due to random data generation, you might need to run the code multiple times to obtain a feasible solution. In real-world applications, you'd use actual data, which would provide consistent results.

.
Terabox Video Player