-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathhicpropairs2bedpe
executable file
·111 lines (100 loc) · 3.99 KB
/
hicpropairs2bedpe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python
#--coding:utf-8--
import argparse, gzip, os, re, sys
from glob import glob
from datetime import datetime
def pairs2bedpe(f_hicpro, f_out, ext=50):
with gzip.open(f_out, 'wt') as f_bedpe:
if f_hicpro.endswith('.gz'):
f_pair = gzip.open(f_hicpro)
else:
f_pair = open(f_hicpro)
for line in f_pair:
line = line.strip().split('\t')
#if the position is the middle of the read
#petA = [line[1], int(line[2])-ext, int(line[2])+ext]
#petB = [line[4], int(line[5])-ext, int(line[5])+ext]
#if the position is the 5' end of the read
if line[3] == "+":
petA = [line[1], int(line[2]), int(line[2])+ext]
else:
petA = [line[1], int(line[2])-ext, int(line[2])]
if line[6] == "+":
petB = [line[4], int(line[5]), int(line[5])+ext]
else:
petB = [line[4], int(line[5])-ext, int(line[5])]
newline = [
petA[0], petA[1], petA[2], petB[0], petB[1], petB[2], line[0],
'.', line[3], line[6]
]
f_bedpe.write("\t".join(map(str, newline)) + "\n")
f_pair.close()
def main():
start_time = datetime.now()
op = mainHelp()
if op.out is not None:
if os.path.isfile(op.out):
sys.stderr.write(
"Error: file %s exists, unable to create output folder\n" %
output)
if not os.path.isdir(op.out):
os.makedirs(op.out)
input_list = op.input
hicpro_file_list = []
for input in input_list:
if not os.path.exists(input):
sys.stderr.write("Warning: %s not exist, skipping\n" % input)
continue
if os.path.isfile(input):
hicpro_file_list.append(input)
else:
hicpro_file_list.extend(
glob(os.path.join(input, '*_allValidPairs')))
hicpro_file_list.extend(
glob(os.path.join(input, '*_allValidPairs.gz')))
hicpro_file_list.extend(
glob(os.path.join(input, '*', '*_allValidPairs')))
hicpro_file_list.extend(
glob(os.path.join(input, '*', '*_allValidPairs.gz')))
for hicpro_file in hicpro_file_list:
if op.out is not None:
bedpe_file = os.path.join(op.out, os.path.basename(hicpro_file))
else:
bedpe_file = hicpro_file
bedpe_file = re.sub(r'_allValidPairs(.gz)?$', '', bedpe_file)
bedpe_file = bedpe_file + '.bedpe.gz'
pairs2bedpe(hicpro_file, bedpe_file, ext=op.ext)
usedtime = datetime.now() - start_time
sys.stderr.write("Prcess finished. Used CPU time: %s Bye!\n" % usedtime)
def mainHelp():
description = "Convert hicpro allValidPairs file to bedpe format for cLoops"
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
dest="input",
nargs='+',
type=str,
help=
"HiC-Pro allValidPairs files, folders or folders whose subfolders contain files ended with '_allValidPairs'. Files and files inside folder could be gzipped (with additional .gz suffix)."
)
parser.add_argument(
'-o',
'--out',
dest="out",
required=False,
type=str,
help=
"Output directory. If specified all converted bedpe file will be put inside this folder rather than the same folder of respective input file. Names of files inside different directories should be unique, otherwise some output files will be overwrote."
)
parser.add_argument(
'-ext',
dest="ext",
required=False,
type=int,
default=50,
help=
"Extension from read center (HiC-Pro output the center of each read), default is 50. "
)
op = parser.parse_args()
return op
if __name__ == '__main__':
main()