"""Lookup functions for the variant browser."""
import logging
import re
import db
from . import error
SEARCH_LIMIT = 10000
REGION_REGEX = re.compile(r'^\s*(\d+|X|Y|M|MT)\s*([-:]?)\s*(\d*)-?([\dACTG]*)-?([ACTG]*)')
[docs]def autocomplete(dataset:str, query:str, ds_version:str=None):
"""
Provide autocomplete suggestions based on the query.
Args:
dataset (str): short name of dataset
query (str): the query to compare to the available gene names
ds_version (str): the dataset version
Returns:
list: A list of genes names whose beginning matches the query
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
query = (db.Gene.select(db.Gene.name)
.where(((db.Gene.name.startswith(query)) &
(db.Gene.reference_set == ref_set))))
gene_names = [str(gene.name) for gene in query]
return gene_names
[docs]def get_awesomebar_result(dataset:str, query:str, ds_version:str=None):
"""
Parse the search input.
Datatype is one of:
* `gene`
* `transcript`
* `variant`
* `dbsnp_variant_set`
* `region`
Identifier is one of:
* ensembl ID for gene
* variant ID string for variant (eg. 1-1000-A-T)
* region ID string for region (eg. 1-1000-2000)
Follow these steps:
* if query is an ensembl ID, return it
* if a gene symbol, return that gene's ensembl ID
* if an RSID, return that variant's string
Args:
dataset (str): short name of dataset
query (str): the search query
ds_version (str): the dataset version
Returns:
tuple: (datatype, identifier)
"""
query = query.strip()
# Parse Variant types
try:
variant = get_variants_by_rsid(dataset, query.lower(), ds_version=ds_version)
except (error.NotFoundError, error.ParsingError):
pass
else:
if len(variant) == 1:
return ('variant', variant[0]['variant_id'])
return ('dbsnp_variant_set', variant[0]['rsid'])
# Gene
try:
gene = get_gene_by_name(dataset, query)
except error.NotFoundError:
pass
else:
return 'gene', gene['gene_id']
# Capital letters for all other queries
query = query.upper()
try:
gene = get_gene_by_name(dataset, query)
except error.NotFoundError:
pass
else:
return 'gene', gene['gene_id']
# Ensembl formatted queries
if query.startswith('ENS'):
# Gene
try:
gene = get_gene(dataset, query)
except error.NotFoundError:
pass
else:
return 'gene', gene['gene_id']
# Transcript
try:
transcript = get_transcript(dataset, query)
except error.NotFoundError:
pass
else:
return 'transcript', transcript['transcript_id']
# Region and variant queries
query = query[3:] if query.startswith('CHR') else query
match = REGION_REGEX.match(query)
if match:
target = match.group(0)
target_type = 'region'
if match.group(2) == ":":
target = target.replace(":","-")
if match.group(5) and set(match.group(4)).issubset(set("ACGT")):
target_type = 'variant'
try:
get_raw_variant(dataset, match.group(3), match.group(1), match.group(4), match.group(5), ds_version)
except error.NotFoundError as err:
target_type = 'not_found'
return target_type, target
return 'not_found', query
[docs]def get_coverage_for_bases(dataset:str, chrom:str, start_pos:int, end_pos:int=None, ds_version:str=None):
"""
Get the coverage for the list of bases given by start_pos->end_pos, inclusive.
Args:
dataset (str): short name for the dataset
chrom (str): chromosome
start_pos (int): first position of interest
end_pos (int): last position of interest; if None it will be set to start_pos
ds_version (str): version of the dataset
Returns:
list: coverage dicts for the region of interest. None if failed
"""
dataset_version = db.get_dataset_version(dataset, ds_version)
if not dataset_version:
raise error.NotFoundError(f'Unable to find the dataset version in the database')
if end_pos is None:
end_pos = start_pos
coverage = [row for row in (db.Coverage.select()
.where((db.Coverage.pos >= start_pos) &
(db.Coverage.pos <= end_pos) &
(db.Coverage.chrom == chrom) &
(db.Coverage.dataset_version == dataset_version.id))
.dicts())]
if not coverage:
raise error.NotFoundError('No coverage found for the region')
return coverage
[docs]def get_coverage_for_transcript(dataset:str, chrom:str, start_pos:int, end_pos:int=None, ds_version:str=None):
"""
Get the coverage for the list of bases given by start_pos->end_pos, inclusive.
Args:
dataset (str): short name for the dataset
chrom (str): chromosome
start_pos (int): first position of interest
end_pos (int): last position of interest; if None it will be set to start_pos
ds_version (str): version of the dataset
Returns:
list: coverage dicts for the region of interest
"""
# Is this function still relevant with postgres?
# Only entries with reported cov are in database
coverage_array = get_coverage_for_bases(dataset, chrom, start_pos, end_pos, ds_version)
# only return coverages that have coverage (if that makes any sense?)
# return coverage_array
if not coverage_array:
return []
covered = [c for c in coverage_array if c['mean']]
return covered
[docs]def get_exons_in_transcript(dataset:str, transcript_id:str, ds_version=None):
"""
Retrieve exons associated with the given transcript id.
Args:
dataset (str): short name of the dataset
transcript_id (str): the id of the transcript
ds_version (str): dataset version
Returns:
list: dicts with values for each exon sorted by start position
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
logging.info('get_exons_in_transcript({}, {}): unable to find dataset dbid'.format(dataset, transcript_id))
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
try:
transcript = (db.Transcript
.select()
.join(db.Gene)
.where((db.Transcript.transcript_id == transcript_id) &
(db.Gene.reference_set == ref_set))
.get())
except db.Transcript.DoesNotExist:
logging.info('get_exons_in_transcript({}, {}): unable to retrieve transcript'.format(dataset, transcript_id))
raise error.NotFoundError(f'Transcript {transcript_id} not found in reference data.')
wanted_types = ('CDS', 'UTR', 'exon')
features = sorted(list(db.Feature.select().where((db.Feature.transcript == transcript) &
(db.Feature.feature_type in wanted_types)).dicts()),
key=lambda k: k['start'])
if not features:
raise error.NotFoundError(f'No features found for transcript {transcript_id} in reference data.')
return features
[docs]def get_gene(dataset:str, gene_id:str, ds_version:str=None):
"""
Retrieve gene by gene id.
Args:
dataset (str): short name of the dataset
gene_id (str): the id of the gene
ds_version (str): dataset version
Returns:
dict: values for the gene; None if not found
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
try:
return db.Gene.select().where((db.Gene.gene_id == gene_id) &
(db.Gene.reference_set == ref_set)).dicts().get()
except db.Gene.DoesNotExist:
raise error.NotFoundError(f'Gene {gene_id} not found in reference data.')
[docs]def get_gene_by_dbid(gene_dbid:str):
"""
Retrieve gene by gene database id.
Args:
gene_dbid (str): the database id of the gene
Returns:
dict: values for the gene; empty if not found
"""
try:
return db.Gene.select().where(db.Gene.id == gene_dbid).dicts().get()
except db.Gene.DoesNotExist:
return {}
except ValueError:
return {}
[docs]def get_gene_by_name(dataset:str, gene_name:str, ds_version=None):
"""
Retrieve gene by gene_name.
Args:
dataset (str): short name of the dataset
gene_name (str): the id of the gene
ds_version (str): dataset version
Returns:
dict: values for the gene; empty if not found
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
try:
return (db.Gene.select()
.where((db.Gene.reference_set == ref_set) &
(db.Gene.name==gene_name))
.dicts()
.get())
except db.Gene.DoesNotExist:
try:
return (db.GeneOtherNames.select(db.Gene)
.join(db.Gene)
.where((db.GeneOtherNames.name == gene_name) &
(db.Gene.reference_set == ref_set))
.dicts()
.get())
except db.GeneOtherNames.DoesNotExist:
logging.info('get_gene_by_name({}, {}): unable to retrieve gene'.format(dataset, gene_name))
raise error.NotFoundError(f'Gene {gene_name} not found in reference data')
[docs]def get_genes_in_region(dataset:str, chrom:str, start_pos:int, stop_pos:int, ds_version:str=None):
"""
Retrieve genes located within a region.
Args:
dataset (str): short name of the dataset
chrom (str): chromosome name
start_pos (int): start of region
stop_pos (int): end of region
ds_version (str): dataset version
Returns:
dict: values for the gene; empty if not found
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
genes = db.Gene.select().where((db.Gene.reference_set == ref_set) &
(db.Gene.start <= stop_pos) &
(db.Gene.stop >= start_pos) &
(db.Gene.chrom == chrom)).dicts()
return genes
[docs]def get_raw_variant(dataset:str, pos:int, chrom:str, ref:str, alt:str, ds_version:str=None):
"""
Retrieve variant by position and change.
Args:
dataset (str): short name of the reference set
pos (int): position of the variant
chrom (str): name of the chromosome
ref (str): reference sequence
alt (str): variant sequence
ds_version (str): dataset version
Returns:
dict: values for the variant; None if not found
"""
dataset_version = db.get_dataset_version(dataset, ds_version)
if not dataset_version:
raise error.NotFoundError(f'Unable to find the dataset version in the database')
try:
variant = (db.Variant
.select()
.where((db.Variant.pos == pos) &
(db.Variant.ref == ref) &
(db.Variant.alt == alt) &
(db.Variant.chrom == chrom) &
(db.Variant.dataset_version == dataset_version))
.dicts()
.get())
variant['genes'] = [gene['gene_id'] for gene in
db.VariantGenes.select(db.Gene.gene_id)
.join(db.Gene)
.where(db.VariantGenes.variant == variant['id'])
.dicts()]
variant['transcripts'] = [transcript['transcript_id'] for transcript in
db.VariantTranscripts.select(db.Transcript.transcript_id)
.join(db.Transcript)
.where(db.VariantTranscripts.variant == variant['id'])
.dicts()]
return variant
except db.Variant.DoesNotExist:
logging.info('get_raw_variant({}, {}, {}, {}, {}, {}): unable to retrieve variant'
.format(dataset, pos, chrom, ref, alt, dataset_version.id))
raise error.NotFoundError(f'Variant {chrom}-{pos}-{ref}-{alt} not found')
[docs]def get_transcript(dataset:str, transcript_id:str, ds_version:str=None):
"""
Retrieve transcript by transcript id.
Also includes exons as ['exons']
Args:
dataset (str): short name of the dataset
transcript_id (str): the id of the transcript
ds_version (str): dataset version
Returns:
dict: values for the transcript, including exons; None if not found
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
try:
transcript = (db.Transcript
.select(db.Transcript, db.Gene.gene_id)
.join(db.Gene)
.where((db.Transcript.transcript_id == transcript_id) &
(db.Gene.reference_set == ref_set))
.dicts()
.get())
transcript['exons'] = get_exons_in_transcript(dataset, transcript_id)
return transcript
except db.Transcript.DoesNotExist:
logging.info('get_transcript({}, {}): unable to retrieve transcript'.format(dataset, transcript_id))
raise error.NotFoundError(f'Transcript {transcript_id} not found in reference data')
[docs]def get_transcripts_in_gene(dataset:str, gene_id:str, ds_version:str=None):
"""
Get the transcripts associated with a gene.
Args:
dataset (str): short name of the reference set
gene_id (str): id of the gene
ds_version (str): dataset version
Returns:
list: transcripts (dict) associated with the gene; empty if no hits
"""
try:
ref_set = db.get_dataset_version(dataset, ds_version).reference_set
except AttributeError:
logging.warning('get_transcripts_in_gene({}, {}): unable to get referenceset dbid'.format(dataset, gene_id))
raise error.NotFoundError(f'Reference set not found for dataset {dataset}.')
try:
gene = db.Gene.select().where((db.Gene.reference_set == ref_set) &
(db.Gene.gene_id == gene_id)).dicts().get()
except db.Gene.DoesNotExist:
logging.info('get_transcripts_in_gene({}, {}): unable to retrieve gene'.format(dataset, gene_id))
raise error.NotFoundError(f'Gene {gene_id} not found in reference data')
return [transcript for transcript in db.Transcript.select().where(db.Transcript.gene == gene['id']).dicts()]
[docs]def get_transcripts_in_gene_by_dbid(gene_dbid:int):
"""
Get the transcripts associated with a gene.
Args:
gene_dbid (int): database id of the gene
Returns:
list: transcripts (dict) associated with the gene; empty if no hits
"""
return [transcript for transcript in db.Transcript.select().where(db.Transcript.gene == gene_dbid).dicts()]
[docs]def get_variant(dataset:str, pos:int, chrom:str, ref:str, alt:str, ds_version:str=None):
"""
Retrieve variant by position and change.
Args:
dataset (str): short name of the dataset
pos (int): position of the variant
chrom (str): name of the chromosome
ref (str): reference sequence
alt (str): variant sequence
ds_version (str): version of the dataset
Returns:
dict: values for the variant; None if not found
"""
variant = get_raw_variant(dataset, pos, chrom, ref, alt, ds_version)
variant = get_raw_variant(dataset, pos, chrom, ref, alt, ds_version)
if variant and 'rsid' in variant and variant['rsid'] and not str(variant['rsid']).startswith('rs'):
variant['rsid'] = 'rs{}'.format(variant['rsid'])
return variant
[docs]def get_variants_by_rsid(dataset:str, rsid:str, ds_version:str=None):
"""
Retrieve variants by their associated rsid.
Args:
dataset (str): short name of dataset
rsid (str): rsid of the variant (starting with rs)
ds_version (str): version of the dataset
Returns:
list: variants as dict; no hits returns None
"""
dataset_version = db.get_dataset_version(dataset, ds_version)
if not dataset_version:
raise error.NotFoundError(f'Unable to find the dataset version in the database')
if not rsid.startswith('rs'):
logging.error('get_variants_by_rsid({}, {}): rsid not starting with rs'.format(dataset, rsid))
raise error.ParsingError('rsid not starting with rs')
try:
rsid = int(rsid.lstrip('rs'))
except ValueError:
logging.error('get_variants_by_rsid({}, {}): not an integer after rs'.format(dataset, rsid))
raise error.ParsingError('Not an integer after rs')
variants = (db.Variant
.select()
.where((db.Variant.rsid == rsid) &
(db.Variant.dataset_version == dataset_version))
.dicts())
if not variants:
raise error.NotFoundError('No variants found for rsid {rsid}')
return variants
[docs]def get_variants_in_gene(dataset:str, gene_id:str, ds_version:str=None):
"""
Retrieve variants present inside a gene.
Args:
dataset (str): short name of the dataset
gene_id (str): id of the gene
ds_version (str): version of the dataset
Returns:
list: values for the variants
"""
dataset_version = db.get_dataset_version(dataset, ds_version)
if not dataset_version:
raise error.NotFoundError(f'Unable to find the dataset version in the database')
gene = get_gene(dataset, gene_id, ds_version)
if not gene:
raise error.NotFoundError(f'Gene {gene_id} not found in reference data')
variants = [variant for variant in db.Variant.select()
.join(db.VariantGenes)
.where((db.VariantGenes.gene == gene['id']) &
(db.Variant.dataset_version == dataset_version)).dicts()]
##### remove when db is fixed
for variant in variants:
if not variant['hom_count']:
variant['hom_count'] = 0
variant['filter'] = variant['filter_string']
#####
for variant in variants:
if variant['rsid']:
variant['rsid'] = 'rs{}'.format(variant['rsid'])
return variants
[docs]def get_variants_in_region(dataset:str, chrom:str, start_pos:int, end_pos:int, ds_version:str=None):
"""
Variants that overlap a region.
Args:
dataset (str): short name of the dataset
chrom (str): name of the chromosom
start_pos (int): start of the region
end_pos (int): start of the region
ds_version (str): version of the dataset
Returns:
list: variant dicts, None if no hits
"""
dataset_version = db.get_dataset_version(dataset, ds_version)
if not dataset_version:
raise error.NotFoundError(f'Unable to find the dataset version in the database')
query = (db.Variant
.select()
.where((db.Variant.pos >= start_pos) &
(db.Variant.pos <= end_pos) &
(db.Variant.chrom == chrom) &
(db.Variant.dataset_version == dataset_version))
.dicts())
variants = [variant for variant in query]
##### remove when db is fixed
for variant in variants:
if not variant['hom_count']:
variant['hom_count'] = 0
variant['filter'] = variant['filter_string']
#####
for variant in variants:
if variant['rsid']:
variant['rsid'] = 'rs{}'.format(variant['rsid'])
return variants
[docs]def get_variants_in_transcript(dataset:str, transcript_id:str, ds_version:str=None):
"""
Retrieve variants inside a transcript.
Args:
dataset (str): short name of the dataset
transcript_id (str): id of the transcript (ENST)
ds_version (str): version of the dataset
Returns:
dict: values for the variant; None if not found
"""
dataset_version = db.get_dataset_version(dataset, ds_version)
if not dataset_version:
raise error.NotFoundError(f'Unable to find the dataset version in the database')
transcript = get_transcript(dataset, transcript_id, ds_version)
if not transcript:
raise error.NotFoundError(f'Transcript {transcript_id} not found in reference data')
variants = [variant for variant in db.Variant.select()
.join(db.VariantTranscripts)
.where((db.VariantTranscripts.transcript == transcript['id']) &
(db.Variant.dataset_version == dataset_version))
.dicts()]
##### remove when db is fixed
for variant in variants:
if not variant['hom_count']:
variant['hom_count'] = 0
variant['filter'] = variant['filter_string']
#####
for variant in variants:
variant['vep_annotations'] = [anno for anno in variant['vep_annotations'] if anno['Feature'] == transcript_id]
if variant['rsid']:
variant['rsid'] = 'rs{}'.format(variant['rsid'])
return variants