libQtCassandra 0.3.2
|
00001 /* 00002 * Text: 00003 * QCassandraPrivate.cpp 00004 * 00005 * Description: 00006 * Handling of the cassandra::CassandraClient and corresponding transports, 00007 * protocols, sockets, etc. 00008 * 00009 * Documentation: 00010 * See each function below. 00011 * 00012 * License: 00013 * Copyright (c) 2011 Made to Order Software Corp. 00014 * 00015 * http://snapwebsites.org/ 00016 * contact@m2osw.com 00017 * 00018 * Permission is hereby granted, free of charge, to any person obtaining a 00019 * copy of this software and associated documentation files (the 00020 * "Software"), to deal in the Software without restriction, including 00021 * without limitation the rights to use, copy, modify, merge, publish, 00022 * distribute, sublicense, and/or sell copies of the Software, and to 00023 * permit persons to whom the Software is furnished to do so, subject to 00024 * the following conditions: 00025 * 00026 * The above copyright notice and this permission notice shall be included 00027 * in all copies or substantial portions of the Software. 00028 * 00029 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 00030 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 00031 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 00032 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 00033 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 00034 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 00035 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 00036 */ 00037 00038 #include "QCassandraPrivate.h" 00039 #include <protocol/TBinaryProtocol.h> 00040 #include <transport/TSocket.h> 00041 #include <transport/TTransportUtils.h> 00042 00043 #include <QtCore/QDebug> 00044 00045 namespace QtCassandra 00046 { 00047 00048 00117 QCassandraPrivate::QCassandraPrivate(QCassandra *parent) 00118 : f_parent(parent) 00119 //f_socket(NULL) -- auto-initialized 00120 //f_transport(NULL) -- auto-initialized 00121 //f_protocol(NULL) -- auto-initialized 00122 //f_client(NULL) -- auto-initialized 00123 { 00124 } 00125 00142 QCassandraPrivate::~QCassandraPrivate() 00143 { 00144 disconnect(); 00145 } 00146 00173 bool QCassandraPrivate::connect(const QString& host, int port) 00174 { 00175 bool worked = true; 00176 00177 // disconnect any existing connection 00178 disconnect(); 00179 00180 try { 00181 // create a socket, transportation, protocol, and client 00182 // the client is what we use to communicate with the Cassandra server 00183 f_socket.reset(new apache::thrift::transport::TSocket(host.toUtf8().data(), port)); 00184 f_transport.reset(new apache::thrift::transport::TFramedTransport(f_socket)); 00185 f_protocol.reset(new apache::thrift::protocol::TBinaryProtocol(f_transport)); 00186 f_client.reset(new org::apache::cassandra::CassandraClient(f_protocol)); 00187 00188 // once everything is connected as it should, open the transport link 00189 f_transport->open(); 00190 } 00191 catch(...) { 00192 // TBD: should we check for some specific error here? 00193 worked = false; 00194 } 00195 00196 // if it failed, make sure to clear all the pointers 00197 if(!worked) { 00198 disconnect(); 00199 } 00200 00201 return worked; 00202 } 00203 00211 void QCassandraPrivate::disconnect() 00212 { 00213 if(f_client.get()) { 00214 // this is probably not necessary (it should anyway happen as required) 00215 f_transport->close(); 00216 } 00217 f_client.reset(); 00218 f_protocol.reset(); 00219 f_transport.reset(); 00220 f_socket.reset(); 00221 } 00222 00233 bool QCassandraPrivate::isConnected() const 00234 { 00235 return f_client.get() != NULL; 00236 } 00237 00248 void QCassandraPrivate::mustBeConnected() const throw(std::runtime_error) 00249 { 00250 if(!isConnected()) { 00251 throw std::runtime_error("not connected to the Cassandra server."); 00252 } 00253 } 00254 00262 QString QCassandraPrivate::clusterName() const 00263 { 00264 mustBeConnected(); 00265 std::string cluster_name; 00266 f_client->describe_cluster_name(cluster_name); 00267 return cluster_name.c_str(); 00268 } 00269 00277 QString QCassandraPrivate::protocolVersion() const 00278 { 00279 mustBeConnected(); 00280 std::string protocol_version; 00281 f_client->describe_version(protocol_version); 00282 return protocol_version.c_str(); 00283 } 00284 00293 void QCassandraPrivate::setContext(const QString& context_name) 00294 { 00295 mustBeConnected(); 00296 f_client->set_keyspace(context_name.toUtf8().data()); 00297 } 00298 00313 void QCassandraPrivate::contexts() const 00314 { 00315 mustBeConnected(); 00316 00317 // retrieve the key spaces from Cassandra 00318 std::vector<org::apache::cassandra::KsDef> keyspaces; 00319 f_client->describe_keyspaces(keyspaces); 00320 00321 for(std::vector<org::apache::cassandra::KsDef>::const_iterator 00322 ks = keyspaces.begin(); ks != keyspaces.end(); ++ks) { 00323 QSharedPointer<QCassandraContext> c(f_parent->context(ks->name.c_str())); 00324 const org::apache::cassandra::KsDef& ks_def = *ks; 00325 c->parseContextDefinition(&ks_def); 00326 } 00327 } 00328 00345 void QCassandraPrivate::createContext(const QCassandraContext& context) 00346 { 00347 mustBeConnected(); 00348 org::apache::cassandra::KsDef ks; 00349 context.prepareContextDefinition(&ks); 00350 std::string id_ignore; 00351 f_client->system_add_keyspace(id_ignore, ks); 00352 } 00353 00366 void QCassandraPrivate::updateContext(const QCassandraContext& context) 00367 { 00368 mustBeConnected(); 00369 org::apache::cassandra::KsDef ks; 00370 context.prepareContextDefinition(&ks); 00371 std::string id_ignore; 00372 f_client->system_update_keyspace(id_ignore, ks); 00373 } 00374 00384 void QCassandraPrivate::dropContext(const QCassandraContext& context) 00385 { 00386 mustBeConnected(); 00387 f_parent->clearCurrentContextIf(context); 00388 std::string id_ignore; 00389 f_client->system_drop_keyspace(id_ignore, context.contextName().toUtf8().data()); 00390 } 00391 00400 void QCassandraPrivate::createTable(const QCassandraTable *table) 00401 { 00402 mustBeConnected(); 00403 org::apache::cassandra::CfDef cf; 00404 table->prepareTableDefinition(&cf); 00405 std::string id_ignore; 00406 f_client->system_add_column_family(id_ignore, cf); 00407 } 00408 00415 void QCassandraPrivate::updateTable(const QCassandraTable *table) 00416 { 00417 mustBeConnected(); 00418 org::apache::cassandra::CfDef cf; 00419 table->prepareTableDefinition(&cf); 00420 std::string id_ignore; 00421 f_client->system_update_column_family(id_ignore, cf); 00422 } 00423 00430 void QCassandraPrivate::dropTable(const QString& table_name) 00431 { 00432 mustBeConnected(); 00433 std::string id_ignore; 00434 f_client->system_drop_column_family(id_ignore, table_name.toUtf8().data()); 00435 } 00436 00444 void QCassandraPrivate::truncateTable(const QCassandraTable *table) 00445 { 00446 mustBeConnected(); 00447 f_client->truncate(table->tableName().toUtf8().data()); 00448 } 00449 00462 void QCassandraPrivate::insertValue(const QString& table_name, const QByteArray& row_key, const QByteArray& column_key, const QCassandraValue& value) 00463 { 00464 mustBeConnected(); 00465 00466 std::string key(row_key.data(), row_key.size()); 00467 00468 org::apache::cassandra::ColumnParent column_parent; 00469 column_parent.__set_column_family(table_name.toUtf8().data()); 00470 // no super column support here 00471 00472 org::apache::cassandra::Column column; 00473 std::string ck(column_key.data(), column_key.size()); 00474 column.__set_name(ck); 00475 00476 const QByteArray& data(value.binaryValue()); 00477 column.__set_value(std::string(data.data(), data.size())); // "unavoidable" copy of the data 00478 00479 QCassandraValue::timestamp_mode_t mode = value.timestampMode(); 00480 switch(mode) { 00481 case QCassandraValue::TIMESTAMP_MODE_AUTO: 00482 column.__set_timestamp(QCassandra::timeofday()); 00483 break; 00484 00485 case QCassandraValue::TIMESTAMP_MODE_DEFINED: 00486 // user defined 00487 column.__set_timestamp(value.timestamp()); 00488 break; 00489 00490 } 00491 00492 if(value.ttl() != QCassandraValue::TTL_PERMANENT) { 00493 column.__set_ttl(value.ttl()); 00494 } 00495 00496 // our consistency level is 100% based on the Thrift consistency level 00497 // a cast is enough to get the value we want to get 00498 // (see the QCassandraValue.cpp file) 00499 00500 consistency_level_t consistency_level = value.consistencyLevel(); 00501 if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) { 00502 consistency_level = f_parent->defaultConsistencyLevel(); 00503 } 00504 00505 f_client->insert(key, column_parent, column, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level))); 00506 } 00507 00520 void QCassandraPrivate::getValue(const QString& table_name, const QByteArray& row_key, const QByteArray& column_key, QCassandraValue& value) 00521 { 00522 mustBeConnected(); 00523 00524 std::string key(row_key.data(), row_key.size()); 00525 00526 org::apache::cassandra::ColumnPath column_path; 00527 column_path.__set_column_family(table_name.toUtf8().data()); 00528 // no super column support here 00529 column_path.__set_column(column_key.data()); 00530 00531 org::apache::cassandra::ColumnOrSuperColumn column_result; 00532 00533 // our consistency level is 100% based on the Thrift consistency level 00534 // a cast is enough to get the value we want to get 00535 // (see the QCassandraValue.cpp file) 00536 00537 consistency_level_t consistency_level = value.consistencyLevel(); 00538 if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) { 00539 consistency_level = f_parent->defaultConsistencyLevel(); 00540 } 00541 00542 f_client->get(column_result, key, column_path, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level))); 00543 00544 if(!column_result.__isset.column) { 00545 throw std::runtime_error("attempt to retrieve a cell failed"); 00546 } 00547 00548 // we got a column, copy the data to the value parameter 00549 if(column_result.column.__isset.value) { 00550 value.setBinaryValue(QByteArray(column_result.column.value.c_str(), column_result.column.value.length())); 00551 } 00552 else { 00553 // undefined we assume empty... 00554 value.setNullValue(); 00555 } 00556 if(column_result.column.__isset.timestamp) { 00557 value.assignTimestamp(column_result.column.timestamp); 00558 } 00559 if(column_result.column.__isset.ttl) { 00560 value.setTtl(column_result.column.ttl); 00561 } 00562 } 00563 00578 int32_t QCassandraPrivate::getCellCount(const QString& table_name, const QByteArray& row_key, const QCassandraColumnPredicate& column_predicate) 00579 { 00580 mustBeConnected(); 00581 00582 std::string key(row_key.data(), row_key.size()); 00583 00584 org::apache::cassandra::ColumnParent column_parent; 00585 column_parent.__set_column_family(table_name.toUtf8().data()); 00586 // no super column support here 00587 00588 org::apache::cassandra::SlicePredicate slice_predicate; 00589 column_predicate.toPredicate(&slice_predicate); 00590 00591 // our consistency level is 100% based on the Thrift consistency level 00592 // a cast is enough to get the value we want to get 00593 // (see the QCassandraValue.cpp file) 00594 00595 consistency_level_t consistency_level = column_predicate.consistencyLevel(); 00596 if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) { 00597 consistency_level = f_parent->defaultConsistencyLevel(); 00598 } 00599 00600 return f_client->get_count(key, column_parent, slice_predicate, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level))); 00601 } 00602 00615 void QCassandraPrivate::getColumnSlice(QCassandraTable& table, const QByteArray& row_key, const QCassandraColumnPredicate& column_predicate) 00616 { 00617 mustBeConnected(); 00618 00619 std::string key(row_key.data(), row_key.size()); 00620 00621 org::apache::cassandra::ColumnParent column_parent; 00622 column_parent.__set_column_family(table.tableName().toUtf8().data()); 00623 // no super column support here 00624 00625 org::apache::cassandra::SlicePredicate slice_predicate; 00626 column_predicate.toPredicate(&slice_predicate); 00627 00628 typedef std::vector<org::apache::cassandra::ColumnOrSuperColumn> column_vector_t; 00629 column_vector_t results; 00630 00631 // our consistency level is 100% based on the Thrift consistency level 00632 // a cast is enough to get the value we want to get 00633 // (see the QCassandraValue.cpp file) 00634 00635 consistency_level_t consistency_level = column_predicate.consistencyLevel(); 00636 if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) { 00637 consistency_level = f_parent->defaultConsistencyLevel(); 00638 } 00639 00640 f_client->get_slice(results, key, column_parent, slice_predicate, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level))); 00641 00642 // we got results, copy the data to the table cache 00643 for(column_vector_t::iterator it = results.begin(); it != results.end(); ++it) { 00644 // transform the value of the cell to a QCassandraValue 00645 QCassandraValue value; 00646 if(it->column.__isset.value) { 00647 value.setBinaryValue(QByteArray(it->column.value.c_str(), it->column.value.length())); 00648 } 00649 if(it->column.__isset.timestamp) { 00650 value.assignTimestamp(it->column.timestamp); 00651 } 00652 if(it->column.__isset.ttl) { 00653 value.setTtl(it->column.ttl); 00654 } 00655 00656 // save the cell in the corresponding table, row, cell 00657 table.assignRow(row_key, it->column.name.c_str(), value); 00658 } 00659 } 00660 00672 void QCassandraPrivate::remove(const QString& table_name, const QByteArray& row_key, const QByteArray& column_key, int64_t timestamp, consistency_level_t consistency_level) 00673 { 00674 mustBeConnected(); 00675 00676 std::string key(row_key.data(), row_key.size()); 00677 00678 org::apache::cassandra::ColumnPath column_path; 00679 column_path.__set_column_family(table_name.toUtf8().data()); 00680 // no super column support here 00681 if(column_key.size() > 0) { 00682 // when column_key is empty we want to remove all the columns! 00683 std::string ckey(column_key.data(), column_key.size()); 00684 column_path.__set_column(ckey); 00685 } 00686 00687 if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) { 00688 consistency_level = f_parent->defaultConsistencyLevel(); 00689 } 00690 00691 f_client->remove(key, column_path, timestamp, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level))); 00692 } 00693 00704 uint32_t QCassandraPrivate::getRowSlices(QCassandraTable& table, const QCassandraRowPredicate& row_predicate) 00705 { 00706 mustBeConnected(); 00707 00708 org::apache::cassandra::ColumnParent column_parent; 00709 column_parent.__set_column_family(table.tableName().toUtf8().data()); 00710 // no super column support here 00711 00712 QSharedPointer<QCassandraColumnPredicate> column_predicate(row_predicate.columnPredicate()); 00713 org::apache::cassandra::SlicePredicate slice_predicate; 00714 column_predicate->toPredicate(&slice_predicate); 00715 00716 org::apache::cassandra::KeyRange key_range; 00717 row_predicate.toPredicate(&key_range); 00718 00719 typedef std::vector<org::apache::cassandra::KeySlice> key_slice_vector_t; 00720 key_slice_vector_t results; 00721 00722 // our consistency level is 100% based on the Thrift consistency level 00723 // a cast is enough to get the value we want to get 00724 // (see the QCassandraValue.cpp file) 00725 00726 consistency_level_t consistency_level = column_predicate->consistencyLevel(); 00727 if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) { 00728 consistency_level = f_parent->defaultConsistencyLevel(); 00729 } 00730 00731 f_client->get_range_slices(results, column_parent, slice_predicate, key_range, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level))); 00732 00733 // we got results, copy the data to the table cache 00734 for(key_slice_vector_t::iterator it = results.begin(); it != results.end(); ++it) { 00735 QByteArray row_key(it->key.c_str(), it->key.size()); 00736 typedef std::vector<org::apache::cassandra::ColumnOrSuperColumn> column_vector_t; 00737 for(column_vector_t::iterator jt = it->columns.begin(); jt != it->columns.end(); ++jt) { 00738 // transform the value of the cell to a QCassandraValue 00739 QCassandraValue value; 00740 if(jt->column.__isset.value) { 00741 value.setBinaryValue(QByteArray(jt->column.value.c_str(), jt->column.value.length())); 00742 } 00743 if(jt->column.__isset.timestamp) { 00744 value.assignTimestamp(jt->column.timestamp); 00745 } 00746 if(jt->column.__isset.ttl) { 00747 value.setTtl(jt->column.ttl); 00748 } 00749 00750 // save the cell in the corresponding table, row, cell 00751 table.assignRow(row_key, jt->column.name.c_str(), value); 00752 } 00753 } 00754 00755 return results.size(); 00756 } 00757 00758 } // namespace QtCassandra 00759 // vim: ts=4 sw=4 et
This document is part of the libQtCassandra Project.
Copyright by Made to Order Software Corp.