// Thread-safe MongoDB C++ connection-pool wrapper for Linux.
// Every function returns 0 on success and a value > 0 on error.
// Pool of MongoDB client connections shared between threads.
//
// _joblst maps every pooled connection to a "busy" flag (true = leased or
// unusable). _jobsem is waited on before a connection is leased and posted
// when one is handed back; _jobmux guards the flags themselves.
class cmongo{
public:
    // Default constructor. Per the original note the default pool size is 1
    // (its definition is not part of this section).
    cmongo();
    // Builds a pool holding `size` connections.
    cmongo(int size);
    // Deletes every pooled connection.
    ~cmongo();
public:
    // Stores the TCP read/write timeout.
    int set_wr_timeout(double t);
    // Connects every pooled connection to mhost:mport.
    int conn(string mhost="127.0.0.1",int mport=27017);
    // Selects the database and collection used by all later operations.
    int setdb(string mdb,string mcollection);
    // Ensures an ascending index on `key`.
    int setindex(string key);
    // Single-record query: fetches fields `in` of the record where key == key_val.
    int get(map<string,string>& out,vector<string> in,string key,string key_val);
    // Batch query: for each value in `in`, fetches `fields` of the record
    // where key == value.
    int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);
    // Dumps the collection as key-field -> val-field (one value per key).
    int dumpkey(map< string,string >& rout,string key,string val);
    // Dumps the collection as key-field -> {field -> value} (several values per key).
    int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);
    // Upserts the fields of `in` into the record where key == key_val.
    int set(map<string,string> in,string key,string key_val);
    // Batch upsert keyed by `key` (e.g. key = "id"):
    //   "123456": <key,value>,<key,value>
    //   "123457": <key,value>,<key,value>
    int sets(map< string,map<string,string> > in,string key);
    // Removes the records where key == key_val.
    int remove(string key,string key_val);
private:
    // Non-copyable: the object owns raw connection pointers, a mutex and a
    // semaphore, so a member-wise copy would double-delete the connections.
    // Declared but intentionally left undefined (C++03 idiom).
    cmongo(const cmongo&);
    cmongo& operator=(const cmongo&);
