# Copyright (c) 2012 - 2015 Lars Hupfeldt Nielsen, Hupfeldt IT
# All rights reserved. This work is under a BSD license, see LICENSE.TXT.
import os, time
import os.path
from collections import OrderedDict
import urllib.parse
from .api_base import BuildResult, Progress, AuthError, ClientError, UnknownJobException, BaseApiMixin, ApiInvocationMixin
from .speed import Speed
from .rest_api_wrapper import ResourceNotFound, RequestsRestApi
_superseded = -1
_dequeued = -2
_ct_url_enc = {'Content-Type': 'application/x-www-form-urlencoded'}
def _result_and_progress(build_dct):
result = build_dct['result']
progress = Progress.RUNNING if result is None else Progress.IDLE
result = BuildResult.UNKNOWN if result is None else BuildResult[result]
return (result, progress)
[docs]class Jenkins(Speed, BaseApiMixin):
"""Optimized minimal set of methods needed for jenkinsflow to access Jenkins jobs.
Args:
direct_uri (str): Should be a non-proxied uri if possible (e.g. http://localhost:<port> if flow job is running on master)
The public URI will be retrieved from Jenkins and used in output.
job_prefix_filter (str): Jobs with names that don't start with this string, will be skpped when polling Jenkins.
If you are using Hudson and have many jobs, it might be a good idea to enable Team support and create a job-runner user,
which only has access to the jobs in the flow that it is executing. That way the job list will be filtered serverside.
username (str): Name of user authorized to execute all jobs in flow.
password (str): Password of user.
invocation_class (class): Defaults to `Invocation`. You can subclass that to provide your own class.
csrf (bool): Will attempt to get (and use) a CSRF protection crumb from Jenkins. A 404 - ResourceNotFound error is silently
ignored as this indicates that csrf protection is not enabled on Jenkins.
"""
def __init__(self, direct_uri, job_prefix_filter=None, username=None, password=None, invocation_class=None, rest_access_provider=RequestsRestApi, csrf=True):
if username or password:
if not (username and password):
raise Exception("You must specify both username and password or neither")
self.rest_api = rest_access_provider(direct_uri, username, password)
self.direct_uri = direct_uri
self.username = username
self.password = password
self.invocation_class = invocation_class or Invocation
self.job_prefix_filter = job_prefix_filter
self.jenkins_prefix = urllib.parse.urlsplit(direct_uri).path # When jenkins is started with a 'prefix' value
self._public_uri = None
self.jobs = None
self.queue_items = {}
self.is_jenkins = None
self.csrf = csrf
self._crumb = None
def _get_fresh_crumb(self):
"""Get the CSRF crumb to be put on subsequent requests"""
try:
crumb = self.rest_api.get_content('/crumbIssuer/api/xml', xpath='concat(//crumbRequestField,":",//crumb)').split(b':')
self._crumb = {crumb[0]: crumb[1]}
return self._crumb
except ResourceNotFound:
self.csrf = False
def get_json(self, url="", **params):
# Sometimes (but rarely) Jenkins will Abort the connection when jobs are being aborted!
for ii in (1, 2, 3):
try:
return self.rest_api.get_json(url, **params)
except ConnectionError as ex:
if ii == 3:
raise
print("WARNING: Retrying failed 'poll':", ex)
time.sleep(0.1)
def post(self, url, payload=None, headers=None, **params):
for crumb_attempt in (0, 1):
if self._crumb:
if headers:
headers = headers.copy()
headers.update(self._crumb)
else:
headers = self._crumb
try:
return self.rest_api.post(url, payload, headers, **params)
except ClientError as ex:
if crumb_attempt:
raise
if self.csrf:
if self._crumb:
print("INFO: getting new crumb:", ex)
self._get_fresh_crumb()
def headers(self):
return self.rest_api.headers()
@property
def public_uri(self):
if not self._public_uri:
query = "primaryView[url]"
dct = self.get_json(tree=query)
self._public_uri = dct['primaryView']['url'].rstrip('/')
return self._public_uri
def _public_job_url(self, job_name):
return self.public_uri + '/job/' + job_name
def poll(self):
# Determine whether we are talking to Jenkins or Hudson
if self.is_jenkins is None:
# TODO: A lot of Nonsense here because Hudson does not respond reliably
for _ in (1, 2, 3):
try:
head_response = self.headers()
if head_response.get("X-Jenkins"):
self.is_jenkins = True
break
if head_response.get("X-Hudson"):
self.is_jenkins = False
break
except AuthError:
raise
except Exception as ex:
head_response = "HEAD request failed: " + str(ex)
time.sleep(0.1)
else:
raise Exception("Not connected to Jenkins or Hudson (expected X-Jenkins or X-Hudson header, got: " + repr(head_response))
query = "jobs[name,lastBuild[number,result],queueItem[why],actions[parameterDefinitions[name,type]]],primaryView[url]"
dct = self.get_json(tree=query)
self._public_uri = dct['primaryView']['url'].rstrip('/')
self.jobs = {}
for job_dct in dct.get('jobs') or []:
job_name = str(job_dct['name'])
if self.job_prefix_filter and not job_name.startswith(self.job_prefix_filter):
continue
self.jobs[job_name] = ApiJob(self, job_dct, job_name)
def quick_poll(self):
query = "jobs[name,lastBuild[number,result],queueItem[why]]"
dct = self.get_json(tree=query)
for job_dct in dct.get('jobs') or []:
job_name = str(job_dct['name'])
if self.job_prefix_filter and not job_name.startswith(self.job_prefix_filter):
continue
job = self.jobs.get(job_name)
if job:
job.dct = job_dct
continue
# A new job was created while flow was running, get the remaining properties
try:
query = "lastBuild[number,result],queueItem[why],actions[parameterDefinitions[name,type]]"
job_dct = self.get_json("/job/" + job_name, tree=query)
job = ApiJob(self, job_dct, job_name)
self.jobs[job_name] = job
except ResourceNotFound: # pragma: no cover
# Ignore this, the job came and went
pass
def queue_poll(self):
query = "items[task[name],id]"
dct = self.get_json("/queue", tree=query)
queue_items = {}
for qi_dct in dct.get('items') or []:
job_name = str(qi_dct['task']['name'])
if self.job_prefix_filter and not job_name.startswith(self.job_prefix_filter):
continue
queue_items.setdefault(job_name, []).append(qi_dct['id'])
self.queue_items = queue_items
def get_job(self, name):
try:
return self.jobs[name]
except KeyError:
raise UnknownJobException(self._public_job_url(name))
def create_job(self, job_name, config_xml):
self.post('/createItem', name=job_name,
headers={'Content-Type': 'application/xml header;charset=utf-8'},
payload=config_xml.encode('utf-8'))
def delete_job(self, job_name):
try:
self.post('/job/' + job_name + '/doDelete')
except ResourceNotFound as ex:
# TODO: Check error
raise UnknownJobException(self._public_job_url(job_name), ex)
def _set_description(self, description, build_url, replace=False, separator: str = '\n'):
if not replace:
dct = self.get_json(build_url, tree="description")
existing_description = dct['description']
if existing_description:
description = existing_description + separator + description
self.post(build_url + '/submitDescription', headers=_ct_url_enc, payload={'description': description})
[docs] def set_build_description(
self, description: str, replace: bool = False, separator: str = '\n',
build_url: str = None, job_name: str = None, build_number: int = None):
"""Utility to set/append build description.
Args:
description: The description to set on the build
replace: If True, replace existing description, if any, instead of appending to it
separator: A separator to insert between any existing description and the new :py:obj:`description` if :py:obj:`replace` is False.
build_url: The URL of the Jenkins job build.
job_name: Name of the Jenkins job
build_number: The build number for which to set the description
"""
self.poll()
build_url = self.get_build_url(build_url, job_name, build_number)
try:
self._set_description(description, build_url, replace, separator)
except ResourceNotFound as ex:
raise Exception("Build not found " + repr(build_url), ex)
class ApiJob():
def __init__(self, jenkins, dct, name):
self.jenkins = jenkins
self.dct = dct.copy()
self.name = name
self.public_uri = self.jenkins._public_job_url(self.name) # pylint: disable=protected-access
actions = self.dct.get('actions') or []
self._path = "/job/" + self.name
for action in actions:
if action is None:
continue
if action.get('parameterDefinitions'):
self._build_trigger_path = self._path + "/buildWithParameters"
break
else:
self._build_trigger_path = self._path + "/build"
self.old_build_number = None
self._invocations = OrderedDict()
self.queued_why = None
def invoke(self, securitytoken, build_params, cause, description):
try:
if cause:
build_params = build_params or {}
build_params['cause'] = cause
headers = _ct_url_enc if build_params else None
params = {}
if securitytoken:
params['token'] = securitytoken
response = self.jenkins.post(self._build_trigger_path, headers=headers, payload=build_params, **params)
except ResourceNotFound as ex:
raise UnknownJobException(self.jenkins._public_job_url(self.name), ex) # pylint: disable=protected-access
# Make location relative
location = urllib.parse.urlsplit(response.headers['location']).path
if self.jenkins.jenkins_prefix:
location = os.path.relpath(location, self.jenkins.jenkins_prefix)
old_inv = self._invocations.get(location)
if old_inv:
old_inv.build_number = _superseded
inv = self.jenkins.invocation_class(self, location, description)
self._invocations[location] = inv
return inv
def poll(self):
for invocation in self._invocations.values():
if not invocation.build_number:
# Hudson does not return queue item from invoke, instead it returns the job URL :(
query = "executable[number],why" if self.jenkins.is_jenkins else "queueItem[why],lastBuild[number]"
dct = self.jenkins.get_json(invocation.queued_item_path, tree=query)
if self.jenkins.is_jenkins:
executable = dct.get('executable')
if executable:
invocation.build_number = executable['number']
invocation.queued_why = None
invocation.set_description()
else:
invocation.queued_why = dct['why']
# If we still have invocations in the queue, wait until next poll to query again
break
else: # Hudson
# Note, this is not guaranteed to be correct in case of simultaneously running flows!
# Should handle multiple invocations in same flow
qi = dct.get('queueItem')
if qi:
invocation.queued_why = qi['why']
last_build = dct.get('lastBuild')
if last_build:
last_build_number = last_build['number']
if last_build_number > self.old_build_number:
invocation.build_number = last_build['number']
self.old_build_number = invocation.build_number
invocation.set_description()
else:
break
def job_status(self):
"""Result, progress and latest buildnumber info for the JOB, NOT the invocation
Return (result, progress_info, latest_build_number) (str, str, int or None):
If there is no finished build, result will be BuildResult.UNKNOWN and latest_build_number will be None
"""
progress = None
qi = self.dct['queueItem']
if qi:
progress = Progress.QUEUED
self.queued_why = qi['why']
dct = self.dct.get('lastBuild')
if dct:
self.old_build_number = dct['number']
result, latest_progress = _result_and_progress(dct)
return (result, progress or latest_progress, self.old_build_number)
return (BuildResult.UNKNOWN, progress or Progress.IDLE, None)
def stop_all(self):
# First remove pending builds from queue
queue_item_ids = self.jenkins.queue_items.get(self.name) or []
for qid in queue_item_ids:
try:
self.jenkins.post('/queue/cancelItem', id=repr(qid))
except ResourceNotFound:
# Job is no longer queued, so just ignore
# NOTE: bug https://issues.jenkins-ci.org/browse/JENKINS-21311 also brings us here!
pass
# Abort running builds
query = "builds[number,result]"
dct = self.jenkins.get_json("/job/" + self.name, tree=query)
for build in dct['builds']:
_result, progress = _result_and_progress(build)
if progress != Progress.IDLE:
build_number = build['number']
try:
self.jenkins.post(self._path + '/' + repr(build_number) + '/stop')
except ResourceNotFound: # pragma: no cover
# Build was deleted, just ignore
pass
def update_config(self, config_xml):
self.jenkins.post("/job/" + self.name + "/config.xml",
headers={'Content-Type': 'application/xml header;charset=utf-8'},
payload=config_xml.encode('utf-8'))
def disable(self):
try:
self.jenkins.post(self._path + '/disable')
except ResourceNotFound as ex:
raise UnknownJobException(self.jenkins._public_job_url(self.name), ex) # pylint: disable=protected-access
def enable(self):
try:
self.jenkins.post(self._path + '/enable')
except ResourceNotFound as ex:
raise UnknownJobException(self.jenkins._public_job_url(self.name), ex) # pylint: disable=protected-access
def __repr__(self):
return str(dict(name=self.name, dct=self.dct))
class Invocation(ApiInvocationMixin):
def __init__(self, job, queued_item_path, description):
self.job = job
self.queued_item_path = queued_item_path
self.description = description
self.build_number = None
self.queued_why = None
def __repr__(self):
return 'Invocation: ' + repr(self.queued_item_path) + ' ' + repr(self.build_number) + ' ' + repr(self.queued_why)
def status(self):
"""Result and Progress info for the invocation
Return (result, progress_info) (str, str):
If the build has not started or has not finished running, result will be BuildResult.UNKNOWN
"""
if self.build_number is None:
return (BuildResult.UNKNOWN, Progress.QUEUED)
if self.build_number == _superseded:
return (BuildResult.SUPERSEDED, Progress.IDLE)
if self.build_number == _dequeued:
return (BuildResult.DEQUEUED, Progress.IDLE)
# It seems that even after the executor has been assigned a number in the queue item, the lastBuild might not yet exist
dct = self.job.dct.get('lastBuild')
last_number = dct['number'] if dct else None
if last_number is None:
return (BuildResult.UNKNOWN, Progress.QUEUED)
if last_number == self.build_number:
return _result_and_progress(dct)
if last_number < self.build_number:
# TODO: Why does this happen?
pass # pragma: no cover
# Latest build is not ours, get the correct build
query = "builds[number,result]"
dct = self.job.jenkins.get_json("/job/" + self.job.name, tree=query)
for build in dct['builds']:
if build['number'] == self.build_number:
return _result_and_progress(build)
raise Exception("Build deleted while flow running? This may happen if you invoke more builds than the job is configured to keep. " + repr(self))
def set_description(self):
"""Sets the build description"""
if not self.description:
return
build_url = self.job._path + '/' + repr(self.build_number)
try:
self.job.jenkins._set_description(self.description, build_url)
except ResourceNotFound as ex:
raise Exception("Build deleted while flow running? " + repr(build_url), ex)
def stop(self, dequeue):
try:
if self.build_number is not None and self.build_number >= 0 and not dequeue:
# Job has started
self.job.jenkins.post(self.job._path + '/' + repr(self.build_number) + '/stop')
return
if self.build_number is None and dequeue:
# Job is queued
qid = self.queued_item_path.strip('/').split('/')[2]
self.job.jenkins.post('/queue/cancelItem', id=qid)
self.build_number = _dequeued
except ResourceNotFound as ex: # pragma: no cover
# Job is no longer queued or running, except that it may have just changed from queued to running
# We leave it up to the flow logic to handle that
# NOTE: bug https://issues.jenkins-ci.org/browse/JENKINS-21311 also brings us here!
pass