diff --git a/tests/cmd-checklist.yaml b/tests/cmd-checklist.yaml index e469f95..b2d3f9d 100644 --- a/tests/cmd-checklist.yaml +++ b/tests/cmd-checklist.yaml @@ -1,238 +1,278 @@ --- - envvars: - KUBECONFIG: /data/project/test/.kube/config CUSTOMURL: https://localhost:30001/api/v1 CUSTOMADDR: 127.0.0.1 CUSTOMFQDN: jobs.svc.toolsbeta.eqiad1.wikimedia.cloud CFGFILE: ./test-cfg.yaml SLEEP: "2" BIN: python3 -c "from tjf_cli.cli import main; main()" TOOLHOME: /data/project/test CONTAINER: tf-bullseye-std NORMALJOBNAME: test-job TESTCMD: test-cmd.sh + CUSTOM_LOG_FILE: custom-log-file SCHEDJOBNAME: test-sched-job CONTJOBNAME: test-cont-job LOADFILE: ./test-load.yaml --- - name: prepare tests tests: - cmd: | cat << EOF > ${CFGFILE} --- # autogenerated configuration file by the testsuite api_url: ${CUSTOMURL} kubeconfig: ${KUBECONFIG} customaddr: ${CUSTOMADDR} customfqdn: ${CUSTOMFQDN} EOF retcode: 0 stderr: "" # cleanup everything - cmd: rm -f ${TOOLHOME}/${NORMALJOBNAME}.* - cmd: rm -f ${TOOLHOME}/${SCHEDJOBNAME}.* - cmd: rm -f ${TOOLHOME}/${CONTJOBNAME}.* + - cmd: rm -f ${TOOLHOME}/${CUSTOM_LOG_FILE}* - cmd: ${BIN} --cfg ${CFGFILE} flush - cmd: sleep ${SLEEP} - name: list images tests: - cmd: ${BIN} --cfg ${CFGFILE} images | grep -q "Short name" retcode: 0 stdout: "" stderr: "" - cmd: ${BIN} --cfg ${CFGFILE} images | grep -q "Container image URL" retcode: 0 stdout: "" stderr: "" - cmd: ${BIN} --cfg ${CFGFILE} images | grep -q ${CONTAINER} retcode: 0 stdout: "" stderr: "" - name: run normal job tests: - cmd: | cat << EOF > ${TOOLHOME}/${TESTCMD} #!/bin/sh echo stdout \$1 echo stderr >&2 sleep 20 # this is a simple script and exits faster than we can test it. 
this command slows it down EOF retcode: 0 stderr: "" - cmd: chmod a+x ${TOOLHOME}/${TESTCMD} - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME} --command "./${TESTCMD} --withargs" --image ${CONTAINER} retcode: 0 - name: show normal job tests: - cmd: | response=$(${BIN} --cfg ${CFGFILE} show ${NORMALJOBNAME}) echo "$response" | egrep -q "Job name:"[[:space:]]*"\| ${NORMALJOBNAME}" \ && echo "$response" | egrep -q "Command:"[[:space:]]*"\| ./${TESTCMD} --withargs" \ && echo "$response" | egrep -q "Job type:"[[:space:]]*"\| normal" \ && echo "$response" | egrep -q "Image:"[[:space:]]*"\| ${CONTAINER}" \ && echo "$response" | egrep -q "File log:"[[:space:]]*"\| yes" \ && echo "$response" | egrep -q "Resources:"[[:space:]]*"\| default" \ + && echo "$response" | egrep -q "Output log:"[[:space:]]*"\| *${NORMALJOBNAME}.out" \ + && echo "$response" | egrep -q "Error log:"[[:space:]]*"\| *${NORMALJOBNAME}.err" \ && echo "$response" | egrep -q "Status:" \ && echo "$response" | egrep -q "Hints:" \ && echo "$response" | egrep -q "Emails:" retcode: 0 stdout: "" stderr: "" - name: list normal job (long) tests: # predictable table headers - cmd: | ${BIN} --cfg ${CFGFILE} list -l | grep "Job name:" | grep "Command:" \ | grep "Job type:" | grep "Image:" | grep "File log:" | grep "Resources:" \ - | grep "Status:" | grep -q "Emails:" + | grep "Output log:" | grep "Error log:" | grep "Status:" | grep -q "Emails:" retcode: 0 # predictable job list output - cmd: | ${BIN} --cfg ${CFGFILE} list --long | grep ${NORMALJOBNAME} \ | grep "./${TESTCMD} --withargs" | grep "normal" | grep ${CONTAINER} \ - | egrep ""[[:space:]]"yes"[[:space:]]"" | grep default \ - | egrep ""[[:space:]]"none"[[:space:]]"" + | egrep ""[[:space:]]"yes"[[:space:]]"" | grep ${NORMALJOBNAME}.out \ + | grep ${NORMALJOBNAME}.err | grep default | egrep ""[[:space:]]"none"[[:space:]]"" retcode: 0 - name: list normal job (short) tests: # predictable table headers - cmd: ${BIN} --cfg ${CFGFILE} list | grep "Job name:" | grep 
"Job type:" | grep -q "Status:" retcode: 0 # predictable job list output - cmd: ${BIN} --cfg ${CFGFILE} list | grep ${NORMALJOBNAME} | grep -q "normal" retcode: 0 -- name: normal job produces file logs - tests: - - cmd: grep -q "stdout --withargs" ${TOOLHOME}/${NORMALJOBNAME}.out - retcode: 0 - - cmd: grep -q "stderr" ${TOOLHOME}/${NORMALJOBNAME}.err - retcode: 0 - - name: normal job can be deleted tests: - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME} retcode: 0 - cmd: sleep ${SLEEP} - cmd: ${BIN} --cfg ${CFGFILE} show ${NORMALJOBNAME} | grep ERROR | grep -q "job '${NORMALJOBNAME}' does not exist" retcode: 0 -- name: normal job with no file log doesn't produce it - tests: - - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME}2 --command "./${TESTCMD} --withargs" --image ${CONTAINER} --no-filelog - retcode: 0 - - cmd: ls ${TOOLHOME}/${NORMALJOBNAME}2.out - retcode: 2 - - cmd: ls ${TOOLHOME}/${NORMALJOBNAME}2.err - retcode: 2 - # cleanup - - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME}2 - - name: job with non-default resource allocation tests: - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME} --command ./${TESTCMD} --image ${CONTAINER} --mem 100Mi --cpu 1 retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} show ${NORMALJOBNAME} | grep Resources | grep mem | grep 100Mi | grep cpu | grep -q 1 retcode: 0 # cleanup - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME} - name: run schedule job tests: - cmd: ${BIN} --cfg ${CFGFILE} run ${SCHEDJOBNAME} --command "./${TESTCMD} --withargs" --image ${CONTAINER} --schedule "* * * * *" retcode: 0 - name: show schedule job tests: - cmd: ${BIN} --cfg ${CFGFILE} show ${SCHEDJOBNAME} | egrep "Job type:"[[:space:]]*"| schedule" | grep -q "* * * * *" retcode: 0 - name: list schedule job tests: - cmd: ${BIN} --cfg ${CFGFILE} list | grep ${SCHEDJOBNAME} | grep schedule | grep -q "* * * * *" retcode: 0 - name: delete schedule job tests: - cmd: ${BIN} --cfg ${CFGFILE} delete ${SCHEDJOBNAME} retcode: 0 - cmd: sleep ${SLEEP} - cmd: 
${BIN} --cfg ${CFGFILE} show ${SCHEDJOBNAME} | grep ERROR | grep -q "job '${SCHEDJOBNAME}' does not exist" retcode: 0 - name: run continuous job tests: - cmd: ${BIN} --cfg ${CFGFILE} run ${CONTJOBNAME} --command "./${TESTCMD} --withargs" --image ${CONTAINER} --continuous retcode: 0 - name: show continuous job tests: - cmd: ${BIN} --cfg ${CFGFILE} show ${CONTJOBNAME} | egrep -q "Job type:"[[:space:]]*"| continuous" retcode: 0 - name: list continuous job tests: - cmd: ${BIN} --cfg ${CFGFILE} list | grep ${CONTJOBNAME} | grep -q continuous retcode: 0 - name: delete continuous job tests: - cmd: ${BIN} --cfg ${CFGFILE} delete ${CONTJOBNAME} retcode: 0 - cmd: sleep ${SLEEP} - cmd: ${BIN} --cfg ${CFGFILE} show ${CONTJOBNAME} | grep ERROR | grep -q "job '${CONTJOBNAME}' does not exist" retcode: 0 +- name: normal job produces file logs + tests: + - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME}2 --command "./${TESTCMD} --withargs" --image ${CONTAINER} + retcode: 0 + - cmd: cat ${TOOLHOME}/${NORMALJOBNAME}2.out + retcode: 0 + - cmd: cat ${TOOLHOME}/${NORMALJOBNAME}2.err + retcode: 0 + # cleanup + - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME}2 + +- name: normal job should log to single file + tests: + - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME}3 --command "./${TESTCMD} --withargs" --image ${CONTAINER} --filelog-stdout ${CUSTOM_LOG_FILE}3.out --filelog-stderr ${CUSTOM_LOG_FILE}3.out + retcode: 0 + - cmd: ${BIN} --cfg ${CFGFILE} show ${NORMALJOBNAME}3 + retcode: 0 + - cmd: grep -q "stdout --withargs" ${TOOLHOME}/${CUSTOM_LOG_FILE}3.out + retcode: 0 + - cmd: grep -q "stderr" ${TOOLHOME}/${CUSTOM_LOG_FILE}3.out + retcode: 0 + - cmd: ls ${TOOLHOME}/${CUSTOM_LOG_FILE}3.err + retcode: 2 + # cleanup + - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME}3 + +- name: normal job should log to custom-log-file4.out and custom-log-file4.err + tests: + - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME}4 --command "./${TESTCMD} --withargs" --image ${CONTAINER} 
--filelog-stdout ${TOOLHOME}/${CUSTOM_LOG_FILE}4.out --filelog-stderr ${TOOLHOME}/${CUSTOM_LOG_FILE}4.err + retcode: 0 + - cmd: ${BIN} --cfg ${CFGFILE} show ${NORMALJOBNAME}4 + retcode: 0 + - cmd: grep -q "stdout --withargs" ${TOOLHOME}/${CUSTOM_LOG_FILE}4.out + retcode: 0 + - cmd: grep -q "stderr" ${TOOLHOME}/${CUSTOM_LOG_FILE}4.err + retcode: 0 + # cleanup + - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME}4 + +- name: normal job with --no-filelog doesn't produce any log + tests: + - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME}5 --command "./${TESTCMD} --withargs" --image ${CONTAINER} --no-filelog + retcode: 0 + - cmd: ls ${TOOLHOME}/${NORMALJOBNAME}5.out + retcode: 2 + - cmd: ls ${TOOLHOME}/${NORMALJOBNAME}5.err + retcode: 2 + # cleanup + - cmd: ${BIN} --cfg ${CFGFILE} delete ${NORMALJOBNAME}5 + - name: run 3 jobs, they are all listed together tests: - cmd: ${BIN} --cfg ${CFGFILE} run ${NORMALJOBNAME} --command "./${TESTCMD} --withargs" --image ${CONTAINER} retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} run ${CONTJOBNAME} --command "./${TESTCMD} --withargs" --image ${CONTAINER} --continuous retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} run ${SCHEDJOBNAME} --command "./${TESTCMD} --withargs" --image ${CONTAINER} --schedule "* * * * *" retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} list --long | grep ${CONTAINER} | wc -l retcode: 0 stdout: "3" - name: job loading tests: - cmd: | cat << EOF > ${LOADFILE} --- # autogenerated file by the testsuite - name: ${SCHEDJOBNAME} command: ./${TESTCMD} --withargs image: ${CONTAINER} no-filelog: true schedule: "* * * * *" emails: onfinish - image: ${CONTAINER} name: ${CONTJOBNAME} command: ./${TESTCMD} --withargs continuous: true emails: onfailure - name: ${NORMALJOBNAME} image: ${CONTAINER} command: ./${TESTCMD} --withargs wait: true emails: all EOF retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} load ${LOADFILE} retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} list --long | grep ${CONTAINER} | wc -l retcode: 0 stdout: "3" - name: cleanup 
tests: + - cmd: rm -f ${TOOLHOME}/${NORMALJOBNAME}.* + - cmd: rm -f ${TOOLHOME}/${SCHEDJOBNAME}.* + - cmd: rm -f ${TOOLHOME}/${CONTJOBNAME}.* + - cmd: rm -f ${TOOLHOME}/${CUSTOM_LOG_FILE}* - cmd: ${BIN} --cfg ${CFGFILE} flush retcode: 0 - cmd: ${BIN} --cfg ${CFGFILE} list --long | grep -q ${CONTAINERS} retcode: 2 diff --git a/tjf_cli/cli.py b/tjf_cli/cli.py index 558d39b..b31221b 100644 --- a/tjf_cli/cli.py +++ b/tjf_cli/cli.py @@ -1,604 +1,623 @@ # (C) 2021 by Arturo Borrero Gonzalez # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # Some funcionts of this code were copy-pasted from the tools-webservice package. # Copyright on that TBD. # # This program is the command line interface part of the Toolforge Jobs Framework. # from tabulate import tabulate from typing import Optional, Set import textwrap import argparse import getpass import urllib3 import logging import time import json import yaml import sys from tjf_cli.conf import Conf from tjf_cli.loader import calculate_changes - # TODO: disable this for now, review later urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # for --wait: 5 minutes timeout, check every 5 seconds WAIT_TIMEOUT = 60 * 5 WAIT_SLEEP = 5 def parse_args(): description = "Toolforge Jobs Framework, command line interface" parser = argparse.ArgumentParser(description=description) parser.add_argument("--debug", action="store_true", help="activate debug mode") parser.add_argument( "--cfg", default="/etc/toolforge-jobs-framework-cli.cfg", help="YAML config for the CLI. Defaults to '%(default)s'. 
" "Only useful for Toolforge admins.", ) subparser = parser.add_subparsers( help="possible operations (pass -h to know usage of each)", dest="operation", required=True, ) # TODO: remove this after a few months subparser.add_parser( "containers", help="Kept for compatibility reasons, use `images` instead.", ) subparser.add_parser( "images", help="list information on available container image types for Toolforge jobs", ) runparser = subparser.add_parser( "run", help="run a new job of your own in Toolforge", ) runparser.add_argument("name", help="new job name") runparser.add_argument( "--command", required=True, help="full path of command to run in this job" ) runparser.add_argument( "--image", required=True, help="image shortname (check them with `images`)" ) runparser.add_argument( "--no-filelog", required=False, action="store_true", help="don't store job stdout in `jobname`.out and stderr in `jobname`.err files in the " "user home directory", ) + runparser.add_argument( + "-o", "--filelog-stdout", required=False, help="location to store stdout logs for this job" + ) + runparser.add_argument( + "-e", "--filelog-stderr", required=False, help="location to store stderr logs for this job" + ) runparser.add_argument( "--mem", required=False, help="specify additional memory limit required for this job", ) runparser.add_argument( "--cpu", required=False, help="specify additional CPU limit required for this job", ) runparser.add_argument( "--emails", required=False, choices=["none", "all", "onfinish", "onfailure"], default="none", help="specify if the system should email notifications about this job. 
" "Defaults to '%(default)s'.", ) runparser_exclusive_group = runparser.add_mutually_exclusive_group() runparser_exclusive_group.add_argument( "--schedule", required=False, help="run a job with a cron-like schedule (example '1 * * * *')", ) runparser_exclusive_group.add_argument( "--continuous", required=False, action="store_true", help="run a continuous job" ) runparser_exclusive_group.add_argument( "--wait", required=False, action="store_true", help=f"run a job and wait for completition. Timeout is {WAIT_TIMEOUT} seconds.", ) showparser = subparser.add_parser( "show", help="show details of a job of your own in Toolforge", ) showparser.add_argument("name", help="job name") listparser = subparser.add_parser( "list", help="list all running jobs of your own in Toolforge", ) listparser.add_argument( "-l", "--long", required=False, action="store_true", help="show long table with full details about each job", ) deleteparser = subparser.add_parser( "delete", help="delete a running job of your own in Toolforge", ) deleteparser.add_argument("name", help="job name") subparser.add_parser( "flush", help="delete all running jobs of your own in Toolforge", ) loadparser = subparser.add_parser( "load", help="flush all jobs and load a YAML file with job definitions and run them", ) loadparser.add_argument("file", help="path to YAML file to load") loadparser.add_argument("--job", required=False, help="load a single job only") restartparser = subparser.add_parser("restart", help="restarts a running job") restartparser.add_argument("name", help="job name") return parser.parse_args() def op_images(conf: Conf): try: response = conf.session.get(conf.api_url + "/images/") except Exception as e: logging.error(f"couldn't contact the API endpoint. Contact a Toolforge admin: {e}") sys.exit(1) if response.status_code != 200: logging.error(f"unable to fetch information. 
Contact a Toolforge admin: {response.text}") sys.exit(1) try: images = json.loads(response.text) except Exception as e: logging.error(f"couldn't parse information from the API. Contact a Toolforge admin: {e}") sys.exit(1) try: output = tabulate(images, headers=conf.IMAGES_TABULATION_HEADERS, tablefmt="pretty") except Exception as e: logging.error(f"couldn't format information from the API. Contact a Toolforge admin: {e}") sys.exit(1) print(output) def job_prepare_for_output(conf: Conf, job, long_listing=False, supress_hints=True): schedule = job.get("schedule", None) cont = job.get("continuous", None) if schedule is not None: job["type"] = f"schedule: {schedule}" job.pop("schedule", None) elif cont is not None: job["type"] = "continuous" job.pop("continuous", None) else: job["type"] = "normal" filelog = job.get("filelog", "false") if filelog == "True": job["filelog"] = "yes" else: job["filelog"] = "no" mem = job.pop("memory", "default") cpu = job.pop("cpu", "default") if mem == "default" and cpu == "default": job["resources"] = "default" else: job["resources"] = f"mem: {mem}, cpu: {cpu}" if supress_hints: if job.get("status_long", None) is not None: job.pop("status_long", None) else: job["status_long"] = textwrap.fill(job.get("status_long", "Unknown")) if long_listing: headers = conf.JOB_TABULATION_HEADERS_LONG else: headers = conf.JOB_TABULATION_HEADERS_SHORT # not interested in these fields ATM for key in job.copy(): if key not in headers: logging.debug(f"supressing job API field '{key}' before output") job.pop(key) # normalize key names for easier printing for key in headers: if key == "status_long" and supress_hints: continue oldkey = key newkey = headers[key] job[newkey] = job.pop(oldkey, "Unknown") def _list_jobs(conf: Conf): try: response = conf.session.get(conf.api_url + "/list/") except Exception as e: logging.error(f"couldn't contact the API endpoint. 
Contact a Toolforge admin: {e}") sys.exit(1) if response.status_code != 200: logging.error(f"unable to fetch information. Contact a Toolforge admin: {response.text}") sys.exit(1) try: list = json.loads(response.text) except Exception as e: logging.error(f"couldn't parse information from the API. Contact a Toolforge admin: {e}") sys.exit(1) return list def op_list(conf: Conf, long_listing: bool): list = _list_jobs(conf) if len(list) == 0: logging.debug("no jobs to be listed") return try: for job in list: logging.debug(f"job information from the API: {job}") job_prepare_for_output(conf, job, supress_hints=True, long_listing=long_listing) if long_listing: headers = conf.JOB_TABULATION_HEADERS_LONG else: headers = conf.JOB_TABULATION_HEADERS_SHORT output = tabulate(list, headers=headers, tablefmt="pretty") except Exception as e: logging.error(f"couldn't format information from the API. Contact a Toolforge admin: {e}") sys.exit(1) print(output) def _wait_for_job(conf: Conf, name: str): curtime = starttime = time.time() while curtime - starttime < WAIT_TIMEOUT: time.sleep(WAIT_SLEEP) curtime = time.time() job = _show_job(conf, name, missing_ok=True) if job is None: logging.info(f"job '{name}' completed (and already deleted)") return if job["status_short"] == "Completed": logging.info(f"job '{name}' completed") return if job["status_short"] == "Failed": logging.error(f"job '{name}' failed:") op_show(conf, name) sys.exit(1) logging.error(f"timed out {WAIT_TIMEOUT} seconds waiting for job '{name}' to complete:") op_show(conf, name) sys.exit(1) def op_run( conf: Conf, - name, - command, - schedule, - continuous, - image, - wait, + name: str, + command: str, + schedule: Optional[str], + continuous: bool, + image: str, + wait: bool, no_filelog: bool, - mem: str, - cpu: str, + filelog_stdout: Optional[str], + filelog_stderr: Optional[str], + mem: Optional[str], + cpu: Optional[str], emails: str, ): payload = {"name": name, "imagename": image, "cmd": command, "emails": emails} 
if continuous: payload["continuous"] = "true" elif schedule: payload["schedule"] = schedule if not no_filelog: # the default is to request the filelog payload["filelog"] = "true" + if filelog_stdout: + payload["filelog_stdout"] = filelog_stdout + + if filelog_stderr: + payload["filelog_stderr"] = filelog_stderr + if mem: payload["memory"] = mem if cpu: payload["cpu"] = cpu logging.debug(f"payload: {payload}") try: response = conf.session.post(conf.api_url + "/run/", data=payload) except Exception as e: logging.error(f"couldn't contact the API endpoint. Contact a Toolforge admin: {e}") sys.exit(1) if response.status_code == 409: logging.error(f"a job with the same name '{name}' exists already") sys.exit(1) if response.status_code >= 300: logging.error(f"unable to create job: {response.text.strip()}") sys.exit(1) logging.debug("job was created") if wait: _wait_for_job(conf, name) def _show_job(conf: Conf, name: str, missing_ok: bool): try: response = conf.session.get(conf.api_url + f"/show/{name}") except Exception as e: logging.error(f"couldn't contact the API endpoint. Contact a Toolforge admin: {e}") sys.exit(1) if response.status_code == 404: if missing_ok: return None # the job doesn't exist, but that's ok! logging.error(f"job '{name}' does not exist") sys.exit(1) try: job = json.loads(response.text) except Exception as e: logging.error(f"couldn't parse information from the API. Contact a Toolforge admin: {e}") sys.exit(1) logging.debug(f"job information from the API: {job}") return job def op_show(conf: Conf, name): job = _show_job(conf, name, missing_ok=False) job_prepare_for_output(conf, job, supress_hints=False, long_listing=True) # change table direction kvlist = [] for key in job: kvlist.append([key, job[key]]) try: output = tabulate(kvlist, tablefmt="grid") except Exception as e: logging.error(f"couldn't format information from the API. 
Contact a Toolforge admin: {e}") sys.exit(1) print(output) def op_delete(conf: Conf, name: str): try: conf.session.delete(conf.api_url + f"/delete/{name}") except Exception as e: logging.error(f"couldn't contact the API endpoint. Contact a Toolforge admin: {e}") sys.exit(1) logging.debug("job was deleted (if it existed anyway, we didn't check)") def op_flush(conf: Conf): try: conf.session.delete(conf.api_url + "/flush/") except Exception as e: logging.error(f"couldn't contact the API endpoint. Contact a Toolforge admin: {e}") sys.exit(1) logging.debug("all jobs were flushed (if any existed anyway, we didn't check)") def _flush_and_wait(conf: Conf): op_flush(conf) curtime = starttime = time.time() while curtime - starttime < WAIT_TIMEOUT: logging.debug(f"waiting for jobs list to be empty, sleeping {WAIT_SLEEP} seconds") time.sleep(WAIT_SLEEP) curtime = time.time() list = _list_jobs(conf) if len(list) == 0: # ok! return logging.error("could not load new jobs") logging.error(f"timed out ({WAIT_TIMEOUT} seconds) waiting for previous jobs to be flushed") sys.exit(1) def _delete_and_wait(conf: Conf, names: Set[str]): for name in names: op_delete(conf, name) curtime = starttime = time.time() while curtime - starttime < WAIT_TIMEOUT: logging.debug(f"waiting for {len(names)} job(s) to be gone, sleeping {WAIT_SLEEP} seconds") time.sleep(WAIT_SLEEP) curtime = time.time() jobs = _list_jobs(conf) if not any([job for job in jobs if job["name"] in names]): # ok! return logging.error("could not load new jobs") logging.error(f"timed out ({WAIT_TIMEOUT} seconds) waiting for old jobs to be deleted") sys.exit(1) def _load_job(conf: Conf, job: dict, n: int): # these are mandatory try: name = job["name"] command = job["command"] image = job["image"] except KeyError as e: logging.error(f"Unable to load job number {n}. 
Missing configuration parameter {str(e)}") sys.exit(1) # these are optional schedule = job.get("schedule", None) continuous = job.get("continuous", False) no_filelog = job.get("no-filelog", False) + filelog_stdout = job.get("filelog-stdout", None) + filelog_stderr = job.get("filelog-stderr", None) mem = job.get("mem", None) cpu = job.get("cpu", None) emails = job.get("emails", "none") if not schedule and not continuous: wait = job.get("wait", False) else: wait = False op_run( conf=conf, name=name, command=command, schedule=schedule, continuous=continuous, image=image, wait=wait, no_filelog=no_filelog, + filelog_stdout=filelog_stdout, + filelog_stderr=filelog_stderr, mem=mem, cpu=cpu, emails=emails, ) def op_load(conf: Conf, file: str, job_name: Optional[str]): try: with open(file) as f: jobslist = yaml.safe_load(f.read()) except Exception as e: logging.error(f"couldn't read YAML file '{file}': {e}") sys.exit(1) logging.debug(f"loaded content from YAML file '{file}':") logging.debug(f"{jobslist}") changes = calculate_changes( conf, jobslist, (lambda name: name == job_name) if job_name else None ) if len(changes.delete) > 0 or len(changes.modify) > 0: _delete_and_wait(conf, {*changes.delete, *changes.modify}) for n, job in enumerate(jobslist, start=1): if "name" not in job: logging.error(f"Unable to load job number {n}. Missing configuration parameter name") sys.exit(1) name = job["name"] if name not in changes.add and name not in changes.modify: continue _load_job(conf, job, n) def op_restart(conf: Conf, name: str): try: conf.session.post(conf.api_url + f"/restart/{name}") except Exception as e: logging.error(f"couldn't contact the API endpoint. 
Contact a Toolforge admin: {e}") sys.exit(1) logging.debug("job was restarted") def main(): args = parse_args() logging_format = "%(levelname)s: %(message)s" if args.debug: logging_level = logging.DEBUG logging_format = f"[%(asctime)s] [%(filename)s] {logging_format}" else: logging_level = logging.INFO logging.addLevelName( logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING) ) logging.addLevelName( logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR) ) logging.basicConfig( format=logging_format, level=logging_level, stream=sys.stdout, datefmt="%Y-%m-%d %H:%M:%S" ) user = getpass.getuser() if not user.startswith("tools.") and not user.startswith("toolsbeta."): logging.warning( "not running as the tool account? Likely to fail. Perhaps you forgot `become `?" ) conf = Conf(args.cfg) logging.debug("session configuration generated correctly") if args.operation == "images": op_images(conf) elif args.operation == "containers": # TODO: remove this after a few months logging.warning("the `containers` action is deprecated. 
Use `images` instead.") op_images(conf) elif args.operation == "run": op_run( - conf, - args.name, - args.command, - args.schedule, - args.continuous, - args.image, - args.wait, - args.no_filelog, - args.mem, - args.cpu, - args.emails, + conf=conf, + name=args.name, + command=args.command, + schedule=args.schedule, + continuous=args.continuous, + image=args.image, + wait=args.wait, + no_filelog=args.no_filelog, + filelog_stdout=args.filelog_stdout, + filelog_stderr=args.filelog_stderr, + mem=args.mem, + cpu=args.cpu, + emails=args.emails, ) elif args.operation == "show": op_show(conf, args.name) elif args.operation == "delete": op_delete(conf, args.name) elif args.operation == "list": op_list(conf, args.long) elif args.operation == "flush": op_flush(conf) elif args.operation == "load": op_load(conf, args.file, args.job) elif args.operation == "restart": op_restart(conf, args.name) logging.debug("-- end of operations") diff --git a/tjf_cli/conf.py b/tjf_cli/conf.py index 26f498d..609131a 100644 --- a/tjf_cli/conf.py +++ b/tjf_cli/conf.py @@ -1,133 +1,135 @@ # (C) 2021 Arturo Borrero Gonzalez # (C) 2022 Taavi Väänänen # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # Some funcionts of this code were copy-pasted from the tools-webservice package. # Copyright on that TBD. 
import os import socket import sys from logging import getLogger import requests import yaml LOGGER = getLogger(__name__) class Conf: """ Class that represents the configuration for this CLI session """ JOB_TABULATION_HEADERS_SHORT = { "name": "Job name:", "type": "Job type:", "status_short": "Status:", } JOB_TABULATION_HEADERS_LONG = { "name": "Job name:", "cmd": "Command:", "type": "Job type:", "image": "Image:", "filelog": "File log:", + "filelog_stdout": "Output log:", + "filelog_stderr": "Error log:", "emails": "Emails:", "resources": "Resources:", "status_short": "Status:", "status_long": "Hints:", } IMAGES_TABULATION_HEADERS = { "shortname": "Short name", "image": "Container image URL", } def __init__(self, cfg_file: str): """Constructor""" try: with open(cfg_file) as f: cfg = yaml.safe_load(f.read()) except Exception as e: LOGGER.error(f"couldn't read config file '{cfg_file}': {e}. Contact a Toolforge admin.") sys.exit(1) try: self.api_url = cfg.get("api_url") except KeyError as e: LOGGER.error( f"missing key '{str(e)}' in config file '{cfg_file}'. Contact a Toolforge admin." ) sys.exit(1) kubeconfig = cfg.get("kubeconfig", "~/.kube/config") customhdr = cfg.get("customhdr", None) customaddr = cfg.get("customaddr", None) customfqdn = cfg.get("customfqdn", None) self.kubeconfigfile = os.path.expanduser(kubeconfig) try: with open(self.kubeconfigfile) as f: self.k8sconf = yaml.safe_load(f.read()) except Exception as e: LOGGER.error( f"couldn't read kubeconfig file '{self.kubeconfigfile}': {e}. " "Contact a Toolforge admin." 
) sys.exit(1) LOGGER.debug(f"loaded kubeconfig file '{self.kubeconfigfile}'") self.session = requests.Session() try: self.context = self._find_cfg_obj("contexts", self.k8sconf["current-context"]) self.cluster = self._find_cfg_obj("clusters", self.context["cluster"]) self.server = self.cluster["server"] self.namespace = self.context["namespace"] self.user = self._find_cfg_obj("users", self.context["user"]) self.session.cert = (self.user["client-certificate"], self.user["client-key"]) except KeyError as e: LOGGER.error( "couldn't build session configuration from file " f"'{self.kubeconfigfile}': missing key {e}. Contact a Toolforge admin." ) sys.exit(1) except Exception as e: LOGGER.error( "couldn't build session configuration from file " f"'{self.kubeconfigfile}': {e}. Contact a Toolforge admin." ) sys.exit(1) self._configure_user_agent() if customhdr is not None: self.session.headers.update(customhdr) # don't verify server-side TLS for now self.session.verify = False if customaddr is not None and customfqdn is not None: from forcediphttpsadapter.adapters import ForcedIPHTTPSAdapter self.session.mount(f"https://{customfqdn}", ForcedIPHTTPSAdapter(dest_ip=customaddr)) def _configure_user_agent(self): """Configure User-Agent header.""" host = socket.gethostname() pyrequest_ua = requests.utils.default_user_agent() ua_str = f"jobs-framework-cli {self.namespace}@{host} {pyrequest_ua}" self.session.headers.update({"User-Agent": ua_str}) def _find_cfg_obj(self, kind, name): """Lookup a named object in our config.""" for obj in self.k8sconf[kind]: if obj["name"] == name: return obj[kind[:-1]] raise KeyError(f"key '{name}' not found in '{kind}' section of config") diff --git a/toolforge-jobs.1 b/toolforge-jobs.1 index 130201c..5716214 100644 --- a/toolforge-jobs.1 +++ b/toolforge-jobs.1 @@ -1,266 +1,282 @@ .\" (C) Copyright 2021 Arturo Borrero Gonzalez .\" .TH TOOLFORGE-JOBS-FRAMEWORK 1 "October 10 2022" .\" Please adjust this date whenever revising the manpage. 
.\" .SH NAME toolforge-jobs-framework-cli \- command line interface for the Toolforge Jobs Framework .SH SYNOPSIS .B toolforge-jobs [options] {images,run,show,list,delete,flush,load,restart} ... .SH DESCRIPTION The \fBtoolforge-jobs\fP command line interface allows you to interact with the \fBToolforge Jobs Framework\fP. This framework allows you to manage jobs that run in the \fBWikimedia Toolforge\fP platform. In general there are 3 kind of jobs: .TP .B normal Jobs that are initiated by the user, and are expected to run until the job's internal execution normally finishes. .TP .B schedule Jobs that once created by the user, are periodically launched at a given time by the framework. Similar to a cronjob. .TP .B continuous Jobs that once created by the user are expected to be always up and running (for example, a daemon). .SH ACTIONS Top level actions that the command supports: .TP .B images List information on available container image types for the \fBToolforge Jobs Framework\fP. To be used in the \fBrun\fP command \fB--image\fP parameter. Container images marked as \fBDEPRECATED\fP should be avoided in general. 
Example: .nf $ toolforge-jobs images +--------------------------+------------------------------------------------------------------------+ | Short name | Container image URL | +--------------------------+------------------------------------------------------------------------+ | tf-bullseye-std | docker-registry.tools.wmflabs.org/toolforge-bullseye-standalone:latest | | tf-buster-std-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-buster-standalone:latest | | tf-golang | docker-registry.tools.wmflabs.org/toolforge-golang-sssd-base:latest | | tf-golang111 | docker-registry.tools.wmflabs.org/toolforge-golang111-sssd-base:latest | | tf-jdk17 | docker-registry.tools.wmflabs.org/toolforge-jdk17-sssd-base:latest | | tf-jdk11-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-jdk11-sssd-base:latest | | tf-jdk8-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-jdk8-sssd-base:latest | | tf-node12 | docker-registry.tools.wmflabs.org/toolforge-node12-sssd-base:latest | | tf-node10-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-node10-sssd-base:latest | | tf-node6-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-node6-sssd-base:latest | | tf-php5-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-php5-sssd-base:latest | | tf-php72-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-php72-sssd-base:latest | | tf-php73-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-php73-sssd-base:latest | | tf-php74 | docker-registry.tools.wmflabs.org/toolforge-php74-sssd-base:latest | | tf-python2-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-python2-sssd-base:latest | | tf-python34-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-python34-sssd-base:latest | | tf-python35-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-python35-sssd-base:latest | | tf-python37-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-python37-sssd-base:latest | | tf-python39 | 
docker-registry.tools.wmflabs.org/toolforge-python39-sssd-base:latest | | tf-ruby21-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-ruby21-sssd-base:latest | | tf-ruby25-DEPRECATED | docker-registry.tools.wmflabs.org/toolforge-ruby25-sssd-base:latest | | tf-ruby27 | docker-registry.tools.wmflabs.org/toolforge-ruby27-sssd-base:latest | | tf-tcl86 | docker-registry.tools.wmflabs.org/toolforge-tcl86-sssd-base:latest | | wm-bullseye | docker-registry.tools.wmflabs.org/wikimedia-bullseye:latest | | wm-buster-DEPRECATED | docker-registry.tools.wmflabs.org/wikimedia-buster:latest | | wm-stretch-DEPRECATED | docker-registry.tools.wmflabs.org/wikimedia-stretch:latest | +--------------------------+------------------------------------------------------------------------+ .fi .TP .B run NAME --command COMMAND --image IMAGE --no-filelog --mem MEM --cpu CPU [--schedule SCHEDULE | --continuous | --wait] Run a new job of your own in Toolforge. Action specific parameters: .nf NAME New job name, unique identifier. Example: "myjob" --command COMMAND The command to run in the job. Example: "./mycommand.sh argone --argtwo" --image IMAGE Container image shortname. Check them with the \fBimages\fP action. Example: "tf-bullseye-std". --no-filelog Disable log storage in files in the tool home directory. +-o, --filelog-stdout Specify the path to store output logs. +-e, --filelog-stderr Specify the path to store error logs. --mem MEM Request additional memory resource limits for the job. --cpu CPU Request additional CPU resource limits for the job. --emails OPT Specify if you want to receive emails about events for this job. Choices are 'none', 'all', 'onfailure', 'onfinish'. The default is 'none'. --schedule SCHEDULE If the job is a schedule, cron time specification. Example: "1 * * * *". --continuous Run a continuous job. --wait Run a normal job and wait for completion. 
.fi Some complete examples: .nf Running a normal job: $ toolforge-jobs run myjob --command ./mycommand.sh --image tf-bullseye-std Running a normal job and waiting for it to complete: $ toolforge-jobs run myotherjob --command ./myothercommand.sh --image tf-bullseye-std --wait Running a continuous job: $ toolforge-jobs run myalwaysrunningjob --command ./myendlesscommand.sh --image tf-bullseye-std --continuous Running a scheduled job: $ toolforge-jobs run mycronjob --command ./everyminute.sh --image tf-bullseye-std --schedule "1 * * * *" Running a normal job without logs being stored: $ toolforge-jobs run myjob --command ./mycommand.sh --image tf-bullseye-std --no-filelog --emails none +Running a normal job with custom log files: +$ toolforge-jobs run myjob --command ./mycommand.sh --image tf-bullseye-std --emails none -o ./log-file.out -e ./log-file.err + +Running a normal job with both output and error logs stored in same file: +$ toolforge-jobs run myjob --command ./mycommand.sh --image tf-bullseye-std --emails none --filelog-stdout ./log-file.txt --filelog-stderr ./log-file.txt + Running a job with command arguments: $ toolforge-jobs run myjob --command "./mycommand.sh --witharguments" --image tf-bullseye-std --emails all Running a job requesting additional CPU and memory: $ toolforge-jobs run myjob --command "./heavycommand.sh" --image tf-bullseye-std --mem 1Gi --cpu 10 .fi .TP .B show NAME Show details of a job of your own in Toolforge. 
Example: .nf $ toolforge-jobs show myscheduledjob +------------+-----------------------------------------------------------------+ | Job name: | myscheduledjob | +------------+-----------------------------------------------------------------+ | Command: | ./read-dumps.sh myargument | +------------+-----------------------------------------------------------------+ | Job type: | schedule: * * * * * | +------------+-----------------------------------------------------------------+ | Container: | tf-bullseye-std | +------------+-----------------------------------------------------------------+ | File log: | yes | +------------+-----------------------------------------------------------------+ +| Output log:| /data/project/my-user/myscheduledjob.out | ++------------+-----------------------------------------------------------------+ +| Error log: | /data/project/my-user/myscheduledjob.err | ++------------+-----------------------------------------------------------------+ | Emails: | none | +------------+-----------------------------------------------------------------+ | Resources: | mem: 10Mi, cpu: 100 | +------------+-----------------------------------------------------------------+ | Status: | Last schedule time: 2021-06-30T10:26:00Z | +------------+-----------------------------------------------------------------+ | Hints: | Last run at 2021-06-30T10:26:08Z. Pod in 'Pending' phase. State | | | 'waiting' for reason 'ContainerCreating'. | +------------+-----------------------------------------------------------------+ .fi .TP .B list [-l|--long] List all running jobs of your own in Toolforge. The \fB-l\fP (or \fB--long\fP) parameter indicates if additional fields should be displayed. 
Example, short listing: .nf $ toolforge-jobs list Job name: Job type: Status: -------------- ------------------- --------------------------- myscheduledjob schedule: * * * * * Last schedule time: 2021-06-30T10:26:00Z alwaysrunning continuous Running myjob normal Completed .fi Example, long listing: .nf $ toolforge-jobs list -l -Job name: Command: Job type: Container: File log: Emails: Resources: Status: --------------- ----------------------- ------------------- --------------- --------- ------- ---------- --------------------------- -myscheduledjob ./read-dumps.sh schedule: * * * * * tf-bullseye-std yes none default Last schedule time: 2021-06-30T10:26:00Z -alwaysrunning ./myendlesscommand.sh continuous tf-bullseye-std no all default Running -myjob ./mycommand.sh --debug normal tf-bullseye-std yes onfinish default Completed +Job name: Command: Job type: Container: File log: Output log: Error log: Emails: Resources: Status: +-------------- ----------------------- ------------------- --------------- --------- ---------------------------------------- ---------------------------------------- ------- ---------- ---------------------------------------- +myscheduledjob ./read-dumps.sh schedule: * * * * * tf-bullseye-std yes /data/project/my-user/myscheduledjob.out /data/project/my-user/myscheduledjob.err none default Last schedule time: 2021-06-30T10:26:00Z +alwaysrunning ./myendlesscommand.sh continuous tf-bullseye-std no /dev/null /dev/null all default Running +myjob ./mycommand.sh --debug normal tf-bullseye-std yes /data/project/my-user/custom.out /data/project/my-user/custom.err onfinish default Completed .fi .TP .B delete NAME Delete a running job of your own in Toolforge. .TP .B flush Delete all running jobs of your own in Toolforge. .TP .B load FILE Flush all jobs (similar to \fBflush\fP action) and read a YAML file with job specifications to be loaded and run all at once. Loading new jobs will stop if failures are found. 
The file format mirrors arguments to the \fBrun\fP action. Example YAML file: .nf --- # a cronjob - name: everyminute command: ./myothercommand.py -v image: tf-bullseye-std no-filelog: true schedule: "* * * * *" emails: onfailure # a continuous job - image: tf-bullseye-std name: endlessjob command: ./dumps-daemon.py --endless + filelog-stdout: /data/project/user/custom.out + filelog-stderr: /data/project/user/custom.err continuous: true emails: all # wait for this normal job before loading the next - name: myjob image: tf-bullseye-std command: ./mycommand.sh --argument1 + filelog-stdout: /data/project/user/custom.log + filelog-stderr: /data/project/user/custom.log wait: true emails: onfinish # another normal job after the previous one finished running - name: anotherjob image: tf-bullseye-std command: ./mycommand.sh --argument1 emails: none .fi Alternatively, the \fB--job NAME\fP parameter can be used to load (and delete the old one, if it exists) a single job only. .TP .B restart NAME Restarts a currently running job. Only continuous and cron jobs are supported. .SH OPTIONS Normal users won't need any of these options, which are mostly for Toolforge administrators, and only documented here for completeness. .TP .B \-h, \-\-help Show summary of options. .TP .B \-\-debug Activate debug mode. .TP .B \-\-cfg PATH Specify path to a YAML configuration file for the Toolforge Jobs Framework command line interface. If not specified, the default is \fB/etc/toolforge-jobs-framework-cli.cfg\fP. This configuration allows you to modify the framework environment and some behavior aspects. 
Example YAML configuration file: .nf --- api_url: https://jobs.svc.tools.eqiad1.wikimedia.cloud:30001/api/v1 kubeconfig: ~/.kube/config customhdr: { 'hdr': 'true' } customaddr: 127.0.0.1 customfqdn: jobs.svc.toolsbeta.eqiad1.wikimedia.cloud .fi .SH SEE ALSO .nf * https://wikitech.wikimedia.org/wiki/Portal:Toolforge * https://wikitech.wikimedia.org/wiki/Help:Toolforge/Jobs_framework * https://jobs.toolforge.org/ .fi .SH AUTHOR \fBWikimedia Toolforge\fP is a service provided by the \fBWikimedia Foundation Cloud Services\fP team. The \fBToolforge Job Framework\fP was initially designed and written by \fBArturo Borrero Gonzalez\fP.