BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
columnFamilyDefinition.setColumnType(ColumnType.STANDARD);
columnFamilyDefinition.setName(CFName);
// Check whether the column family is already defined in the keyspace
boolean cfDefFound = false;
for (ColumnFamilyDefinition cfDef : bamKeyspaceDefinition.getCfDefs()) {
    log.info("CF found : " + cfDef.getName());
    if (cfDef.getName().equals(CFName)) {
        cfDefFound = true;
        break;
    }
}
// Column Family not found, so create it
if (!cfDefFound) {
    ThriftCfDef cfDef = new ThriftCfDef(columnFamilyDefinition);
    cluster.addColumnFamily(cfDef);
}
cfList.add(CFName);
}
return true;
}
The problem is that under load, when multiple threads run through this code, it fails with the following exception:
{me.prettyprint.cassandra.hector.TimingLogger} - start[1312305404883] time[9] tag[META_WRITE.fail_]
me.prettyprint.hector.api.exceptions.HInvalidRequestException: InvalidRequestException(why:CF is already defined in that keyspace.)
    at me.prettyprint.cassandra.service.ExceptionsTranslatorImpl.translate(ExceptionsTranslatorImpl.java:42)
    at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:68)
    at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:62)
    at me.prettyprint.cassandra.service.Operation.executeAndSetResult(Operation.java:101)
    at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:156)
    at me.prettyprint.cassandra.service.ThriftCluster.addColumnFamily(ThriftCluster.java:72)
    at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.createCF(NoSQLDataStore.java:168)
    at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.persistData(NoSQLDataStore.java:124)
    at org.wso2.carbon.bam.receiver.persistence.PersistenceManager.persistEvent(PersistenceManager.java:55)
    at org.wso2.carbon.bam.receiver.internal.QueueWorker.run(QueueWorker.java:69)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
Caused by: InvalidRequestException(why:CF is already defined in that keyspace.)
    at org.apache.cassandra.thrift.Cassandra$system_add_column_family_result.read(Cassandra.java:23375)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_system_add_column_family(Cassandra.java:1333)
    at org.apache.cassandra.thrift.Cassandra$Client.system_add_column_family(Cassandra.java:1308)
    at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:66)
This happens because schema-change methods such as cluster.addColumnFamily(cfDef) do not block: they return before the change has reached every node, and Cassandra takes time to propagate the schema change across its nodes. So the next thread that enters the synchronized block does not yet see the new column family when it checks whether it already exists, and tries to add it again. That second add is what fails with the exception above.
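To make the race concrete, here is a small self-contained Java simulation. `LaggingCluster`, `NaiveCreator` and `CachingCreator` are hypothetical classes, not part of Hector. The fake cluster knows about a new column family as soon as it is added, but only reports it from `describeColumnFamilies()` after `propagate()` is called, which stands in for the propagation delay. `NaiveCreator` repeats the check-then-add logic of the code above; `CachingCreator` additionally remembers in memory which column families this process has already created, so a second caller skips the add even though the schema change has not propagated yet.

```java
import java.util.HashSet;
import java.util.Set;

public class SchemaRaceDemo {

    /**
     * Hypothetical stand-in for a Cassandra cluster (not the Hector API).
     * An added column family is known to the cluster at once, but it only
     * shows up in describeColumnFamilies() after propagate() is called.
     */
    static class LaggingCluster {
        private final Set<String> applied = new HashSet<>();
        private final Set<String> visible = new HashSet<>();

        synchronized Set<String> describeColumnFamilies() {
            return new HashSet<>(visible);
        }

        synchronized void addColumnFamily(String name) {
            if (!applied.add(name)) {
                throw new IllegalStateException("CF is already defined in that keyspace.");
            }
        }

        /** Simulates the schema change finally reaching every node. */
        synchronized void propagate() {
            visible.addAll(applied);
        }
    }

    /** Check-then-add, as in the original code: trusts the lagging schema view. */
    static class NaiveCreator {
        private final LaggingCluster cluster;

        NaiveCreator(LaggingCluster cluster) { this.cluster = cluster; }

        synchronized void createCF(String cfName) {
            if (!cluster.describeColumnFamilies().contains(cfName)) {
                cluster.addColumnFamily(cfName);
            }
        }
    }

    /** Same check, plus a local record of column families this process created. */
    static class CachingCreator {
        private final LaggingCluster cluster;
        private final Set<String> created = new HashSet<>(); // guarded by this

        CachingCreator(LaggingCluster cluster) { this.cluster = cluster; }

        synchronized void createCF(String cfName) {
            if (created.contains(cfName)) {
                return; // an earlier caller in this process already created it
            }
            if (!cluster.describeColumnFamilies().contains(cfName)) {
                cluster.addColumnFamily(cfName);
            }
            created.add(cfName);
        }
    }

    public static void main(String[] args) {
        // Two callers enter the synchronized block before the schema has propagated.
        NaiveCreator naive = new NaiveCreator(new LaggingCluster());
        naive.createCF("events");
        try {
            naive.createCF("events");
            System.out.println("naive: no exception");
        } catch (IllegalStateException e) {
            System.out.println("naive: " + e.getMessage());
        }

        CachingCreator caching = new CachingCreator(new LaggingCluster());
        caching.createCF("events");
        caching.createCF("events");
        System.out.println("caching: no exception");
    }
}
```

Running `main` prints `naive: CF is already defined in that keyspace.` followed by `caching: no exception`. The local record only helps within a single JVM; if several servers try to create the same column family at the same time they can still collide, which is one more reason to prefer the workaround that follows.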
Workaround: make schema changes like this one in your initialization code. By the time data starts being stored, the changes will already have propagated and you will not hit this kind of exception. Or, of course, if your configuration is static, simply define the column families in the Cassandra YAML configuration file.
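A minimal sketch of the initialization-time approach, assuming a hypothetical `SchemaClient` interface rather than the real Hector `Cluster` API: create every column family the application needs once at startup, then poll until all nodes report a single schema version before worker threads start storing data. The polling step goes slightly beyond the workaround as stated. Cassandra's Thrift API has a `describe_schema_versions` call that reports which nodes are on which schema version, and `schemaVersions()` here is only loosely modelled on it. `FakeClient` is an illustrative stand-in whose nodes converge after a given number of polls.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SchemaBootstrap {

    /** Hypothetical minimal view of the cluster; not the Hector API. */
    interface SchemaClient {
        Set<String> columnFamilies();
        void addColumnFamily(String name);
        /** Schema version -> nodes reporting it (loosely like describe_schema_versions). */
        Map<String, List<String>> schemaVersions();
    }

    /**
     * Creates any missing column families once, at startup, then waits until
     * every node reports the same schema version or the timeout expires.
     * Returns true if the cluster reached schema agreement.
     */
    static boolean ensureColumnFamilies(SchemaClient client, List<String> required,
                                        long timeoutMillis, long pollMillis) {
        Set<String> existing = client.columnFamilies();
        for (String cf : required) {
            if (!existing.contains(cf)) {
                client.addColumnFamily(cf);
            }
        }
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (client.schemaVersions().size() == 1) {
                return true; // all nodes agree on one schema version
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return client.schemaVersions().size() == 1;
    }

    /** Illustrative fake: reports two schema versions until enough polls have passed. */
    static class FakeClient implements SchemaClient {
        final Set<String> cfs = new HashSet<>();
        private int pollsUntilAgreement;

        FakeClient(int pollsUntilAgreement) { this.pollsUntilAgreement = pollsUntilAgreement; }

        public Set<String> columnFamilies() { return new HashSet<>(cfs); }

        public void addColumnFamily(String name) { cfs.add(name); }

        public Map<String, List<String>> schemaVersions() {
            Map<String, List<String>> versions = new HashMap<>();
            if (pollsUntilAgreement-- > 0) {
                versions.put("v1", Arrays.asList("node1"));
                versions.put("v2", Arrays.asList("node2"));
            } else {
                versions.put("v2", Arrays.asList("node1", "node2"));
            }
            return versions;
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient(3);
        client.cfs.add("existing_cf");
        boolean agreed = ensureColumnFamilies(client,
                Arrays.asList("existing_cf", "events"), 1000, 10);
        System.out.println("agreed=" + agreed + " cfs=" + client.columnFamilies().size());
    }
}
```

With `FakeClient(3)` the loop sees two schema versions on the first three polls and agreement on the fourth, so `main` prints `agreed=true cfs=2`. Returning `false` on timeout, instead of throwing, leaves it to the caller to decide whether to start processing anyway.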