diff --git a/README.md b/README.md index 09349f5..baeb603 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ production: ```ruby AliyunIot::Queue.queues ## 列出所有队列 AliyunIot::Queue[QueueName].receive_message(WaitSeconds) ## 消费消息 + AliyunIot::Queue[QueueName].batch_receive_message(MessageCount, WaitSeconds) ## 批量消费消息 AliyunIot::Queue[QueueName].peek ## 查看消息 AliyunIot::Queue[QueueName].create({DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, VisibilityTimeout, PollingWaitSeconds, LoggingEnabled}) ## 创建队列 AliyunIot::Queue[QueueName].delete ## 删除队列 @@ -82,7 +83,8 @@ production: ```ruby AliyunIot::Product.create(Name) ## 创建产品 - AliyunIot::Product.check_regist_state(ApplyId) ## 批量查询注册状态 + AliyunIot::Product.batch_get_device_state({DeviceName.1, DeviceName.2 ....}) ## 批量查询设备状态 + AliyunIot::Product.check_regist_state(ApplyId) ## 查询注册状态 AliyunIot::Product.list_regist_info(ApplyId, PageSize, CurrentPage) ## 批量查询注册状态 AliyunIot::Product[ProductKey].update({ProductName, ProductDesc}) ## 修改产品信息 AliyunIot::Product[ProductKey].list({PageSize, CurrentPage}) ## 查询产品的设备列表 diff --git a/lib/aliyun_iot.rb b/lib/aliyun_iot.rb index b1a56ee..8993f27 100644 --- a/lib/aliyun_iot.rb +++ b/lib/aliyun_iot.rb @@ -9,6 +9,7 @@ require "aliyun_iot/topic" require "aliyun_iot/message" require "aliyun_iot/product" +require "aliyun_iot/result" class Hash def self.xml_array content, *path diff --git a/lib/aliyun_iot/message.rb b/lib/aliyun_iot/message.rb index 10c4070..087cbcc 100644 --- a/lib/aliyun_iot/message.rb +++ b/lib/aliyun_iot/message.rb @@ -1,20 +1,12 @@ require "aliyun_iot/request/xml" module AliyunIot class Message - attr_reader :queue, :id, :body_md5, :body, :receipt_handle, :enqueue_at, :first_enqueue_at, :next_visible_at, :dequeue_count, :priority + attr_reader :h, :queue, :id, :body_md5, :body, :receipt_handle, :enqueue_at, :first_enqueue_at, :next_visible_at, :dequeue_count, :priority - def initialize(queue, content) - h = Hash.xml_object(content, "Message") + def initialize(queue, h) + @h = h @queue = queue - @id = h["MessageId"] - @body_md5 = h["MessageBodyMD5"] - @body = h["MessageBody"] - @enqueue_at = Time.at(h["EnqueueTime"].to_i/1000.0) - @first_enqueue_at = Time.at(h["FirstDequeueTime"].to_i/1000.0) - @dequeue_count = h["DequeueCount"].to_i - @priority = h["Priority"].to_i - @receipt_handle = h["ReceiptHandle"] - @next_visible_at = Time.at(h["NextVisibleTime"].to_i/1000.0) + set_message_info end #删除消息 @@ -56,6 +48,18 @@ def to_s end private + + def set_message_info + @id = h["MessageId"] + @body_md5 = h["MessageBodyMD5"] + @body = h["MessageBody"] + @enqueue_at = Time.at(h["EnqueueTime"].to_i/1000.0) + @first_enqueue_at = Time.at(h["FirstDequeueTime"].to_i/1000.0) + @dequeue_count = h["DequeueCount"].to_i + @priority = h["Priority"].to_i + @receipt_handle = h["ReceiptHandle"] + @next_visible_at = Time.at(h["NextVisibleTime"].to_i/1000.0) + end def set_data(query) {mqs_headers: {"x-mns-version" => "2015-06-06"}, query: query} diff --git a/lib/aliyun_iot/product.rb b/lib/aliyun_iot/product.rb index 8ec0c78..3527fd1 100644 --- a/lib/aliyun_iot/product.rb +++ b/lib/aliyun_iot/product.rb @@ -51,13 +51,15 @@ def regist_device(params = {}) def regist_devices(params = {}) execute params, 'ApplyDeviceWithNames' end + + def batch_get_device_state(params = {}) + execute params, 'BatchGetDeviceState' + end def pub(params = {}) raise RequestException.new(Exception.new("message MessageContent is empty!")) if params[:MessageContent].nil? - default_params = { Qos: '0' } - default_params.merge!({ Qos: '0' }) if params[:Qos].nil? - params[:MessageContent] = Base64.urlsafe_encode64(params[:MessageContent]).chomp - execute params.merge(default_params), 'Pub' + params[:MessageContent] = Base64.encode64(params[:MessageContent]).chomp + execute params, 'Pub' end def sub(params = {}) diff --git a/lib/aliyun_iot/queue.rb b/lib/aliyun_iot/queue.rb index 76b79d0..3cbd432 100644 --- a/lib/aliyun_iot/queue.rb +++ b/lib/aliyun_iot/queue.rb @@ -36,7 +36,7 @@ def create(opts = {}) :MessageRetentionPeriod => 345600, :PollingWaitSeconds => 0 }.merge(opts) - request.content_xml(:Queue, msg_options) + request.content(:Queue, msg_options) end end @@ -57,7 +57,7 @@ def send_message(message, opts = {}) :DelaySeconds => 0, :Priority => 8 }.merge(opts) - request.content_xml(:Message, msg_options.merge(:MessageBody => message.to_s)) + request.content(:Message, msg_options.merge(:MessageBody => message.to_s)) end end @@ -65,7 +65,21 @@ def send_message(message, opts = {}) def receive_message(wait_seconds = 3) result = Request::Xml.get(messages_path, query: {waitseconds: wait_seconds}) return nil if result.nil? - Message.new(self, result) + Result.new(self, result).get_message + end + + #批量消费消息 + def batch_receive_message(num = 16, wait_seconds = 3) + result = Request::Xml.get(messages_path, query: {waitseconds: wait_seconds, numOfMessages: num}) + return nil if result.nil? + Result.new(self, result, "Messages", "Message").get_message + end + + #设置队列属性 + def set_attr(opts = {}) + Request::Xml.put(queue_path, query: {Metaoverride: true}) do |request| + request.content(:Queue, opts) + end end def queue_path diff --git a/lib/aliyun_iot/result.rb b/lib/aliyun_iot/result.rb new file mode 100644 index 0000000..ca61408 --- /dev/null +++ b/lib/aliyun_iot/result.rb @@ -0,0 +1,16 @@ +module AliyunIot + class Result + + attr_reader :h, :queue + + def initialize(queue, content, *path) + @queue = queue + @h = path.blank? ? Hash.xml_object(content, "Message") : Hash.xml_array(content, *path) + end + + def get_message + h.is_a?(Array) ? h.map{ |message| Message.new(queue, message) } : Message.new(queue, h) + end + + end +end diff --git a/lib/aliyun_iot/version.rb b/lib/aliyun_iot/version.rb index a6a07b4..d0c32a9 100644 --- a/lib/aliyun_iot/version.rb +++ b/lib/aliyun_iot/version.rb @@ -1,3 +1,3 @@ module AliyunIot - VERSION = "0.1.3" + VERSION = "0.1.4" end