Nabe International | Takeru Boost >> Takeruを回せ! >>

大規模GWASやPRS研究を可能にするHail(5)

Takeru Elysia news!

大規模GWASやPRS研究を可能にするHail(5)

Takeru ElysiaはApache Sparkが利用可能な大規模計算プラットフォームです。従来のジョブスケジューラーを用いたものと異なり、複数台の計算機を使う同じcluster構成でも、次のような特徴を持っています。
  • 自分でデータ分割をする必要がない
  • 並列処理を自動実行してくれる
  • 計算にnfsを使用しないため、ファイルI/Oがボトルネックになりにくい
  • 高い拡張性を持っており、数ノードからスタートして数百ノード以上にスケールすることができる
動作するソフトウェアやライブラリの中にはGATKといったメジャーなものもありますが今回は大規模なGWASやPRS(Polygenic Risk Score)を用いた研究ができる、Hailについてお話しします。近年、各種バイオバンクが充実してきており、数年まではできなかったことが可能になってきているようです。Apache Spark上で動作するHailなら、いくつものバイオバンクのデータを読み込み、自在にフィルターや変換、アノテーションをしていくことで新しい知見をより早く得ることができます。

今日の内容:

HailのMatrixTableやTableをPandasやSparkのDataFrameへexportする

Hailで扱ったり分析をしたデータをPandasやSparkのDataFrame上へ変換することで様々な分析ができるようになることを想定しています。 まず最初に16MBの小さなデータをつかって試してみます。 PandasはSpark上で動作するわけではないので自ずと扱えるデータ量やパフォーマンスの課題がでると思われますので、 そのあたりを1.3TBのvcfファイルを使って試してみます。
使用するvcfファイルについて:
使用するvcfファイルは、Hailのサンプルデータです。 圧縮した状態で合計16.1MBの小さなものです。 HailはPython上で動作します。まず、notebook上でHailを動作させます。 この試験ではkubernetes上にspark clusterをデプロイしています。

テストデータ(16MB)の読み込み

まずは小さなvcfファイルとannotationファイルを読み込み, 1塩基多型のみに絞り込み, 両者を結合しておきます。
In [2]:
mt = hl.read_matrix_table('data/1kg.mt')
table = hl.import_table('data/1kg_annotations.txt', impute=True).key_by('Sample')
mt = hl.split_multi_hts(mt) 
mt = mt.filter_rows( \
    ((mt.alleles[0] == 'A') | (mt.alleles[0] == 'G') | \
     (mt.alleles[0] == 'C') | (mt.alleles[0] == 'T')) & \
    ((mt.alleles[1] == 'A') | (mt.alleles[1] == 'G') | \
     (mt.alleles[1] == 'C') | (mt.alleles[1] == 'T')))
mt = mt.annotate_cols(pheno = table[mt.s])
2020-06-27 06:21:34 Hail: INFO: Reading table to impute column types
2020-06-27 06:21:37 Hail: INFO: Finished type imputation
  Loading column 'Sample' as type 'str' (imputed)
  Loading column 'Population' as type 'str' (imputed)
  Loading column 'SuperPopulation' as type 'str' (imputed)
  Loading column 'isFemale' as type 'bool' (imputed)
  Loading column 'PurpleHair' as type 'bool' (imputed)
  Loading column 'CaffeineConsumption' as type 'int32' (imputed)

テーブルの確認

mtはMatrixTable, tableはHailのTableとして扱われています。
In [3]:
mt
Out[3]:
<hail.matrixtable.MatrixTable at 0x7ff89d111410>
In [4]:
table
Out[4]:
<hail.table.Table at 0x7ff89e3e2810>
MatrixTableの中にはColumn fieldsやRow fieldsなど、さらにいくつかのtableが含まれています。
In [5]:
mt.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'pheno': struct {
        Population: str, 
        SuperPopulation: str, 
        isFemale: bool, 
        PurpleHair: bool, 
        CaffeineConsumption: int32
    }
----------------------------------------
Row fields:
    'locus': locus<GRCh37>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        AC: array<int32>, 
        AF: array<float64>, 
        AN: int32, 
        BaseQRankSum: float64, 
        ClippingRankSum: float64, 
        DP: int32, 
        DS: bool, 
        FS: float64, 
        HaplotypeScore: float64, 
        InbreedingCoeff: float64, 
        MLEAC: array<int32>, 
        MLEAF: array<float64>, 
        MQ: float64, 
        MQ0: int32, 
        MQRankSum: float64, 
        QD: float64, 
        ReadPosRankSum: float64, 
        set: str
    }
    'a_index': int32
    'was_split': bool
----------------------------------------
Entry fields:
    'GT': call
    'AD': array<int32>
    'DP': int32
    'GQ': int32
    'PL': array<int32>
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------
tableは次のようにRow fieldsを基本としたものです。
In [6]:
table.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'Sample': str 
    'Population': str 
    'SuperPopulation': str 
    'isFemale': bool 
    'PurpleHair': bool 
    'CaffeineConsumption': int32 
----------------------------------------
Key: ['Sample']
----------------------------------------

HailのTableをPandasやSparkのDataFrameへexport

