嵌入式爱好者

查看: 22731|回复: 0

[评测] 【飞凌嵌入式 OK3399-C 开发板试用体验】(安装RabbitMQ消息队列服务】

[复制链接]

4

主题

4

帖子

49

积分

LS1043A\46A通行证RK3399通行证LS1028A通行证AM335x通行证i.MX RT通行证

扫一扫,手机访问本帖
发表于 2020-8-20 22:07:41 | 显示全部楼层 |阅读模式
本帖最后由 fcb5511 于 2020-9-7 09:00 编辑

提供两种消息队列服务安装方式:
一、通过apt命令安装
二、通过官方源码以及binary包安装

apt命令安装方式获取的erlang和rabbitmqserver版本一般比较低,erlang版本和rabbitmqserver版本如果匹配不好,在高并发,多连接的情况下,cpu以及内存消耗较大(不知道现在最新安装版本有没有这个问题,有兴趣的可以试一试),优点是安装和使用都很方便,下面是安装和配置的各个步骤:

安装RabbitMQServer需要的语言环境 erlang,执行如下命令
1apt-get installerlang-nox   # 安装erlang
2erl  # 查看relang语言版本,成功执行则说明relang安装成功
1.png

安装RabbitMQServer消息队列服务
apt-get install rabbitmq-server #安装成功自动启动
查看RabbitMQServer安装状态
systemctl status rabbitmq-server
2.png








启用管理插件(可通过网页登录)
rabbitmq-plugins enable rabbitmq_management

3.png

OK3399浏览器中输入localhost:15672,登陆用户名和密码分别为”guest””guest”
4.jpg
如果要实现局域网的其它计算机可访问OK3399MQ服务,需要给MQ服务添加管理员以及管理员权限。
新建admin用户并赋予权限
1rabbitmqctl add_useradmin admin
2rabbitmqctl set_user_tagsadmin administrator
3rabbitmqctl set_permissions-p / admin '.*' '.*' '.*'



局域网内计算机浏览器输入192.168.1.109:15672,用户名和密码分别为”admin””admin”
5.png


源码编译和binary安装需要从erlang官网下载源码自行编译为arm64平台的安装包,rabbitmqserver可直接从官网下载binaray包,手动添加环境变量。这种安装方式优点是可以根据喜好安装最新发布的版本。

github获取erlang源码包
wget  https://www.erlang.org/downloads/otp_src_22.1.tar.gz
解压源码包tar -zvxf otp_src_22.1.tar.gz #编译&安装 编译前先安装ssl相关的依赖库,否则ok3399在启动mq服务时会报错apt-get install libssl-dev 接下来安装erlangcd otp_src_22.1 ./otp_build autoconf./configure && make && sudo make install安装不指定路径会默认安装到/usr/local/lib/erlang/目录下

rabbitmqserver二进制包安装
github获取安装包
wget
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-generic-unix-3.8.0.tar.xz

解压
tar -xvf rabbitmq-server-generic-unix-3.8.0.tar.xz
将解压后的包放到/usr/local目录

配置环境变量
vim /etc/profile 在后面添加如下命令:export PATH=$PATH:/usr/local/lib/erlang/bin:/usr/local/rabbitmq_server-3.8.0/sbin让新加的环境变量有效source /etc/profile
erl查看版本号
6.png

启动rabbitmq-server
rabbitmq-plugins enable rabbitmq_management//允许网页管理rabbitmq-server rabbitmq-server -detached(后台运行)
新建admin用户并赋予权限
1rabbitmqctl add_useradmin admin
2rabbitmqctl set_user_tagsadmin administrator
3rabbitmqctl set_permissions-p / admin '.*' '.*' '.*'

至此,两种方式安装Rabbitmq服务完成。接下来用python来验证一下功能,python实现一个消费者,一个生产者,生产间隔1秒钟发送一帧数据,消费者从队列取数据并打印。python使用rabbitmq需要引用pika库,pika库是独立于python的第三方库,需要使用pip命令安装。
amqproducer.py程序:
  1. import pika
  2. import threading
  3. import time
  4. import signal
  5. import os


  6. class Producer(object):
  7.     def __init__(self, host='127.0.0.1', port=5672,
  8.                  user='admin', passwd='admin',
  9.                  exchange='test_exchange', routing_key='test_key',
  10.                  exchange_type= 'fanout', exhcange_durable=False, exchange_autodelete=True
  11.                  ):
  12.         self.host=host
  13.         self.port=port
  14.         self.user=user
  15.         self.passwd=passwd
  16.         self.exchange=exchange
  17.         self.routing_key=routing_key
  18.         self.exchange_type=exchange_type
  19.         self.exhcange_durable=exhcange_durable
  20.         self.exchange_autodelete=exchange_autodelete
  21.         self.connection=None
  22.         self.channel=None


  23.     def CreateConnection(self):
  24.         try:
  25.             self.credentials=pika.PlainCredentials(self.user, self.passwd)
  26.             self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port, virtual_host = '/', credentials=self.credentials))  
  27.             self.channel = self.connection.channel()   

  28.            
  29.             self.channel.exchange_declare(exchange_type=self.exchange_type, exchange = self.exchange, durable = self.exhcange_durable, auto_delete=self.exchange_autodelete)
  30.         except Exception as e:
  31.             print("occurring an exception when the producer executing the function 'CreateConnection()':", e)


  32.     def PublisMessage(self, Message):
  33.         try:
  34.             self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, properties=pika.BasicProperties(delivery_mode = 2), body=Message)
  35.         except Exception as e:
  36.            print("An exception occurred when publishing message:", e)
  37.         
