Nabe International | Takeru Boost >> Takeruを回せ! >>

catch-img

Apache Spark (CADDデータ読み込み)

少し間があいてしまいましたが、今回はCADDのデータをApache Sparkに読み込んでみました。CADDのデータそのものはタブ区切りのcsvファイルですので、spark.read.csv()で読み込んでいくことができます。 今回からnotebook部分を以下のように表示するようにしました。横道ですがipynbファイルをnbconvertでhtmlへ変換していくことでnotebookをきれいに共有することができます。

In [1]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

Omics Analysis with Apache Spark

CADD編

今回はCADDのデータをApache Sparkのデータフレームへ取り込んでいきます。

データの下準備

予め以下のデータをダウンロードしておいてあります。 annotationがincludeされるかどうかでサイズにかなりの開きがあります。
~ # ls /zfs-archive/CADD/v1.6/GRCh38/ -sh
合計 1010G
8.5K GRCh38_v1.6                        7.2G gnomad.genomes.r3.0.indel_inclAnno.tsv.bgz   81G whole_genome_SNVs.tsv.gz
206G annotationsGRCh38_v1.6.tar.gz      7.2G gnomad.genomes.r3.0.indel_inclAnno.tsv.gz   308G whole_genome_SNVs_inclAnno.tsv.bgz
1.1G gnomad.genomes.r3.0.indel.tsv.bgz  6.0G gnomad.genomes.r3.0.snv.tsv.gz              314G whole_genome_SNVs_inclAnno.tsv.gz
1.1G gnomad.genomes.r3.0.indel.tsv.gz    81G whole_genome_SNVs.tsv.bgz
bgzファイルはApache Sparkで高速に並列アクセスをするために事前につくっておきました。
~ # cat gnomad.genomes.r3.0.indel.tsv.gz |gzip -d |sed '1d' |bgzip -c > gnomad.genomes.r3.0.indel.tsv.bgz
sed '1d'はheaderを整えるために付与しています。
  • 変更前
    # cat gnomad.genomes.r3.0.indel.tsv.gz |gunzip|head
    ## CADD GRCh38-v1.6 (c) University of Washington, Hudson-Alpha Institute for Biotechnology and Berlin Institute of Health 2013-2020. All rights reserved.
    #Chrom  Pos Ref Alt RawScore    PHRED
    1   10061   T   TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC    0.199964    3.123
    1   10067   T   TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC  0.225348    3.416
    1   10108   C   CA  0.390514    5.321
    1   10108   C   CAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCT 0.215572    3.303
    1   10108   CAACCCT C   0.317701    4.494
    1   10109   AACCCT  A   0.321485    4.538
    1   10113   CT  C   0.337989    4.727
    1   10114   TA  T   0.338849    4.737
  • 変更後
    /zfs-archive/CADD/v1.6/GRCh38 # cat gnomad.genomes.r3.0.indel.tsv.bgz |gunzip|head
    #Chrom  Pos Ref Alt RawScore    PHRED
    1   10061   T   TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC    0.199964    3.123
    1   10067   T   TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC  0.225348    3.416
    1   10108   C   CA  0.390514    5.321
    1   10108   C   CAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCT 0.215572    3.303
    1   10108   CAACCCT C   0.317701    4.494
    1   10109   AACCCT  A   0.321485    4.538
    1   10113   CT  C   0.337989    4.727
    1   10114   TA  T   0.338849    4.737
    1   10116   AC  A   0.337997    4.727
データはs3互換ストレージに置いておきます。
(今回はオンプレミスに構築してあるテストマシンにて実行。s3はminioで、sparkはyarnで動作させています。)

起動

