Source code for jenkinsflow.jenkins_api

# Copyright (c) 2012 - 2015 Lars Hupfeldt Nielsen, Hupfeldt IT
# All rights reserved. This work is under a BSD license, see LICENSE.TXT.

import os
import os.path
import time
import urllib.parse
# import json

from .api_base import BuildResult, Progress, ClientError, UnknownJobException, BaseApiMixin, ApiInvocationMixin
from .speed import Speed
from .rest_api_wrapper import ResourceNotFound, RequestsRestApi


_SUPERSEDED_PSEUDO_BUILD_NUM = -1
_DEQUEUED_PSEUDO_BUILD_NUM = -2

_CT_URL_ENC = {'Content-Type': 'application/x-www-form-urlencoded'}

# Query info and parameter definitions of a specific job
_GIVEN_JOB_QUERY_WITH_PARAM_DEFS = "lastBuild[number,result,building],queueItem[why],actions[parameterDefinitions[name,type]],property[parameterDefinitions[name,type]]"

# Initial query
# Build a three level query to handle github organization folder jobs.
# Note that 'name' in response does not contain first 'job/' from url, but does contain intermediate 'job/', so:
# 'name': 'github-org-jenkinsflow-test/job/jenkinsflow-gh-folder-test/job/main'
# Corresponds:
# 'url': 'http://<host>:<port>/job/github-org-jenkinsflow-test/job/jenkinsflow-gh-folder-test/job/main'
_ONE_LEVEL_JOBS_INFO_QUERY = f"jobs[name,{_GIVEN_JOB_QUERY_WITH_PARAM_DEFS}_RECURSE_]"
_TWO_LEVELS_JOBS_INFO_QUERY = _ONE_LEVEL_JOBS_INFO_QUERY.replace("_RECURSE_", "," + _ONE_LEVEL_JOBS_INFO_QUERY)
_THREE_LEVELS_JOBS_INFO_QUERY = _TWO_LEVELS_JOBS_INFO_QUERY.replace("_RECURSE_", "," + _ONE_LEVEL_JOBS_INFO_QUERY)
_FULL_QUERY = f"{_THREE_LEVELS_JOBS_INFO_QUERY.replace('_RECURSE_', '')},primaryView[url]"


# Query status only of a specific job
_GIVEN_JOB_QUICK_QUERY = "lastBuild[number,result,building],queueItem[why]"

# Quick query
# Build a three level query to handle github organization folder job status.
_ONE_LEVEL_JOBS_STATUS_QUERY = f"jobs[name,{_GIVEN_JOB_QUICK_QUERY}_RECURSE_]"
_TWO_LEVELS_JOBS_STATUS_QUERY = _ONE_LEVEL_JOBS_STATUS_QUERY.replace("_RECURSE_", "," + _ONE_LEVEL_JOBS_STATUS_QUERY)
_THREE_LEVELS_JOBS_STATUS_QUERY = _TWO_LEVELS_JOBS_STATUS_QUERY.replace("_RECURSE_", "," + _ONE_LEVEL_JOBS_STATUS_QUERY)
_QUICK_QUERY = _THREE_LEVELS_JOBS_STATUS_QUERY.replace('_RECURSE_', '')


def _result_and_progress(build_dct):
    building = build_dct.get("building")
    result = build_dct["result"]
    progress = Progress.RUNNING if (building or result is None) else Progress.IDLE
    result = BuildResult.UNKNOWN if result is None else BuildResult[result]
    return (result, progress)


