自学内容网 自学内容网

ESP32CAM人工智能教学15

ESP32CAM人工智能教学15

Flask服务器TCP连接

小智利用Flask在计算机中创建一个虚拟的网页服务器服务器,让ESP32Cam通过WiFi连接,把摄像头拍摄到的图片发送到电脑中,并在电脑中保存成图片文件。

Flask是用Python编写的网页服务程序WebServer。这个程序首先在电脑端用Python编写一个服务器端的程序,并创建一个指向5000端口的网页服务,用于接收ESP32CAM上传的图片数据,并保存成图片文件。

本来这个程序挺简单的,但是在测试程序时一直都过不了。测试程序耗了我三天时间,最后我采用“蚂蚁啃大象”的方法,采用由简及繁的方法,终于攻下难关。本文的测试主要参考博客内容https://blog.51cto.com/u_16213596/7788901

  • 第一轮给在线网站发信息

首先我们在计算机中,用Python编写一个客户端的测试程序,然后利用requests驱动库,给网上的一个测试网站发送字符信息,http://httpbin.org/post是网上为我们提供的一个学习测试的网站。

我们在测试程序中,创建一个字典d={‘jpg’,’tupian’},然后把信息发送到测试网页。点击菜单上的Run运行,结果在IDLE Shell窗口中,可以看到(第六行)返回了用户发送给网站的信息”jpg”:  ”tupian”。这样表面这个程序的requests的驱动库运行成功了。(当然,运行程序之前,需要在Python中先安装requests的驱动库,安装方法看前一课,可以到清华大学的镜像网站去下载)

  • 第二轮给本地Flask服务器发文字、图片信息

我们在Python中安装Flask驱动库,然后编写服务器端的程序test.py。在程序中创建一个一个网页服务,开放计算机的5000端口,并提供一个上传网页,用于接收客户端发送过来的消息。如图所示:客户端访问http://192.168.1.162:5000/updata可以给服务器端发送消息了。

修改客户端程序test02.py,把上传的地址改成本地的Flask服务器的上传网页地址,然后还是给服务器发送消息”jpg”:  “tupian”。

接下来是运行测试程序,先运行服务器端的程序test.py,可以看到Flask服务器开启了http://192.168.1.162:5000/。接着运行客户端程序test02.py,可以看到服务器端的IDLE Shell窗口显示一条消息,表示接收到了一条来自客户端的消息,并读取消息字典中名为“jpg”的内容,并组成字符串返回给客户端。在客户端的IDLE Shell窗口中看到了服务器返回的字符串”user”:  “tupian”。

接下来这个测试就简单了,首先把客户端中发送的消息“jpg”的内容,更改为一张图片的内容。我们先准备一张图片,命名为333.jpg,和test.py、test02.py一起,都保存在同一个文件夹中(Python的安装目录,我这里是D:\Python312\)。接着,更改服务器端的程序,返回客户端的消息为接收到的数据长度值。

接下来,先运行服务器端的程序,在运行客户端的程序,可以看到在服务器端的Shell看到接收到了数据,在客户端的Shell看到返回的图片文件大小了。这样,这张图片就上传成功了,可以在D:\Python312\中看到两个图片文件:333.jpg是被上传的原图; 555.jpg是上传后保存下的图片, 这个上传过程,就像是左手换右手一样,这两张图的内容是一模一样的。

  • 第三轮ESP32Cam读取内存图片发送给Flask

接下来,我们进行的是跨设备之间的传递,我们让ESP32Cam开发板读取内存中的一张图片的数据,然后通过WiFi连接,把图片数据发送到计算机中的Flask服务器中保存下来。

首先,我们在Thonny中连接ESP32Cam,并且下载一个urequests.py文件(如果没有可以复制后面的相关代码内容,然后在这里粘贴上传),上传到ESP32Cam中。

编写ESP32Cam客户端的程序,先连接WiFi,然后连接到服务器上传网页,读取图片内容,上传数据。需要注意的是,这个urequests.py驱动库文件只能传递二进制的数据,无法传递JSON的字符消息,所以在客户端和服务器端的程序,都有相应的小修改。

我们先在Thonny中上传一张图片到ESP32Cam中。打开服务器端的程序,再运行Thonny中的程序,这样就可以看到ESP32Cam内存中的那张图片,上传到了计算机D:\Python312\文件夹中了,在Thonny的Shell窗口中,看到了服务器程序返回来的图片文件大小的数字了。

  • 第三轮ESP32Cam摄像头拍摄图片发送给Flask

在这个测试中,我们把发送的图片改为摄像头拍摄到了图片,经过运行程序测试,可以看到摄像头拍摄到的图片,发送到了计算机中的D:\Python312\的文件夹中了。

避坑笔迹:之前一直测试的程序也是这个程序,但是就是一直不成功,一直找不到失败的原因,不知道传送的过程中究竟是哪个环节出现了问题,这个问题一直困扰了我三天时间,经过反反复复的测试,就是不成功。

后来我就采用“蚂蚁啃大象”的办法,从最简单的测试开始,尽可能地砍掉了所有的环节,保留最简单的连接通道(第一轮测试:本地客户端程序发送数据到线上的测试网站,保留最简单、最短传输路径的测试,因为网上的测试网站,基本可以忽略存在问题的可能,绝大的概率是正确的)。等第一轮测试通过后,就慢慢增加测试的传输路径长度,一点一点的向着最终的目标慢慢前进,经过几轮的测试,最终完成了ESP32Cam拍摄照片,利用TCP连接,发送到Flask服务器中的程序。

最后的测试成果,与网上的程序(之前一直不通过的)相比,我在摄像头初始化完成的后面,增加了一个0.2秒的延时,等待摄像头拍摄图片,然后在获取图片的数据,这样程序测试就通过了。

这个就是三天摸索的结果,不过最终还是找到了一个解决问题的办法:当程序出现问题的时候,如何化繁为简地进行测试,从最简单开始,一直到实现全部的预定功能。

如果在测试的时候,我们点击Thonny中的运行按钮,可以成功发送一张照片。第二次运行的时候,出现了错误信息,可以拔下ESP32Cam,重新插如电脑中,重新运行程序右可以了。说明这个程序还是有缺陷的,还需要改进。


第一轮测试,Python编写客户端程序,给在线HTTP服务网站发送信息

import requests
import binascii, json


def send_image():
    url = 'http://httpbin.org/post'
    d = {'jpg': 'tupian'}
    r = requests.post(url, data=d)
    
    print(r.text)
    
send_image()



/
第二轮测试    在电脑中创建两个Python程序,一个做服务器,一个做客户端,
在Python安装目录中,客服端读取333.jpg发送, 服务器端接收保存成555.jpg

test.py   服务器端:

from flask import Flask, request
import binascii

app = Flask(__name__)

@app.route("/updata", methods=["POST","GET"])
def updata():
    print("保存图片1")
    a = request.form.get('jpg')
    with open('555.jpg','wb') as f:
        val = binascii.a2b_base64(a)
        f.write(val)
    return ('user:%d' %len(a))

if __name__ == "__main__":
    app.run('0.0.0.0', 5000)


test02.py   客户端

import requests
import binascii, json


def send_image():
    url = 'http://192.168.1.162:5000/updata'
    with open('333.jpg','rb') as f:
        img = f.read()
    img = binascii.b2a_base64(img)
    
    d = {'jpg': img}
    r = requests.post(url, data=d)
    
    print(r.text)
    
send_image()

/
第三轮测试    ESP32CAM读取内存中的一张照片,发送到Python创建的Flask服务器端

from flask import Flask, request
import binascii

app = Flask(__name__)

@app.route("/updata", methods=["POST","GET"])
def updata():
    print("保存图片1")
    a = request.get_data()  #直接接收二进制
    with open('555.jpg','wb') as f:
        f.write(a)
    return ('user:%d' %len(a))

if __name__ == "__main__":
    app.run('0.0.0.0', 5000)   



ESP32Cam  设备作为客户端

import time, network

def connectWiFi():
    wlan = network.WLAN(network.STA_IF)
    if wlan.isconnected():
        wlan.disconnect()
    wlan.active(True)
    wlan.connect('ChinaNet-xxVP', '123456789')
    while not wlan.isconnected():
        pass
    print('network config: ', wlan.ifconfig())

connectWiFi()


import urequests as requests

def send_image():
    url = 'http://192.168.1.162:5000/updata'
    with open('333.jpg', 'rb') as f:
        img = f.read()
    r = requests.post(url, data = img)
    print(r.text)
    r.close
    