PandasのDataFrameへ変換

次のようにto_pandas()を実行することでHailのTableをPandasのDataFrameに変換することができます。
In [7]:
table_pd = table.to_pandas()
In [8]:
table_pd
Out[8]:
Sample Population SuperPopulation isFemale PurpleHair CaffeineConsumption
0 HG00096 GBR EUR False False 4
1 HG00097 GBR EUR True True 4
2 HG00098 GBR EUR False False 5
3 HG00099 GBR EUR True False 4
4 HG00100 GBR EUR True False 5
... ... ... ... ... ... ...
3495 NA21137 GIH SAS True False 1
3496 NA21141 GIH SAS True True 2
3497 NA21142 GIH SAS True True 2
3498 NA21143 GIH SAS True True 5
3499 NA21144 GIH SAS True False 3
3500 rows × 6 columns
In [9]:
table_pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3500 entries, 0 to 3499
Data columns (total 6 columns):
Sample                 3500 non-null object
Population             3500 non-null object
SuperPopulation        3500 non-null object
isFemale               3500 non-null bool
PurpleHair             3500 non-null bool
CaffeineConsumption    3500 non-null int32
dtypes: bool(2), int32(1), object(3)
memory usage: 102.7+ KB
meanで平均値を出してみます
In [10]:
table_pd.mean()
Out[10]:
isFemale               0.502857
PurpleHair             0.517429
CaffeineConsumption    3.983714
dtype: float64

SparkのDataFrameへ変換

次のようにto_spark()を実行することでHailのtableをSparkのDataFrameに変換することができます。
In [11]:
table_spark = table.to_spark()
In [12]:
table_spark
Out[12]:
DataFrame[Sample: string, Population: string, SuperPopulation: string, isFemale: boolean, PurpleHair: boolean, CaffeineConsumption: int]
In [13]:
table_spark.show()
+-------+----------+---------------+--------+----------+-------------------+
| Sample|Population|SuperPopulation|isFemale|PurpleHair|CaffeineConsumption|
+-------+----------+---------------+--------+----------+-------------------+
|HG00096|       GBR|            EUR|   false|     false|                  4|
|HG00097|       GBR|            EUR|    true|      true|                  4|
|HG00098|       GBR|            EUR|   false|     false|                  5|
|HG00099|       GBR|            EUR|    true|     false|                  4|
|HG00100|       GBR|            EUR|    true|     false|                  5|
|HG00101|       GBR|            EUR|   false|      true|                  1|
|HG00102|       GBR|            EUR|    true|      true|                  6|
|HG00103|       GBR|            EUR|   false|      true|                  5|
|HG00104|       GBR|            EUR|    true|     false|                  5|
|HG00105|       GBR|            EUR|   false|     false|                  4|
|HG00106|       GBR|            EUR|    true|     false|                  7|
|HG00107|       GBR|            EUR|   false|      true|                  4|
|HG00108|       GBR|            EUR|   false|     false|                  2|
|HG00109|       GBR|            EUR|   false|      true|                  3|
|HG00110|       GBR|            EUR|    true|      true|                  3|
|HG00111|       GBR|            EUR|    true|      true|                  2|
|HG00112|       GBR|            EUR|   false|     false|                  2|
|HG00113|       GBR|            EUR|   false|     false|                  5|
|HG00114|       GBR|            EUR|   false|      true|                  6|
|HG00115|       GBR|            EUR|   false|      true|                  4|
+-------+----------+---------------+--------+----------+-------------------+
only showing top 20 rows

MatrixTableをPandasやSparkのDataFrameへexport

MatrixTableはいくつかのfieldを内包しています。 以下ではそれぞれのfield単位でexportしてみます。
MatrixTableにあるfieldをHailのTableとして扱うためには次のようにします。
In [14]:
mt.cols()
2020-06-27 06:21:42 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
Out[14]:
<hail.table.Table at 0x7ff89cf1bc50>
In [15]:
mt.rows()
Out[15]:
<hail.table.Table at 0x7ff89cf371d0>
In [16]:
mt.entries()
2020-06-27 06:21:42 Hail: WARN: entries(): Resulting entries table is sorted by '(row_key, col_key)'.
    To preserve row-major matrix table order, first unkey columns with 'key_cols_by()'
Out[16]:
<hail.table.Table at 0x7ff89cf39950>

SparkのDataFrameへ変換

In [17]:
%time mt_rows_spark = mt.rows().to_spark()
CPU times: user 42.4 ms, sys: 389 µs, total: 42.8 ms
Wall time: 1.77 s

PandasのDataFrameへ変換

In [18]:
%time mt_rows_pd = mt.rows().to_pandas()
CPU times: user 205 ms, sys: 27.9 ms, total: 233 ms
Wall time: 1.85 s
In [19]:
%time mt_rows_pd.mean()
CPU times: user 4.23 s, sys: 61.7 ms, total: 4.3 s
Wall time: 4.32 s
Out[19]:
locus.position          7.763903e+07
qual                    1.771046e+05
info.AN                 5.002793e+03
info.BaseQRankSum      -8.111796e-02
info.ClippingRankSum   -2.974969e+00
info.DP                 1.784070e+04
info.DS                 0.000000e+00
info.FS                 1.696762e+01
info.InbreedingCoeff    9.519850e-02
info.MQ                 5.882072e+01
info.MQ0                0.000000e+00
info.MQRankSum          3.643640e+00
info.QD                 1.819635e+01
info.ReadPosRankSum     1.580171e+00
a_index                 1.000000e+00
was_split               0.000000e+00
dtype: float64
どちらもうまくいきました。

