TelloドローンをPythonプログラムで飛ばしてみた。

実際に書いたコード


import logging
import socket
import sys
import time
import cv2

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

class DroneManager:
    def __init__(self, host_ip='192.168.10.2', host_port=8889,
                 drone_ip='192.168.10.1', drone_port=8889,
                 video_enabled=False):
        self.host_ip = host_ip
        self.host_port = host_port
        self.drone_ip = drone_ip
        self.drone_port = drone_port
        self.drone_address = (self.drone_ip, self.drone_port)
        self.video_enabled = video_enabled
        self.socket_closed = False  # ソケットが閉じられたか追跡

        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.socket.bind((self.host_ip, self.host_port))
            self.socket.settimeout(8)  # タイムアウトを8秒に設定
            logger.info("ソケットをバインドしました")

            self.send_command('command')
            if self.video_enabled:
                self.send_command('streamon')
        except Exception as e:
            logger.error(f"初期化中にエラー: {e}")
            self.stop()
            raise

    def __del__(self):
        # 明示的に stop() された後は何もしない
        if not self.socket_closed:
            try:
                self.land()
            except Exception:
                pass
            self.stop()

    def stop(self):
        if hasattr(self, 'socket') and not self.socket_closed:
            self.socket.close()
            self.socket_closed = True
            logger.info("ソケットを閉じました")

    def send_command(self, command, retry=3):
        logger.info({'action': 'send_command', 'command': command, 'target': self.drone_address})
        for attempt in range(retry):
            try:
                self.socket.sendto(command.encode('utf-8'), self.drone_address)
                response, _ = self.socket.recvfrom(1024)
                response_text = response.decode('utf-8')
                logger.info({'action': 'response', 'response': response_text})
                return response_text
            except socket.timeout:
                logger.warning(f"応答がタイムアウトしました({attempt + 1}/{retry})")
            except Exception as e:
                logger.error(f"コマンド送信エラー: {e}")
                return None
        return None

    def takeoff(self):
        return self.send_command('takeoff')

    def land(self):
        return self.send_command('land')

    def move_forward(self, distance_cm=20):
        return self.send_command(f'forward {distance_cm}')

    def rotate_cw(self, angle=90):
        return self.send_command(f'cw {angle}')

    def receive_video(self):
        if not self.video_enabled:
            logger.warning("映像受信は無効化されています")
            return

        logger.info("映像の受信を開始します。'q' キーで終了。")
        cap = cv2.VideoCapture("udp://0.0.0.0:11111")
        if not cap.isOpened():
            logger.error("映像ストリームを開けませんでした")
            return

        while True:
            ret, frame = cap.read()
            if ret:
                cv2.imshow("Tello Stream", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        cap.release()
        cv2.destroyAllWindows()
        logger.info("映像受信を終了しました")

if __name__ == '__main__':
    drone = None
    try:
        drone = DroneManager(video_enabled=False)  # 映像が必要なら True に変更
        drone.takeoff()
        time.sleep(5)

        drone.move_forward(50)
        time.sleep(3)

        drone.rotate_cw(90)
        time.sleep(3)

        drone.land()
    except Exception as e:
        logger.error(f"メイン処理中にエラー: {e}")
        if drone:
            try:
                drone.land()
            except:
                pass
    finally:
        if drone:
            drone.stop()

実際の飛ばした動画

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

目次