摘要:?本文介紹了如何使用MaxCompute UDF對JSON格式的日志進行信息提取和轉換。
一、業務場景分析:
由于業務的復雜性,數據開發者需要面對不同來源的不同類型數據,需要把這些數據抽取到數據平臺,按照設計好的數據模型對關鍵業務字段進行抽取,形成一張二維表,以便后續在大數據平臺/數據倉庫中進行統計分析、關聯計算。
本文結合一個具體的案例來介紹如何使用MaxCompute對json格式的日志數據進行轉換處理。
1.數據來源:應用實時寫入ECS主機的指定目錄下的日志文件中;
2.數據格式:日志文件中,每條日志的格式如下圖所示(示例中對數據進行了簡化和脫敏),每一條日志中包含了設備信息,以及1個或多個Session信息,且每條日志中的Session數量是動態的:1個或多個Session。每條日志的內容示例如下:
3.數據處理需求:采集日志數據,對日志數據進行解析、轉換,對轉換后的日志數據在MaxCompute進行統計分析。由于日志數據是json格式的,其中包含了多個業務字段信息,需要將業務字段提前出來,才能在MaxCompute進行后續的業務統計(如進行按照時段進行PV/UV統計、按照設備類型進行統計、關聯設備ID與會員信息進行統計等),所以本文的關鍵需求就是如何把json格式數據的關鍵信息解析為一張包含業務字段的二維表。
二、解決方案:
本文的解決方案中,選擇使用阿里云的日志服務+MaxCompute產品組合來滿足以上業務需求,其中日志服務僅僅完成日志采集和投遞的職能,不做數據解析和轉化工作。
1.日志采集:通過日志服務獲取日志數據到logstore(這部分內容可參考日志服務幫助文檔)
2.通過日志服務的投遞功能(幫助文檔)將日志定時投遞歸檔到MaxCompute的1張原始日志表,其中每條日志所有信息都寫入到原始日志表的1個字段content中。
3.利用MaxCompute對原始數據進行字段解析和提取。
1)利用內建函數get_json_object進行數據提取
selectget_json_object(content,'$.DeviceID')?as?DeviceID,get_json_object(content,'$.UniqueIdentifier')?as?UniqueIdentifier,get_json_object(content,'$.GameID')?as?GameID,get_json_object(content,'$.Device')?as?Device,get_json_object(content,'$.Sessions\[0].SessionID')?as?Session1_ID,get_json_object(content,'$.Sessions\[0].Events\[0].Name')?as?Session1_EventName,get_json_object(content,'$.Sessions\[1].SessionID')?as?Session2_ID,get_json_object(content,'$.Sessions\[1].Events\[0].Name')?as?Session2_EventNamefrom?log_target_json?where?pt='20180725'?limit?10
提取的結果如下:
方案總結:以上處理邏輯,是把一條日志的業務字段分別提取成為行字段,適合每個json記錄中的信息固定且可以映射為表字段,例如上面的例子,把session1和session2的信息提取出來后,分別看做不同的列字段來處理。但如果每條日志記錄包含的session數量是動態不固定的時候,這種處理邏輯就難以滿足需要,例如下一條日志就包含了3個session,如果要提取每個session的信息,就要求解析的SQL增加Session3_ID, Session3_EventName邏輯,如果再下一條日志包含100個session呢?這種提取方式就很難處理了。
這種情況,可以使用UDTF自定義函數來實現。
2)開發MaxCompute UDTF函數,對日志進行處理
根據數據特點,1條日志包含了多個session信息,屬于1:N的關系,轉換到數據倉庫的二維表時,需要解析到最小粒度的session信息,把1行轉成N行,提取所有session信息。業務目標如下所示:
在MaxCompute中,對1行記錄處理轉換為多行記錄需要使用UDTF來實現。
我們這里以JAVA UDTF為例,對content字段中的每條json記錄進行解析,獲取并返回需要提取的業務字段。這里的UDTF的處理邏輯會深入到json的第3級,循環解析出最小粒度的數據并返回多條記錄。
package?com.aliyun.odps;import?com.aliyun.odps.udf.UDFException;import?com.aliyun.odps.udf.UDTF;import?com.aliyun.odps.udf.annotation.Resolve;import?com.google.gson.Gson;import?java.io.IOException;import?java.util.List;import?java.util.Map;@Resolve("string->string,string,string,string,string,string,string,string")public?class?get_json_udtf?extends?UDTF?{
????@Override
????public?void?process(Object[]?objects)?throws?UDFException,?IOException?{
????????String?input?=?(String)?objects[0];
????????Map?map?=?new?Gson().fromJson(input,?Map.class);
????????Object?deviceID?=?map.get("DeviceID");
????????Object?uniqueIdentifier?=?map.get("UniqueIdentifier");
????????Object?gameID?=?map.get("GameID");
????????Object?device?=?map.get("Device");
????????List?sessions?=?(List)?map.get("Sessions");
????????for?(Object?session?:?sessions)?{
????????????Map?sMap?=?(Map)?session;
????????????Object?sessionID?=?sMap.get("SessionID");
????????????List?events?=?(List)?sMap.get("Events");
????????????for?(Object?event?:?events)?{
????????????????String?name?=?(String)?((Map)?event).get("Name");
????????????????String?timestamp?=?(String)?((Map)?event).get("Timestamp");
????????????????String?networkStatus?=?(String)?((Map)?event).get("NetworkStatus");
????????????????forward(deviceID,?uniqueIdentifier,gameID,device,
????????????????????????sessionID,name,timestamp,networkStatus);
????????????}
????????}
????}}注:關于UDF本身編寫、打包上傳、創建Function等知識請參閱官方文檔https://help.aliyun.com/document_detail/27867.html。
程序編寫完畢后,需要打包、上傳UDTF并創建UDF函數:
對編譯好的程序進行打包處理,生成jar包,在MaxCompute客戶端(odpscmd)中,上傳這個資源:
add jar maxcompute_demo-1.0-SNAPSHOT.jar -f;
然后通過命令行創建function:
create function get_json_udtf as com.aliyun.odps.get_json_udtf using maxcompute_demo-1.0-SNAPSHOT.jar';
創建后查看函數:
測試驗證:
對包含原始日志的表進行查詢,使用創建的get_json_udtf對content字段進行查詢:
查詢結果如下,UDFT函數對每條json記錄進行處理,生成了多條記錄,符合預期:?
同時,如需要固化處理邏輯,還可以使用insert into語法,將解析后的結果查詢到一張新表,通過作業調度來實現周期性的數據轉換。
三、總結:
本文通過一個日志分析的大數據分析場景,通過一個常見的json日志處理的需求為例,介紹了通過日志服務采集日志到MaxCompute,同時使用MaxCompute的內建函數/UDF等方式,對json格式的日志數據進行解析和轉換,提取關鍵業務字段、生成了可用于后續分析的日志表。
本文為云棲社區原創內容,未經允許不得轉載。
電子發燒友App



























評論