あらかじめ設定しておいたAWSの"Service Catalog"からパラメータを入力し、欲しいサイズのSpark Clusterを起動していきます。
主要なオプションは以下のとおりです。
オプション
役割等
--packages
packages以下にコンマ区切りで依存関係にあるパッケージを指定しています。
io.projectglow:glow-spark3_2.12:1.1.2,
io.delta:delta-core_2.12:1.0.1,
この2行はdeltaを使うために必要です。
org.apache.hadoop:hadoop-aws:3.2.0,
s3へアクセスするために必要なものです。
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
serializeにつかうclassを指定。推奨値です。
In [2]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = ' \
 --conf spark.jars.packages=\
io.projectglow:glow-spark3_2.12:1.1.2,\
io.delta:delta-core_2.12:1.0.1,\
org.apache.hadoop:hadoop-aws:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0 \
 --conf spark.executor.cores=10 \
 --conf spark.executor.instances=6 \
 --conf spark.driver.memory=2G \
 --conf spark.executor.memory=2G \
 --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec \
 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
 --conf spark.hadoop.fs.s3a.endpoint="http://minio-server:9199" \
 --conf spark.hadoop.fs.s3a.access.key="access-key" \
 --conf spark.hadoop.fs.s3a.secret.key="secret-key" \
 --conf spark.hadoop.fs.s3a.path.style.access="True" \
 --conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
 --conf spark.hadoop.fs.s3a.connection.maximum=5000 \
 --conf spark.hadoop.fs.s3a.connection.timeout=3000000 \
 --master yarn pyspark-shell '

from pyspark import SparkContext
sc=SparkContext.getOrCreate(conf=conf)

from pyspark.sql import SparkSession
spark = SparkSession(sc)

spark

sc

Out[2]:
SparkContext Spark UI
Version
v3.1.3
Master
yarn
AppName
pyspark-shell

データの読み込み

今回はgnomad.genomes.r3.0.indel_inclAnno.tsv.bgzを読み込んでみます。
初めて読み込むデータでもあるため、まずはinferSchema=Trueで読み込んでみます。
In [3]:
delta_output_path = 's3a://s3-prefix-omics-data-output/CADD-delta/CADD/v1.6/GRCh38/gnomad.genomes.r3.0.indel_inclAnno'
In [4]:
df = spark.read.option("delimiter","\t")\
    .csv("s3a://s3-prefix-omics-data-raw/CADD/v1.6/GRCh38/gnomad.genomes.r3.0.indel_inclAnno.tsv.bgz",inferSchema=True,nullValue="NA",header=True)
In [5]:
import pandas as pd

いくつかの項目を選択し表示させてみます
In [6]:
display(df.select("#Chrom","Pos","Ref","Alt","Type").limit(10).toPandas())


#Chrom
Pos
Ref
Alt
Type
0
1
10061
T
TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC
INS
1
1
10061
T
TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC
INS
2
1
10067
T
TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC
INS
3
1
10067
T
TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCC
INS
4
1
10108
C
CA
INS
5
1
10108
C
CA
INS
6
1
10108
C
CAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCT
INS
7
1
10108
C
CAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCT
INS
8
1
10108
CAACCCT
C
DEL
9
1
10108
CAACCCT
C
DEL

データの中身を確認してみます。
df.na.drop()はデータに欠損値があるものをdropするものですが、このデータにおいては何かしらの列でNAがあるようで、何も残りませんでした。
以下いくつかのセルを実行することでどのように欠損値を含んだデータであるかが確認できますが、表示の都合で出力を削除してあります。
In [7]:
(df.na.drop().limit(50).toPandas())

SIFTcatはすべてNAのようです。
In [8]:
display(df.dropna(subset="SIFTcat").limit(50).toPandas())

それぞれのカラムでNAであるデータがいくつあるかをカウントしてみます。
In [9]:
from pyspark.sql.functions import *
col_null_cnt_df =  df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns])
In [10]:
display(col_null_cnt_df.toPandas())

反対に、NullやNaではなく値が入っている数を数えてみます。
SIFTcat以外にもいくつかのカラムはそもそもデータが入っていないことがわかりました。
In [11]:
col_not_null_cnt_df =  df.select([count(when(col(c).isNotNull(),c)).alias(c) for c in df.columns])
display(col_not_null_cnt_df.toPandas())

