Search code examples
multithreading, delphi, critical-section

Delphi: Multithreading, Thread safe not working


When data is sent to the "tunnel" socket, packets are sometimes merged. I implemented a critical section around the write, but it is not working.

What am I doing wrong?

type
  // Forward declarations of the three classes that are fully declared
  // further down in this type section.
  my_ff_thread = class;
  my_ss_thread = class;
  Tmy_tunnel_from_MappedPortTCP = class;


  // One entry of the global my_list: pairs an id string with the connection
  // that a my_ss_thread opened for that id.
  Tmy_thread_list = class
      // Id used to look the entry up.
      ff_id   : string;
      // Connection stored for that id. The entry only holds the reference;
      // nothing in this class frees it.
      ff_connection : TIdTCPConnection;
      // Stores both arguments in the fields above.
      constructor Create(local_ff_id: string; local_ss_c: TIdTCPConnection);
  end;


  // Component that connects the global tunnel client and starts the thread
  // that reads packets from it.
  Tmy_tunnel_from_MappedPortTCP = class(TIdBaseComponent)
  protected
    // NOTE(review): the implementation of InitComponent is not part of this
    // listing.
    procedure InitComponent; override;
  public
    // Connects the tunnel to its fixed host/port and starts a my_ff_thread.
    function my_connect:boolean;
  end;


  // Thread that reads packets from the global tunnel socket and forwards each
  // one to the connection registered under the packet's id, creating a
  // my_ss_thread when no connection is registered yet.
  my_ff_thread = class(TThread)
  protected
    procedure Execute; override;
  public
    // Creates the thread suspended; the caller starts it.
    constructor Create;
  end;


  // Worker thread for a single id: opens a local TCP connection, sends the
  // initial data to it and relays everything it receives back to the tunnel.
  my_ss_thread = class(TThread)
  protected
    // Id that tags every packet this thread writes to the tunnel.
    Fff_id : string;
    // Command that caused the thread to be created (stored by the constructor).
    Fff_cmd : string;
    // First payload, written to the local connection right after connecting.
    Fff_data : TIdBytes;
    procedure Execute; override;
  public
    // Creates the thread suspended and stores the three arguments.
    constructor Create(ff_id:string; ff_cmd:string; ff_data:TIdBytes);
    // Builds a packet from three fields, each preceded by its length as a
    // 10-character zero-padded decimal number.
    function prepare_cmd(cmd:string; id:string; data:string):string;
    // Left-pads s with '0' up to 10 characters; returns '' for an empty s.
    function set_nulls_at_begin(s:string):string;
  end;


// Registry of Tmy_thread_list entries, shared by all threads of this unit.
var my_list : TThreadList;
    // Guards writes to the tunnel socket made by the my_ss_thread instances.
    CS: TRTLCriticalSection;
    // Shared tunnel connection. It is created as a TIdTCPClient in the
    // initialization section and cast to that type where Host/Port are set.
    tunnel: TIdTCPConnection;

Implementation



constructor my_ff_thread.Create;
const
  START_SUSPENDED = True;
begin
  // The thread does not run until its creator calls Start on it.
  inherited Create(START_SUSPENDED);
end;




// Creates the worker suspended and stores the id, the command and the first
// payload it will send once it runs.
//   ff_id   - id used to tag every packet the worker writes to the tunnel
//   ff_cmd  - command that triggered the creation of the worker
//   ff_data - first payload to forward to the local connection
constructor my_ss_thread.Create(ff_id:string; ff_cmd:string; ff_data:TIdBytes);
begin
  inherited Create(True);
  Fff_id := ff_id;
  Fff_cmd := ff_cmd;
  // Dynamic arrays are shared by reference on assignment. Take a private copy
  // so the creating thread can keep reusing its own buffer while this thread
  // runs.
  Fff_data := Copy(ff_data, 0, Length(ff_data));
end;


// Fills the entry with the id and the connection that belongs to it.
constructor Tmy_thread_list.Create(local_ff_id: string; local_ss_c: TIdTCPConnection);
begin
  Self.ff_connection := local_ss_c;
  Self.ff_id := local_ff_id;
end;





