首页 > 其他 > 详细

异步任务调度二

时间:2020-07-14 11:25:29      阅读:51      评论:0      收藏:0      [点我收藏+]

异步任务调度二

/// <author>cxg 2020-7-14</author>
(*使用:
unit Unit1;

interface

uses    tasks, MsgPack,
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls;

type
  TForm1 = class(TForm)
    Button1: TButton;
    procedure Button1Click(Sender: TObject);
  private
    { Private declarations }
    tasks: TThreadCfg;
  public
    { Public declarations }
    procedure callback(task: TMsgPack);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

procedure TForm1.Button1Click(Sender: TObject);
var task: TMsgPack;
  queue: TTaskQueue;
begin
  task := TMsgPack.Create;
  task.Force('f1').AsString := '测试';
  queue := TTaskQueue.Create;
  queue.enQueue(task);
  tasks := TThreadCfg.Create(1, queue);
  tasks.onCallback := callback;
  tasks.newThreads;
end;

procedure TForm1.callback(task: TMsgPack);
begin
  Caption := task.force('f1').AsString;
  tasks.Free;
end;

end.
*)

unit tasks;

interface

uses
  Windows, MsgPack, Contnrs,
  SyncObjs, Classes,
  SysUtils;

type
  // Method-pointer type of the task handler: receives the TMsgPack that a
  // worker thread took from the queue (see TWorkThread.Execute).
  TCallBack = procedure(task: TMsgPack) of object;

type
  // Task queue (thread-safe): a Contnrs.TQueue of untyped pointers whose
  // enQueue/deQueue calls are serialized by a critical section.
  // Destroy frees only the container and the lock, not the queued items.
  TTaskQueue = class
  private
    fQueue: TQueue;              // underlying pointer queue
    fCS: TCriticalSection;       // guards fQueue inside enQueue/deQueue
  public
    constructor Create;
    destructor Destroy; override;
    procedure enQueue(task: Pointer);   // adds a task pointer to the queue
    function deQueue: Pointer;          // removes and returns a task pointer
  end;

type
  // Manages the worker threads: holds the shared task queue, the task
  // callback and the array of worker thread instances.
  TThreadCfg = class
  private
    fQueue: TTaskQueue;          // queue passed to Create; freed in Destroy
    fCallBack: TCallBack;        // handler the workers call for each task
    fThreadNum: Integer;         // number of worker threads
    fWorkers: array of TThread;  // worker instances, filled in by newThreads
  public
    // threadNum = 0 selects the processor count reported by the system.
    constructor Create(const threadNum: Integer; const queue: TTaskQueue);
    destructor Destroy; override;
    // Creates and starts the worker threads.
    procedure newThreads;
    // Handler invoked by the worker threads for every dequeued task.
    property onCallback: TCallBack read fCallBack write fCallBack;
  end;

type
  // Worker thread: polls the queue of its TThreadCfg and hands each task to
  // the configured callback.
  TWorkThread = class(TThread)
  private
    fConfig: TThreadCfg;   // configuration supplying the queue and the callback
  public
    constructor Create(cfg: TThreadCfg);
    destructor Destroy; override;
    procedure Execute; override;
  end;  

implementation

{ Returns the processor count reported by the Windows GetSystemInfo API. }
function GetCPUNum: Integer;
var
  sysInfo: TSystemInfo;
begin
  GetSystemInfo(sysInfo);
  Result := sysInfo.dwNumberOfProcessors;
end;

{ TTaskQueue }

{ Builds an empty queue together with the critical section that guards it. }
constructor TTaskQueue.Create;
begin
  inherited Create;
  fCS := TCriticalSection.Create;
  fQueue := TQueue.Create;
end;

{ Removes and returns the next task pointer from the queue.
  Returns nil when the queue is empty, instead of letting TQueue.Pop raise
  EListError. The critical section is released in a finally block so an
  exception can never leave the queue permanently locked. }
function TTaskQueue.deQueue: Pointer;
begin
  Result := nil;
  fCS.Enter;
  try
    // The emptiness check and the Pop happen under the same lock, so two
    // consumers cannot both see the last item.
    if fQueue.Count > 0 then
      Result := fQueue.Pop;
  finally
    fCS.Leave;
  end;
end;

{ Releases the queue container and its critical section.
  Items still stored in the queue are plain pointers and are not freed here. }
destructor TTaskQueue.Destroy;
begin
  FreeAndNil(fQueue);   // container only
  FreeAndNil(fCS);      // lock object
  inherited Destroy;
end;