inferSchemaで取得されたSchemaを見てみます 正確なところはやはり↓のような公式ドキュメントを見て決定していく必要がありそうです。
https://cadd.gs.washington.edu/static/ReleaseNotes_CADD_v1.4.pdf
In [12]:
df.printSchema()

root
 |-- #Chrom: string (nullable = true)
 |-- Pos: integer (nullable = true)
 |-- Ref: string (nullable = true)
 |-- Alt: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Length: integer (nullable = true)
 |-- AnnoType: string (nullable = true)
 |-- Consequence: string (nullable = true)
 |-- ConsScore: integer (nullable = true)
 |-- ConsDetail: string (nullable = true)
 |-- GC: double (nullable = true)
 |-- CpG: double (nullable = true)
 |-- motifECount: integer (nullable = true)
 |-- motifEName: string (nullable = true)
 |-- motifEHIPos: integer (nullable = true)
 |-- motifEScoreChng: string (nullable = true)
 |-- oAA: string (nullable = true)
 |-- nAA: string (nullable = true)
 |-- GeneID: string (nullable = true)
 |-- FeatureID: string (nullable = true)
 |-- GeneName: string (nullable = true)
 |-- CCDS: string (nullable = true)
 |-- Intron: string (nullable = true)
 |-- Exon: string (nullable = true)
 |-- cDNApos: integer (nullable = true)
 |-- relcDNApos: double (nullable = true)
 |-- CDSpos: integer (nullable = true)
 |-- relCDSpos: double (nullable = true)
 |-- protPos: integer (nullable = true)
 |-- relProtPos: double (nullable = true)
 |-- Domain: string (nullable = true)
 |-- Dst2Splice: integer (nullable = true)
 |-- Dst2SplType: string (nullable = true)
 |-- minDistTSS: integer (nullable = true)
 |-- minDistTSE: integer (nullable = true)
 |-- SIFTcat: string (nullable = true)
 |-- SIFTval: string (nullable = true)
 |-- PolyPhenCat: string (nullable = true)
 |-- PolyPhenVal: string (nullable = true)
 |-- priPhCons: double (nullable = true)
 |-- mamPhCons: double (nullable = true)
 |-- verPhCons: double (nullable = true)
 |-- priPhyloP: double (nullable = true)
 |-- mamPhyloP: double (nullable = true)
 |-- verPhyloP: double (nullable = true)
 |-- bStatistic: integer (nullable = true)
 |-- targetScan: integer (nullable = true)
 |-- mirSVR-Score: double (nullable = true)
 |-- mirSVR-E: double (nullable = true)
 |-- mirSVR-Aln: integer (nullable = true)
 |-- cHmm_E1: double (nullable = true)
 |-- cHmm_E2: double (nullable = true)
 |-- cHmm_E3: double (nullable = true)
 |-- cHmm_E4: double (nullable = true)
 |-- cHmm_E5: double (nullable = true)
 |-- cHmm_E6: double (nullable = true)
 |-- cHmm_E7: double (nullable = true)
 |-- cHmm_E8: double (nullable = true)
 |-- cHmm_E9: double (nullable = true)
 |-- cHmm_E10: double (nullable = true)
 |-- cHmm_E11: double (nullable = true)
 |-- cHmm_E12: double (nullable = true)
 |-- cHmm_E13: double (nullable = true)
 |-- cHmm_E14: double (nullable = true)
 |-- cHmm_E15: double (nullable = true)
 |-- cHmm_E16: double (nullable = true)
 |-- cHmm_E17: double (nullable = true)
 |-- cHmm_E18: double (nullable = true)
 |-- cHmm_E19: double (nullable = true)
 |-- cHmm_E20: double (nullable = true)
 |-- cHmm_E21: double (nullable = true)
 |-- cHmm_E22: double (nullable = true)
 |-- cHmm_E23: double (nullable = true)
 |-- cHmm_E24: double (nullable = true)
 |-- cHmm_E25: double (nullable = true)
 |-- GerpRS: double (nullable = true)
 |-- GerpRSpval: double (nullable = true)
 |-- GerpN: double (nullable = true)
 |-- GerpS: double (nullable = true)
 |-- tOverlapMotifs: integer (nullable = true)
 |-- motifDist: double (nullable = true)
 |-- EncodeH3K4me1-sum: double (nullable = true)
 |-- EncodeH3K4me1-max: double (nullable = true)
 |-- EncodeH3K4me2-sum: double (nullable = true)
 |-- EncodeH3K4me2-max: double (nullable = true)
 |-- EncodeH3K4me3-sum: double (nullable = true)
 |-- EncodeH3K4me3-max: double (nullable = true)
 |-- EncodeH3K9ac-sum: double (nullable = true)
 |-- EncodeH3K9ac-max: double (nullable = true)
 |-- EncodeH3K9me3-sum: double (nullable = true)
 |-- EncodeH3K9me3-max: double (nullable = true)
 |-- EncodeH3K27ac-sum: double (nullable = true)
 |-- EncodeH3K27ac-max: double (nullable = true)
 |-- EncodeH3K27me3-sum: double (nullable = true)
 |-- EncodeH3K27me3-max: double (nullable = true)
 |-- EncodeH3K36me3-sum: double (nullable = true)
 |-- EncodeH3K36me3-max: double (nullable = true)
 |-- EncodeH3K79me2-sum: double (nullable = true)
 |-- EncodeH3K79me2-max: double (nullable = true)
 |-- EncodeH4K20me1-sum: double (nullable = true)
 |-- EncodeH4K20me1-max: double (nullable = true)
 |-- EncodeH2AFZ-sum: double (nullable = true)
 |-- EncodeH2AFZ-max: double (nullable = true)
 |-- EncodeDNase-sum: double (nullable = true)
 |-- EncodeDNase-max: double (nullable = true)
 |-- EncodetotalRNA-sum: double (nullable = true)
 |-- EncodetotalRNA-max: double (nullable = true)
 |-- Grantham: string (nullable = true)
 |-- SpliceAI-acc-gain: double (nullable = true)
 |-- SpliceAI-acc-loss: double (nullable = true)
 |-- SpliceAI-don-gain: double (nullable = true)
 |-- SpliceAI-don-loss: double (nullable = true)
 |-- MMSp_acceptorIntron: double (nullable = true)
 |-- MMSp_acceptor: double (nullable = true)
 |-- MMSp_exon: double (nullable = true)
 |-- MMSp_donor: double (nullable = true)
 |-- MMSp_donorIntron: string (nullable = true)
 |-- Dist2Mutation: integer (nullable = true)
 |-- Freq100bp: integer (nullable = true)
 |-- Rare100bp: integer (nullable = true)
 |-- Sngl100bp: integer (nullable = true)
 |-- Freq1000bp: integer (nullable = true)
 |-- Rare1000bp: integer (nullable = true)
 |-- Sngl1000bp: integer (nullable = true)
 |-- Freq10000bp: integer (nullable = true)
 |-- Rare10000bp: integer (nullable = true)
 |-- Sngl10000bp: integer (nullable = true)
 |-- EnsembleRegulatoryFeature: string (nullable = true)
 |-- dbscSNV-ada_score: string (nullable = true)
 |-- dbscSNV-rf_score: string (nullable = true)
 |-- RemapOverlapTF: integer (nullable = true)
 |-- RemapOverlapCL: integer (nullable = true)
 |-- RawScore: double (nullable = true)
 |-- PHRED: double (nullable = true)