// Left-pads s with '0' characters to a width of 10.
// An empty s yields an empty result; a string of 10 or more characters is
// returned unchanged.
function my_ss_thread.set_nulls_at_begin(s:string):string;
const
  FIELD_WIDTH = 10;
var
  missing : integer;
begin
  if s = '' then
  begin
    Result := '';
    Exit;
  end;

  missing := FIELD_WIDTH - Length(s);
  if missing > 0 then
    Result := StringOfChar('0', missing) + s
  else
    Result := s;
end;




// Builds one tunnel packet: cmd, id and data, in that order, each preceded by
// its own length written as a 10-character zero-padded decimal number.
function my_ss_thread.prepare_cmd(cmd:string; id:string; data:string):string;

  // Returns field preceded by its padded length.
  function framed(const field: string): string;
  begin
    Result := set_nulls_at_begin(IntToStr(Length(field))) + field;
  end;

begin
  Result := framed(cmd) + framed(id) + framed(data);
end;








// Removes the first entry of my_list whose ff_id equals firefox_id and frees
// the entry object. The connection the entry referred to is NOT freed here.
// Always returns True, whether or not an entry was found.
function del_ff_from_list(firefox_id:string):boolean;
var i : integer;
    list : TList;
    entry : Tmy_thread_list;
begin
  Result := True;
  // Take the lock before entering the try block, so that UnlockList in the
  // finally part only ever runs when the lock is really held.
  list := my_list.LockList;
  try
    for i:=0 to list.Count-1 do
    begin
      entry := Tmy_thread_list(list[i]);
      if entry.ff_id = firefox_id then
      begin
        list.Delete(i);
        // TList does not own its items; free the entry to avoid leaking it.
        entry.Free;
        break;
      end;
    end;
  finally
    my_list.UnlockList;
  end;
end;




// Thread body: connects to the local server, forwards the first payload to it
// and then relays everything the server sends back to the tunnel as
// 'data_from_ss' packets. The loop ends when the local connection or the
// tunnel is closed, when nothing was received for 120 seconds, or when the
// thread is asked to terminate. A 'disconnect' packet is sent to the tunnel
// at the end if it is still connected.
procedure my_ss_thread.Execute;
var ss : TIdTCPClient;
    unix_time : Int64;   // DateTimeToUnix returns Int64
    data : TIdBytes;
    list : TList;
    entry : Tmy_thread_list;

  // Frames (cmd, Fff_id, payload) with prepare_cmd and writes the packet to
  // the shared tunnel socket. The critical section is released in a finally
  // block: if the write raises, other threads must not stay blocked on CS
  // forever. The temporary stream is always freed.
  procedure send_to_tunnel(const cmd, payload: string);
  var packet_stream : TStringStream;
  begin
    packet_stream := TStringStream.Create(prepare_cmd(cmd, Fff_id, payload));
    try
      packet_stream.Position := 0;
      EnterCriticalSection(CS);
      try
        tunnel.Socket.Write(packet_stream, -1, True);
      finally
        LeaveCriticalSection(CS);
      end;
    finally
      packet_stream.Free;
    end;
  end;

