2025-11-07
Knowledge notes

Based on this code, I will give you requirements step by step; please implement each one for me. Every implementation must be able to run in a SimPy environment.
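
As a quick reference for that constraint, here is a minimal, self-contained SimPy sketch of the runtime the implementations are expected to run under (the file name and the tick process are illustrative only, not part of the framework below):

# simpy_smoke_test.py -- minimal sketch of the SimPy runtime assumed above (illustrative only).
import simpy

def tick(env: simpy.Environment, period: float):
    # Trivial process: print the simulation clock every `period` time units.
    while True:
        print(f"[{env.now:.2f}] tick")
        yield env.timeout(period)

if __name__ == "__main__":
    env = simpy.Environment()
    env.process(tick(env, 1.0))
    env.run(until=3.0)  # prints ticks at t=0.00, 1.00, 2.00

Anything delivered for the requirements below should run the same way: plain `python <file>.py`, with `pip install simpy` as the only dependency.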

# scheduler_framework.py
# Enhanced SimPy-based scheduling framework with standard interfaces: wakeup, task control, dump, online/offline, dynamic add/remove.
# This is a complete, runnable example. To run it:
# 1. Install SimPy: pip install simpy
# 2. Run: python scheduler_framework.py
# Changes:
# - Ready queue now holds Tasks instead of Devices for task-centric scheduling.
# - Enqueue/dequeue operate on Tasks.
# - task_exec enqueues Task without immediate assignment; assignment happens in schedule when selecting Task.
# - Added fields to Task: assigned_device, blocked, enqueue_time, mlfq_level, this_slice, vruntime, pass_value, deficit, tickets, weight, temp_priority_boost, completion_event.
# - Moved strategy-specific fields from Device to Task.
# - Device no longer holds weight or strategy fields; uses hz only for expected_time.
# - select_task() selects ready (non-blocked) Task, then finds best idle Device for assignment.
# - Preemption interfaces adapted: interrupt/pause on device find Task, enqueue Task, update device state.
# - Restore/resume use last_preempted_task/last_paused_task on Device.
# - Added self.all_tasks: Dict[int, Task] for lookup.
# - get_assigned_task(device_id) to find Task for Device.
# - get_dev_state returns DeviceInfo with taskinfo from current or assigned.
# - dump_ready_queue prints Task details including assigned_device.
# - Ticker uses BASE_SLICE for simplicity.
# - Demo updated prints for DeviceInfo.
# - All strategies adapted to Task-based selection/sorting/updates.
# - Dependencies triggered via task_completed(task), using task.assigned_device for dep lookup.
# New for Demand 1:
# - Introduced SchedulerDevice class to run the scheduling framework itself on a dedicated device.
# - SchedulerDevice has hz attribute (default 1000.0 Hz for simulation consistency).
# - In SchedulerDevice's run_task (scheduling loop), each call to scheduler.schedule() is timed:
#   - Records start/end simulation time for the schedule() invocation.
#   - Computes "runtime" as delta_time * hz (simulated clock cycles consumed).
#   - Prints the timing info per invocation.
# - Scheduler now references scheduler_device, and cpu_loop is moved to SchedulerDevice's process.
# - In run_simulation, create SchedulerDevice and assign to scheduler.
# (A standalone sketch of this timing pattern appears after the full listing below.)
import simpy
import time
import random
import math
from enum import Enum
from typing import List, Dict, Optional, Callable, Any, Tuple, Union
from dataclasses import dataclass, field
from abc import ABC, abstractmethod

random.seed(42)  # For reproducibility


# SimPy Wrapper for Decoupling
class SimPyWrapper:
    def __init__(self, env: simpy.Environment):
        self._env = env

    def now(self) -> float:
        return self._env.now

    def timeout(self, delay: float):
        return self._env.timeout(delay)

    def event(self):
        return self._env.event()

    def process(self, generator):
        return self._env.process(generator)

    def run(self, until: Optional[float] = None):
        return self._env.run(until=until)


# Enums
class DeviceState(Enum):
    IDLE = "idle"
    RUNNING = "running"
    READY = "ready"
    BLOCKED = "blocked"


class SchedulingStrategy(Enum):
    FCFS = "fcfs"          # First Come First Serve
    SJF = "sjf"            # Shortest Job First
    PRIORITY = "priority"  # Static Priority
    EDF = "edf"            # Earliest Deadline First
    WRR = "wrr"            # Weighted Round Robin
    DWRR = "dwrr"          # Deficit Weighted Round Robin
    LFS = "lfs"            # Lottery Fair Share
    STRIDE = "stride"      # Stride Scheduling
    STCF = "stcf"          # Shortest Time to Completion First
    CFS = "cfs"            # Completely Fair Scheduler
    MLFQ = "mlfq"          # Multi-Level Feedback Queue


class PreemptionStrategy(Enum):
    TIME = "time"              # Fixed time slice
    LOAD = "load"              # Slice inversely proportional to load_weight (high load -> shorter slice)
    TASK_COUNT = "task_count"  # Slice inversely proportional to ready_queue length (more tasks -> shorter slice)


# Dataclasses
@dataclass
class Task:
    id: int
    duration: float
    remaining_duration: Optional[float] = field(default=None)  # Synced with Device.remaining_duration
    priority: Optional[int] = None        # For priority scheduling
    deadline: Optional[float] = None      # For EDF
    release_time: float = 0.0
    nr_clk: int = 1000                    # Required clock cycles from application
    period: float = 10.0                  # Period for Rate Monotonic tie-breaker
    func_id: str = "default"              # Function ID for device group
    strategy: Optional[SchedulingStrategy] = None  # Strategy for multi-queue
    preempt: bool = False                 # True: task may be interrupted/preempted; False (default): non-preemptible
    dev_list: Optional[List[str]] = None  # Restrict to these device IDs; None = any in group
    assigned_device: Optional[str] = None
    blocked: bool = False
    enqueue_time: float = field(default=0.0)
    mlfq_level: int = 0
    this_slice: float = field(default=float('inf'))
    vruntime: float = 0.0
    pass_value: float = 0.0
    deficit: float = 0.0
    tickets: int = 100
    weight: float = 1.0
    temp_priority_boost: Optional[float] = None
    completion_event: Optional[simpy.Event] = None

    def __post_init__(self):
        if self.remaining_duration is None:
            self.remaining_duration = self.duration
        self.tickets = int(self.weight * 100)


@dataclass
class DeviceInfo:
    state: DeviceState
    taskinfo: Optional[Task]
    hz: float
    preempted: int  # Net preemption count


# Constants
BASE_SLICE = 1.0               # Base quantum for all strategies
PELT_WINDOW = 32.0             # PELT window size, similar to Linux (scaled to sim units)
LOAD_WEIGHT_SCALE = 1024       # Scale for load weights, like Linux
LOAD_BALANCE_FACTOR = 0.1      # Factor to weigh load in priority (adjustable)
MLFQ_LEVELS = 3                # Number of MLFQ levels (0: high prio, short slice; 2: low prio, long slice)
MLFQ_SLICES = [1.0, 2.0, 4.0]  # Time slices per level


# Device Group for same-function devices
class DeviceGroup:
    def __init__(self, func_id: str, devices: List['Device'], task_generator: Callable[[], Task]):
        self.func_id = func_id
        self.devices = devices
        self.task_generator = task_generator


# Device Class
class Device:
    def __init__(self, sim_env: SimPyWrapper, device_id: str, scheduler: 'Scheduler', hz: float = 1000.0):
        self.sim_env = sim_env
        self.id = device_id
        self.state = DeviceState.IDLE
        self.current_task: Optional[Task] = None
        self.remaining_duration: float = 0.0
        self.scheduler = scheduler
        self.process: Optional[simpy.Process] = None  # Keep SimPy Process for interrupt
        self.hz = hz  # Device clock frequency (e.g., ticks per second)
        self.start_time: Optional[float] = None  # Track start time for actual runtime
        # PELT-like load tracking
        self.runnable_load = 0.0  # Runnable contribution
        self.blocked_load = 0.0  # Blocked/idle contribution
        self.load_weight = LOAD_WEIGHT_SCALE  # Current load weight
        self.last_update = sim_env.now()
        self.total_util = 0.0  # Total utilization for load calc
        self.online = False  # Track online status
        # Preemption status tracking
        self.preempted: bool = False  # True if currently preempted (paused or interrupted)
        self.preempted_count: int = 0  # Net preemption count: +1 on preempt, -1 on restore (min 0)
        # For restore/pause tracking
        self.last_preempted_task: Optional[Task] = None
        self.last_paused_task: Optional[Task] = None

    def update_pelt(self):
        """Update PELT load tracking (exponential decay, EMA)."""
        now = self.sim_env.now()
        delta = now - self.last_update
        if delta == 0:
            return
        # Accurate decay: e^(-delta / PELT_WINDOW)
        decay = math.exp(-delta / PELT_WINDOW)
        self.runnable_load *= decay
        self.blocked_load *= decay
        # Add contribution based on state
        if self.state == DeviceState.RUNNING:
            self.runnable_load += delta
        elif self.state in [DeviceState.IDLE, DeviceState.BLOCKED]:
            self.blocked_load += delta
        # Total load signal
        self.total_util = self.runnable_load + self.blocked_load
        # Load weight: scale runnable_load util to [0, LOAD_WEIGHT_SCALE]
        util = self.runnable_load / (self.runnable_load + self.blocked_load + 1e-6)
        self.load_weight = int(util * LOAD_WEIGHT_SCALE)
        self.last_update = now

    def assign_task(self, task: Task):
        """Assign task to this device."""
        self.current_task = task
        task.assigned_device = self.id
        self.remaining_duration = task.remaining_duration
        task.remaining_duration = self.remaining_duration
        if task.completion_event is None:
            task.completion_event = self.sim_env.event()
        task.mlfq_level = 0
        task.temp_priority_boost = None
        self.preempted = False
        self.preempted_count = 0
        self.state = DeviceState.READY
        expected_time = self.hz * task.nr_clk
        print(f"[{self.sim_env.now():.2f}] Device {self.id} assigned task {task.id} for func '{task.func_id}' (duration: {self.remaining_duration:.2f}, expected: {expected_time:.2f}, period: {task.period:.2f}, load_weight: {self.load_weight}, preempt: {task.preempt}, dev_list: {task.dev_list})")

    def run_task(self):
        """Run the current task with preemption support and timeout check."""
        if not self.current_task or self.remaining_duration <= 0:
            self.state = DeviceState.IDLE
            self.preempted = False
            self.update_pelt()
            return
        self.start_time = self.sim_env.now()  # Record start time for the entire task
        self.update_pelt()  # Initial update
        while self.remaining_duration > 0:
            start_slice = self.sim_env.now()
            # Dynamic slice based on strategy; for MLFQ, override with level-specific slice
            if self.scheduler.strategy == SchedulingStrategy.MLFQ:
                slice_time = MLFQ_SLICES[self.current_task.mlfq_level]
            else:
                slice_time = self.scheduler.get_dynamic_slice(self)
            slice_time = min(slice_time, self.remaining_duration, self.current_task.this_slice)
            self.current_task.this_slice = float('inf')  # Reset after use
            try:
                yield self.sim_env.timeout(slice_time)
                run_time = self.sim_env.now() - start_slice
                self.remaining_duration -= run_time
if self.remaining_duration < 1e-6: self.remaining_duration = 0.0 # Sync to task self.current_task.remaining_duration = self.remaining_duration # Update stride/CFS vruntime if applicable if hasattr(self.scheduler, 'fair_enabled') and self.scheduler.fair_enabled: self.current_task.vruntime += run_time / self.current_task.weight # Fair share update # Update stride pass if applicable if hasattr(self.scheduler, 'stride_enabled') and self.scheduler.stride_enabled: self.current_task.pass_value += run_time * (1.0 / self.current_task.weight) # Stride update self.update_pelt() # Update after run # For MLFQ: If slice completed (not interrupt), demote level if not lowest if self.scheduler.strategy == SchedulingStrategy.MLFQ and self.current_task.mlfq_level < MLFQ_LEVELS - 1: self.current_task.mlfq_level += 1 print(f"[{self.sim_env.now():.2f}] MLFQ: Demoting task {self.current_task.id} to level {self.current_task.mlfq_level}") if self.remaining_duration <= 0: actual_time = self.sim_env.now() - self.start_time expected_time = self.hz * self.current_task.nr_clk print(f"[{self.sim_env.now():.2f}] Device {self.id} completed task {self.current_task.id} (actual: {actual_time:.2f}, expected: {expected_time:.2f})") if actual_time > expected_time: print(f"[{self.sim_env.now():.2f}] WARNING: 硬件运行超时 for Device {self.id} (actual {actual_time:.2f} > expected {expected_time:.2f})") self.scheduler.on_timeout(self) # Notify scheduler # Notify scheduler and trigger completion event if self.current_task.completion_event: self.current_task.completion_event.succeed() self.scheduler.task_completed(self.current_task) self.current_task = None self.remaining_duration = 0.0 self.start_time = None self.state = DeviceState.IDLE self.preempted = False self.preempted_count = 0 return except simpy.Interrupt: # Ignore interrupt if task is non-preemptible if self.current_task and not self.current_task.preempt: print(f"[{self.sim_env.now():.2f}] Interrupt ignored for non-preemptible task {self.current_task.id} on {self.id}") continue # Continue running without handling interrupt run_time = self.sim_env.now() - start_slice self.remaining_duration -= run_time self.current_task.remaining_duration = self.remaining_duration # Sync on interrupt self.preempted = True # Set preempted on interrupt self.preempted_count += 1 # Increment count on interrupt if self.remaining_duration < 1e-6: self.remaining_duration = 0.0 self.current_task.remaining_duration = 0.0 # Update stride/CFS vruntime if applicable if hasattr(self.scheduler, 'fair_enabled') and self.scheduler.fair_enabled: self.current_task.vruntime += run_time / self.current_task.weight # Fair share update # Update stride pass if applicable if hasattr(self.scheduler, 'stride_enabled') and self.scheduler.stride_enabled: self.current_task.pass_value += run_time * (1.0 / self.current_task.weight) # Stride update self.update_pelt() # Update after partial run # For MLFQ: No demotion on preemption if self.remaining_duration <= 0: actual_time = self.sim_env.now() - self.start_time expected_time = self.hz * self.current_task.nr_clk print(f"[{self.sim_env.now():.2f}] Device {self.id} completed task {self.current_task.id} (actual: {actual_time:.2f}, expected: {expected_time:.2f})") if actual_time > expected_time: print(f"[{self.sim_env.now():.2f}] WARNING: 硬件运行超时 for Device {self.id} (actual {actual_time:.2f} > expected {expected_time:.2f})") self.scheduler.on_timeout(self) if self.current_task.completion_event: self.current_task.completion_event.succeed() 
self.scheduler.task_completed(self.current_task) self.current_task = None self.remaining_duration = 0.0 self.start_time = None self.state = DeviceState.IDLE self.preempted = False self.preempted_count = 0 else: self.scheduler.enqueue(self.current_task) self.state = DeviceState.READY return # SchedulerDevice: Dedicated device to run the scheduling framework itself class SchedulerDevice(Device): def __init__(self, sim_env: SimPyWrapper, scheduler: 'Scheduler', hz: float = 1000.0): super().__init__(sim_env, "SchedulerDev", scheduler, hz) self.scheduler = scheduler # Direct reference for convenience self.scheduler.scheduler_device = self # Back-reference to Scheduler def run_task(self): """Run the scheduling loop on this device, timing each schedule() call.""" # No "task" for scheduler device; it runs indefinitely self.state = DeviceState.RUNNING self.start_time = self.sim_env.now() while True: if not self.scheduler.active: print(f"[{self.sim_env.now():.2f}] SchedulerDevice inactive. Yielding...") yield self.sim_env.timeout(1.0) continue # Time the schedule() call schedule_start = self.sim_env.now() current_proc = self.scheduler.schedule() schedule_end = self.sim_env.now() delta_time = schedule_end - schedule_start cycles_consumed = delta_time * self.hz # Simulated clock cycles for this schedule invocation print(f"[{schedule_start:.2f}] SchedulerDevice: schedule() runtime = {delta_time:.4f}s ({cycles_consumed:.2f} cycles @ {self.hz} Hz)") if current_proc is None: print(f"[{self.sim_env.now():.2f}] SchedulerDevice: No ready tasks. Yielding...") yield self.sim_env.timeout(1.0) continue try: yield current_proc except Exception as e: print(f"[{self.sim_env.now():.2f}] SchedulerDevice: Error in process: {e}") finally: self.scheduler.current_running = None self.scheduler.current_process = None self.update_pelt() # Update load after yielding # Abstract Scheduler class Scheduler(ABC): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): self.sim_env = sim_env self.strategy = strategy self.preemption_strategy = preemption_strategy self.multi_strategy = multi_strategy self.use_multi = multi_strategy if self.use_multi: self.strategy_queues: Dict[SchedulingStrategy, List[Task]] = {} self.priority_order: List[SchedulingStrategy] = [SchedulingStrategy.EDF, SchedulingStrategy.PRIORITY, SchedulingStrategy.CFS] else: self.strategy_queues = None self.priority_order = [] self.devices: Dict[str, Device] = {} # device_id -> Device for quick lookup self.all_tasks: Dict[int, Task] = {} # task_id -> Task for lookup self.groups: Dict[str, DeviceGroup] = {} # func_id -> DeviceGroup self.ready_queue: List[Task] = [] # For non-MLFQ, non-multi self.mlfq_queues: List[List[Task]] = [[] for _ in range(MLFQ_LEVELS)] if strategy == SchedulingStrategy.MLFQ else None # For MLFQ: list of queues self.active: bool = True self.current_running: Optional[Device] = None self.current_process: Optional[simpy.Process] = None # Keep SimPy Process self.scheduler_device: Optional[SchedulerDevice] = None # Reference to the device running the scheduler self.app_state_callback: Optional[Callable[[Dict], None]] = None self.rr_index: int = 0 # For WRR/DWRR rotation self.dependencies: Dict[str, List[str]] = {} # Device ID -> list of dependent device IDs self.stride_enabled = strategy == SchedulingStrategy.STRIDE self.fair_enabled = strategy == SchedulingStrategy.CFS def get_compute_priority_for_strategy(self, strat: 
SchedulingStrategy, task: Task) -> float: """Compute priority for a given strategy (smaller = higher).""" now = self.sim_env.now() if strat == SchedulingStrategy.PRIORITY: return task.priority if task.priority is not None else 999 elif strat == SchedulingStrategy.EDF: return (task.deadline - now) if task.deadline else float('inf') elif strat == SchedulingStrategy.CFS: return task.vruntime elif strat == SchedulingStrategy.STRIDE: return task.pass_value elif strat == SchedulingStrategy.FCFS: return task.enqueue_time elif strat in [SchedulingStrategy.SJF, SchedulingStrategy.STCF]: return task.remaining_duration else: # Stub for WRR/DWRR/LFS/MLFQ return 0.0 def get_sort_key_for_strategy(self, strat: SchedulingStrategy) -> Callable[[Task], Tuple[float, int, float]]: """Get sort key for a strategy.""" def compute_prio(t: Task) -> float: return self.get_compute_priority_for_strategy(strat, t) def key(t: Task) -> Tuple[float, int, float]: prio = compute_prio(t) # Boost for restored tasks if t.temp_priority_boost: prio = min(prio, t.temp_priority_boost) load_w = self.get_load_for_task(t) period = t.period return (prio, load_w, period) return key def get_load_for_task(self, task: Task) -> int: """Get lowest load_weight of available devices for the task.""" group = self.groups.get(task.func_id) if not group: return LOAD_WEIGHT_SCALE candidates = [d for d in group.devices if d.online and d.state == DeviceState.IDLE] if task.dev_list: candidates = [d for d in candidates if d.id in task.dev_list] if not candidates: return LOAD_WEIGHT_SCALE return min(d.load_weight for d in candidates) def get_dynamic_slice(self, device: Device) -> float: """Compute dynamic slice based on preemption strategy.""" if self.preemption_strategy == PreemptionStrategy.TIME: return BASE_SLICE elif self.preemption_strategy == PreemptionStrategy.LOAD: # Inverse to load: high load -> shorter slice (fairness) load_factor = max(0.1, (device.load_weight / LOAD_WEIGHT_SCALE)) return BASE_SLICE / load_factor elif self.preemption_strategy == PreemptionStrategy.TASK_COUNT: # Inverse to ready queue length: more tasks -> shorter slice (responsiveness) if self.use_multi: queue_len = sum(len([t for t in q if not t.blocked]) for q in self.strategy_queues.values()) elif self.mlfq_queues is not None: queue_len = sum(len([t for t in q if not t.blocked]) for q in self.mlfq_queues) else: queue_len = sum(1 for t in self.ready_queue if not t.blocked) return max(0.1, BASE_SLICE / max(1, queue_len)) return BASE_SLICE def add_device_group(self, group: DeviceGroup): """Add a group of same-function devices.""" self.groups[group.func_id] = group for dev in group.devices: self.add_device_dynamic(dev) def task_exec(self, task: Task): """User layer interface: Dispatch a pre-created Task to scheduler. 
Enqueues Task to ready queue.""" self.all_tasks[task.id] = task task.release_time = self.sim_env.now() self.enqueue(task) print(f"[{self.sim_env.now():.2f}] Task {task.id} for func '{task.func_id}' executed via task_exec") return task # Deprecated: Kept for backward compat, but demos now use task_exec def request_task_by_function(self, func_id: str): """Application requests task for function; enqueue generated task.""" group = self.groups.get(func_id) if not group: print(f"[{self.sim_env.now():.2f}] No group for func '{func_id}'") return None # Generate task task = group.task_generator() task.func_id = func_id self.task_exec(task) print(f"[{self.sim_env.now():.2f}] Task for func '{func_id}' enqueued") return task def add_device(self, device: Device): self.devices[device.id] = device self.dependencies[device.id] = [] # Default no dependencies device.online = True def add_device_dynamic(self, device: Device): """Dynamic add device at runtime.""" if device.id in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device.id} already exists") return self.add_device(device) print(f"[{self.sim_env.now():.2f}] Dynamically added device {device.id}") def remove_device_dynamic(self, device_id: str): """Dynamic remove device at runtime.""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] # Stop running process if active and preemptible if dev.process and dev.process.is_alive and (not dev.current_task or dev.current_task.preempt): dev.process.interrupt() if dev.current_task: self.enqueue(dev.current_task) # Remove dependencies del self.dependencies[device_id] del self.devices[device_id] dev.online = False print(f"[{self.sim_env.now():.2f}] Dynamically removed device {device_id}") def online_device(self, device_id: str): """Bring device online.""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] dev.online = True # If has blocked task, wakeup assigned_task = self.get_assigned_task(device_id) if assigned_task and assigned_task.blocked: self.wakeup_device(device_id) print(f"[{self.sim_env.now():.2f}] Device {device_id} brought online") def offline_device(self, device_id: str): """Take device offline.""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] # Stop running if preemptible if dev.process and dev.process.is_alive and (not dev.current_task or dev.current_task.preempt): dev.process.interrupt() if dev.current_task: self.enqueue(dev.current_task) dev.online = False dev.state = DeviceState.IDLE print(f"[{self.sim_env.now():.2f}] Device {device_id} taken offline") def wakeup_device(self, device_id: str): """Wake up target device (from BLOCKED/IDLE to READY).""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] assigned_task = self.get_assigned_task(device_id) if assigned_task and assigned_task.blocked: assigned_task.blocked = False self.enqueue(assigned_task) print(f"[{self.sim_env.now():.2f}] Woke up task {assigned_task.id} for device {device_id}") dev.state = DeviceState.READY else: print(f"[{self.sim_env.now():.2f}] Cannot wake {device_id}: no blocked task") def pause_task(self, device_id: str): """Pause task on device (to BLOCKED).""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = 
self.devices[device_id] task = dev.current_task if dev.state == DeviceState.RUNNING else self.get_assigned_task(device_id) if not task: print(f"[{self.sim_env.now():.2f}] No task to pause on {device_id}") return # Skip if non-preemptible if not task.preempt: print(f"[{self.sim_env.now():.2f}] Skip pause: non-preemptible task {task.id} on {device_id}") return if dev.state == DeviceState.RUNNING: dev.process.interrupt() task.blocked = True self.dequeue(task) dev.last_paused_task = task dev.state = DeviceState.BLOCKED dev.preempted = True dev.preempted_count += 1 print(f"[{self.sim_env.now():.2f}] Paused task {task.id} on {device_id}") def resume_task(self, device_id: str): """Resume task on device (to READY).""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] task = dev.last_paused_task if not task: print(f"[{self.sim_env.now():.2f}] No paused task on {device_id}") return task.blocked = False self.enqueue(task) dev.last_paused_task = None dev.state = DeviceState.READY dev.preempted = False dev.preempted_count = max(0, dev.preempted_count - 1) print(f"[{self.sim_env.now():.2f}] Resumed task {task.id} on {device_id}") def cancel_task(self, device_id: str): """Cancel task on device (clear task, IDLE).""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] task = dev.current_task if dev.state == DeviceState.RUNNING else self.get_assigned_task(device_id) if not task: print(f"[{self.sim_env.now():.2f}] No task to cancel on {device_id}") return if dev.state == DeviceState.RUNNING: dev.process.interrupt() self.dequeue(task) task.remaining_duration = 0.0 if task.completion_event: task.completion_event.succeed() if task.id in self.all_tasks: del self.all_tasks[task.id] dev.last_preempted_task = None dev.last_paused_task = None dev.current_task = None dev.state = DeviceState.IDLE dev.preempted = False dev.preempted_count = 0 print(f"[{self.sim_env.now():.2f}] Canceled task {task.id} on {device_id}") def interrupt_device(self, device_id: str) -> Optional[Task]: """Interrupt running device, enqueue Task, set device to READY. 
Returns the interrupted Task if successful, else None.""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return None dev = self.devices[device_id] task = dev.current_task if not task or dev.state != DeviceState.RUNNING: print(f"[{self.sim_env.now():.2f}] Cannot interrupt {device_id}: not running") return None # Skip if non-preemptible if not task.preempt: print(f"[{self.sim_env.now():.2f}] Skip interrupt: non-preemptible task {task.id} on {device_id}") return None dev.process.interrupt() self.enqueue(task) dev.last_preempted_task = task dev.state = DeviceState.READY dev.preempted = True dev.preempted_count += 1 print(f"[{self.sim_env.now():.2f}] Interrupted device {device_id}, task {task.id} enqueued (remaining: {task.remaining_duration:.2f})") return task def restore_device(self, device_id: str): """Restore interrupted task for device by prioritizing it.""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found") return dev = self.devices[device_id] task = dev.last_preempted_task if not task: print(f"[{self.sim_env.now():.2f}] No preempted task on {device_id}") return task.temp_priority_boost = self.sim_env.now() self.enqueue(task) dev.last_preempted_task = None dev.preempted_count = max(0, dev.preempted_count - 1) if dev.preempted_count == 0: dev.preempted = False print(f"[{self.sim_env.now():.2f}] Restored task {task.id} for device {device_id}, prioritized (remaining: {task.remaining_duration:.2f})") def launch_task_at_tail(self, task: Task, preferred_device_id: Optional[str] = None): """Launch a task by enqueuing at the tail of the queue (for low-priority preemption).""" if preferred_device_id: dev = self.devices.get(preferred_device_id) if dev and dev.state == DeviceState.IDLE: dev.assign_task(task) self.task_exec(task) # Enqueues at tail (append) def sync_task_completion(self, task_id: int) -> simpy.Event: """Wait for specified task to complete (returns Event to yield on).""" task = self.all_tasks.get(task_id) if not task: print(f"[{self.sim_env.now():.2f}] Task {task_id} not found for sync") event = self.sim_env.event() event.succeed() # Immediate if not found return event if task.completion_event is None: task.completion_event = self.sim_env.event() print(f"[{self.sim_env.now():.2f}] Syncing on task {task_id} completion") return task.completion_event def get_assigned_task(self, device_id: str) -> Optional[Task]: """Get the assigned task for a device (in queues or paused).""" # Running task dev = self.devices.get(device_id) if dev and dev.current_task: return dev.current_task # Queued tasks if self.use_multi: for q in self.strategy_queues.values(): for t in q: if t.assigned_device == device_id: return t elif self.mlfq_queues is not None: for level in self.mlfq_queues: for t in level: if t.assigned_device == device_id: return t else: for t in self.ready_queue: if t.assigned_device == device_id: return t return None def get_dev_state(self, device_id: str) -> Optional[DeviceInfo]: """Standard query interface: Get DeviceInfo for specified device.""" if device_id not in self.devices: print(f"[{self.sim_env.now():.2f}] Device {device_id} not found for get_dev_state") return None dev = self.devices[device_id] task = dev.current_task if dev.state == DeviceState.RUNNING else self.get_assigned_task(device_id) return DeviceInfo( state=dev.state, taskinfo=task, hz=dev.hz, preempted=dev.preempted_count ) def dump_ready_queue(self): """Dump ready queue tasks (print details).""" 
print(f"[{self.sim_env.now():.2f}] === Ready Queue Dump ===") if self.use_multi: for strat, q in self.strategy_queues.items(): if q: print(f"{strat.value} queue:") for t in q: assigned = t.assigned_device or 'unassigned' load = self.devices.get(assigned).load_weight if assigned != 'unassigned' and assigned in self.devices else 'N/A' print(f" - Task {t.id} (func: {t.func_id}, remaining: {t.remaining_duration:.2f}, assigned: {assigned}, load: {load})") else: print(f"{strat.value}: Empty") elif self.strategy == SchedulingStrategy.MLFQ: for level in range(MLFQ_LEVELS): q = self.mlfq_queues[level] if q: print(f"Level {level} (slice: {MLFQ_SLICES[level]}):") for t in q: assigned = t.assigned_device or 'unassigned' load = self.devices.get(assigned).load_weight if assigned != 'unassigned' and assigned in self.devices else 'N/A' print(f" - Task {t.id} (func: {t.func_id}, remaining: {t.remaining_duration:.2f}, assigned: {assigned}, load: {load})") else: print(f"Level {level}: Empty") else: if self.ready_queue: for t in self.ready_queue: assigned = t.assigned_device or 'unassigned' load = self.devices.get(assigned).load_weight if assigned != 'unassigned' and assigned in self.devices else 'N/A' print(f" - Task {t.id} (func: {t.func_id}, remaining: {t.remaining_duration:.2f}, assigned: {assigned}, load: {load})") else: print("Ready queue empty") print("=== End Dump ===") def set_dependency(self, from_dev_id: str, to_dev_ids: List[str]): """Set dependency: when from_dev completes task, trigger tasks for to_devs (same func).""" self.dependencies[from_dev_id] = to_dev_ids def task_completed(self, task: Task): """Called when task completes; trigger dependencies via assigned device.""" print(f"[{self.sim_env.now():.2f}] Task {task.id} completed. Triggering dependencies.") if task.id in self.all_tasks: del self.all_tasks[task.id] device_id = task.assigned_device if device_id: dev = self.devices.get(device_id) if dev: dev.state = DeviceState.IDLE dev.current_task = None dev.preempted = False dev.preempted_count = 0 dev.update_pelt() # Final update # Trigger dependent devices func_id = task.func_id group = self.groups.get(func_id) if group: for dep_id in self.dependencies.get(device_id, []): dep_dev = self.devices.get(dep_id) if dep_dev and dep_dev.state == DeviceState.IDLE and dep_dev.online: new_task = group.task_generator() new_task.func_id = func_id self.task_exec(new_task) print(f"[{self.sim_env.now():.2f}] Dependency triggered: Enqueued task for {dep_id}") def enqueue(self, task: Task): """Enqueue a task to the ready queue if not already present and not blocked.""" if task.blocked: return if self.use_multi: strat = task.strategy or self.strategy if strat not in self.strategy_queues: self.strategy_queues[strat] = [] q = self.strategy_queues[strat] if task not in q: q.append(task) if strat == SchedulingStrategy.FCFS: task.enqueue_time = self.sim_env.now() print(f"[{self.sim_env.now():.2f}] Enqueued task {task.id} to {strat.value} queue") elif self.strategy == SchedulingStrategy.MLFQ: level = min(task.mlfq_level, MLFQ_LEVELS - 1) if task not in self.mlfq_queues[level]: self.mlfq_queues[level].append(task) print(f"[{self.sim_env.now():.2f}] Enqueued task {task.id} to MLFQ level {level}") else: if task not in self.ready_queue: self.ready_queue.append(task) task.enqueue_time = self.sim_env.now() print(f"[{self.sim_env.now():.2f}] Enqueued task {task.id}") def dequeue(self, task: Task) -> bool: """Dequeue (remove) a specific task from the ready queue.""" dequeued = False if self.use_multi: for q in 
self.strategy_queues.values(): if task in q: q.remove(task) dequeued = True print(f"[{self.sim_env.now():.2f}] Dequeued task {task.id} from queue") break elif self.strategy == SchedulingStrategy.MLFQ: for level in range(MLFQ_LEVELS): if task in self.mlfq_queues[level]: self.mlfq_queues[level].remove(task) print(f"[{self.sim_env.now():.2f}] Dequeued task {task.id} from MLFQ level {level}") dequeued = True break else: if task in self.ready_queue: self.ready_queue.remove(task) print(f"[{self.sim_env.now():.2f}] Dequeued task {task.id} from ready queue") dequeued = True return dequeued def deactivate(self): """Deactivate (pause) the ready queue scheduling. Checked: Correctly stops new scheduling without interrupting running tasks.""" self.active = False print(f"[{self.sim_env.now():.2f}] Scheduler deactivated") def reactivate(self): """Reactivate the ready queue scheduling. Checked: Correctly resumes scheduling.""" self.active = True print(f"[{self.sim_env.now():.2f}] Scheduler reactivated") def preempt_current(self): """Preempt the current running process (for time slice or voluntary).""" if self.current_running and self.current_process and self.current_process.is_alive: task = self.current_running.current_task # Skip if non-preemptible if task and not task.preempt: print(f"[{self.sim_env.now():.2f}] Skip preempt_current: non-preemptible task {task.id} on {self.current_running.id}") return self.current_process.interrupt() self.current_running.preempted_count += 1 print(f"[{self.sim_env.now():.2f}] Preempting current process (strategy: {self.preemption_strategy.value})") def on_timeout(self, device: Device): """Hook for handling timeout warnings from device.""" print(f"[{self.sim_env.now():.2f}] Scheduler notified: Timeout on Device {device.id}. Handling... 
(e.g., reschedule or log)") def get_device_states(self) -> Dict[str, DeviceState]: """Get current states of all devices.""" return {dev.id: dev.state for dev in self.devices.values() if dev.online} def set_app_state_callback(self, callback: Callable[[Dict], None]): """Set callback for application to receive scheduling state.""" self.app_state_callback = callback @abstractmethod def compute_internal_priority(self, task: Task) -> float: """Compute internal priority for the task (smaller = higher priority).""" pass def get_sort_key(self) -> Callable[[Task], Tuple[float, int, float]]: """Return sort key for main strategy: (internal_priority, load_weight, period) - smaller first.""" def key(t: Task) -> Tuple[float, int, float]: prio = self.compute_internal_priority(t) if t.temp_priority_boost: prio = min(prio, t.temp_priority_boost) load_w = self.get_load_for_task(t) period = t.period return (prio, load_w, period) return key def select_task(self) -> Optional[Task]: """Select the next task to run based on unified internal priority + load balance.""" if self.use_multi: # Multi-strategy: Traverse priority_order then other queues for strat_list in [self.priority_order, [s for s in self.strategy_queues if s not in self.priority_order]]: for strat in strat_list: q = self.strategy_queues.get(strat, []) ready_q = [t for t in q if not t.blocked] if ready_q: key_func = self.get_sort_key_for_strategy(strat) ready_q.sort(key=key_func) selected = ready_q[0] q.remove(selected) print(f"[{self.sim_env.now():.2f}] Multi selected task {selected.id} from {strat.value} queue") return selected return None elif self.strategy == SchedulingStrategy.MLFQ: # MLFQ: Select from highest non-empty queue (level 0 first) for level in range(MLFQ_LEVELS): q = self.mlfq_queues[level] ready_q = [t for t in q if not t.blocked] if ready_q: ready_q.sort(key=self.get_sort_key()) selected = ready_q[0] self.mlfq_queues[level].remove(selected) print(f"[{self.sim_env.now():.2f}] MLFQ selected task {selected.id} from level {level}") return selected return None else: ready = [t for t in self.ready_queue if not t.blocked] if not ready: return None ready.sort(key=self.get_sort_key()) selected = ready[0] self.ready_queue.remove(selected) return selected def schedule(self) -> Optional[simpy.Process]: """Dispatch next task to a device (Linux-like schedule()). 
Returns Process to run or None.""" # Check if queues empty if not self.active: return None has_ready = False if self.use_multi: has_ready = any(any(not t.blocked for t in q) for q in self.strategy_queues.values()) elif self.strategy == SchedulingStrategy.MLFQ: has_ready = any(any(not t.blocked for t in q) for q in self.mlfq_queues) elif self.ready_queue: has_ready = any(not t.blocked for t in self.ready_queue) if not has_ready: return None next_task = self.select_task() if next_task is None: return None # Clear temp_priority_boost after selection if next_task.temp_priority_boost: next_task.temp_priority_boost = None # Find best device for task group = self.groups.get(next_task.func_id) if not group: print(f"[{self.sim_env.now():.2f}] No group for task {next_task.id}, re-enqueuing") self.enqueue(next_task) return None candidates = [d for d in group.devices if d.online and d.state == DeviceState.IDLE] if next_task.dev_list: candidates = [d for d in candidates if d.id in next_task.dev_list] if not candidates: print(f"[{self.sim_env.now():.2f}] No available device for task {next_task.id}, re-enqueuing") self.enqueue(next_task) return None candidates.sort(key=lambda d: d.load_weight) best_dev = candidates[0] # Assign and schedule best_dev.assign_task(next_task) dynamic_slice = self.get_dynamic_slice(best_dev) print(f"[{self.sim_env.now():.2f}] Scheduling task {next_task.id} on {best_dev.id} (internal prio: {self.compute_internal_priority(next_task):.2f}, load_weight: {best_dev.load_weight}, dynamic_slice: {dynamic_slice:.2f} via {self.preemption_strategy.value})") # Set strategy-specific slice if needed (e.g., WRR) if self.strategy == SchedulingStrategy.WRR: next_task.this_slice = BASE_SLICE * next_task.weight best_dev.state = DeviceState.RUNNING self.current_running = best_dev best_dev.process = self.sim_env.process(best_dev.run_task()) self.current_process = best_dev.process # Notify app if self.app_state_callback: self.app_state_callback(self.get_device_states()) return self.current_process # cpu_loop removed; now handled in SchedulerDevice.run_task() # Multi-Strategy Scheduler class MultiScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME): super().__init__(sim_env, SchedulingStrategy.FCFS, preemption_strategy, multi_strategy=True) self.priority_order = [SchedulingStrategy.EDF, SchedulingStrategy.PRIORITY, SchedulingStrategy.CFS] def compute_internal_priority(self, task: Task) -> float: # Fallback for base return 0.0 # Concrete Schedulers class FCFSScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # FCFS: earlier enqueue_time = higher priority (smaller time first) return task.enqueue_time class SJFScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # SJF: shorter remaining_duration = higher priority return task.remaining_duration class STCFScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = 
PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # STCF: shortest remaining time to completion = higher priority return task.remaining_duration class PriorityScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # Static Priority: direct task.priority (lower number = higher priority) return task.priority if task.priority is not None else 999 class EDFScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # EDF: earlier deadline = higher priority (smaller slack) now = self.sim_env.now() return (task.deadline - now) if task.deadline else float('inf') class WRRScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # Stub for WRR (not used; selection via rotation) return 0.0 def select_task(self) -> Optional[Task]: """WRR: Rotate through ready_queue, select next.""" if self.use_multi: return super().select_task() ready = [t for t in self.ready_queue if not t.blocked] if not ready: return None # Rotate index start_idx = self.rr_index % len(ready) selected = None for i in range(len(ready)): idx = (start_idx + i) % len(ready) candidate = ready[idx] if candidate.remaining_duration > 0: # Eligible selected = candidate self.rr_index = (idx + 1) % len(ready) break if selected: self.ready_queue.remove(selected) print(f"[{self.sim_env.now():.2f}] WRR selected task {selected.id} (weight: {selected.weight})") return selected class DWRRScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # Stub for DWRR (not used; selection via rotation with deficit) return 0.0 def select_task(self) -> Optional[Task]: """DWRR: Rotate through ready_queue, select if deficit allows.""" if self.use_multi: return super().select_task() ready = [t for t in self.ready_queue if not t.blocked] if not ready: return None quantum = BASE_SLICE # Rotate index start_idx = self.rr_index % len(ready) selected = None for i in range(len(ready)): idx = (start_idx + i) % len(ready) candidate = ready[idx] if candidate.deficit > 0 and candidate.remaining_duration > 0: run_amount = min(candidate.remaining_duration, candidate.deficit) candidate.deficit -= run_amount candidate.deficit += candidate.weight * quantum self.rr_index = (idx + 1) % len(ready) selected = candidate selected.this_slice = run_amount break if selected: self.ready_queue.remove(selected) print(f"[{self.sim_env.now():.2f}] DWRR selected task {selected.id} (weight: {selected.weight}, 
run_amount: {selected.this_slice:.2f}, remaining deficit: {selected.deficit:.2f})") return selected class LotteryScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # LFS: Use tickets for probabilistic selection, but for sort_key stub (selection is random) return 0.0 def select_task(self) -> Optional[Task]: """LFS: Probabilistic selection based on tickets.""" if self.use_multi: return super().select_task() ready = [t for t in self.ready_queue if not t.blocked] if not ready: return None total_tickets = sum(t.tickets for t in ready) if total_tickets == 0: return None # Draw a random ticket draw = random.randint(1, total_tickets) cumulative = 0 for t in ready: cumulative += t.tickets if draw <= cumulative: self.ready_queue.remove(t) print(f"[{self.sim_env.now():.2f}] LFS selected task {t.id} (tickets: {t.tickets}, total: {total_tickets})") return t return None # Fallback class StrideScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) self.stride_enabled = True def compute_internal_priority(self, task: Task) -> float: # Stride: Use pass_value as priority (smaller pass = higher priority) return task.pass_value def select_task(self) -> Optional[Task]: """Stride: Select task with minimal pass_value.""" if self.use_multi: return super().select_task() ready = [t for t in self.ready_queue if not t.blocked] if not ready: return None # Sort by pass_value (smaller first) ready.sort(key=lambda t: t.pass_value) selected = ready[0] self.ready_queue.remove(selected) print(f"[{self.sim_env.now():.2f}] Stride selected task {selected.id} (pass: {selected.pass_value:.2f}, weight: {selected.weight})") return selected class CFSScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) self.fair_enabled = True def compute_internal_priority(self, task: Task) -> float: # CFS: Use vruntime as priority (smaller vruntime = higher priority) return task.vruntime def select_task(self) -> Optional[Task]: """CFS: Select task with minimal vruntime.""" if self.use_multi: return super().select_task() ready = [t for t in self.ready_queue if not t.blocked] if not ready: return None # Sort by vruntime (smaller first) ready.sort(key=lambda t: t.vruntime) selected = ready[0] self.ready_queue.remove(selected) print(f"[{self.sim_env.now():.2f}] CFS selected task {selected.id} (vruntime: {selected.vruntime:.2f}, weight: {selected.weight})") return selected class MLFQScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # MLFQ: For sort_key within level: use remaining_duration (favor shorter tasks) return task.remaining_duration class CBSScheduler(Scheduler): def __init__(self, sim_env: 
SimPyWrapper, strategy: SchedulingStrategy, budget: float = 0.5, period: float = 10.0, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) self.budget = budget self.period = period self.server_time = 0.0 def compute_internal_priority(self, task: Task) -> float: # CBS: fallback to base strategy's priority if self.strategy == SchedulingStrategy.FCFS: return self.get_compute_priority_for_strategy(SchedulingStrategy.FCFS, task) return float('inf') def select_task(self) -> Optional[Task]: if self.sim_env.now() - self.server_time >= self.period: self.server_time = self.sim_env.now() if self.sim_env.now() - self.server_time > self.budget: return None return super().select_task() class EEVDFScheduler(Scheduler): def __init__(self, sim_env: SimPyWrapper, strategy: SchedulingStrategy, preemption_strategy: PreemptionStrategy = PreemptionStrategy.TIME, multi_strategy: bool = False): super().__init__(sim_env, strategy, preemption_strategy, multi_strategy) def compute_internal_priority(self, task: Task) -> float: # EEVDF: Placeholder fallback to EDF priority return self.get_compute_priority_for_strategy(SchedulingStrategy.EDF, task) # Ticker for time slices and PELT updates def ticker(env: simpy.Environment, scheduler: Scheduler): sim_env = scheduler.sim_env # Use wrapper while True: yield sim_env.timeout(BASE_SLICE) # Update PELT for all devices periodically for dev in scheduler.devices.values(): if dev.online: dev.update_pelt() scheduler.preempt_current() # Main Simulation with Groups and Dependencies def run_simulation(env: simpy.Environment, scheduler: Scheduler, sim_duration: float = 50.0): sim_env = scheduler.sim_env # Wrapper # Create SchedulerDevice and start its process scheduler_dev = SchedulerDevice(sim_env, scheduler, hz=1000.0) # 1000 Hz for scheduler device scheduler.add_device(scheduler_dev) # Add to devices for consistency scheduler_dev.online = True scheduler_dev.process = sim_env.process(scheduler_dev.run_task()) # Start scheduling loop on device def make_task_gen(i: int): def task_gen() -> Task: task_id = int(sim_env.now() * 1000) % 1000 + i * 1000 duration = 2 + (hash(f"Dev{i}{task_id}") % 3) # 2-4 units for quicker cycles; vary for STCF/MLFQ demo priority = (hash(task_id) % 10) if scheduler.strategy == SchedulingStrategy.PRIORITY else None deadline = (sim_env.now() + duration + (hash(task_id) % 1)) if scheduler.strategy == SchedulingStrategy.EDF else None nr_clk = 300 + (hash(task_id) % 500) # Smaller for demo period = 4 + (hash(task_id) % 6) # 4-9 return Task(task_id, duration, priority=priority, deadline=deadline, nr_clk=nr_clk, period=period, weight=1.0) return task_gen # Group A: Dev0, Dev1 (same func 'A') group_a_gen = make_task_gen(0) dev0 = Device(sim_env, "Dev0", scheduler, hz=1000.0) dev1 = Device(sim_env, "Dev1", scheduler, hz=1000.0) group_a = DeviceGroup("A", [dev0, dev1], group_a_gen) # Group B: Dev2, Dev3, Dev4 (same func 'B') group_b_gen = make_task_gen(2) dev2 = Device(sim_env, "Dev2", scheduler, hz=1100.0) dev3 = Device(sim_env, "Dev3", scheduler, hz=1100.0) dev4 = Device(sim_env, "Dev4", scheduler, hz=1100.0) group_b = DeviceGroup("B", [dev2, dev3, dev4], group_b_gen) scheduler.add_device_group(group_a) scheduler.add_device_group(group_b) # Group C for multi demo if scheduler.use_multi: group_c_gen = make_task_gen(6) dev6 = Device(sim_env, "Dev6", scheduler, hz=1000.0) group_c = DeviceGroup("C", [dev6], group_c_gen) 
scheduler.add_device_group(group_c) # Set dependencies within groups (e.g., Dev0 -> Dev1 in 'A'; Dev2 -> Dev3 in 'B') scheduler.set_dependency("Dev0", ["Dev1"]) scheduler.set_dependency("Dev2", ["Dev3"]) if scheduler.use_multi: scheduler.set_dependency("Dev6", []) # Optional # Application triggers: Initial tasks for 'A' and 'B' via task_exec, then periodic def app_triggers(): # Initial task_a = group_a_gen() task_a.func_id = "A" if scheduler.use_multi: task_a.strategy = SchedulingStrategy.PRIORITY scheduler.task_exec(task_a) task_b = group_b_gen() task_b.func_id = "B" if scheduler.use_multi: task_b.strategy = SchedulingStrategy.EDF scheduler.task_exec(task_b) if scheduler.use_multi: task_c = group_c_gen() task_c.func_id = "C" task_c.strategy = SchedulingStrategy.CFS scheduler.task_exec(task_c) yield env.timeout(3.0) # Another for 'A' (load balance to Dev1 if Dev0 busy) task_a2 = group_a_gen() task_a2.func_id = "A" if scheduler.use_multi: task_a2.strategy = SchedulingStrategy.PRIORITY scheduler.task_exec(task_a2) while True: yield env.timeout(8.0) # Periodic app requests task_b_periodic = group_b_gen() task_b_periodic.func_id = "B" if scheduler.use_multi: task_b_periodic.strategy = SchedulingStrategy.EDF scheduler.task_exec(task_b_periodic) if scheduler.use_multi: task_c_periodic = group_c_gen() task_c_periodic.func_id = "C" task_c_periodic.strategy = SchedulingStrategy.CFS scheduler.task_exec(task_c_periodic) env.process(app_triggers()) # Demo interfaces (enhanced with new preemption mechanisms) def demo_interfaces(): yield env.timeout(5.0) scheduler.pause_task("Dev0") # Pause Dev0 state0 = scheduler.get_dev_state("Dev0") print(f"[{sim_env.now():.2f}] get_dev_state(Dev0): state={state0.state}, taskinfo={state0.taskinfo.id if state0.taskinfo else None}, hz={state0.hz}, preempted={state0.preempted}") yield env.timeout(2.0) scheduler.resume_task("Dev0") # Resume Dev0 state0_resume = scheduler.get_dev_state("Dev0") print(f"[{sim_env.now():.2f}] get_dev_state(Dev0 after resume): state={state0_resume.state}, taskinfo={state0_resume.taskinfo.id if state0_resume.taskinfo else None}, hz={state0_resume.hz}, preempted={state0_resume.preempted}") yield env.timeout(3.0) scheduler.dump_ready_queue() # Dump at t=10 yield env.timeout(2.0) # New: Demo interrupt/restore interrupted_task = scheduler.interrupt_device("Dev1") # Interrupt Dev1 if running if interrupted_task: print(f"[{sim_env.now():.2f}] Interrupted task {interrupted_task.id}, remaining: {interrupted_task.remaining_duration:.2f}") state1_int = scheduler.get_dev_state("Dev1") print(f"[{sim_env.now():.2f}] get_dev_state(Dev1 after interrupt): state={state1_int.state}, taskinfo={state1_int.taskinfo.id if state1_int.taskinfo else None}, hz={state1_int.hz}, preempted={state1_int.preempted}") yield env.timeout(1.0) scheduler.restore_device("Dev1") # Restore Dev1 yield env.timeout(2.0) # Dynamic add Dev5 to Group B dev5 = Device(sim_env, "Dev5", scheduler, hz=1200.0) scheduler.add_device_dynamic(dev5) group_b.devices.append(dev5) # Add to group task_b_new = group_b_gen() task_b_new.func_id = "B" if scheduler.use_multi: task_b_new.strategy = SchedulingStrategy.EDF scheduler.task_exec(task_b_new) # Enqueue state5 = scheduler.get_dev_state("Dev5") print(f"[{sim_env.now():.2f}] get_dev_state(Dev5 after task_exec): state={state5.state}, taskinfo={state5.taskinfo.id if state5.taskinfo else None}, hz={state5.hz}, preempted={state5.preempted}") yield env.timeout(3.0) scheduler.offline_device("Dev4") # Offline Dev4 yield env.timeout(2.0) 
scheduler.online_device("Dev4") # Online Dev4 yield env.timeout(2.0) scheduler.cancel_task("Dev1") # Cancel on Dev1 if running # New: Demo launch at tail tail_task = Task(999, 1.0, func_id="A") # Short dummy task if scheduler.use_multi: tail_task.strategy = SchedulingStrategy.PRIORITY scheduler.launch_task_at_tail(tail_task, "Dev0") yield env.timeout(1.0) # New: Demo sync (wait for a task) sync_event = scheduler.sync_task_completion(999) yield sync_event print(f"[{sim_env.now():.2f}] Synced on task 999 completion") env.process(demo_interfaces()) # Callback for app state def app_state_handler(states: Dict[str, DeviceState]): print(f"[{sim_env.now():.2f}] App State: {states}") scheduler.set_app_state_callback(app_state_handler) # Demo control optional (tests deactivate/reactivate) def demo_control(): yield env.timeout(10) scheduler.deactivate() yield env.timeout(2) scheduler.reactivate() env.process(demo_control()) # Start ticker (independent of SchedulerDevice) env.process(ticker(env, scheduler)) # Run until duration yield env.timeout(sim_duration) print(f"Simulation ended at {sim_env.now():.2f}") if __name__ == "__main__": env = simpy.Environment() sim_env = SimPyWrapper(env) print("=== FCFS Scheduler Demo with Task-Centric Queue ===") fcfs = FCFSScheduler(sim_env, SchedulingStrategy.FCFS) env.process(run_simulation(env, fcfs, 25)) env.run(until=25) env = simpy.Environment() # Reset for next sim_env = SimPyWrapper(env) print("\n=== SJF Scheduler Demo with Task-Centric Queue ===") sjf = SJFScheduler(sim_env, SchedulingStrategy.SJF) env.process(run_simulation(env, sjf, 25)) env.run(until=25) print("\n=== STCF Scheduler Demo with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) stcf = STCFScheduler(sim_env, SchedulingStrategy.STCF) env.process(run_simulation(env, stcf, 25)) env.run(until=25) print("\n=== MLFQ Scheduler Demo with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) mlfq = MLFQScheduler(sim_env, SchedulingStrategy.MLFQ) env.process(run_simulation(env, mlfq, 25)) env.run(until=25) env = simpy.Environment() sim_env = SimPyWrapper(env) print("\n=== Priority Scheduler Demo with Task-Centric Queue ===") prio = PriorityScheduler(sim_env, SchedulingStrategy.PRIORITY) env.process(run_simulation(env, prio, 25)) env.run(until=25) env = simpy.Environment() sim_env = SimPyWrapper(env) print("\n=== EDF Scheduler Demo with Task-Centric Queue ===") edf = EDFScheduler(sim_env, SchedulingStrategy.EDF) env.process(run_simulation(env, edf, 25)) env.run(until=25) print("\n=== WRR Scheduler Demo with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) wrr = WRRScheduler(sim_env, SchedulingStrategy.WRR) env.process(run_simulation(env, wrr, 25)) env.run(until=25) print("\n=== DWRR Scheduler Demo with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) dwrr = DWRRScheduler(sim_env, SchedulingStrategy.DWRR) env.process(run_simulation(env, dwrr, 25)) env.run(until=25) print("\n=== LFS (Lottery) Scheduler Demo with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) lfs = LotteryScheduler(sim_env, SchedulingStrategy.LFS) env.process(run_simulation(env, lfs, 25)) env.run(until=25) print("\n=== Stride Scheduler Demo with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) stride = StrideScheduler(sim_env, SchedulingStrategy.STRIDE) env.process(run_simulation(env, stride, 25)) env.run(until=25) print("\n=== CFS (Completely Fair RR) Scheduler Demo 
with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) cfs = CFSScheduler(sim_env, SchedulingStrategy.CFS) env.process(run_simulation(env, cfs, 25)) env.run(until=25) print("\n=== CBS Scheduler Demo (Simplified) with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) cbs = CBSScheduler(sim_env, SchedulingStrategy.FCFS) env.process(run_simulation(env, cbs, 25)) env.run(until=25) print("\n=== EEVDF Scheduler Demo (Fallback to EDF) with Task-Centric Queue ===") env = simpy.Environment() sim_env = SimPyWrapper(env) eevdf = EEVDFScheduler(sim_env, SchedulingStrategy.EDF) env.process(run_simulation(env, eevdf, 25)) env.run(until=25) # Multi-Strategy Demo print("\n=== Multi-Strategy Scheduler Demo (EDF > PRIORITY > CFS) ===") env = simpy.Environment() sim_env = SimPyWrapper(env) multi = MultiScheduler(sim_env) env.process(run_simulation(env, multi, 25)) env.run(until=25) print("All demos completed. Observe task-centric queue in logs.")
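
To make the Demand 1 timing mechanism easier to verify in isolation, the following is a minimal standalone sketch of the same pattern: a dedicated scheduler device times each schedule() invocation and converts the elapsed simulation time into clock cycles via its hz. The file name and the fake_schedule stub are hypothetical, standing in for Scheduler.schedule(); note that in the full framework schedule() is a plain non-yielding call, so the delta it measures stays at 0 in simulation time unless scheduling cost is modeled with a timeout, which the stub below does explicitly.

# timing_pattern_sketch.py -- standalone sketch of the SchedulerDevice timing pattern (illustrative only).
import simpy

SCHEDULER_HZ = 1000.0  # assumed clock frequency of the scheduler device

def fake_schedule(env: simpy.Environment):
    # Hypothetical stand-in for Scheduler.schedule(): pretend one scheduling
    # pass costs 0.05 simulated time units.
    yield env.timeout(0.05)

def scheduler_device_loop(env: simpy.Environment):
    while True:
        start = env.now
        yield env.process(fake_schedule(env))  # one timed schedule() invocation
        delta = env.now - start
        cycles = delta * SCHEDULER_HZ          # simulated clock cycles consumed
        print(f"[{start:.2f}] schedule() runtime = {delta:.4f} ({cycles:.2f} cycles @ {SCHEDULER_HZ} Hz)")
        yield env.timeout(1.0)                 # wait before the next scheduling pass

if __name__ == "__main__":
    env = simpy.Environment()
    env.process(scheduler_device_loop(env))
    env.run(until=3.0)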