CPU密集型任務(wù)開發(fā)指導(dǎo)

2024-02-16 13:45 更新

CPU密集型任務(wù)是指需要占用系統(tǒng)資源處理大量計(jì)算能力的任務(wù),需要長時(shí)間運(yùn)行,這段時(shí)間會(huì)阻塞線程其它事件的處理,不適宜放在主線程進(jìn)行。例如圖像處理、視頻編碼、數(shù)據(jù)分析等。

基于多線程并發(fā)機(jī)制處理CPU密集型任務(wù)可以提高CPU利用率,提升應(yīng)用程序響應(yīng)速度。

當(dāng)進(jìn)行一系列同步任務(wù)時(shí),推薦使用Worker;而進(jìn)行大量或調(diào)度點(diǎn)較為分散的獨(dú)立任務(wù)時(shí),不方便使用8個(gè)Worker去做負(fù)載管理,推薦采用TaskPool。接下來將以圖像直方圖處理以及后臺(tái)長時(shí)間的模型預(yù)測(cè)任務(wù)分別進(jìn)行舉例。

使用TaskPool進(jìn)行圖像直方圖處理

  1. 實(shí)現(xiàn)圖像處理的業(yè)務(wù)邏輯。

  2. 數(shù)據(jù)分段,將各段數(shù)據(jù)通過不同任務(wù)的執(zhí)行完成圖像處理。

    創(chuàng)建Task,通過execute()執(zhí)行任務(wù),在當(dāng)前任務(wù)結(jié)束后,會(huì)將直方圖處理結(jié)果同時(shí)返回。

  3. 結(jié)果數(shù)組匯總處理。

  1. import taskpool from '@ohos.taskpool';
  2. @Concurrent
  3. function imageProcessing(dataSlice: ArrayBuffer) {
  4. // 步驟1: 具體的圖像處理操作及其他耗時(shí)操作
  5. return dataSlice;
  6. }
  7. function histogramStatistic(pixelBuffer: ArrayBuffer) {
  8. // 步驟2: 分成三段并發(fā)調(diào)度
  9. let number = pixelBuffer.byteLength / 3;
  10. let buffer1 = pixelBuffer.slice(0, number);
  11. let buffer2 = pixelBuffer.slice(number, number * 2);
  12. let buffer3 = pixelBuffer.slice(number * 2);
  13. let task1 = new taskpool.Task(imageProcessing, buffer1);
  14. let task2 = new taskpool.Task(imageProcessing, buffer2);
  15. let task3 = new taskpool.Task(imageProcessing, buffer3);
  16. taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
  17. // 步驟3: 結(jié)果處理
  18. });
  19. taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
  20. // 步驟3: 結(jié)果處理
  21. });
  22. taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
  23. // 步驟3: 結(jié)果處理
  24. });
  25. }
  26. @Entry
  27. @Component
  28. struct Index {
  29. @State message: string = 'Hello World'
  30. build() {
  31. Row() {
  32. Column() {
  33. Text(this.message)
  34. .fontSize(50)
  35. .fontWeight(FontWeight.Bold)
  36. .onClick(() => {
  37. let data: ArrayBuffer;
  38. histogramStatistic(data);
  39. })
  40. }
  41. .width('100%')
  42. }
  43. .height('100%')
  44. }
  45. }

使用Worker進(jìn)行長時(shí)間數(shù)據(jù)分析

