Search code examples
delphidatasnap

Can we use TDSProviderConnection to replace TLocalConnection for in-process DataSnap application?


I able to access server method by in-process DataSnap application. Click here for details.

However, there is another aspect of in-process datasnap application. It is the IAppServer or TDataSetProvider.

Prior to Delphi 2009, I use TConnectionBroker with TLocalConnection for in-process datasnap access. The new Delphi 2009/2010 DataSnap allow us to use TDSProviderConnection to as RemoteServer. However, I can only make it works for TCP/HTTP connection. I can't use TDSProviderConnection for in-process datasnap application. It will prompt "invalid pointer operation".

This is how my code looks like:

var o: TDataModule1;
    Q: TSQLConnection;
    c: TEmployeeServerClient;
begin
  o := TDataModule1.Create(Self); 
  Q := TSQLConnection.Create(Self);
  try
    Q.DriverName := 'DSServer1';
    Q.LoginPrompt := False;
    Q.Open;

    DSProviderConnection1.SQLConnection := Q;
    DSProviderConnection1.ServerClassName := 'TEmployeeServer';
    DSProviderConnection1.Connected := True;

    ClientDataSet1.ProviderName := 'DataSetProvider1';
    ClientDataSet1.Open;
  finally
    o.Free;
    Q.Free;
  end;
end;

The TEmployeeServer is a TDSServerModule class descendant that consist of TDataSetProvider, TSQLDataSet and TSQLConnection that connect together.

After tracing the source code, I found the TSQLDataSet did open and traverse the dataset. The cause of the problem should be related to the following 2 methods that use TDBXNoOpRow

function TDSVoidConnectionHandler.CreateDbxRow: TDBXStreamerRow;
begin
  Result := TDBXNoOpRow.Create(DBXContext);
end;

function TDSServerCommand.CreateParameterRow: TDBXRow;
begin
  Result := TDBXNoOpRow.Create(FDbxContext);
end;

The TDBXNoOpRow instance will consumed by

procedure TDBXStreamValue.SetRowValue;
begin
  if FExtendedType then
  begin
    if FStreamStreamReader <> nil then
      FDbxRow.SetStream(Self, FStreamStreamReader)
    else if FByteStreamReader <> nil then
      FDbxRow.SetStream(Self, FByteStreamReader)
    else
      inherited SetRowValue;
  end else
    inherited SetRowValue;
end;

Since TDBXNoOpRow doesn't nothing, the data packet doesn't get transfer by above method. I suspect this is the cause of the problem using in-process machanism.

I am not sure if we able to throw away TLocalConnection and replaced by TDSProviderConnection for in-process DataSnap application? I have traced the DBX source code for days and can't even find a clue on this issue.