ディスクへの書き込み、読み込み

S3
データをdelta形式でs3へ保存します
In [13]:
%%time
df.write.format("delta").mode("overwrite").save(delta_output_path)

CPU times: user 545 ms, sys: 119 ms, total: 665 ms
Wall time: 7min 13s

s3から読み込む場合は、このようにします。
In [14]:
df_s3 = spark.read.format("delta").load(delta_output_path)

DataFrame操作

ここではRawScoreでsortしています。
In [15]:
%%time
display(df_s3.sort("RawScore").select("#Chrom","Pos","Ref","Alt","Type","RawScore").limit(20).toPandas())


#Chrom
Pos
Ref
Alt
Type
RawScore
0
5
146459065
CT
C
DEL
-3.088003
1
X
153398165
CT
C
DEL
-2.764114
2
X
153398165
CT
C
DEL
-2.764114
3
3
156927605
G
GT
INS
-2.762860
4
12
52472133
CG
C
DEL
-2.716330
5
7
100752351
CA
C
DEL
-2.700306
6
7
100752351
CA
C
DEL
-2.700306
7
11
118533115
CG
C
DEL
-2.332786
8
11
118533115
CG
C
DEL
-2.332786
9
11
118533115
CG
C
DEL
-2.332786
10
6
160064455
CG
C
DEL
-2.328412
11
6
31271844
TC
T
DEL
-2.276888
12
6
31271844
TC
T
DEL
-2.276888
13
6
31271844
TC
T
DEL
-2.276888
14
6
31271844
TC
T
DEL
-2.276888
15
6
31271844
TC
T
DEL
-2.276888
16
17
76292849
AT
A
DEL
-2.240346
17
17
76292849
AT
A
DEL
-2.240346
18
21
44592478
CG
C
DEL
-2.230148
19
21
44592478
CG
C
DEL
-2.230148

