mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:
基于非常简单的python代码就能完成流水线开发。使用 postgresql 作为数据处理引擎。有web界面可视化分析流水线执行过程。基于 python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。此外,在mara-pipelines的web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:
1.安装
由于使用了大量的依赖,mara-pipelines 并不适用于 windows,如果你需要在 windows 上使用 mara-pipelines,请使用 docker 或者 windows 下的 linux 子系统。
使用pip安装mara-pipelines:
pip install mara-pipelines或者:
pip install git+https://github.com/mara/mara-pipelines.git2.使用示例
这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):
# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。from mara_pipelines.commands.bash import runbashfrom mara_pipelines.pipelines import pipeline, taskfrom mara_pipelines.ui.cli import run_pipeline, run_interactivelypipeline = pipeline( id='demo', description='a small pipeline that demonstrates the interplay between pipelines, tasks and commands')pipeline.add(task(id='ping_localhost', description='pings localhost', commands=[runbash('ping -c 3 localhost')]))sub_pipeline = pipeline(id='sub_pipeline', description='pings a number of hosts')for host in ['google', 'amazon', 'facebook']: sub_pipeline.add(task(id=f'ping_{host}', description=f'pings {host}', commands=[runbash(f'ping -c 3 {host}.com')]))sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')sub_pipeline.add(task(id='ping_foo', description='pings foo', commands=[runbash('ping foo')]), ['ping_amazon'])pipeline.add(sub_pipeline, ['ping_localhost'])pipeline.add(task(id='sleep', description='sleeps for 2 seconds', commands=[runbash('sleep 2')]), ['sub_pipeline'])可以看到,task包含了多个commands,这些 command s会用于真正地执行动作。
而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:
pipeline.add(sub_pipeline, ['ping_localhost'])则表明必须执行完 ping_localhost 才会执行 sub_pipeline.
为了运行这个流水线,需要配置一个 postgresql 数据库来存储运行时信息、运行输出和增量处理状态:
import mara_db.auto_migrationimport mara_db.configimport mara_db.dbsmara_db.config.databases = lambda: {'mara': mara_db.dbs.postgresqldb(host='localhost', user='root', database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()如果 postgressql 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):
created database postgresql+psycopg2://root@localhost/example_etl_maracreate table data_integration_file_dependency ( node_path text[] not null, dependency_type varchar not null, hash varchar, timestamp timestamp without time zone, primary key (node_path, dependency_type));.. more tables为了运行这个流水线,你需要:
from mara_pipelines.ui.cli import run_pipelinerun_pipeline(pipeline)
这将运行单个流水线节点及其 ( **sub_pipeline ** ) 所依赖的所有节点:
run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=true)3.web 界面
我认为 mara-pipelines 最有用的是他们提供了基于flask管控流水线的web界面。
对于每条流水线,他们都有一个页面显示:
所有子节点的图以及它们之间的依赖关系流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)所有流水线节点及其平均运行时间和由此产生的排队优先级的表流水线最后一次运行的输出和时间线
对于每个任务,都有一个页面显示
流水线中任务的上游和下游最近 30 天内任务的运行时间任务的所有命令任务最后运行的输出
此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点。
动力电池如何进行热管理
Maxim推出3通道 RGB激光驱动器MAX3600
5G时代的到来并不会让WiFi退出历史舞台
一文简述3D打印技术
适合女性的运动电子设备汇总及价格参考
超级方便的轻量级Python流水线工具
新媒体编辑APP开发功能
新唐科技N567HP330(OTP)芯片介绍
基于python的用于构建仿真及测试用例的lib库cocotb
详细介绍了加酸壶清洗机自动控制系统的设计思路,方法及实施方案
我国应抓住新工业革命的宝贵机遇,在关系核心竞争力的领域上取得突破
法德两国宣布将在电池制造领域合作
来10年国内将年均新增44GW风电/89GW光伏装机量
ChatGPT的应用场景越加丰富 GPT-4生成的投资推介材料获更多资金支持
宜百利:这样的洗碗刷你见过么
XR虚拟演播厅解决方案
利用RFID射频识别实现停车场智能化管理
TouchGFX 4.16提供更新更易于访问的方法
MTS传感器在单缸液压圆锥破碎机上的应用
关注| 起诉小鹏汽车雇员,特斯拉是真的慌了吗?