Welcome to the pendant Documentation!
❯ pip install pendant
Features
- Submit Batch jobs
pendant package
aws Submodule
pendant.aws module
pendant.aws.cli(command: str) → str
Use the awscli to execute a command.

This function will call the awscli within the same process and not spawn subprocesses. In addition, the STDERR of the called function will be suppressed and the STDOUT will be returned as a string.

Parameters: command – The command to be executed by the awscli.

Examples
>>> # cli('--version')
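For instance, a minimal sketch of a real invocation, assuming the awscli is installed and credentials are configured (the command string omits the leading aws, as in the example above):

>>> from pendant.aws import cli
>>> listing = cli('s3 ls')  # the bucket listing written to STDOUT, as a string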
pendant.aws.batch module
class pendant.aws.batch.JobDefinition
Bases: object

A Batch job definition.

name
Return the name of the job definition.

parameters
Return the parameters of the job definition.

revision
Return the revision of the job definition.

at_revision(revision: str) → pendant.aws.batch.JobDefinition
Set this job definition to a specific revision.
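Because at_revision returns a JobDefinition, pinning a revision can be chained onto construction. A small sketch, assuming a concrete subclass such as the DemoJobDefinition shown in the end-to-end examples below:

>>> definition = DemoJobDefinition(input_object=S3Uri('s3://bucket/object')).at_revision('6')
>>> definition.revision
'6'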
class pendant.aws.batch.BatchJob(definition: pendant.aws.batch.JobDefinition)
Bases: object

An AWS Batch job.

A Batch job can be instantiated and then submitted to the Batch service. After submission, the job's status can be queried, the job's logs can be read, and other methods can be called to understand the state of the job.

Parameters: definition – A Batch job definition.

container_overrides
Return container overriding parameters.

job_id
Return the job ID.

queue
Return the job queue.

cancel(reason: str) → Dict
Cancel this job.

Parameters: reason – The reason why the job must be canceled.
Returns: The service response to job cancellation.

terminate(reason: str) → Dict
Terminate this job.

Jobs that are in the STARTING or RUNNING state are terminated, which causes them to transition to FAILED. Jobs that have not progressed to the STARTING state are cancelled.

Parameters: reason – The reason why the job must be terminated.
Returns: The service response to job termination.

submit(queue: str, container_overrides: Optional[Mapping] = None) → pendant.aws.response.SubmitJobResponse
Submit this job to Batch.

Parameters:
- queue – The Batch job queue to use.
- container_overrides – The values to override in the spawned container.
Returns: The service response to job submission.
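Taken together, a compact sketch of the job lifecycle, given a parameterized JobDefinition (the queue name and the override mapping are illustrative, not defaults):

>>> job = BatchJob(definition)
>>> response = job.submit(queue='prod', container_overrides={'command': ['echo', 'hello']})
>>> job.job_id  # populated once submission succeeds
>>> job.terminate(reason='No longer needed')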
pendant.aws.exception module
exception pendant.aws.exception.BatchJobNotFoundError
Bases: Exception

A Batch job not found error.

exception pendant.aws.exception.BatchJobSubmissionError
Bases: Exception

A Batch job submission error.

exception pendant.aws.exception.LogStreamNotFoundError
Bases: Exception

A log stream not found error.

exception pendant.aws.exception.S3ObjectNotFoundError
Bases: FileNotFoundError

A file not found error for objects on S3.
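Each exception corresponds to a failure mode of the calls documented above, so callers can handle them selectively. A minimal sketch, assuming a job like those in the end-to-end examples below:

>>> from pendant.aws.exception import BatchJobSubmissionError
>>> try:
...     response = job.submit(queue='prod')
... except BatchJobSubmissionError:
...     pass  # retry, alert, or re-raise as appropriate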
pendant.aws.logs module

pendant.aws.response module

pendant.aws.s3 module
class pendant.aws.s3.S3Uri(path: Union[str, S3Uri])
Bases: object

An S3 URI which conforms to RFC 3986 formatting.

Parameters: path – The S3 URI path.

Examples
>>> uri = S3Uri('s3://mybucket/prefix')
>>> uri.scheme
's3://'
>>> uri.bucket
'mybucket'
>>> uri / 'myobject'
S3Uri('s3://mybucket/prefix/myobject')

delimiter = '/'

scheme
Return the RFC 3986 scheme of this URI.

Example
>>> uri = S3Uri('s3://mybucket/myobject')
>>> uri.scheme
's3://'

bucket
Return the S3 bucket of this URI.

Example
>>> uri = S3Uri('s3://mybucket/myobject')
>>> uri.bucket
'mybucket'

key
Return the S3 key of this URI.

Example
>>> uri = S3Uri('s3://mybucket/myobject')
>>> uri.key
'myobject'

add_suffix(suffix: str) → pendant.aws.s3.S3Uri
Add a suffix to this S3 URI.

Parameters: suffix – Append this suffix to the URI.

Examples
>>> uri = S3Uri('s3://mybucket/myobject.bam')
>>> uri.add_suffix('.bai')
S3Uri('s3://mybucket/myobject.bam.bai')

This is equivalent to:

>>> S3Uri('s3://mybucket/myobject.bam') + '.bai'
S3Uri('s3://mybucket/myobject.bam.bai')
pendant.aws.s3.s3api_head_object(bucket: str, key: str, profile: str = 'default') → Dict
Use the awscli to make a HEAD request on an S3 object's metadata.

Parameters:
- bucket – The S3 bucket name.
- key – The S3 object key.
- profile – The AWS profile to use, defaults to "default".
Returns: A dictionary of object metadata, if the object exists.
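A usage sketch (the bucket and key are illustrative; this shells out to the awscli, so credentials must be configured):

>>> metadata = s3api_head_object(bucket='mybucket', key='myobject')
>>> metadata['ContentLength']  # a standard head-object response field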
pendant.aws.s3.s3api_object_exists(bucket: str, key: str, profile: str = 'default') → bool
Use the awscli to test if an S3 object exists.

Parameters:
- bucket – The S3 bucket name.
- key – The S3 object key.
- profile – The AWS profile to use, defaults to "default".
Returns: Whether the object exists.

pendant.aws.s3.s3_object_exists(bucket: str, key: str) → bool
Use boto3.S3.Object to test if an S3 object exists.

Parameters:
- bucket – The S3 bucket name.
- key – The S3 object key.
Returns: Whether the object exists.
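The two existence helpers differ only in transport: s3api_object_exists shells out to the awscli and honors a named profile, while s3_object_exists calls boto3 directly. A sketch (bucket and key are illustrative):

>>> s3api_object_exists(bucket='mybucket', key='myobject', profile='default')
True
>>> s3_object_exists(bucket='mybucket', key='myobject')
True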
util Submodule
pendant.util module
class pendant.util.ExitCode
Bases: int

The code returned to a parent process by an executable.

Examples
>>> from subprocess import call
>>> exit_code = ExitCode(call('ls'))
>>> exit_code.is_ok()
True
pendant.util.format_ISO8601(moment: datetime.datetime) → str
Format a datetime into a filename-compatible ISO8601 representation.

Parameters: moment – A datetime.
Returns: The ISO8601 datetime formatted with hyphens as separators.

Examples
>>> from datetime import datetime
>>> format_ISO8601(datetime(2018, 2, 23, 12, 13, 38))
'2018-02-23T12-13-38'
How to Contribute
Pull requests, feature requests, and issues welcome!
The complete test suite is configured through Tox:
❯ cd pendant
❯ pip install tox
❯ tox # Run entire dynamic / static analysis test suite
List all environments with:
❯ tox -av
using tox.ini: .../pendant/tox.ini
using tox-3.1.2 from ../tox/__init__.py
default environments:
py36 -> run the test suite with (basepython)
py36-lint -> check the code style
py36-type -> type check the library
py36-docs -> test building of HTML docs
additional environments:
dev -> the official pendant development environment
To run just one environment:
❯ tox -e py36
To pass in positional arguments to a specified environment:
❯ tox -e py36 -- -x tests/
End-to-end Examples
The principal object for deploying jobs to AWS Batch is the Batch job definition. Every Batch job definition has a name, parameters, and some form of optional parameter validation.
from pendant.aws.batch import JobDefinition
from pendant.aws.s3 import S3Uri
from pendant.aws.exception import S3ObjectNotFoundError


class DemoJobDefinition(JobDefinition):
    """A Batch job definition for demonstrating our API.

    Args:
        input_object: The S3 URI for the input object.

    """

    def __init__(self, input_object: S3Uri):
        self.input_object = input_object

    @property
    def name(self) -> str:
        """Return the job definition name."""
        return 'demo-job'

    def validate(self) -> None:
        """Validate this parameterized job definition."""
        if not self.input_object.object_exists():
            raise S3ObjectNotFoundError(f'S3 object does not exist: {self.input_object}')
We can now wrap the parameterized job definition in a Batch job and set a specific revision.
from pendant.aws.batch import BatchJob

definition = DemoJobDefinition(input_object=S3Uri('s3://bucket/object'))
definition.at_revision('6')

job = BatchJob(definition)
Submitting this Batch job is easy, and introspection can be performed immediately:
response = job.submit(queue='prod')
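For example, the job ID and queue are available on the job object as soon as submission succeeds (a sketch; the printed values are illustrative):

print(job.job_id)  # the Batch job ID assigned by the service
print(job.queue)   # 'prod'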
When the job is in a RUNNING state, we can access the job's CloudWatch logs:
for log_event in job.log_stream_events():
    print(log_event)
"""
LogEvent(timestamp="1543809952329", message="You have started up this demo job", ingestion_time="1543809957080")
LogEvent(timestamp="1543809955437", message="Configuration, we are loading from...", ingestion_time="1543809957080")
LogEvent(timestamp="1543809955437", message="Defaulting to approximate values", ingestion_time="1543809957080")
LogEvent(timestamp="1543809955437", message="Setting up logger, nothing to see here", ingestion_time="1543809957080")
"""
And if we must, we can terminate the job, as long as we provide a reason:
job.terminate(reason='I was just testing!')
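For jobs that have not yet progressed to the STARTING state, the documented cancel method is the alternative:

job.cancel(reason='I was just testing!')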