@@ -73,7 +73,7 @@ public function size($queue = null)
73
73
{
74
74
$ attributes = $ this ->getQueue ($ queue )->get_attributes ();
75
75
76
- return (int ) $ attributes ->activeMsgNum ;
76
+ return (int )$ attributes ->activeMsgNum ;
77
77
}
78
78
79
79
/**
@@ -106,7 +106,7 @@ public function push($job, $data = '', $queue = null)
106
106
*
107
107
* @param string $payload
108
108
* @param string $queue
109
- * @param array $options
109
+ * @param array $options
110
110
*
111
111
* @return mixed
112
112
*/
@@ -117,40 +117,48 @@ public function pushRaw($payload, $queue = null, array $options = [])
117
117
$ driver = $ this ->parseQueue ($ queue );
118
118
119
119
if ($ driver instanceof Topic) {
120
- $ vTagList = [];
121
- if ($ this ->topicOptions ['filter ' ] === self ::CMQ_TOPIC_TAG_FILTER_NAME ) {
122
- $ vTagList = explode (', ' , $ queue );
120
+ switch ($ this ->topicOptions ['filter ' ]) {
121
+ case self ::CMQ_TOPIC_TAG_FILTER_NAME :
122
+ return $ driver ->publish_message ($ message ->msgBody , explode (', ' , $ queue ), null );
123
+ case self ::CMQ_TOPIC_ROUTING_FILTER_NAME :
124
+ return $ driver ->publish_message ($ message ->msgBody , [], $ queue );
125
+ default :
126
+ throw new \InvalidArgumentException (
127
+ 'Invalid CMQ topic filter: ' . $ this ->topicOptions ['filter ' ]
128
+ );
123
129
}
124
-
125
- $ routingKey = null ;
126
- if ($ this ->topicOptions ['filter ' ] === self ::CMQ_TOPIC_ROUTING_FILTER_NAME ) {
127
- $ routingKey = $ queue ;
128
- }
129
-
130
- return $ driver ->publish_message ($ message ->msgBody , $ vTagList , $ routingKey );
131
130
}
132
131
133
- return $ driver ->send_message ($ message , array_get ($ options , 'delay ' , 0 ));
132
+ return $ driver ->send_message ($ message , Arr:: get ($ options , 'delay ' , 0 ));
134
133
}
135
134
136
135
/**
137
136
* Push a new job onto the queue after a delay.
138
137
*
139
138
* @param \DateTimeInterface|\DateInterval|int $delay
140
- * @param string|object $job
141
- * @param mixed $data
142
- * @param string $queue
139
+ * @param string|object $job
140
+ * @param mixed $data
141
+ * @param string $queue
143
142
*
144
143
* @return mixed
145
144
*/
146
145
public function later ($ delay , $ job , $ data = '' , $ queue = null )
147
146
{
148
- $ payload = $ this ->isPlain () ? $ job ->getPayload () : $ this ->createPayload ($ job , $ data );
149
-
150
147
$ delay = method_exists ($ this , 'getSeconds ' )
151
148
? $ this ->getSeconds ($ delay )
152
149
: $ this ->secondsUntil ($ delay );
153
150
151
+ if ($ this ->isPlain ()) {
152
+ return $ this ->pushRaw ($ job ->getPayload (), $ queue , ['delay ' => $ delay ]);
153
+ }
154
+
155
+ $ reflection = new \ReflectionMethod ($ this , 'createPayload ' );
156
+ if ($ reflection ->getNumberOfParameters () === 3 ) { // version >= 5.7
157
+ $ payload = $ this ->createPayload ($ job , $ queue , $ data );
158
+ } else {
159
+ $ payload = $ this ->createPayload ($ job , $ data );
160
+ }
161
+
154
162
return $ this ->pushRaw ($ payload , $ queue , ['delay ' => $ delay ]);
155
163
}
156
164
@@ -167,10 +175,9 @@ public function pop($queue = null)
167
175
$ queue = $ this ->getQueue ($ queue );
168
176
$ message = $ queue ->receive_message ($ this ->queueOptions ['polling_wait_seconds ' ]);
169
177
} catch (CMQServerException $ e ) {
170
- if ($ e ->getCode () == self ::CMQ_QUEUE_NO_MESSAGE_CODE ) { //ignore no message
171
- return ;
178
+ if (( int ) $ e ->getCode () = == self ::CMQ_QUEUE_NO_MESSAGE_CODE ) { //ignore no message
179
+ return null ;
172
180
}
173
-
174
181
throw $ e ;
175
182
}
176
183
@@ -211,13 +218,9 @@ public function getTopic($topic = null)
211
218
public function parseQueue ($ queue = null )
212
219
{
213
220
if ($ this ->topicOptions ['enable ' ]) {
214
- $ exchangeName = $ this ->topicOptions ['name ' ] ?: $ queue ;
215
-
216
- return $ this ->getTopic ($ exchangeName );
221
+ return $ this ->getTopic ($ this ->topicOptions ['name ' ] ?: $ queue );
217
222
}
218
223
219
- $ queueName = $ queue ?: $ this ->queueOptions ['name ' ];
220
-
221
- return $ this ->getQueue ($ queueName );
224
+ return $ this ->getQueue ($ queue ?: $ this ->queueOptions ['name ' ]);
222
225
}
223
226
}
0 commit comments