Mara-pipelines 是一個輕量級的數據轉換框架,具有透明和低復雜性的特點。其他特點如下:
基于非常簡單的Python代碼就能完成流水線開發。
使用 PostgreSQL 作為數據處理引擎。
有Web界面可視化分析流水線執行過程。
基于 Python 的 multiprocessing 單機流水線執行。不需要分布式任務隊列。輕松調試和輸出日志。
基于成本的優先隊列:首先運行具有較高成本(基于記錄的運行時間)的節點。
此外,在Mara-pipelines的Web界面中,你不僅可以查看和管理流水線及其任務節點,你還可以直接觸發這些流水線和節點,非常好用:
1.安裝
由于使用了大量的依賴,Mara-pipelines 并不適用于 Windows,如果你需要在 Windows 上使用 Mara-pipelines,請使用 Docker 或者 Windows 下的 linux 子系統。
使用pip安裝Mara-pipelines:
pip install mara-pipelines
或者:
pip install git+https://github.com/mara/mara-pipelines.git
2.使用示例
這是一個基礎的流水線演示,由三個相互依賴的節點組成,包括 任務1(ping_localhost), 子流水線(sub_pipeline), 任務2(sleep):
# 注意,這個示例中使用了部分國外的網站,如果無法訪問,請變更為國內網站。 frommara_pipelines.commands.bash importRunBash frommara_pipelines.pipelines importPipeline, Task frommara_pipelines.ui.cli importrun_pipeline, run_interactively pipeline = Pipeline( id='demo', description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands') pipeline.add(Task(id='ping_localhost', description='Pings localhost', commands=[RunBash('ping -c 3 localhost')])) sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts') forhost in['google', 'amazon', 'facebook']: sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}', commands=[RunBash(f'ping -c 3 {host}.com')])) sub_pipeline.add_dependency('ping_amazon', 'ping_facebook') sub_pipeline.add(Task(id='ping_foo', description='Pings foo', commands=[RunBash('ping foo')]), ['ping_amazon']) pipeline.add(sub_pipeline, ['ping_localhost']) pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds', commands=[RunBash('sleep 2')]), ['sub_pipeline'])
可以看到,Task包含了多個commands,這些 command s會用于真正地執行動作。
而 pipeline.add 的參數中,第一個參數是其節點,第二個參數是此節點的上游。如:
pipeline.add(sub_pipeline, ['ping_localhost'])
則表明必須執行完 ping_localhost 才會執行 sub_pipeline.
為了運行這個流水線,需要配置一個 PostgreSQL 數據庫來存儲運行時信息、運行輸出和增量處理狀態:
importmara_db.auto_migration
importmara_db.config
importmara_db.dbs
mara_db.config.databases
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}
mara_db.auto_migration.auto_discover_models_and_migrate()
如果 PostgresSQL 正在運行并且賬號密碼正確,輸出如下所示(創建了一個包含多個表的數據庫):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara" CREATETABLEdata_integration_file_dependency ( node_path TEXT[] NOTNULL, dependency_type VARCHARNOTNULL, hashVARCHAR, timestampTIMESTAMPWITHOUTTIMEZONE, PRIMARY KEY(node_path, dependency_type) ); .. more tables
為了運行這個流水線,你需要:
frommara_pipelines.ui.cli importrun_pipeline run_pipeline(pipeline)
這將運行單個流水線節點及其 (sub_pipeline) 所依賴的所有節點:
run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)
3.Web 界面
我認為 mara-pipelines 最有用的是他們提供了基于Flask管控流水線的Web界面。
對于每條流水線,他們都有一個頁面顯示:
所有子節點的圖以及它們之間的依賴關系
流水線的總體運行時間圖表以及過去 30 天內最昂貴的節點(可配置)
所有流水線節點及其平均運行時間和由此產生的排隊優先級的表
流水線最后一次運行的輸出和時間線
對于每個任務,都有一個頁面顯示
流水線中任務的上游和下游
最近 30 天內任務的運行時間
任務的所有命令
任務最后運行的輸出
此外,流水線和任務可以直接從網頁端調用運行,這是非常棒的特點。
責任編輯:haq
-
數據
+關注
關注
8文章
7335瀏覽量
94757 -
python
+關注
關注
57文章
4876瀏覽量
90025
原文標題:超級方便的輕量級 Python 流水線工具,還有漂亮的可視化界面!
文章出處:【微信號:LinuxHub,微信公眾號:Linux愛好者】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
軟通動力ASDM AI優先軟件研發流水線助力軟件工程發展
智造中樞:疆鴻智能DEVICENET轉MODBUS RTU網關在食品包裝流水線的融合之舞
流水線基本結構
如何更好地選擇工業流水線上用的條碼掃碼器?
固定式掃碼器,用于流水線掃紙盒子上的條碼進行分揀
流水線掃碼升級選NVF230!工業二維碼讀碼器方案實測
自動化開裝封碼流水線數據采集解決方案
面包成型流水線數據采集遠程監控系統
遠程io模塊在汽車流水線的應用
工業4.0時代,為什么你的流水線必須配備固定式掃碼器?
【經驗分享】玩轉FPGA串口通信:從“幻覺調試”到代碼解析
工業流水線上用的條碼掃碼器,如何選擇與使用?
RISC-V五級流水線CPU設計
效率卡在掃碼環節?工廠流水線加裝工業級掃碼設備,產能直接拉滿
基于非常簡單的Python代碼就能完成流水線開發
評論