背景介紹
#[function("length(varchar)->int4")] pubfnchar_length(s:&str)->i32{ s.chars().count()asi32 }
這是 RisingWave 中一個 SQL 函數的實現。只需短短幾行代碼,通過在 Rust 函數上加一行過程宏,我們就把它包裝成了一個 SQL 函數。
dev=>selectlength('RisingWave');
length
--------
11
(1row)
類似的,除了標量函數(Scalar Function),表函數(Table Function)和聚合函數(Aggregate Function)也可以用這樣的方法定義。我們甚至可以利用泛型來同時定義多種類型的重載函數:
#[function("generate_series(int4,int4)->setofint4")]
#[function("generate_series(int8,int8)->setofint8")]
fngenerate_series(start:T,stop:T)->implIterator- {
start..=stop
}
#[aggregate("max(int2)->int2",state="ref")]
#[aggregate("max(int4)->int4",state="ref")]
#[aggregate("max(int8)->int8",state="ref")]
fnmax
(state:T,input:T)->T{
state.max(input)
}
dev=>selectgenerate_series(1,3); generate_series ----------------- 1 2 3 (3rows) dev=>selectmax(x)fromgenerate_series(1,3)t(x); max ----- 3 (1row)
利用 Rust 過程宏,我們將函數實現背后的瑣碎細節隱藏起來,向開發者暴露一個干凈簡潔的接口。這樣我們便能夠專注于函數本身邏輯的實現,從而大幅提高開發和維護的效率。
而當一個接口足夠簡單,簡單到連 ChatGPT 都可以理解時,讓 AI 幫我們寫代碼就不再是天方夜譚了。(警告:AI 會自信地寫出 Bug,使用前需要人工 review)


向 GPT 展示一個 SQL 函數實現的例子,然后給出一個新函數的文檔,讓他生成完整的 Rust 實現代碼。
在本文中,我們將深度解析 RisingWave 中 #[function] 過程宏的設計目標和工作原理。通過回答以下幾個問題揭開過程宏的魔法面紗:
函數執行的過程是怎樣的?
為什么選擇使用過程宏實現?
這個宏是如何展開的?生成了怎樣的代碼?
利用過程宏還能實現哪些高級需求?
1向量化計算模型
RisingWave 是一個支持 SQL 語言的流處理引擎。在內部處理數據時,它使用基于列式內存存儲的向量化計算模型。在這種模型下,一個表(Table)的數據按列分割,每一列的數據連續存儲在一個數組(Array)中。為了便于理解,本文中我們采用列式內存的行業標準 Apache Arrow 格式作為示例。下圖是其中一批數據(RecordBatch)的內存結構,RisingWave 的列存結構與之大同小異。

列式內存存儲的數據結構
在函數求值時,我們首先把每個輸入參數對應的數據列合并成一個 RecordBatch,然后依次讀取每一行的數據,作為參數調用函數,最后將函數返回值壓縮成一個數組,作為最終返回結果。這種一次處理一批數據的方式就是向量化計算。

