+ added license text
[qpalma.git] / tests / test_sequence_utils.py
index c2a3163..00e8870 100644 (file)
 # -*- coding: utf-8 -*- 
 
 import pdb
+import unittest
 import numpy
-import qpalma.sequence_utils
 
-   #flat_files = '/fml/ag-raetsch/home/fabio/svn/projects/QPalma/tests/test_data'
+from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper,reverse_complement
+from qpalma.Lookup import LookupTable
 
-   #begin =   0
-   #end   =  60
-   #dna,acc,don = qpalma.sequence_utils.get_seq_and_scores(1,'+',begin,end,flat_files,True)
-   #dna = qpalma.sequence_utils.get_seq_and_scores(1,'+',begin,end,flat_files,True)
-   #print dna
+class TestSequenceUtils(unittest.TestCase):
 
-def check_positions(dna,acc,don,offset=0):
-   first_gt_tuple_pos = [p for p,e in enumerate(dna) if p>0 and p<len(dna)-1 and e=='g' and (dna[p+1]=='t' or dna[p+1]=='c')][:offset]
-   first_ag_tuple_pos = [p for p,e in enumerate(dna) if p>1 and p<len(dna) and dna[p-1]=='a' and dna[p]=='g'][:offset]
 
-   first_acc_scores = [p for p,e in enumerate(acc) if e != -numpy.inf][:offset]
-   first_don_scores = [p for p,e in enumerate(don) if e != -numpy.inf][:offset]
+   def setUp(self):
+      self.strands = ['+','']
 
-   last_gt_tuple_pos = [p for p,e in enumerate(dna) if p>0 and p<len(dna)-1 and e=='g' and (dna[p+1]=='t' or dna[p+1]=='c')][-offset:]
-   last_ag_tuple_pos = [p for p,e in enumerate(dna) if p>1 and p<len(dna) and dna[p-1]=='a' and dna[p]=='g'][-offset:]
+      g_dir    = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
+      acc_dir  = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
+      don_dir  = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
 
-   last_acc_scores = [p for p,e in enumerate(acc) if e != -numpy.inf][-offset:]
-   last_don_scores = [p for p,e in enumerate(don) if e != -numpy.inf][-offset:]
+      g_fmt = 'chr%d.dna.flat'
+      s_fmt = 'contig_%d%s'
 
-   assert first_gt_tuple_pos == first_don_scores
-   assert first_ag_tuple_pos == first_acc_scores
+      num_chromo = 6
 
-   assert last_gt_tuple_pos == last_don_scores
-   assert last_ag_tuple_pos == last_acc_scores
+      accessWrapper = DataAccessWrapper(g_dir,acc_dir,don_dir,g_fmt,s_fmt)
+      self.seqInfo = SeqSpliceInfo(accessWrapper,range(1,num_chromo))
 
+      print self.seqInfo.chromo_sizes
 
-def run():
-   flat_files =  '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
-   chromo   = 1
-   strand   = '+'
-   begin    = 0
-   end      = 100000000
-   dna,acc,don = qpalma.sequence_utils.get_seq_and_scores(chromo,strand,begin,end,flat_files)
+      #self.lt1 = LookupTable(g_dir,acc_dir,don_dir,g_fmt,s_fmt,range(1,2))
 
-   check_positions(dna,acc,don,100)
 
+   def testThalianaDataExamples(self):
+      seq = 'TGAAAACAGGAACGGATTGGAGAAAGGCGTCTCGTCAT'.lower()
+      chromo = 3
+      strand = '+'
+      pos = 19246391
 
-def run2():
-   flat_files =  '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
-   chromo   = 8
-   strand   = '-'
-   begin    = 0
-   end      = 100000000
-   dna,acc,don = qpalma.sequence_utils.get_seq_and_scores(chromo,strand,begin,end,flat_files)
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
+      
+      seq = 'AGGCAATGAAACTGATGCATTGGACTTGACGGGTGTTG'.lower()
+      chromo = 5
+      strand = '+'
+      pos = 15394760
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
 
-   check_positions(dna,acc,don,100)
+      seq = 'TCTTGGTGGAGGAGCTAACACCGTAGCTGACGGTTACA'.lower()
+      chromo = 4
+      strand = '+'
+      pos = 16709206
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
 
-   pdb.set_trace()
+      seq = 'TTGGAAGACAGAGTCAACCATACCCTTGCCTCTGGTGA'.lower()
+      chromo = 2
+      strand = '+'
+      pos = 16579649
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
 
+      seq = 'CTGGCCAAAAGCTCAGGGAAGACGCAGCCTAGGGCTCC'.lower()
+      seq = reverse_complement(seq)
+      chromo = 1
+      strand = '-'
+      pos = 10475515
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
 