本文通過某地區(qū)提供的房價(jià)數(shù)據(jù)訓(xùn)練一個(gè)簡易的房價(jià)預(yù)測(cè)模型,該模型支持通過輸入房屋面積和房間數(shù)量去預(yù)測(cè)該區(qū)域的房價(jià),模型需要長時(shí)間運(yùn)行,房價(jià)預(yù)測(cè)需要使用前面的模型運(yùn)行結(jié)果,因此需要使用Worker。

  1. DevEco Studio提供了Worker創(chuàng)建的模板,新建一個(gè)Worker線程,例如命名為“MyWorker”。

  2. 在主線程中通過調(diào)用ThreadWorker的constructor()方法創(chuàng)建Worker對(duì)象,當(dāng)前線程為宿主線程。

    1. import worker from '@ohos.worker';
    2. const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
  3. 在宿主線程中通過調(diào)用onmessage()方法接收Worker線程發(fā)送過來的消息,并通過調(diào)用postMessage()方法向Worker線程發(fā)送消息。

    例如向Worker線程發(fā)送訓(xùn)練和預(yù)測(cè)的消息,同時(shí)接收Worker線程發(fā)送回來的消息。

    1. // 接收Worker子線程的結(jié)果
    2. workerInstance.onmessage = function(e) {
    3. // data:Worker線程發(fā)送的信息
    4. let data = e.data;
    5. console.info('MyWorker.ts onmessage');
    6. }
    7. workerInstance.onerror = function (d) {
    8. // 接收Worker子線程的錯(cuò)誤信息
    9. }
    10. // 向Worker子線程發(fā)送訓(xùn)練消息
    11. workerInstance.postMessage({ 'type': 0 });
    12. // 向Worker子線程發(fā)送預(yù)測(cè)消息
    13. workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
  4. 在MyWorker.ts文件中綁定Worker對(duì)象,當(dāng)前線程為Worker線程。

    1. import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
    2. let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
  5. 在Worker線程中通過調(diào)用onmessage()方法接收宿主線程發(fā)送的消息內(nèi)容,并通過調(diào)用postMessage()方法向宿主線程發(fā)送消息。

    例如在Worker線程中定義預(yù)測(cè)模型及其訓(xùn)練過程,同時(shí)與主線程進(jìn)行信息交互。

    1. import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
    2. let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
    3. // 定義訓(xùn)練模型及結(jié)果
    4. let result;
    5. // 定義預(yù)測(cè)函數(shù)
    6. function predict(x) {
    7. return result[x];
    8. }
    9. // 定義優(yōu)化器訓(xùn)練過程
    10. function optimize() {
    11. result = {};
    12. }
    13. // Worker線程的onmessage邏輯
    14. workerPort.onmessage = function (e: MessageEvents) {
    15. let data = e.data
    16. // 根據(jù)傳輸?shù)臄?shù)據(jù)的type選擇進(jìn)行操作
    17. switch (data.type) {
    18. case 0:
    19. // 進(jìn)行訓(xùn)練
    20. optimize();
    21. // 訓(xùn)練之后發(fā)送主線程訓(xùn)練成功的消息
    22. workerPort.postMessage({ type: 'message', value: 'train success.' });
    23. break;
    24. case 1:
    25. // 執(zhí)行預(yù)測(cè)
    26. const output = predict(data.value);
    27. // 發(fā)送主線程預(yù)測(cè)的結(jié)果
    28. workerPort.postMessage({ type: 'predict', value: output });
    29. break;
    30. default:
    31. workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
    32. break;
    33. }
    34. }
  6. 在Worker線程中完成任務(wù)之后,執(zhí)行Worker線程銷毀操作。銷毀線程的方式主要有兩種:根據(jù)需要可以在宿主線程中對(duì)Worker線程進(jìn)行銷毀;也可以在Worker線程中主動(dòng)銷毀Worker線程。

    在宿主線程中通過調(diào)用onexit()方法定義Worker線程銷毀后的處理邏輯。

    1. // Worker線程銷毀后,執(zhí)行onexit回調(diào)方法
    2. workerInstance.onexit = function() {
    3. console.info("main thread terminate");
    4. }

    方式一:在宿主線程中通過調(diào)用terminate()方法銷毀Worker線程,并終止Worker接收消息。

    1. // 銷毀Worker線程
    2. workerInstance.terminate();

    方式二:在Worker線程中通過調(diào)用close()方法主動(dòng)銷毀Worker線程,并終止Worker接收消息。

    1. // 銷毀線程
    2. workerPort.close();
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)