
作者|donatello1996
來源 | 電子發(fā)燒友
題圖|飛凌嵌入式
rte_ring是一個用CAS實現(xiàn)的無鎖FIFO環(huán)形隊列,支持多消費者/生產(chǎn)者同時出入隊列,常用于多線程/多進程之間的通信。具體原理可以查看DPDK官方文檔或者閱讀源碼,本文采用的硬件板卡為飛凌嵌入式OKMX8MP-C開發(fā)板,系統(tǒng)版本Linux5.4.70+Qt5.15.0,主要介紹通過編譯DPDK源碼實現(xiàn)rte_ring無鎖環(huán)隊列進程間通信。
下面就跟著小編一起了解下。
一、內(nèi)核編譯下載并解壓飛凌廠商提供的iMX8MP內(nèi)核源碼壓縮包分卷:

在虛擬機中合并壓縮分卷并解壓得出內(nèi)核源碼包文件夾OK8MP-linux-kernel,將文件夾使用tar打包并復(fù)制到TF卡文件系統(tǒng)中解壓:

找到內(nèi)核源碼中的配置文件OK8MP-C_defconfig:

這個就是make選項,使用
make OK8MP-C_defconfig
指令即可配置編譯選項:
make -j4
開始編譯:
注意開始編譯前需要安裝常用軟件:
apt install bison bc flex


增量編譯完畢:

接下來就可以下載DPDK并運行rte_ring無鎖環(huán)隊列Demo應(yīng)用,需要從
https://www.dpdk.org/
官網(wǎng)中下載DPDK 19.11.10 (LTS)長期支持版本:

在根目錄下的mk/文件夾下找到名為rte_vars.mk設(shè)置文件,找到環(huán)境變量RTE_KERNELDIR,修改為上述的內(nèi)核源碼路徑:


RTE_KERNELDIR ?= /home/OK8MP-linux-kernel/
進入usertools文件夾,找到dpdk-setup.sh腳本并運行


選擇8,ARM64-armv8a-linuxapp-gcc,

這個選項會使dpdk的gcc交叉編譯鏈生成適用于armv8a處理器的外部庫,外部庫中有kmod和lib等ko文件和so文件,是用于第三方程序開發(fā)和運行的:


使用指令
insmod /home/dpdk-stable-19.11.10/arm64-armv8a-linuxapp-gcc/kmod/igb_uio.ko
左右滑動查看完整代碼加載igb_uio.ko驅(qū)動文件,這是進行dpdk開發(fā)必備的步驟:

然后是使用dpdk-devbind.py腳本手動進行hugepage大頁內(nèi)存綁定,此處為numa方式:

此舉會將/mnt/huge文件mount成hugepage映射文件,并實實在在地占用內(nèi)存空間:


準備工作完成,我們接下來可以進行rte_ring無鎖環(huán)隊列Demo代碼的編寫,但是在編寫之前,需要對無鎖環(huán)隊列有一個基本的認識:https://blog.csdn.net/chen98765432101/article/details/69367633
無論是dpdk第三方開發(fā)的rte_ring還是Linux內(nèi)核中本就存在的無鎖環(huán)隊列,其基本原理類似,在一條分配好的隊列型內(nèi)存空間中,讀寫方式為FIFO(先進先出),讀和寫的動作分別有兩個進程或兩個線程進行,寫進程不斷往地址自增的內(nèi)存位置寫入數(shù)據(jù),讀進程不斷讀取地址自增的內(nèi)存位置的數(shù)據(jù),當寫位置的內(nèi)存地址已為隊列中內(nèi)存的最高值時,需要釋放隊列中內(nèi)存地址最低值的空間供寫進程繼續(xù)寫,方式仍與上一周期相同(不斷往地址自增的內(nèi)存位置寫入數(shù)據(jù)),釋放過程需要保證對末尾內(nèi)存地址空間的鎖定與解鎖,避免讀寫過程出錯。而不同的是,Linux內(nèi)核中的無鎖環(huán)隊列,地址管理和讀寫控制均由內(nèi)核進行,而dpdk的rte_ring則由dpdk內(nèi)部的控制器進行,因為dpdk這一模塊完整地接管了所分配內(nèi)存空間的管理權(quán),是直接繞過Linux內(nèi)核進行管理的,內(nèi)核也無權(quán)訪問dpdk控制器的具體管理細節(jié)。