函數的向量化求值 之所以要這么折騰一圈做列式存儲、向量化求值,本質上還是因為批處理能夠均攤掉控制邏輯的開銷,并充分利用現代 CPU 中的緩存局部性和 SIMD 指令等特性,實現更高的訪存和計算性能。
我們將上述函數求值過程抽象成一個 Rust trait,大概長這樣:
pubtraitScalarFunction{ ///Callthefunctiononeachrowandreturnresultsasanarray. fneval(&self,input:&RecordBatch)->Result; }
在實際查詢中,多個函數嵌套組合成一個表達式。例如表達式 a + b - c等價于 sub(add(a, b), c)。對表達式求值就相當于遞歸地對多個函數進行求值。這個表達式本身也可以看作一個函數,同樣適用上面的 trait。因此本文中我們不區分表達式和標量函數。
2表達式執行的黑白魔法:類型體操 vs 代碼生成
接下來我們討論在 Rust 語言中如何具體實現表達式向量化求值。
2.1 我們要實現什么
回顧上一節中提到的求值過程,寫成代碼的整體結構是這樣的:
//首先定義好對每行數據的求值函數
fnadd(a:i32,b:i32)->i32{
a+b
}
//對于每一種函數,我們需要定義一個struct
structAdd;
//并為之實現ScalarFunctiontrait
implScalarFunctionforAdd{
//在此方法中實現向量化批處理
fneval(&self,input:&RecordBatch)->Result{
//我們拿到一個RecordBatch,里面包含了若干列,每一列對應一個輸入參數
//此時我們拿到的列是Arc,也就是一個**類型擦除**的數組
leta0:Arc=input.columns(0);
leta1:Arc=input.columns(1);
//我們可以獲取每一列的數據類型,并驗證它符合函數的要求
ensure!(a0.data_type()==DataType::Int32);
ensure!(a1.data_type()==DataType::Int32);
//然后將它們downcast到具體的數組類型
leta0:&Int32Array=a0.as_any().downcast_ref().context("typemismatch")?;
leta1:&Int32Array=a1.as_any().downcast_ref().context("typemismatch")?;
//在求值前,我們還需要準備好一個arraybuilder存儲返回值
letmutbuilder=Int32Builder::with_capacity(input.num_rows());
//此時我們就可以通過.iter()來遍歷具體的元素了
for(v0,v1)ina0.iter().zip(a1.iter()){
//這里我們拿到的v0和v1是Option類型
//對于add函數來說
letres=match(v0,v1){
//只有當所有輸入都非空時,函數才會被計算
(Some(v0),Some(v1))=>Some(add(v0,v1)),
//而任何一個輸入為空會導致輸出也為空
_=>None,
};
//最后將結果存入arraybuilder
builder.append_option(res);
}
//返回結果array
Ok(Arc::new(builder.finish()))
}
}
我們發現,這個函數本體的邏輯只需要短短一個 fn 就可以描述:
fnadd(a:i32,b:i32)->i32{
a+b
}
然而,為了支持在列存上進行向量化計算,還需要實現后面這一大段樣板代碼來處理瑣碎邏輯。有什么辦法能自動生成這坨代碼呢?
2.2 類型體操
著名數據庫專家遲先生曾在博文「數據庫表達式執行的黑魔法:用 Rust 做類型體操[1]」中討論了各種可能的解決方法,包括:
基于 trait 的泛型
聲明宏
過程宏
外部代碼生成器
并且系統性地闡述了它們的關系和工程實現中的利弊:

從方法論的角度來講,一旦開發者在某個需要使用泛型的地方使用了宏展開,調用它的代碼就不可能再通過 trait-based generics 使用這段代碼。從這個角度來說,越是“大道至簡”的生成代碼,越難維護。但反過來說,如果要完全實現 trait-based generics,往往要和編譯器斗智斗勇,就算是通過編譯也需要花掉大量的時間。
我們首先來看基于 trait 泛型的解決方案。在 arrow-rs 中有一個名為 binary[2] 的 kernel 就是做這個的:給定一個二元標量函數,將其應用于兩個 array 進行向量化計算,并生成一個新的 array。它的函數簽名如下:
pubfnbinary( a:&PrimitiveArray, b:&PrimitiveArray, op:F )->Result,ArrowError> where A:ArrowPrimitiveType, B:ArrowPrimitiveType, O:ArrowPrimitiveType, F:Fn(::Native,::Native)-> ::Native,
相信你已經開始感受到「類型體操」的味道了。盡管如此,它依然有以下這些局限:
支持的類型僅限于 PrimitiveArray ,也就是 int, float, decimal 等基礎類型。對于復雜類型,如 bytes, string, list, struct,因為沒有統一到一個 trait 下,所以每種都需要一個新的函數。
僅適用于兩個參數的函數。對于一個或更多參數,每一種都需要這樣一個函數。arrow-rs 中也只內置了 unary 和 binary 兩種 kernel。
僅適用于一種標量函數簽名,即不出錯的、不接受空值的函數。考慮其它各種可能的情況下,需要有不同的 F 定義:
fnadd(i32,i32)->i32; fnchecked_add(i32,i32)->Result; fnoptional_add(i32,Option )->Option ;
如果考慮以上三種因素的結合,那么可能的組合無窮盡也,不可能覆蓋所有的函數類型。
2.3 類型體操 + 聲明宏
在文章《類型體操》及 RisingWave 的初版實現中,作者使用 泛型 + 聲明宏 的方法部分解決了以上問題:
1. 首先設計一套精妙的類型系統,將全部類型統一到一個 trait 下,解決了第一個問題。 
2. 然后,使用聲明宏來生成多種類型的 kernel 函數。覆蓋常見的 1、2、3 個參數,以及 T 和 Option 的輸入輸出組合。生成了常用的 unary binary ternary unary_nullable unary_bytes 等 kernel,部分解決了第二三個問題。(具體實現參見 RisingWave 早期代碼[3])當然,這里理論上也可以繼續使用類型體操。例如,引入 trait 統一 (A,) (A, B) (A, B, C) ,用 Into, AsRef trait 統一 T, Option
3. 最后,這些 kernel 沒有解決類型動態 downcast 的問題。為此,作者又利用聲明宏設計了一套精妙的宏套宏機制來實現動態派發。
macro_rules!for_all_cmp_combinations{
($macro:tt$(,$x:tt)*)=>{
$macro!{
[$($x),*],
//comparisonacrossintegertypes
{int16,int32,int32},
{int32,int16,int32},
{int16,int64,int64},
//...
盡管解決了一些問題,但這套方案依然有它的痛點:
基于 trait 做類型體操使我們不可避免地陷入到與 Rust 編譯器斗智斗勇之中。
依然沒有全面覆蓋所有可能情況。有相當一部分函數仍然需要開發者手寫向量化實現。
性能。當我們需要引入 SIMD 對部分函數進行優化時,需要重新實現一套 kernel 函數。
沒有對開發者隱藏全部細節。函數開發者依然需要先熟悉類型體操和聲明宏的工作原理,才能比較流暢地添加函數。
究其原因,我認為是函數的變體形式過于復雜,而 Rust 的 trait 和聲明宏系統的靈活性不足導致的。本質上是一種元編程能力不夠強大的表現。
2.4 元編程?
讓我們來看看其他語言和框架是怎么解決這個問題的。
首先是 Python,一種靈活的動態類型語言。這是 Flink 中的 Python UDF 接口,其它大數據系統的接口也大同小異:
@udf(result_type='BIGINT') defadd(i,j): returni+j
我們發現它是用 @udf 這個裝飾器標記了函數的簽名信息,然后在運行時對不同類型進行相應的處理。當然,由于它本身是動態類型,因此 Rust 中的很多問題在 Python 中根本不存在,代價則是性能損失。
接下來是 Java,它是一種靜態類型語言,但通過虛擬機 JIT 運行。這是 Flink 中的 Java UDF 接口:
publicstaticclassSubstringFunctionextendsScalarFunction{
publicStringeval(Strings,Integerbegin,Integerend){
returns.substring(begin,end);
}
}
可以看到同樣也很短。這次甚至不需要額外標記類型了,因為靜態類型系統本身就包含了類型信息。我們可以通過運行時反射拿到類型信息,并通過 JIT 機制在運行時生成高效的強類型代碼,兼具靈活與性能。
最后是 Zig,一種新時代的 C 語言。它最大的特色是任何代碼都可以加上 comptime 關鍵字在編譯時運行,因此具備非常強的元編程能力。tygg 在博文「Zig lang 初體驗 -- 『大道至簡』的 comptime[4]」中演示了用 Zig 實現遲先生類型體操的方法:通過 編譯期反射 和 過程式的代碼生成 來代替開發者完成類型體操。
用一張表總結一下:
| 語言 | 類型反射 | 代碼生成 | 靈活性 | 性能 |
|---|---|---|---|---|
| Python | 運行時 | — | ||
| Java | 運行時 | 運行時 | ||
| Zig | 編譯時 | 編譯時 | ||
| Rust (trait + macro_rules) | — | 編譯時 |
可以發現,Zig 語言強大的元編程能力提供了相對最好的解決方案。
2.5 過程宏
那么 Rust 里面有沒有類似 Zig 的特性呢。其實是有的,那就是過程宏(Procedural Macros)。它可以在編譯期動態執行任何 Rust 代碼來修改 Rust 程序本身。只不過,它的編譯時和運行時代碼是物理分開的,相比 Zig 的體驗沒有那么統一,但是效果幾乎一樣。
參考 Python UDF 的接口設計,我們便得到了 ”大道至簡“ 的 Rust 函數接口:
#[function("add(int,int)->int")]
fnadd(a:i32,b:i32)->i32{
a+b
}
從用戶的角度看,他只需要在自己熟悉的 Rust 函數上面標一個函數簽名。其它的類型體操和代碼生成操作都被隱藏在過程宏之后,完全無需關心。
此時我們已經拿到了一個函數所必須的全部信息,接下來我們將看到過程宏如何生成向量化執行所需的樣板代碼。
3展開 #[function]
3.1 解析函數簽名
首先我們要實現類型反射,也就是分別解析 SQL 函數和 Rust 函數的簽名,以此決定后面如何生成代碼。在過程宏入口處我們會拿到兩個 TokenStream,分別包含了標注信息和函數本體:
#[proc_macro_attribute]
pubfnfunction(attr:TokenStream,item:TokenStream)->TokenStream{
//attr:"add(int,int)->int"
//item:fnadd(a:i32,b:i32)->i32{a+b}
...
}
我們使用 syn 庫將 TokenStream 轉為 AST,然后:
解析 SQL 函數簽名字符串,獲取函數名、輸入輸出類型等信息。
解析 Rust 函數簽名,獲取函數名、每個參數和返回值的類型模式、是否 async 等信息。
具體地:
對于參數類型,我們確定它是 T 或者 Option
對于返回值類型,我們將其識別為:T,Option
這將決定我們后面如何調用函數以及處理錯誤。
3.2 定義類型表
作為 trait 類型體操的代替方案,我們在過程宏中定義了這樣一張類型表,來描述類型系統之間的對應關系,并且提供了相應的查詢函數。
//nameprimitivearrayprefixdatatype constTYPE_MATRIX:&str=" void_NullNull boolean_BooleanBoolean smallintyInt16Int16 intyInt32Int32 bigintyInt64Int64 realyFloat32Float32 floatyFloat64Float64 ... varchar_StringUtf8 bytea_BinaryBinary array_ListList struct_StructStruct ";
比如當我們拿到用戶的函數簽名后,
#[function("length(varchar)->int")]
查表即可得知:
第一個參數 varchar 對應的 array 類型為 StringArray
返回值 int 對應的數據類型為 DataType::Int32,對應的 Builder 類型為 Int32Builder
并非所有輸入輸出均為 primitive 類型,因此無法進行 SIMD 優化
在下面的代碼生成中,這些類型將被填入到對應的位置。
3.3 生成求值代碼
在代碼生成階段,我們主要使用 quote 庫來生成并組合代碼片段。最終生成的代碼整體結構如下:
quote!{
struct#struct_name;
implScalarFunctionfor#struct_name{
fneval(&self,input:&RecordBatch)->Result{
#downcast_arrays
letmutbuilder=#builder;
#eval
Ok(Arc::new(builder.finish()))
}
}
}
下面我們來逐個填寫代碼片段,首先是 downcast 輸入 array:
letchildren_indices=(0..self.args.len());
letarrays=children_indices.map(|i|format_ident!("a{i}"));
letarg_arrays=children_indices.map(|i|format_ident!("{}",types::array_type(&self.args[*i])));
letdowncast_arrays=quote!{
#(
let#arrays:arg_arrays=input.column(#children_indices).as_any().downcast_ref()
.ok_or_else(||ArrowError::CastError(...))?;
)*
};
builder:
letbuilder_type=format_ident!("{}",types::array_builder_type(ty));
letbuilder=quote!{#builder_type::with_capacity(input.num_rows())};
接下來是最關鍵的執行部分,我們先寫出函數調用的那一行:
letinputs=children_indices.map(|i|format_ident!("i{i}"));
letoutput=quote!{#user_fn_name(#(#inputs,)*)};
//example:add(i0,i1)
然后考慮:這個表達式返回了什么類型呢?這需要根據 Rust 函數簽名決定,它可能包含 Option,也可能包含 Result。我們進行錯誤處理,然后將其歸一化到 Option
letoutput=matchuser_fn.return_type_kind{
T=>quote!{Some(#output)},
Option=>quote!{#output},
Result=>quote!{Some(#output?)},
ResultOption=>quote!{#output?},
};
//example:Some(add(i0,i1))
下面考慮:這個函數接收什么樣的類型作為輸入?這同樣需要根據 Rust 函數簽名決定,每個參數可能是或不是 Option。如果函數不接受 Option 輸入,但實際輸入的卻是 null,那么我們默認它的返回值就是 null,此時無需調用函數。因此,我們使用 match 語句來對輸入參數做預處理:
letsome_inputs=inputs.iter()
.zip(user_fn.arg_is_option.iter())
.map(|(input,opt)|{
if*opt{
quote!{#input}
}else{
quote!{Some(#input)}
}
});
letoutput=quote!{
//這里的inputs是從array中拿出來的Option
match(#(#inputs,)*){
//我們將部分參數unwrap后再喂給函數
(#(#some_inputs,)*)=>#output,
//如有unwrap失敗則直接返回null
_=>None,
}
};
//example:
//match(i0,i1){
//(Some(i0),Some(i1))=>Some(add(i0,i1)),
//_=>None,
//}
此時我們已經拿到了一行的返回值,可以將它 append 到 builder 中:
letappend_output=quote!{builder.append_option(#output);};
最后在外面套一層循環,對輸入逐行操作:
leteval=quote!{
for(i,(#(#inputs,)*))inmultizip((#(#arrays.iter(),)*)).enumerate(){
#append_output
}
};
如果一切順利的話,過程宏展開生成的代碼將如 2.1 節中所示的那樣。
3.4 函數注冊
到此為止我們已經完成了最核心、最困難的部分,即生成向量化求值代碼。但是,用戶該怎么使用生成的代碼呢?
注意到一開始我們生成了一個 struct。因此,我們可以允許用戶指定這個 struct 的名稱,或者定義一套規范自動生成唯一的名稱。這樣用戶就能在這個 struct 上調用函數了。
//指定生成名為Add的struct
#[function("add(int,int)->int",output="Add")]
fnadd(a:i32,b:i32)->i32{
a+b
}
//調用生成的向量化求值函數
letinput:RecordBatch=...;
letoutput:RecordBatch=Add.eval(&input).unwrap();
不過在實際場景中,很少有這種使用特定函數的需求。更多是在項目中定義很多函數,然后在解析 SQL 查詢時,動態地查找匹配的函數。為此我們需要一種全局的函數注冊和查找機制。
問題來了:Rust 本身沒有反射機制,如何在運行時獲取所有由 #[function] 靜態定義的函數呢?
答案是:利用程序的鏈接時(link time)特性,將函數指針等元信息放入特定的 section 中。程序鏈接時,鏈接器(linker)會自動收集分布在各處的符號(symbol)集中在一起。程序運行時即可掃描這個 section 獲取全部函數了。
Rust 社區的 dtolnay 大佬為此需求做了兩個開箱即用的庫:linkme[5] 和 inventory[6]。其中前者是直接利用上述機制,后者是利用 C 標準的 constructor 初始化函數,但背后的原理沒有本質區別。下面我們以 linkme 為例來演示如何實現注冊機制。
首先我們需要在公共庫(而不是 proc-macro)中定義函數簽名的結構:
pubstructFunctionSignature{
pubname:String,
pubarg_types:Vec,
pubreturn_type:DataType,
pubfunction:Box,
}
然后定義一個全局變量 REGISTRY 作為注冊中心。它會在第一次被訪問時利用 linkme 將所有 #[function] 定義的函數收集到一個 HashMap 中:
///Acollectionofdistributed`#[function]`signatures.
#[linkme::distributed_slice]
pubstaticSIGNATURES:[fn()->FunctionSignature];
lazy_static::lazy_static!{
///Globalfunctionregistry.
pubstaticrefREGISTRY:FunctionRegistry={
letmutsignatures=HashMap::>::new();
forsiginSIGNATURES{
letsig=sig();
signatures.entry(sig.name.clone()).or_default().push(sig);
}
FunctionRegistry{signatures}
};
}
最后在 #[function] 過程宏中,我們為每個函數生成如下代碼:
#[linkme::distributed_slice(SIGNATURES)]
fn#sig_name()->FunctionSignature{
FunctionSignature{
name:#name.into(),
arg_types:vec![#(#args),*],
return_type:#ret,
//這里#struct_name就是我們之前生成的函數結構體
function:Box::new(#struct_name),
}
}
如此一來,用戶就可以通過 FunctionRegistry 提供的方法動態查找函數并進行求值了:
letgcd=REGISTRY.get("gcd",&[Int32,Int32],&Int32);
letoutput:RecordBatch=gcd.function.eval(&input).unwrap();
3.5 小結
以上我們完整闡述了 #[function] 過程宏的工作原理和實現過程:
使用 syn 庫解析函數簽名
使用 quote 庫生成定制化的向量化求值代碼
使用 linkme 庫實現函數的全局注冊和動態查找
其中:
SQL 簽名決定了如何從 input array 中讀取數據,如何生成 output array
Rust 簽名決定了如何調用用戶的 Rust 函數,如何處理空值和錯誤
類型查找表決定了 SQL 類型和 Rust 類型的映射關系
相比 trait + 聲明宏的解決方案,過程宏中的 “過程式” 風格為我們提供了極大的靈活性,一攬子解決了之前提到的全部問題。在下一章中,我們將會在這個框架的基礎上繼續擴展,解決更多實際場景下的復雜需求。
4高級功能
抽象的問題是簡單的,但現實的需求是復雜的。上面的原型看似解決了所有問題,但在 RisingWave 的實際工程開發中,我們遇到了各種稀奇古怪的需求,都無法用最原始的 #[function] 宏實現。下面我們來逐一介紹這些問題,并利用過程宏的靈活性見招拆招。
4.1 支持多類型重載
有些函數支持大量不同類型的重載,例如 + 運算對幾乎支持所有數字類型。此時我們一般會復用同一個泛型函數,然后用不同的類型去實例化它。
#[function("add(*int,*int)->auto")]
#[function("add(*float,*float)->auto")]
#[function("add(decimal,decimal)->decimal")]
#[function("add(interval,interval)->interval")]
fnadd(l:T1,r:T2)->Result
where
T1:Into+Debug,
T2:Into+Debug,
T3:CheckedAdd
因此我們支持在同一個函數上同時標記多個#[function] 宏。此外,我們還支持使用類型通配符將一個#[function] 自動展開成多個,并使用 auto 自動推斷返回類型。例如 *int 通配符表示全部整數類型 int2, int4, int8,那么 add(*int, *int) 將展開為 3 x 3 = 9 種整數的組合,返回值自動推斷為兩種類型中最大的一個:
#[function("add(int2,int2)->int2")]
#[function("add(int2,int4)->int4")]
#[function("add(int2,int8)->int8")]
#[function("add(int4,int4)->int4")]
...
而如果泛型不能滿足一些特殊類型的要求,你也完全可以定義新函數進行特化(specialization):
#[function("add(interval,timestamp)->timestamp")]
fninterval_timestamp_add(l:Interval,r:Timestamp)->Result{
r.checked_add(l).ok_or(ExprError::NumericOutOfRange)
}
這一特性幫助我們快速實現函數重載,同時避免了冗余代碼。
4.2 自動 SIMD 優化
作為零開銷抽象語言,Rust 從不向性能妥協,#[function] 宏也是如此。對于很多簡單函數,理論上可以利用 CPU 內置的 SIMD 指令實現上百倍的性能提升。然而,編譯器往往只能對簡單的循環結構實現自動 SIMD 向量化。一旦循環中出現分支跳轉等復雜結構,自動向量化就會失效。
//簡單循環支持自動向量化
assert_eq!(a.len(),n);
assert_eq!(b.len(),n);
assert_eq!(c.len(),n);
foriin0..n{
c[i]=a[i]+b[i];
}
//一旦出現分支結構,如錯誤處理、越界檢查等,自動向量化就會失效
foriin0..n{
c.push(a[i].checked_add(b[i])?);
}
不幸的是,我們前文中生成的代碼結構并不利于編譯器進行自動向量化,因為循環中的 builder.append_option() 操作本身就自帶條件分支。
為了支持自動向量化,我們需要對代碼生成邏輯進一步特化:
首先根據函數簽名判斷這個函數能否實現 SIMD 優化。這需要滿足以下兩個主要條件:
比如:
#[function("equal(int,int)->boolean")]
fnequal(a:i32,b:i32)->bool{
a==b
}
所有輸入輸出類型均為基礎類型,即 boolean, int, float, decimal
Rust 函數的輸入類型均不含 Option,輸出不含 Option 和 Result
一旦上述條件滿足,我們會對 #eval 代碼段進行特化,將其替換為這樣的代碼,調用 arrow-rs 內置的 unary 和 binary kernel 實現自動向量化:
//SIMDoptimizationforprimitivetypes
matchself.args.len(){
0=>quote!{
letc=#ret_array_type::from_iter_values(
std::repeat_with(||#user_fn_name()).take(input.num_rows())
);
letarray=Arc::new(c);
},
1=>quote!{
letc:#ret_array_type=arrow_arith::unary(a0,#user_fn_name);
letarray=Arc::new(c);
},
2=>quote!{
letc:#ret_array_type=arrow_arith::binary(a0,a1,#user_fn_name)?;
letarray=Arc::new(c);
},
n=>todo!("SIMDoptimizationfor{n}arguments"),
}
需要注意,如果用戶函數本身包含分支結構,那么自動向量化也是無效的。我們只是盡力為編譯器創造了實現優化的條件。另一方面,這一優化也不是完全安全的,它會使得原本為 null 的輸入強制執行。例如整數除法 a / b,如果 b 為 null,原本不會執行,現在卻會執行 a / 0,導致除零異常而崩潰。這種情況下我們只能修改函數簽名,避免生成特化代碼。
整體而言,實現這一功能后,用戶編寫代碼不需要有任何變化,但是部分函數的性能得到了大幅提高。這對于高性能數據處理系統而言是必須的。
4.3 返回字符串直接寫入 buffer
很多函數會返回字符串。但是樸素地返回 String 會導致大量動態內存分配,降低性能。
#[function("concat(varchar,varchar)->varchar")]
fnconcat(left:&str,right:&str)->String{
format!("{left}{right}")
}
注意到列式內存存儲中,StringArray 實際上是把多個字符串存放在一段連續的內存上,構建這個數組的 StringBuilder 實際上也只是將字符串追加寫入同一個 buffer 里。因此函數返回 String 是沒有必要的,它可以直接將字符串寫入 StringBuilder 的 buffer 中。
于是我們支持對返回字符串的函數添加一個 &mut Write 類型的 writer 參數。內部可以直接用 write! 方法向 writer 寫入返回值。
#[function("concat(varchar,varchar)->varchar")]
fnconcat(left:&str,right:&str,writer:&mutimplstd::Write){
writer.write_str(left).unwrap();
writer.write_str(right).unwrap();
}
在過程宏的實現中,我們主要修改了函數調用部分:
letwriter=user_fn.write.then(||quote!{&mutbuilder,});
letoutput=quote!{#user_fn_name(#(#inputs,)*#writer)};
以及特化 append_output 的邏輯:
letappend_output=ifuser_fn.write{
quote!{{
if#output.is_some(){//返回值直接在這行寫入builder
builder.append_value("");
}else{
builder.append_null();
}
}}
}else{
quote!{builder.append_option(#output);}
};
經過測試,這一功能也可以大幅提升字符串處理函數的性能。
4.4 常量預處理優化
有些函數的某個參數往往是一個常量,并且這個常量需要經過一個開銷較大的預處理過程。這類函數的典型代表是正則表達式匹配:
//regexp_like(source,pattern)
#[function("regexp_like(varchar,varchar)->boolean")]
fnregexp_like(text:&str,pattern:&str)->Result{
letregex=regex::new(pattern)?;//預處理:編譯正則表達式
Ok(regex.is_match(text))
}
對于一次向量化求值來說,如果輸入的 pattern 是常數(very likely),那么其實只需要編譯一次,然后用編譯后的數據結構對每一行文本進行匹配即可。但如果不是常數(unlikely,但是合法行為),則需要對每一行 pattern 編譯一次再執行。
為了支持這一需求,我們修改用戶接口,將特定參數的預處理過程提取到過程宏中,然后把預處理后的類型作為參數:
#[function(
"regexp_like(varchar,varchar)->boolean",
prebuild="Regex::new($1)?"http://$1表示第一個參數(下標從0開始)
)]
fnregexp_like(text:&str,regex:&Regex)->bool{
regex.is_match(text)
}
這樣,過程宏可以對這個函數生成兩個版本的代碼:
如果指定參數為常量,那么在構造函數中執行 prebuild 代碼,并將生成的 Regex 中間值存放在 struct 當中,在求值階段直接傳入函數。
如果不是常量,那么在求值階段將 prebuild 代碼嵌入到函數參數的位置上。
至于具體的代碼生成邏輯,由于細節相當復雜,這里就不再展開介紹了。
總之,這一優化保證了此類函數各種輸入下都具有最優性能,并且極大簡化了手工實現的復雜性。
4.5 表函數
最后,我們來看表函數(Table Function,Postgres 中也稱 Set-returning Funcion,返回集合的函數)。這類函數的返回值不再是一行,而是多行。如果同時返回多列,那么就相當于返回一個表。
select*fromgenerate_series(1,3); generate_series ----------------- 1 2 3
對應到常見的編程語言中,實際是一個生成器函數(Generator)。以 Python 為例,可以寫成這樣:
defgenerate_series(start,end): foriinrange(start,end+1): yieldi
Rust 語言目前在 nightly 版本支持生成器,但這一特性尚未 stable。不過如果不用 yield 語法的話,我們可以利用 RPIT 特性實現返回迭代器的函數,以達到同樣的效果:
#[function("generate_series(int,int)->setofint")]
fngenerate_series(start:i32,stop:i32)->implIterator- {
start..=stop
}
我們支持在 #[function] 簽名中使用 -> setof 以聲明一個表函數。它修飾的 Rust 函數必須返回一個 impl Iterator,其中的 Item 需要匹配返回類型。當然,Iterator 的內外都可以包含 Option 或 Result。
在對表函數進行向量化求值時,我們會對每一行輸入調用生成器函數,然后將每一行返回的多行結果串聯起來,最后按照固定的 chunk size 進行切割,依次返回多個 RecordBatch。因此表函數的向量化接口長這個樣子:
pubtraitTableFunction{
fneval(&self,input:&RecordBatch,chunk_size:usize)
->Result>>>;
}
我們給出一組 generate_series 的輸入輸出樣例(假設 chunk size = 2):
inputoutput +-------+------++-----+-----------------+ |start|stop||row|generate_series| +-------+------++-----+-----------------+ |0|0|---->|0|0| |||+->|2|0| |0|2|--++-----+-----------------+ +-------+------+|2|1| |2|2| +-----+-----------------+
由于表函數的輸入輸出不再具有一對一的關系,我們在 output 中會額外生成一列row來表示每一行輸出對應 input 中的哪一行輸入。這一關系信息會在某些 SQL 查詢中被使用到。
回到#[function]宏的實現,它為表函數生成的代碼實際上也是一個生成器。我們在內部使用了futures_async_stream[7]提供的#[try_stream]宏實現 async generator(它依賴 nightly 的 generator 特性),在 stable 版本中則使用genawaiter[8]代替。之所以要使用生成器,則是因為一個表函數可能會生成非常長的結果(例如generate_series(0, 1000000000)),中途必須把控制權交還調用者,才能保證系統不被卡死。感興趣的讀者可以思考一下:如果沒有 generator 機制,高效的向量化表函數求值能否實現?如何實現?
說到這里,多扯兩句。genawaiter 也是個很有意思的庫,它使用 async-await 機制來在 stable Rust 中實現 generator。我們知道 async-await 本質上也是一種 generator,它們都依賴編譯器的 CPS 變換實現狀態機。不過出于對異步編程的強烈需求,async-await 很早就被穩定化,而 generator 卻遲遲沒有穩定。由于背后的原理相通,它們可以互相實現。 此外,目前 Rust 社區正在積極推動 async generator 的進展,原生的async gen[9]和for await[10]語法剛剛在上個月進入 nightly。不過由于沒有和 futures 生態對接,整體依然處于不可用狀態。RisingWave 的流處理引擎就深度依賴 async generator 機制實現流算子,以簡化異步 IO 下的流狀態管理。不過這又是一個龐大的話題,之后有機會再來介紹這方面的應用吧。
5總結
由于篇幅所限,我們只能展開這么多了。如你所見,一個簡單的函數求值背后,隱藏著非常多的設計和實現細節:
為了高性能,我們選擇列式內存存儲和向量化求值。
存儲數據的容器通常是類型擦除的結構。但 Rust 是一門靜態類型語言,用戶定義的函數是強類型的簽名。這意味著我們需要在編譯期確定每一個容器的具體類型,做類型體操來處理不同類型之間的轉換,準確地把數據從容器中取出來喂給函數,最后高效地將函數吐出來的結果打包回數據容器中。
為了將上述過程隱藏起來,我們設計了#[function]過程宏在編譯期做類型反射和代碼生成,最終暴露給用戶一個盡可能簡單直觀的接口。
但是實際工程中存在各種復雜需求以及對性能的要求,我們必須持續在接口上打洞,并對代碼生成邏輯進行特化。幸好,過程宏具有非常強的靈活性,使得我們可以敏捷地應對變化的需求。
#[function]宏最初是為 RisingWave 內部函數實現的一套框架。最近,我們將它從 RisingWave 項目中獨立出來,基于 Apache Arrow 標準化成一套通用的用戶定義函數接口arrow-udf[11]。如果你的項目也在使用 arrow-rs 進行數據處理,現在可以直接使用這套#[function]宏定義自己的函數。如果你在使用 RisingWave,那么從這個月底發布的 1.7 版本起,你可以使用這個庫來定義 Rust UDF。它可以編譯成 WebAssembly 模塊插入到 RisingWave 中運行。感興趣的讀者也可以閱讀這個項目的源碼了解更多實現細節。
事實上,RisingWave 基于 Apache Arrow 構建了一整套用戶定義函數接口。此前,我們已經實現了服務器模式的 Python 和 Java UDF。最近,我們又基于 WebAssembly 實現了 Rust UDF,基于 QuickJS 實現了 JavaScript UDF。它們都可以嵌入到 RisingWave 中運行,以實現更好的性能和用戶體驗。
審核編輯:劉清
-
SQL
+關注
關注
1文章
789瀏覽量
46695 -
生成器
+關注
關注
7文章
322瀏覽量
22706 -
Rust
+關注
關注
1文章
240瀏覽量
7585 -
ChatGPT
+關注
關注
31文章
1598瀏覽量
10264
原文標題:用 Rust 過程宏魔法簡化 SQL 函數實現
文章出處:【微信號:Rust語言中文社區,微信公眾號:Rust語言中文社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
請教如何用SQL語句來壓縮ACCESS數據庫
如何用 rust 語言開發 stm32
如何用proc sql生成宏變量?
如何用Rust過程宏魔法簡化SQL函數呢?
評論