テストデータ(1.3TB)の読み込み

次に大きなvcfファイルとannotationファイルを読み込み, 1塩基多型のみに絞り込み, 両者を結合しておきます。
In [20]:
mtL = hl.read_matrix_table('hdfs://spk000:9000/ectest/1kg-L.mt')
tableL = hl.import_table('data/1kg_annotations.txt', impute=True).key_by('Sample')
mtL = hl.split_multi_hts(mtL) 
mtL = mtL.filter_rows( \
    ((mtL.alleles[0] == 'A') | (mtL.alleles[0] == 'G') | \
     (mtL.alleles[0] == 'C') | (mtL.alleles[0] == 'T')) & \
    ((mtL.alleles[1] == 'A') | (mtL.alleles[1] == 'G') | \
     (mtL.alleles[1] == 'C') | (mtL.alleles[1] == 'T')))
mtL = mtL.annotate_cols(pheno = table[mtL.s])
2020-06-27 06:21:50 Hail: INFO: Reading table to impute column types
2020-06-27 06:21:50 Hail: INFO: Finished type imputation
  Loading column 'Sample' as type 'str' (imputed)
  Loading column 'Population' as type 'str' (imputed)
  Loading column 'SuperPopulation' as type 'str' (imputed)
  Loading column 'isFemale' as type 'bool' (imputed)
  Loading column 'PurpleHair' as type 'bool' (imputed)
  Loading column 'CaffeineConsumption' as type 'int32' (imputed)

SparkのDataFrameへ変換

Sparkは複数のノードに自動で分散されるのでより大きなデータを扱うことができます。 今回のテストでは384GBのメモリを持ったマシンが4台あるので、合計1.5TBのメモリを使うことができます。
In [21]:
%time mtL_rows_spark = mtL.rows().to_spark()
CPU times: user 46.7 ms, sys: 5.03 ms, total: 51.7 ms
Wall time: 16.3 s

PandasのDataFrameへ変換

読み込んだvcfファイルが1.3TBあるため、こちらはうまくいくかどうか少し不安です。 PandasはSpark上では動作できないので、できるかどうかはこのnotebookが載っているサーバーのメモリ次第です。
In [22]:
%time mtL_rows_pd = mtL.rows().to_pandas()
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<timed exec> in <module>

<decorator-gen-1133> in to_pandas(self, flatten)

/usr/local/miniconda3/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

/usr/local/miniconda3/lib/python3.7/site-packages/hail/table.py in to_pandas(self, flatten)
   3185 
   3186         """
-> 3187         return Env.spark_backend('to_pandas').to_pandas(self, flatten)
   3188 
   3189     @staticmethod

/usr/local/miniconda3/lib/python3.7/site-packages/hail/backend/backend.py in to_pandas(self, t, flatten)
    151 
    152     def to_pandas(self, t, flatten):
--> 153         return self.to_spark(t, flatten).toPandas()
    154 
    155     def from_pandas(self, df, key):

/usr/local/miniconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in toPandas(self)
   2148 
   2149         # Below is toPandas without Arrow optimization.
-> 2150         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   2151 
   2152         dtype = {}

/usr/local/miniconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
    532         """
    533         with SCCallSiteSync(self._sc) as css:
--> 534             sock_info = self._jdf.collectToPython()
    535         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    536 

/usr/local/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/miniconda3/lib/python3.7/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    223             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    224                              'Hail version: %s\n'
--> 225                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    226         except pyspark.sql.utils.CapturedException as e:
    227             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: OutOfMemoryError: Java heap space

Java stack trace:
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:282)
    at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:278)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:278)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:300)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:299)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)



Hail version: 0.2.34-914bd8a10ca2
Error summary: OutOfMemoryError: Java heap space
In [23]:
%time mtL_rows_pd.mean()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<timed eval> in <module>

NameError: name 'mtL_rows_pd' is not defined
384GBのメモリを持っていましたが途中でkernelが落ちてしまいました。

まとめ

今回は次のような操作を行いました。 ・HailのTable, MatrixTableをPandasやSparkのDataFrameへ変換 ・1.3TBのvcfファイルはPandasのDataFrameへ読み込むことができませんでしたが、SparkのDataFrameなら可能でした。
このようにSparkでは、巨大なデータでも自分で分割することなく自動的に複数のノードを使った分散処理を行うことができます。 ノードは拡張可能で、メモリやCPUだけではなくSSDを足すことでファイルI/Oの能力も拡張することができます。 今後増加が見込まれるBiobankのデータを扱うのに最適なシステムです。
次回はSpark上でPandas風にデータ分析ができるライブラリ、Koalasを使ってみたいと思います。
In [ ]: