import _overlapped
import _winapi
import os
import faulthandler
import subprocess
import sys
import threading
import time
# Install the fault handler so a hard crash dumps Python tracebacks; the same
# module is used at the end of the script to dump every thread's stack.
faulthandler.enable()
# Shared "stop" event: polled by each worker thread before it spawns another
# child, and signaled by the main thread when the run ends. The arguments
# follow the Win32 CreateEvent order: default security attributes,
# manual-reset (True), initially non-signaled (False), unnamed (None).
# Manual-reset matters here: once set, the event stays signaled, so every
# worker that polls it sees the stop request.
hExit = _overlapped.CreateEvent(None, True, False, None)
class SpawnThread(threading.Thread):
    """Worker that keeps launching a never-ending child with a tiny timeout.

    Every pass stamps ``startTime`` with the current wall-clock time and then
    calls ``subprocess.run`` with a 0.1 ms timeout on a child that spins
    forever, so each call is expected to finish with
    ``subprocess.TimeoutExpired``. An observer can compare ``startTime``
    against the current time to spot a worker whose call has not returned.
    """

    # Child command line: the current interpreter running an endless loop.
    argv = [sys.executable, '-c', 'while True: pass']
    # time.time() of the latest spawn attempt; stays 0 until run() first loops.
    startTime = 0

    def run(self):
        """Spawn children until the module-level ``hExit`` event is signaled."""
        while True:
            # A zero-millisecond wait is a non-blocking poll of the event;
            # WAIT_TIMEOUT means it has not been signaled yet.
            wait_result = _winapi.WaitForSingleObject(hExit, 0)
            if wait_result != _winapi.WAIT_TIMEOUT:
                break

            self.startTime = time.time()
            try:
                subprocess.run(self.argv, timeout=0.0001)
            except subprocess.TimeoutExpired:
                # The child never exits on its own, so timing out is the
                # normal outcome of every iteration.
                pass
# Driver: start one spawner thread per CPU, then check on them once a second.
# Each worker records a timestamp right before every subprocess.run() call,
# and that call is given only a 0.1 ms timeout, so a timestamp that is more
# than a second old means the worker has not come back from its call.

# os.cpu_count() may return None when the CPU count cannot be determined;
# fall back to a single worker instead of failing in range(None).
threads = [SpawnThread() for _ in range(os.cpu_count() or 1)]
for t in threads:
    t.start()

stuck = False
try:
    while not stuck:
        print(f'[{time.asctime()}] running...')
        time.sleep(1)
        current = time.time()
        for t in threads:
            # Report every stale worker, not just the first one found.
            if current - t.startTime > 1:
                print(f'!!! {t.name}')
                stuck = True
except KeyboardInterrupt:
    # Ctrl+C is the normal way to end a run in which nothing got stuck.
    pass
finally:
    # Signal the stop event on every way out of the loop, including an
    # interrupt or error raised while scanning the workers. Otherwise the
    # non-daemon workers would keep spawning children indefinitely.
    _overlapped.SetEvent(hExit)

# Bounded join: healthy workers exit promptly once the event is set, while a
# stuck worker would block an unbounded join forever.
for t in threads:
    t.join(0.01)

if stuck:
    # Dump the stack of every thread to show where the stuck worker is
    # blocked, then pause so the process stays alive for inspection.
    faulthandler.dump_traceback()
    input('Inspect processes!')