send_image()



第四轮测试    ESP32CAM拍下一张照片,发送到Python创建的Flask服务器端

Python服务器端

from flask import Flask, request
import binascii

app = Flask(__name__)

@app.route("/updata", methods=["POST","GET"])
def updata():
    print("保存图片1")
    a = request.get_data()  #直接接收二进制
    with open('555.jpg','wb') as f:
        f.write(a)
    return ('user:%d' %len(a))

if __name__ == "__main__":
    app.run('0.0.0.0', 5000)  



ESP32Cam客户端  

import time, network

def connectWiFi():
    wlan = network.WLAN(network.STA_IF)
    if wlan.isconnected():
        wlan.disconnect()
    wlan.active(True)
    wlan.connect('ChinaNet-xxVP', '123456789')
    while not wlan.isconnected():
        pass
    print('network config: ', wlan.ifconfig())

connectWiFi()

import camera
import time
import urequests as requests

def send_image():
    url = 'http://192.168.1.162:5000/updata'
    while not camera.init(0):
        time.sleep(0.2)
        camera.deinit()
        time.sleep(0.2)
    time.sleep(0.2)  #稍作延时,等待拍照
    img = camera.capture()  # 获取照片
    camera.deinit()
    
    r = requests.post(url, data = img)
    print(r.text)
    r.close

send_image()
    
 
//
/
保存到ESP32Cam中的 urequests.py 驱动库源代码

import usocket

class Response:

  def __init__(self, f):
    self.raw = f
    self.encoding = "utf-8"
    self._cached = None

  def close(self):
    if self.raw:
      self.raw.close()
      self.raw = None
    self._cached = None

  @property
  def content(self):
    if self._cached is None:
      self._cached = self.raw.read()
      self.raw.close()
      self.raw = None
    return self._cached

  @property
  def text(self):
    return str(self.content, self.encoding)

  def json(self):
    import ujson
    return ujson.loads(self.content)


def request(method, url, data=None, json=None, headers={}, stream=None,params=None):
  try:
    proto, dummy, host, path = url.split("/", 3)
  except ValueError:
    proto, dummy, host = url.split("/", 2)
    path = ""
  if proto == "http:":
    port = 80
  elif proto == "https:":
    import ussl
    port = 443
  else:
    raise ValueError("Unsupported protocol: " + proto)

  if ":" in host:
    host, port = host.split(":", 1)
    port = int(port)
    
  if params:
    path = path + "?"
    for k in params:
      path = path + '&'+k+'='+params[k]

  ai = usocket.getaddrinfo(host, port)
  addr = ai[0][4]
  s = usocket.socket()
  s.connect(addr)
  if proto == "https:":
    s = ussl.wrap_socket(s)
  s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
  if not "Host" in headers:
    s.write(b"Host: %s\r\n" % host)
  # Iterate over keys to avoid tuple alloc
  for k in headers:
    s.write(k)
    s.write(b": ")
    s.write(headers[k])
    s.write(b"\r\n")
  if json is not None:
    assert data is None
    import ujson
    data = ujson.dumps(json)
  if data:
    s.write(b"Content-Length: %d\r\n" % len(data))
  s.write(b"\r\n")
  if data:
    s.write(data)

  l = s.readline()
  protover, status, msg = l.split(None, 2)
  status = int(status)
  #print(protover, status, msg)
  while True:
    l = s.readline()
    if not l or l == b"\r\n":
      break
        #print(l)
    if l.startswith(b"Transfer-Encoding:"):
      if b"chunked" in l:
        raise ValueError("Unsupported " + l)
    elif l.startswith(b"Location:") and not 200 <= status <= 299:
      raise NotImplementedError("Redirects not yet supported")

  resp = Response(s)
  resp.status_code = status
  resp.reason = msg.rstrip()
  return resp


def head(url, **kw):
  return request("HEAD", url, **kw)

def get(url, **kw):
  return request("GET", url, **kw)

def post(url, **kw):
  return request("POST", url, **kw)

def put(url, **kw):
  return request("PUT", url, **kw)

def patch(url, **kw):
  return request("PATCH", url, **kw)

def delete(url, **kw):
  return request("DELETE", url, **kw)


原文地址:https://blog.csdn.net/tongyue/article/details/140569403

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!