scheduling/generate_rules.py

293 lines
11 KiB
Python
Raw Normal View History

import json
from math import floor
import subprocess
from collections import defaultdict
# Maximum time for AK: 2 Timeslots
SLOT_LEN = 120
DAYS = 3
START = 480
END = 1200
class WorkGroup:
def __init__(self, id, interest, track, projector, reso, length, leaders, constraints):
self.id = id # must be int
self.interest = interest
self.track = track if track is not None else 'none' # haben alle aks einen track?
self.projector = 'true' if projector else 'false'
self.reso = 'true' if reso else 'false'
self.length = floor((length-1) / SLOT_LEN) + 1
if self.length > 2:
print(f"Warning, length of workgroup {id} greater than 2.")
self.length = 2
self.leaders = leaders
self.constrainted_time = [{'day': 1, 'time': 840, 'type': 'before'}] if reso else []
self.not_on_day = [0, 1, 2] if len(list(filter(lambda c: c['type'] == 'OnlyOnDay', constraints))) else []
if reso and not self.not_on_day:
self.not_on_day.append(2)
self.only_on_day = []
for constraint in constraints:
if constraint["type"] == 'OnlyAfterTime':
if constraint['workGroup'] is None:
self.constrainted_time.append(
{'day': constraint['day'], 'time': constraint['time'], 'type': 'after'})
elif constraint["type"] == 'OnlyBeforeTime':
self.constrainted_time.append({'day': constraint['day'], 'time': constraint['time'], 'type': 'before'})
elif constraint['type'] == 'NotOnDay':
self.not_on_day.append(constraint['day'])
elif constraint['type'] == 'OnlyOnDay':
if constraint['day'] == 2 and reso:
print(f'WorkGroup {id}: only on day two, but has reso. Warning, is UNSATISFIABLE!')
self.not_on_day.remove(constraint['day'])
# TODO: more unsat warning when reso == True
def __str__(self):
leader_rules = ''
for leader in self.leaders:
if leader.strip() != '':
leader_rules += f'leader({self.id}, "{leader}").\n'
return f'ak({self.id}, {self.interest}, "{self.track}", {self.projector}, {self.reso}, {self.length}).\n' + leader_rules
class Room:
def __init__(self, id, n_places, projector, is_pool): # TODO internet, whiteboard & co.
self.id = id # must be int
self.n_places = n_places
self.projector = 'true' if projector else 'false'
self.is_pool = is_pool
def __str__(self):
pool_rule = ''
if self.is_pool:
pool_rule = f'\npool({self.id}).'
return f'room({self.id}, {self.n_places}, {self.projector}).{pool_rule}'
class Timeslot:
def __init__(self, order, timeslot):
self.begin, self.end, self.day = timeslot
self.id = f'{self.day}#{self.begin}#{self.end}'
self.order = order
# self.begin = begin
# self.end = end
# self.day = day
def __str__(self):
return f'timeslot("{self.id}", {self.order}, {self.begin}, {self.end}, {self.day}).'
class Schedule:
def __init__(self, id, workgroup, day, time, room):
self.id = id
self.workgroup = workgroup
self.room = room
self.day = day
self.time = time
def to_dict(self):
return {
'id': self.id,
'workGroup': self.workgroup,
'room': self.room,
'time': self.time,
'day': self.day,
'lockRoom': False,
'lockTime': False,
}
def parse_workgroups(data):
workgroups = []
id_to_workgroup = {}
for i, workgroup in enumerate(data['workGroups']):
workgroups.append(
WorkGroup(workgroup['id'], workgroup['interested'], workgroup['track'], workgroup['projector'],
workgroup['resolution'], workgroup['length'], workgroup['leader'], workgroup['constraints']))
id_to_workgroup[workgroup['id']] = i
return workgroups, id_to_workgroup
def parse_rooms(data):
rooms = []
id_to_room = {}
for i, room in enumerate(data['rooms']):
rooms.append(Room(room['id'], room['places'], room['projector'], room['pool'])) # TODO internet, whiteboard & co.
id_to_room[room['id']] = i
return rooms, id_to_room
#######################
# READ BACKUP JSON #
#######################
data = json.load(open('backup.json'))
workgroups, id_to_workgroup = parse_workgroups(data)
rooms, id_to_room = parse_rooms(data)
rules = """
%LENGTH = { schedule(AK,TIMESLOT,ROOM): timeslot(TIMESLOT,_,_,_,_), room(ROOM,_,_) } :- ak(AK,_,_,_,_,LENGTH).
LENGTH = { schedule(AK,TIMESLOT,ROOM): timeslot(TIMESLOT,_,_,_,_), room(ROOM,_,_) } :- ak(AK,_,_,_,_,LENGTH).
% forbid 2AKS in the same room at the same time
:- schedule(AK1,TIMESLOT,ROOM), schedule(AK2,TIMESLOT,ROOM), AK1 != AK2.
% forbid ak with beamer request in room without beamer
:- schedule(AK,_,ROOM), ak(AK,_,_,true,_,_), room(ROOM,_,false).
% forbid ak in room that is too small
:- schedule(AK,_,ROOM), ak(AK,INTEREST,_,_,_,_), room(ROOM,PLACES,_), PLACES<INTEREST.
% forbid two AKs with same leader at same timestep
:- schedule(AK1,TIMESTEP,_), schedule(AK2,TIMESTEP,_), leader(AK1,LEADER), leader(AK2,LEADER), AK1 != AK2.
% schedules of longer aks should be in the same room and in consecutive timeslots
schedule(AK,TIMESLOT2,ROOM) :- ak(AK,_,_,_,_,2), schedule(AK,TIMESLOT1,ROOM), timeslot(TIMESLOT1,ORDER1,_,_,DAY), timeslot(TIMESLOT2,ORDER2,_,_,DAY), timeslot(TIMESLOT3,ORDER3,_,_,_), ORDER2 != ORDER1+1, ORDER1 = ORDER3+1, not schedule(AK,TIMESLOT3,ROOM) .
"""
lengths = defaultdict(int)
for workgroup in workgroups:
lengths[workgroup.length] += 1
rules += str(workgroup) + '\n'
print(lengths, lengths[1] + lengths[2]*2)
for room in rooms:
rules += str(room) + '\n'
timeslots = [(start, start + SLOT_LEN, day) for day in range(DAYS) for start in range(START, END, SLOT_LEN)]
del timeslots[0] # remove slot thursday morning
if DAYS >= 3: del timeslots[-1] # remove last slot for Abschlussplenum
print(f'Number of Workgroups: {len(workgroups)}, Number of Rooms: {len(rooms)}, Number of Timeslots: {len(timeslots)}')
for order, timeslot in enumerate(timeslots):
if order == 8:
continue # remove slot for Resovorstellung, keep order to avoid two-slot-workgroups to be scheduled before and after Resovorstellung.
rules += str(Timeslot(order, timeslot)) + '\n'
############################
# ADD MORE CONSTRAINTS #
############################
def time_to_id(timeslots, day, time):
timeslots = list(filter(lambda ts: time >= ts[1][0] and time < ts[1][1] and day == ts[1][2], enumerate(timeslots)))
return timeslots[0][0]
def get_last_on_day(day):
if day == 0:
return 4
else:
return (day + 1) * 5
def get_first_on_day(day):
if day == 3:
return 15
else:
return (day) * 5
for workgroup in workgroups:
if workgroup.constrainted_time:
for constrainted_time in workgroup.constrainted_time:
# TODO constraint for 2 slot workgroups missing!
timeslot_id = time_to_id(timeslots, constrainted_time['day'], constrainted_time['time'])
timeslot = timeslots[timeslot_id] # after this slot: ok
operator1 = '<' if constrainted_time['type'] == 'after' else '>'
operator2 = '>' if constrainted_time['type'] == 'after' else '<'
day_restriction = get_last_on_day(constrainted_time['day']) + 1 if constrainted_time[
'type'] == 'before' else get_first_on_day(
constrainted_time['day']) - 1 # first or last slot on this day
timeslot2 = timeslot_id if constrainted_time['type'] == 'before' else timeslot_id
print(
f':- schedule({workgroup.id}, TIMESLOT, _), timeslot(TIMESLOT, ORDER, _, _, _), ORDER {operator1} {timeslot2}, ORDER {operator2} {day_restriction}.')
rules += f':- schedule({workgroup.id}, TIMESLOT, _), timeslot(TIMESLOT, ORDER, _, _, _), ORDER {operator1} {timeslot2}, ORDER {operator2} {day_restriction}.\n'
if workgroup.not_on_day:
for day in workgroup.not_on_day:
first_timeslot = get_first_on_day(day)
last_timeslot = get_last_on_day(day)
print(
f':- schedule({workgroup.id}, TIMESLOT, _), timeslot(TIMESLOT, ORDER, _, _, _), ORDER >= {first_timeslot}, ORDER <= {last_timeslot}.')
rules += f':- schedule({workgroup.id}, TIMESLOT, _), timeslot(TIMESLOT, ORDER, _, _, _), ORDER >= {first_timeslot}, ORDER <= {last_timeslot}.\n'
rules += "pool_schedule(ROOM) :- schedule(_,_,ROOM), pool(ROOM).\n"\
"poolpen(T) :- T = {pool_schedule(_)}.\n"\
"#minimize { T : poolpen(T) }.\n" \
"#show schedule/3."
#######################
# GENERATE SCHEDULE #
#######################
output_file_name = "generated_rules.pl"
open(output_file_name, 'w').write(rules)
# create subprocess definition
process = ['clingo', output_file_name, '--outf=2']
# run subprocess, not sure, if busy-waiting..
completed_process = subprocess.run(process, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
output = completed_process.stdout
#######################
# OUTPUT SCHEDULE #
#######################
if 'UNSATISFIABLE' in output:
print('UNSATISFIABLE')
print(output)
else:
print('SATISFIABLE, outputting schedule...')
import re
output = json.loads(output)
schedules = []
workgroup_to_schedule = defaultdict(list)
for i, schedule in enumerate(output['Call'][0]['Witnesses'][-1]['Value']):
match = re.match('schedule\((.+),"(.+)#(.+)#.+",(.+)\)', schedule)
try:
workgroup_id = id_to_workgroup[int(match.group(1))]
room_id = id_to_room[int(match.group(4))]
schedules.append(
Schedule(i, data['workGroups'][workgroup_id], match.group(2), match.group(3), data['rooms'][room_id]))
# {
# 'id': i,
# 'workGroup': data['workGroups'][workgroup_id],
# 'room': data['rooms'][room_id],
# 'time': match.group(3),
# 'day': match.group(2),
# 'lockRoom': False,
# 'lockTime': False,
# })
workgroup_to_schedule[workgroup_id].append(i)
except Exception as e:
print(e)
schedules_filtered = []
for workgroup_id in workgroup_to_schedule:
schedules_for_workgroup = workgroup_to_schedule[workgroup_id]
if len(schedules_for_workgroup) == 1:
schedules_filtered.append(schedules[schedules_for_workgroup[0]])
else:
# more than one schedule contains workgroup, because workgroup is 2 slots long
# find out which one is the first one and remove the second workgroup:
schedules_for_workgroup_0 = schedules[schedules_for_workgroup[0]]
schedules_for_workgroup_1 = schedules[schedules_for_workgroup[1]]
if schedules_for_workgroup_0.time < schedules_for_workgroup_1.time:
schedules_filtered.append(schedules_for_workgroup_0)
else:
schedules_filtered.append(schedules_for_workgroup_1)
data = {}
data['schedules'] = [s.to_dict() for s in schedules_filtered]
json.dump(data, open('output.json', 'w'))