/* * (c) BayCom GmbH, http://www.baycom.de, info@baycom.de * * See the COPYING file for copyright information and * how to reach the author. * * modified by Reel Multimedia, http://www.reel-multimedia.com, info@reel-multimedia.com * 01042010 DL: use a single thread for reading from network layer (uses less resources) * */ #include "headers.h" //---------------------------------------------------------------------------------------------------------------------------------- STATIC int udp_ipv6_is_multicast_address (const struct sockaddr *addr) { #ifdef IPV4 if (addr->sa_family == AF_INET) return IN_MULTICAST (ntohl (((struct sockaddr_in *) addr)->sin_addr.s_addr)); #endif if (addr->sa_family == AF_INET6) return IN6_IS_ADDR_MULTICAST (&((struct sockaddr_in6 *) addr)->sin6_addr); return -1; } //--------------------------------------------------------------------------------------------------------------------------------- STATIC int udp_ipv6_set_multicast_ttl (SOCKET sockfd, int mcastTTL, struct sockaddr *addr) { #ifdef IPV4 if (addr->sa_family == AF_INET) { if (setsockopt (sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof (mcastTTL)) < 0) { perror ("setsockopt(IP_MULTICAST_TTL)"); return -1; } } #endif if (addr->sa_family == AF_INET6) { if (setsockopt (sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, (_SOTYPE)&mcastTTL, sizeof (mcastTTL)) < 0) { perror ("setsockopt(IPV6_MULTICAST_HOPS)"); return -1; } } return 0; } //--------------------------------------------------------------------------------------------------------------------------------- int udp_ipv6_join_multicast_group (SOCKET sockfd, int iface, struct sockaddr *addr) { #ifdef IPV4 if (addr->sa_family == AF_INET) { struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr; mreq.imr_interface.s_addr = INADDR_ANY; if (setsockopt (sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *) &mreq, sizeof (mreq)) < 0) { perror ("setsockopt(IP_ADD_MEMBERSHIP)"); return -1; } } #endif if (addr->sa_family == AF_INET6) { struct ipv6_mreq mreq6; memcpy (&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *) addr)->sin6_addr), sizeof (struct in6_addr)); mreq6.ipv6mr_interface = iface; if (setsockopt (sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (_SOTYPE)&mreq6, sizeof (mreq6)) < 0) { perror ("setsockopt(IPV6_ADD_MEMBERSHIP)"); return -1; } } return 0; } //--------------------------------------------------------------------------------------------------------------------------------- int udp_ipv6_leave_multicast_group (SOCKET sockfd, int iface, struct sockaddr *addr) { #ifdef IPV4 if (addr->sa_family == AF_INET) { struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr; mreq.imr_interface.s_addr = INADDR_ANY; if (setsockopt (sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *) &mreq, sizeof (mreq)) < 0) { perror ("setsockopt(IP_DROP_MEMBERSHIP)"); return -1; } } #endif if (addr->sa_family == AF_INET6) { struct ipv6_mreq mreq6; memcpy (&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *) addr)->sin6_addr), sizeof (struct in6_addr)); mreq6.ipv6mr_interface = iface; if (setsockopt (sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, (_SOTYPE)&mreq6, sizeof (mreq6)) < 0) { perror ("setsockopt(IPV6_DROP_MEMBERSHIP)"); return -1; } } return 0; } //----------------------------------------------------------------------------------------------------------------------------------- STATIC int sockfd_to_family (SOCKET sockfd) { struct sockaddr_storage ss; socklen_t len; len = sizeof (ss); if (getsockname (sockfd, (SA *) & ss, &len) < 0) return (-1); return (ss.ss_family); } /* end sockfd_to_family */ //---------------------------------------------------------------------------------------------------------------------------------- int mcast_set_if (SOCKET sockfd, const char *ifname, u_int ifindex) { switch (sockfd_to_family (sockfd)) { #ifdef IPV4 case AF_INET:{ struct in_addr inaddr; struct ifreq ifreq; if (ifindex > 0) { if (if_indextoname (ifindex, ifreq.ifr_name) == NULL) { errno = ENXIO; /* i/f index not found */ return (-1); } goto doioctl; } else if (ifname != NULL) { memset(&ifreq, 0, sizeof(struct ifreq)); strncpy (ifreq.ifr_name, ifname, IFNAMSIZ-1); doioctl: if (ioctl (sockfd, SIOCGIFADDR, &ifreq) < 0) return (-1); memcpy (&inaddr, &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, sizeof (struct in_addr)); } else inaddr.s_addr = htonl (INADDR_ANY); /* remove prev. set default */ return (setsockopt (sockfd, IPPROTO_IP, IP_MULTICAST_IF, &inaddr, sizeof (struct in_addr))); } #endif case AF_INET6:{ u_int idx; // printf("Changing interface IPV6...\n"); if ((idx = ifindex) == 0) { if (ifname == NULL) { errno = EINVAL; /* must supply either index or name */ return (-1); } if ((idx = if_nametoindex (ifname)) == 0) { errno = ENXIO; /* i/f name not found */ return (-1); } } return (setsockopt (sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF, (_SOTYPE)&idx, sizeof (idx))); } default: // errno = EAFNOSUPPORT; return (-1); } } //-------------------------------------------------------------------------------------------------------------------------------------------- UDPContext *server_udp_open (const struct in6_addr *mcg, int port, const char *ifname) { UDPContext *s; int sendfd; int n; pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); s = (UDPContext *) calloc (1, sizeof (UDPContext)); if (!s) { err ("Cannot allocate memory !\n"); goto error; } struct sockaddr_in6 *addr = (struct sockaddr_in6 *) &s->dest_addr; addr->sin6_addr=*mcg;; addr->sin6_family = AF_INET6; addr->sin6_port = htons (port); s->dest_addr_len = sizeof (struct sockaddr_in6); sendfd = socket (PF_INET6, SOCK_DGRAM, IPPROTO_UDP); if (sendfd < 0) { err ("cannot get socket\n"); } s->dest_addr_len = sizeof (struct sockaddr_in6); if ((udp_ipv6_is_multicast_address ((struct sockaddr *) &s->dest_addr))) { if (ifname && strlen (ifname) && (mcast_set_if (sendfd, ifname, 0) < 0)) { warn ("mcast_set_if error\n"); goto error; } if (udp_ipv6_set_multicast_ttl (sendfd, MCAST_TTL, (struct sockaddr *) &s->dest_addr) < 0) { warn ("udp_ipv6_set_multicast_ttl"); } } n = UDP_TX_BUF_SIZE; if (setsockopt (sendfd, SOL_SOCKET, SO_SNDBUF, (_SOTYPE)&n, sizeof (n)) < 0) { warn ("setsockopt sndbuf"); } s->is_multicast = 0; //server s->udp_fd = sendfd; s->local_port = port; dbg ("Multicast streamer initialized successfully ! \n"); pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); return s; error: err ("Cannot init udp_server !\n"); if (s) { free (s); } pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); return NULL; } UDPContext *server_udp_open_host (const char *host, int port, const char *ifname) { struct in6_addr addr; inet_pton (AF_INET6, host, &addr); return server_udp_open (&addr, port, ifname); } //-------------------------------------------------------------------------------------------------------------------------------------------- UDPContext *client_udp_open (const struct in6_addr *mcg, int port, const char *ifname) { UDPContext *s; int recvfd; int n; pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); s = (UDPContext *) calloc (1, sizeof (UDPContext)); if (!s) { err ("Cannot allocate memory !\n"); goto error; } struct sockaddr_in6 *addr = (struct sockaddr_in6 *) &s->dest_addr; #ifndef WIN32 addr->sin6_addr=*mcg; #else struct in6_addr any; memset(&any,0,sizeof(any)); addr->sin6_addr=any; #endif addr->sin6_family = AF_INET6; addr->sin6_port = htons (port); s->dest_addr_len = sizeof (struct sockaddr_in6); recvfd = socket (PF_INET6, SOCK_DGRAM, IPPROTO_UDP); if (recvfd < 0) { err ("cannot get socket\n"); } #ifdef WIN32 # ifndef IPV6_PROTECTION_LEVEL # define IPV6_PROTECTION_LEVEL 23 # endif n = 10 /*PROTECTION_LEVEL_UNRESTRICTED*/; if(setsockopt( recvfd, IPPROTO_IPV6, IPV6_PROTECTION_LEVEL, (_SOTYPE)&n, sizeof(n) ) < 0 ) { warn ("setsockopt IPV6_PROTECTION_LEVEL\n"); } #endif n = 1; if (setsockopt (recvfd, SOL_SOCKET, SO_REUSEADDR, (_SOTYPE)&n, sizeof (n)) < 0) { warn ("setsockopt REUSEADDR\n"); } #if ! (defined WIN32 || defined APPLE) if (ifname && strlen (ifname) && setsockopt (recvfd, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen (ifname) + 1)) { dbg ("setsockopt SO_BINDTODEVICE %s failed\n", ifname); } #endif if (bind (recvfd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len) < 0) { warn ("bind failed\n"); goto error; } #ifdef WIN32 addr->sin6_addr=*mcg; #endif if (udp_ipv6_is_multicast_address ((struct sockaddr *) &s->dest_addr)) { #if 0 if (ifname && strlen (ifname) && (mcast_set_if (recvfd, ifname, 0) < 0)) { warn ("mcast_set_if error \n"); goto error; } #endif if (ifname) { if ((s->idx = if_nametoindex (ifname)) == 0) { s->idx = 0; } else { dbg("Selecting interface %s (%d)", ifname, s->idx); } } else { s->idx = 0; } if (udp_ipv6_join_multicast_group (recvfd, s->idx, (struct sockaddr *) &s->dest_addr) < 0) { warn ("Cannot join multicast group !\n"); goto error; } s->is_multicast = 1; } n = UDP_RX_BUF_SIZE; if (setsockopt (recvfd, SOL_SOCKET, SO_RCVBUF, (_SOTYPE)&n, sizeof (n)) < 0) { warn ("setsockopt rcvbuf"); goto error; } s->udp_fd = recvfd; s->local_port = port; pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); return s; error: warn ("socket error !\n"); if (s) { free (s); } pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); return NULL; } UDPContext *client_udp_open_host (const char *host, int port, const char *ifname) { struct in6_addr addr; inet_pton (AF_INET6, host, &addr); return client_udp_open (&addr, port, ifname); } //-------------------------------------------------------------------------------------------------------------------------------------------- int udp_read (UDPContext * s, uint8_t * buf, int size, int timeout, struct sockaddr_storage *from) { socklen_t from_len = sizeof (struct sockaddr_storage); struct sockaddr_storage from_local; if(!from) { from=&from_local; } struct pollfd p; p.fd = s->udp_fd; p.events = POLLIN; if(poll(&p,1,(timeout+999)>>10)>0) { return recvfrom (s->udp_fd, (char *)buf, size, 0, (struct sockaddr *) from, &from_len); } return -1; } //-------------------------------------------------------------------------------------------------------------------------------------------------- int udp_write (UDPContext * s, uint8_t * buf, int size) { int ret; for (;;) { ret = sendto (s->udp_fd, (char *) buf, size, 0, (struct sockaddr *) &s->dest_addr, s->dest_addr_len); if (ret < 0) { if (errno != EINTR && errno != EAGAIN) return -1; } else { break; } } return size; } //---------------------------------------------------------------------------------------------------------------------------------------------------- int udp_close (UDPContext * s) { if (s->is_multicast) udp_ipv6_leave_multicast_group (s->udp_fd, s->idx, (struct sockaddr *) &s->dest_addr); closesocket (s->udp_fd); free (s); return 0; } //-------------------------------------------------------------------------------------------------------------------------------------------- #ifndef MULTI_THREAD_RECEIVER #define MAX_BUFF_SIZE 0x10000 #define MAX_CON_LIST 128 UDPContext *gConList[MAX_CON_LIST]; pthread_mutex_t gConListLock = PTHREAD_MUTEX_INITIALIZER; static int gConListInit=0; static int gConListModified; static UDPContext *gConChain; STATIC void client_upd_cleanup (void *arg) { if(!gConListInit) return; pthread_mutex_lock(&gConListLock); memset(&gConList, 0, sizeof(gConList)); gConListInit=0; pthread_mutex_unlock(&gConListLock); } // client_upd_cleanup void *client_upd_process(void *arg) { #ifdef RT #if 1 if (setpriority (PRIO_PROCESS, 0, -15) == -1) #else if (pthread_setschedprio (p->recv_ts_thread, -15)) #endif { dbg ("Cannot raise priority to -15\n"); } #endif unsigned char buff[MAX_BUFF_SIZE]; socklen_t from_len = sizeof (struct sockaddr_storage); struct sockaddr_storage from_local; struct pollfd fds[MAX_CON_LIST]; pthread_cleanup_push (client_upd_cleanup, 0); int max_fd=0; while(1) { UDPContext *e; pthread_mutex_lock(&gConListLock); if(gConListModified) { gConListModified=0; max_fd=0; for(e=gConChain;e;e=e->next) { fds[max_fd].fd = e->udp_fd; fds[max_fd].events = POLLIN; fds[max_fd].revents = 0; e->pfd = &fds[max_fd]; max_fd++; } // for } // if pthread_mutex_unlock(&gConListLock); int rs = poll(fds, max_fd, 1000); if(rs>0) { pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); pthread_mutex_lock(&gConListLock); for(e=gConChain;e;e=e->next) { if(e->pfd && (e->pfd->revents & POLLIN)) { if(e->cb) { int ret = recvfrom (e->udp_fd, (char *)buff, MAX_BUFF_SIZE, 0, 0, 0/*(struct sockaddr *) &from_local, &from_len*/); if(ret>0) e->cb(buff, ret, e->arg); } else if(e->buff && !e->bufflen) { pthread_mutex_lock(&e->bufflock); int ret = recvfrom (e->udp_fd, (char *)e->buff, e->buffmax, 0, (struct sockaddr *) &from_local, &from_len); if(ret>0) e->bufflen = ret; pthread_mutex_unlock(&e->bufflock); } // if } // if } // for pthread_mutex_unlock(&gConListLock); pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); } // if pthread_testcancel(); } // while pthread_cleanup_pop (1); return NULL; } static int client_upd_init() { pthread_mutex_lock(&gConListLock); if(gConListInit) { pthread_mutex_unlock(&gConListLock); return 1; } // if memset(&gConList, 0, sizeof(gConList)); gConListModified = 0; gConChain = NULL; pthread_t client_upd_thread; if(0==pthread_create (&client_upd_thread, NULL, client_upd_process, 0)) { gConListInit = 1; pthread_detach(client_upd_thread); } // if pthread_mutex_unlock(&gConListLock); return gConListInit; } // client_upd_init UDPContext *client_udp_open_buff (const struct in6_addr *mcg, int port, const char *ifname, int buff_size) { UDPContext *ret = client_udp_open_cb (mcg, port, ifname, 0, 0); if(ret) { ret->buff = (unsigned char *)malloc(buff_size); ret->buffmax = buff_size; ret->bufflen = 0; if (!ret->buff) { err ("client_udp_open_buff: out of memory\n"); } } // if return ret; } // client_udp_open_buff UDPContext *client_udp_open_cb (const struct in6_addr *mcg, int port, const char *ifname, client_udp_cb cb, void *arg) { if(!client_upd_init()) return NULL; UDPContext *s; int recvfd = -1; int n; pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); s = (UDPContext *) calloc (1, sizeof (UDPContext)); if (!s) { err ("Cannot allocate memory !\n"); goto error; } struct sockaddr_in6 *addr = (struct sockaddr_in6 *) &s->dest_addr; #ifndef WIN32 addr->sin6_addr=*mcg; #else struct in6_addr any=IN6ADDR_ANY_INIT; addr->sin6_addr=any; #endif addr->sin6_family = AF_INET6; addr->sin6_port = htons (port); s->dest_addr_len = sizeof (struct sockaddr_in6); recvfd = socket (PF_INET6, SOCK_DGRAM, IPPROTO_UDP); if (recvfd < 0) { err ("cannot get socket\n"); } #ifdef WIN32 # ifndef IPV6_PROTECTION_LEVEL # define IPV6_PROTECTION_LEVEL 23 # endif n = 10 /*PROTECTION_LEVEL_UNRESTRICTED*/; if(setsockopt( recvfd, IPPROTO_IPV6, IPV6_PROTECTION_LEVEL, (_SOTYPE)&n, sizeof(n) ) < 0 ) { warn ("setsockopt IPV6_PROTECTION_LEVEL\n"); } #endif n = 1; if (setsockopt (recvfd, SOL_SOCKET, SO_REUSEADDR, (_SOTYPE)&n, sizeof (n)) < 0) { warn ("setsockopt REUSEADDR\n"); } #if ! (defined WIN32 || defined APPLE) if (ifname && strlen (ifname) && setsockopt (recvfd, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen (ifname) + 1)) { dbg ("setsockopt SO_BINDTODEVICE %s failed\n", ifname); } #endif if (bind (recvfd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len) < 0) { warn ("bind failed\n"); goto error; } #ifdef WIN32 addr->sin6_addr=*mcg; #endif if (udp_ipv6_is_multicast_address ((struct sockaddr *) &s->dest_addr)) { #if 0 if (ifname && strlen (ifname) && (mcast_set_if (recvfd, ifname, 0) < 0)) { warn ("mcast_set_if error \n"); goto error; } #endif if (ifname) { if ((s->idx = if_nametoindex (ifname)) == 0) { s->idx = 0; } else { dbg("Selecting interface %s (%d)", ifname, s->idx); } } else { s->idx = 0; } if (udp_ipv6_join_multicast_group (recvfd, s->idx, (struct sockaddr *) &s->dest_addr) < 0) { warn ("Cannot join multicast group !\n"); goto error; } s->is_multicast = 1; } n = cb ? UDP_PID_BUF_SIZE : UDP_RX_BUF_SIZE; if (setsockopt (recvfd, SOL_SOCKET, SO_RCVBUF, (_SOTYPE)&n, sizeof (n)) < 0) { warn ("setsockopt rcvbuf"); goto error; } s->udp_fd = recvfd; s->local_port = port; pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); s->cb = cb; s->arg = arg; pthread_mutex_init(&s->bufflock, NULL); int i; pthread_mutex_lock(&gConListLock); for(i=0;inext=gConChain; gConChain=s; break; } // if } // for pthread_mutex_unlock(&gConListLock); if(i>=MAX_CON_LIST) warn("---------------------------------------------No slot found!\n"); return s; error: warn ("socket error !\n"); if (s) { free (s); } pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); return NULL; } UDPContext *client_udp_open_host_buff (const char *host, int port, const char *ifname, int buff_size) { struct in6_addr addr; inet_pton (AF_INET6, host, &addr); return client_udp_open_buff (&addr, port, ifname, buff_size); } //-------------------------------------------------------------------------------------------------------------------------------------------- int udp_read_buff (UDPContext * s, uint8_t * buf, int size, int timeout, struct sockaddr_storage *from) { pthread_mutex_lock(&s->bufflock); int ret = s->bufflen>size ? size : s->bufflen; if(ret>0) { memcpy(buf, s->buff, ret); s->bufflen-=ret; } pthread_mutex_unlock(&s->bufflock); return ret; } //---------------------------------------------------------------------------------------------------------------------------------------------------- int udp_close_buff (UDPContext * s) { int i; pthread_mutex_lock(&gConListLock); for(i=0;inext) { if(*e == s) { *e=(*e)->next; break; } } break; } } pthread_mutex_unlock(&gConListLock); if (s->is_multicast) udp_ipv6_leave_multicast_group (s->udp_fd, s->idx, (struct sockaddr *) &s->dest_addr); closesocket (s->udp_fd); free(s->buff); pthread_mutex_destroy(&s->bufflock); free (s); return 0; } #endif