Nabe International | Takeru Boost >> Takeruを回せ! >>

大規模GWASやPRS研究を可能にするHail(6)

Takeru Elysia news!

大規模GWASやPRS研究を可能にするHail(6)

Takeru ElysiaはApache Sparkが利用可能な大規模計算プラットフォームです。従来のジョブスケジューラーを用いたものと異なり、複数台の計算機を使う同じcluster構成でも、次のような特徴を持っています。
  • 自分でデータ分割をする必要がない
  • 並列処理を自動実行してくれる
  • 計算にnfsを使用しないため、ファイルI/Oがボトルネックになりにくい
  • 高い拡張性を持っており、数ノードからスタートして数百ノード以上にスケールすることができる
動作するソフトウェアやライブラリの中にはGATKといったメジャーなものもありますが今回は大規模なGWASやPRS(Polygenic Risk Score)を用いた研究ができる、Hailについてお話しします。近年、各種バイオバンクが充実してきており、数年まではできなかったことが可能になってきているようです。Apache Spark上で動作するHailなら、いくつものバイオバンクのデータを読み込み、自在にフィルターや変換、アノテーションをしていくことで新しい知見をより早く得ることができます。

今日の内容:

Spark上でPandas風にデータ分析ができるライブラリ、Koalasを使う

使用するvcfファイルについて:
使用するvcfファイルは、1000genomesより取得したものです。 圧縮した状態で合計1.3TB程度あります。 HailはPython上で動作します。まず、notebook上でHailを動作させます。 この試験ではkubernetes上にspark clusterをデプロイしています。
/usr/local/miniconda3/lib/python3.7/site-packages/hail/context.py:71: UserWarning: pip-installed Hail requires additional configuration options in Spark referring
  to the path to the Hail Python module directory HAIL_DIR,
  e.g. /path/to/python/site-packages/hail:
    spark.jars=HAIL_DIR/hail-all-spark.jar
    spark.driver.extraClassPath=HAIL_DIR/hail-all-spark.jar
    spark.executor.extraClassPath=./hail-all-spark.jar
  'pip-installed Hail requires additional configuration options in Spark referring\n'
Running on Apache Spark version 2.4.5
SparkUI available at 
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.34-914bd8a10ca2
LOGGING: writing to /home/testuser1/tutorial/hail-20200701-0746-0.2.34-914bd8a10ca2.log
Loading BokehJS ...

データの読み込み

vcfファイルとannotationファイルを読み込み, 1塩基多型のみに絞り込み, 両者を結合しておきます。
In [2]:
mt = hl.read_matrix_table('hdfs://spk000:9000/ectest/1kg-L.mt')
#mt = hl.read_matrix_table('data/1kg.mt')
table = hl.import_table('data/1kg_annotations.txt', impute=True).key_by('Sample')
mt = hl.split_multi_hts(mt) 
mt = mt.filter_rows( \
    ((mt.alleles[0] == 'A') | (mt.alleles[0] == 'G') | \
     (mt.alleles[0] == 'C') | (mt.alleles[0] == 'T')) & \
    ((mt.alleles[1] == 'A') | (mt.alleles[1] == 'G') | \
     (mt.alleles[1] == 'C') | (mt.alleles[1] == 'T')))
mt = mt.annotate_cols(pheno = table[mt.s])
2020-07-01 07:46:34 Hail: INFO: Reading table to impute column types
2020-07-01 07:46:36 Hail: INFO: Finished type imputation
  Loading column 'Sample' as type 'str' (imputed)
  Loading column 'Population' as type 'str' (imputed)
  Loading column 'SuperPopulation' as type 'str' (imputed)
  Loading column 'isFemale' as type 'bool' (imputed)
  Loading column 'PurpleHair' as type 'bool' (imputed)
  Loading column 'CaffeineConsumption' as type 'int32' (imputed)

Koalasを使ってみる