复制代码


amqconsumer.py程序:
  1. import pika
  2. import time

  3. class Consumer(object):
  4.     def __init__(self, host='127.0.0.1', port=5672,
  5.                  user='admin', passwd='admin',
  6.                  queue='test_queue', exchange='test_exchange', routing_key='test_key',
  7.                  queue_exclusive=False, queue_durable=False, queue_autodelete=True,
  8.                  exchange_type= 'fanout', exhcange_durable=False, exchange_autodelete=True
  9.                  ):
  10.         self.host=host
  11.         self.port=port
  12.         self.user=user
  13.         self.passwd=passwd
  14.         self.queue=queue
  15.         self.exchange=exchange
  16.         self.routing_key=routing_key
  17.         self.queue_exclusive=queue_exclusive
  18.         self.queue_durable=queue_durable
  19.         self.queue_autodelete=queue_autodelete
  20.         self.exchange_type=exchange_type
  21.         self.exhcange_durable=exhcange_durable
  22.         self.exchange_autodelete=exchange_autodelete

  23.         self.connection=None
  24.         self.channel=None

  25.         
  26.     def CreateConnection(self):
  27.         try:
  28.             self.credentials=pika.PlainCredentials(self.user, self.passwd)
  29.             self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port, virtual_host = '/', credentials=self.credentials))
  30.             self.channel = self.connection.channel()

  31.             self.channel.queue_declare(queue=self.queue, durable=self.queue_durable, exclusive=self.queue_exclusive, auto_delete=self.queue_autodelete)
  32.             
  33.             self.channel.exchange_declare(exchange_type=self.exchange_type, exchange = self.exchange, durable = self.exhcange_durable, auto_delete=self.exchange_autodelete)

  34.             self.channel.queue_bind(queue = self.queue, exchange = self.exchange, routing_key = self.routing_key)

  35.             self.channel.basic_consume(on_message_callback = self.on_response, auto_ack=True, queue=self.queue)

  36.         except Exception as e:
  37.             print("occurring an exception when the consumer  RecvData:", e)
  38.             

  39.     def on_response(self, ch, method, props, body):
  40.             #print(" [%s] Received %r" % (time.time(), body))
  41.             print(time.strftime('%H:%M:%S',time.localtime(time.time()))+': Get Message from '+self.host + ':'+body.decode())


  42.     def ProcessConsumeEventBlock(self):
  43.         try:
  44.             self.channel.start_consuming()
  45.         except Exception as e:
  46.             print('An exception occurred when executing consuming, the exception is:',e)


  47.     def ProcessConsumeEventNoneBlock(self):
  48.         try:
  49.             self.connection.process_data_events()
  50.         except Exception as e:
  51.             print('An exception occurred when executing consuming, the exception is:',e)
  52.    
复制代码



main.py程序:
  1. import pika
  2. import time
  3. import threading
  4. from amqconsumer import Consumer
  5. from amqproducer import Producer

  6. class ManageMent:
  7.     def __init__(self):
  8.         self.producer = Producer(host='localhost',user='guest',passwd='guest')
  9.         self.consumer = Consumer(host='localhost',user='guest',passwd='guest')
  10.         
  11.    
  12.     def CreateSever(self):
  13.         self.consumer.CreateConnection()
  14.         self.producer.CreateConnection()
  15.    
  16.     def ProducerRun(self):
  17.         while True:
  18.            self.producer.PublisMessage('this message is used for testing RabbitMQ')  
  19.            time.sleep(0.3)

  20.     def ConsumerRun(self):
  21.         self.consumer.ProcessConsumeEventBlock()
  22.         

  23.     def Run1(self):
  24.         while True:     
  25.             self.producer.PublisMessage('this message is used for test RabbitMQ')  
  26.             self.consumer.ProcessConsumeEventNoneBlock()
  27.             time.sleep(1)

  28.     def Run(self):
  29.         self.Consumer_thread=threading.Thread(target=self.ConsumerRun)
  30.         self.Producer_thread=threading.Thread(target=self.ProducerRun)
  31.         self.Consumer_thread.start()
  32.         self.Producer_thread.start()
  33.   

  34. if __name__ == '__main__':
  35.     manage=ManageMent()
  36.     manage.CreateSever()
  37.     manage.Run()

复制代码


最终运行效果如图

7.png

此贴为基础功能,下次将mq数据与qt界面结合,实现动态的界面。

python.zip

2.2 KB, 下载次数: 1, 下载积分: 贡献 1

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|小黑屋| 飞凌嵌入式 ( 冀ICP备12004394号-1 )

GMT+8, 2024-11-22 04:56

Powered by Discuz! X3.4

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表