Search code examples
hadoopkerberosaccumulo

Connecting to Accumulo inside a Mapper using Kerberos


I am moving some software from an older Hadoop Cluster (uses username/password authentication) to a newer one, 2.6.0-cdh5.12.0 which has Kerberos authentication enabled.

I have been able to get many of existing Map/Reduce jobs that use Accumulo for its input and/or output to work fine using a DelegationToken set in the AccumuloInput/OutputFormat classes.

However, I have 1 job, that uses AccumuloInput/OutputFormat for input and output, but also inside its Mapper.setup() method, it connects to Accumulo via Zookeeper so that in the Mapper.map() method, it can compare each key/value being processed my the Mapper.map() to and entry in another Accumulo table.

I included the relevant code below which shows the setup() method connecting to Zookeeper user a PasswordToken and then creating an Accumulo table Scanner which is then used in the mapper method.

So the question is how do I replace the use of the PasswordToken with a KerberosToken for setting up the Accumulo scanner in the Mapper.setup() method? I can find no way to "get" the DelegationToken used by the AccumuloInput/OutputFormat classes that I set.

I have tried context.getCredentials().getAllTokens() and looking for a token of type org.apache.accumulo.code.client.security.tokens.AuthenticationToken -- all of the tokens returned here are of type org.apache.hadoop.security.token.Token.

Please note that I typed the code fragments in versus cut/paste as the code runs on a network unconnected to the internet - aka there may be a typo. :)

//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true)



//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;

public void setup(Context context) {
    Configuration cfg = context.getConfiguration();

    // properties set and passed from M/R Driver program
    String username = cfg.get("UserName");
    String password = cfg.get("Password");
    String accumuloInstName = cfg.get("InstanceName");
    String zookeepers = cfg.get("Zookeepers");
    String tableName = cfg.get("TableName");
    Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
    try {
      AuthenticationToken passwordToken = new PasswordToken(password);

      Connector conn = inst.getConnector(username, passwordToken);

      index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
    } catch(Exception e) {
       e.printStackTrace();
    }
}

public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
    String uuid = key.getRow().toString();
    index_scanner.clearColumns();
    index_scanner.setRange(Range.exact(uuid));
    for(Entry<Key, Value> entry : index_scanner) {
        // do some processing in here
    }
}

Solution

  • The provided AccumuloInputFormat and AccumuloOutputFormat have a method to set the token in the job configuration with the Accumulo*putFormat.setConnectorInfo(job, principle, token). You can also serialize the token in a file in HDFS, using the AuthenticationTokenSerializer and use the version of the setConnectorInfo method which accepts a file name.

    If a KerberosToken is passed in, the job will create a DelegationToken to use, and if a DelegationToken is passed in, it will just use that.

    The provided AccumuloInputFormat should handle its own scanner, so normally, you shouldn't have to do that in your Mapper if you've set the configuration properly. However, if you're doing secondary scanning (for something like a join) inside your Mapper, you can inspect the provided AccumuloInputFormat's RecordReader source code for an example of how to retrieve the configuration and construct a Scanner.