-def run3():
-   flat_files =  '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
-   
-   begin = 0
-   end   = 100000000
-   full_pos_dna = qpalma.sequence_utils.get_seq_and_scores(1,'+',begin,end,flat_files,True)
-
-   begin = 0
-   end   = 500
-   dna = qpalma.sequence_utils.get_seq_and_scores(8,'+',begin,end,flat_files,True)
-
-   pos_dna = full_pos_dna[-500:]
-   r_dna = qpalma.sequence_utils.reverse_complement(dna)
-   pos_dna == r_dna
-   pdb.set_trace()
+      seq = 'TTTTTCCCTTCTAGAAGACCGTAAAGGTAAACTTCTAA'.lower()
+      seq = reverse_complement(seq)
+      chromo = 3
+      strand = '-'
+      pos = 17143420
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
+
+      seq = 'CACGGTGCAGATGAAGAACTGAGATCCGTTCGTGTTTG'.lower()
+      seq = reverse_complement(seq)
+      chromo = 4
+      strand = '-'
+      pos = 18083761
+      dna = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
+
+      window = 113
+      seq = 'CACGGTGCAGATGAAGAACTGAGATCCGTTCGTGTTTG'.lower()
+      seq = reverse_complement(seq)
+      chromo = 4
+      strand = '-'
+      pos = 18083761-window
+      dna,acc,don = self.seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38+2*window,False)
+      self.assertEqual(seq,dna[window:-window])
+
+      #print dna
+      #print "".join(map(lambda x: ['_','x'][x!=-numpy.inf],acc))
+      #print "".join(map(lambda x: ['_','x'][x!=-numpy.inf],don))
+
+
+   def _testThalianaDataGeneric(self):
+
+      dna,acc,don = self.seqInfo.get_seq_and_scores(1,'+',1000,1369)
+      dna_,acc_,don_ = self.lt1.get_seq_and_scores(1,'+',1000,1369,'')
+
+      self.assertEqual(dna,dna_)
+      self.assertEqual(acc,acc_)
+      self.assertEqual(don,don_)
+
+      dna,acc,don = self.seqInfo.get_seq_and_scores(1,'-',1000,1369)
+      dna_,acc_,don_ = self.lt1.get_seq_and_scores(1,'-',1000,1369,'')
+
+      self.assertEqual(dna,dna_)
+      self.assertEqual(acc,acc_)
+      self.assertEqual(don,don_)
+
+      #dna,acc,don = seqInfo.get_seq_and_scores(2,'+',1000,1369)
+      #dna,acc,don = seqInfo.get_seq_and_scores(3,'+',1000,1369)
+      #dna,acc,don = seqInfo.get_seq_and_scores(4,'+',1000,1369)
+      #dna,acc,don = seqInfo.get_seq_and_scores(5,'+',1000,1369)
+
+      #dna,acc,don = seqInfo.get_seq_and_scores(1,'-',1000,1369)
+      #dna_,acc_,don_ = lt1.get_seq_and_scores(1,'-',1000,1369,'')
+
+      #self.assertEqual(dna,dna_)
+      #self.assertEqual(acc,acc_)
+      #self.assertEqual(don,don_)
+
+
+      #dna,acc,don = seqInfo.get_seq_and_scores(2,'-',1000,1369)
+      #dna,acc,don = seqInfo.get_seq_and_scores(3,'-',1000,1369)
+      #dna,acc,don = seqInfo.get_seq_and_scores(4,'-',1000,1369)
+      #dna,acc,don = seqInfo.get_seq_and_scores(5,'-',1000,1369)
+
+
+   def _testLyrataData(self):
+      g_dir    = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/contigs'
+      acc_dir  = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/splice_scores/acc'
+      don_dir  = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/splice_scores/don'
+
+      g_fmt = 'contig%d.dna.flat'
+      s_fmt = 'contig_%d%s'
+
+      num_chromo = 1099
    
