-
Notifications
You must be signed in to change notification settings - Fork 439
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mongodb transport #430
Mongodb transport #430
Conversation
docker-compose.yml
Outdated
@@ -44,6 +45,7 @@ services: | |||
- RDKAFKA_PORT=9092 | |||
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085 | |||
- GCLOUD_PROJECT=mqdev | |||
- MONGO_CONNECTION_STRING=mongodb://127.0.0.1/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to MONGO_DSN
*/ | ||
private $driverOptions; | ||
|
||
public function __construct($uri = 'mongodb://127.0.0.1/', array $config = [], array $uriOptions = [], array $driverOptions = []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
constructor must accept one argument, an array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t should have uri, uriOptions, driverOptions keys and other options
pkg/mongodb/MongodbConsumer.php
Outdated
} | ||
|
||
return $convertedMessage; | ||
} catch (\Exception $e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for catch
pkg/mongodb/MongodbConsumer.php
Outdated
try { | ||
$now = time(); | ||
$collection = $this->context->getCollection(); | ||
$message = $collection->findOne(['$or' => [['delayed_until' => ['$exists' => false]], ['delayed_until' => ['$lte' => $now]]]], ['sort' => ['priority' => -1]]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the line is too long, could you split it into several lines??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/mongodb/MongodbConsumer.php
Outdated
if (!$message) { | ||
return null; | ||
} | ||
$mongodbMessage = $message->getArrayCopy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/mongodb/MongodbConsumer.php
Outdated
} | ||
|
||
if ($mongodbMessage['properties']) { | ||
$message->setProperties(JSON::decode($mongodbMessage['properties'])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO there is no need to encode properties nor headers to json, we could store an array in mongodb.
pkg/mongodb/MongodbConsumer.php
Outdated
*/ | ||
protected function convertMessage(array $mongodbMessage) | ||
{ | ||
$message = $this->context->createMessage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the createMessage method accept body, headers, and properties as arguments. pass them
/** | ||
* @return array | ||
*/ | ||
public function getConfig() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's not make it public for now.
pkg/mongodb/MongodbProducer.php
Outdated
$collection = $this->context->getCollection(); | ||
$collection->insertOne($mongoMessage); | ||
} catch (\Exception $e) { | ||
throw new Exception('The transport fails to send the message due to some internal error.', null, $e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The transport has failed to send the message due to some internal error.
pkg/mongodb/composer.json
Outdated
"require": { | ||
"php": ">=5.6", | ||
"queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", | ||
"php-http/client-common": "^1.7@dev", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think we needed, please remove
composer.json
Outdated
@@ -41,7 +42,8 @@ | |||
"symfony/console": "^2.8|^3|^4", | |||
"friendsofphp/php-cs-fixer": "^2", | |||
"empi89/php-amqp-stubs": "*@dev", | |||
"php-http/client-common": "^1.7@dev" | |||
"php-http/client-common": "^1.7@dev", | |||
"mongodb/mongodb": "^1.3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be remove as you required it in enqueue/mongodb
docker-compose.yml
Outdated
@@ -44,6 +45,7 @@ services: | |||
- RDKAFKA_PORT=9092 | |||
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085 | |||
- GCLOUD_PROJECT=mqdev | |||
- MONGO_DSN=mongodb://127.0.0.1/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be mongodb://mongo/
docker-compose.yml
Outdated
mongo: | ||
image: mongo | ||
ports: | ||
- "27017:27017" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add an empty line after the service definition.
pkg/mongodb/MongodbConsumer.php
Outdated
{ | ||
$now = time(); | ||
$collection = $this->context->getCollection(); | ||
$message = $collection->findOneAndDelete(['$or' => [['delayed_until' => ['$exists' => false]], ['delayed_until' => ['$lte' => $now]]]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please split into several lines.
pkg/mongodb/MongodbConsumer.php
Outdated
protected function convertMessage(array $mongodbMessage) | ||
{ | ||
$message = $this->context->createMessage($mongodbMessage['body'], $mongodbMessage['properties'], $mongodbMessage['headers']); | ||
$message->setId($mongodbMessage['_id']->__toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(string) $mongodbMessage['_id']
pkg/mongodb/MongodbContext.php
Outdated
|
||
public function getCollection() | ||
{ | ||
return $this->client->selectDatabase($this->config['dbname'])->selectCollection($this->config['collection_name']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<?php
return $this->client
->selectDatabase($this->config['dbname'])
->selectCollection($this->config['collection_name'])
;
$mongoMessage = [ | ||
'published_at' => $publishedAt, | ||
'body' => $body, | ||
'headers' => JSON::encode($message->getHeaders()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to json encode
pkg/mongodb/composer.json
Outdated
"homepage": "https://enqueue.forma-pro.com/", | ||
"license": "MIT", | ||
"require": { | ||
"php": ">=5.6", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^7.1
*/ | ||
private $config; | ||
|
||
public function __construct($config = 'mongodb:') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please add a docblcok with the information on possible configuration options
$this->markTestSkipped('The MONGO_DSN env is not available. Skip tests'); | ||
} | ||
$params = ['uri' => $env]; | ||
$factory = new MongodbConnectionFactory($params); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<?php
$factory = new MongodbConnectionFactory(['uri' => $env]);
)); | ||
} | ||
|
||
if ($parsedUrl['scheme'].':' === $dsn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hardcode string mongodb:
|
||
if ($parsedUrl['scheme'].':' === $dsn) { | ||
return [ | ||
'uri' => $parsedUrl['scheme'].'://127.0.0.1/', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
$config['uri'] = $parsedUrl['scheme'].'://'.$parsedUrl['host']; | ||
|
||
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) { | ||
$pathArr = explode('/', $parsedUrl['path']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename the var to pathParts
]; | ||
} | ||
|
||
$config['uri'] = $parsedUrl['scheme'].'://'.$parsedUrl['host']; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not correct, the DSN might contain user, pasword, port, other options in it,
I think it would be good to store the orignal url as is is
} | ||
|
||
if (isset($parsedUrl['query'])) { | ||
parse_str($parsedUrl['query'], $queryParts); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use of undefined var, define $queryParts = null
just before this line
parse_str($parsedUrl['query'], $queryParts); | ||
|
||
//get enqueue attributes values | ||
if (isset($queryParts['polling_interval']) && !empty($queryParts['polling_interval'])) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no need for isset()
9f620b5
to
85e4b9d
Compare
85e4b9d
to
07ed3ba
Compare
$factory = new Definition(MongodbConnectionFactory::class); | ||
$factory->setArguments([$config]); | ||
} else { | ||
throw new \LogicException('Set "dsn" options when you want ot use Mongodb.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of this add required to ->scalarNode('dsn')
{ | ||
if (false == empty($config['dsn'])) { | ||
$factory = new Definition(MongodbConnectionFactory::class); | ||
$factory->setArguments([$config]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as far as I remember Definition takes an array of arguments as a second constructor argument.
throw new \LogicException('The config must be either an array of options, a DSN string or null'); | ||
} | ||
$config = array_replace([ | ||
'uri' => 'mongodb://127.0.0.1/', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add dbname and a collection here as well
pkg/mongodb/MongodbProducer.php
Outdated
'headers' => $message->getHeaders(), | ||
'properties' => $message->getProperties(), | ||
'priority' => $message->getPriority(), | ||
'queue' => $destination->getQueueName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of getQueueName method is a bit misleading as we can send a message to the topic too.
Could you please add a neutral getName method to the destination and use it everywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added getName() alias for getQueueName() method. getQueueName() method should be implemented from PsrQueue interface.
$timeout /= 1000; | ||
$startAt = microtime(true); | ||
|
||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's try capped collection plus tailable cursor approach.
https://gist.github.com/ASKozienko/cf45b09c68774ed83dea67b6d02d5846
pkg/mongodb/MongodbProducer.php
Outdated
InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class); | ||
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class); | ||
|
||
if (null !== $this->priority && 0 === $message->getPriority()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be if (null !== $this->priority && null === $message->getPriority()) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and set priority in the message to null by default (just unset).
Mongodb transport (ref #284)