Search code examples
c#cassandradatastax-enterprisecassandra-3.0datastax-csharp-driver

How to store paging state in Cassandra c# driver?


I have read couple of links to implement manual paging using Cassandra c# driver.

Links referred:

Backward paging in cassandra c# driver

https://datastax.github.io/csharp-driver/features/paging/

My requirement:

I am trying to get list of all distinct partition keys form table which is too big in size.

Because of size Cassandra db is throwing error in between retrieving them or on the first execution of query. Now suppose it failed after fetching 100000 distinct partition keys I will use the Paging state provided by Cassandra c# driver.

Now I am saving the last available page state before failing to log file and use it again to continue from where it failed.

I am saving the paging state into log file using:

Encoding.ASCII.GetString(pagingState);

And retrieving form log file using:

Encoding.ASCII.GetBytes(pagingState);

But when I pass it to .SetPagingState(pagingState) and execute the query it throws exception like:

java.lang.IllegalStateException: Cannot call hasNext() until the previous iterator has been fully consumed

I compared byte by byte array bytes before saving into file and after retrieving them from file. Few values in byte array are different. I tried with UIF8 encoding but no use.

NOTE: It works perfectly when I pass byte array without converting. I mean the below if condition code works perfectly.

if (pagingState != null)
{
     GenerateInitialLogs(pagingState);
}

Full functions:

    private void BtnGetPrimaryKeys_Click(object sender, EventArgs e)
    {
        string fileContent = File.ReadAllText("D:/Logs/log.txt");            
        if(fileContent.Length > 0)
        {
            GenerateInitialLogs(Encoding.ASCII.GetBytes(fileContent));
        }
        else
        {
            GenerateInitialLogs(null);
        }
    }

    private void Log(byte[] pagingState)
    {
        File.WriteAllText("D:/Logs/log.txt", Encoding.ASCII.GetString(pagingState));    
    }

    private int GenerateInitialLogs(byte[] pagingState)
    {            
        try
        {
            RowSet rowSet = BLL.SelectDistinctPrimaryKeys(pagingState);

            List<PrimaryKey> distinctPrimaryKeys = new List<PrimaryKey>();
            foreach (Row row in rowSet)
            {
                if (rowSet.PagingState != null) { pagingState = new byte[rowSet.PagingState.Length]; }
                pagingState = rowSet.PagingState;
            }
            Log(pagingState)

            if (pagingState != null)
            {
                GenerateInitialLogs(pagingState);
            }
        }
        catch(Exception ex)
        {
            throw ex;
        }
    }

    public static RowSet SelectDistinctPrimaryKeysFromTagReadings(byte[] pagingState)
    {
        try
        {
            // will execute on continuing after failing in between. 
            if (pagingState != null)
            {
                PreparedStatement preparedStatement = BLL.currentSession.Prepare("SELECT DISTINCT \"Url\",\"Id\" FROM \"Readings\" ");
                BoundStatement boundStatement = preparedStatement.Bind();
                IStatement istatement = boundStatement.SetAutoPage(false).SetPageSize(1000).SetPagingState(pagingState);
                return BLL.currentSession.Execute(istatement);
            }
            else
            {
                PreparedStatement preparedStatement = BLL.currentSession.Prepare("SELECT DISTINCT \"Url\",\"Id\" FROM \"Readings\" ");
                BoundStatement boundStatement = preparedStatement.Bind();
                IStatement istatement = boundStatement.SetAutoPage(false).SetPageSize(1000);
                return BLL.currentSession.Execute(istatement);                    
            }
        }
        catch (Exception ex)
        {
            throw ex;
        }
    }

Solution

  • This solution is not figured out by me. It's done by Jorge Bay Gondra (employee of datastax).

    Original answer:

    https://groups.google.com/a/lists.datastax.com/forum/#!topic/csharp-driver-user/4XWTXZC-hyI

    Solution:

    Can't convert them into ASCII or UIF8 or any encoding because they don't represent text.

    Use these functions to convert byte array into hexadecimal and vice versa.

    public static string ByteArrayToHexaDecimalString(byte[] bytes)
    {
         StringBuilder stringBuilder = new StringBuilder(bytes.Length * 2);
         foreach (byte b in bytes) { stringBuilder.AppendFormat("{0:x2}", b); }
         return stringBuilder.ToString();
    }
    
    public static byte[] HexaDecimalStringToByteArray(String hexaDecimalString)
    {
         int NumberChars = hexaDecimalString.Length;
         byte[] bytes = new byte[NumberChars / 2];
         for (int i = 0; i < NumberChars; i += 2)
         { 
             bytes[i / 2] = Convert.ToByte(hexaDecimalString.Substring(i, 2), 16); 
         }
         return bytes;
    }