begin
    ss := TIdTCPClient.Create(nil);
    try
      ss.Host := '127.0.0.1';
      ss.Port := 6666;
      ss.ReadTimeout := 1000 * 5;
      ss.Connect;
    except
      on E:Exception do
      begin
        // The client was never published in my_list, so no other thread can
        // hold a reference to it and it is safe to free it here.
        ss.Free;
        exit;
      end;
    end;

    // Publish the connection so the tunnel reader can find it by id.
    entry := Tmy_thread_list.Create(Fff_id, ss);
    list := my_list.LockList;
    try
      list.Add(entry);
    finally
      my_list.UnlockList;
    end;

    try
      ss.Socket.Write(Fff_data);
    except
      on E:Exception do begin {Fmy_memo.Lines.Add('First data not sent!');} end;
    end;

    unix_time := DateTimeToUnix(NOW);

    while not Terminated do
    begin
      ss.Socket.CheckForDataOnSource(5);
      if not ss.Socket.InputBufferIsEmpty then
      begin
        SetLength(data, 0);
        ss.Socket.InputBuffer.ExtractToBytes(data);
        ss.Socket.InputBuffer.Clear;
        unix_time := DateTimeToUnix(NOW);

        try
          send_to_tunnel('data_from_ss', TIdEncoderMIME.EncodeBytes(data));
        except
          on E:Exception do
          begin
            // Best effort: a broken tunnel is detected by the Connected
            // check further down in this loop.
          end;
        end;
      end;

      // Idle timeout: nothing was received from the local server for 120 s.
      if (DateTimeToUnix(NOW) - unix_time) > 120 then
      begin
        ss.Disconnect;
        break;
      end;

      if not ss.Connected then
        break;

      if not tunnel.Connected then
      begin
        ss.Disconnect;
        break;
      end;
    end;

    try
      if tunnel.Connected then
        send_to_tunnel('disconnect', 'x');
    except
      on E:Exception do begin end;
    end;

    // NOTE(review): ss and its my_list entry are intentionally left alive.
    // The tunnel reader uses the stored connection outside the list lock, so
    // freeing it here could race with that thread.
    Terminate;
end;










// Thread body: reads length-prefixed packets from the tunnel and dispatches
// them by id.
//   - '0000' packets are ignored.
//   - 'disconnect' closes and unregisters the connection stored for the id.
//   - any other command is written to the stored connection, or starts a new
//     my_ss_thread when no connection is registered for the id.
// The loop ends when the thread is terminated or the tunnel is no longer
// connected.
procedure my_ff_thread.Execute;
var
  t : my_ss_thread;
  cmd, id, encoded : string;
  i : integer;
  found_ss : TIdTCPConnection;
  list : TList;
  packet : string;
  data : TIdBytes;
  packet_stream: TStringStream;
  read_ok : boolean;

  // Removes one "<10-digit length><payload>" field from the front of src and
  // returns the payload in field. Returns False (leaving src untouched) when
  // the length prefix is missing or is not a non-negative number.
  function take_field(var src, field: string): boolean;
  var len : integer;
  begin
    field := '';
    len := StrToIntDef(Copy(src, 1, 10), -1);
    Result := (Length(src) >= 10) and (len >= 0);
    if not Result then
      Exit;
    Delete(src, 1, 10);
    field := Copy(src, 1, len);
    Delete(src, 1, len);
  end;

begin
  packet_stream := TStringStream.Create;
  try
    while not Terminated do
    begin
      // Start every read with an EMPTY stream. Only rewinding the position
      // would leave the tail of a longer previous packet behind the new one.
      packet_stream.Size := 0;
      read_ok := False;
      try
        tunnel.Socket.ReadStream(packet_stream);
        read_ok := True;
      except
        on E:Exception do begin end;
      end;

      if not read_ok then
      begin
        // Nothing valid was read, so there is nothing to parse: stop when
        // the tunnel is gone, otherwise try again.
        if not tunnel.Connected then
        begin
          Terminate;
          break;
        end;
        continue;
      end;

      packet := packet_stream.DataString;

      if packet = '0000' then
        continue;

      // A packet is three length-prefixed fields: cmd, id, MIME-encoded data.
      // Malformed packets are dropped instead of being parsed with stale or
      // uninitialised lengths.
      if not (take_field(packet, cmd) and take_field(packet, id)
              and take_field(packet, encoded)) then
        continue;
      data := TIdDecoderMIME.DecodeBytes(encoded);

      found_ss := nil;
      // Lock before the try block so UnlockList only runs with the lock held.
      list := my_list.LockList;
      try
        for i:=0 to list.Count-1 do
        begin
          if Tmy_thread_list(list[i]).ff_id = id then
          begin
            found_ss := Tmy_thread_list(list[i]).ff_connection;
            break;
          end;
        end;
      finally
        my_list.UnlockList;
      end;

      if cmd = 'disconnect' then
      begin
        // A disconnect never creates a new worker. The entry is removed even
        // when the connection has already dropped.
        if found_ss <> nil then
        begin
          if found_ss.Connected then
            found_ss.Disconnect;
          del_ff_from_list(id);
        end;
        continue;
      end;

      if found_ss = nil then
      begin
        t := my_ss_thread.Create(id, cmd, data);
        t.Start;
      end
      else
      begin
        try
          if found_ss.Connected then
            found_ss.Socket.Write(data);
        except
          // Best effort: the worker that owns the connection notices the
          // failure on its side.
          on E:Exception do begin end;
        end;
      end;

      if not tunnel.Connected then
      begin
        Terminate;
        break;
      end;
    end;
  finally
    packet_stream.Free;
  end;