CPU times: user 21.2 ms, sys: 1.33 ms, total: 22.5 ms
Wall time: 23.8 s

スコアの高い順番にsortする場合は次のようにdesc()で項目名を囲みます。
In [16]:
%%time
display(df_s3.sort(desc("RawScore")).select("#Chrom","Pos","Ref","Alt","Type","RawScore").limit(20).toPandas())


#Chrom
Pos
Ref
Alt
Type
RawScore
0
2
178528523
CTTACTGGCAGGTTGTTTTTAAACCATTCGA
C
DEL
22.120904
1
2
178528523
CTTACTGGCAGGTTGTTTTTAAACCATTCGA
C
DEL
22.120904
2
1
246327333
A
AT
INS
17.310817
3
1
246327333
A
AT
INS
17.310817
4
2
178542369
T
TGGTAAACTTAAACGTGGACCTA
INS
14.240998
5
2
178542369
T
TGGTAAACTTAAACGTGGACCTA
INS
14.240998
6
2
178542369
T
TGGTAAACTTAAACGTGGACCTA
INS
14.240998
7
14
45226825
TCCAA
T
DEL
13.917819
8
2
178595802
TGGAACATCTGGAAATA
T
DEL
13.404990
9
2
178595802
TGGAACATCTGGAAATA
T
DEL
13.404990
10
2
178598496
GTTTTCCTACC
G
DEL
13.027805
11
2
178598496
GTTTTCCTACC
G
DEL
13.027805
12
2
178531890
AAAAGATATCAAATCTTGCAGACCTCT
A
DEL
12.881773
13
2
178531890
AAAAGATATCAAATCTTGCAGACCTCT
A
DEL
12.881773
14
2
178531453
AG
A
DEL
12.862448
15
2
178531453
AG
A
DEL
12.862448
16
2
178531453
AG
A
DEL
12.862448
17
2
178527722
CCT
C
DEL
12.783503
18
2
178527722
CCT
C
DEL
12.783503
19
2
178527495
CAT
C
DEL
12.755046

CPU times: user 18.1 ms, sys: 2.54 ms, total: 20.7 ms
Wall time: 23.5 s

今回はテスト用の小さなVMを6ノード使って作成したSpark Clusterを使用しました。csvファイルなので簡単に読み込めますが、実際にはもっと丁寧にSchemaの定義をする必要はあります。deltaへの書き込みが7分13秒と少々かかっていますが、これはテストマシンの構成によるもので、Cloudや実際に使用するオンプレミスの機材ではもっと高速になります。