private:
    // Namespace in "<db>.<collection>" form.
    string doc;
    // TCP read/write timeout handed to new connections.
    double wr_timeout;
    // Guards the busy flags stored in _joblst.
    pthread_mutex_t _jobmux;
    // Waited on before leasing a connection, posted when one is returned.
    sem_t _jobsem;
    // connection -> busy flag.
    map<DBClientConnection*,bool> _joblst;
    // Serialises the full-collection dumps (dumpkey / dumpvals).
    pthread_mutex_t _dbmux;
};
// Builds the pool: initialises the synchronisation primitives and creates
// `size` not-yet-connected DBClientConnection objects, all flagged idle.
// `size` is clamped to [1, 200]; a pool of 0 could never serve a request
// because every operation waits for a connection.
cmongo::cmongo(int size){
    // Default namespace "<DB_DB>.<DB_COLLECTION>" until setdb() overrides it.
    doc=string(DB_DB)+"."+string(DB_COLLECTION);
    wr_timeout=3;
    if(size<1){
        size=1;
    }
    if(size>200){
        size=200;
    }
    pthread_mutex_init(&_jobmux,NULL);
    // dumpkey()/dumpvals() lock _dbmux, so it has to be initialised as well;
    // locking an uninitialised mutex is undefined behaviour.
    pthread_mutex_init(&_dbmux,NULL);
    // The semaphore starts at 0; it is raised once connections become usable.
    if(sem_init(&_jobsem,0,0)<0){
        return;
    }
    const bool auto_conn=true;
    pthread_mutex_lock(&_jobmux);
    for(int i=0;i<size;++i){
        // operator new throws on failure, so no NULL check is needed.
        DBClientConnection* pconn=new DBClientConnection(auto_conn,0,wr_timeout);
        _joblst[pconn]=false;
    }
    pthread_mutex_unlock(&_jobmux);
}
// Tears the pool down: deletes every connection, empties the table so no
// dangling pointers remain, then destroys the pool mutex and semaphore.
// The caller must ensure no other thread is still using the object.
cmongo::~cmongo(){
    doc.clear();
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        delete it->first;
    }
    _joblst.clear();
    pthread_mutex_unlock(&_jobmux);
    // Pair the init calls made at construction with their destroy calls.
    // NOTE(review): assumes sem_init succeeded in the constructor - confirm
    // whether a failed-init flag is needed.
    sem_destroy(&_jobsem);
    pthread_mutex_destroy(&_jobmux);
}
int cmongo::set_wr_timeout(double t){
wr_timeout=t;
return RET_OK;
}
// Connects every pooled connection to mhost:mport.
// A connection that fails to connect is logged to stderr and flagged busy so
// it is never leased. The semaphore is raised only for connections that did
// connect, keeping its count equal to the number of leasable connections.
// Returns RET_OK if at least one connection was established, else RET_ERR.
int cmongo::conn(string mhost,int mport){
    HostAndPort hp(mhost,mport);
    int connected=0;
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        string errmsg;
        if(it->first->connect(hp,errmsg)){
            // One semaphore token per usable connection.
            sem_post(&_jobsem);
            ++connected;
        }else{
            cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl;
            // Take the dead connection out of circulation; posting for it
            // would let a caller pass sem_wait with nothing to lease.
            it->second=true;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    return connected>0 ? RET_OK : RET_ERR;
}
// Selects the namespace used by later operations as "<mdb>.<mcollection>".
// Returns RET_PARERR when either name is empty, otherwise RET_OK.
int cmongo::setdb(string mdb,string mcollection){
    const bool usable=!mdb.empty() && !mcollection.empty();
    if(!usable){
        return RET_PARERR;
    }
    string ns(mdb);
    ns+=".";
    ns+=mcollection;
    doc=ns;
    return RET_OK;
}
// Ensures an ascending index ({key:1}) on the current namespace.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be leased, otherwise RET_OK. Anything thrown while building or sending the
// index request propagates to the caller after the connection is returned.
int cmongo::setindex(string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Lease an idle connection: wait for a token, then claim the first entry
    // whose busy flag is clear.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: if the semaphore count ever exceeds the number of idle
    // entries the scan finds nothing, and dereferencing end() would be
    // undefined behaviour. The surplus token is deliberately not re-posted.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    try{
        string bindex="{"+key+":1}";
        it->first->ensureIndex(doc,fromjson(bindex));
    }catch(...){
        // Hand the connection back before propagating, otherwise it would
        // stay leased forever.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
// Single-record query.
//   out         : replaced with field name -> string value for every name in `in`
//   in          : names of the fields to fetch
//   key/key_val : equality condition (key == key_val); multiple conditions
//                 are not supported
// Returns RET_PARERR when key, key_val or `in` is empty, RET_ERR when no idle
// connection could be leased, otherwise RET_OK. `out` is only touched on
// success. Driver exceptions propagate after the connection is returned.
int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){
    if(key.empty() || key_val.empty() || in.empty()){
        return RET_PARERR;
    }
    // Projection {field:1,...}, built before a connection is leased.
    BSONObjBuilder b;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj ob=b.obj();
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    try{
        BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);
        map<string,string> temp;
        for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
            temp[*iter]=p.getStringField(iter->c_str());
        }
        out=temp;
    }catch(...){
        // Return the connection to the pool before propagating.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
// Batch query: for every value v in `in`, looks up the record where key == v
// and collects the requested fields.
//   rout   : receives key-field value -> {field name -> string value}
//   fields : names of the fields to fetch
//   in     : values of `key` to look up (e.g. key = "id")
// Each result is stored under the string value of the `key` field read from
// the returned document.
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be leased, otherwise RET_OK. Driver exceptions propagate after the
// connection is returned; `rout` may then be partially filled.
int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Projection: the key field plus every requested field.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    try{
        for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){
            BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);
            map<string,string> temp;
            for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
                temp[*iter]=ob.getStringField(iter->c_str());
            }
            rout[ob.getStringField(key.c_str())]=temp;
        }
    }catch(...){
        // Return the connection to the pool before propagating.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
// Dumps the whole collection as a key -> value table.
//   rout : receives (string value of field `key`) -> (string value of field `val`)
//   key  : field whose value becomes the map key
//   val  : field whose value becomes the mapped value; when empty only `key`
//          is projected
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be leased or the query yields no cursor, otherwise RET_OK. Driver
// exceptions propagate after both the dump lock and the connection have been
// released.
int cmongo::dumpkey(map< string,string >& rout,string key,string val){
    if(key.empty()){
        return RET_PARERR;
    }
    // Projection, built before a connection is leased.
    BSONObjBuilder b;
    b.append(key,1);
    if(!val.empty()){
        b.append(val,1);
    }
    BSONObj p=b.obj();
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
        // query() may hand back a null cursor when the request fails.
        if(cursor.get()==NULL){
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());
            }
        }
    }catch(...){
        // Release the dump lock and the connection before propagating;
        // otherwise later dumps would block forever.
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Dumps the whole collection as key -> {field -> value}.
// dumpvals could subsume dumpkey; the two are kept separate for convenience.
//   rout : receives (string value of field `key`) -> {field name -> string value}
//   in   : names of the fields to return for every record
//   key  : field whose value becomes the outer map key (e.g. "id")
// Returns RET_PARERR for an empty key, RET_ERR when no idle connection could
// be leased or the query yields no cursor, otherwise RET_OK. Driver
// exceptions propagate after both the dump lock and the connection have been
// released.
int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    // Projection: the key field plus every requested field.
    BSONObjBuilder b;
    b.append(key,1);
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj p=b.obj();
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
        // query() may hand back a null cursor when the request fails.
        if(cursor.get()==NULL){
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                map<string,string> temp;
                for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
                    temp[*iter]=ob.getStringField(iter->c_str());
                }
                rout[ob.getStringField(key.c_str())]=temp;
            }
        }
    }catch(...){
        // Release the dump lock and the connection before propagating;
        // otherwise later dumps would block forever.
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Upsert: applies {$set: in} to the record where key == key_val (the update
// is issued with the upsert flag set). Updating several records that share
// one key is not supported.
// Returns RET_PARERR when `in`, key or key_val is empty, RET_ERR when no idle
// connection could be leased or getLastError() reports a message, otherwise
// RET_OK. Driver exceptions propagate after the connection is returned.
int cmongo::set(map<string,string> in,string key,string key_val){
    if(in.empty() || key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    // Build the $set payload before a connection is leased.
    BSONObjBuilder b;
    for(map<string,string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(iter->first,iter->second);
    }
    BSONObj ob=b.obj();
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);
        string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }catch(...){
        // Return the connection to the pool before propagating.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Batch upsert keyed by `key` (e.g. key = "id"):
//   "123456": <key,value>,<key,value>
//   "123457": <key,value>,<key,value>
// For every entry of `in`, applies {$set: entry fields} to the record where
// key == entry key, with the upsert flag set.
// Returns RET_PARERR when `in` or key is empty, RET_ERR when no idle
// connection could be leased or any update leaves a getLastError() message,
// otherwise RET_OK. Remaining entries are still attempted after a reported
// error. Driver exceptions propagate after the connection is returned.
int cmongo::sets(map< string,map<string,string> > in,string key){
    if(in.empty() || key.empty()){
        return RET_PARERR;
    }
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        for(map< string,map<string,string> >::iterator iter=in.begin();iter!=in.end();++iter){
            BSONObjBuilder b;
            for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){
                b.append(iter2->first,iter2->second);
            }
            BSONObj ob=b.obj();
            it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);
            string errmsg=it->first->getLastError();
            if(!errmsg.empty()){
                ret=RET_ERR;
            }
        }
    }catch(...){
        // Return the connection to the pool before propagating.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
// Delete: removes the records where key == key_val, e.g. key = "id",
// key_val = "587" removes the record whose id is "587".
// Returns RET_PARERR when key or key_val is empty, RET_ERR when no idle
// connection could be leased, otherwise RET_OK. Driver exceptions propagate
// after the connection is returned.
int cmongo::remove(string key,string key_val){
    if(key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    // Lease an idle connection.
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    for(;it!=_joblst.end();++it){
        if(!it->second){
            it->second=true;
            break;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    // Defensive: never dereference end() if the scan found no idle entry.
    if(it==_joblst.end()){
        return RET_ERR;
    }
    try{
        it->first->remove(doc,BSON(key << key_val));
    }catch(...){
        // Return the connection to the pool before propagating.
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
