博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce
阅读量:2491 次
发布时间:2019-05-11

本文共 4013 字,大约阅读时间需要 13 分钟。

MapReduce

简介

MapReduce是聚合工具中的明星。Countdistinctgroup能做的上述事情Mapreduce都能做。他是一个可以轻松并行化到多个服务器的聚合方法。它会拆分问题,再将各个部分发送到不同的机器上,让每台机器都完成一部分。当所有机器都完成的时候,再把结果汇集起来形成最终完整的结果。

MapReduce需要几个步骤。最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么“无作为”,要么“产生一些键和X个值”。然后就是中间环节,称作洗牌(shuffle),按照键分组,并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值也就是最终结果。

使用mapreduce的代价就是速度:group不是很快,mapreduce更慢,绝不要用在“实时”环境中。要作为后台任务来运行MapReduce,将创建一个保存结果的集合,可以对这个集合进行实时查询。

实例:

对消息体统的一个统计:

 public List<MsgAnalysisByDate> mapreduceAndroidPushedByDate(Date timeBegin, Date timeEnd) {

        log.info("--------------安卓推送统计开始mapreduceAndroidPushedByDate-----begin");

        MsgMapReduceBo msgMapReduceBo = new MsgMapReduceBo(timeBegin, timeEnd, bcandroidpushrecordMongoDBClient, MsgFlagTypeEnum.PUSHED.getCode(), msgIdName, titleName);

        return doMapReduce(msgMapReduceBo);

    }

    private List<MsgAnalysisByDate> doMapReduce(MsgMapReduceBo msgMapReduceBo) {

        int type = AppTypeEnum.JD.getCode();     //掌上京东

        String outputTarget = livenessMapreduce.buildOutputTarget("tmp_liveness_d_" + type);

        List<MsgAnalysisByDate> result = new ArrayList<MsgAnalysisByDate>(200);

        String map = buildMapForBC(msgMapReduceBo.getMsgIdName(), msgMapReduceBo.getTitleName());

        String reduce = buildReduceForBC(msgMapReduceBo.getMsgIdName());

        QueryBuilder query1 = new QueryBuilder();

        query1.and("created").greaterThanEquals(msgMapReduceBo.getTimeBegin()).lessThan(msgMapReduceBo.getTimeEnd());//统计安装量,用created

        DBCollection collection = msgMapReduceBo.getMongoDBClient().mapReduce(map, reduce, outputTarget, query1).getOutputCollection();

        log.info("domapReduce结束:客户端:"+msgMapReduceBo.getMongoDBClient());

        DBCursor cursor = collection.find();

        while (cursor.hasNext()) {

            DBObject item = cursor.next();

            //如果主键为空,忽略记录

            DBObject value = (DBObject) item.get("value");

            Double count = (Double) value.get("count") == null ? 0 : (Double) value.get("count");//总量

            String title = (String) value.get("title") == null ? "" : (String) value.get("title");

            String msgId = (String) value.get(msgMapReduceBo.getMsgIdName());

            String platformReal = (String) value.get("platform") == null ? "" : (String) value.get("platform");

            if (value.toMap().size() == 2) {

                //如果返回的value尺寸为2,说明只有一条记录,因为只有一条记录时没有进行reduce,结果集是map的,

                String _id = (String) item.get("_id");

                int index = _id.indexOf("-");

                msgId = _id.substring(0, index);

                int idex2 = msgId.indexOf("_");

                msgId = msgId.substring(idex2 + 1, msgId.length());

                platformReal = _id.substring(index + 1, _id.length());

            }

            MsgAnalysisByDate tmp = new MsgAnalysisByDate();

            tmp.setMsgid(msgId);

            tmp.setTile(title);

            tmp.setType(type);

            tmp.setFlag(msgMapReduceBo.getFlag());

            if (PlatformEnum.getByValue(platformReal) != null) {

                tmp.setPlatform(PlatformEnum.getByValue(platformReal).getKey());

            } else {

                log.error("msgId=" + msgId + "platformReal=" + platformReal);

            }

            if (msgMapReduceBo.isRead()) {

                tmp.setReadAmount(count.intValue());

            } else {

                tmp.setAmount(count.intValue());

            }

            result.add(tmp);

        }

        collection.drop();

        return result;

    }

    /**

     * 注意这里的key,用横杠分割,用于处理多个goupby 字段

     *

     * @return

     */

    private String buildMapForBC(String id, String title) {

        return "function()" +

                "{" +

                "var key1=this." + id + ";" +

                "var prefix=\'-\';" +

                "var key2=this.platform;" +

                "emit(key1+prefix+key2,{title:this." + title + ",count:1});" +

                "};";

    }

    /**

     * 注意冗余title

     *

     * @return

     */

    public String buildReduceForBC(String id) {

        return "function(key,values)" +

                "{" +

                "var total = 0;" +

                "var title=values[0].title;" +

                " var locate = key.indexOf(\'-\');" +

                " var platform = key.indexOf(\'-\');" +

                "var platform=values[0].title;" +

                "for(var i=0;i<values.length;i++){" +

                "total += values[i].count;" +

                "}" +

                "return {" + id + ":key.substr(0,locate) ,title: title, platform:key.substr(locate+1,key.length), count : total};" +

                "};";

    }

说明:

1. Map阶段:{

key1+prefix+key2}作为key{title:this." + title + ",count:1}作为value,遍历每一个doc,生成一堆上述(keyvalue)对的文件

2. Shuffle阶段:把上述的doc文件。Value合并,变成(keyvalues{value1,value2...valuen};

3. Reduce阶段:把上述的结果文件进行reduce处理,具体逻辑就是reduce函数,本例中就是遍历values,取count,汇总总数,值得注意的是这里用了一个技巧:key用了多个字段拼接,用于返回多个值。

转载地址:http://lzrrb.baihongyu.com/

你可能感兴趣的文章
关于yum Error: Cannot retrieve repository metadata (repomd.xml) for repository:xxxxxx.
查看>>
linux下载github中的文件
查看>>
HDP Sandbox里面git clone不了数据(HTTP request failed)【目前还没解决,所以hive的练习先暂时搁置了】
查看>>
动态分区最佳实践(一定要注意实践场景)
查看>>
HIVE—索引、分区和分桶的区别
查看>>
Hive进阶总结(听课总结)
查看>>
大数据领域两大最主流集群管理工具Ambari和Cloudera Manger
查看>>
Sqoop往Hive导入数据实战
查看>>
Mysql到HBase的迁移
查看>>
Sqoop import进阶
查看>>
Hive语句是如何转化成MapReduce任务的
查看>>
Hive创建table报错:Permission denied: user=lenovo, access=WRITE, inode="":suh:supergroup:rwxr-xr-x
查看>>
Hive执行job时return code 2排查
查看>>
hive常用函数及数据结构介绍
查看>>
Hive面试题干货(亲自跟着做了好几遍,会了的话对面试大有好处)
查看>>
力扣题解-230. 二叉搜索树中第K小的元素(递归方法,中序遍历解决)
查看>>
力扣题解-123. 买卖股票的最佳时机 III(动态规划)
查看>>
Django 源码阅读:服务启动(wsgi)
查看>>
Django 源码阅读:url解析
查看>>
Docker面试题(一)
查看>>