KoalasはSpark上でPandasと同じようなコードの書き方ができるライブラリです。 次のようにpipでインストールできます。
$ pip install koalas
In [3]:
import databricks.koalas as ks
WARNING:root:'ARROW_PRE_0_15_IPC_FORMAT' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=0.15 and pyspark<3.0. Koalas will set it for you but it does not work if there is a Spark context already launched.
SparkのDataFrameからKoalasのDataFrameをつくるには, to_koalas()を使います。

KoalasのDataFrameへ変換

KoalasはSpark cluster上で動作するため、メモリ不足にならずに高速処理にすることができます。
In [4]:
%time mt_rows_koalas = mt.rows().to_spark().to_koalas()
CPU times: user 58 ms, sys: 7.87 ms, total: 65.8 ms
Wall time: 25 s
In [5]:
%time mt_rows_koalas.mean()
CPU times: user 51 ms, sys: 15.2 ms, total: 66.2 ms
Wall time: 29.3 s
Out[5]:
locus.position              7.608271e+07
qual                        7.447422e+04
info.AN                     4.927785e+03
info.BaseQRankSum           1.486101e-01
info.ClippingRankSum        1.187699e-01
info.DP                     7.983993e+04
info.DS                     0.000000e+00
info.END                             NaN
info.ExcessHet              3.355071e+07
info.FS                     4.402321e+00
info.HaplotypeScore                  NaN
info.InbreedingCoeff                 NaN
info.MQ                     5.614368e+01
info.MQ0                    0.000000e+00
info.MQRankSum              7.536698e-02
info.NEGATIVE_TRAIN_SITE    1.065494e-01
info.POSITIVE_TRAIN_SITE    2.484622e-01
info.QD                     1.314340e+01
info.RAW_MQ                          NaN
info.ReadPosRankSum         4.204837e-01
info.SOR                    9.905019e-01
info.VQSLOD                -1.626230e+01
a_index                     1.042901e+00
was_split                   7.725182e-02
Name: 0, dtype: float64
In [6]:
%time mt_cols_koalas = mt.cols().to_spark().to_koalas()
2020-07-01 07:47:54 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
CPU times: user 27.1 ms, sys: 3.1 ms, total: 30.2 ms
Wall time: 932 ms
2020-07-01 07:47:55 Hail: INFO: Coerced sorted dataset
In [7]:
%time mt_cols_koalas.mean()
CPU times: user 18.4 ms, sys: 9.88 ms, total: 28.2 ms
Wall time: 302 ms
Out[7]:
pheno.isFemale               0.507588
pheno.PurpleHair             0.513978
pheno.CaffeineConsumption    3.967652
Name: 0, dtype: float64
In [8]:
%time mt_cols_koalas.info()
<class 'databricks.koalas.frame.DataFrame'>
Index: 2504 entries, 0 to 2503
Data columns (total 6 columns):
s                            2504 non-null object
pheno.Population             2504 non-null object
pheno.SuperPopulation        2504 non-null object
pheno.isFemale               2504 non-null bool
pheno.PurpleHair             2504 non-null bool
pheno.CaffeineConsumption    2504 non-null int32
dtypes: bool(2), int32(1), object(3)CPU times: user 34.7 ms, sys: 17.4 ms, total: 52.1 ms
Wall time: 1.18 s
/usr/local/miniconda3/lib/python3.7/site-packages/databricks/koalas/generic.py:351: FutureWarning: `get_dtype_counts` has been deprecated and will be removed in a future version. For DataFrames use `.dtypes.value_counts()
  FutureWarning,
今回は次のような操作を行いました。 ・前回Pandasで読み込めなかったデータを、Spark上でPandas風にデータ分析ができるライブラリKoalasを使って読み込み
今回はhailのデータをKoalasのDataFrameへと変換をしました。 Pandasではメモリ不足に陥るような大きなデータでも、KoalasのDataFrameなら問題なく処理するができました。 次回の内容はひとまず未定ですが、お楽しみにしてください。