編寫無鎖環(huán)隊列兩個進程的Demo,先寫Primary進程:
左右滑動查看完整代碼static const char *_MSG_POOL = "MSG_POOL";static const char *_SEC_2_PRI = "SEC_2_PRI";static const char *_PRI_2_SEC = "PRI_2_SEC";static const char *_PRI_2_THI = "PRI_2_THI";struct rte_ring *send_ring, *recv_ring , *send_ring_third;struct rte_mempool *message_pool;volatile int quit = 0;static void * lcore_recv(void *arg){unsigned lcore_id = rte_lcore_id();printf("Starting core %u\n", lcore_id);while (!quit){void *msg;if (rte_ring_dequeue(recv_ring, &msg) < 0){usleep(5);continue;}printf("lcore_id = %d Received '%s'\n" , lcore_id , (char *)msg);rte_mempool_put(message_pool , msg);}return 0;}int string_size = 100;int elt_size = 128;pthread_t id1;int main(int argc, char **argv){const unsigned flags = 0;const unsigned ring_size = 64;const unsigned pool_size = 1024;const unsigned pool_cache = 32;const unsigned priv_data_sz = 0;int ret;unsigned lcore_id;ret = rte_eal_init(argc, argv);if (ret < 0)rte_exit(EXIT_FAILURE, "Cannot init EAL\n");send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);send_ring_third = rte_ring_create(_PRI_2_THI, ring_size, rte_socket_id(), flags);message_pool = rte_mempool_create(_MSG_POOL, pool_size,elt_size, pool_cache, priv_data_sz,NULL, NULL, NULL, NULL,rte_socket_id(), flags);if (send_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");if (recv_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");if (send_ring_third == NULL)rte_exit(EXIT_FAILURE, "Problem getting send_ring_third\n");if (message_pool == NULL)rte_exit(EXIT_FAILURE, "Problem getting message pool\n");pthread_create(&id1 , NULL , lcore_recv , NULL);while(1){void *msg = NULL;if (rte_mempool_get(message_pool, &msg) < 0)continue;snprintf((char *)msg, string_size, "%s", "primary to secondary");if (rte_ring_enqueue(send_ring , msg) < 0){rte_mempool_put(message_pool, msg);}if (rte_mempool_get(message_pool, &msg) < 0)continue;snprintf((char *)msg, string_size, "%s", "primary to third");if (rte_ring_enqueue(send_ring_third , msg) < 0){rte_mempool_put(message_pool, msg);}sleep(1);}return 0;}
注意在Makefile文件里面要關(guān)閉WERROR相關(guān)編譯選項:
左右滑動查看完整代碼# BSD LICENSE## Copyright(c) 2010-2014 Intel Corporation. All rights reserved.# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions# are met:## * Redistributions of source code must retain the above copyright# notice, this list of conditions and the following disclaimer.# * Redistributions in binary form must reproduce the above copyright# notice, this list of conditions and the following disclaimer in# the documentation and/or other materials provided with the# distribution.# * Neither the name of Intel Corporation nor the names of its# contributors may be used to endorse or promote products derived# from this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.ifeq ($(RTE_SDK),)$(error "Please define RTE_SDK environment variable")endif# Default target, can be overridden by command line or environmentRTE_TARGET ?= arm64-armv8a-linuxapp-gccinclude $(RTE_SDK)/mk/rte.vars.mk# binary nameAPP = rte_ring_primary# all source are stored in SRCS-ySRCS-y := main.cCFLAGS += -O0CFLAGS +=include $(RTE_SDK)/mk/rte.extapp.mk
Secondary進程:
static const char *_MSG_POOL = "MSG_POOL";static const char *_SEC_2_PRI = "SEC_2_PRI";static const char *_PRI_2_SEC = "PRI_2_SEC";struct rte_ring *send_ring, *recv_ring;struct rte_mempool *message_pool;volatile int quit = 0;int string_size = 100;static int lcore_send(__attribute__((unused)) void *arg){unsigned lcore_id = rte_lcore_id();while(1){void *msg = NULL;if (rte_mempool_get(message_pool, &msg) < 0)continue;snprintf((char *)msg , string_size , "%s", "secondary to primary");if (rte_ring_enqueue(send_ring , msg) < 0){rte_mempool_put(message_pool, msg);}sleep(1);}return 0;}pthread_t id1;int main(int argc, char **argv){const unsigned flags = 0;const unsigned ring_size = 64;const unsigned pool_size = 1024;const unsigned pool_cache = 32;const unsigned priv_data_sz = 0;int ret;unsigned lcore_id;ret = rte_eal_init(argc, argv);if (ret < 0)rte_exit(EXIT_FAILURE, "Cannot init EAL\n");recv_ring = rte_ring_lookup(_PRI_2_SEC);send_ring = rte_ring_lookup(_SEC_2_PRI);message_pool = rte_mempool_lookup(_MSG_POOL);if (send_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");if (recv_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");if (message_pool == NULL)rte_exit(EXIT_FAILURE, "Problem getting message pool\n");pthread_create(&id1 , NULL , lcore_send , NULL);while (1){lcore_id = rte_lcore_id();void * msg = NULL;if (rte_ring_dequeue(recv_ring, &msg) < 0){usleep(5);continue;}printf("lcore_id = %d Received: %s\n" , lcore_id , (char *)msg);rte_mempool_put(message_pool, msg);}return 0;}
左右滑動查看完整代碼
同樣在Makefile文件里面要關(guān)閉WERROR相關(guān)編譯選項:
左右滑動查看完整代碼# BSD LICENSE## Copyright(c) 2010-2014 Intel Corporation. All rights reserved.# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions# are met:## * Redistributions of source code must retain the above copyright# notice, this list of conditions and the following disclaimer.# * Redistributions in binary form must reproduce the above copyright# notice, this list of conditions and the following disclaimer in# the documentation and/or other materials provided with the# distribution.# * Neither the name of Intel Corporation nor the names of its# contributors may be used to endorse or promote products derived# from this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.ifeq ($(RTE_SDK),)$(error "Please define RTE_SDK environment variable")endif# Default target, can be overridden by command line or environmentRTE_TARGET ?= arm64-armv8a-linuxapp-gccinclude $(RTE_SDK)/mk/rte.vars.mk# binary nameAPP = rte_ring_secondary# all source are stored in SRCS-ySRCS-y := main.cCFLAGS += -O3CFLAGS += $()include $(RTE_SDK)/mk/rte.extapp.mk
運行,這里說一下,基于rte_ring的進程間通信,Secondary進程最好是使用auto類型:
./rte_ring_primary --proc-type primary./rte_ring_secondary --proc-type auto
運行效果:


作者簡介
donatello1996,某大型企業(yè)資深嵌入式工程師,電子發(fā)燒友論壇技術(shù)大牛,同時也是飛凌嵌入式多年鐵粉,曾基于飛凌多款板卡產(chǎn)出過優(yōu)質(zhì)測評文章或使用心得。本期三篇文章為donatello1996在使用OKMX8MP-C開發(fā)板過程中精心產(chǎn)出的干貨,在此對donatello1996表示感謝。
-
嵌入式
+關(guān)注
關(guān)注
5198文章
20442瀏覽量
333964
發(fā)布評論請先 登錄
電子發(fā)燒友榮獲電子工業(yè)出版社博文視點 “2025 年度卓越合作伙伴”
進程通信
實測對比:為什么發(fā)燒友更青睞立山科學(xué)熱敏電阻?
[投票評選]2025電子發(fā)燒友開發(fā)板測評大賽--最受歡迎開發(fā)板
電子發(fā)燒友工程師看!電子領(lǐng)域評職稱,技術(shù)之路更扎實
【社區(qū)活動】電子發(fā)燒友四月份活動匯總
i.MX8MP使用最新的BSP (6.6.52-2.2.0) 映像安裝TA失敗,為什么?
將Deepseek移植到i.MX 8MP|93 EVK的步驟
在SPI通信中啟動SCLK之前如何減少CS低電平時間?
如何實現(xiàn)i.MX8MPlus的rpmsg調(diào)試
發(fā)燒友實測 | i.MX8MP 編譯DPDK源碼實現(xiàn)rte_ring無鎖環(huán)隊列進程間通信
評論