1: <?php
2: /**
3: * This file is part of GameQ.
4: *
5: * GameQ is free software; you can redistribute it and/or modify
6: * it under the terms of the GNU Lesser General Public License as published by
7: * the Free Software Foundation; either version 3 of the License, or
8: * (at your option) any later version.
9: *
10: * GameQ is distributed in the hope that it will be useful,
11: * but WITHOUT ANY WARRANTY; without even the implied warranty of
12: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13: * GNU Lesser General Public License for more details.
14: *
15: * You should have received a copy of the GNU Lesser General Public License
16: * along with this program. If not, see <http://www.gnu.org/licenses/>.
17: */
18:
19: namespace GameQ\Query;
20:
21: use GameQ\Exception\Query as Exception;
22:
23: /**
24: * Native way of querying servers
25: *
26: * @author Austin Bischoff <austin@codebeard.com>
27: */
class Native extends Core
{
    /**
     * Get the current socket, creating it first when none exists yet
     *
     * @return resource|null
     * @throws \GameQ\Exception\Query
     */
    public function get()
    {
        // Lazily open the socket on first use
        if (is_null($this->socket)) {
            $this->create();
        }

        return $this->socket;
    }

    /**
     * Write data to the socket, creating the socket first when none exists yet
     *
     * @param string $data
     *
     * @return int The number of bytes written
     * @throws \GameQ\Exception\Query When the socket cannot be created or the write fails
     */
    public function write($data)
    {
        try {
            // No socket for this server, make one
            if (is_null($this->socket)) {
                $this->create();
            }

            // Send the packet
            $written = fwrite($this->socket, $data);
        } catch (\Exception $e) {
            throw new Exception($e->getMessage(), $e->getCode(), $e);
        }

        // fwrite() reports failure by returning false rather than throwing, so the try/catch above
        // never sees it. Surface it as a query exception instead of leaking false to the caller.
        if ($written === false) {
            throw new Exception(
                __METHOD__ . " - Error writing to server {$this->ip}:{$this->port}."
            );
        }

        return $written;
    }

    /**
     * Close the current socket (if any) and forget it
     */
    public function close()
    {
        // Only hand live stream resources to fclose(); it rejects a handle that was already closed elsewhere
        if (is_resource($this->socket)) {
            fclose($this->socket);
        }

        // Always reset so the next get()/write() opens a fresh socket
        $this->socket = null;
    }

    /**
     * Create a new socket for this query
     *
     * Uses $this->transport, $this->ip, $this->port, $this->timeout and $this->blocking, which are not
     * declared in this class (presumably inherited from Core - confirm there).
     *
     * @throws \GameQ\Exception\Query When the socket cannot be opened
     */
    protected function create()
    {
        // Create the remote address
        $remote_addr = sprintf("%s://%s:%d", $this->transport, $this->ip, $this->port);

        // Create context
        $context = stream_context_create([
            'socket' => [
                'bindto' => '0:0', // Bind to any available IP and OS decided port
            ],
        ]);

        // Define these first, stream_socket_client() fills them in by reference on failure
        $errno = null;
        $errstr = null;

        // Create the socket. Warnings are suppressed because failures are reported via the exception below.
        $socket = @stream_socket_client(
            $remote_addr,
            $errno,
            $errstr,
            $this->timeout,
            STREAM_CLIENT_CONNECT,
            $context
        );

        if ($socket === false) {
            // Reset socket
            $this->socket = null;

            // Something bad happened, throw query exception
            throw new Exception(
                __METHOD__ . " - Error creating socket to server {$this->ip}:{$this->port}. Error: " . $errstr,
                $errno
            );
        }

        $this->socket = $socket;

        // Set the read timeout on the streams
        stream_set_timeout($this->socket, $this->timeout);

        // Set blocking mode
        stream_set_blocking($this->socket, $this->blocking);

        // Disable read and write buffering
        stream_set_read_buffer($this->socket, 0);
        stream_set_write_buffer($this->socket, 0);
    }

    /**
     * Pull the responses out of the streams
     *
     * Waits on all given sockets with stream_select() and collects every chunk read from them until
     * the overall timeout elapses, a select call reports an error or no ready streams, or every
     * socket has returned an empty read.
     *
     * @SuppressWarnings(PHPMD.CyclomaticComplexity)
     * @SuppressWarnings(PHPMD.NPathComplexity)
     *
     * @param array $sockets        Entries of the form ['socket' => query object exposing get()], keyed by socket id
     * @param int   $timeout        Overall time budget in seconds (added to microtime(true))
     * @param int   $stream_timeout Per-select wait, passed as the microseconds argument of stream_select()
     *
     * @return array Raw responses, keyed by the integer id of the stream resource, each a list of chunks
     */
    public function getResponses(array $sockets, $timeout, $stream_timeout)
    {
        // Will hold the responses read from the sockets
        $responses = [];

        // The actual stream resources we need to listen on, keyed like $sockets
        $sockets_tmp = [];

        foreach ($sockets as $socket_id => $socket_data) {
            /* @var $socket \GameQ\Query\Core */
            $socket = $socket_data['socket'];

            // NOTE: get() may have to create the socket; in this class that can throw a query exception
            $sockets_tmp[$socket_id] = $socket->get();
        }

        // Nothing to listen on; stream_select() would complain about an empty read set
        if (empty($sockets_tmp)) {
            return $responses;
        }

        $write = null;
        $except = null;

        // This is when it should stop
        $time_stop = microtime(true) + $timeout;

        // Keep going while there is still a socket to listen on and time left
        while (!empty($sockets_tmp) && microtime(true) < $time_stop) {
            // stream_select() modifies $read, so rebuild it from the remaining sockets each round
            $read = $sockets_tmp;

            // Now lets listen for some streams, but do not cross the streams!
            $streams = stream_select($read, $write, $except, 0, $stream_timeout);

            // We had an error or no streams became ready, kill the loop
            if ($streams === false || $streams <= 0) {
                break;
            }

            // Loop the sockets that received data back
            foreach ($read as $socket) {
                /* @var $socket resource */
                $response = fread($socket, 32768);

                // Read failed, nothing to record for this socket in this round
                if ($response === false) {
                    continue;
                }

                // An empty read means we are done with this server
                if ($response === '') {
                    // Remove by looking up the real key(s) of this resource; the keys of $sockets are
                    // chosen by the caller and are not guaranteed to equal (int)$socket
                    foreach (array_keys($sockets_tmp, $socket, true) as $key) {
                        unset($sockets_tmp[$key]);
                    }

                    continue;
                }

                // Add the response we got back
                $responses[(int)$socket][] = $response;
            }
        }

        // Return all of the responses, may be empty if something went wrong
        return $responses;
    }
}
223: