DMR User (dmruser) コマンド仕様書

1. 概要

dmruser は、DMR Monitor (dmr_monitor5.30C.py) が管理するIPアドレスのブロック状態を、コマンドラインから手動で操作するためのユーティリティスクリプトです。このコマンドは、主にブロックされたIPアドレスの確認と解除(ホワイトリストへの追加、またはブロックリストからの削除)を目的としています。


2. 目的

  • ブロック状態の確認: 現在ブロックされているIPアドレスと、関連するコールサインの情報を一覧表示する。
  • ブロックの解除: ブロックされたIPアドレスを明示的にブロックリストから削除し、DMR Monitorにその変更を反映させる。
  • 許可リストへの追加: 指定したコールサインを許可リスト (allowed_callsigns.txt) に追加する。
  • 既知IP情報の表示: DMR Monitorがこれまでに検出したIPアドレスとコールサインの対応情報を表示する。

3. 機能要件

dmruser は以下の機能を提供します。

機能カテゴリ機能名コマンド/オプション定量的/具体的記述
ブロック管理ブロック中IPの表示dmruser list現在 dropped_ips.json に記録され、DMR MonitorによってブロックされているIPアドレスとコールサインの一覧を表示します。IPアドレス、関連コールサイン、ブロック日時(オプション)が含まれます。
IPブロックの解除dmruser unblock <IP_ADDRESS>指定された <IP_ADDRESS> に対する iptables のDROPルールを削除し、dropped_ips.json からもそのIP情報を削除します。これにより、当該IPはDMR Monitorの監視対象に戻り、次回の不正アクセス検知まで通信が可能になります。
許可リスト管理コールサインの許可追加dmruser allow <CALLSIGN>指定された <CALLSIGN>allowed_callsigns.txt に追加します。大文字・小文字は区別されず、自動的に大文字で保存されます。DMR Monitorはファイル変更を検知し、即座に当該コールサインを持つIPアドレスのブロックを解除します。
コールサインの許可解除dmruser disallow <CALLSIGN>指定された <CALLSIGN>allowed_callsigns.txt から削除します。これにより、当該コールサインからのアクセスはDMR Monitorの監視対象となり、不正アクセスと見なされればブロックされます。
情報表示既知IPの表示dmruser knownknown_ips.json に記録されている、DMR Monitorがこれまでに検出した全てのIPアドレスとコールサインの対応情報を表示します。IPアドレス、関連コールサインが含まれます。
ヘルプ表示コマンドヘルプdmruser --help / dmruser -h利用可能なコマンドオプションと使用方法の詳細を表示します。
ファイルパス設定データファイルパス指定(コマンドライン引数で定義)dmruser が参照する allowed_callsigns.txt, dropped_ips.json, known_ips.json のパスは、スクリプト内部で dmr_monitor5.30C.py と同じく、dmruser スクリプトの実行ディレクトリを基準に設定されます。これにより、両スクリプトが同じデータセットを操作します。
エラーハンドリング無効な入力の検知(自動)存在しないIPアドレスの解除試行、無効なコマンドオプション、ファイル操作エラーなど、無効な入力やエラー発生時には適切なエラーメッセージを標準エラー出力 (sys.stderr) に表示し、非ゼロの終了コードで終了します。

Google スプレッドシートにエクスポート


4. 非機能要件

  • 互換性: DMR Monitor (dmr_monitor5.30C.py) と同じデータファイルを共有し、シームレスな連携を実現します。
  • 堅牢性: ファイルの読み書きにおけるエラー処理を実装し、データ破損や不整合のリスクを低減します。
  • 利便性: コマンドラインからの直感的な操作を可能にし、システム管理者の負担を軽減します。
  • セキュリティ: iptables の変更を伴う操作には sudo 権限を要求します。

5. 構成要素とファイルパス

dmruser は通常、dmr_monitor5.30C.py と同じディレクトリに配置され、同じデータファイルを共有します。

