File "FsConsumer.php"
Full Path: /home/aiclgcwq/photonindustriespvt.com/wp-content/plugins/tenweb-speed-optimizer/vendor/enqueue/fs/FsConsumer.php
File size: 5.54 KB
MIME-type: text/x-php
Charset: utf-8
<?php
declare(strict_types=1);
namespace Enqueue\Fs;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Message;
use Interop\Queue\Queue;
class FsConsumer implements Consumer
{
/**
* @var FsDestination
*/
private $destination;
/**
* @var FsContext
*/
private $context;
/**
* @var int
*/
private $preFetchCount;
/**
* @var FsMessage[]
*/
private $preFetchedMessages;
/**
* In milliseconds.
*
* @var int
*/
private $pollingInterval = 100;
public function __construct(FsContext $context, FsDestination $destination, int $preFetchCount)
{
$this->context = $context;
$this->destination = $destination;
$this->preFetchCount = $preFetchCount;
$this->preFetchedMessages = [];
}
/**
* Set polling interval in milliseconds.
*/
public function setPollingInterval(int $msec): void
{
$this->pollingInterval = $msec;
}
/**
* Get polling interval in milliseconds.
*/
public function getPollingInterval(): int
{
return $this->pollingInterval;
}
/**
* @return FsDestination
*/
public function getQueue(): Queue
{
return $this->destination;
}
/**
* @return FsMessage
*/
public function receive(int $timeout = 0): ?Message
{
$timeout /= 1000;
$startAt = microtime(true);
while (true) {
$message = $this->receiveNoWait();
if ($message) {
return $message;
}
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return null;
}
usleep($this->pollingInterval * 1000);
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return null;
}
}
}
/**
* @return FsMessage
*/
public function receiveNoWait(): ?Message
{
if ($this->preFetchedMessages) {
return array_shift($this->preFetchedMessages);
}
$this->context->workWithFile($this->destination, 'c+', function (FsDestination $destination, $file) {
$count = $this->preFetchCount;
while ($count) {
$frame = $this->readFrame($file, 1);
//guards
if ($frame && false == ('|' == $frame[0] || ' ' == $frame[0])) {
throw new \LogicException(sprintf('The frame could start from either " " or "|". The malformed frame starts with "%s".', $frame[0]));
}
if (0 !== $reminder = strlen($frame) % 64) {
throw new \LogicException(sprintf('The frame size is "%d" and it must divide exactly to 64 but it leaves a reminder "%d".', strlen($frame), $reminder));
}
ftruncate($file, fstat($file)['size'] - strlen($frame));
rewind($file);
$rawMessage = str_replace('\|\{', '|{', $frame);
$rawMessage = substr(trim($rawMessage), 1);
if ($rawMessage) {
try {
$fetchedMessage = FsMessage::jsonUnserialize($rawMessage);
$expireAt = $fetchedMessage->getHeader('x-expire-at');
if ($expireAt && $expireAt - microtime(true) < 0) {
// message has expired, just drop it.
return null;
}
$this->preFetchedMessages[] = $fetchedMessage;
} catch (\Exception $e) {
throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), 0, $e);
}
} else {
return null;
}
--$count;
}
});
if ($this->preFetchedMessages) {
return array_shift($this->preFetchedMessages);
}
return null;
}
public function acknowledge(Message $message): void
{
// do nothing. fs transport always works in auto ack mode
}
public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, FsMessage::class);
// do nothing on reject. fs transport always works in auto ack mode
if ($requeue) {
$this->context->createProducer()->send($this->destination, $message);
}
}
public function getPreFetchCount(): int
{
return $this->preFetchCount;
}
public function setPreFetchCount(int $preFetchCount): void
{
$this->preFetchCount = $preFetchCount;
}
/**
* @param resource $file
*/
private function readFrame($file, int $frameNumber): string
{
$frameSize = 64;
$offset = $frameNumber * $frameSize;
fseek($file, -$offset, SEEK_END);
$frame = fread($file, $frameSize);
if ('' == $frame) {
return '';
}
if (false !== strpos($frame, '|{')) {
return $frame;
}
$previousFrame = $this->readFrame($file, $frameNumber + 1);
if ('|' === substr($previousFrame, -1) && '{' === $frame[0]) {
$matched = [];
if (false === preg_match('/\ *?\|$/', $previousFrame, $matched)) {
throw new \LogicException('Something went completely wrong.');
}
return $matched[0].$frame;
}
return $previousFrame.$frame;
}
}