end;



// Connects the global tunnel client to 192.168.0.157:8099 and starts the
// thread that reads from it.
// Returns True when the connection was established and the reader thread was
// started, False when connecting failed.
function Tmy_tunnel_from_MappedPortTCP.my_connect:boolean;
var t : my_ff_thread;
    client : TIdTCPClient;
begin
  Result := False;
  // tunnel is declared as TIdTCPConnection but is created as a TIdTCPClient
  // in the initialization section; Host and Port need the client type.
  client := TIdTCPClient(tunnel);
  try
    client.Host := '192.168.0.157';
    client.Port := 8099;
    client.Connect;
  except
    on E:Exception do
    begin
      tunnel.Disconnect;
      exit;
    end;
  end;

  t := my_ff_thread.Create;
  t.Start;
  Result := True;
end;


initialization
  // Create the shared state used by all threads of this unit.
  InitializeCriticalSection(CS);
  my_list := TThreadList.Create;
  tunnel := TIdTCPClient.Create(nil);
finalization
  // NOTE(review): only the critical section is released here; my_list and
  // tunnel are never freed. Freeing them safely would require stopping the
  // threads that use them first.
  DeleteCriticalSection(CS);


end.

Solution

  • Try something like this:

    type 
      // Forward declarations of the three classes that are fully declared
      // further down in this type section.
      my_ff_thread = class; 
      my_ss_thread = class; 
      Tmy_tunnel_from_MappedPortTCP = class; 
    
      // One entry of the global my_list: pairs an id string with the
      // connection that a my_ss_thread opened for that id.
      Tmy_thread_list = class 
      public
        // Id used to look the entry up.
        ff_id   : string; 
        // Connection stored for that id; the entry does not free it.
        ff_connection : TIdTCPConnection; 
        // Stores both arguments in the fields above.
        constructor Create(const local_ff_id: string; local_ss_c: TIdTCPConnection); 
      end; 
    
      // Component that opens and closes the global tunnel connection together
      // with the thread that reads from it.
      Tmy_tunnel_from_MappedPortTCP = class(TIdBaseComponent) 
      protected 
        // NOTE(review): the implementation of InitComponent is not part of
        // this listing.
        procedure InitComponent; override; 
      public 
        // Connects the tunnel and creates the reader thread; False on failure.
        function my_connect: boolean; 
        // Terminates the reader thread, disconnects the tunnel, waits for the
        // thread and frees it; False when an exception occurred.
        function my_disconnect: boolean; 
      end; 
    
      // Thread that reads packets from the global tunnel and dispatches each
      // one to the connection registered under the packet's id.
      my_ff_thread = class(TThread) 
      protected 
        procedure Execute; override; 
      public 
        // Creates the thread non-suspended.
        constructor Create;
      end; 
    
      // Worker thread for a single id: opens a local TCP connection, sends
      // the initial data to it and relays what it receives back to the tunnel.
      my_ss_thread = class(TThread) 
      protected 
        // Id that tags every packet this thread writes to the tunnel.
        Fff_id : string; 
        // Command that caused the thread to be created.
        Fff_cmd : string; 
        // Private copy of the first payload, written right after connecting.
        Fff_data : TIdBytes; 
        procedure Execute; override; 
      public 
        // Creates the thread non-suspended and stores the three arguments.
        constructor Create(const ff_id, ff_cmd: string; const ff_data: TIdBytes); 
      end; 
    
    var
      // Registry of Tmy_thread_list entries shared by all threads.
      my_list : TThreadList = nil; 
      // Serializes writes of whole packets to the tunnel.
      CS: TCriticalSection = nil; 
      // Shared tunnel connection, created in the initialization section.
      tunnel: TIdTCPClient = nil; 
      // Reader thread created by my_connect and released by my_disconnect.
      tunnel_thread: my_ff_thread = nil;
    
    implementation 
    
    // Fills the entry with the id and the connection that belongs to it.
    constructor Tmy_thread_list.Create(const local_ff_id: string; local_ss_c: TIdTCPConnection);
    begin
      Self.ff_connection := local_ss_c;
      Self.ff_id := local_ff_id;
    end;
    
    // Creates the worker non-suspended and stores the id, the command and a
    // private copy of the first payload.
    constructor my_ss_thread.Create(const ff_id, ff_cmd: string; const ff_data: TIdBytes);
    const
      START_SUSPENDED = False;
    begin
      inherited Create(START_SUSPENDED);
      // Copy the bytes so this thread does not share the caller's array.
      Fff_data := Copy(ff_data, 0, Length(ff_data));
      Fff_cmd := ff_cmd;
      Fff_id := ff_id;
    end;
    
    // Thread body: connects to the local server, registers the connection in
    // my_list, forwards the first payload and then relays everything the
    // server sends to the tunnel as 'data_from_ss' packets. When the read
    // loop ends normally a 'disconnect' packet is sent. The local client is
    // always disconnected and freed.
    procedure my_ss_thread.Execute;
    var
      ss : TIdTCPClient;
      data : TIdBytes;

      // Writes s to strm as a 4-byte byte count followed by its UTF-8 bytes.
      procedure WriteStrToStream(strm: TStream; const s: String);
      var
        buf: TIdBytes;
        len: Integer;
      begin
        buf := ToBytes(s, IndyUTF8Encoding);
        len := Length(buf);
        strm.WriteBuffer(len, SizeOf(Integer));
        // Test the local length here: the name "bytes" is not in scope in
        // this procedure, and buf[0] must not be touched for an empty string.
        if len > 0 then
          strm.WriteBuffer(buf[0], len);
      end;

      // Builds one packet (cmd, Fff_id, MIME-encoded bytes) in memory and
      // writes it to the tunnel in a single call while holding CS, so packets
      // from different worker threads cannot interleave.
      procedure WritePacketToTunnel(const cmd: string; const bytes: TIdBytes = nil);
      var
        strm: TMemoryStream;
      begin
        strm := TMemoryStream.Create;
        try
          WriteStrToStream(strm, cmd);
          WriteStrToStream(strm, Fff_id);
          WriteStrToStream(strm, TIdEncoderMIME.EncodeBytes(bytes));

          CS.Enter;
          try
            tunnel.IOHandler.Write(strm, 0, True);
          finally
            CS.Leave;
          end;
        finally
          strm.Free;
        end;
      end;

    begin
      ss := TIdTCPClient.Create(nil);
      try
        ss.Host := '127.0.0.1';
        ss.Port := 6666;
        ss.ReadTimeout := 1000 * 120;

        ss.Connect;
        try
          my_list.Add(Tmy_thread_list.Create(Fff_id, ss));

          try
            ss.IOHandler.Write(Fff_data);
          except
            {Fmy_memo.Lines.Add('First data not sent!');}
            raise;
          end;

          while not Terminated do
          begin
            SetLength(data, 0);
            ss.IOHandler.ReadBytes(data, -1);
            if Length(data) = 0 then
              break;

            WritePacketToTunnel('data_from_ss', data);
          end;

          WritePacketToTunnel('disconnect');
        finally
          ss.Disconnect;
        end;
      finally
        // NOTE(review): the my_list entry added above still refers to ss
        // after this point; confirm that it is removed elsewhere before the
        // connection is looked up again.
        ss.Free;
      end;
    end;
    
    constructor my_ff_thread.Create;
    const
      START_SUSPENDED = False;
    begin
      // Not suspended: no separate Start call is needed.
      inherited Create(START_SUSPENDED);
    end;
    
    // Thread body: reads one packet at a time from the tunnel and dispatches
    // it by id.
    //   - 'disconnect' closes the stored connection (if any) and removes the
    //     entry.
    //   - other commands are written to the stored connection, or start a
    //     new my_ss_thread when the id is unknown.
    // Exceptions raised while reading a packet leave the loop.
    procedure my_ff_thread.Execute;
    var
      command, conn_id : string;
      payload : TIdBytes;
      target : TIdTCPConnection;

      // Reads a 4-byte byte count followed by that many UTF-8 bytes.
      function ReadStrFromStream(strm: TStream): string;
      var
        byte_count: Integer;
      begin
        strm.ReadBuffer(byte_count, SizeOf(Integer));
        if byte_count <= 0 then
          Result := ''
        else
          Result := IdGlobal.ReadStringFromStream(strm, byte_count, IndyUTF8Encoding);
      end;

      // Reads one whole packet from the tunnel into memory and splits it into
      // command, id and decoded payload.
      procedure ReadPacketFromTunnel(var v_cmd, v_id: string; var v_data: TIdBytes);
      var
        buffer: TMemoryStream;
      begin
        buffer := TMemoryStream.Create;
        try
          tunnel.IOHandler.ReadStream(buffer, -1, False);
          buffer.Position := 0;
          v_cmd  := ReadStrFromStream(buffer);
          v_id   := ReadStrFromStream(buffer);
          v_data := TIdDecoderMIME.DecodeBytes(ReadStrFromStream(buffer));
        finally
          buffer.Free;
        end;
      end;

      // Returns the connection registered under wanted_id, or nil.
      function FindConnection(const wanted_id: string): TIdTCPConnection;
      var
        entries : TList;
        idx : integer;
      begin
        Result := nil;
        entries := my_list.LockList;
        try
          for idx := 0 to entries.Count-1 do
            if Tmy_thread_list(entries[idx]).ff_id = wanted_id then
            begin
              Result := Tmy_thread_list(entries[idx]).ff_connection;
              break;
            end;
        finally
          my_list.UnlockList;
        end;
      end;

    begin
      while not Terminated do
      begin
        ReadPacketFromTunnel(command, conn_id, payload);
        target := FindConnection(conn_id);

        if command = 'disconnect' then
        begin
          if target <> nil then
            target.Disconnect;
          del_ff_from_list(conn_id);
        end
        else if target <> nil then
        begin
          // Write failures are ignored on purpose.
          try
            target.IOHandler.Write(payload);
          except
          end;
        end
        else
          my_ss_thread.Create(conn_id, command, payload);
      end;
    end;
    
    // Connects the global tunnel to 192.168.0.157:8099 and creates the reader
    // thread. Returns True on success; on any exception the tunnel is
    // disconnected and False is returned.
    function Tmy_tunnel_from_MappedPortTCP.my_connect: boolean;
    begin
      Result := True;
      try
        tunnel.Host := '192.168.0.157';
        tunnel.Port := 8099;
        tunnel.Connect;
        // my_ff_thread.Create is declared without parameters; the thread
        // reaches the connection through the global tunnel variable.
        tunnel_thread := my_ff_thread.Create;
      except
        tunnel.Disconnect;
        Result := False;
      end;
    end;
    
    // Stops the reader thread and closes the tunnel.
    // The thread is flagged for termination first, then the tunnel is
    // disconnected; finally the thread is waited for and freed.
    // Returns True on success, False when an exception occurred.
    function Tmy_tunnel_from_MappedPortTCP.my_disconnect: boolean;
    begin
      Result := True;
      try
        if tunnel_thread <> nil then tunnel_thread.Terminate;
        try
          tunnel.Disconnect;
        finally
          if tunnel_thread <> nil then
          begin
            tunnel_thread.WaitFor;
            // FreeAndNil (SysUtils) frees the thread and clears the global,
            // so a later call sees nil instead of a dangling reference.
            FreeAndNil(tunnel_thread);
          end;
        end;
      except
        Result := False;
      end;
    end;
    
    initialization 
      // Create the shared state used by all threads of this unit.
      CS := TCriticalSection.Create; 
      my_list := TThreadList.Create; 
      tunnel := TIdTCPClient.Create(nil); 
    finalization 
      // Release the shared objects in reverse order of creation.
      // NOTE(review): the entries stored in my_list are not freed here.
      tunnel.Free; 
      my_list.Free; 
      CS.Free; 
    
    end.