43 $this->_replacementFactory = $replacementFactory;
55 $this->_params = $params;
56 switch ($params[
'type']) {
57 case self::TYPE_PROCESS:
60 case self::TYPE_SOCKET:
75 if (isset($this->_stream)) {
79 stream_set_timeout($this->_stream, $value);
85 stream_set_blocking($this->_stream, 1);
90 $this->_params[$param] = $value;
95 return stream_socket_enable_crypto($this->_stream,
true, STREAM_CRYPTO_METHOD_TLS_CLIENT);
103 if (isset($this->_stream)) {
104 switch ($this->_params[
'type']) {
105 case self::TYPE_PROCESS:
108 proc_close($this->_stream);
110 case self::TYPE_SOCKET:
112 fclose($this->_stream);
116 $this->_stream = null;
131 foreach ($this->_translations as $search => $replace) {
132 if (!isset($replacements[$search])) {
134 unset($this->_translations[$search]);
138 foreach ($replacements as $search => $replace) {
139 if (!isset($this->_translations[$search])) {
141 $this->_replacementFactory->createFilter($search, $replace), $search
143 $this->_translations[$search] =
true;
162 if (isset($this->_out) && !feof($this->_out)) {
163 $line = fgets($this->_out);
164 if (strlen($line) == 0) {
165 $metas = stream_get_meta_data($this->_out);
166 if ($metas[
'timed_out']) {
194 if (isset($this->_out) && !feof($this->_out)) {
195 $ret = fread($this->_out, $length);
196 if (strlen($ret) == 0) {
197 $metas = stream_get_meta_data($this->_out);
198 if ($metas[
'timed_out']) {
219 if (isset($this->_in)) {
227 if (isset($this->_in)) {
228 $bytesToWrite = strlen($bytes);
229 $totalBytesWritten = 0;
231 while ($totalBytesWritten < $bytesToWrite) {
232 $bytesWritten = fwrite($this->_in, substr($bytes, $totalBytesWritten));
233 if (
false === $bytesWritten || 0 === $bytesWritten) {
237 $totalBytesWritten += $bytesWritten;
240 if ($totalBytesWritten > 0) {
251 $host = $this->_params[
'host'];
252 if (!empty($this->_params[
'protocol'])) {
253 $host = $this->_params[
'protocol'].
'://'.$host;
256 if (!empty($this->_params[
'timeout'])) {
257 $timeout = $this->_params[
'timeout'];
260 if (!empty($this->_params[
'sourceIp'])) {
261 $options[
'socket'][
'bindto'] = $this->_params[
'sourceIp'].
':0';
263 $this->_stream = @stream_socket_client($host.
':'.$this->_params[
'port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options));
264 if (
false === $this->_stream) {
266 'Connection could not be established with host '.$this->_params[
'host'].
267 ' ['.$errstr.
' #'.$errno.
']'
270 if (!empty($this->_params[
'blocking'])) {
271 stream_set_blocking($this->_stream, 1);
273 stream_set_blocking($this->_stream, 0);
275 stream_set_timeout($this->_stream, $timeout);
285 $command = $this->_params[
'command'];
286 $descriptorSpec = array(
287 0 => array(
'pipe',
'r'),
288 1 => array(
'pipe',
'w'),
289 2 => array(
'pipe',
'w'),
291 $this->_stream = proc_open($command, $descriptorSpec, $pipes);
292 stream_set_blocking($pipes[2], 0);
293 if ($err = stream_get_contents($pipes[2])) {
295 'Process could not be started ['.$err.
']'
298 $this->_in = &$pipes[0];
299 $this->_out = &$pipes[1];
304 switch ($this->_params[
'type']) {
305 case self::TYPE_PROCESS:
306 return 'Process '.$this->_params[
'command'];
309 case self::TYPE_SOCKET:
311 $host = $this->_params[
'host'];
312 if (!empty($this->_params[
'protocol'])) {
313 $host = $this->_params[
'protocol'].
'://'.$host;
315 $host .=
':'.$this->_params[
'port'];