Dask¶
Dask是一个灵活的Python并行计算库。
Dask由两部分组成:
动态任务调度针对计算进行了优化。这类似于 气流、Luigi、Celery或Make,但针对交互进行了优化 计算工作负载。
“大数据”收藏如并行阵列、数据帧和列表 将常见接口(如NumPy、Pandas或Python迭代器)扩展为 大于内存或分布式环境。这些并行集合 在动态任务调度程序之上运行。
Dask强调以下美德:
熟悉:提供并行化的NumPy数组和Pandas DataFrame对象
灵活:为更多自定义工作负载提供任务调度接口 以及与其他项目的整合。
原生:支持纯Python中的分布式计算,并可访问 PyData堆栈。
快速:以低开销、低延迟和最小序列化方式运行 快速数值算法所必需的
纵向扩展:在具有数千个核心的群集上弹性运行
按比例缩小:只需一个进程即可在笔记本电脑上设置和运行
响应式:在设计时考虑到了交互计算,它提供了快速 帮助人类的反馈和诊断
请参阅Dask.Distributed文档(单独网站)了解更多技术信息 在Dask的分布式调度器上。
熟悉的用户界面¶
Dask数据帧模仿Pandas-文档
import pandas as pd import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv') df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean() df.groupby(df.user_id).value.mean().compute()
Dask数组模仿NumPy-文档
import numpy as np import dask.array as da
f = h5py.File('myfile.hdf5') f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data']) x = da.from_array(f['/big-data'],
chunks=(1000, 1000))
x - x.mean(axis=1) x - x.mean(axis=1).compute()
遮蔽袋模拟迭代器、Toolz和PySpark-文档
import dask.bag as db
b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
任务延迟模拟循环和包装自定义代码-文档
from dask import delayed
L = []
for fn in filenames: # Use for loops to build up computation
data = delayed(load)(fn) # Delay execution of function
L.append(delayed(process)(data)) # Build connections between variables
result = delayed(summarize)(L)
result.compute()
The concurrent.futures界面提供自定义的通用深渊翻滚 任务:-文档
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()