Skip to content

Commit

Permalink
add a log writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jizong committed Oct 31, 2019
1 parent 651f272 commit c1a1d2b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
GPUQueue.egg-info/
build/
dist/
**/*.pyc
**/*.pyc
log
3 changes: 2 additions & 1 deletion gpu_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .main import JobSubmitter
from .main import JobSubmitter
from .stdout_writer import log_writer
25 changes: 18 additions & 7 deletions gpu_queue/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import time
from queue import Queue, Empty
from subprocess import call
from subprocess import run
from typing import List

Job_Type = str
Expand All @@ -12,6 +13,11 @@
from contextlib import contextmanager
import re

try:
from stdout_writer import log_writer
except ModuleNotFoundError:
from .stdout_writer import log_writer


@contextmanager
def wait_thread_ends(thread_name="submitter"):
Expand Down Expand Up @@ -43,8 +49,9 @@ def wrapper(*args, **kwargs):
def get_args():
parser = argparse.ArgumentParser(description="Dynamic gpu job submitter")
parser.add_argument("jobs", nargs="+", type=str)
parser.add_argument("--available_gpus", type=str, nargs="+", default=["0"], metavar=["0", "1"],
parser.add_argument("--available_gpus", type=str, nargs="+", default=["0"], metavar="N",
help="Available GPUs")
parser.add_argument("--save_dir", type=str, default="log", help="save_dir for log files")
args = parser.parse_args()
# print(f"input args:%s" % args)
print(f"There are {len(args.jobs)} jobs")
Expand All @@ -56,13 +63,15 @@ def get_args():
# you have one monitor to choose which variable to assign to the next job
class JobSubmitter:

def __init__(self, job_array: Job_Array_Type, available_gpus: List[str] = ["0"], verbose=False) -> None:
def __init__(self, job_array: Job_Array_Type, available_gpus: List[str] = ["0"], save_dir="log",
verbose=False) -> None:
super().__init__()
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
self.job_array = job_array
self.available_gpus = available_gpus
self.job_queue = Queue()
self.verbose = verbose
self.save_dir = save_dir
for job in self.job_array:
self.job_queue.put(job)
self.gpu_queue = Queue()
Expand All @@ -75,9 +84,10 @@ def submit_jobs(self):
while True:

try:
job = self.job_queue.get(timeout=0.1) # if it is going te be empty, end the program
job = self.job_queue.get(timeout=1) # if it is going te be empty, end the program
gpu = self.gpu_queue.get(timeout=None, block=True) # this will wait forever
self._process_daemeon(job, gpu)
time.sleep(2)

except TimeoutError:
pass
Expand All @@ -100,8 +110,9 @@ def submit_jobs(self):
def _process_daemeon(self, job, gpu):
new_environment = os.environ.copy()
new_environment["CUDA_VISIBLE_DEVICES"] = str(gpu)
result_code = call(job, shell=True, env=new_environment)
self.result_dict[job] = result_code
with log_writer(job, save_dir=self.save_dir) as writer:
result_code = run(job, shell=True, env=new_environment, stdout=writer)
self.result_dict[job] = result_code.returncode
# Recycling GPU num
self.gpu_queue.put(gpu)

Expand All @@ -114,7 +125,7 @@ def _print(self, result_dict):

def main():
args = get_args()
jobmanager = JobSubmitter(args.jobs, args.available_gpus, verbose=True)
jobmanager = JobSubmitter(args.jobs, args.available_gpus, verbose=False, save_dir=args.save_dir)
jobmanager.submit_jobs()


Expand Down
27 changes: 27 additions & 0 deletions gpu_queue/stdout_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import datetime
from pathlib import Path
import time

class log_writer:
    """Context manager that opens a per-job log file and yields it for writing.

    On entry the log file is created (truncating any existing file of the
    same name), a short header is written and flushed, and the open file
    object is returned. On exit the file is closed.

    The file is named ``job_<launch_time>.log`` where ``launch_time`` is the
    construction time formatted as ``%Y-%m-%d %H:%M:%S``. It is placed in
    ``save_dir`` when that is truthy, otherwise in the current working
    directory.

    Args:
        job_script: The job command line; echoed and written as the first
            line of the log.
        save_dir: Directory for the log file. It is created (with parents)
            at construction time. A falsy value means "current directory".
    """

    def __init__(self, job_script: str, save_dir="log") -> None:
        super().__init__()
        self.job_script = job_script
        self.launch_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.save_dir = save_dir
        if self.save_dir:
            Path(save_dir).mkdir(exist_ok=True, parents=True)
        print(f"Job {self.job_script}->`job_{self.launch_time}.log`")
        # NOTE(review): the file name only has one-second resolution, so this
        # pause presumably keeps writers created back to back from sharing a
        # log file name -- confirm before removing.
        time.sleep(1)

    def _log_path(self) -> Path:
        """Return the path of this job's log file."""
        file_name = f"job_{self.launch_time}.log"
        if self.save_dir:
            return Path(self.save_dir) / file_name
        return Path(file_name)

    def __enter__(self):
        # Always open for writing; the default read mode would make the
        # header writes below fail when no save_dir is configured.
        self.f = open(self._log_path(), "w")
        self.f.write(self.job_script)
        self.f.write(f"\nlaunched at {self.launch_time}\n")
        self.f.write("output log:\n")
        # The handle may be given to a subprocess as its stdout, which writes
        # through the file descriptor and bypasses Python's buffer. Flush now
        # so the header is on disk before any such output.
        self.f.flush()
        return self.f

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        self.f.close()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()
setup(
name='GPUQueue',
version='0.0.2',
version='0.0.3',
packages=['gpu_queue'],
url='https://github.com/jizongFox/GPUQueues',
license='MIT',
Expand Down

0 comments on commit c1a1d2b

Please sign in to comment.