登录  
 加关注
查看详情
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

学无止境

一点积累,与大家分享

 
 
 

日志

 
 

MINA 基于 UDP 协议的 demo

2009-12-10 09:28:32|  分类: java技术 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
server 端:

package com.cusc.gcm.s2s.udp;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;

import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.DatagramAcceptor;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;

import com.cusc.gcm.core.user.UserWarehouse;
import com.cusc.gcm.remoting.invoker.vo.RealTimeTrackVO;
import com.cusc.gcm.s2s.CharsetUtils;
import com.cusc.gcm.s2s.Endpoint;

public class UDPServerEndpoint implements Endpoint {

private static Log log = LogFactory.getLog(UDPServerEndpoint.class);
private DatagramAcceptor acceptor = new NioDatagramAcceptor();

public void addPacket(String packet) {}

public void start() {   
   new Thread(new EndpointThread()).start();
}

public void stop() {
   acceptor.unbind();
}

class EndpointThread implements Runnable {

   public void run() {
    acceptor.getFilterChain().addLast("logger", new LoggingFilter());
    acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new EndpointCodecFactory()));
   
         acceptor.getSessionConfig().setReadBufferSize( 2048 );
         acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
         acceptor.getSessionConfig().setReuseAddress(true);

    acceptor.setHandler(new IoHandlerAdapter() {
    
     @Override
     public void exceptionCaught(IoSession session, Throwable cause)
       throws Exception {
      cause.printStackTrace();
     }

     @Override
     public void messageReceived(IoSession session, Object message)
       throws Exception {
         ,......
     }
    });
   
    try {
     acceptor.bind(new InetSocketAddress(7008));
    } catch (IOException e) {
     e.printStackTrace();
    }  
   }  
}

}

class EndpointCodecFactory implements ProtocolCodecFactory {

private final EndpointDecoder decoder;
private final EndpointEncoder encoder;

public EndpointCodecFactory() {
   decoder = new EndpointDecoder();
   encoder = new EndpointEncoder();
}

public ProtocolDecoder getDecoder(IoSession session) throws Exception {
   return decoder;
}

public ProtocolEncoder getEncoder(IoSession session) throws Exception {
   return encoder;
}
}

class EndpointDecoder extends CumulativeProtocolDecoder {

@Override
protected boolean doDecode(IoSession session, IoBuffer in,
    ProtocolDecoderOutput out) throws Exception {
  
    String packet = CharsetUtils.decode("gb2312", in);
    out.write(packet);
    return true;
}
}

class EndpointEncoder extends ProtocolEncoderAdapter {

public void encode(IoSession session, Object message,
    ProtocolEncoderOutput out) throws Exception {
}
}

client 端:

package com.cusc.gcm.s2s.udp;

import java.net.InetSocketAddress;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioDatagramConnector;

import com.cusc.gcm.s2s.Endpoint;
import com.cusc.gcm.s2s.EndpointConfigurer;


public class UDPClientEndpoint implements Endpoint {

private Log log = LogFactory.getLog(UDPClientEndpoint.class);

private IoConnector connector;
private static Queue<String> packets = new ConcurrentLinkedQueue<String>();

public void addPacket(String packet) {   
   packets.add(packet);
}


public void start() {  
   new Thread(new EndpointThread()).start();
}

public void stop() {
   connector.dispose();
}


class EndpointThread implements Runnable {

   public void run() {
    connector = new NioDatagramConnector();
    connector.getFilterChain().addLast("logger", new LoggingFilter());
    connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));

    connector.setHandler(new IoHandlerAdapter() {

       public void sessionOpened(IoSession session) throws Exception {
      
       log.info(" sessionOpened ... ");    
      while(true) {
       try {  
        log.info(" packets size : " + packets.size());
       
        if (packets.isEmpty()) {
         Thread.sleep(2000);
        }else {
        
         IoBuffer buffer = IoBuffer.allocate(500);
         buffer.put(packets.poll().getBytes());
         buffer.flip();      
         System.out.println(buffer.toString());      
         session.write(buffer);
        
         Thread.sleep(100);
        }
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }      
       }

       public void messageReceived(IoSession session, Object message)
               throws Exception {
       }

       public void messageSent(IoSession session, Object message) throws Exception {
       }
    });
    ConnectFuture future = connector.connect(new InetSocketAddress(EndpointConfigurer.SERVER, EndpointConfigurer.UDP_SERVER_PORT));
    future.awaitUninterruptibly();

    log.debug("Adding a future listener.");
    future.addListener(new IoFutureListener<ConnectFuture>() {
     public void operationComplete(ConnectFuture f) {
      if (f.isConnected()) {
       log.debug("...connected");
      } else {
       log.error("Not connected...exiting");
      }
     }
    });
   
   }
}

}

test:

package com.cusc.gcm.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cusc.gcm.s2s.Endpoint;
import com.cusc.gcm.s2s.EndpointFactory;
import com.cusc.gcm.s2s.EndpointPacketFactory;

public class S2SEndpointTest {

private static Log log = LogFactory.getLog(S2SEndpointTest.class);

public static void main(String[] args){
  
   log.info(" endpoint . ");

   Endpoint server = EndpointFactory.createUDPClientEndpoint();
   server.start();  
   log.info(" endpoint . ");
   for(int i =0; i<10000; i++) {
    server.addPacket(EndpointPacketFactory.createAlarmParseSettingPacket());
   
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }

}

}



  评论这张
 
阅读(885)| 评论(0)

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018