libQtCassandra 0.3.2

QCassandraPrivate.cpp

Go to the documentation of this file.
00001 /*
00002  * Text:
00003  *      QCassandraPrivate.cpp
00004  *
00005  * Description:
00006  *      Handling of the cassandra::CassandraClient and corresponding transports,
00007  *      protocols, sockets, etc.
00008  *
00009  * Documentation:
00010  *      See each function below.
00011  *
00012  * License:
00013  *      Copyright (c) 2011 Made to Order Software Corp.
00014  * 
00015  *      http://snapwebsites.org/
00016  *      contact@m2osw.com
00017  * 
00018  *      Permission is hereby granted, free of charge, to any person obtaining a
00019  *      copy of this software and associated documentation files (the
00020  *      "Software"), to deal in the Software without restriction, including
00021  *      without limitation the rights to use, copy, modify, merge, publish,
00022  *      distribute, sublicense, and/or sell copies of the Software, and to
00023  *      permit persons to whom the Software is furnished to do so, subject to
00024  *      the following conditions:
00025  *
00026  *      The above copyright notice and this permission notice shall be included
00027  *      in all copies or substantial portions of the Software.
00028  *
00029  *      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
00030  *      OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
00031  *      MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
00032  *      IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
00033  *      CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
00034  *      TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
00035  *      SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
00036  */
00037 
00038 #include "QCassandraPrivate.h"
00039 #include <protocol/TBinaryProtocol.h>
00040 #include <transport/TSocket.h>
00041 #include <transport/TTransportUtils.h>
00042 
00043 #include <QtCore/QDebug>
00044 
00045 namespace QtCassandra
00046 {
00047 
00048 
00117 QCassandraPrivate::QCassandraPrivate(QCassandra *parent)
00118     : f_parent(parent)
00119       //f_socket(NULL) -- auto-initialized
00120       //f_transport(NULL) -- auto-initialized
00121       //f_protocol(NULL) -- auto-initialized
00122       //f_client(NULL) -- auto-initialized
00123 {
00124 }
00125 
00142 QCassandraPrivate::~QCassandraPrivate()
00143 {
00144     disconnect();
00145 }
00146 
00173 bool QCassandraPrivate::connect(const QString& host, int port)
00174 {
00175     bool    worked = true;
00176 
00177     // disconnect any existing connection
00178     disconnect();
00179 
00180     try {
00181         // create a socket, transportation, protocol, and client
00182         // the client is what we use to communicate with the Cassandra server
00183         f_socket.reset(new apache::thrift::transport::TSocket(host.toUtf8().data(), port));
00184         f_transport.reset(new apache::thrift::transport::TFramedTransport(f_socket));
00185         f_protocol.reset(new apache::thrift::protocol::TBinaryProtocol(f_transport));
00186         f_client.reset(new org::apache::cassandra::CassandraClient(f_protocol));
00187 
00188         // once everything is connected as it should, open the transport link
00189         f_transport->open();
00190     }
00191     catch(...) {
00192         // TBD: should we check for some specific error here?
00193         worked = false;
00194     }
00195 
00196     // if it failed, make sure to clear all the pointers
00197     if(!worked) {
00198         disconnect();
00199     }
00200 
00201     return worked;
00202 }
00203 
00211 void QCassandraPrivate::disconnect()
00212 {
00213     if(f_client.get()) {
00214         // this is probably not necessary (it should anyway happen as required)
00215         f_transport->close();
00216     }
00217     f_client.reset();
00218     f_protocol.reset();
00219     f_transport.reset();
00220     f_socket.reset();
00221 }
00222 
00233 bool QCassandraPrivate::isConnected() const
00234 {
00235     return f_client.get() != NULL;
00236 }
00237 
00248 void QCassandraPrivate::mustBeConnected() const throw(std::runtime_error)
00249 {
00250     if(!isConnected()) {
00251         throw std::runtime_error("not connected to the Cassandra server.");
00252     }
00253 }
00254 
00262 QString QCassandraPrivate::clusterName() const
00263 {
00264     mustBeConnected();
00265     std::string cluster_name;
00266     f_client->describe_cluster_name(cluster_name);
00267     return cluster_name.c_str();
00268 }
00269 
00277 QString QCassandraPrivate::protocolVersion() const
00278 {
00279     mustBeConnected();
00280     std::string protocol_version;
00281     f_client->describe_version(protocol_version);
00282     return protocol_version.c_str();
00283 }
00284 
00293 void QCassandraPrivate::setContext(const QString& context_name)
00294 {
00295     mustBeConnected();
00296     f_client->set_keyspace(context_name.toUtf8().data());
00297 }
00298 
00313 void QCassandraPrivate::contexts() const
00314 {
00315     mustBeConnected();
00316 
00317     // retrieve the key spaces from Cassandra
00318     std::vector<org::apache::cassandra::KsDef> keyspaces;
00319     f_client->describe_keyspaces(keyspaces);
00320 
00321     for(std::vector<org::apache::cassandra::KsDef>::const_iterator
00322                     ks = keyspaces.begin(); ks != keyspaces.end(); ++ks) {
00323         QSharedPointer<QCassandraContext> c(f_parent->context(ks->name.c_str()));
00324         const org::apache::cassandra::KsDef& ks_def = *ks;
00325         c->parseContextDefinition(&ks_def);
00326     }
00327 }
00328 
00345 void QCassandraPrivate::createContext(const QCassandraContext& context)
00346 {
00347     mustBeConnected();
00348     org::apache::cassandra::KsDef ks;
00349     context.prepareContextDefinition(&ks);
00350     std::string id_ignore;
00351     f_client->system_add_keyspace(id_ignore, ks);
00352 }
00353 
00366 void QCassandraPrivate::updateContext(const QCassandraContext& context)
00367 {
00368     mustBeConnected();
00369     org::apache::cassandra::KsDef ks;
00370     context.prepareContextDefinition(&ks);
00371     std::string id_ignore;
00372     f_client->system_update_keyspace(id_ignore, ks);
00373 }
00374 
00384 void QCassandraPrivate::dropContext(const QCassandraContext& context)
00385 {
00386     mustBeConnected();
00387     f_parent->clearCurrentContextIf(context);
00388     std::string id_ignore;
00389     f_client->system_drop_keyspace(id_ignore, context.contextName().toUtf8().data());
00390 }
00391 
00400 void QCassandraPrivate::createTable(const QCassandraTable *table)
00401 {
00402     mustBeConnected();
00403     org::apache::cassandra::CfDef cf;
00404     table->prepareTableDefinition(&cf);
00405     std::string id_ignore;
00406     f_client->system_add_column_family(id_ignore, cf);
00407 }
00408 
00415 void QCassandraPrivate::updateTable(const QCassandraTable *table)
00416 {
00417     mustBeConnected();
00418     org::apache::cassandra::CfDef cf;
00419     table->prepareTableDefinition(&cf);
00420     std::string id_ignore;
00421     f_client->system_update_column_family(id_ignore, cf);
00422 }
00423 
00430 void QCassandraPrivate::dropTable(const QString& table_name)
00431 {
00432     mustBeConnected();
00433     std::string id_ignore;
00434     f_client->system_drop_column_family(id_ignore, table_name.toUtf8().data());
00435 }
00436 
00444 void QCassandraPrivate::truncateTable(const QCassandraTable *table)
00445 {
00446     mustBeConnected();
00447     f_client->truncate(table->tableName().toUtf8().data());
00448 }
00449 
00462 void QCassandraPrivate::insertValue(const QString& table_name, const QByteArray& row_key, const QByteArray& column_key, const QCassandraValue& value)
00463 {
00464     mustBeConnected();
00465 
00466     std::string key(row_key.data(), row_key.size());
00467 
00468     org::apache::cassandra::ColumnParent column_parent;
00469     column_parent.__set_column_family(table_name.toUtf8().data());
00470     // no super column support here
00471 
00472     org::apache::cassandra::Column column;
00473     std::string ck(column_key.data(), column_key.size());
00474     column.__set_name(ck);
00475 
00476     const QByteArray& data(value.binaryValue());
00477     column.__set_value(std::string(data.data(), data.size())); // "unavoidable" copy of the data
00478 
00479     QCassandraValue::timestamp_mode_t mode = value.timestampMode();
00480     switch(mode) {
00481     case QCassandraValue::TIMESTAMP_MODE_AUTO:
00482         column.__set_timestamp(QCassandra::timeofday());
00483         break;
00484 
00485     case QCassandraValue::TIMESTAMP_MODE_DEFINED:
00486         // user defined
00487         column.__set_timestamp(value.timestamp());
00488         break;
00489 
00490     }
00491 
00492     if(value.ttl() != QCassandraValue::TTL_PERMANENT) {
00493         column.__set_ttl(value.ttl());
00494     }
00495 
00496     // our consistency level is 100% based on the Thrift consistency level
00497     // a cast is enough to get the value we want to get
00498     // (see the QCassandraValue.cpp file)
00499 
00500     consistency_level_t consistency_level = value.consistencyLevel();
00501     if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) {
00502         consistency_level = f_parent->defaultConsistencyLevel();
00503     }
00504 
00505     f_client->insert(key, column_parent, column, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level)));
00506 }
00507 
00520 void QCassandraPrivate::getValue(const QString& table_name, const QByteArray& row_key, const QByteArray& column_key, QCassandraValue& value)
00521 {
00522     mustBeConnected();
00523 
00524     std::string key(row_key.data(), row_key.size());
00525 
00526     org::apache::cassandra::ColumnPath column_path;
00527     column_path.__set_column_family(table_name.toUtf8().data());
00528     // no super column support here
00529     column_path.__set_column(column_key.data());
00530 
00531     org::apache::cassandra::ColumnOrSuperColumn column_result;
00532 
00533     // our consistency level is 100% based on the Thrift consistency level
00534     // a cast is enough to get the value we want to get
00535     // (see the QCassandraValue.cpp file)
00536 
00537     consistency_level_t consistency_level = value.consistencyLevel();
00538     if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) {
00539         consistency_level = f_parent->defaultConsistencyLevel();
00540     }
00541 
00542     f_client->get(column_result, key, column_path, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level)));
00543 
00544     if(!column_result.__isset.column) {
00545         throw std::runtime_error("attempt to retrieve a cell failed");
00546     }
00547 
00548     // we got a column, copy the data to the value parameter
00549     if(column_result.column.__isset.value) {
00550         value.setBinaryValue(QByteArray(column_result.column.value.c_str(), column_result.column.value.length()));
00551     }
00552     else {
00553         // undefined we assume empty...
00554         value.setNullValue();
00555     }
00556     if(column_result.column.__isset.timestamp) {
00557         value.assignTimestamp(column_result.column.timestamp);
00558     }
00559     if(column_result.column.__isset.ttl) {
00560         value.setTtl(column_result.column.ttl);
00561     }
00562 }
00563 
00578 int32_t QCassandraPrivate::getCellCount(const QString& table_name, const QByteArray& row_key, const QCassandraColumnPredicate& column_predicate)
00579 {
00580     mustBeConnected();
00581 
00582     std::string key(row_key.data(), row_key.size());
00583 
00584     org::apache::cassandra::ColumnParent column_parent;
00585     column_parent.__set_column_family(table_name.toUtf8().data());
00586     // no super column support here
00587 
00588     org::apache::cassandra::SlicePredicate slice_predicate;
00589     column_predicate.toPredicate(&slice_predicate);
00590 
00591     // our consistency level is 100% based on the Thrift consistency level
00592     // a cast is enough to get the value we want to get
00593     // (see the QCassandraValue.cpp file)
00594 
00595     consistency_level_t consistency_level = column_predicate.consistencyLevel();
00596     if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) {
00597         consistency_level = f_parent->defaultConsistencyLevel();
00598     }
00599 
00600     return f_client->get_count(key, column_parent, slice_predicate, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level)));
00601 }
00602 
00615 void QCassandraPrivate::getColumnSlice(QCassandraTable& table, const QByteArray& row_key, const QCassandraColumnPredicate& column_predicate)
00616 {
00617     mustBeConnected();
00618 
00619     std::string key(row_key.data(), row_key.size());
00620 
00621     org::apache::cassandra::ColumnParent column_parent;
00622     column_parent.__set_column_family(table.tableName().toUtf8().data());
00623     // no super column support here
00624 
00625     org::apache::cassandra::SlicePredicate slice_predicate;
00626     column_predicate.toPredicate(&slice_predicate);
00627 
00628     typedef std::vector<org::apache::cassandra::ColumnOrSuperColumn> column_vector_t;
00629     column_vector_t results;
00630 
00631     // our consistency level is 100% based on the Thrift consistency level
00632     // a cast is enough to get the value we want to get
00633     // (see the QCassandraValue.cpp file)
00634 
00635     consistency_level_t consistency_level = column_predicate.consistencyLevel();
00636     if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) {
00637         consistency_level = f_parent->defaultConsistencyLevel();
00638     }
00639 
00640     f_client->get_slice(results, key, column_parent, slice_predicate, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level)));
00641 
00642     // we got results, copy the data to the table cache
00643     for(column_vector_t::iterator it = results.begin(); it != results.end(); ++it) {
00644         // transform the value of the cell to a QCassandraValue
00645         QCassandraValue value;
00646         if(it->column.__isset.value) {
00647             value.setBinaryValue(QByteArray(it->column.value.c_str(), it->column.value.length()));
00648         }
00649         if(it->column.__isset.timestamp) {
00650             value.assignTimestamp(it->column.timestamp);
00651         }
00652         if(it->column.__isset.ttl) {
00653             value.setTtl(it->column.ttl);
00654         }
00655 
00656         // save the cell in the corresponding table, row, cell
00657         table.assignRow(row_key, it->column.name.c_str(), value);
00658     }
00659 }
00660 
00672 void QCassandraPrivate::remove(const QString& table_name, const QByteArray& row_key, const QByteArray& column_key, int64_t timestamp, consistency_level_t consistency_level)
00673 {
00674     mustBeConnected();
00675 
00676     std::string key(row_key.data(), row_key.size());
00677 
00678     org::apache::cassandra::ColumnPath column_path;
00679     column_path.__set_column_family(table_name.toUtf8().data());
00680     // no super column support here
00681     if(column_key.size() > 0) {
00682         // when column_key is empty we want to remove all the columns!
00683         std::string ckey(column_key.data(), column_key.size());
00684         column_path.__set_column(ckey);
00685     }
00686 
00687     if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) {
00688         consistency_level = f_parent->defaultConsistencyLevel();
00689     }
00690 
00691     f_client->remove(key, column_path, timestamp, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level)));
00692 }
00693 
00704 uint32_t QCassandraPrivate::getRowSlices(QCassandraTable& table, const QCassandraRowPredicate& row_predicate)
00705 {
00706     mustBeConnected();
00707 
00708     org::apache::cassandra::ColumnParent column_parent;
00709     column_parent.__set_column_family(table.tableName().toUtf8().data());
00710     // no super column support here
00711 
00712     QSharedPointer<QCassandraColumnPredicate> column_predicate(row_predicate.columnPredicate());
00713     org::apache::cassandra::SlicePredicate slice_predicate;
00714     column_predicate->toPredicate(&slice_predicate);
00715 
00716     org::apache::cassandra::KeyRange key_range;
00717     row_predicate.toPredicate(&key_range);
00718 
00719     typedef std::vector<org::apache::cassandra::KeySlice> key_slice_vector_t;
00720     key_slice_vector_t results;
00721 
00722     // our consistency level is 100% based on the Thrift consistency level
00723     // a cast is enough to get the value we want to get
00724     // (see the QCassandraValue.cpp file)
00725 
00726     consistency_level_t consistency_level = column_predicate->consistencyLevel();
00727     if(consistency_level == CONSISTENCY_LEVEL_DEFAULT) {
00728         consistency_level = f_parent->defaultConsistencyLevel();
00729     }
00730 
00731     f_client->get_range_slices(results, column_parent, slice_predicate, key_range, static_cast<org::apache::cassandra::ConsistencyLevel::type>(static_cast<cassandra_consistency_level_t>(consistency_level)));
00732 
00733     // we got results, copy the data to the table cache
00734     for(key_slice_vector_t::iterator it = results.begin(); it != results.end(); ++it) {
00735         QByteArray row_key(it->key.c_str(), it->key.size());
00736         typedef std::vector<org::apache::cassandra::ColumnOrSuperColumn> column_vector_t;
00737         for(column_vector_t::iterator jt = it->columns.begin(); jt != it->columns.end(); ++jt) {
00738             // transform the value of the cell to a QCassandraValue
00739             QCassandraValue value;
00740             if(jt->column.__isset.value) {
00741                 value.setBinaryValue(QByteArray(jt->column.value.c_str(), jt->column.value.length()));
00742             }
00743             if(jt->column.__isset.timestamp) {
00744                 value.assignTimestamp(jt->column.timestamp);
00745             }
00746             if(jt->column.__isset.ttl) {
00747                 value.setTtl(jt->column.ttl);
00748             }
00749 
00750             // save the cell in the corresponding table, row, cell
00751             table.assignRow(row_key, jt->column.name.c_str(), value);
00752         }
00753     }
00754 
00755     return results.size();
00756 }
00757 
00758 } // namespace QtCassandra
00759 // vim: ts=4 sw=4 et
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

This document is part of the libQtCassandra Project.

Copyright by Made to Order Software Corp.