-def run4():
-   flat_files =  '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
-   chromo   = 8
-   strand   = '-'
-   begin    = 200
-   end      = 1200
-   dna,acc,don = qpalma.sequence_utils.get_seq_and_scores(chromo,strand,begin,end,flat_files)
+      accessWrapper = DataAccessWrapper(g_dir,acc_dir,don_dir,g_fmt,s_fmt)
+      seqInfo = SeqSpliceInfo(accessWrapper,range(1,num_chromo))
+
+      dna,acc,don = seqInfo.get_seq_and_scores(1,'+',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(2,'+',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(3,'+',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(4,'+',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(5,'+',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(1,'-',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(2,'-',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(3,'-',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(4,'-',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(5,'-',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(45,'-',1,1369)
+      dna,acc,don = seqInfo.get_seq_and_scores(45,'+',1,1369)
+
+      print 'Finished'
+      #num_tests = 10
+      #for chromo in range(1,6):
+      #   for strand in ['+','-']:
+      #      for test_idx in range(num_tests):
+      #         if strand == '-':
+      #            size = seqInfo.chromo_sizes[chromo+7]
+      #         else:
+      #            size = seqInfo.chromo_sizes[chromo]
+      #         begin = random.randint(0,size)
+      #         end   = random.randint(b,size)
+      #         dna,acc,don = seqInfo.get_seq_and_scores(chromo,strand,begin,end)
+
+
+   def tearDown(self):
+      pass
+
+
+class TestLookupTable(unittest.TestCase):
+
+   def setUp(self):
+      pass
+
+
+   def testTableThalianaData(self):
+      g_dir    = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
+      acc_dir  = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
+      don_dir  = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
+
+      g_fmt = 'chr%d.dna.flat'
+      s_fmt = 'contig_%d%s'
+
+      num_chromo = 2
+
+      lt1 = LookupTable(g_dir,acc_dir,don_dir,g_fmt,s_fmt,range(1,num_chromo))
+
+      accessWrapper = DataAccessWrapper(g_dir,acc_dir,don_dir,g_fmt,s_fmt)
+      seqInfo = SeqSpliceInfo(accessWrapper,range(1,num_chromo))
+
+      seq = 'CTGGCCAAAAGCTCAGGGAAGACGCAGCCTAGGGCTCC'.lower()
+      seq = reverse_complement(seq)
+      chromo = 1
+      strand = '-'
+      pos = 10475515
+      dna = seqInfo.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      self.assertEqual(seq,dna)
+
+      dna = lt1.get_seq_and_scores(chromo,strand,pos,pos+38,True)
+      pdb.set_trace()
+      self.assertEqual(seq,dna)
+
+      dna,acc,don = seqInfo.get_seq_and_scores(1,'+',1,1369)
+      dna_,acc_,don_ = lt1.get_seq_and_scores(1,'+',1,1369)
+
+
+
+   def _testTableLyrataData(self):
+      g_dir    = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/contigs'
+      acc_dir  = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/splice_scores/acc'
+      don_dir  = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/splice_scores/don'
+
+      g_fmt = 'contig%d.dna.flat'
+      s_fmt = 'contig_%d%s'
+
+      lt1 = LookupTable(g_dir,acc_dir,don_dir,g_fmt,s_fmt,range(0,1099))
 
-   check_positions(dna,acc,don)
 
-   pdb.set_trace()
+   def tearDown(self):
+      pass
 
 
-def check_negative_strand(b,e):
-   flat_files =  '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
-   chromo   = 8
-   strand   = '-'
-   begin    = b
-   end      = e
-   dna,acc,don = qpalma.sequence_utils.get_seq_and_scores(chromo,strand,begin,end,flat_files)
+def check_wrapper():
+   g_dir    = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/contigs'
+   acc_dir  = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/splice_scores/acc'
+   don_dir  = '/fml/ag-raetsch/home/fabio/tmp/Lyrata/splice_scores/don'
 
-   pdb.set_trace()
+   g_fmt = 'contig%d.dna.flat'
+   s_fmt = 'contig_%d%s.Conf_cum'
 
-   check_positions(dna,acc,don)
+   test = DataAccessWrapper(g_dir,acc_dir,don_dir,g_fmt,s_fmt)
 
-   print 'fine'
+   for idx in range(1,100):
+      pos = test.get_genomic_fragment_fn(idx,'+')
+      neg = test.get_genomic_fragment_fn(idx,'-')
+      print pos,neg
+      assert os.path.exists(pos)
+      assert os.path.exists(neg)
 
-def check_positive_strand(b,e):
-   flat_files =  '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
-   chromo   = 1
-   strand   = '+'
-   begin    = b
-   end      = e
-   dna,acc,don = qpalma.sequence_utils.get_seq_and_scores(chromo,strand,begin,end,flat_files)
+      acc_fn,don_fn = test.get_splice_site_scores_fn(idx,'+')
+      print acc_fn,don_fn
+      assert os.path.exists(acc_fn)
+      assert os.path.exists(don_fn)
+      acc_fn,don_fn = test.get_splice_site_scores_fn(idx,'-')
+      print acc_fn,don_fn
+      assert os.path.exists(acc_fn)
+      assert os.path.exists(don_fn)
 
-   check_positions(dna,acc,don)
-   print 'fine'
 
 if __name__ == '__main__':
-   #run()
-   #run2()
-   #run3(
-   run4()
+   #suite = unittest.TestLoader().loadTestsFromTestCase(TestSequenceUtils)
+   suite = unittest.TestLoader().loadTestsFromTestCase(TestLookupTable)
+   unittest.TextTestRunner(verbosity=2).run(suite)