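The udpsocket class from the previous post is only a thin socket wrapper that does nothing until it is called. This post wraps it in the style of MFC's CAsyncSocket, except that events are dispatched with select rather than with MFC window messages.
The basic interface is as follows: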
typedef void (*udp_data_cb)(int sockid, char *data, int len, int ip, int port, int timestamp, void* param);
class cudpsession
{
public:
    cudpsession();
    virtual ~cudpsession();
    // opens the local port and starts the event-loop thread; cb is the user callback
    int open(uint16_t uslocalport, udp_data_cb cb, void* param);
    int close();
    int send(char* pbuff,uint32_t nlen,uint32_t ip,uint16_t port);
private:
    udp_data_cb m_pcb;          // user callback
    void* m_pparam;             // opaque pointer handed back to the callback
private:
    static void* eventloop(void* lpparameter );   // pthread entry point
    void processevent();        // select loop
    void ontimer();             // called when select times out
    int onrecv(char* pbuff,uint32_t nlen, uint32_t ip,uint16_t port);
    bool m_beventloopstarted;
    cudpsock m_udpio;
    pthread_t m_tid;
};
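To illustrate how the `void* param` argument of `open` is meant to carry user state into the callback, here is a minimal sketch. Only the `udp_data_cb` signature comes from the interface above; `recv_stats`, `on_udp_data` and `deliver` are hypothetical names, and `deliver` merely stands in for whatever the session does when a datagram arrives.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Same signature as the udp_data_cb typedef in the interface above.
typedef void (*udp_data_cb)(int sockid, char* data, int len,
                            int ip, int port, int timestamp, void* param);

// Hypothetical per-receiver state; its address is what would be passed as `param`.
struct recv_stats {
    int packets;
    long bytes;
    std::string last_payload;
};

// The callback recovers its state object from the opaque pointer.
static void on_udp_data(int /*sockid*/, char* data, int len,
                        int /*ip*/, int /*port*/, int /*timestamp*/, void* param)
{
    recv_stats* stats = static_cast<recv_stats*>(param);
    stats->packets += 1;
    stats->bytes += len;
    stats->last_payload.assign(data, static_cast<size_t>(len));
}

// Hypothetical stand-in for the receive path: copies the text into a
// packet-sized buffer and forwards it to the registered callback.
static void deliver(udp_data_cb cb, void* param, const char* text)
{
    char buf[1500];
    int len = static_cast<int>(std::strlen(text));
    if (len > static_cast<int>(sizeof(buf))) len = static_cast<int>(sizeof(buf));
    std::memcpy(buf, text, static_cast<size_t>(len));
    if (cb) cb(0, buf, len, 0, 0, 0, param);
}
```

With the real class, the equivalent registration would presumably be `session.open(port, on_udp_data, &stats);`.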
For portability and simplicity, threads use the pthread API. On Windows, see: .
That project re-wraps the Windows threading calls; the native Windows thread functions really are .....
Only a few key functions of the class implementation are shown here; the rest are in the source code.
int cudpsession::open(uint16_t uslocalport, udp_data_cb cb, void* param)
{
    m_pcb = cb;
    m_pparam = param;
    // already running: only the callback and its parameter are updated
    if (m_beventloopstarted)
        return 0;
    int iresult = m_udpio.open(uslocalport);
    // start the event-loop thread only when the socket was opened
    if (iresult > 0)
    {
        pthread_create (&m_tid, NULL, eventloop, this);
    }
    return iresult;
}
int cudpsession::close()
{
    if (m_beventloopstarted)
    {
        // ask the loop to exit, wait for the thread, then close the socket
        m_beventloopstarted = false;
        pthread_join(m_tid, NULL);
        m_udpio.close();
    }
    return 0;
}
void* cudpsession::eventloop(void* lpparameter )
{
    // static thread entry: forward to the member function of the owning object
    cudpsession* pthis = (cudpsession*)lpparameter;
    pthis->processevent();
    return NULL;
}
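The open/close/eventloop trio above follows a common pattern: a static member function serves as the pthread entry point and forwards to a member function through `this`. The sketch below isolates that pattern in a hypothetical `loop_worker` class (not part of the original code). It makes two changes that are my assumptions rather than the author's design: the running flag is a `std::atomic<bool>`, and it is set before the thread is created, so a `stop()` issued right after `start()` still joins the thread.

```cpp
#include <pthread.h>
#include <unistd.h>
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the session class; only the thread handling is shown.
class loop_worker {
public:
    loop_worker() : m_running(false), m_started(false), m_ticks(0) {}
    ~loop_worker() { stop(); }

    // Returns true if this call actually started the thread.
    bool start() {
        if (m_started) return false;
        m_running = true;   // set before the thread exists
        if (pthread_create(&m_tid, NULL, &loop_worker::entry, this) != 0) {
            m_running = false;
            return false;
        }
        m_started = true;
        return true;
    }

    // Asks the loop to exit and waits for the thread to finish.
    void stop() {
        if (!m_started) return;
        m_running = false;
        pthread_join(m_tid, NULL);
        m_started = false;
    }

    int ticks() const { return m_ticks.load(); }

private:
    // Static trampoline: pthread_create cannot call a non-static member directly.
    static void* entry(void* arg) {
        static_cast<loop_worker*>(arg)->run();
        return NULL;
    }

    void run() {
        while (m_running) {
            ++m_ticks;
            usleep(1000);   // placeholder for the real select() wait
        }
    }

    std::atomic<bool> m_running;
    bool m_started;
    std::atomic<int> m_ticks;
    pthread_t m_tid;
};
```

In the original class the flag is set inside the thread itself, so a close() that runs before the thread has been scheduled may skip the join; whether that matters depends on how the class is used.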
void cudpsession::processevent()
{
    timeval timeout;
    timeout.tv_sec = 0;
    timeout.tv_usec = 50000; // 50 ms
    fd_set fsread;
    char szbuffer[1500];
    int ireceived = 0;
    int ierrorcode = 0;
    m_beventloopstarted = true;
    bool blastsend = false;
    while (m_beventloopstarted)
    {
        FD_ZERO( &fsread );
        FD_SET(m_udpio.m_handle, &fsread);
        // first argument is the highest descriptor plus one (Winsock ignores it)
        int ret = select (m_udpio.m_handle + 1, &fsread, NULL, NULL, &timeout);
        if (ret < 0)
        {
            // error
        }
        else if (ret == 0)
        {
            // timeout: no data within 50 ms, drive the timer
            ontimer();
        }
        else
        {
            if(FD_ISSET( m_udpio.m_handle, &fsread))
            {
                uint32_t ip;
                uint16_t port;
                ireceived = m_udpio.receive(szbuffer, 1500, ip, port);
                if (ireceived > 0)
                {
                    onrecv(szbuffer, ireceived, ip, port);
                }
                else
                {
                    // error
                }
            }
        }
        // select may modify the timeout (Linux does), so re-arm it every round
        timeout.tv_sec = 0;
        timeout.tv_usec = 50000;
    }
}
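The loop above can be tried in isolation. The following sketch is a reduced version that takes any readable datagram descriptor, runs a fixed number of select rounds with the same 50 ms timeout, and counts timeouts and received datagrams. `run_select_loop` and `loop_result` are hypothetical names, and the fixed round count replaces the running flag purely to make the example terminate.

```cpp
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
#include <cstring>

// What happened during the loop.
struct loop_result {
    int timeouts;    // rounds in which select() returned 0
    int datagrams;   // datagrams read
    long bytes;      // total payload bytes read
};

// Runs `rounds` iterations of select() on `fd` with a 50 ms timeout each.
static loop_result run_select_loop(int fd, int rounds)
{
    loop_result r = {0, 0, 0};
    char buf[1500];
    for (int i = 0; i < rounds; ++i) {
        fd_set rd;
        FD_ZERO(&rd);
        FD_SET(fd, &rd);

        // Re-armed every round because select() may overwrite it.
        timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 50000;

        int ret = select(fd + 1, &rd, NULL, NULL, &tv);
        if (ret < 0) break;          // error: give up
        if (ret == 0) {              // timeout: this is the timer tick
            ++r.timeouts;
            continue;
        }
        if (FD_ISSET(fd, &rd)) {
            ssize_t n = recv(fd, buf, sizeof(buf), 0);
            if (n > 0) {
                ++r.datagrams;
                r.bytes += n;
            }
        }
    }
    return r;
}
```

In the post's class the descriptor would be the UDP socket handle; for a quick local experiment one end of a `socketpair(AF_UNIX, SOCK_DGRAM, 0, sv)` behaves the same way as far as select is concerned.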
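void cudpsession::ontimer()
{
    struct timeval cur;
    gettimeofday(&cur, NULL);
    // milliseconds elapsed since start_time
    int timestamp = (cur.tv_sec-start_time.tv_sec)*1000 + (cur.tv_usec - start_time.tv_usec)/1000;
    // start_time is a time marker: initialize it at a suitable point with gettimeofday(&start_time, NULL); and reset it each time the timer interval has elapsed.
}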
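The timestamp arithmetic in ontimer can be written as a small helper and checked with fixed values. This is a sketch under my own naming (`elapsed_ms`, `interval_timer`); it sums microseconds first and divides once, which differs from the expression above only in how sub-millisecond remainders are truncated.

```cpp
#include <sys/time.h>
#include <cassert>

// Milliseconds from `start` to `now`. Works when now.tv_usec < start.tv_usec,
// because the (negative) microsecond difference is added to the seconds part.
static long long elapsed_ms(const timeval& start, const timeval& now)
{
    long long sec  = static_cast<long long>(now.tv_sec)  - start.tv_sec;
    long long usec = static_cast<long long>(now.tv_usec) - start.tv_usec;
    return (sec * 1000000LL + usec) / 1000;
}

// Hypothetical timer built on the helper: poll() reports whether the interval
// has elapsed and, if so, resets the start marker to `now`.
struct interval_timer {
    timeval start;
    long long interval_ms;

    bool poll(const timeval& now) {
        if (elapsed_ms(start, now) < interval_ms) return false;
        start = now;
        return true;
    }
};
```

Calling `poll` from ontimer with the result of gettimeofday would fire the timer on measured time rather than on the number of select timeouts.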
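The code is very simple; anyone familiar with select will follow all of it. The part worth discussing is ontimer, which borrows the select timeout to implement a timer (50 ms here; the precision can be adjusted as needed, but too small a value raises CPU usage). However, when events arrive, select returns before the timeout expires, so timeout.tv_usec = 50000 is not an exact interval. That is why gettimeofday is used to read the current time and subtract the start time start_time, giving the actual elapsed span. gettimeofday is a Linux function that the Windows API does not provide, but many open-source projects include an implementation; this one is copied from live555: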
#if defined(__WIN32__) || defined(_WIN32)
// for windoze, we need to implement our own gettimeofday()
// used to make sure that static variables in gettimeofday() aren't initialized simultaneously by multiple threads
static LONG initializelock_gettimeofday = 0;
#if !defined(_WIN32_WCE)
#include <sys/timeb.h>
#endif
int gettimeofday(struct timeval* tp, int* /*tz*/) {
    static LARGE_INTEGER tickfrequency, epochoffset;
    static bool isinitialized = false;
    LARGE_INTEGER ticknow;
#if !defined(_WIN32_WCE)
    QueryPerformanceCounter(&ticknow);
#else
    ticknow.QuadPart = GetTickCount();
#endif
    if (!isinitialized) {
        if(1 == InterlockedIncrement(&initializelock_gettimeofday)) {
#if !defined(_WIN32_WCE)
            // for our first call, use "ftime()", so that we get a time with a proper epoch.
            // for subsequent calls, use "QueryPerformanceCount()", because it's more fine-grain.
            struct timeb tb;
            ftime(&tb);
            tp->tv_sec = tb.time;
            tp->tv_usec = 1000*tb.millitm;
            // also get our counter frequency:
            QueryPerformanceFrequency(&tickfrequency);
#else
            /* FILETIME of Jan 1 1970 00:00:00. */
            const LONGLONG epoch = 116444736000000000LL;
            FILETIME filetime;
            LARGE_INTEGER time;
            GetSystemTimeAsFileTime(&filetime);
            time.HighPart = filetime.dwHighDateTime;
            time.LowPart = filetime.dwLowDateTime;
            // convert from 100ns time to unix timestamp in seconds, 1000*1000*10
            tp->tv_sec = (long)((time.QuadPart - epoch) / 10000000L);
            /*
            GetSystemTimeAsFileTime has just a seconds resolution,
            thats why wince-version of gettimeofday is not 100% accurate, usec accuracy would be calculated like this:
            // convert 100 nanoseconds to usec
            tp->tv_usec= (long)((time.QuadPart - epoch)%10000000L) / 10L;
            */
            tp->tv_usec = 0;
            // resolution of GetTickCounter() is always milliseconds
            tickfrequency.QuadPart = 1000;
#endif
            // compute an offset to add to subsequent counter times, so we get a proper epoch:
            epochoffset.QuadPart
                = tp->tv_sec * tickfrequency.QuadPart + (tp->tv_usec * tickfrequency.QuadPart) / 1000000L - ticknow.QuadPart;
            // next caller can use ticks for time calculation
            isinitialized = true;
            return 0;
        } else {
            InterlockedDecrement(&initializelock_gettimeofday);
            // wait until first caller has initialized static values
            while(!isinitialized){
                Sleep(1);
            }
        }
    }
    // adjust our tick count so that we get a proper epoch:
    ticknow.QuadPart += epochoffset.QuadPart;
    tp->tv_sec = (long)(ticknow.QuadPart / tickfrequency.QuadPart);
    tp->tv_usec = (long)(((ticknow.QuadPart % tickfrequency.QuadPart) * 1000000L) / tickfrequency.QuadPart);
    return 0;
}
#endif
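That completes the wrapper for a UDP session. It can be used as is for transferring files or any other data whose chunk size you choose yourself. For real-time video, however, some frames will certainly exceed 1500 bytes, so they have to be split into packets and reassembled after they are received. The next post covers splitting and reassembly, after which the code will be submitted together.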
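As a rough preview of what splitting and reassembly involve, here is a minimal sketch. It is not the scheme from the next post: the `frag_header` layout, the function names, and the choice to send the header in host byte order (which assumes both ends share endianness) are all my assumptions, and loss, duplication and timeouts are ignored.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

typedef std::vector<uint8_t> bytes;

// Hypothetical 8-byte header placed in front of every fragment.
struct frag_header {
    uint32_t frame_id;  // which frame this fragment belongs to
    uint16_t index;     // position of the fragment within the frame
    uint16_t count;     // total number of fragments in the frame
};

// Splits `frame` into packets of at most `max_packet` bytes, header included.
// Returns an empty vector if the frame is empty or cannot be split.
static std::vector<bytes> split_frame(uint32_t frame_id, const bytes& frame, size_t max_packet)
{
    std::vector<bytes> out;
    if (frame.empty() || max_packet <= sizeof(frag_header)) return out;
    const size_t chunk = max_packet - sizeof(frag_header);
    const size_t count = (frame.size() + chunk - 1) / chunk;
    if (count > 0xFFFF) return out;
    for (size_t i = 0; i < count; ++i) {
        const size_t off = i * chunk;
        const size_t n = std::min(chunk, frame.size() - off);
        frag_header h = { frame_id, static_cast<uint16_t>(i), static_cast<uint16_t>(count) };
        bytes pkt(sizeof(h) + n);
        std::memcpy(&pkt[0], &h, sizeof(h));
        std::memcpy(&pkt[sizeof(h)], &frame[off], n);
        out.push_back(pkt);
    }
    return out;
}

// Rebuilds a frame from its fragments, which may arrive in any order.
// Returns false if a fragment is missing, malformed, or from another frame.
static bool join_frame(const std::vector<bytes>& pkts, bytes& frame)
{
    if (pkts.empty()) return false;
    std::vector<bytes> parts;
    std::vector<bool> seen;
    uint32_t id = 0;
    for (size_t k = 0; k < pkts.size(); ++k) {
        if (pkts[k].size() < sizeof(frag_header)) return false;
        frag_header h;
        std::memcpy(&h, &pkts[k][0], sizeof(h));
        if (k == 0) {
            id = h.frame_id;
            parts.resize(h.count);
            seen.assign(h.count, false);
        }
        if (h.frame_id != id || h.count != parts.size() || h.index >= h.count) return false;
        parts[h.index].assign(pkts[k].begin() + sizeof(h), pkts[k].end());
        seen[h.index] = true;
    }
    frame.clear();
    for (size_t i = 0; i < parts.size(); ++i) {
        if (!seen[i]) return false;
        frame.insert(frame.end(), parts[i].begin(), parts[i].end());
    }
    return true;
}
```

Each packet produced by `split_frame` could be handed to the session's `send`, and the receive callback would collect packets per `frame_id` until `join_frame` succeeds.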