From c1a1d2b5e7e307b7c37889eb4fa537f94b5bdc8e Mon Sep 17 00:00:00 2001
From: Jizong
Date: Thu, 31 Oct 2019 00:31:00 -0400
Subject: [PATCH] add a log writer

---
Review notes (everything between the "---" line above and the first
"diff --git" line is commentary and is not part of the commit message):
  * The line structure of this patch was restored; indentation and blank
    lines were reconstructed from the hunk line counts and Python syntax.
    If a hunk fails to match the target tree, verify the whitespace of the
    context lines or try `git apply --ignore-whitespace`.
  * gpu_queue/stdout_writer.py: the fallback open() in __enter__ (used when
    save_dir is empty) now passes "w". Without a mode the file is opened
    read-only, so the writelines() calls that follow cannot succeed. The
    blob id on that file's "index" line predates this one-token change.

 .gitignore                 |  3 ++-
 gpu_queue/__init__.py      |  3 ++-
 gpu_queue/main.py          | 25 ++++++++++++++++++-------
 gpu_queue/stdout_writer.py | 27 +++++++++++++++++++++++++++
 setup.py                   |  2 +-
 5 files changed, 50 insertions(+), 10 deletions(-)
 create mode 100644 gpu_queue/stdout_writer.py

diff --git a/.gitignore b/.gitignore
index c2b91c6..eb9eaad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@
 GPUQueue.egg-info/
 build/
 dist/
-**/*.pyc
\ No newline at end of file
+**/*.pyc
+log
\ No newline at end of file
diff --git a/gpu_queue/__init__.py b/gpu_queue/__init__.py
index 6f5c73b..bf118aa 100644
--- a/gpu_queue/__init__.py
+++ b/gpu_queue/__init__.py
@@ -1 +1,2 @@
-from .main import JobSubmitter
\ No newline at end of file
+from .main import JobSubmitter
+from .stdout_writer import log_writer
\ No newline at end of file
diff --git a/gpu_queue/main.py b/gpu_queue/main.py
index 4bf30bb..19c1c17 100644
--- a/gpu_queue/main.py
+++ b/gpu_queue/main.py
@@ -1,6 +1,7 @@
 import argparse
+import time
 from queue import Queue, Empty
-from subprocess import call
+from subprocess import run
 from typing import List
 
 Job_Type = str
@@ -12,6 +13,11 @@
 from contextlib import contextmanager
 import re
 
+try:
+    from stdout_writer import log_writer
+except ModuleNotFoundError:
+    from .stdout_writer import log_writer
+
 
 @contextmanager
 def wait_thread_ends(thread_name="submitter"):
@@ -43,8 +49,9 @@ def wrapper(*args, **kwargs):
 def get_args():
     parser = argparse.ArgumentParser(description="Dynamic gpu job submitter")
     parser.add_argument("jobs", nargs="+", type=str)
-    parser.add_argument("--available_gpus", type=str, nargs="+", default=["0"], metavar=["0", "1"],
+    parser.add_argument("--available_gpus", type=str, nargs="+", default=["0"], metavar="N",
                         help="Available GPUs")
+    parser.add_argument("--save_dir", type=str, default="log", help="save_dir for log files")
     args = parser.parse_args()
     # print(f"input args:%s" % args)
     print(f"There are {len(args.jobs)} jobs")
@@ -56,13 +63,15 @@ def get_args():
 
 # you have one monitor to choose which variable to assign to the next job
 class JobSubmitter:
-    def __init__(self, job_array: Job_Array_Type, available_gpus: List[str] = ["0"], verbose=False) -> None:
+    def __init__(self, job_array: Job_Array_Type, available_gpus: List[str] = ["0"], save_dir="log",
+                 verbose=False) -> None:
         super().__init__()
         os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
         self.job_array = job_array
         self.available_gpus = available_gpus
         self.job_queue = Queue()
         self.verbose = verbose
+        self.save_dir = save_dir
         for job in self.job_array:
             self.job_queue.put(job)
         self.gpu_queue = Queue()
@@ -75,9 +84,10 @@ def submit_jobs(self):
 
         while True:
             try:
-                job = self.job_queue.get(timeout=0.1)  # if it is going te be empty, end the program
+                job = self.job_queue.get(timeout=1)  # if it is going te be empty, end the program
                 gpu = self.gpu_queue.get(timeout=None, block=True)  # this will wait forever
                 self._process_daemeon(job, gpu)
+                time.sleep(2)
             except TimeoutError:
                 pass
 
@@ -100,8 +110,9 @@ def submit_jobs(self):
     def _process_daemeon(self, job, gpu):
         new_environment = os.environ.copy()
         new_environment["CUDA_VISIBLE_DEVICES"] = str(gpu)
-        result_code = call(job, shell=True, env=new_environment)
-        self.result_dict[job] = result_code
+        with log_writer(job, save_dir=self.save_dir) as writer:
+            result_code = run(job, shell=True, env=new_environment, stdout=writer)
+        self.result_dict[job] = result_code.returncode
         # Recycling GPU num
         self.gpu_queue.put(gpu)
 
@@ -114,7 +125,7 @@ def _print(self, result_dict):
 
 def main():
     args = get_args()
-    jobmanager = JobSubmitter(args.jobs, args.available_gpus, verbose=True)
+    jobmanager = JobSubmitter(args.jobs, args.available_gpus, verbose=False, save_dir=args.save_dir)
     jobmanager.submit_jobs()
 
 
diff --git a/gpu_queue/stdout_writer.py b/gpu_queue/stdout_writer.py
new file mode 100644
index 0000000..d0b85e7
--- /dev/null
+++ b/gpu_queue/stdout_writer.py
@@ -0,0 +1,27 @@
+from datetime import datetime
+from pathlib import Path
+import time
+
+class log_writer:
+    def __init__(self, job_script: str, save_dir="log") -> None:
+        super().__init__()
+        self.job_script = job_script
+        self.launch_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        self.save_dir = save_dir
+        if self.save_dir:
+            Path(save_dir).mkdir(exist_ok=True, parents=True)
+        print(f"Job {self.job_script}->`job_{self.launch_time}.log`")
+        time.sleep(1)
+
+    def __enter__(self):
+        if self.save_dir:
+            self.f = open(f"{str(Path(self.save_dir) / ('job_' + self.launch_time + '.log'))}", "w")
+        else:
+            self.f = open(f"job_{self.launch_time}.log", "w")
+        self.f.writelines(self.job_script)
+        self.f.writelines(f"\nlaunched at {self.launch_time}\n")
+        self.f.writelines("output log:\n")
+        return self.f
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.f.close()
diff --git a/setup.py b/setup.py
index 445e1f2..8a11b18 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 setup(
     name='GPUQueue',
-    version='0.0.2',
+    version='0.0.3',
     packages=['gpu_queue'],
     url='https://github.com/jizongFox/GPUQueues',
     license='MIT',