简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色。
下面介绍下几个关键技术实现:
1、报文
这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文。根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,
具有唯一性,每个报文都不同,主要用来实现同步转异步中,POS返回数据给代理服务器后找回原来发送指令的channel,并最终把转换后的数据发送给收银台。
之所以要找到原来的channel,是因为同步转异步的过程中,channel是被临时保存起来的。
2、同步转异步关键代码
public class PosResponseFuture<I> { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private String uuid;//消息序列号 //psoresponse使用 private final static Map<String, PosResponseFuture> futures = new ConcurrentHashMap<String, PosResponseFuture>(); private final static Object synLock = new Object(); public I write2pos(boolean broadcastFlag,MsgRequest msg) throws PosConnException,TimeOutException,TryLaterException { synchronized(synLock) { long st = System.currentTimeMillis(); lock.lock(); try { this.uuid = msg.getId(); futures.put(this.uuid, this);//把当前调用环境保存起来 //向pos发送消息 log.debug("向POS发送消息:{}",msg); PosIntContext.write2pos(msg); int timeout = PosIntContext.getApiTimeout(); if (msg.getTimeout()!=-1) { timeout = msg.getTimeout(); log.debug("超时设置:{}",timeout); } //这里是同步转异步关键 //程序执行到这里,一直处于阻塞状态,直到POS返回 //这里还设置了一个超时时间,避免POS出现故障,导致调用一直在等待 done.await(timeout,TimeUnit.SECONDS); if (!isDone()) { throw new TimeOutException("超时("+timeout+"秒)"); } } catch (InterruptedException e) { log.error("write2pos InterruptedException: "+e.getMessage()); throw new PosConnException(e); } catch (TimeOutException e) { throw e; } catch (PosConnException e) { throw e; } catch (TryLaterException e) { throw e; } finally { this.release(); lock.unlock(); } long en = System.currentTimeMillis(); log.debug("{} 执行时间:{}",msg.toString(),(en-st)); //POS执行完成,正常返回 if (response instanceof MsgResponse) { return (I)response; } return null; } } /** * pos返回消息回调 * @Title: received * @Description: TODO * @param @param response * @return void * @throws */ public static void received(MsgResponse response) { //用主键取回调用环境 PosResponseFuture<?> future = futures.remove(response.getId()); if (future != null) { future.doReceived(response); } } /** * 检测返回值 * @Title: isDone * @Description: TODO * @param @return * @return boolean * @throws */ private boolean isDone() { return this.response != null;//null代表超时 } /** * 接受到返回 * @Title: doReceived * @Description: TODO * @param @param response * @return void * @throws */ private void doReceived(MsgResponse response) { lock.lock();//同步控制,线程安全 try { this.response = response; done.signal();//notify,通知线程往下执行 } finally { lock.unlock(); } } /** * 释放资源 * @Title: release * @Description: TODO * @param * @return void * @throws */ private void release() { PosResponseFuture<I> tmp = futures.remove(this.uuid); if (tmp!=null) { log.debug("释放资源:{}",new Object[]{this.uuid,tmp.getProcessMsg()}); } else { log.debug("释放资源:NULL!"); } } public static void main(String args[]) { } }
3、POS代理服务器暴露RPC调用接口关键代码
public class Client { //这个代码包含了rpc调用的核心 @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> interfaceClass,final String host,final int port) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { //其实就是一个AOP拦截 public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try { //把需要调用的类、方法和参数,序列化传输到RPC服务器 //等待远端调用完成返回结果 socket = new Socket(host, port); output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); } catch(Exception e) { throw e; } finally { if (socket!=null) { socket.close(); } if (output!=null) { output.close(); } if (input!=null) { input.close(); } } } }); } public static void main(String args[]) { HelloService helloService = new Client().getProxy(HelloService.class,"localhost",8080); long st = System.currentTimeMillis(); for (int i=0; i<1; i++) { System.out.println(i+"> "+helloService.sayHello("哈哈")); } long en = System.currentTimeMillis(); System.out.println("耗时:"+(en-st)); } } public class Server { private int port = 8888; public void rpcServer() throws Exception { ServerSocket server = null; try { server = new ServerSocket(port); for(;;) { final Socket socket = server.accept(); System.out.println(socket.getRemoteSocketAddress()); new Thread(new Runnable() { @Override public void run() { ObjectOutputStream output = null; ObjectInputStream input = null; try { input = new ObjectInputStream(socket.getInputStream());//接受rpc client请求 String methodName = input.readUTF();//调用方法名 Class<?>[] parameterTypes = (Class<?>[])input.readObject(); Object[] arguments = (Object[])input.readObject();//调用参数 output = new ObjectOutputStream(socket.getOutputStream()); Method method = new HelloServiceImp().getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(new HelloServiceImp(), arguments);//执行调用 output.writeObject(result);//回写结果 } catch(Exception e) { e.printStackTrace(); } finally { try { if (output!=null) { output.close(); } if (input!=null) { input.close(); } } catch(Exception e) { } } } }).start(); } } catch(Exception e) { throw e; } finally { if (server!=null) { server.close(); } } } public static void main(String args[]) throws Exception { new Server().rpcServer(); } } public interface HelloService { public String sayHello(String input); } public class HelloServiceImp implements HelloService { @Override public String sayHello(String input) { return input + " wellcome."; } }
相关推荐
异步回调和同步回调相比,除了序列化组件和连接池组件,会多...异步回调能提高系统整体的吞吐量,具体使用哪种方式实现RPC-client,可以结合业务场景来选取(对时延敏感的可以选用同步,对吞吐量敏感的可以选用异步)。
该资源原理说明与博客《异步&同步加载树节点----zTree(一)》相一致。
react-redux-rxjs-starter-kit一个集成React全家桶的脚手架,主要技术栈:react16 + routerV4 + redux + webpack3 + redux-observable + rxjs + scss + less + css modules + react-hmre + eslint + 组件异步加载 + ...
FPGA-同步、异步复位-异步复位同步释放-实例分析,包括verilog代码以及电路图。
python,喜欢学习python的同学可以抓紧时间下载学习哦,这个资源也是我花了很多时间才收集到的,所以,有资源总是好的,当然,不是一味的收集,收集的同时也要学习哦
异步的 RPC ,一个关于com原理的方面。
用FPGA实现异步串口与同步串口的转换,李文亮,姚冬苹,TI公司的TMS320C64xx系列DSP芯片的McBSP同步串口不具备与UART异步串口直接通信的能力,为了解决这个问题,扩展DSP芯片的使用范围,本文介��
这是博文《异步加载图片(一)》对应的源码,这段代码仅实现了异步加载,对于滑动时暂停,停划时加载的问题会在《异步加载图片(二)》中讲解,这篇文章的地址:...
创建React应用程序+ Redux +路由器+更少+ Mock-API + Axios包括路由权限,视图权限可用脚本在项目目录中,可以运行:npm start (包括模拟数据)npm模拟模拟数据npm testnpm run build/ngx ngx = builde +文件副本...
jrpc是一个异步多线程的RPC框架, 采用json格式的序列化/反序列化方案, 传输协议为. 框架的结构如下图所示: 位于框架底层, 向下调用Linux socket API, 向上提供消息回调. 此外,网络库还具有定时器, 线程池, 日志输出...
行业分类-设备装置-一种结合同步及异步技术的高升力控制系统.zip
基于C#的TCP/IP同步以及异步通信实现方法,我已经把两种方法封装好了,直接调用就可以了,不仅可以实时获取连接的客户端,还可以实时刷新客户端连接,很好用的,如果有什么问题,欢迎联系我!
使用qt写的请求http服务类,实现了get同步请求、get异步请求、post同步请求、post异步请求。4个方法,方便实用。
### 一、为什么要写一个RPC框架? > + 不是想要造轮子,Dubbo、gRPC、Thift这些轮子已经非常好用了 > + RPC在微服务、分布式系统、Web服务器方面应用太广泛了,需要对底层通信过程有基本认识 > + Nignx、Hadoop、...
今天小编就为大家分享一篇Angular异步变同步处理方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
easyui的树形菜单展示-同步+异步请求后台方法-含后台java代码,
最近项目要用到Socket,遇到一些坑,比如频繁发消息 收消息,会产生粘包拆包等问题,还有断线重连的问题
C语言实现同步http的get请求,上传文件和C语言实现异步http的get请求,上传文件。 编译步骤和所需库在文件中已说明,该实现经测试可直接编译运行。
博文《PullToRefresh使用详解(三)--实现异步加载的下拉刷新列表》对应的源码,博客对应地址:http://blog.csdn.net/harvic880925/article/details/17789617
PIC单片机实现的通用同步/异步通信的应用(源程序)