kavin

大数据架构-使用HBase和Solr配置存储与索引

kavin 运维技术 2022-11-19 615浏览 0

HBase和Solr可以通过协处理器Coprocessor的方式向Solr发出请求,Solr对于接收到的数据可以做相关的同步:增、删、改索引的操作。将存储与索引放在不同的机器上,这是大数据架构的必须品,但目前还有很多不懂得此道的同学,他们对于这种思想感到很新奇,不过,这绝对是好的方向,所以不懂得抓紧学习吧。

有个朋友给我的那篇博客留言,说CDH也可以做这样的事情,我还没有试过,他还问我要与此相关的代码,于是我就稍微整理了一下,作为本篇文章的主要内容。关于CDH的事,我会尽快尝试,有知道的同学可以给我留言。

下面我主要讲述一下,我测试对HBase和Solr的性能时,使用HBase协处理器向HBase添加数据所编写的相关代码,及解释说明。

一、编写HBase协处理器Coprocessor

​一旦有数据postPut,就立即对Solr里相应的Core更新。这里使用了ConcurrentUpdateSolrServer,它是Solr速率性能的保证,使用它不要忘记在Solr里面配置autoCommit哟。

/*

*版权:王安琪

*描述:监视HBase,一有数据postPut就向Solr发送,本类要作为触发器添加到HBase

*修改时间:2014-05-27

*修改内容:新增

*/

packagesolrHbase.test;



importjava.io.UnsupportedEncodingException;



import***;



publicclassSorlIndexCoprocessorObserverextendsBaseRegionObserver{



privatestaticfinalLoggerLOG=LoggerFactory

.getLogger(SorlIndexCoprocessorObserver.class);

privatestaticfinalStringsolrUrl="http://192.1.11.108:80/solr/core1";

privatestaticfinalSolrServersolrServer=newConcurrentUpdateSolrServer(

solrUrl,10000,20);



/**

*建立solr索引

*

*@throwsUnsupportedEncodingException

*/

@Override

publicvoidpostPut(finalObserverContext<RegionCoprocessorEnvironment>e,

finalPutput,finalWALEditedit,finalbooleanwriteToWAL)

throwsUnsupportedEncodingException{

inputSolr(put);

}



publicvoidinputSolr(Putput){

try{

solrServer.add(TestSolrMain.getInputDoc(put));

}catch(Exceptionex){

LOG.error(ex.getMessage());

}

}

}

注意:getInputDoc是这个HBase协处理器Coprocessor的精髓所在,它可以把HBase内的Put里的内容转化成Solr需要的值。其中StringfieldName=key.substring(key.indexOf(columnFamily)+3,key.indexOf("我在这")).trim();这里有一个乱码字符,在这里看不到,请大家注意一下。

publicstaticSolrInputDocumentgetInputDoc(Putput){

SolrInputDocumentdoc=newSolrInputDocument();

doc.addField("test_ID",Bytes.toString(put.getRow()));

for(KeyValuec:put.getFamilyMap().get(Bytes.toBytes(columnFamily))){

Stringkey=Bytes.toString(c.getKey());

Stringvalue=Bytes.toString(c.getValue());

if(value.isEmpty()){

continue;

}

StringfieldName=key.substring(key.indexOf(columnFamily)+3,

key.indexOf("")).trim();

doc.addField(fieldName,value);

}

returndoc;

}

二、编写测试程序入口代码main

​这段代码向HBase请求建了一张表,并将模拟的数据,向HBase连续地提交数据内容,在HBase中不断地插入数据,同时记录时间,测试插入性能。

/*

*版权:王安琪

*描述:测试HBaseInsert,HBase插入性能

*修改时间:2014-05-27

*修改内容:新增

*/

packagesolrHbase.test;



importhbaseInput.HbaseInsert;



import***;



