実際に書いたコード
import logging
import socket
import sys
import time
import cv2
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)
class DroneManager:
def __init__(self, host_ip='192.168.10.2', host_port=8889,
drone_ip='192.168.10.1', drone_port=8889,
video_enabled=False):
self.host_ip = host_ip
self.host_port = host_port
self.drone_ip = drone_ip
self.drone_port = drone_port
self.drone_address = (self.drone_ip, self.drone_port)
self.video_enabled = video_enabled
self.socket_closed = False # ソケットが閉じられたか追跡
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((self.host_ip, self.host_port))
self.socket.settimeout(8) # タイムアウトを8秒に設定
logger.info("ソケットをバインドしました")
self.send_command('command')
if self.video_enabled:
self.send_command('streamon')
except Exception as e:
logger.error(f"初期化中にエラー: {e}")
self.stop()
raise
def __del__(self):
# 明示的に stop() された後は何もしない
if not self.socket_closed:
try:
self.land()
except Exception:
pass
self.stop()
def stop(self):
if hasattr(self, 'socket') and not self.socket_closed:
self.socket.close()
self.socket_closed = True
logger.info("ソケットを閉じました")
def send_command(self, command, retry=3):
logger.info({'action': 'send_command', 'command': command, 'target': self.drone_address})
for attempt in range(retry):
try:
self.socket.sendto(command.encode('utf-8'), self.drone_address)
response, _ = self.socket.recvfrom(1024)
response_text = response.decode('utf-8')
logger.info({'action': 'response', 'response': response_text})
return response_text
except socket.timeout:
logger.warning(f"応答がタイムアウトしました({attempt + 1}/{retry})")
except Exception as e:
logger.error(f"コマンド送信エラー: {e}")
return None
return None
def takeoff(self):
return self.send_command('takeoff')
def land(self):
return self.send_command('land')
def move_forward(self, distance_cm=20):
return self.send_command(f'forward {distance_cm}')
def rotate_cw(self, angle=90):
return self.send_command(f'cw {angle}')
def receive_video(self):
if not self.video_enabled:
logger.warning("映像受信は無効化されています")
return
logger.info("映像の受信を開始します。'q' キーで終了。")
cap = cv2.VideoCapture("udp://0.0.0.0:11111")
if not cap.isOpened():
logger.error("映像ストリームを開けませんでした")
return
while True:
ret, frame = cap.read()
if ret:
cv2.imshow("Tello Stream", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
logger.info("映像受信を終了しました")
if __name__ == '__main__':
drone = None
try:
drone = DroneManager(video_enabled=False) # 映像が必要なら True に変更
drone.takeoff()
time.sleep(5)
drone.move_forward(50)
time.sleep(3)
drone.rotate_cw(90)
time.sleep(3)
drone.land()
except Exception as e:
logger.error(f"メイン処理中にエラー: {e}")
if drone:
try:
drone.land()
except:
pass
finally:
if drone:
drone.stop()
実際の飛ばした動画
コメント