Changeset 1684 for trunk/pdns/pdns/pdns_recursor.cc
- Timestamp:
- 08/08/10 21:59:19 (3 years ago)
- Files:
-
- 1 modified
-
trunk/pdns/pdns/pdns_recursor.cc (modified) (11 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/pdns/pdns/pdns_recursor.cc
r1678 r1684 100 100 bool g_quiet; 101 101 102 bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets 103 102 104 static __thread NetmaskGroup* t_allowFrom; 103 105 static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this … … 811 813 } 812 814 815 string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd) 816 { 817 ++g_stats.qcounter; 818 819 string response; 820 try { 821 uint32_t age; 822 if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) { 823 if(!g_quiet) 824 L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl; 825 826 g_stats.packetCacheHits++; 827 SyncRes::s_queries++; 828 ageDNSPacket(response, age); 829 sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen()); 830 if(response.length() >= sizeof(struct dnsheader)) 831 updateRcodeStats(((struct dnsheader*)response.c_str())->rcode); 832 g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec 833 return 0; 834 } 835 } 836 catch(std::exception& e) { 837 L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl; 838 return 0; 839 } 840 841 842 if(MT->numProcesses() > g_maxMThreads) { 843 g_stats.overCapacityDrops++; 844 return 0; 845 } 846 847 DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now); 848 dc->setSocket(fd); 849 dc->setRemote(&fromaddr); 850 851 dc->d_tcp=false; 852 MT->makeThread(startDoResolve, (void*) dc); // deletes dc 853 return 0; 854 } 855 813 856 void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) 814 857 { … … 836 879 } 837 880 else { 838 ++g_stats.qcounter; 839 840 string response; 841 try { 842 uint32_t age; 843 if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(string(data, len), g_now.tv_sec, &response, &age)) { 844 if(!g_quiet) 845 L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl; 846 847 g_stats.packetCacheHits++; 848 SyncRes::s_queries++; 849 ageDNSPacket(response, age); 850 sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen()); 851 if(response.length() >= sizeof(struct dnsheader)) 852 updateRcodeStats(((struct dnsheader*)response.c_str())->rcode); 853 g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec 854 return; 855 } 856 } 857 catch(std::exception& e) { 858 throw MOADNSException(e.what()); // translate 859 } 860 if(MT->numProcesses() > g_maxMThreads) { 861 g_stats.overCapacityDrops++; 862 return; 863 } 864 865 DNSComboWriter* dc = new DNSComboWriter(data, len, g_now); 866 dc->setSocket(fd); 867 dc->setRemote(&fromaddr); 868 869 dc->d_tcp=false; 870 871 MT->makeThread(startDoResolve, (void*) dc); // deletes dc 881 string question(data, len); 882 if(g_weDistributeQueries) 883 distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd)); 884 else 885 doProcessUDPQuestion(question, fromaddr, fd); 872 886 } 873 887 } … … 884 898 } 885 899 } 900 886 901 887 902 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t; … … 1199 1214 { 1200 1215 static unsigned int counter; 1201 unsigned int target = ++counter % g_pipes.size();1216 unsigned int target = 1 + (++counter % (g_pipes.size()-1)); 1202 1217 // cerr<<"Sending to: "<<target<<endl; 1203 1218 if(target == t_id) { … … 1644 1659 1645 1660 g_quiet=::arg().mustDo("quiet"); 1661 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries"); 1662 if(g_weDistributeQueries) { 1663 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl; 1664 } 1665 1646 1666 if(::arg().mustDo("trace")) { 1647 1667 SyncRes::setLog(true); … … 1698 1718 makeTCPServerSockets(); 1699 1719 1720 for(int forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) { 1721 if(!fork()) // we are child 1722 break; 1723 } 1724 1700 1725 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid"; 1701 1726 if(!s_pidfname.empty()) … … 1732 1757 1733 1758 1734 g_numThreads = ::arg().asNum("threads") ;1759 g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries"); 1735 1760 1736 1761 makeThreadPipes(); … … 1805 1830 t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); 1806 1831 1807 for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 1808 t_fdm->addReadFD(i->first, i->second); 1832 if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens 1833 for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 1834 t_fdm->addReadFD(i->first, i->second); 1809 1835 1810 1836 if(!t_id) { 1811 1812 1837 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel 1813 1838 } 1814 1839 1815 1840 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients"); 1816 1841 … … 1928 1953 ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500"; 1929 1954 ::arg().set("threads", "Launch this number of threads")="2"; 1955 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; 1930 1956 #ifdef WIN32 1931 1957 ::arg().set("quiet","Suppress logging of questions and answers")="off"; … … 1983 2009 ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no"; 1984 2010 ::arg().setSwitch( "disable-edns", "Disable EDNS" )= ""; 1985 ::arg().setSwitch( "disable-packetcache", "Disable packetcahe" )= "no"; 2011 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no"; 2012 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no"; 2013 1986 2014 1987 2015 ::arg().setCmd("help","Provide a helpful message");