publicclassTestHBaseMain{



privatestaticConfigurationconfig;

privatestaticStringtableName="angelHbase";

privatestaticHTabletable=null;

privatestaticfinalStringcolumnFamily="wanganqi";



/**

*@paramargs

*/

publicstaticvoidmain(String[]args){

config=HBaseConfiguration.create();

config.set("hbase.zookeeper.quorum","192.103.101.104");

HbaseInsert.createTable(config,tableName,columnFamily);

try{

table=newHTable(config,Bytes.toBytes(tableName));

for(intk=0;k<1;k++){

Threadt=newThread(){

publicvoidrun(){

for(inti=0;i<100000;i++){

HbaseInsert.inputData(table,

PutCreater.createPuts(1000,columnFamily));

Calendarc=Calendar.getInstance();

StringdateTime=c.get(Calendar.YEAR)+"-"

+c.get(Calendar.MONTH)+"-"

+c.get(Calendar.DATE)+"T"

+c.get(Calendar.HOUR)+":"

+c.get(Calendar.MINUTE)+":"

+c.get(Calendar.SECOND)+":"

+c.get(Calendar.MILLISECOND)+"Z写入:"

+i*1000;

System.out.println(dateTime);

}

}

};

t.start();

}

}catch(IOExceptione1){

e1.printStackTrace();

}

}



}

​下面的是与HBase相关的操作,把它封装到一个类中,这里就只有建表与插入数据的相关代码。

/*

*版权:王安琪

*描述:与HBase相关操作,建表与插入数据

*修改时间:2014-05-27

*修改内容:新增

*/

packagehbaseInput;

import***;

importorg.apache.hadoop.hbase.client.Put;



publicclassHbaseInsert{



publicstaticvoidcreateTable(Configurationconfig,StringtableName,

StringcolumnFamily){

HBaseAdminhBaseAdmin;

try{

hBaseAdmin=newHBaseAdmin(config);

if(hBaseAdmin.tableExists(tableName)){

return;

}

HTableDescriptortableDescriptor=newHTableDescriptor(tableName);

tableDescriptor.addFamily(newHColumnDescriptor(columnFamily));

hBaseAdmin.createTable(tableDescriptor);

hBaseAdmin.close();

}catch(MasterNotRunningExceptione){

e.printStackTrace();

}catch(ZooKeeperConnectionExceptione){

e.printStackTrace();

}catch(IOExceptione){

e.printStackTrace();

}

}



publicstaticvoidinputData(HTabletable,ArrayList<Put>puts){

try{

table.put(puts);

table.flushCommits();

puts.clear();

}catch(IOExceptione){

e.printStackTrace();

}

}

}

三、编写模拟数据Put

向HBase中写入数据需要构造Put,下面是我构造模拟数据Put的方式,有字符串的生成,我是由mmseg提供的词典words.dic中随机读取一些词语连接起来,生成一句字符串的,下面的代码没有体现,不过很easy,你自己造你自己想要的数据就OK了。

publicstaticPutcreatePut(StringcolumnFamily){

Stringss=getSentence();

byte[]family=Bytes.toBytes(columnFamily);

byte[]rowKey=Bytes.toBytes(""+Math.abs(r.nextLong()));

Putput=newPut(rowKey);

put.add(family,Bytes.toBytes("DeviceID"),

Bytes.toBytes(""+Math.abs(r.nextInt())));

******

put.add(family,Bytes.toBytes("Company_mmsegsm"),Bytes.toBytes("ss"));



returnput;

}

当然在运行上面这个程序之前,需要先在Solr里面配置好你需要的列信息,HBase、Solr安装与配置,它们的基础使用方法将会在之后的文章中介绍。在这里,Solr的列配置就跟你使用createPut生成的Put搞成一样的列名就行了,当然也可以使用动态列的形式。

四、直接对Solr性能测试

如果你不想对HBase与Solr的相结合进行测试,只想单独对Solr的性能进行测试,这就更简单了,完全可以利用上面的代码段来测试,稍微组装一下就可以了。

privatestaticvoidsendConcurrentUpdateSolrServer(finalStringurl,

finalintcount)throwsSolrServerException,IOException{

SolrServersolrServer=newConcurrentUpdateSolrServer(url,10000,20);
for(inti=0;i<count;i++){solrServer.add(getInputDoc(PutCreater.createPut(columnFamily)));
}
}

希望可以帮助到你规格严格-功夫到家。这次的文章代码又偏多了点,但代码是解释思想的***的语言,我的提倡就是尽可能的减少代码的注释,尽力简化你的代码,使你的代码足够的清晰易懂,甚至于相似于伪代码了,这也是《重构》这本书里所提倡的。

继续浏览有关 数据库 的文章
发表评论