Nabe International | Takeru Boost >> Takeruを回せ! >>

catch-img

Apache Spark (DANNデータ読み込み編)

最近はウェビナー等ではHailのお話をすることが多いですが、ここではGlow(https://glow.readthedocs.io)やApache SparkのネイティブなDataFrameを使ってゲノムデータ等を取り扱う方法を紹介します。 今回はDANNのデータをApache Sparkのデータフレームへ取り込んでいきます。
DANNは遺伝子変異の病原性をスコアリングしたもので、深層学習が使われています。

データの下準備

予め以下のデータをダウンロードしておいてあります。 103GBあります。

$ ls -sh DANN_whole_genome_SNVs.tsv.bgz
103G DANN_whole_genome_SNVs.tsv.bgz

最初の10行を見てみます。

$ zcat DANN_whole_genome_SNVs.tsv.bgz|head
1   10001   T   A   0.16461391399220135
1   10001   T   C   0.4396994049749739
1   10001   T   G   0.38108629377072734
1   10002   A   C   0.36182020272810128
1   10002   A   G   0.44413258111779291
1   10002   A   T   0.16812846819989813
1   10003   A   C   0.36516159615040267
1   10003   A   G   0.4480978029675266
1   10003   A   T   0.17573150952125288
1   10004   C   A   0.16559066203780862

とてもシンプルなデータですね。そしてheaderはありません。
chromosome, position, ref, alt, DANN_score
という名前でheaderを作ることにします。 データはhdfsに置きました。 (今回はオンプレミスに構築してあるテストマシンにて実行。sparkはkubernetesで動作させています。)

起動

k8sをリソースマネージャーとしたsparkを起動しておきます。

データの読み込み

inferSchema=Falseで読み込みます。
headerもないので、header=Falseです。 In [3]:

delta_output_path = 'hdfs://hostname:port/test/DANN-delta'

In [4]:

df = spark.read.option("delimiter","\t")\
    .csv("hdfs://hostname:port/test/DANN/DANN_whole_genome_SNVs.tsv.bgz",inferSchema=False,nullValue="NA",header=False)

表示させてみます In [5]:

df.show()
+---+-----+---+---+-------------------+
|_c0|  _c1|_c2|_c3|                _c4|
+---+-----+---+---+-------------------+
|  1|10001|  T|  A|0.16461391399220135|
|  1|10001|  T|  C| 0.4396994049749739|
|  1|10001|  T|  G|0.38108629377072734|
|  1|10002|  A|  C|0.36182020272810128|
|  1|10002|  A|  G|0.44413258111779291|
|  1|10002|  A|  T|0.16812846819989813|
|  1|10003|  A|  C|0.36516159615040267|
|  1|10003|  A|  G| 0.4480978029675266|
|  1|10003|  A|  T|0.17573150952125288|
|  1|10004|  C|  A|0.16559066203780862|
|  1|10004|  C|  G|0.31483562234250068|
|  1|10004|  C|  T|0.21451764900388209|
|  1|10005|  C|  A|0.16559118661232011|
|  1|10005|  C|  G|0.31483638206167452|
|  1|10005|  C|  T|0.21451772869824301|
|  1|10006|  C|  A|0.16559108705603318|
|  1|10006|  C|  G|0.31483638420514648|
|  1|10006|  C|  T|0.21451725744236924|
|  1|10007|  T|  A|0.17210554927891494|
|  1|10007|  T|  C|0.44406551656109544|
+---+-----+---+---+-------------------+
only showing top 20 rows

schemaをつくり、適用させて読み込んでみます。 In [6]:

from pyspark.sql.types import *

schema = StructType([
  StructField("chromosome", StringType(), False),
  StructField("position", IntegerType(), False),
  StructField("ref", StringType(), False),
  StructField("alt", StringType(), False),
  StructField("DANN_score", DoubleType(), False)
])

In [7]:

%%time
df2 = spark.read.option("delimiter","\t")\
    .csv("hdfs://hostmame:port/test/DANN/DANN_whole_genome_SNVs.tsv.bgz",inferSchema=False,nullValue="NA",header=False,schema=schema).cache()
df2.count()
CPU times: user 24.3 ms, sys: 18.9 ms, total: 43.2 ms
Wall time: 3min 48s

Out[7]:

8575974291

In [8]:

df2.show()
+----------+--------+---+---+-------------------+
|chromosome|position|ref|alt|         DANN_score|
+----------+--------+---+---+-------------------+
|         1|   10001|  T|  A|0.16461391399220135|
|         1|   10001|  T|  C| 0.4396994049749739|
|         1|   10001|  T|  G|0.38108629377072734|
|         1|   10002|  A|  C| 0.3618202027281013|
|         1|   10002|  A|  G| 0.4441325811177929|
|         1|   10002|  A|  T|0.16812846819989813|
|         1|   10003|  A|  C|0.36516159615040267|
|         1|   10003|  A|  G| 0.4480978029675266|
|         1|   10003|  A|  T|0.17573150952125288|
|         1|   10004|  C|  A|0.16559066203780862|
|         1|   10004|  C|  G| 0.3148356223425007|
|         1|   10004|  C|  T| 0.2145176490038821|
|         1|   10005|  C|  A| 0.1655911866123201|
|         1|   10005|  C|  G| 0.3148363820616745|
|         1|   10005|  C|  T|  0.214517728698243|
|         1|   10006|  C|  A|0.16559108705603318|
|         1|   10006|  C|  G| 0.3148363842051465|
|         1|   10006|  C|  T|0.21451725744236924|
|         1|   10007|  T|  A|0.17210554927891494|
|         1|   10007|  T|  C|0.44406551656109544|
+----------+--------+---+---+-------------------+
only showing top 20 rows

In [9]:

df2.printSchema()
root
 |-- chromosome: string (nullable = true)
 |-- position: integer (nullable = true)
 |-- ref: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- DANN_score: double (nullable = true)

良さそうです。

DataFrame操作

ここではRawScoreでsortしています。 In [10]:

%%time
display(df2.sort("DANN_score").select("chromosome","position","ref","alt","DANN_score").limit(20).toPandas())

chromosome position ref alt DANN_score
0 X 961072 T G 0.009863
1 X 1698174 A C 0.010581
2 12 46394099 T G 0.010603
3 X 457414 A C 0.010612
4 21 46650999 T G 0.010634
5 13 95130420 A C 0.010705
6 X 976062 A C 0.010814
7 3 166579220 A C 0.010842
8 8 70768079 T G 0.010990
9 X 976424 T G 0.011023
10 X 513248 T G 0.011068
11 X 102394404 A C 0.011116
12 X 102394403 A C 0.011116
13 22 21483206 T G 0.011165
14 21 19034961 A C 0.011186
15 X 795435 A C 0.011209
16 X 1232985 A C 0.011229
17 5 180636531 T G 0.011302
18 16 9075921 T G 0.011323
19 20 60522451 A C 0.011368
CPU times: user 15.2 ms, sys: 6.24 ms, total: 21.4 ms
Wall time: 16.7 s

スコアの高い順番にsortする場合は次のようにdesc()で項目名を囲みます。 In [11]:

%%time
from pyspark.sql.functions import desc
display(df2.sort(desc("DANN_score")).select("chromosome","position","ref","alt","DANN_score").limit(20).toPandas())

chromosome position ref alt DANN_score
0 12 83250902 G A 0.999643
1 2 216288949 C T 0.999641
2 17 35616349 C T 0.999639
3 15 33895443 G A 0.999639
4 9 74360174 C T 0.999639
5 3 53757650 G A 0.999639
6 13 32735337 G A 0.999639
7 1 211093356 C T 0.999638
8 3 64148735 C T 0.999637
9 1 237659930 G A 0.999637
10 5 169138979 G A 0.999637
11 1 65332823 C T 0.999636
12 13 32721489 G A 0.999636
13 13 35731325 G A 0.999636
14 1 227333350 C T 0.999636
15 5 169138991 G A 0.999636
16 5 77511907 C T 0.999636
17 9 334292 G A 0.999636
18 3 123047611 C T 0.999636
19 16 77393284 C T 0.999636
CPU times: user 13.9 ms, sys: 2.15 ms, total: 16.1 ms
Wall time: 13.8 s

In [ ]: 十分なメモリを搭載した構成で、.cache()を実行することで、高速な操作が可能になります。 Apache Sparkを使って多様なデータをロードしたデータ解析や分析基盤にご興味がございましたらぜひお問合せください。