{ Adds a task pointer to the queue under the critical section.
  The lock is released in a finally block so that an exception raised by
  Push (for example out of memory) cannot leave the queue locked. }
procedure TTaskQueue.enQueue(task: Pointer);
begin
  fCS.Enter;
  try
    fQueue.Push(task);
  finally
    fCS.Leave;
  end;
end;

{ TThreadCfg }

{ Stores the queue and decides how many worker threads will be used.
  threadNum - requested number of workers; zero (or a negative value)
              selects the processor count returned by GetCPUNum.
  queue     - task queue the workers will poll; it is freed by Destroy. }
constructor TThreadCfg.Create(const threadNum: Integer;
  const queue: TTaskQueue);
begin
  inherited Create;
  fQueue := queue;
  fThreadNum := threadNum;
  // A negative count would make SetLength fail; treat it like 0.
  if fThreadNum <= 0 then
    fThreadNum := GetCPUNum;
  // SetLength zero-fills the new slots, so every entry starts as nil until
  // newThreads populates it.
  SetLength(fWorkers, fThreadNum);
end;

{ Stops and releases the worker threads, then frees the task queue.
  Slots that were never filled (newThreads not called) are skipped.
  If the destructor is invoked from inside a worker's own callback, that
  worker cannot wait for itself: it is only flagged as terminated and left
  to free itself once its Execute returns. }
destructor TThreadCfg.Destroy;
var
  i: Integer;
  worker: TThread;
begin
  for i := 0 to High(fWorkers) do
  begin
    worker := fWorkers[i];
    if worker = nil then
      Continue;
    fWorkers[i] := nil;
    if GetCurrentThreadId = worker.ThreadID then
    begin
      // Called on this worker's thread: WaitFor would block forever.
      worker.FreeOnTerminate := True;
      worker.Terminate;
    end
    else
    begin
      // This object owns the thread, so it must not free itself while we
      // are still waiting on it.
      worker.FreeOnTerminate := False;
      worker.Terminate;
      worker.WaitFor;
      worker.Free;
    end;
  end;
  fQueue.Free;       // release the queue (queued items themselves are not freed)
  inherited;
end;

{ Creates and starts one worker thread per slot of fWorkers.
  Slots that already hold a thread are left alone, so calling this method
  more than once does not overwrite (and thereby orphan) running workers. }
procedure TThreadCfg.newThreads;
var
  i: Integer;
begin
  for i := 0 to High(fWorkers) do
  begin
    if Assigned(fWorkers[i]) then
      Continue;
    fWorkers[i] := TWorkThread.Create(Self);   // created suspended
    fWorkers[i].Resume;
  end;
end;

{ TWorkThread }

{ Creates the worker in suspended state and binds it to its configuration.
  cfg - the TThreadCfg that supplies the queue and the callback.
  FreeOnTerminate stays False: TThreadCfg.Destroy calls Terminate, WaitFor
  and Free on every worker, which is only valid for threads that do not
  free themselves. }
constructor TWorkThread.Create(cfg: TThreadCfg);
begin
  inherited Create(True);
  fConfig := cfg;
  FreeOnTerminate := False;
end;

{ The worker has no resources of its own; it only defers to TThread. }
destructor TWorkThread.Destroy;
begin
  inherited Destroy;
end;

{ Worker loop: until Terminate is requested, takes one task at a time from
  the shared queue, passes it to the configured callback and frees it.
  The callback runs on this worker thread, not on the main thread.
  The task is freed even when no callback is assigned or the callback
  raises, so a dequeued TMsgPack can never leak. }
procedure TWorkThread.Execute;
var
  pack: TMsgPack;
  queue: TTaskQueue;
begin
  while not Terminated do
  begin
    queue := fConfig.fQueue;
    // Check and dequeue under one lock so that two workers cannot both
    // react to the same last item. The critical section is re-entrant for
    // the owning thread, so the nested Enter inside deQueue is safe.
    queue.fCS.Enter;
    try
      if queue.fQueue.Count > 0 then
        pack := TMsgPack(queue.deQueue)
      else
        pack := nil;
    finally
      queue.fCS.Leave;
    end;

    if pack <> nil then
    begin
      try
        if Assigned(fConfig.fCallBack) then
          fConfig.fCallBack(pack);
      finally
        // fConfig is deliberately not touched after the callback: the
        // handler may have destroyed the owning TThreadCfg.
        pack.Free;
      end;
    end;

    Sleep(1);
    SwitchToThread;
  end;
end;

end.

  

异步任务调度二

原文:https://www.cnblogs.com/hnxxcxg/p/13298034.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!