Weighted Interval Scheduling
- Given $N$ intervals, where interval $i$ starts at $s_i$, finishes at $f_i$, and has weight $w_i$.
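- Two intervals are compatible if they don't overlap. Intervals that only touch at an endpoint (one finishes exactly when the other starts) count as compatible.
- Goal: find a maximum-weight subset of mutually compatible intervals.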
Let's say that we have the following set of intervals:
intervals = [
    [0, 3, 3],  # Starts at t=0, ends at t=3, and has weight=3
    [1, 4, 2],
    [0, 5, 4],
    [3, 6, 1],
    [4, 7, 2],
    [3, 9, 5],
    [5, 10, 2],
    [8, 10, 1],
]
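Before turning to dynamic programming, it can help to see the problem statement as code. The following is a minimal brute-force sketch, not part of the original tutorial: the helper names compatible and brute_force are made up for illustration, and touching endpoints are assumed to be compatible. It tries every subset, so it is only practical for tiny inputs like this one.

```python
from itertools import combinations

# Same example data as above: [start, finish, weight].
intervals = [
    [0, 3, 3], [1, 4, 2], [0, 5, 4], [3, 6, 1],
    [4, 7, 2], [3, 9, 5], [5, 10, 2], [8, 10, 1],
]


def compatible(a, b):
    """Hypothetical helper: True if one interval finishes no later than the other starts."""
    return a[1] <= b[0] or b[1] <= a[0]


def brute_force(intervals):
    """Return (best_weight, best_subset) by checking every non-empty subset."""
    best_weight, best_subset = 0, []
    for size in range(1, len(intervals) + 1):
        for subset in combinations(intervals, size):
            # Keep the subset only if every pair in it is compatible.
            if all(compatible(a, b) for a, b in combinations(subset, 2)):
                weight = sum(interval[2] for interval in subset)
                if weight > best_weight:
                    best_weight, best_subset = weight, list(subset)
    return best_weight, best_subset


best_weight, best_subset = brute_force(intervals)
print(best_weight, best_subset)
```

For this data the search finds a total weight of 8, using the intervals [0, 3, 3] and [3, 9, 5]. The running time grows exponentially with $N$, which is the motivation for a dynamic programming solution.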
Dynamic Programming Solution
This tutorial is adapted from GitHub.
Number the intervals $1, 2, ..., N$ in order of increasing finish time, and let $OPT[i]$ denote the value of the optimal solution that uses only the intervals $1, 2, ..., i$.
CASE 1: $OPT$ includes interval $i$. In this case, $OPT$ cannot select any interval incompatible with $i$, and $OPT$ must include the optimal solution to the problem consisting of the remaining compatible intervals $1, 2, ..., p(i)$, where $p(i)$ denotes the largest index $j < i$ such that interval $j$ is compatible with $i$ (and $p(i) = 0$ if there is no such interval).
CASE 2: $OPT$ doesn't include interval $i$. In this case, $OPT$ must include the optimal solution consisting of intervals $1, ..., i-1$.
The optimal solution takes the larger value of these two cases. Hence, we have
$$ OPT[i] = \begin{cases} 0 & \text{if } i = 0 \\ \max\left(w_i + OPT[p(i)],\ OPT[i-1]\right) & \text{otherwise} \end{cases} $$
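The recurrence can be transcribed almost literally as a memoized recursion. This is a sketch under a few assumptions rather than the tutorial's own implementation: the example intervals are listed in order of finish time, the function names p and opt are chosen here to mirror the notation, and $p(i)$ is found with a simple backward scan that relies on that ordering.

```python
from functools import lru_cache

# Example data, already sorted by finish time: [start, finish, weight].
intervals = [
    [0, 3, 3], [1, 4, 2], [0, 5, 4], [3, 6, 1],
    [4, 7, 2], [3, 9, 5], [5, 10, 2], [8, 10, 1],
]


def p(i):
    """Largest 1-based index j < i whose interval finishes by the start of i (0 if none)."""
    start_i = intervals[i - 1][0]
    for j in range(i - 1, 0, -1):
        if intervals[j - 1][1] <= start_i:
            return j
    return 0


@lru_cache(maxsize=None)
def opt(i):
    """Value of the optimal solution over intervals 1..i."""
    if i == 0:
        return 0
    weight_i = intervals[i - 1][2]
    # CASE 1 (take interval i) versus CASE 2 (skip interval i).
    return max(weight_i + opt(p(i)), opt(i - 1))


print(opt(len(intervals)))
```

The cache ensures each subproblem is solved once, so the recursion does the same work as filling a table from $i = 0$ upward.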
Implementation
First, we want to sort the intervals by finish time.
intervals = sorted(intervals, key=lambda x: x[1])
Then, create a zero-initialized array $OPT$ of length $N + 1$, where $OPT[i]$ stores the optimal value consisting of the first $i$ intervals.
N = len(intervals)
OPT = [0] * (N + 1)
When we include interval $i$, we need to determine which intervals are no longer available. To do this, we will precompute an array $p$, where $p[i]$ denotes the largest index $j < i$ such that interval $i$ is compatible with interval $j$, or $0$ if no earlier interval is compatible.
p = [0] * (N + 1)
for i, int_i in enumerate(intervals, start=1):
    # Search up to the ith interval.
    for j, int_j in enumerate(intervals, start=1):
        if j >= i:
            break
        # Check that int_i and int_j are compatible.
        if min(int_i[1], int_j[1]) - max(int_i[0], int_j[0]) < 1:
            p[i] = j
There are a few interesting things going on in this code:
- We use enumerate(intervals, start=1), which iterates over the list of intervals and gives us the index of each item starting from 1. For our specific case, it yields (1, [0, 3, 3]), (2, [1, 4, 2]), ...
- In the inner for-loop, we only need to search the intervals before interval $i$, since $p[i]$ ranges over indices $j < i$ (and interval $i$ always overlaps with itself).
- If the intervals $i$ and $j$ are compatible, we update $p[i] = j$. Because $j$ increases, the last update leaves the largest compatible index in $p[i]$.
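The double loop above takes quadratic time. As an alternative sketch (a swapped-in technique, not the tutorial's method), $p$ can be computed with binary search once the intervals are sorted by finish time. This assumes every interval has positive length and that touching endpoints are compatible; the names finish_times and p_fast are introduced here for illustration.

```python
from bisect import bisect_right

# Example data, sorted by finish time: [start, finish, weight].
intervals = sorted(
    [[0, 3, 3], [1, 4, 2], [0, 5, 4], [3, 6, 1],
     [4, 7, 2], [3, 9, 5], [5, 10, 2], [8, 10, 1]],
    key=lambda x: x[1],
)

finish_times = [interval[1] for interval in intervals]

# bisect_right counts the intervals that finish at or before the start of
# interval i. Since the list is sorted by finish time, those intervals are
# exactly 1..count, so the count is also the largest compatible 1-based index
# (0 when no earlier interval is compatible).
p_fast = [0] + [bisect_right(finish_times, interval[0]) for interval in intervals]
print(p_fast)
```

On the example data this yields the same array as the nested loops, in $O(N \log N)$ time instead of $O(N^2)$.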
Question: We use min(int_i[1], int_j[1]) - max(int_i[0], int_j[0]) to check if intervals $i$ and $j$ are compatible with each other. Why does this work? See if you can figure it out.
Recall that int_i[0] and int_i[1] refer to the start time and finish time of interval $i$, respectively.
Now we are ready to implement the dynamic program, which is a direct translation of the recurrence that we derived earlier.
OPT[0] = 0
for i, int_i in enumerate(intervals, start=1):
    OPT[i] = max(int_i[2] + OPT[p[i]], OPT[i - 1])
print(OPT)
[0, 3, 3, 4, 4, 5, 8, 8, 8]
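The array only gives the optimal value, not which intervals achieve it. One common way to recover them is to trace back through the table, re-checking which case of the recurrence was used at each step. The sketch below is an illustration with a made-up function name (reconstruct); the p and OPT arrays are hardcoded to the values the code above produces for the example data.

```python
# Example data sorted by finish time: [start, finish, weight].
intervals = [
    [0, 3, 3], [1, 4, 2], [0, 5, 4], [3, 6, 1],
    [4, 7, 2], [3, 9, 5], [5, 10, 2], [8, 10, 1],
]
# Values computed by the steps above for this data.
p = [0, 0, 0, 0, 1, 2, 1, 3, 5]
OPT = [0, 3, 3, 4, 4, 5, 8, 8, 8]


def reconstruct(intervals, p, OPT):
    """Walk backward from i = N and collect the intervals of one optimal solution."""
    chosen = []
    i = len(intervals)
    while i > 0:
        weight_i = intervals[i - 1][2]
        if weight_i + OPT[p[i]] >= OPT[i - 1]:
            # CASE 1 was at least as good: take interval i, jump to p[i].
            chosen.append(intervals[i - 1])
            i = p[i]
        else:
            # CASE 2: interval i is not part of the solution.
            i -= 1
    chosen.reverse()
    return chosen


selected = reconstruct(intervals, p, OPT)
print(selected)
```

For the example this selects [0, 3, 3] and [3, 9, 5], whose weights sum to the final table entry, 8.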
Visualization with dpvis
To visualize the dynamic program, replace the Python lists with our custom DPArray object.
from dp import DPArray
from dp._visualizer import display
N = len(intervals)
# Sort intervals by finish time.
intervals = sorted(intervals, key=lambda x: x[1])
# OPT[i] = value of the optimal solution consisting of intervals 1,...,i
OPT = DPArray(N + 1, array_name="Weighted Interval Scheduling", dtype=int)
# Compute p[i] = largest index j < i s.t. interval i is compatible with j.
p = [0] * (N + 1)
for i, int_i in enumerate(intervals, start=1):
    # Search only the intervals before the ith one (j < i).
    for j, int_j in enumerate(intervals[:i - 1], start=1):
        # Check that int_i and int_j are compatible.
        if min(int_i[1], int_j[1]) - max(int_i[0], int_j[0]) < 1:
            p[i] = j
# Base Case.
OPT[0] = 0
for i, int_i in enumerate(intervals, start=1):
    OPT[i] = max(int_i[2] + OPT[p[i]], OPT[i - 1])
Optionally, you may add additional information to the visualization. For example, you can define labels for the columns:
column_labels = [f"{i} intervals" for i in range(N + 1)]
To render the visualization, use the display function.
from dp._visualizer import display
display(OPT)
After executing the code above, this is what you will see when you go to http://127.0.0.1:8050.
- On the top of the page is a slider to control what timestep is being visualized.
- The slider can be used to show different timesteps by clicking and dragging or using the PLAY and STOP buttons.
- Below the slider is a visualization presenting the elements of the array on the current timestep. The 0th timestep shows the base case.
- The visualization shows that the zeroth element of the array is set to zero, which corresponds to OPT[0] = 0 in our code.
Try dragging the slider to timestep 5.
Now the visual shows the array with the first six elements set. On this timestep, elements two and four of the OPT array are READ, meaning we accessed the values of those elements (OPT[p[5]] = OPT[2] and OPT[4]) on this timestep. We also WRITE a value of 5 to element five of the OPT array. This corresponds to the update inside the final for-loop of our code when i=5:
# int_i[2] is the weight of interval i
OPT[i] = max(int_i[2] + OPT[p[i]], OPT[i - 1])
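To make the READ and WRITE bookkeeping concrete without running the visualizer, here is a small plain-Python sketch. It is not how dpvis is implemented: TracedList and next_timestep are hypothetical names invented for this illustration, the p array is hardcoded to the values computed earlier for the example data, and one timestep per assignment is an assumption based on the description above.

```python
class TracedList:
    """Hypothetical helper: a list that records which indices are read and written."""

    def __init__(self, size):
        self._data = [0] * size
        self.reads = set()
        self.writes = {}

    def __getitem__(self, index):
        self.reads.add(index)
        return self._data[index]

    def __setitem__(self, index, value):
        self.writes[index] = value
        self._data[index] = value

    def next_timestep(self):
        """Return the (reads, writes) recorded so far and start a fresh timestep."""
        snapshot = (sorted(self.reads), dict(self.writes))
        self.reads, self.writes = set(), {}
        return snapshot


# Example data sorted by finish time, and the p array computed earlier.
intervals = [
    [0, 3, 3], [1, 4, 2], [0, 5, 4], [3, 6, 1],
    [4, 7, 2], [3, 9, 5], [5, 10, 2], [8, 10, 1],
]
p = [0, 0, 0, 0, 1, 2, 1, 3, 5]

OPT = TracedList(len(intervals) + 1)
OPT[0] = 0
timesteps = [OPT.next_timestep()]  # Timestep 0: the base case.
for i, int_i in enumerate(intervals, start=1):
    OPT[i] = max(int_i[2] + OPT[p[i]], OPT[i - 1])
    timesteps.append(OPT.next_timestep())

print(timesteps[5])
```

The entry for timestep 5 records reads of indices 2 and 4 and a write of the value 5 to index 5, matching what the slider shows.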