`
MauerSu
  • 浏览: 494619 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

dubbo 运行机制

 
阅读更多
源:http://san-yun.iteye.com/blog/1897250
评:写的 调用链很清晰


dubbo是阿里巴巴开源的单一长连接服务框架,底层通信采用nio框架,支持netty,mina,grizzly,默认是netty。对dubbo比较感兴趣的是:
1. client端的线程模型是什么样的?
传统的io client是请求应答模式,发送请求-->等待远程应答。dubbo底层是异步IO的,所有请求复用单一长连接,所以调用都不会阻在IO上,而是阻在Future超时wait上。
2. server端的线程模型是什么样的?
这个比较成熟了,现在一般的server都是基于nio,一批io thread负责处理io,一批worker thread负责处理业务。



一. 快速启动

学习dubbo最好的方式是快速运行起来,由于dubbo还是比较重量级的产品,之前遇到一些问题。
server端:


Java代码  收藏代码

    import java.io.IOException; 
     
    import com.alibaba.dubbo.config.ApplicationConfig; 
    import com.alibaba.dubbo.config.ProtocolConfig; 
    import com.alibaba.dubbo.config.ServiceConfig; 
    import com.duitang.dboss.client.test.BlogQueryService; 
    import com.duitang.dboss.client.test.BlogQueryServiceImpl; 
     
    public class DubboServerTester { 
     
        public static void main(String[] args) throws IOException { 
            BlogQueryService blogQueryService = new BlogQueryServiceImpl(); 
            ApplicationConfig application = new ApplicationConfig(); 
            application.setName("dubbo-test"); 
     
            ProtocolConfig protocol = new ProtocolConfig(); 
            protocol.setName("dubbo"); 
            protocol.setPort(8989); 
            protocol.setThreads(200); 
     
            // RegistryConfig registry = new RegistryConfig(); 
            // registry.setAddress("10.20.130.230:9090"); 
            // registry.setUsername("aaa"); 
            // registry.setPassword("bbb"); 
     
            ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏 
            service.setApplication(application); 
     
            // service.setRegistry(registry); 
            service.setRegister(false); 
            service.setProtocol(protocol); // 多个协议可以用setProtocols() 
            service.setInterface(BlogQueryService.class); 
            service.setRef(blogQueryService); 
            service.setVersion("1.0.0"); 
            // 暴露及注册服务 
            service.export(); 
             
            System.out.println("Press any key to exit."); 
            System.in.read(); 
        } 
    } 

注意:dubbo export服务默认依赖于RegistryConfig,如果没有配置RegistryConfig会报错.可以通过service.setRegister(false)禁用。



client:


Java代码  收藏代码

    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.List; 
    import java.util.concurrent.Callable; 
    import java.util.concurrent.ExecutionException; 
    import java.util.concurrent.ExecutorService; 
    import java.util.concurrent.Executors; 
    import java.util.concurrent.Future; 
    import java.util.concurrent.ThreadFactory; 
    import java.util.concurrent.atomic.AtomicInteger; 
     
    import com.alibaba.dubbo.config.ApplicationConfig; 
    import com.alibaba.dubbo.config.ReferenceConfig; 
    import com.duitang.dboss.client.test.BlogQueryService; 
     
    public class DubboClientTester { 
     
        public static void main(String[] args) throws InterruptedException, IOException { 
            ApplicationConfig application = new ApplicationConfig(); 
            application.setName("dubbo-test"); 
     
            ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>(); 
            reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService"); 
            reference.setTimeout(500); 
            reference.setConnections(10); 
            reference.setApplication(application); 
            reference.setInterface(BlogQueryService.class); 
            reference.setVersion("1.0.0"); 
            final BlogQueryService blogQueryService = reference.get(); 
     
            long begin = System.currentTimeMillis(); 
            System.out.println(blogQueryService.test()); 
            long end = System.currentTimeMillis(); 
            System.out.println(" cost:" + (end - begin)); 
     
            ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test")); 
            List<Callable<String>> tasks = new ArrayList<Callable<String>>(); 
            for (int i = 0; i < 100000; ++i) { 
                tasks.add(new Callable<String>() { 
     
                    @Override 
                    public String call() throws Exception { 
                        System.out.println("run"); 
                        System.out.println(blogQueryService.test()); 
                        System.out.println("run success"); 
                        return null; 
                    } 
                }); 
            } 
            List<Future<String>> futurelist = es.invokeAll(tasks); 
            for (Future<String> future : futurelist) { 
                try { 
                    String result = future.get(); 
                } catch (ExecutionException e) { 
                    e.printStackTrace(); 
                } 
                System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n"); 
            } 
            es.shutdown(); 
            System.out.println("end"); 
            System.in.read(); 
        } 
     
        static class NamedThreadFactory implements ThreadFactory { 
     
            private static final AtomicInteger POOL_SEQ   = new AtomicInteger(1); 
     
            private final AtomicInteger        mThreadNum = new AtomicInteger(1); 
     
            private final String               mPrefix; 
     
            private final boolean              mDaemo; 
     
            private final ThreadGroup          mGroup; 
     
            public NamedThreadFactory(){ 
                this("pool-" + POOL_SEQ.getAndIncrement(), false); 
            } 
     
            public NamedThreadFactory(String prefix){ 
                this(prefix, false); 
            } 
     
            public NamedThreadFactory(String prefix, boolean daemo){ 
                mPrefix = prefix + "-thread-"; 
                mDaemo = daemo; 
                SecurityManager s = System.getSecurityManager(); 
                mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); 
            } 
     
            public Thread newThread(Runnable runnable) { 
                String name = mPrefix + mThreadNum.getAndIncrement(); 
                Thread ret = new Thread(mGroup, runnable, name, 0); 
                ret.setDaemon(mDaemo); 
                return ret; 
            } 
     
            public ThreadGroup getThreadGroup() { 
                return mGroup; 
            } 
     
        } 
    } 





1. 通过setUrl("")来实现远程服务直连。
2. 需要注意的是默认connection只有一个,可以通过setConnections()来指定connection pool。在高负载环境下,nio的单连接也会遇到瓶颈,此时你可以通过设置连接池来让更多的连接分担dubbo的请求负载,从而提高系统的吞吐量。”

二. 代码流程
这里重点分析一下client的调用过程,client调用分为三个部分:
1). 初始化,建立连接。
2). 发送请求。
3). 等待远程应答。

(一).初始化
1. DubboProtocol.initClient()
2. Exchangers.connect(URL url, ExchangeHandler handler)  
3. Exchangers.getExchanger(url).connect(url, handler)
4. HeaderExchanger.connect(URL url, ExchangeHandler handler)
5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)
7. NettyTransporter.connect(URL url, ChannelHandler listener)
8. new NettyClient(url, listener) //timeout默认值:timeout=1000;connectTimeout=3000;
9. NettyClient.doOpen()        //创建netty的ClientBootstrap
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout()); //注意:此timeout是timeout,而非connectTimeout
10. AbstractClient.connect()
11. NettyClient.doConnect()  //如果远程地址无法连接,抛出timeout异常流程结束。
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

(二).发送请求
1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()
2.HeaderExchangeClient.request(invocation, timeout)
3.HeaderExchangeChannel.request(Invocation invocation,timeout)
4.AbstractPeer.send(Request request)
5.NettyChannel.send(Object message, boolean sent)
6.NioClientSocketChannel.write(message)
7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)
8.AbstractPeer.sent(Channel ch, Request request)

(三).等待远程应答
在调用DubboInvoker.doInvoke(Invocation invocation)中实际是调用currentClient.request(invocation, timeout).get(),此方法会返回DefaultFuture,调用get方法会阻塞直到超时,在阻塞的同时netty的io线程会接收到远程应答,如果收到响应会产生io事件调用NettyHandler.messageReceived。

1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
2.AbstractPeer.received(Channel ch, Object msg)
3.MultiMessageHandler.received(Channel channel, Object message)
4.AllChannelHandler.received(Channel channel, Object message)
5.DecodeHandler.received(Channel channel, Object message)
6.HeaderExchangeHandler.received(Channel channel, Object message)
7.DefaultFuture.received(Channel channel, Response response)  //注意是static方法
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
    future.doReceived(response);
}

三. dubbo client的核心


我认为dubbo client的核心在DefaultFuture。所以远程调用都不会阻在IO上,而是阻在Future超时wait上,下面忽略掉远程调用把future抽取出来。





下面是代码实现
Java代码  收藏代码

    package executor; 
     
    import java.util.concurrent.Callable; 
    import java.util.concurrent.ExecutionException; 
    import java.util.concurrent.ExecutorService; 
    import java.util.concurrent.Executors; 
    import java.util.concurrent.Future; 
    import java.util.concurrent.atomic.AtomicLong; 
     
    public class Commands { 
     
        private ExecutorService senders   = Executors.newCachedThreadPool(); 
        private ExecutorService receviers = Executors.newCachedThreadPool(); 
        private AtomicLong      counter   = new AtomicLong(); 
     
        public CommandResponse execute(Callable<Object> task, int timeout) { 
            Future<Object> result = senders.submit(task); 
            long id = counter.getAndIncrement(); 
            CommandFuture commandFuture = new CommandFuture(id); 
            receviers.submit(new ReceiveWorker(id, result)); 
            return commandFuture.get(timeout); 
        } 
     
        static class ReceiveWorker implements Runnable { 
     
            private Future<Object> result; 
            private Long           id; 
     
            public ReceiveWorker(Long id, Future<Object> result){ 
                super(); 
                this.result = result; 
                this.id = id; 
            } 
     
            @Override 
            public void run() { 
                try { 
                    Object obj = result.get(); 
                    CommandFuture.received(new CommandResponse(id, obj)); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } catch (ExecutionException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
     
        public void shutdown() { 
            senders.shutdown(); 
            receviers.shutdown(); 
        } 
    } 


Java代码  收藏代码

    package executor; 
     
    import java.util.Map; 
    import java.util.concurrent.ConcurrentHashMap; 
    import java.util.concurrent.TimeUnit; 
    import java.util.concurrent.locks.Condition; 
    import java.util.concurrent.locks.Lock; 
    import java.util.concurrent.locks.ReentrantLock; 
     
    public class CommandFuture { 
     
        private final Lock                            lock    = new ReentrantLock(); 
     
        private final Condition                       done    = lock.newCondition(); 
     
        private CommandResponse                                response; 
     
        private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>(); 
     
         
        public CommandFuture(Long id){ 
            FUTURES.put(id, this); 
        } 
     
        public boolean isDone() { 
            return response != null; 
        } 
     
        public CommandResponse get(int timeout) { 
     
            if (!isDone()) { 
                long start = System.currentTimeMillis(); 
                lock.lock(); 
                try { 
                    while (!isDone()) { 
                        done.await(timeout, TimeUnit.MILLISECONDS); 
                        if (isDone() || System.currentTimeMillis() - start >= timeout) { 
                            break; 
                        } 
                    } 
                } catch (InterruptedException e) { 
                    throw new RuntimeException(e); 
                } finally { 
                    lock.unlock(); 
                } 
                if (!isDone()) { 
                    throw new TimeoutException("timeout"); 
                } 
            } 
            return response; 
        } 
     
        public void doReceived(CommandResponse response) { 
            lock.lock(); 
            try { 
                this.response = response; 
                if (done != null) { 
                    done.signal(); 
                } 
            } finally { 
                lock.unlock(); 
            } 
     
        } 
     
        public static void received(CommandResponse response) { 
            try { 
                CommandFuture future = FUTURES.remove(response.getId()); 
                if (future != null) { 
                    future.doReceived(response); 
                } else { 
                    System.out.println("some error!"); 
                } 
            } finally { 
                // CHANNELS.remove(response.getId()); 
            } 
        } 
    } 


Java代码  收藏代码

    package executor; 
     
    import java.util.concurrent.Callable; 
    import java.util.concurrent.ExecutionException; 
    import java.util.concurrent.ExecutorService; 
    import java.util.concurrent.Executors; 
    import java.util.concurrent.Future; 
    import java.util.concurrent.atomic.AtomicLong; 
     
    public class Commands { 
     
        private ExecutorService senders   = Executors.newCachedThreadPool(); 
        private ExecutorService receviers = Executors.newCachedThreadPool(); 
        private AtomicLong      counter   = new AtomicLong(); 
     
        public CommandResponse execute(Callable<Object> task, int timeout) { 
            Future<Object> result = senders.submit(task); 
            long id = counter.getAndIncrement(); 
            CommandFuture commandFuture = new CommandFuture(id); 
            receviers.submit(new ReceiveWorker(id, result)); 
            return commandFuture.get(timeout); 
        } 
     
        static class ReceiveWorker implements Runnable { 
     
            private Future<Object> result; 
            private Long           id; 
     
            public ReceiveWorker(Long id, Future<Object> result){ 
                super(); 
                this.result = result; 
                this.id = id; 
            } 
     
            @Override 
            public void run() { 
                try { 
                    Object obj = result.get(); 
                    CommandFuture.received(new CommandResponse(id, obj)); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } catch (ExecutionException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
     
        public void shutdown() { 
            senders.shutdown(); 
            receviers.shutdown(); 
        } 
    } 



下面是jstack
分享到:
评论

相关推荐

    阿里巴巴开源服务框架Dubbo.zip

    主要核心部件: Remoting: 网络通信框架,实现了 sync-over-async 和 request-response 消息机制. RPC: 一个远程过程调用的抽象,支持负载均衡、容灾和集群功能 Registry: 服务目录框架用于服务的注册和服务...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【Dubbo】dubbo运行时,突然所有的zookeeper全部宕机,dubbo是否还会继续提供服务. 169 【Dubbo】dubbo服务是阻塞的吗? 170 【Dubbo】dubbo 默认协议 170 【Dubbo】dubbo注册中心zookeeper支持的功能 171 【Dubbo】...

    在Docker中运行Dubbo应用

    本文来自于阿里云,由火龙果软件Anna编辑、推荐。Dubbo是阿里开源的一个分布式服务框架,在国内粉丝很多。...在Dubbo世界里,服务调用方和服务提供方通过Dubbo的发现机制互相发现。一个最小的Dubbo应用包含如下

    Dubbo服务框架-其他

    Remoting:网络通信框架,实现了sync-over-async和request-response消息机制 RPC:一个远程过程调用的抽象,支持负载均衡、容灾和集群功能 Registry:服务目录框架用于服务的注册和服务事件发布和订阅 Dubbo功能特点...

    Dubbo服务框架 v3.0.0

    Dubbo主要核心部件Remoting:网络通信框架,实现了sync-over-async和request-response消息机制 RPC:一个远程过程调用的抽象,支持负载均衡、容灾和集群功能 Registry:服务目录框架用于服务的注册和服务事件发布...

    dubbo框架源码是阿里巴巴公司开源的一个高性能优秀的服务框架.rar

    Container 服务运行容器 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用...

    springboot+mybatis+dubbo 本项目是基于微服务架构的班车预约系统.zip

    采用两个版本第一个版本:springboot+mybatis+dubbo+rocketmq+mysql+redis等。第二个版 MySQL 是一款广受欢迎的开源关系型数据库管理系统(RDBMS),由瑞典MySQL AB公司开发,现隶属于美国甲骨文公司(Oracle)。自...

    基于Guns+springboot+dubbo开发+源代码+文档说明

    Guns v3.0新增rest api服务,提供对接服务端接口的支持,并利用jwt token鉴权机制给予客户端的访问权限,传输数据进行md5签名保证传输过程数据的安全性! ### Guns v4.0更新内容 &gt; * spring boot升级到2.0版本! &gt; * ...

    基于SpringBoot+Shiro+Redis+Jwt+Thymeleaf+MyBatis 开发的后台用户、角色+源代码+文档

    * 为了方便大学家习dubbo的运行机制,本框架将dubbo的provider和customer作了一个整合,将官方demo里的方多应用整合成了一个,即在同一应用内启动消费端和服务端 * 注:如有实际业务需要请将服务端与消费端分离,...

    基于Spring MVC+MyBatis+Shiro+Dubbo开发的分布式后台管理系统.zip

    跨平台性(Write Once, Run Anywhere): Java的代码可以在不同的平台上运行,只需编写一次代码,就可以在任何支持Java的设备上执行。这得益于Java虚拟机(JVM),它充当了代码和底层硬件之间的中介。 面向对象: ...

    通用权限管理系统:作为配置中心,管理后台系统的菜单、功能、用户、角色等,并提供DUBBO接口。.zip

    跨平台性(Write Once, Run Anywhere): Java的代码可以在不同的平台上运行,只需编写一次代码,就可以在任何支持Java的设备上执行。这得益于Java虚拟机(JVM),它充当了代码和底层硬件之间的中介。 面向对象: ...

    尚硅谷Java视频教程_Spring Boot视频教程(下)整合篇

    简介 14、尚硅谷-SpringBoot高级-消息-RabbitMQ基本概念简介 15、尚硅谷-SpringBoot高级-消息-RabbitMQ运行机制 16、尚硅谷-SpringBoot高级-消息-RabbitMQ安装测试 17、尚硅谷-SpringBoot高级-消息-RabbitTemplate...

    128元尚硅谷Java视频教程_Spring Boot视频教程(下)整合篇

    15、尚硅谷-SpringBoot高级-消息-RabbitMQ运行机制 16、尚硅谷-SpringBoot高级-消息-RabbitMQ安装测试 17、尚硅谷-SpringBoot高级-消息-RabbitTemplate发送接受消息&序列化机制 18、尚硅谷-SpringBoot高级-消息-@...

    大厂学院SVIP十门合集|完结无秘

    Dubbo内核四大机制 运行时内存 不同垃圾收集器工作原理详解 GC日志分析 Spring全家桶源码分析 Tomcat架构原理 Web请求处理原理 数据访问层框架原理 架构与设计思维模式 程序中的数学 数据分析 机器智能算法剖析与...

    springboot知识点整理

    7.1.3 编写事件监听机制 132 8 Spring Boot自定义starters 136 8.1 概述 136 8.2 步骤 137 9 更多Springboot整合示例 144 10 Spring Boot与缓存 145 10.1 JSR107缓存规范 145 10.2 Spring的缓存抽象 146 10.2.1 基本...

    程序员Java互联网求职个人简历

    3、协助并快速定位及解决开发及运行过程中的技术问题。 项目技能 1、JAVA基础扎实,良好的面向对象编程思想;熟练使用Tomcat、Nginx等应用服务器; 2、熟练使用Spring, SpringMVC, Hibernate等流行的开发框架,理解...

    Java基于Netty实现的高性能分布式IM即时通信系统源码+项目说明.tar

    + 使用Dubbo的泛化调用机制实现服务的定向调用,解决了因分布式IM_Server的部署导致的用户信息分散在不同服务器上的问题 + Dubbo泛化调用的地址为一致性哈希负载均衡算法计算所得 + 解决了自定义协议在传输中导致的...

    tyloo:分布式交易框架——TCC

    基于Dubbo的ProxyFactory代理机制为服务接口生成代理对象。 基于Mysql,Redis乐观锁进行事务版本控制以及基于石英进行事务恢复。 支持各种事务日志序列化以及事务存储实现。 调用方式(版本):Dubbo,HTTP 业务场景...

    GMall项目是一套电商系统.zip

    包括前台商城系统及后台管理系统,基于SpringBoot+MyBatisPlus+Dubbo+zookeeper实现。 前台商城系统包含首页门户、商品推荐、商品搜索、商品展示、购物车、订单流程、会员中心、客户服务、帮助中心等模块。 Java是...

Global site tag (gtag.js) - Google Analytics