Batch Workflow starting using the Galaxy API : Practical Example
Posted on 2012-11-02 11:53:22
by Geert Vandeweyer
Galaxy is a very powerful interface for the analysis of Next-Generation-Sequencing data. On the other hand, it has some drawbacks. One of the major ones is the inability (for now) to run workflows on multiple pairs of input data files. This means that to run a workflow to QC, align and call variants for 46 samples analyzed on a paired-end sequencing run, you need to fill in the entire workflow setup page 46 times. Being lazy as I am, I first wrote an interlacer to convert paired-end data to single datasets, which can then be batched through the web interface. But as that uses a lot of disk space and still needs manual interaction, I decided to dive into the galaxy-api after setting up the analysis for a couple of such large-scale experiments.
But first things first. As the documentation on the API is very sparse, I have listed the needed steps here.
1. Enable the API in universe_wsgi.ini
Set the following lines in your galaxy universe file.
# enable the api
enable_api = True
# If you need to send results from the api to other users' histories, set this to your email
api_allow_run_as = your.login@mail.com
2. Redirecting the API on a proxied install
I have Galaxy running in a subdirectory on our webserver. Redirection is done in the .htaccess file. As the API listens on the /api path under the root, this needs to be redirected as well.
## this is our .htaccess config
RewriteEngine on
# redirect galaxy content
RewriteRule ^galaxy$ /galaxy/ [R]
RewriteRule ^/galaxy/static/style/(.*) /home/galaxyuser/galaxy-dist/static/june_2007_style/blue/$1 [L]
RewriteRule ^/galaxy/static/scripts/(.*) /home/galaxyuser/galaxy-dist/static/scripts/packed/$1 [L]
RewriteRule ^/galaxy/static/(.*) /home/galaxyuser/galaxy-dist/static/$1 [L]
RewriteRule ^/galaxy/favicon.ico /home/galaxyuser/galaxy-dist/static/favicon.ico [L]
RewriteRule ^/galaxy/robots.txt /home/galaxyuser/galaxy-dist/static/robots.txt [L]
RewriteRule ^/galaxy/admin/jobs(.*) http://localhost:8079/admin/jobs$1 [P]
RewriteRule ^galaxy(.*) http://localhost:8080$1 [P]
## rewrite the api location.
RewriteRule ^api(.*) http://localhost:8080/api/$1 [P]
3. Activate API for your user account
Before you can use the API, you need an API key. After logging in to your Galaxy instance, go to User => API Keys to generate/get your key.
4. API syntax
These are some guidelines to the API syntax I've put together while experimenting:
- Global: http://your.galaxy.server/api/<WHAT YOU WANT>?key=<YOUR API KEY>
- Details: http://your.galaxy.server/api/<WHAT YOU WANT>/<ITEM_ID>?key=<YOUR API KEY>
- Contents: http://your.galaxy.server/api/<WHAT YOU WANT>/<ITEM_ID>/contents?key=<YOUR API KEY>
- Details of content: http://your.galaxy.server/api/<WHAT YOU WANT>/<ITEM_ID>/contents/<CONTENT_ID>?key=<YOUR API KEY>
Examples (try them in a browser on your galaxy install to see the output format, or use the short script after this list):
- List histories: <url>/api/histories?key=<apikey>
- List Users: <url>/api/users?key=<apikey>
- List Libraries: <url>/api/libraries?key=<apikey>
- List workflows: <url>/api/workflows?key=<apikey>
- List data in a history: <url>/api/histories/<history_id>/contents?key=<apikey>
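If you prefer to test from a script instead of the browser, the snippet below does the same GET. It is a minimal sketch in Python 2, in line with the batch script further down; the server name and key are placeholders you need to fill in.

#!/usr/bin/python
# Minimal GET against the galaxy API: fetch the list of histories
# and pretty-print the JSON answer. Server and key are placeholders.
import json
import urllib2

url = 'http://your.galaxy.server/api/histories?key=YOUR_API_KEY'
response = urllib2.urlopen(url)        # plain HTTP GET, same as in the browser
histories = json.load(response)        # the API answers in JSON
print json.dumps(histories, indent=2)  # typically a list of {name, id, url} dicts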
5. Perl Script querying the API using GET (retrieve info)
The following script contains some examples of how to retrieve the JSON-formatted answer from the galaxy-API using Perl and print the output. It only lists libraries, histories and workflows, and is more of an example than a usable program. It is started as:
# list libraries
perl script.pl -a <apikey> -l
# list library content
perl script.pl -a <apikey> -L libID
# same for workflow (w/W), history (h/H)
#!/usr/bin/perl
use JSON -support_by_pp;
use LWP::Simple;
use Getopt::Std;
use LWP::UserAgent;

our $url = 'your.galaxy.server.ip.or.name';

##########################
# COMMAND LINE ARGUMENTS #
##########################
# a = (a)pi key
# l = list (l)ibraries (boolean)
# L = list (L)ibrary contents by library id provided (-L libID)
# h = list (h)istories (boolean)
# H = list (H)istory contents by history id provided (-H histID)
# w = list (w)orkflows (boolean)
# W = list (W)orkflow details
getopts('a:lL:hH:wW:', \%opts); # options are in %opts

## actions !
if (!defined($opts{'a'})) {
    die('The api key is mandatory (-a)');
}
else {
    our $apikey = $opts{'a'};
}

## get libraries
if (defined($opts{'l'})) {
    my %libs = fetch_json_page("http://$url/api/libraries?key=$apikey");
    print "You have access to these libraries:\n";
    print "###################################\n";
    foreach (keys(%libs)) {
        print "  $_ => $libs{$_}\n";
    }
    print "\n";
    print "Use -L Library_ID to list their contents.\n";
    exit();
}

## get library contents
if (defined($opts{'L'})) {
    if ($opts{'L'} eq '') {
        die('You need to specify a library id with the -L switch.');
    }
    ## get library name.
    my %answer = fetch_json_page("http://$url/api/libraries/$opts{'L'}?key=$apikey");
    my ($libname) = keys(%answer);
    my %data = fetch_json_page("http://$url/api/libraries/$opts{'L'}/contents?key=$apikey");
    print "Library Contents for '$libname':\n";
    foreach (keys(%data)) {
        print "  $_ => $data{$_}\n";
    }
    print "\n";
    exit();
}

## get histories
if (defined($opts{'h'})) {
    my %libs = fetch_json_page("http://$url/api/histories?key=$apikey");
    print "You have access to these histories:\n";
    print "###################################\n";
    foreach (keys(%libs)) {
        print "  $_ => $libs{$_}\n";
    }
    print "\n";
    print "Use -H History_ID to list their contents.\n";
    exit();
}

## get history contents
if (defined($opts{'H'})) {
    if ($opts{'H'} eq '') {
        die('You need to specify a History id with the -H switch.');
    }
    ## get history name.
    my %answer = fetch_json_page("http://$url/api/histories/$opts{'H'}?key=$apikey");
    my ($histname) = keys(%answer);
    my %data = fetch_json_page("http://$url/api/histories/$opts{'H'}/contents?key=$apikey");
    print "History Contents for '$histname':\n";
    foreach (keys(%data)) {
        print "  $_ => $data{$_}\n";
    }
    print "\n";
    exit();
}

## get workflows
if (defined($opts{'w'})) {
    my %libs = fetch_json_page("http://$url/api/workflows?key=$apikey");
    print "You have access to these workflows:\n";
    print "###################################\n";
    foreach (keys(%libs)) {
        print "  $_ => $libs{$_}\n";
    }
    print "\n";
    print "Use -W WorkFlow_ID to list their details.\n";
    exit();
}

## get workflow details
if (defined($opts{'W'})) {
    if ($opts{'W'} eq '') {
        die('You need to specify a Workflow id with the -W switch.');
    }
    ## get workflow name and details.
    my %answer = fetch_json_page("http://$url/api/workflows/$opts{'W'}?key=$apikey");
    my ($wfname) = keys(%answer);
    my %data = %{$answer{$wfname}};
    print "Details for '$wfname':\n";
    print " 1. Inputs\n";
    foreach (keys(%{$data{'inputs'}})) {
        print "  $_ => $data{'inputs'}{$_}{'label'}\n";
    }
    print "\n";
    exit();
}

## SUBROUTINE
sub fetch_json_page {
    my ($json_url) = @_;
    my %items;
    eval {
        # download the json page:
        my $content = get $json_url;
        die "Couldn't get $json_url" unless defined $content;
        my $json = new JSON;
        # these are some nice json options to relax restrictions a bit:
        my $json_text = $json->allow_nonref->utf8->relaxed->escape_slash->loose->allow_singlequote->allow_barekey->decode($content);
        ## first option: multiple values => json_text = arrayref
        if (ref($json_text) eq 'ARRAY') {
            foreach my $itemref (@{$json_text}) {
                $items{$itemref->{'name'}} = $itemref->{'id'};
            }
        }
        ## second: only one value => json_text = hashref
        elsif (ref($json_text) eq 'HASH') {
            $items{$json_text->{'name'}} = $json_text;
        }
    };
    # catch crashes:
    if ($@) {
        print "[[JSON ERROR]] JSON parser crashed! $@\n";
    }
    else {
        return %items;
    }
}
6. The real stuff: Starting Workflows for a batch of paired-end runs in a single go.
Now for the actual work. The following script performs these steps:
- Check if the user is able to run the API as a different user:
- request: http://<url>/api/users?key=<apikey>
- If multiple values are returned: present them to pick a user; otherwise run as the current user (<apikey> == user)
- run_as is added to the submitted data if needed.
- NOTE: all input data comes from the provided <apikey>; output data goes to the run_as user, with the needed permissions.
- Present the workflows available to the user associated with <apikey>
- request: http://<url>/api/workflows?key=<apikey>
- Select a workflow, store its ID (response['id'])
- Present the histories to select the source data from.
- request: http://<url>/api/histories?key=<apikey>
- Switch between batch workflow running (on pairs of input data) and a single run on a selection of files (only batch is outlined here)
- Go over the workflow input datasets to select where to get them from.
- Request: http://<url>/api/workflows/<workflow_id>?key=<apikey>
- inputs are in answer['inputs']
- Per input, offer the options:
- If from history : list items in selected history, pick one
- If from library : list library, pick library, list contents of library, pick item
- If BATCH input dataset: put aside for later processing
- Loop over all datafiles in the selected history, looking for paired-end data:
- Pairs are defined as _R1_ and _R2_ filenames.
- Look for _R1_ and replace it with _R2_ to find the pair. If both are present, submit the workflow; otherwise move on to the next datafile
- Submit the workflow
- Compose the data in the ds_map dictionary
- Create a history name based on the selected batched datasets
- POST to the galaxy url. When using POST, the API will run the provided workflow instead of returning details about it (a sketch of the submitted payload follows this list).
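To make the submission step concrete, here is a minimal sketch of a single POST as the script below composes it (Python 2, like the rest of this post). All IDs, the step numbers in ds_map and the history name are hypothetical placeholders; the real values come from the API calls listed above. The ds_map keys are the workflow's input step IDs (the script uses integers, which serialize to the same JSON strings).

#!/usr/bin/python
# Sketch of one workflow submission; all ids below are made-up placeholders.
import json
import urllib2

galaxyurl = "http://YOUR.SERVER.URL/api/"
apikey = "YOUR_API_KEY"

payload = {
    'workflow_id': 'f2db41e1fa331b3e',                   # from /api/workflows
    'ds_map': {
        '1': {'src': 'hda', 'id': 'a799d38679e985db'},   # batched _R1_ file from the source history
        '2': {'src': 'hda', 'id': '33b43b4e7093c91f'},   # matching _R2_ file
        '3': {'src': 'ld',  'id': '9fc1a2e9a77f331b'},   # static input from a library (e.g. dbSNP)
    },
    'history': 'sampleX_R1_.fastq - sampleX_R2_.fastq',  # name for the new target history
    # optional: 'run_as': '<user_id>' to send the results to another user
}

# POSTing to /api/workflows runs the workflow; a GET would return its details instead.
req = urllib2.Request(galaxyurl + "workflows?key=" + apikey,
                      headers={'Content-Type': 'application/json'},
                      data=json.dumps(payload))
print json.load(urllib2.urlopen(req))  # the answer is JSON describing the started run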
python script.py <apikey>
#!/usr/bin/python
import sys
import json
import urllib2
import re

## global variables
galaxyurl = "http://YOUR.SERVER.URL/api/"
try:
    apikey = sys.argv[1]
except:
    # no api key given: print a message and exit
    print('No api key provided')
    sys.exit(2)
runas = ''

## main program
def main():
    ## 0. RUN AS :
    users = fetch_json_page(galaxyurl + 'users?key=' + apikey)
    idx = 0
    uids = {}
    if len(users) == 1:
        runas = ''
    else:
        print "Select a user to act as:"
        for i in range(len(users)):
            idx += 1
            print '  ' + str(idx) + ': ' + users[i]['email']
            uids[str(idx)] = users[i]['id']
        uchoice = str(raw_input("Your Choice: "))
        if not (uchoice in uids):
            print "Invalid user selection."
            sys.exit(2)
        runas = "&run_as=" + uids[uchoice]

    # 1. WORKFLOW SELECTION
    print "\n"
    print "###############################"
    print "# Select the workflow to run: #"
    print "###############################"
    wfs = fetch_json_page(galaxyurl + 'workflows?key=' + apikey + runas)
    idx = 0
    wfids = {}
    if len(wfs) == 0:
        print "\nNo workflows available for this api key"
        sys.exit(1)
    for i in range(len(wfs)):
        idx += 1
        print '  ' + str(idx) + ': ' + wfs[i]['name']
        wfids[str(idx)] = wfs[i]['id']
    wfchoice = str(raw_input("Your Choice: "))
    if not (wfchoice in wfids):
        print "Invalid workflow selection."
        sys.exit(2)

    # 2. HISTORY WITH DATAFILES
    print "\n"
    print "################################################"
    print "# Select the History Containing the datafiles: #"
    print "################################################"
    hists = fetch_json_page(galaxyurl + 'histories?key=' + apikey + runas)
    idx = 0
    hids = {}
    if len(hists) == 0:
        print "\nNo histories available for this api key"
        sys.exit(1)
    for i in range(len(hists)):
        idx += 1
        print '  ' + str(idx) + ': ' + hists[i]['name']
        hids[str(idx)] = hists[i]['id']
    hchoice = str(raw_input("Your Choice: "))
    if not (hchoice in hids):
        print "Invalid history selection."
        sys.exit(2)

    ## 3. Send data to new history?
    print "\n"
    print "#######################################"
    print "# Send the results to a new history?: #"
    print "#######################################"
    print "\n"
    rchoice = str(raw_input("[K]eep results in the current history, or send them to [N]ew history? : K/[N]: "))
    histstring = ''
    if (re.search("[kK]", rchoice)):
        histstring = "hist_id=" + hids[hchoice]
        print "Keeping results in selected history. "
    else:
        print " => Sending results to a new history/histories (depending on batch settings)."

    # 4. SWITCH Single Vs BATCH RUN.
    print "\n"
    print "###################################"
    print "# RUN MULTIPLE WORKFLOW INSTANCES #"
    print "###################################"
    print "\n"
    print "Select the run mode: \n(M)ultiple instances of the workflow on different data from the source history? \n(S)ingle instance, based on a specific selection of data?\n"
    bchoice = str(raw_input("Your Choice:"))
    # first: batch
    if (re.search("[mM]", bchoice)):
        # scan selected history to get list of datafiles.
        datalist = fetch_json_page(galaxyurl + 'histories/' + hids[hchoice] + '/contents?key=' + apikey + runas)
        datasets = {}
        revdatasets = {}
        for i in range(len(datalist)):
            datasets[datalist[i]['id']] = datalist[i]['name']
            revdatasets[datalist[i]['name']] = datalist[i]['id']
        # scan selected workflow for input datasets
        wfinputs = fetch_json_page(galaxyurl + 'workflows/' + wfids[wfchoice] + '?key=' + apikey)
        print "Specify the inputs sources : \n Static from [H]istory,\n Static from [L]ibrary,\n [B]atched (different per workflow run) "
        batched = {}
        sdata = {}
        sdata['workflow_id'] = wfids[wfchoice]
        sdata['ds_map'] = {}
        for iid in wfinputs['inputs']:
            print "\n####################"
            print "# INPUT DATA FIELD #"
            print "####################"
            isel = raw_input("  " + wfinputs['inputs'][iid]['label'] + " : H/L/B : ")
            ## batch => put aside for later processing
            if (re.search("[bB]", isel)):
                batched[iid] = wfinputs['inputs'][iid]['label']
                continue
            ## history : present data and pick item
            if (re.search("[hH]", isel)):
                print "  Select a datafile for this input field:"
                subidx = 0
                subdata = {}
                for did in sorted(revdatasets.iterkeys()):
                    subidx += 1
                    print "   " + str(subidx) + ": " + did
                    subdata[str(subidx)] = revdatasets[did]
                subsel = str(raw_input("Your Choice:"))
                if not (subsel in subdata):
                    print " => Invalid selection."
                    sys.exit(2)
                ## store in ds_map
                sdata['ds_map'][int(iid)] = {'src': 'hda', 'id': subdata[subsel]}
                continue
            ## library : select library and dataset
            if (re.search("[lL]", isel)):
                libsel = getLibraryItem()
                sdata['ds_map'][int(iid)] = {'src': 'ld', 'id': libsel}
                continue
        ## Currently only two batched files are supported (R1_R2 style)
        if not (len(batched) == 2):
            print "Currently only batching for paired end fastq files is supported, sorry for that."
            sys.exit(2)
        ## select batch dataset order.
        print "Datasets in the provided history will be scanned for _R1_ / _R2_ in the names, indicating paired end reads."
        print "Specify which of the following input data fields corresponds to _R1_, the other will be used for _R2_"
        subidx = 0
        batchdata = {}
        for i in batched:
            subidx += 1
            print "  " + str(subidx) + ": " + batched[i]
            batchdata[str(subidx)] = i  # i = step_id
        bfirst = str(raw_input("  Your Choice (1/2):"))
        if (bfirst == "1"):
            bsecond = "2"
        elif (bfirst == "2"):
            bsecond = "1"
        else:
            print "Invalid selection"
            sys.exit(2)
        ## Now loop the available datasets.
        for ds in revdatasets:
            dsid = revdatasets[ds]
            if re.search("_R1_", ds):
                pds = ds.replace('_R1_', '_R2_')
                if pds in revdatasets:
                    pdsid = revdatasets[pds]
                else:
                    continue
            else:
                continue
            ## preparing to submit the pair.
            print "\nStarting workflow for:"
            print " - " + ds
            print " - " + pds
            ## set new history name if needed.
            if (histstring == ""):
                tohist = ds + " - " + pds
            else:
                tohist = histstring
            submissiondata = sdata
            submissiondata['ds_map'][int(batchdata[bfirst])] = {'src': 'hda', 'id': dsid}
            submissiondata['ds_map'][int(batchdata[bsecond])] = {'src': 'hda', 'id': pdsid}
            submissiondata['history'] = tohist
            if (runas != ""):
                submissiondata['run_as'] = uids[uchoice]
            ## send post to galaxy.
            req = urllib2.Request(galaxyurl + "workflows?key=" + apikey,
                                  headers={'Content-Type': 'application/json'},
                                  data=json.dumps(submissiondata))
            response = urllib2.urlopen(req)
            print " => Workflow submitted. "
    ## second: single selection run.
    else:
        # scan selected history to get list of datafiles.
        datalist = fetch_json_page(galaxyurl + 'histories/' + hids[hchoice] + '/contents?key=' + apikey + runas)
        datasets = {}
        revdatasets = {}
        for i in range(len(datalist)):
            datasets[datalist[i]['id']] = datalist[i]['name']
            revdatasets[datalist[i]['name']] = datalist[i]['id']
        # scan selected workflow for input datasets
        wfinputs = fetch_json_page(galaxyurl + 'workflows/' + wfids[wfchoice] + '?key=' + apikey)
        print "Specify the inputs sources : \n Static from [H]istory,\n Static from [L]ibrary.\n "
        sdata = {}
        sdata['workflow_id'] = wfids[wfchoice]
        sdata['ds_map'] = {}
        for iid in wfinputs['inputs']:
            print "\n####################"
            print "# INPUT DATA FIELD #"
            print "####################"
            isel = raw_input("  " + wfinputs['inputs'][iid]['label'] + " : H/L : ")
            ## history : present data and pick item
            if (re.search("[hH]", isel)):
                print "  Select a datafile for this input field:"
                subidx = 0
                subdata = {}
                for did in sorted(revdatasets.iterkeys()):
                    subidx += 1
                    print "   " + str(subidx) + ": " + did
                    subdata[str(subidx)] = revdatasets[did]
                subsel = str(raw_input("Your Choice:"))
                if not (subsel in subdata):
                    print " => Invalid selection."
                    sys.exit(2)
                ## store in ds_map
                sdata['ds_map'][int(iid)] = {'src': 'hda', 'id': subdata[subsel]}
                continue
            ## Library : select library and dataset
            if (re.search("[lL]", isel)):
                libsel = getLibraryItem()
                sdata['ds_map'][int(iid)] = {'src': 'ld', 'id': libsel}
                continue
        ## set new history name if needed.
        if (histstring == ""):
            tohist = str(raw_input("Provide a name for the target history:"))
            if (tohist == ""):
                tohist = 'New_History_From_API'
        else:
            tohist = histstring
        submissiondata = sdata
        submissiondata['history'] = tohist
        if (runas != ""):
            submissiondata['run_as'] = uids[uchoice]
        ## send post to galaxy.
        req = urllib2.Request(galaxyurl + "workflows?key=" + apikey,
                              headers={'Content-Type': 'application/json'},
                              data=json.dumps(submissiondata))
        response = urllib2.urlopen(req)
        print " => Workflow submitted. "

def fetch_json_page(url):
    try:
        data = urllib2.urlopen(url)
        j = json.load(data)
    except:
        print('Fetching api response failed for the following url:')
        print(url)
        sys.exit(2)
    ## return data
    return j

def getLibraryItem():
    ## get libraries
    libs = fetch_json_page(galaxyurl + 'libraries?key=' + apikey + runas)
    print "  Select a library:"
    idx = 0
    libids = {}
    for i in range(len(libs)):
        idx += 1
        print "   " + str(idx) + ": " + libs[i]['name']
        libids[str(idx)] = libs[i]['id']
    lchoice = str(raw_input("  Your Choice:"))
    if not (lchoice in libids):
        print " => Invalid selection."
        sys.exit(2)
    ## select item from library
    libcontents = fetch_json_page(galaxyurl + 'libraries/' + libids[lchoice] + '/contents?key=' + apikey + runas)
    print "  Select a dataset:"
    idx = 0
    libdataids = {}
    for i in range(len(libcontents)):
        idx += 1
        print "   " + str(idx) + ": " + libcontents[i]['name']
        libdataids[str(idx)] = libcontents[i]['id']
    lchoice = str(raw_input("  Your Choice:"))
    if not (lchoice in libdataids):
        print " => Invalid selection."
        sys.exit(2)
    ## return the library item id
    return libdataids[lchoice]

if __name__ == "__main__":
    main()
7. Benefits and limitations
The main benefits of this approach are:
- Static parts of the workflow input are selected once (e.g. dbSNP for GATK recalibration)
- Static datasets can be used directly from libraries
- Paired-end data are detected and coupled automatically. Just make sure your files follow the (standard?) _R1_/_R2_ naming.
- No need for history sharing afterwards. The user using the API has all the data and workflows in their history, but can send results to another user without explicit permission settings!
Some of the current limitations are:
- The API does not allow the specification of tool parameters at runtime. This means that ALL settings need to be prefilled in the workflow editor.
- no, just one actually... :-)
I hope this clarifies the API usage somewhat for anybody interested!
api, Batch, galaxy, NGS, Paired-End, Perl, python
Comments
Very useful scripts
One of my gripes about the API is its output format. All the config files are in XML so why not the API output?
Thanks,
Simon