Solution

  • Classic DataSnap

    Prior to Delphi 2009, we may use either TLocalConnection or TSocketConnection together with TConnectionBroker for in-process or out-of-process communication via IAppServer interface. There are even more DataSnap connection that supports IAppServer. Check Delphi helps for details.

    New DataSnap from Delphi 2009

    Previously, TSQLConnection was used in DataSnap server only. In new DataSnap, we may use TSQLConnection in DataSnap client. There is a new driver call DataSnap that allow us to connect to a DataSnap server either via TCP or HTTP protocol using REST data packet for multi-tier application. Furthermore, we may use connect to TDSSever (TDSServer.Name) via TSQLConnection.DriverName for in-process connection. This benefits us to write a scalable multi-tier DataSnap application to consume server methods. See here for more details.

    In Delphi 2009/2010, a new DataSnap connection component – TDSProviderConnection was introduced. As the name implied, it supply providers from DataSnap server. This connection require a TSQLConnection instance to work with in client tier. Thus, we may use a single TSQLConnection in client tier either in-process or out-of-process. And that fulfill the design philosophy of scalable multi-tier DataSnap application.

    There are lots of demo or CodeRage videos available in the web showing how to TDSProviderConnection in DataSnap client tier. However, most of the examples only showing out-of-process design. I never find one example illustrate the usage of TDSProviderConnection for in-process design while writing this topic. Hope there are more from other famous or well know Delphi fans.

    At first, I thought it is easy to use TDSProviderConnection for in-process design. But I face problems while follow the rules. These problems should be related to bugs and in mature design of DataSnap framework. I will show at here how to deals with the problems.

    Design a DataSnap module

    First, we design a simple DataSnap module for this example. This is a TDSServerModule descendant instance with 2 components: a TDataSetProvider and a TClientDataSet instance. The reason using TDSServerModule is it will manage providers define in the module.

    MySeverProvider.DFM

    object ServerProvider: TServerProvider
      OldCreateOrder = False
      OnCreate = DSServerModuleCreate
      Height = 225
      Width = 474
      object DataSetProvider1: TDataSetProvider
        DataSet = ClientDataSet1
        Left = 88
        Top = 56
      end
      object ClientDataSet1: TClientDataSet
        Aggregates = <>
        Params = <>
        Left = 200
        Top = 56
      end
    end
    

    MyServerProvider.PAS

    type
      TServerProvider = class(TDSServerModule)
        DataSetProvider1: TDataSetProvider;
        ClientDataSet1: TClientDataSet;
        procedure DSServerModuleCreate(Sender: TObject);
      end;
    
    {$R *.dfm}
    
    procedure TServerProvider.DSServerModuleCreate(Sender: TObject);
    begin
      ClientDataSet1.LoadFromFile('..\orders.cds');
    end;
    

    Define a transport layer for the provider module

    Since this is a in-process application, we don’t really need a physical transport layer for the provider module. What we need here is a TDSServer and a TDSServerClass instance that helps propagate the providers to ClientDataSet in later stage.

    var C: TDSServer:
        D: TDSServerClass;
    begin
      C := TDSServer.Create(nil);
      D := TDSServerClass.Create(nil);
      try
        C.Server := D;
        C.OnGetClass := OnGetClass;
        D.Start;
    
      finally
        D.Free;
        C.Free;
      end;
    end;
    
    procedure TForm1.OnGetClass(DSServerClass: TDSServerClass; var
        PersistentClass: TPersistentClass);
    begin
      PersistentClass := TServerProvider;
    end;
    

    Use TDSProviderConnection to consume the in-process DataSnap service

    We start hook up everything in DataSnap context to get it done:

    var Q: TSQLConnection;
        D: TDSServer;
        C: TDSServerClass;
        P: TServerProvider;
        N: TDSProviderConnection;
    begin
      P := TServerProvider.Create(nil);
      D := TDSServer.Create(nil);
      C := TDSServerClass.Create(nil);
      Q := TSQLConnection.Create(nil);
      N := TDSProviderConnection.Create(nil);
      try
        C.Server := D;
        C.OnGetClass := OnGetClass;
    
        D.Start;
    
        Q.DriverName := 'DSServer';
        Q.LoginPrompt := False;
        Q.Open;
    
        N.SQLConnection := Q;
        N.ServerClassName := 'TServerProvider';
        N.Connected := True;
    
        ClientDataSet1.RemoteServer := N;
        ClientDataSet1.ProviderName := 'DataSetProvider1';
        ClientDataSet1.Open;
    
        ShowMessage(IntToStr(ClientDataSet1.RecordCount));
      finally
        N.Free;
        Q.Free;
        C.Free;
        D.Free;
        P.Free;
      end;
    end;
    

    If you are using Delphi version 14.0.3513.24210 or prior than that, you will find it doesn’t work, a “Invalid pointer operation” exception raise after that.

    I have found all the problems faced so far and the fixed are as follow.

    Troubleshoot: Invalid pointer operation

    There is a bug in DSUtil.StreamToDataPacket. I have file a report in QC#78666.

    Here is a fix without changing the DBX source code:

    unit DSUtil.QC78666;
    
    interface
    
    implementation
    
    uses SysUtils, Variants, VarUtils, ActiveX, Classes, DBXCommonResStrs, DSUtil,
         CodeRedirect;
    
    type
      THeader = class
        const
          Empty       = 1;
          Variant     = 2;
          DataPacket  = 3;
      end;
    
      PIntArray = ^TIntArray;
      TIntArray = array[0..0] of Integer;
    
      TVarFlag = (vfByRef, vfVariant);
      TVarFlags = set of TVarFlag;
    
      EInterpreterError = class(Exception);
    
      TVariantStreamer = class
      private
        class function ReadArray(VType: Integer; const Data: TStream): OleVariant;
      public
        class function ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
      end;
    
    const
      EasyArrayTypes = [varSmallInt, varInteger, varSingle, varDouble, varCurrency,
                        varDate, varBoolean, varShortInt, varByte, varWord, varLongWord];
    
      VariantSize: array[0..varLongWord] of Word  = (0, 0, SizeOf(SmallInt), SizeOf(Integer),
        SizeOf(Single), SizeOf(Double), SizeOf(Currency), SizeOf(TDateTime), 0, 0,
        SizeOf(Integer), SizeOf(WordBool), 0, 0, 0, 0, SizeOf(ShortInt), SizeOf(Byte),
        SizeOf(Word), SizeOf(LongWord));
    
    class function TVariantStreamer.ReadArray(VType: Integer; const Data: TStream): OleVariant;
    var
      Flags: TVarFlags;
      LoDim, HiDim, Indices, Bounds: PIntArray;
      DimCount, VSize, i: Integer;
      V: OleVariant;
      LSafeArray: PSafeArray;
      P: Pointer;
    begin
      VarClear(Result);
      Data.Read(DimCount, SizeOf(DimCount));
      VSize := DimCount * SizeOf(Integer);
      GetMem(LoDim, VSize);
      try
        GetMem(HiDim, VSize);
        try
          Data.Read(LoDim^, VSize);
          Data.Read(HiDim^, VSize);
          GetMem(Bounds, VSize * 2);
          try
            for i := 0 to DimCount - 1 do
            begin
              Bounds[i * 2] := LoDim[i];
              Bounds[i * 2 + 1] := HiDim[i];
            end;
            Result := VarArrayCreate(Slice(Bounds^,DimCount * 2), VType and varTypeMask);
          finally
            FreeMem(Bounds);
          end;
          if VType and varTypeMask in EasyArrayTypes then
          begin
            Data.Read(VSize, SizeOf(VSize));
            P := VarArrayLock(Result);
            try
              Data.Read(P^, VSize);
            finally
              VarArrayUnlock(Result);
            end;
          end else
          begin
            LSafeArray := PSafeArray(TVarData(Result).VArray);
            GetMem(Indices, VSize);
            try
              FillChar(Indices^, VSize, 0);
              for I := 0 to DimCount - 1 do
                Indices[I] := LoDim[I];
              while True do
              begin
                V := ReadVariant(Flags, Data);
                if VType and varTypeMask = varVariant then
                  SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, V))
                else
                  SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, TVarData(V).VPointer^));
                Inc(Indices[DimCount - 1]);
                if Indices[DimCount - 1] > HiDim[DimCount - 1] then
                  for i := DimCount - 1 downto 0 do
                    if Indices[i] > HiDim[i] then
                    begin
                      if i = 0 then Exit;
                      Inc(Indices[i - 1]);
                      Indices[i] := LoDim[i];
                    end;
              end;
            finally
              FreeMem(Indices);
            end;
          end;
        finally
          FreeMem(HiDim);
        end;
      finally
        FreeMem(LoDim);
      end;
    end;
    
    class function TVariantStreamer.ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
    var
      I, VType: Integer;
      W: WideString;
      TmpFlags: TVarFlags;
    begin
      VarClear(Result);
      Flags := [];
      Data.Read(VType, SizeOf(VType));
      if VType and varByRef = varByRef then
        Include(Flags, vfByRef);
      if VType = varByRef then
      begin
        Include(Flags, vfVariant);
        Result := ReadVariant(TmpFlags, Data);
        Exit;
      end;
      if vfByRef in Flags then
        VType := VType xor varByRef;
      if (VType and varArray) = varArray then
        Result := ReadArray(VType, Data) else
      case VType and varTypeMask of
        varEmpty: VarClear(Result);
        varNull: Result := NULL;
        varOleStr:
        begin
          Data.Read(I, SizeOf(Integer));
          SetLength(W, I);
          Data.Read(W[1], I * 2);
          Result := W;
        end;
        varDispatch, varUnknown:
          raise EInterpreterError.CreateResFmt(@SBadVariantType,[IntToHex(VType,4)]);
      else
        TVarData(Result).VType := VType;
        Data.Read(TVarData(Result).VPointer, VariantSize[VType and varTypeMask]);
      end;
    end;
    
    procedure StreamToDataPacket(const Stream: TStream; out VarBytes: OleVariant);
    var
      P: Pointer;
      ByteCount: Integer;
      Size: Int64;
    begin
      Stream.Read(Size, 8);
      ByteCount := Integer(Size);
      if ByteCount > 0 then
      begin
        VarBytes := VarArrayCreate([0, ByteCount-1], varByte);
        P := VarArrayLock(VarBytes);
        try
    //      Stream.Position := 0;   // QC#78666 "Mismatched in datapacket" with DSUtil.StreamToDataPacket
          Stream.Read(P^, ByteCount);
          Stream.Position := 0;
        finally
          VarArrayUnlock(VarBytes);
        end;
      end
      else
        VarBytes := Null;
    end;
    
    procedure StreamToVariantPatch(const Stream: TStream; out VariantValue: OleVariant);
    var
      Flags: TVarFlags;
      Header: Byte;
    begin
      if Assigned(Stream) then
      begin
        Stream.Position := 0;
        Stream.Read(Header, 1);
        if Header = THeader.Variant then
          VariantValue := TVariantStreamer.ReadVariant(Flags, Stream)
        else if Header = THeader.DataPacket then
          StreamToDataPacket(Stream, VariantValue)
        else
          Assert(false);
      end;
    end;
    
    var QC78666: TCodeRedirect;
    
    initialization
      QC78666 := TCodeRedirect.Create(@StreamToVariant, @StreamToVariantPatch);
    finalization
      QC78666.Free;
    end.
    

    Troubleshoot: I still encounter “Invalid Pointer Operation” after apply DSUtil.StreamToDataPacket patch

    I have filed this problem in QC#78752. An in-process DataSnap create an instance of TDSServerCommand. A method of TDSServerCommand create TDBXNoOpRow instance:

    function TDSServerCommand.CreateParameterRow: TDBXRow;
    begin
      Result := TDBXNoOpRow.Create(FDbxContext);
    end;
    

    Most of the methods in TDBXNoOpRow is not implemented. There are 2 methods in class TDBXNoOpRow, GetStream and SetStream are used in subsequence operations. This is the reason that cause the exception.

    After fix TDBXNoOpRow problem, the data packet will transport to ClientDataSet successfully.

    The fix is as follow:

    unit DBXCommonServer.QC78752;
    
    interface
    
    uses SysUtils, Classes, DBXCommon, DSCommonServer, DBXCommonTable;
    
    type
      TDSServerCommand_Patch = class(TDSServerCommand)
      protected
        function CreateParameterRowPatch: TDBXRow;
      end;
    
      TDBXNoOpRowPatch = class(TDBXNoOpRow)
      private
        function GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes): Integer;
      protected
        procedure GetStream(DbxValue: TDBXStreamValue; var Stream: TStream; var IsNull:
            LongBool); override;
        procedure SetStream(DbxValue: TDBXStreamValue; StreamReader: TDBXStreamReader);
            override;
        function UseExtendedTypes: Boolean; override;
      end;
    
      TDBXStreamValueAccess = class(TDBXByteArrayValue)
      private
        FStreamStreamReader: TDBXLookAheadStreamReader;
      end;
    
    implementation
    
    uses CodeRedirect;
    
    function TDSServerCommand_Patch.CreateParameterRowPatch: TDBXRow;
    begin
      Result := TDBXNoOpRowPatch.Create(FDbxContext);
    end;
    
    procedure TDBXNoOpRowPatch.GetStream(DbxValue: TDBXStreamValue; var Stream: TStream;
        var IsNull: LongBool);
    var iSize: integer;
        B: TBytes;
    begin
      iSize := GetBytesFromStreamReader(TDBXStreamValueAccess(DbxValue).FStreamStreamReader, B);
      IsNull := iSize = 0;
      if not IsNull then begin
        Stream := TMemoryStream.Create;
        Stream.Write(B[0], iSize);
      end;
    end;
    
    procedure TDBXNoOpRowPatch.SetStream(DbxValue: TDBXStreamValue; StreamReader:
        TDBXStreamReader);
    var B: TBytes;
        iSize: integer;
    begin
      iSize := GetBytesFromStreamReader(StreamReader, B);
      Dbxvalue.SetDynamicBytes(0, B, 0, iSize);
    end;
    
    function TDBXNoOpRowPatch.GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes):
        Integer;
    const BufSize = 50 * 1024;
    var iPos: integer;
        iRead: integer;
    begin
      Result := 0;
      while not R.Eos do begin
        SetLength(Buf, Result + BufSize);
        iPos := Result;
        iRead := R.Read(Buf, iPos, BufSize);
        Inc(Result, iRead);
      end;
      SetLength(Buf, Result);
    end;
    
    function TDBXNoOpRowPatch.UseExtendedTypes: Boolean;
    begin
      Result := True;
    end;
    
    var QC78752: TCodeRedirect;
    
    initialization
      QC78752 := TCodeRedirect.Create(@TDSServerCommand_Patch.CreateParameterRow, @TDSServerCommand_Patch.CreateParameterRowPatch);
    finalization
      QC78752.Free;
    end.
    

    Troubleshoot: Both patches applied and work for the example but I still encounter “Invalid Pointer Operation”

    This problem also filed in QC#78752. The problem is due to the following 2 methods:

    1. procedure TDBXStreamValue.SetValue
    2. function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;

    TDBXLookAheadStreamReader.ConvertToMemoryStream return a managed FStream object to TDBXStreamValue.SetValue. This stream object become another managed object of TDBXStreamValue. It turns out that a Stream object managed by two objects and the exception raised when these 2 objects attempt to free the Stream object:

    procedure TDBXStreamValue.SetValue(const Value: TDBXValue);
    begin
      if Value.IsNull then
        SetNull
      else
      begin
        SetStream(Value.GetStream(False), True);
      end;
    end;
    function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;
    ...
    begin
      if FStream = nil then
        Result := nil
      else
      begin
        Count := Size;
        if not (FStream is TMemoryStream) then
        begin
          ...
          StreamTemp := FStream;
          FStream := Stream;
          FreeAndNil(StreamTemp);
        end;
        FStream.Seek(0, soFromBeginning);
        FHasLookAheadByte := false;
        Result := FStream;
      end;
    end;
    

    The fix is as follow:

    unit DBXCommon.QC78752;
    
    interface
    
    implementation
    
    uses SysUtils, Classes, DBXCommon, CodeRedirect;
    
    type
      TDBXLookAheadStreamReaderAccess = class(TDBXStreamReader)
      private
        FStream: TStream;
        FEOS:               Boolean;
        FHasLookAheadByte:  Boolean;
        FLookAheadByte:     Byte;
      end;
    
      TDBXLookAheadStreamReaderHelper = class helper for TDBXLookAheadStreamReader
      private
        function Accessor: TDBXLookAheadStreamReaderAccess;
      public
        function ConvertToMemoryStreamPatch: TStream;
      end;
    
    function TDBXLookAheadStreamReaderHelper.Accessor:
        TDBXLookAheadStreamReaderAccess;
    begin
      Result := TDBXLookAheadStreamReaderAccess(Self);
    end;
    
    function TDBXLookAheadStreamReaderHelper.ConvertToMemoryStreamPatch: TStream;
    var
      Stream: TMemoryStream;
      StreamTemp: TStream;
      Count: Integer;
      Buffer: TBytes;
      ReadBytes: Integer;
    begin
      if Accessor.FStream = nil then
        Result := nil
      else
      begin
        Count := Size;
        if not (Accessor.FStream is TMemoryStream) then
        begin
          Stream := TMemoryStream.Create;
          if Count >= 0 then
            Stream.SetSize(Count);
          if Accessor.FHasLookAheadByte then
            Stream.Write(Accessor.FLookAheadByte, 1);
          SetLength(Buffer, 256);
          while true do
          begin
            ReadBytes := Accessor.FStream.Read(Buffer, Length(Buffer));
            if ReadBytes > 0 then
              Stream.Write(Buffer, ReadBytes)
            else
              Break;
          end;
          StreamTemp := Accessor.FStream;
          Accessor.FStream := Stream;
          FreeAndNil(StreamTemp);
          Result := Accessor.FStream;
        end else begin
          Stream := TMemoryStream.Create;
          Accessor.FStream.Seek(0, soFromBeginning);
          Stream.CopyFrom(Accessor.FStream, Accessor.FStream.Size);
        end;
        Stream.Seek(0, soFromBeginning);
        Accessor.FHasLookAheadByte := false;
    
        Result := Stream;
    //    Stream := TMemoryStream.Create;
    //    Stream.LoadFromStream(FStream);
    //    FStream.Seek(0, soFromBeginning);
    //    Result := Stream;
      end;
    end;
    
    var QC78752: TCodeRedirect;
    
    initialization
      QC78752 := TCodeRedirect.Create(@TDBXLookAheadStreamReader.ConvertToMemoryStream, @TDBXLookAheadStreamReader.ConvertToMemoryStreamPatch);
    finalization
      QC78752.Free;
    end.
    

    Troubleshoot: I encounter memory leaks after close the application

    There is a memory leaks in TDSServerConnection for in-process connection. I have filed a report in QC#78696.

    Here is the fix:

    unit DSServer.QC78696;
    
    interface
    
    implementation
    
    uses SysUtils,
         DBXCommon, DSServer, DSCommonServer, DBXMessageHandlerCommon, DBXSqlScanner,
         DBXTransport,
         CodeRedirect;
    
    type
      TDSServerConnectionHandlerAccess = class(TDBXConnectionHandler)
        FConProperties: TDBXProperties;
        FConHandle: Integer;
        FServer: TDSCustomServer;
        FDatabaseConnectionHandler: TObject;
        FHasServerConnection: Boolean;
        FInstanceProvider: TDSHashtableInstanceProvider;
        FCommandHandlers: TDBXCommandHandlerArray;
        FLastCommandHandler: Integer;
        FNextHandler: TDBXConnectionHandler;
        FErrorMessage: TDBXErrorMessage;
        FScanner: TDBXSqlScanner;
        FDbxConnection: TDBXConnection;
        FTransport: TDSServerTransport;
        FChannel: TDbxChannel;
        FCreateInstanceEventObject: TDSCreateInstanceEventObject;
        FDestroyInstanceEventObject: TDSDestroyInstanceEventObject;
        FPrepareEventObject: TDSPrepareEventObject;
        FConnectEventObject: TDSConnectEventObject;
        FErrorEventObject: TDSErrorEventObject;
        FServerCon: TDSServerConnection;
      end;
    
      TDSServerConnectionPatch = class(TDSServerConnection)
      public
        destructor Destroy; override;
      end;
    
      TDSServerDriverPatch = class(TDSServerDriver)
      protected
        function CreateConnectionPatch(ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
      end;
    
    destructor TDSServerConnectionPatch.Destroy;
    begin
      inherited Destroy;
      TDSServerConnectionHandlerAccess(ServerConnectionHandler).FServerCon := nil;
      ServerConnectionHandler.Free;
    end;
    
    function TDSServerDriverPatch.CreateConnectionPatch(
      ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
    begin
      Result := TDSServerConnectionPatch.Create(ConnectionBuilder);
    end;
    
    var QC78696: TCodeRedirect;
    
    initialization
      QC78696 := TCodeRedirect.Create(@TDSServerDriverPatch.CreateConnection, @TDSServerDriverPatch.CreateConnectionPatch);
    finalization
      QC78696.Free;
    end.