最近はウェビナー等ではHailのお話をすることが多いですが、ここではGlow(https://glow.readthedocs.io)やApache SparkのネイティブなDataFrameを使ってゲノムデータ等を取り扱う方法を紹介します。 今回はDANNのデータをApache Sparkのデータフレームへ取り込んでいきます。 DANNは遺伝子変異の病原性をスコアリングしたもので、深層学習が使われています。
データの下準備¶
予め以下のデータをダウンロードしておいてあります。 103GBあります。
$ ls -sh DANN_whole_genome_SNVs.tsv.bgz
103G DANN_whole_genome_SNVs.tsv.bgz
最初の10行を見てみます。
$ zcat DANN_whole_genome_SNVs.tsv.bgz|head
1 10001 T A 0.16461391399220135
1 10001 T C 0.4396994049749739
1 10001 T G 0.38108629377072734
1 10002 A C 0.36182020272810128
1 10002 A G 0.44413258111779291
1 10002 A T 0.16812846819989813
1 10003 A C 0.36516159615040267
1 10003 A G 0.4480978029675266
1 10003 A T 0.17573150952125288
1 10004 C A 0.16559066203780862
とてもシンプルなデータですね。そしてheaderはありません。 chromosome, position, ref, alt, DANN_score という名前でheaderを作ることにします。 データはhdfsに置きました。 (今回はオンプレミスに構築してあるテストマシンにて実行。sparkはkubernetesで動作させています。)
k8sをリソースマネージャーとしたsparkを起動しておきます。
データの読み込み¶
inferSchema=Falseで読み込みます。 headerもないので、header=Falseです。 In [3]:
delta_output_path = 'hdfs://hostname:port/test/DANN-delta'
In [4]:
df = spark.read.option("delimiter","\t")\
.csv("hdfs://hostname:port/test/DANN/DANN_whole_genome_SNVs.tsv.bgz",inferSchema=False,nullValue="NA",header=False)
表示させてみます In [5]:
+---+-----+---+---+-------------------+
|_c0| _c1|_c2|_c3| _c4|
+---+-----+---+---+-------------------+
| 1|10001| T| A|0.16461391399220135|
| 1|10001| T| C| 0.4396994049749739|
| 1|10001| T| G|0.38108629377072734|
| 1|10002| A| C|0.36182020272810128|
| 1|10002| A| G|0.44413258111779291|
| 1|10002| A| T|0.16812846819989813|
| 1|10003| A| C|0.36516159615040267|
| 1|10003| A| G| 0.4480978029675266|
| 1|10003| A| T|0.17573150952125288|
| 1|10004| C| A|0.16559066203780862|
| 1|10004| C| G|0.31483562234250068|
| 1|10004| C| T|0.21451764900388209|
| 1|10005| C| A|0.16559118661232011|
| 1|10005| C| G|0.31483638206167452|
| 1|10005| C| T|0.21451772869824301|
| 1|10006| C| A|0.16559108705603318|
| 1|10006| C| G|0.31483638420514648|
| 1|10006| C| T|0.21451725744236924|
| 1|10007| T| A|0.17210554927891494|
| 1|10007| T| C|0.44406551656109544|
+---+-----+---+---+-------------------+
only showing top 20 rows
schemaをつくり、適用させて読み込んでみます。 In [6]:
from pyspark.sql.types import *
schema = StructType([
StructField("chromosome", StringType(), False),
StructField("position", IntegerType(), False),
StructField("ref", StringType(), False),
StructField("alt", StringType(), False),
StructField("DANN_score", DoubleType(), False)
])
In [7]:
%%time
df2 = spark.read.option("delimiter","\t")\
.csv("hdfs://hostmame:port/test/DANN/DANN_whole_genome_SNVs.tsv.bgz",inferSchema=False,nullValue="NA",header=False,schema=schema).cache()
df2.count()
CPU times: user 24.3 ms, sys: 18.9 ms, total: 43.2 ms
Wall time: 3min 48s
Out[7]:
In [8]:
+----------+--------+---+---+-------------------+
|chromosome|position|ref|alt| DANN_score|
+----------+--------+---+---+-------------------+
| 1| 10001| T| A|0.16461391399220135|
| 1| 10001| T| C| 0.4396994049749739|
| 1| 10001| T| G|0.38108629377072734|
| 1| 10002| A| C| 0.3618202027281013|
| 1| 10002| A| G| 0.4441325811177929|
| 1| 10002| A| T|0.16812846819989813|
| 1| 10003| A| C|0.36516159615040267|
| 1| 10003| A| G| 0.4480978029675266|
| 1| 10003| A| T|0.17573150952125288|
| 1| 10004| C| A|0.16559066203780862|
| 1| 10004| C| G| 0.3148356223425007|
| 1| 10004| C| T| 0.2145176490038821|
| 1| 10005| C| A| 0.1655911866123201|
| 1| 10005| C| G| 0.3148363820616745|
| 1| 10005| C| T| 0.214517728698243|
| 1| 10006| C| A|0.16559108705603318|
| 1| 10006| C| G| 0.3148363842051465|
| 1| 10006| C| T|0.21451725744236924|
| 1| 10007| T| A|0.17210554927891494|
| 1| 10007| T| C|0.44406551656109544|
+----------+--------+---+---+-------------------+
only showing top 20 rows
In [9]:
root
|-- chromosome: string (nullable = true)
|-- position: integer (nullable = true)
|-- ref: string (nullable = true)
|-- alt: string (nullable = true)
|-- DANN_score: double (nullable = true)
良さそうです。
DataFrame操作¶
ここではRawScoreでsortしています。 In [10]:
%%time
display(df2.sort("DANN_score").select("chromosome","position","ref","alt","DANN_score").limit(20).toPandas())
chromosome
position
ref
alt
DANN_score
0
X
961072
T
G
0.009863
1
X
1698174
A
C
0.010581
2
12
46394099
T
G
0.010603
3
X
457414
A
C
0.010612
4
21
46650999
T
G
0.010634
5
13
95130420
A
C
0.010705
6
X
976062
A
C
0.010814
7
3
166579220
A
C
0.010842
8
8
70768079
T
G
0.010990
9
X
976424
T
G
0.011023
10
X
513248
T
G
0.011068
11
X
102394404
A
C
0.011116
12
X
102394403
A
C
0.011116
13
22
21483206
T
G
0.011165
14
21
19034961
A
C
0.011186
15
X
795435
A
C
0.011209
16
X
1232985
A
C
0.011229
17
5
180636531
T
G
0.011302
18
16
9075921
T
G
0.011323
19
20
60522451
A
C
0.011368
CPU times: user 15.2 ms, sys: 6.24 ms, total: 21.4 ms
Wall time: 16.7 s
スコアの高い順番にsortする場合は次のようにdesc()で項目名を囲みます。 In [11]:
%%time
from pyspark.sql.functions import desc
display(df2.sort(desc("DANN_score")).select("chromosome","position","ref","alt","DANN_score").limit(20).toPandas())
chromosome
position
ref
alt
DANN_score
0
12
83250902
G
A
0.999643
1
2
216288949
C
T
0.999641
2
17
35616349
C
T
0.999639
3
15
33895443
G
A
0.999639
4
9
74360174
C
T
0.999639
5
3
53757650
G
A
0.999639
6
13
32735337
G
A
0.999639
7
1
211093356
C
T
0.999638
8
3
64148735
C
T
0.999637
9
1
237659930
G
A
0.999637
10
5
169138979
G
A
0.999637
11
1
65332823
C
T
0.999636
12
13
32721489
G
A
0.999636
13
13
35731325
G
A
0.999636
14
1
227333350
C
T
0.999636
15
5
169138991
G
A
0.999636
16
5
77511907
C
T
0.999636
17
9
334292
G
A
0.999636
18
3
123047611
C
T
0.999636
19
16
77393284
C
T
0.999636
CPU times: user 13.9 ms, sys: 2.15 ms, total: 16.1 ms
Wall time: 13.8 s
In [ ]: 十分なメモリを搭載した構成で、.cache()を実行することで、高速な操作が可能になります。 Apache Sparkを使って多様なデータをロードしたデータ解析や分析基盤にご興味がございましたらぜひお問合せください。