項目ファイル/パス役割備考
スクリプト本体dmruser (例: /usr/local/bin/dmruserコマンドラインインターフェースを提供Pythonスクリプトとして実装されることを想定
データファイルallowed_callsigns.txt許可されたDMRコールサインのリストdmr_monitor と共有。スクリプト相対パスでアクセス
dropped_ips.json現在ブロック中のIPとそのコールサイン情報dmr_monitor と共有。スクリプト相対パスでアクセス
known_ips.json過去に検知された全てのIPとそのコールサインの対応dmr_monitor と共有。スクリプト相対パスでアクセス

Google スプレッドシートにエクスポート


6. 環境要件

  • OS: Linux (iptables が利用可能な環境)
  • Python: Python 3.x
  • Python パッケージ: 標準ライブラリのみ(re, subprocess, time, os, json, sys, argparse, collections)。追加のインストールは不要。
  • 権限: unblock コマンドなど iptables を操作する機能については root ユーザー権限(sudo)が必要です。

7. 想定されるコマンドライン構文

  • ブロック中のIPアドレス一覧表示:Bashsudo dmruser list
  • 特定のIPアドレスのブロック解除:Bashsudo dmruser unblock 192.168.1.50
  • コールサインを許可リストに追加:Bashsudo dmruser allow JP1XYZ
  • コールサインを許可リストから削除:Bashsudo dmruser disallow JP1XYZ
  • 既知IPアドレスとコールサインの一覧表示:Bashsudo dmruser known
  • ヘルプ表示:Bashdmruser --help

この仕様書は dmruser コマンドの設計と機能の概要を説明しています。具体的な実装は、これらの要件に基づいて行われます。

#!/usr/bin/env python3

import os
import sys
import json
import subprocess
import argparse
import datetime # ログ出力のために追加

# --- 設定項目 (dmr_monitor と共通) ---
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CALLSIGN_FILE = os.path.join(SCRIPT_DIR, "allowed_callsigns.txt")
DROPPED_IP_FILE = os.path.join(SCRIPT_DIR, "dropped_ips.json")
KNOWN_IP_FILE = os.path.join(SCRIPT_DIR, "known_ips.json")

DROP_PORTS = [62030, 8880, 30051, 42000] # iptables操作に必要

# --- ログ出力ヘルパー関数 ---
def log_message(message, level="INFO", file=sys.stdout):
    """
    メッセージに現在時刻のタイムスタンプを付与して出力します。
    dmr_monitor と同様のフォーマット。
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] [{level}] {message}", file=file)

# --- ファイル操作ヘルパー関数 ---
def load_json_file(file_path, default_data={}):
    """JSONファイルを読み込み、存在しない場合はデフォルトデータを返す。"""
    if not os.path.exists(file_path):
        log_message(f"情報ファイル '{os.path.basename(file_path)}' が見つかりません。新規作成されます。", level="WARNING")
        return default_data
    try:
        with open(file_path, "r") as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        log_message(f"ファイル '{os.path.basename(file_path)}' の解析エラー: {e}", level="ERROR", file=sys.stderr)
        log_message(f"'{os.path.basename(file_path)}' を空のデータで初期化します。", level="WARNING")
        return default_data
    except IOError as e:
        log_message(f"ファイル '{os.path.basename(file_path)}' の読み込みエラー: {e}", level="ERROR", file=sys.stderr)
        return default_data

def save_json_file(file_path, data):
    """JSONファイルを保存する。"""
    try:
        with open(file_path, "w") as f:
            json.dump(data, f, indent=4)
    except IOError as e:
        log_message(f"ファイル '{os.path.basename(file_path)}' の書き込みエラー: {e}", level="ERROR", file=sys.stderr)
        return False
    return True

def load_text_file(file_path):
    """テキストファイルを読み込み、各行をセットとして返す。"""
    if not os.path.exists(file_path):
        log_message(f"許可リストファイル '{os.path.basename(file_path)}' が見つかりません。新規作成されます。", level="WARNING")
        return set()
    try:
        with open(file_path, "r") as f:
            return {line.strip().upper() for line in f if line.strip()}
    except IOError as e:
        log_message(f"ファイル '{os.path.basename(file_path)}' の読み込みエラー: {e}", level="ERROR", file=sys.stderr)
        return set()

def save_text_file(file_path, data_set):
    """テキストファイルを保存する。"""
    try:
        with open(file_path, "w") as f:
            for item in sorted(list(data_set)): # ソートして書き込む
                f.write(item + "\n")
    except IOError as e:
        log_message(f"ファイル '{os.path.basename(file_path)}' の書き込みエラー: {e}", level="ERROR", file=sys.stderr)
        return False
    return True

# --- iptables操作関数 (dmr_monitor と共通の一部) ---
def remove_iptables_drop_rules(ip):
    """
    指定されたIPアドレスに対するDROPルールをすべて削除する。
    dmr_monitor の remove_drop 関数からiptables部分を抽出。
    """
    removed_any = False
    for port in DROP_PORTS:
        while subprocess.run(["iptables", "-D", "INPUT", "-p", "udp", "--dport", str(port), "-s", ip, "-j", "DROP"],
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:
            removed_any = True
            log_message(f"    -> iptables DROPルールを削除: IP={ip}, Port={port}", level="INFO")
            pass # 削除が成功したらループを継続して、全ての該当ルールを削除
    if not removed_any:
        log_message(f"    -> IP={ip} に対するiptablesルールは元々存在しませんでした。", level="INFO")
    return removed_any

# --- コマンド処理関数 ---

def list_dropped_ips():
    """現在DROP中のIPアドレスを一覧表示する。"""
    dropped_ips = load_json_file(DROPPED_IP_FILE)
    if not dropped_ips:
        log_message("現在ブロックされているIPアドレスはありません。", level="INFO")
        return

    log_message("--- 現在ブロックされているIPアドレス ---", level="INFO")
    for ip, info in dropped_ips.items():
        callsign = info.get('callsign', '不明') if isinstance(info, dict) else info # 互換性対応
        log_message(f"IP: {ip}, コールサイン: {callsign}")
    log_message("-------------------------------------", level="INFO")

def unblock_ip(ip_address):
    """指定されたIPアドレスのブロックを解除する。"""
    # root権限チェック
    if os.geteuid() != 0:
        log_message("このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser unblock <IP_ADDRESS>)", level="ERROR", file=sys.stderr)
        sys.exit(1)

    dropped_ips = load_json_file(DROPPED_IP_FILE)
    if ip_address not in dropped_ips:
        log_message(f"IPアドレス '{ip_address}' はブロックリストにありません。", level="WARNING")
        return

    callsign = dropped_ips.get(ip_address, '不明')
    if isinstance(callsign, dict): # 新しい形式の場合
        callsign = callsign.get('callsign', '不明')

    log_message(f"IPアドレス '{ip_address}' (コールサイン: {callsign}) のブロックを解除します...", level="INFO")

    # 1. iptables ルールを削除
    remove_iptables_drop_rules(ip_address)

    # 2. dropped_ips.json から情報を削除
    del dropped_ips[ip_address]
    if save_json_file(DROPPED_IP_FILE, dropped_ips):
        log_message(f"IPアドレス '{ip_address}' (コールサイン: {callsign}) のブロック解除が完了しました。", level="INFO")
    else:
        log_message(f"IPアドレス '{ip_address}' のブロックは解除されましたが、ファイルへの保存に失敗しました。", level="ERROR", file=sys.stderr)


def allow_callsign(callsign):
    """コールサインを許可リストに追加する。"""
    # root権限チェック
    if os.geteuid() != 0:
        log_message("このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser allow <CALLSIGN>)", level="ERROR", file=sys.stderr)
        sys.exit(1)

    callsign_upper = callsign.upper()
    allowed_callsigns = load_text_file(CALLSIGN_FILE)

    if callsign_upper in allowed_callsigns:
        log_message(f"コールサイン '{callsign_upper}' は既に許可リストにあります。", level="WARNING")
        return

    allowed_callsigns.add(callsign_upper)
    if save_text_file(CALLSIGN_FILE, allowed_callsigns):
        log_message(f"コールサイン '{callsign_upper}' を許可リストに追加しました。", level="INFO")
        log_message("dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。", level="INFO")
    else:
        log_message(f"コールサイン '{callsign_upper}' の許可リストへの追加に失敗しました。", level="ERROR", file=sys.stderr)

def disallow_callsign(callsign):
    """コールサインを許可リストから削除する。"""
    # root権限チェック
    if os.geteuid() != 0:
        log_message("このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser disallow <CALLSIGN>)", level="ERROR", file=sys.stderr)
        sys.exit(1)

    callsign_upper = callsign.upper()
    allowed_callsigns = load_text_file(CALLSIGN_FILE)

    if callsign_upper not in allowed_callsigns:
        log_message(f"コールサイン '{callsign_upper}' は許可リストにありません。", level="WARNING")
        return

    allowed_callsigns.remove(callsign_upper)
    if save_text_file(CALLSIGN_FILE, allowed_callsigns):
        log_message(f"コールサイン '{callsign_upper}' を許可リストから削除しました。", level="INFO")
        log_message("dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。", level="INFO")
    else:
        log_message(f"コールサイン '{callsign_upper}' の許可リストからの削除に失敗しました。", level="ERROR", file=sys.stderr)

def list_known_ips():
    """過去に検出された既知のIPアドレスを一覧表示する。"""
    known_ips = load_json_file(KNOWN_IP_FILE)
    if not known_ips:
        log_message("既知のIPアドレス情報はありません。", level="INFO")
        return

    log_message("--- 既知のIPアドレスとコールサイン ---", level="INFO")
    # 辞書をコールサインでソートして表示
    sorted_items = sorted(known_ips.items(), key=lambda item: item[1].get('callsign', '') if isinstance(item[1], dict) else item[1])
    for ip, info in sorted_items:
        callsign = info.get('callsign', '不明') if isinstance(info, dict) else info # 互換性対応
        log_message(f"IP: {ip}, コールサイン: {callsign}")
    log_message("----------------------------------", level="INFO")

# --- メイン処理 ---
def main():
    parser = argparse.ArgumentParser(
        description="DMR Monitorによって管理されるIPアドレスのブロック状態を手動で操作するユーティリティ。",
        formatter_class=argparse.RawTextHelpFormatter # ヘルプメッセージの整形を保持
    )

    # サブコマンドの定義
    subparsers = parser.add_subparsers(dest="command", help="利用可能なコマンド")

    # list コマンド
    list_parser = subparsers.add_parser("list", help="現在ブロックされているIPアドレスを一覧表示します。")

    # unblock コマンド
    unblock_parser = subparsers.add_parser("unblock", help="指定されたIPアドレスのブロックを解除します。")
    unblock_parser.add_argument("ip_address", type=str, help="ブロックを解除するIPアドレス")

    # allow コマンド
    allow_parser = subparsers.add_parser("allow", help="指定されたコールサインを許可リストに追加します。")
    allow_parser.add_argument("callsign", type=str, help="許可リストに追加するコールサイン")

    # disallow コマンド
    disallow_parser = subparsers.add_parser("disallow", help="指定されたコールサインを許可リストから削除します。")
    disallow_parser.add_argument("callsign", type=str, help="許可リストから削除するコールサイン")

    # known コマンド
    known_parser = subparsers.add_parser("known", help="DMR Monitorがこれまでに検出した既知のIPアドレスとコールサインの一覧を表示します。")


    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args()

    if args.command == "list":
        list_dropped_ips()
    elif args.command == "unblock":
        unblock_ip(args.ip_address)
    elif args.command == "allow":
        allow_callsign(args.callsign)
    elif args.command == "disallow":
        disallow_callsign(args.callsign)
    elif args.command == "known":
        list_known_ips()
    else:
        parser.print_help(sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

これで、DMRモニターシステムをより詳細に管理するためのdmruser コマンドが利用可能になります。

はい、承知いたしました。dmruser コマンドの全文ソースコードを提供します。

このスクリプトは、先に提示した仕様書に基づいて、dmr_monitor5.30C.py と同じデータファイルを操作するように設計されています。


dmruser コマンド 全文ソースコード

Python

#!/usr/bin/env python3

import os

import sys

import json

import subprocess

import argparse

import datetime # ログ出力のために追加

# — 設定項目 (dmr_monitor と共通) —

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

CALLSIGN_FILE = os.path.join(SCRIPT_DIR, “allowed_callsigns.txt”)

DROPPED_IP_FILE = os.path.join(SCRIPT_DIR, “dropped_ips.json”)

KNOWN_IP_FILE = os.path.join(SCRIPT_DIR, “known_ips.json”)

DROP_PORTS = [62030, 8880, 30051, 42000] # iptables操作に必要

# — ログ出力ヘルパー関数 —

def log_message(message, level=”INFO”, file=sys.stdout):

    “””

    メッセージに現在時刻のタイムスタンプを付与して出力します。

    dmr_monitor と同様のフォーマット。

    “””

    timestamp = datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)

    print(f”[{timestamp}] [{level}] {message}”, file=file)

# — ファイル操作ヘルパー関数 —

def load_json_file(file_path, default_data={}):

    “””JSONファイルを読み込み、存在しない場合はデフォルトデータを返す。”””

    if not os.path.exists(file_path):

        log_message(f”情報ファイル ‘{os.path.basename(file_path)}’ が見つかりません。新規作成されます。”, level=”WARNING”)

        return default_data

    try:

        with open(file_path, “r”) as f:

            return json.load(f)

    except json.JSONDecodeError as e:

        log_message(f”ファイル ‘{os.path.basename(file_path)}’ の解析エラー: {e}”, level=”ERROR”, file=sys.stderr)

        log_message(f”‘{os.path.basename(file_path)}’ を空のデータで初期化します。”, level=”WARNING”)

        return default_data

    except IOError as e:

        log_message(f”ファイル ‘{os.path.basename(file_path)}’ の読み込みエラー: {e}”, level=”ERROR”, file=sys.stderr)

        return default_data

def save_json_file(file_path, data):

    “””JSONファイルを保存する。”””

    try:

        with open(file_path, “w”) as f:

            json.dump(data, f, indent=4)

    except IOError as e:

        log_message(f”ファイル ‘{os.path.basename(file_path)}’ の書き込みエラー: {e}”, level=”ERROR”, file=sys.stderr)

        return False

    return True

def load_text_file(file_path):

    “””テキストファイルを読み込み、各行をセットとして返す。”””

    if not os.path.exists(file_path):

        log_message(f”許可リストファイル ‘{os.path.basename(file_path)}’ が見つかりません。新規作成されます。”, level=”WARNING”)

        return set()

    try:

        with open(file_path, “r”) as f:

            return {line.strip().upper() for line in f if line.strip()}

    except IOError as e:

        log_message(f”ファイル ‘{os.path.basename(file_path)}’ の読み込みエラー: {e}”, level=”ERROR”, file=sys.stderr)

        return set()

def save_text_file(file_path, data_set):

    “””テキストファイルを保存する。”””

    try:

        with open(file_path, “w”) as f:

            for item in sorted(list(data_set)): # ソートして書き込む

                f.write(item + “\n”)

    except IOError as e:

        log_message(f”ファイル ‘{os.path.basename(file_path)}’ の書き込みエラー: {e}”, level=”ERROR”, file=sys.stderr)

        return False

    return True

# — iptables操作関数 (dmr_monitor と共通の一部) —

def remove_iptables_drop_rules(ip):

    “””

    指定されたIPアドレスに対するDROPルールをすべて削除する。

    dmr_monitor の remove_drop 関数からiptables部分を抽出。

    “””

    removed_any = False

    for port in DROP_PORTS:

        while subprocess.run([“iptables”, “-D”, “INPUT”, “-p”, “udp”, “–dport”, str(port), “-s”, ip, “-j”, “DROP”],

                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:

            removed_any = True

            log_message(f”    -> iptables DROPルールを削除: IP={ip}, Port={port}”, level=”INFO”)

            pass # 削除が成功したらループを継続して、全ての該当ルールを削除

    if not removed_any:

        log_message(f”    -> IP={ip} に対するiptablesルールは元々存在しませんでした。”, level=”INFO”)

    return removed_any

# — コマンド処理関数 —

def list_dropped_ips():

    “””現在DROP中のIPアドレスを一覧表示する。”””

    dropped_ips = load_json_file(DROPPED_IP_FILE)

    if not dropped_ips:

        log_message(“現在ブロックされているIPアドレスはありません。”, level=”INFO”)

        return

    log_message(“— 現在ブロックされているIPアドレス —“, level=”INFO”)

    for ip, info in dropped_ips.items():

        callsign = info.get(‘callsign’, ‘不明’) if isinstance(info, dict) else info # 互換性対応

        log_message(f”IP: {ip}, コールサイン: {callsign}”)

    log_message(“————————————-“, level=”INFO”)

def unblock_ip(ip_address):

    “””指定されたIPアドレスのブロックを解除する。”””

    # root権限チェック

    if os.geteuid() != 0:

        log_message(“このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser unblock <IP_ADDRESS>)”, level=”ERROR”, file=sys.stderr)

        sys.exit(1)

    dropped_ips = load_json_file(DROPPED_IP_FILE)

    if ip_address not in dropped_ips:

        log_message(f”IPアドレス ‘{ip_address}’ はブロックリストにありません。”, level=”WARNING”)

        return

    callsign = dropped_ips.get(ip_address, ‘不明’)

    if isinstance(callsign, dict): # 新しい形式の場合

        callsign = callsign.get(‘callsign’, ‘不明’)

    log_message(f”IPアドレス ‘{ip_address}’ (コールサイン: {callsign}) のブロックを解除します…”, level=”INFO”)

    # 1. iptables ルールを削除

    remove_iptables_drop_rules(ip_address)

    # 2. dropped_ips.json から情報を削除

    del dropped_ips[ip_address]

    if save_json_file(DROPPED_IP_FILE, dropped_ips):

        log_message(f”IPアドレス ‘{ip_address}’ (コールサイン: {callsign}) のブロック解除が完了しました。”, level=”INFO”)

    else:

        log_message(f”IPアドレス ‘{ip_address}’ のブロックは解除されましたが、ファイルへの保存に失敗しました。”, level=”ERROR”, file=sys.stderr)

def allow_callsign(callsign):

    “””コールサインを許可リストに追加する。”””

    # root権限チェック

    if os.geteuid() != 0:

        log_message(“このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser allow <CALLSIGN>)”, level=”ERROR”, file=sys.stderr)

        sys.exit(1)

    callsign_upper = callsign.upper()

    allowed_callsigns = load_text_file(CALLSIGN_FILE)

    if callsign_upper in allowed_callsigns:

        log_message(f”コールサイン ‘{callsign_upper}’ は既に許可リストにあります。”, level=”WARNING”)

        return

    allowed_callsigns.add(callsign_upper)

    if save_text_file(CALLSIGN_FILE, allowed_callsigns):

        log_message(f”コールサイン ‘{callsign_upper}’ を許可リストに追加しました。”, level=”INFO”)

        log_message(“dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。”, level=”INFO”)

    else:

        log_message(f”コールサイン ‘{callsign_upper}’ の許可リストへの追加に失敗しました。”, level=”ERROR”, file=sys.stderr)

def disallow_callsign(callsign):

    “””コールサインを許可リストから削除する。”””

    # root権限チェック

    if os.geteuid() != 0:

        log_message(“このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser disallow <CALLSIGN>)”, level=”ERROR”, file=sys.stderr)

        sys.exit(1)

    callsign_upper = callsign.upper()

    allowed_callsigns = load_text_file(CALLSIGN_FILE)

    if callsign_upper not in allowed_callsigns:

        log_message(f”コールサイン ‘{callsign_upper}’ は許可リストにありません。”, level=”WARNING”)

        return

    allowed_callsigns.remove(callsign_upper)

    if save_text_file(CALLSIGN_FILE, allowed_callsigns):

        log_message(f”コールサイン ‘{callsign_upper}’ を許可リストから削除しました。”, level=”INFO”)

        log_message(“dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。”, level=”INFO”)

    else:

        log_message(f”コールサイン ‘{callsign_upper}’ の許可リストからの削除に失敗しました。”, level=”ERROR”, file=sys.stderr)

def list_known_ips():

    “””過去に検出された既知のIPアドレスを一覧表示する。”””

    known_ips = load_json_file(KNOWN_IP_FILE)

    if not known_ips:

        log_message(“既知のIPアドレス情報はありません。”, level=”INFO”)

        return

    log_message(“— 既知のIPアドレスとコールサイン —“, level=”INFO”)

    # 辞書をコールサインでソートして表示

    sorted_items = sorted(known_ips.items(), key=lambda item: item[1].get(‘callsign’, ”) if isinstance(item[1], dict) else item[1])

    for ip, info in sorted_items:

        callsign = info.get(‘callsign’, ‘不明’) if isinstance(info, dict) else info # 互換性対応

        log_message(f”IP: {ip}, コールサイン: {callsign}”)

    log_message(“———————————-“, level=”INFO”)

# — メイン処理 —

def main():

    parser = argparse.ArgumentParser(

        description=”DMR Monitorによって管理されるIPアドレスのブロック状態を手動で操作するユーティリティ。”,

        formatter_class=argparse.RawTextHelpFormatter # ヘルプメッセージの整形を保持

    )

    # サブコマンドの定義

    subparsers = parser.add_subparsers(dest=”command”, help=”利用可能なコマンド”)

    # list コマンド

    list_parser = subparsers.add_parser(“list”, help=”現在ブロックされているIPアドレスを一覧表示します。”)

    # unblock コマンド

    unblock_parser = subparsers.add_parser(“unblock”, help=”指定されたIPアドレスのブロックを解除します。”)

    unblock_parser.add_argument(“ip_address”, type=str, help=”ブロックを解除するIPアドレス”)

    # allow コマンド

    allow_parser = subparsers.add_parser(“allow”, help=”指定されたコールサインを許可リストに追加します。”)

    allow_parser.add_argument(“callsign”, type=str, help=”許可リストに追加するコールサイン”)

    # disallow コマンド

    disallow_parser = subparsers.add_parser(“disallow”, help=”指定されたコールサインを許可リストから削除します。”)

    disallow_parser.add_argument(“callsign”, type=str, help=”許可リストから削除するコールサイン”)

    # known コマンド

    known_parser = subparsers.add_parser(“known”, help=”DMR Monitorがこれまでに検出した既知のIPアドレスとコールサインの一覧を表示します。”)

    if len(sys.argv) == 1:

        parser.print_help(sys.stderr)

        sys.exit(1)

    args = parser.parse_args()

    if args.command == “list”:

        list_dropped_ips()

    elif args.command == “unblock”:

        unblock_ip(args.ip_address)

    elif args.command == “allow”:

        allow_callsign(args.callsign)

    elif args.command == “disallow”:

        disallow_callsign(args.callsign)

    elif args.command == “known”:

        list_known_ips()

    else:

        parser.print_help(sys.stderr)

        sys.exit(1)

if __name__ == “__main__”:

    main()


dmruser の導入と使用方法

  1. ファイルの保存:

上記のコードを例えば /usr/local/bin/dmruser という名前で保存します。

(/usr/local/bin は PATH が通っているため、コマンドとして直接実行できます。)

  1. 実行権限の付与:

保存したファイルに実行権限を付与します。

Bash

sudo chmod +x /usr/local/bin/dmruser

  1. 使用例:
    • ブロック中のIPアドレスを一覧表示:

Bash

sudo dmruser list

  1. 特定のIPアドレスのブロックを解除:

Bash

sudo dmruser unblock 192.168.1.100

  1. コールサインを許可リストに追加:

Bash

sudo dmruser allow JG1XYZ

  1. コールサインを許可リストから削除:

Bash

sudo dmruser disallow JG1XYZ

  1. 既知のIPアドレスとコールサインの一覧表示:

Bash

sudo dmruser known

  1. ヘルプの表示:

Bash

dmruser –help

重要な注意点

  • 実行ディレクトリ: dmruser スクリプトは、dmr_monitor5.30C.py と同じディレクトリ(例: /home/kshinoda/)に allowed_callsigns.txt, dropped_ips.json, known_ips.json が存在することを前提としています。もし dmruser を /usr/local/bin のような別の場所に配置する場合、これらのデータファイルをシンボリックリンクで貼るか、同じディレクトリにコピーするなどして、dmruser がデータファイルを見つけられるようにする必要があります。推奨はシンボリックリンクです。

例:

Bash

sudo ln -s /home/kshinoda/allowed_callsigns.txt /usr/local/bin/allowed_callsigns.txt

sudo ln -s /home/kshinoda/dropped_ips.json /usr/local/bin/dropped_ips.json

sudo ln -s /home/kshinoda/known_ips.json /usr/local/bin/known_ips.json

より良い方法は、dmruser スクリプト内でデータファイルのパスを絶対パスで指定するか、dmr_monitor の作業ディレクトリ(systemdWorkingDirectory)から実行されるようにすることです。 現状のコードでは、SCRIPT_DIR を基準にデータファイルを読み込むため、dmruser を /usr/local/bin に置くと、データファイルも /usr/local/bin にあると仮定してしまいます。

【改善案】: dmr_monitor5.30C.py と同様に、dmruser スクリプトの冒頭でデータファイルのパスを絶対パスで指定するか、systemd の WorkingDirectory を利用して、スクリプトがデータファイルと同じディレクトリから実行されるように設定してください。

例: dmruser の CALLSIGN_FILE などの定義を以下のように修正します。

Python

# — 設定項目 (dmr_monitor と共通) —

# SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # この行は削除またはコメントアウト

DATA_FILES_BASE_DIR = “/home/kshinoda” # ★ データファイルが実際に存在する絶対パスに修正 ★

CALLSIGN_FILE = os.path.join(DATA_FILES_BASE_DIR, “allowed_callsigns.txt”)

DROPPED_IP_FILE = os.path.join(DATA_FILES_BASE_DIR, “dropped_ips.json”)

KNOWN_IP_FILE = os.path.join(DATA_FILES_BASE_DIR, “known_ips.json”)

この修正を dmruser スクリプトに適用すると、どこから実行しても /home/kshinoda/ のデータファイルを正しく参照するようになります。

  • Root権限: unblock, allow, disallow コマンドは iptables の操作やシステムファイルの書き込みを行うため、sudo を付けて実行する必要があります。

これで、DMRモニターシステムをより詳細に管理するためのdmruser コマンドが利用可能になります。

コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です