[docs] class JenkinsApi(Speed, BaseApiMixin): """Optimized minimal set of methods needed for jenkinsflow to access Jenkins jobs. Args: direct_uri (str): Should be a non-proxied uri if possible (e.g. http://localhost:<port> if flow job is running on 'built-in' node) The public URI will be retrieved from Jenkins and used in output. job_prefix_filter (str): Jobs with names that don't start with this string, will be skpped when polling Jenkins. username (str): Name of user authorized to execute all jobs in flow. password (str): Password of user. invocation_class (class): Defaults to `Invocation`. You can subclass that to provide your own class. csrf (bool): Will attempt to get (and use) a CSRF protection crumb from Jenkins. A 404 - ResourceNotFound error is silently ignored as this indicates that csrf protection is not enabled on Jenkins. """ def __init__(self, direct_uri, job_prefix_filter=None, username=None, password=None, invocation_class=None, rest_access_provider=RequestsRestApi, csrf=True): if username or password: if not (username and password): raise ValueError("You must specify both username and password or neither") self.rest_api = rest_access_provider(direct_uri, username, password) self.direct_uri = direct_uri self.username = username self.password = password self.invocation_class = invocation_class or Invocation self.job_prefix_filter = job_prefix_filter self.jenkins_prefix = urllib.parse.urlsplit(direct_uri).path # When jenkins is started with a 'prefix' value self._public_uri = None self.jobs = None self.queue_items = {} self.is_jenkins = False self.csrf = csrf self._crumb = None def _get_fresh_crumb(self): """Get the CSRF crumb to be put on subsequent requests""" try: crumb = self.rest_api.get_content('/crumbIssuer/api/xml', xpath='concat(//crumbRequestField,":",//crumb)').split(b':') self._crumb = {crumb[0]: crumb[1]} except ResourceNotFound: self.csrf = False def get_json(self, url="", **params): # pylint: disable=inconsistent-return-statements # Sometimes (but rarely) Jenkins will Abort the connection when jobs are being aborted! for ii in (1, 2, 3): try: return self.rest_api.get_json(url, **params) except ConnectionError as ex: if ii == 3: raise print("WARNING: Retrying failed 'poll':", ex) time.sleep(0.1) def post(self, url, payload=None, headers=None, **params): # pylint: disable=inconsistent-return-statements for crumb_attempt in (0, 1): if self._crumb: if headers: headers = headers.copy() headers.update(self._crumb) else: headers = self._crumb try: return self.rest_api.post(url, payload, headers, **params) except ClientError as ex: if crumb_attempt: raise if self.csrf: if self._crumb: print("INFO: getting new crumb:", ex) self._get_fresh_crumb() def headers(self): return self.rest_api.headers() @property def public_uri(self): if not self._public_uri: query = "primaryView[url]" dct = self.get_json(tree=query) self._public_uri = dct['primaryView']['url'].rstrip('/') return self._public_uri def _public_job_url(self, job_name): return self.public_uri + '/job/' + job_name def _resolve_job_levels(self, dct, parent_job_name, quick: bool): """Resolve jobs for a poll. E.g. gh folder jobs have three levels <org>/<repo>/<branch>.""" has_children = False for job_dct in dct.get('jobs') or []: has_children = True job_name = str(job_dct['name']) if self.job_prefix_filter and not job_name.startswith(self.job_prefix_filter): continue if parent_job_name: job_name = f"{parent_job_name}/job/{job_name}" has_grand_children = self._resolve_job_levels(job_dct, job_name, quick) if quick: job = self.jobs.get(job_name) if job: job.dct = job_dct continue # A new job was created while flow was running, get the remaining properties try: job_dct = self.get_json("/job/" + job_name, tree=_GIVEN_JOB_QUERY_WITH_PARAM_DEFS) # print("new job:", job_dct) job = ApiJob(self, job_dct, job_name, has_children=has_grand_children) self.jobs[job_name] = job except ResourceNotFound: # pragma: no cover # Ignore this, the job came and went continue continue self.jobs[job_name] = ApiJob(self, job_dct, job_name, has_grand_children) #if job_dct["_class"] not in ("hudson.model.FreeStyleProject", "org.jenkinsci.plugins.workflow.job.WorkflowJob"): # print(json.dumps(self.get_json(depth=3), indent=2)) return has_children def poll(self): if not self.is_jenkins: # Determine whether we are talking to Jenkins head_response = self.headers() if head_response.get("X-Jenkins"): self.is_jenkins = True else: raise Exception("Not connected to Jenkins. Expected X-Jenkins header, got: " + repr(head_response)) dct = self.get_json(tree=_FULL_QUERY) self._public_uri = dct['primaryView']['url'].rstrip('/') self.jobs = {} self._resolve_job_levels(dct, None, quick=False) def quick_poll(self): dct = self.get_json(tree=_QUICK_QUERY) # print("dct:", json.dumps(dct, indent=2)) self._resolve_job_levels(dct, None, quick=True) def queue_poll(self): query = "items[task[name],id]" dct = self.get_json("/queue", tree=query) queue_items = {} for qi_dct in dct.get('items') or []: job_name = str(qi_dct['task']['name']) if self.job_prefix_filter and not job_name.startswith(self.job_prefix_filter): continue # print("queue_item name:", job_name) queue_items.setdefault(job_name, []).append(qi_dct['id']) self.queue_items = queue_items def get_job(self, name): try: return self.jobs[name] except KeyError as ex: # print("self.jobs:", self.jobs.keys()) raise UnknownJobException(self._public_job_url(name)) from ex def create_job(self, job_name, config_xml): self.post('/createItem', name=job_name, headers={'Content-Type': 'application/xml header;charset=utf-8'}, payload=config_xml.encode('utf-8')) def delete_job(self, job_name): try: self.post('/job/' + job_name + '/doDelete') except ResourceNotFound as ex: # TODO: Check error raise UnknownJobException(self._public_job_url(job_name), ex) from ex def _set_description(self, description, build_url, replace=False, separator: str = '\n'): if not replace: dct = self.get_json(build_url, tree="description") existing_description = dct['description'] if existing_description: description = existing_description + separator + description self.post(build_url + '/submitDescription', headers=_CT_URL_ENC, payload={'description': description})
[docs] def set_build_description( self, description: str, replace: bool = False, separator: str = '\n', build_url: str = None, job_name: str = None, build_number: int = None): """Utility to set/append build description. Args: description: The description to set on the build replace: If True, replace existing description, if any, instead of appending to it separator: A separator to insert between any existing description and the new :py:obj:`description` if :py:obj:`replace` is False. build_url: The URL of the Jenkins job build. job_name: Name of the Jenkins job build_number: The build number for which to set the description """ self.poll() build_url = self.get_build_url(build_url, job_name, build_number) try: self._set_description(description, build_url, replace, separator) except ResourceNotFound as ex: raise ValueError(f"Build not found {repr(build_url)}", ex) from ex
class ApiJob(): def __init__(self, jenkins, dct, name, has_children): self.jenkins = jenkins self.dct = dct.copy() self.name = name self.has_children = has_children self.public_uri = self.jenkins._public_job_url(self.name) # pylint: disable=protected-access self.old_build_number = None self._invocations = {} self.queued_why = None self._path = "/job/" + self.name for prop in self.dct.get("property", []): if prop and prop.get('parameterDefinitions'): self._build_trigger_path = self._path + "/buildWithParameters" return for action in self.dct.get("action", []): if action and action.get('parameterDefinitions'): self._build_trigger_path = self._path + "/buildWithParameters" return self._build_trigger_path = self._path + "/build" def invoke(self, securitytoken, build_params, cause, description): try: if cause: build_params = build_params or {} build_params['cause'] = cause headers = _CT_URL_ENC if build_params else None params = {} if securitytoken: params['token'] = securitytoken response = self.jenkins.post(self._build_trigger_path, headers=headers, payload=build_params, **params) except ResourceNotFound as ex: raise UnknownJobException(self.jenkins._public_job_url(self.name), ex) from ex # pylint: disable=protected-access # Make location relative location = urllib.parse.urlsplit(response.headers['location']).path if self.jenkins.jenkins_prefix: location = os.path.relpath(location, self.jenkins.jenkins_prefix) old_inv = self._invocations.get(location) if old_inv: old_inv.build_number = _SUPERSEDED_PSEUDO_BUILD_NUM inv = self.jenkins.invocation_class(self, location, description) self._invocations[location] = inv # print("invoke:", location, inv) return inv def poll(self): for invocation in self._invocations.values(): # print("invocation:", invocation) if not invocation.build_number: query = "executable[number],why,*" dct = self.jenkins.get_json(invocation.queued_item_path, tree=query) # print("dct:", dct) executable = dct.get('executable') if executable: invocation.build_number = executable['number'] invocation.queued_why = None invocation.set_description() else: try: invocation.queued_why = dct['why'] # If we still have invocations in the queue, wait until next poll to query again break except KeyError: # 'scans' have no 'why' pass def job_status(self): """Result, progress and latest buildnumber info for the JOB, NOT the invocation Return (result, progress_info, latest_build_number) (str, str, int or None): If there is no finished build, result will be BuildResult.UNKNOWN and latest_build_number will be None """ progress = None if not self.has_children: try: qi = self.dct['queueItem'] if qi: progress = Progress.QUEUED self.queued_why = qi['why'] except KeyError: # 'scans' have no 'queueItem'/'why' and they may not have children if scan has not run/finished pass dct = self.dct.get('lastBuild') if dct: self.old_build_number = dct['number'] if dct["result"] is None: # Latest build dct does not have result, get it from builds list. query = "builds[number,result]" builds = self.jenkins.get_json(self._path, tree=query) for build_dct in builds['builds']: if build_dct['number'] == self.old_build_number: dct = build_dct break result, latest_progress = _result_and_progress(dct) return (result, progress or latest_progress, self.old_build_number) return (BuildResult.UNKNOWN, progress or Progress.IDLE, None) def stop_all(self): # First remove pending builds from queue queue_item_ids = self.jenkins.queue_items.get(self.name) or [] for qid in queue_item_ids: try: self.jenkins.post('/queue/cancelItem', id=repr(qid)) except ResourceNotFound: # Job is no longer queued, so just ignore # NOTE: bug https://issues.jenkins-ci.org/browse/JENKINS-21311 also brings us here! pass # Abort running builds query = "builds[number,result]" dct = self.jenkins.get_json(self._path, tree=query) for build in dct['builds']: _result, progress = _result_and_progress(build) if progress != Progress.IDLE: build_number = build['number'] try: self.jenkins.post(self._path + '/' + repr(build_number) + '/stop') except ResourceNotFound: # pragma: no cover # Build was deleted, just ignore pass def update_config(self, config_xml): self.jenkins.post(self._path + "/config.xml", headers={'Content-Type': 'application/xml header;charset=utf-8'}, payload=config_xml.encode('utf-8')) def disable(self): try: self.jenkins.post(self._path + '/disable') except ResourceNotFound as ex: raise UnknownJobException(self.jenkins._public_job_url(self.name), ex) from ex # pylint: disable=protected-access def enable(self): try: self.jenkins.post(self._path + '/enable') except ResourceNotFound as ex: raise UnknownJobException(self.jenkins._public_job_url(self.name), ex) from ex # pylint: disable=protected-access def __repr__(self): return str(dict(name=self.name, dct=self.dct)) class Invocation(ApiInvocationMixin): def __init__(self, job, queued_item_path, description): self.job = job self.queued_item_path = queued_item_path self.description = description self.build_number = None self.queued_why = None def __repr__(self): return 'Invocation: ' + repr(self.queued_item_path) + ' ' + repr(self.build_number) + ' ' + repr(self.queued_why) def status(self): """Result and Progress info for the invocation Return (result, progress_info) (str, str): If the build has not started or has not finished running, result will be BuildResult.UNKNOWN """ if self.build_number is None: return (BuildResult.UNKNOWN, Progress.QUEUED) if self.build_number == _SUPERSEDED_PSEUDO_BUILD_NUM: return (BuildResult.SUPERSEDED, Progress.IDLE) if self.build_number == _DEQUEUED_PSEUDO_BUILD_NUM: return (BuildResult.DEQUEUED, Progress.IDLE) # It seems that even after the executor has been assigned a number in the queue item, the lastBuild might not yet exist dct = self.job.dct.get('lastBuild') last_number = dct['number'] if dct else None if last_number is None: return (BuildResult.UNKNOWN, Progress.QUEUED) if last_number == self.build_number: return _result_and_progress(dct) if last_number < self.build_number: # TODO: Why does this happen? pass # pragma: no cover # Latest build is not ours, get the correct build query = "builds[number,result]" dct = self.job.jenkins.get_json(self.job._path, tree=query) for build in dct['builds']: if build['number'] == self.build_number: return _result_and_progress(build) raise Exception("Build deleted while flow running? This may happen if you invoke more builds than the job is configured to keep. " + repr(self)) def set_description(self): """Sets the build description""" if not self.description: return build_url = self.job._path + '/' + repr(self.build_number) try: self.job.jenkins._set_description(self.description, build_url) except ResourceNotFound as ex: raise Exception("Build deleted while flow running? " + repr(build_url), ex) from ex def stop(self, dequeue): try: if self.build_number is not None and self.build_number >= 0 and not dequeue: # Job has started self.job.jenkins.post(self.job._path + '/' + repr(self.build_number) + '/stop') return if self.build_number is None and dequeue: # Job is queued qid = self.queued_item_path.strip('/').split('/')[2] self.job.jenkins.post('/queue/cancelItem', id=qid) self.build_number = _DEQUEUED_PSEUDO_BUILD_NUM except ResourceNotFound: # pragma: no cover # Job is no longer queued or running, except that it may have just changed from queued to running # We leave it up to the flow logic to handle that # NOTE: bug https://issues.jenkins-ci.org/browse/JENKINS-21311 also brings us here! pass