DMR User (dmruser) コマンド仕様書
1. 概要
dmruser は、DMR Monitor (dmr_monitor5.30C.py) が管理するIPアドレスのブロック状態を、コマンドラインから手動で操作するためのユーティリティスクリプトです。このコマンドは、主にブロックされたIPアドレスの確認と解除(ホワイトリストへの追加、またはブロックリストからの削除)を目的としています。
2. 目的
- ブロック状態の確認: 現在ブロックされているIPアドレスと、関連するコールサインの情報を一覧表示する。
- ブロックの解除: ブロックされたIPアドレスを明示的にブロックリストから削除し、DMR Monitorにその変更を反映させる。
- 許可リストへの追加: 指定したコールサインを許可リスト (
allowed_callsigns.txt) に追加する。 - 既知IP情報の表示: DMR Monitorがこれまでに検出したIPアドレスとコールサインの対応情報を表示する。
3. 機能要件
dmruser は以下の機能を提供します。
| 機能カテゴリ | 機能名 | コマンド/オプション | 定量的/具体的記述 |
| ブロック管理 | ブロック中IPの表示 | dmruser list | 現在 dropped_ips.json に記録され、DMR MonitorによってブロックされているIPアドレスとコールサインの一覧を表示します。IPアドレス、関連コールサイン、ブロック日時(オプション)が含まれます。 |
| IPブロックの解除 | dmruser unblock <IP_ADDRESS> | 指定された <IP_ADDRESS> に対する iptables のDROPルールを削除し、dropped_ips.json からもそのIP情報を削除します。これにより、当該IPはDMR Monitorの監視対象に戻り、次回の不正アクセス検知まで通信が可能になります。 | |
| 許可リスト管理 | コールサインの許可追加 | dmruser allow <CALLSIGN> | 指定された <CALLSIGN> を allowed_callsigns.txt に追加します。大文字・小文字は区別されず、自動的に大文字で保存されます。DMR Monitorはファイル変更を検知し、即座に当該コールサインを持つIPアドレスのブロックを解除します。 |
| コールサインの許可解除 | dmruser disallow <CALLSIGN> | 指定された <CALLSIGN> を allowed_callsigns.txt から削除します。これにより、当該コールサインからのアクセスはDMR Monitorの監視対象となり、不正アクセスと見なされればブロックされます。 | |
| 情報表示 | 既知IPの表示 | dmruser known | known_ips.json に記録されている、DMR Monitorがこれまでに検出した全てのIPアドレスとコールサインの対応情報を表示します。IPアドレス、関連コールサインが含まれます。 |
| ヘルプ表示 | コマンドヘルプ | dmruser --help / dmruser -h | 利用可能なコマンドオプションと使用方法の詳細を表示します。 |
| ファイルパス設定 | データファイルパス指定 | (コマンドライン引数で定義) | dmruser が参照する allowed_callsigns.txt, dropped_ips.json, known_ips.json のパスは、スクリプト内部で dmr_monitor5.30C.py と同じく、dmruser スクリプトの実行ディレクトリを基準に設定されます。これにより、両スクリプトが同じデータセットを操作します。 |
| エラーハンドリング | 無効な入力の検知 | (自動) | 存在しないIPアドレスの解除試行、無効なコマンドオプション、ファイル操作エラーなど、無効な入力やエラー発生時には適切なエラーメッセージを標準エラー出力 (sys.stderr) に表示し、非ゼロの終了コードで終了します。 |
Google スプレッドシートにエクスポート
4. 非機能要件
- 互換性: DMR Monitor (
dmr_monitor5.30C.py) と同じデータファイルを共有し、シームレスな連携を実現します。 - 堅牢性: ファイルの読み書きにおけるエラー処理を実装し、データ破損や不整合のリスクを低減します。
- 利便性: コマンドラインからの直感的な操作を可能にし、システム管理者の負担を軽減します。
- セキュリティ:
iptablesの変更を伴う操作にはsudo権限を要求します。
5. 構成要素とファイルパス
dmruser は通常、dmr_monitor5.30C.py と同じディレクトリに配置され、同じデータファイルを共有します。
| 項目 | ファイル/パス | 役割 | 備考 |
| スクリプト本体 | dmruser (例: /usr/local/bin/dmruser) | コマンドラインインターフェースを提供 | Pythonスクリプトとして実装されることを想定 |
| データファイル | allowed_callsigns.txt | 許可されたDMRコールサインのリスト | dmr_monitor と共有。スクリプト相対パスでアクセス |
dropped_ips.json | 現在ブロック中のIPとそのコールサイン情報 | dmr_monitor と共有。スクリプト相対パスでアクセス | |
known_ips.json | 過去に検知された全てのIPとそのコールサインの対応 | dmr_monitor と共有。スクリプト相対パスでアクセス |
Google スプレッドシートにエクスポート
6. 環境要件
- OS: Linux (
iptablesが利用可能な環境) - Python: Python 3.x
- Python パッケージ: 標準ライブラリのみ(
re,subprocess,time,os,json,sys,argparse,collections)。追加のインストールは不要。 - 権限:
unblockコマンドなどiptablesを操作する機能についてはrootユーザー権限(sudo)が必要です。
7. 想定されるコマンドライン構文
- ブロック中のIPアドレス一覧表示:Bash
sudo dmruser list - 特定のIPアドレスのブロック解除:Bash
sudo dmruser unblock 192.168.1.50 - コールサインを許可リストに追加:Bash
sudo dmruser allow JP1XYZ - コールサインを許可リストから削除:Bash
sudo dmruser disallow JP1XYZ - 既知IPアドレスとコールサインの一覧表示:Bash
sudo dmruser known - ヘルプ表示:Bash
dmruser --help
この仕様書は dmruser コマンドの設計と機能の概要を説明しています。具体的な実装は、これらの要件に基づいて行われます。
#!/usr/bin/env python3
import os
import sys
import json
import subprocess
import argparse
import datetime # ログ出力のために追加
# --- 設定項目 (dmr_monitor と共通) ---
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CALLSIGN_FILE = os.path.join(SCRIPT_DIR, "allowed_callsigns.txt")
DROPPED_IP_FILE = os.path.join(SCRIPT_DIR, "dropped_ips.json")
KNOWN_IP_FILE = os.path.join(SCRIPT_DIR, "known_ips.json")
DROP_PORTS = [62030, 8880, 30051, 42000] # iptables操作に必要
# --- ログ出力ヘルパー関数 ---
def log_message(message, level="INFO", file=sys.stdout):
"""
メッセージに現在時刻のタイムスタンプを付与して出力します。
dmr_monitor と同様のフォーマット。
"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] [{level}] {message}", file=file)
# --- ファイル操作ヘルパー関数 ---
def load_json_file(file_path, default_data={}):
"""JSONファイルを読み込み、存在しない場合はデフォルトデータを返す。"""
if not os.path.exists(file_path):
log_message(f"情報ファイル '{os.path.basename(file_path)}' が見つかりません。新規作成されます。", level="WARNING")
return default_data
try:
with open(file_path, "r") as f:
return json.load(f)
except json.JSONDecodeError as e:
log_message(f"ファイル '{os.path.basename(file_path)}' の解析エラー: {e}", level="ERROR", file=sys.stderr)
log_message(f"'{os.path.basename(file_path)}' を空のデータで初期化します。", level="WARNING")
return default_data
except IOError as e:
log_message(f"ファイル '{os.path.basename(file_path)}' の読み込みエラー: {e}", level="ERROR", file=sys.stderr)
return default_data
def save_json_file(file_path, data):
"""JSONファイルを保存する。"""
try:
with open(file_path, "w") as f:
json.dump(data, f, indent=4)
except IOError as e:
log_message(f"ファイル '{os.path.basename(file_path)}' の書き込みエラー: {e}", level="ERROR", file=sys.stderr)
return False
return True
def load_text_file(file_path):
"""テキストファイルを読み込み、各行をセットとして返す。"""
if not os.path.exists(file_path):
log_message(f"許可リストファイル '{os.path.basename(file_path)}' が見つかりません。新規作成されます。", level="WARNING")
return set()
try:
with open(file_path, "r") as f:
return {line.strip().upper() for line in f if line.strip()}
except IOError as e:
log_message(f"ファイル '{os.path.basename(file_path)}' の読み込みエラー: {e}", level="ERROR", file=sys.stderr)
return set()
def save_text_file(file_path, data_set):
"""テキストファイルを保存する。"""
try:
with open(file_path, "w") as f:
for item in sorted(list(data_set)): # ソートして書き込む
f.write(item + "\n")
except IOError as e:
log_message(f"ファイル '{os.path.basename(file_path)}' の書き込みエラー: {e}", level="ERROR", file=sys.stderr)
return False
return True
# --- iptables操作関数 (dmr_monitor と共通の一部) ---
def remove_iptables_drop_rules(ip):
"""
指定されたIPアドレスに対するDROPルールをすべて削除する。
dmr_monitor の remove_drop 関数からiptables部分を抽出。
"""
removed_any = False
for port in DROP_PORTS:
while subprocess.run(["iptables", "-D", "INPUT", "-p", "udp", "--dport", str(port), "-s", ip, "-j", "DROP"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:
removed_any = True
log_message(f" -> iptables DROPルールを削除: IP={ip}, Port={port}", level="INFO")
pass # 削除が成功したらループを継続して、全ての該当ルールを削除
if not removed_any:
log_message(f" -> IP={ip} に対するiptablesルールは元々存在しませんでした。", level="INFO")
return removed_any
# --- コマンド処理関数 ---
def list_dropped_ips():
"""現在DROP中のIPアドレスを一覧表示する。"""
dropped_ips = load_json_file(DROPPED_IP_FILE)
if not dropped_ips:
log_message("現在ブロックされているIPアドレスはありません。", level="INFO")
return
log_message("--- 現在ブロックされているIPアドレス ---", level="INFO")
for ip, info in dropped_ips.items():
callsign = info.get('callsign', '不明') if isinstance(info, dict) else info # 互換性対応
log_message(f"IP: {ip}, コールサイン: {callsign}")
log_message("-------------------------------------", level="INFO")
def unblock_ip(ip_address):
"""指定されたIPアドレスのブロックを解除する。"""
# root権限チェック
if os.geteuid() != 0:
log_message("このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser unblock <IP_ADDRESS>)", level="ERROR", file=sys.stderr)
sys.exit(1)
dropped_ips = load_json_file(DROPPED_IP_FILE)
if ip_address not in dropped_ips:
log_message(f"IPアドレス '{ip_address}' はブロックリストにありません。", level="WARNING")
return
callsign = dropped_ips.get(ip_address, '不明')
if isinstance(callsign, dict): # 新しい形式の場合
callsign = callsign.get('callsign', '不明')
log_message(f"IPアドレス '{ip_address}' (コールサイン: {callsign}) のブロックを解除します...", level="INFO")
# 1. iptables ルールを削除
remove_iptables_drop_rules(ip_address)
# 2. dropped_ips.json から情報を削除
del dropped_ips[ip_address]
if save_json_file(DROPPED_IP_FILE, dropped_ips):
log_message(f"IPアドレス '{ip_address}' (コールサイン: {callsign}) のブロック解除が完了しました。", level="INFO")
else:
log_message(f"IPアドレス '{ip_address}' のブロックは解除されましたが、ファイルへの保存に失敗しました。", level="ERROR", file=sys.stderr)
def allow_callsign(callsign):
"""コールサインを許可リストに追加する。"""
# root権限チェック
if os.geteuid() != 0:
log_message("このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser allow <CALLSIGN>)", level="ERROR", file=sys.stderr)
sys.exit(1)
callsign_upper = callsign.upper()
allowed_callsigns = load_text_file(CALLSIGN_FILE)
if callsign_upper in allowed_callsigns:
log_message(f"コールサイン '{callsign_upper}' は既に許可リストにあります。", level="WARNING")
return
allowed_callsigns.add(callsign_upper)
if save_text_file(CALLSIGN_FILE, allowed_callsigns):
log_message(f"コールサイン '{callsign_upper}' を許可リストに追加しました。", level="INFO")
log_message("dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。", level="INFO")
else:
log_message(f"コールサイン '{callsign_upper}' の許可リストへの追加に失敗しました。", level="ERROR", file=sys.stderr)
def disallow_callsign(callsign):
"""コールサインを許可リストから削除する。"""
# root権限チェック
if os.geteuid() != 0:
log_message("このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser disallow <CALLSIGN>)", level="ERROR", file=sys.stderr)
sys.exit(1)
callsign_upper = callsign.upper()
allowed_callsigns = load_text_file(CALLSIGN_FILE)
if callsign_upper not in allowed_callsigns:
log_message(f"コールサイン '{callsign_upper}' は許可リストにありません。", level="WARNING")
return
allowed_callsigns.remove(callsign_upper)
if save_text_file(CALLSIGN_FILE, allowed_callsigns):
log_message(f"コールサイン '{callsign_upper}' を許可リストから削除しました。", level="INFO")
log_message("dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。", level="INFO")
else:
log_message(f"コールサイン '{callsign_upper}' の許可リストからの削除に失敗しました。", level="ERROR", file=sys.stderr)
def list_known_ips():
"""過去に検出された既知のIPアドレスを一覧表示する。"""
known_ips = load_json_file(KNOWN_IP_FILE)
if not known_ips:
log_message("既知のIPアドレス情報はありません。", level="INFO")
return
log_message("--- 既知のIPアドレスとコールサイン ---", level="INFO")
# 辞書をコールサインでソートして表示
sorted_items = sorted(known_ips.items(), key=lambda item: item[1].get('callsign', '') if isinstance(item[1], dict) else item[1])
for ip, info in sorted_items:
callsign = info.get('callsign', '不明') if isinstance(info, dict) else info # 互換性対応
log_message(f"IP: {ip}, コールサイン: {callsign}")
log_message("----------------------------------", level="INFO")
# --- メイン処理 ---
def main():
parser = argparse.ArgumentParser(
description="DMR Monitorによって管理されるIPアドレスのブロック状態を手動で操作するユーティリティ。",
formatter_class=argparse.RawTextHelpFormatter # ヘルプメッセージの整形を保持
)
# サブコマンドの定義
subparsers = parser.add_subparsers(dest="command", help="利用可能なコマンド")
# list コマンド
list_parser = subparsers.add_parser("list", help="現在ブロックされているIPアドレスを一覧表示します。")
# unblock コマンド
unblock_parser = subparsers.add_parser("unblock", help="指定されたIPアドレスのブロックを解除します。")
unblock_parser.add_argument("ip_address", type=str, help="ブロックを解除するIPアドレス")
# allow コマンド
allow_parser = subparsers.add_parser("allow", help="指定されたコールサインを許可リストに追加します。")
allow_parser.add_argument("callsign", type=str, help="許可リストに追加するコールサイン")
# disallow コマンド
disallow_parser = subparsers.add_parser("disallow", help="指定されたコールサインを許可リストから削除します。")
disallow_parser.add_argument("callsign", type=str, help="許可リストから削除するコールサイン")
# known コマンド
known_parser = subparsers.add_parser("known", help="DMR Monitorがこれまでに検出した既知のIPアドレスとコールサインの一覧を表示します。")
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
if args.command == "list":
list_dropped_ips()
elif args.command == "unblock":
unblock_ip(args.ip_address)
elif args.command == "allow":
allow_callsign(args.callsign)
elif args.command == "disallow":
disallow_callsign(args.callsign)
elif args.command == "known":
list_known_ips()
else:
parser.print_help(sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
これで、DMRモニターシステムをより詳細に管理するためのdmruser コマンドが利用可能になります。
はい、承知いたしました。dmruser コマンドの全文ソースコードを提供します。
このスクリプトは、先に提示した仕様書に基づいて、dmr_monitor5.30C.py と同じデータファイルを操作するように設計されています。
dmruser コマンド 全文ソースコード
Python
#!/usr/bin/env python3
import os
import sys
import json
import subprocess
import argparse
import datetime # ログ出力のために追加
# — 設定項目 (dmr_monitor と共通) —
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CALLSIGN_FILE = os.path.join(SCRIPT_DIR, “allowed_callsigns.txt”)
DROPPED_IP_FILE = os.path.join(SCRIPT_DIR, “dropped_ips.json”)
KNOWN_IP_FILE = os.path.join(SCRIPT_DIR, “known_ips.json”)
DROP_PORTS = [62030, 8880, 30051, 42000] # iptables操作に必要
# — ログ出力ヘルパー関数 —
def log_message(message, level=”INFO”, file=sys.stdout):
“””
メッセージに現在時刻のタイムスタンプを付与して出力します。
dmr_monitor と同様のフォーマット。
“””
timestamp = datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
print(f”[{timestamp}] [{level}] {message}”, file=file)
# — ファイル操作ヘルパー関数 —
def load_json_file(file_path, default_data={}):
“””JSONファイルを読み込み、存在しない場合はデフォルトデータを返す。”””
if not os.path.exists(file_path):
log_message(f”情報ファイル ‘{os.path.basename(file_path)}’ が見つかりません。新規作成されます。”, level=”WARNING”)
return default_data
try:
with open(file_path, “r”) as f:
return json.load(f)
except json.JSONDecodeError as e:
log_message(f”ファイル ‘{os.path.basename(file_path)}’ の解析エラー: {e}”, level=”ERROR”, file=sys.stderr)
log_message(f”‘{os.path.basename(file_path)}’ を空のデータで初期化します。”, level=”WARNING”)
return default_data
except IOError as e:
log_message(f”ファイル ‘{os.path.basename(file_path)}’ の読み込みエラー: {e}”, level=”ERROR”, file=sys.stderr)
return default_data
def save_json_file(file_path, data):
“””JSONファイルを保存する。”””
try:
with open(file_path, “w”) as f:
json.dump(data, f, indent=4)
except IOError as e:
log_message(f”ファイル ‘{os.path.basename(file_path)}’ の書き込みエラー: {e}”, level=”ERROR”, file=sys.stderr)
return False
return True
def load_text_file(file_path):
“””テキストファイルを読み込み、各行をセットとして返す。”””
if not os.path.exists(file_path):
log_message(f”許可リストファイル ‘{os.path.basename(file_path)}’ が見つかりません。新規作成されます。”, level=”WARNING”)
return set()
try:
with open(file_path, “r”) as f:
return {line.strip().upper() for line in f if line.strip()}
except IOError as e:
log_message(f”ファイル ‘{os.path.basename(file_path)}’ の読み込みエラー: {e}”, level=”ERROR”, file=sys.stderr)
return set()
def save_text_file(file_path, data_set):
“””テキストファイルを保存する。”””
try:
with open(file_path, “w”) as f:
for item in sorted(list(data_set)): # ソートして書き込む
f.write(item + “\n”)
except IOError as e:
log_message(f”ファイル ‘{os.path.basename(file_path)}’ の書き込みエラー: {e}”, level=”ERROR”, file=sys.stderr)
return False
return True
# — iptables操作関数 (dmr_monitor と共通の一部) —
def remove_iptables_drop_rules(ip):
“””
指定されたIPアドレスに対するDROPルールをすべて削除する。
dmr_monitor の remove_drop 関数からiptables部分を抽出。
“””
removed_any = False
for port in DROP_PORTS:
while subprocess.run([“iptables”, “-D”, “INPUT”, “-p”, “udp”, “–dport”, str(port), “-s”, ip, “-j”, “DROP”],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:
removed_any = True
log_message(f” -> iptables DROPルールを削除: IP={ip}, Port={port}”, level=”INFO”)
pass # 削除が成功したらループを継続して、全ての該当ルールを削除
if not removed_any:
log_message(f” -> IP={ip} に対するiptablesルールは元々存在しませんでした。”, level=”INFO”)
return removed_any
# — コマンド処理関数 —
def list_dropped_ips():
“””現在DROP中のIPアドレスを一覧表示する。”””
dropped_ips = load_json_file(DROPPED_IP_FILE)
if not dropped_ips:
log_message(“現在ブロックされているIPアドレスはありません。”, level=”INFO”)
return
log_message(“— 現在ブロックされているIPアドレス —“, level=”INFO”)
for ip, info in dropped_ips.items():
callsign = info.get(‘callsign’, ‘不明’) if isinstance(info, dict) else info # 互換性対応
log_message(f”IP: {ip}, コールサイン: {callsign}”)
log_message(“————————————-“, level=”INFO”)
def unblock_ip(ip_address):
“””指定されたIPアドレスのブロックを解除する。”””
# root権限チェック
if os.geteuid() != 0:
log_message(“このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser unblock <IP_ADDRESS>)”, level=”ERROR”, file=sys.stderr)
sys.exit(1)
dropped_ips = load_json_file(DROPPED_IP_FILE)
if ip_address not in dropped_ips:
log_message(f”IPアドレス ‘{ip_address}’ はブロックリストにありません。”, level=”WARNING”)
return
callsign = dropped_ips.get(ip_address, ‘不明’)
if isinstance(callsign, dict): # 新しい形式の場合
callsign = callsign.get(‘callsign’, ‘不明’)
log_message(f”IPアドレス ‘{ip_address}’ (コールサイン: {callsign}) のブロックを解除します…”, level=”INFO”)
# 1. iptables ルールを削除
remove_iptables_drop_rules(ip_address)
# 2. dropped_ips.json から情報を削除
del dropped_ips[ip_address]
if save_json_file(DROPPED_IP_FILE, dropped_ips):
log_message(f”IPアドレス ‘{ip_address}’ (コールサイン: {callsign}) のブロック解除が完了しました。”, level=”INFO”)
else:
log_message(f”IPアドレス ‘{ip_address}’ のブロックは解除されましたが、ファイルへの保存に失敗しました。”, level=”ERROR”, file=sys.stderr)
def allow_callsign(callsign):
“””コールサインを許可リストに追加する。”””
# root権限チェック
if os.geteuid() != 0:
log_message(“このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser allow <CALLSIGN>)”, level=”ERROR”, file=sys.stderr)
sys.exit(1)
callsign_upper = callsign.upper()
allowed_callsigns = load_text_file(CALLSIGN_FILE)
if callsign_upper in allowed_callsigns:
log_message(f”コールサイン ‘{callsign_upper}’ は既に許可リストにあります。”, level=”WARNING”)
return
allowed_callsigns.add(callsign_upper)
if save_text_file(CALLSIGN_FILE, allowed_callsigns):
log_message(f”コールサイン ‘{callsign_upper}’ を許可リストに追加しました。”, level=”INFO”)
log_message(“dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。”, level=”INFO”)
else:
log_message(f”コールサイン ‘{callsign_upper}’ の許可リストへの追加に失敗しました。”, level=”ERROR”, file=sys.stderr)
def disallow_callsign(callsign):
“””コールサインを許可リストから削除する。”””
# root権限チェック
if os.geteuid() != 0:
log_message(“このコマンドはroot権限で実行する必要があります。 (例: sudo dmruser disallow <CALLSIGN>)”, level=”ERROR”, file=sys.stderr)
sys.exit(1)
callsign_upper = callsign.upper()
allowed_callsigns = load_text_file(CALLSIGN_FILE)
if callsign_upper not in allowed_callsigns:
log_message(f”コールサイン ‘{callsign_upper}’ は許可リストにありません。”, level=”WARNING”)
return
allowed_callsigns.remove(callsign_upper)
if save_text_file(CALLSIGN_FILE, allowed_callsigns):
log_message(f”コールサイン ‘{callsign_upper}’ を許可リストから削除しました。”, level=”INFO”)
log_message(“dmr_monitor はファイルの変更を検知し、自動的にルールを更新します。”, level=”INFO”)
else:
log_message(f”コールサイン ‘{callsign_upper}’ の許可リストからの削除に失敗しました。”, level=”ERROR”, file=sys.stderr)
def list_known_ips():
“””過去に検出された既知のIPアドレスを一覧表示する。”””
known_ips = load_json_file(KNOWN_IP_FILE)
if not known_ips:
log_message(“既知のIPアドレス情報はありません。”, level=”INFO”)
return
log_message(“— 既知のIPアドレスとコールサイン —“, level=”INFO”)
# 辞書をコールサインでソートして表示
sorted_items = sorted(known_ips.items(), key=lambda item: item[1].get(‘callsign’, ”) if isinstance(item[1], dict) else item[1])
for ip, info in sorted_items:
callsign = info.get(‘callsign’, ‘不明’) if isinstance(info, dict) else info # 互換性対応
log_message(f”IP: {ip}, コールサイン: {callsign}”)
log_message(“———————————-“, level=”INFO”)
# — メイン処理 —
def main():
parser = argparse.ArgumentParser(
description=”DMR Monitorによって管理されるIPアドレスのブロック状態を手動で操作するユーティリティ。”,
formatter_class=argparse.RawTextHelpFormatter # ヘルプメッセージの整形を保持
)
# サブコマンドの定義
subparsers = parser.add_subparsers(dest=”command”, help=”利用可能なコマンド”)
# list コマンド
list_parser = subparsers.add_parser(“list”, help=”現在ブロックされているIPアドレスを一覧表示します。”)
# unblock コマンド
unblock_parser = subparsers.add_parser(“unblock”, help=”指定されたIPアドレスのブロックを解除します。”)
unblock_parser.add_argument(“ip_address”, type=str, help=”ブロックを解除するIPアドレス”)
# allow コマンド
allow_parser = subparsers.add_parser(“allow”, help=”指定されたコールサインを許可リストに追加します。”)
allow_parser.add_argument(“callsign”, type=str, help=”許可リストに追加するコールサイン”)
# disallow コマンド
disallow_parser = subparsers.add_parser(“disallow”, help=”指定されたコールサインを許可リストから削除します。”)
disallow_parser.add_argument(“callsign”, type=str, help=”許可リストから削除するコールサイン”)
# known コマンド
known_parser = subparsers.add_parser(“known”, help=”DMR Monitorがこれまでに検出した既知のIPアドレスとコールサインの一覧を表示します。”)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
if args.command == “list”:
list_dropped_ips()
elif args.command == “unblock”:
unblock_ip(args.ip_address)
elif args.command == “allow”:
allow_callsign(args.callsign)
elif args.command == “disallow”:
disallow_callsign(args.callsign)
elif args.command == “known”:
list_known_ips()
else:
parser.print_help(sys.stderr)
sys.exit(1)
if __name__ == “__main__”:
main()
dmruser の導入と使用方法
- ファイルの保存:
上記のコードを例えば /usr/local/bin/dmruser という名前で保存します。
(/usr/local/bin は PATH が通っているため、コマンドとして直接実行できます。)
- 実行権限の付与:
保存したファイルに実行権限を付与します。
Bash
sudo chmod +x /usr/local/bin/dmruser
- 使用例:
- ブロック中のIPアドレスを一覧表示:
Bash
sudo dmruser list
- 特定のIPアドレスのブロックを解除:
Bash
sudo dmruser unblock 192.168.1.100
- コールサインを許可リストに追加:
Bash
sudo dmruser allow JG1XYZ
- コールサインを許可リストから削除:
Bash
sudo dmruser disallow JG1XYZ
- 既知のIPアドレスとコールサインの一覧表示:
Bash
sudo dmruser known
- ヘルプの表示:
Bash
dmruser –help
重要な注意点
- 実行ディレクトリ: dmruser スクリプトは、dmr_monitor5.30C.py と同じディレクトリ(例: /home/kshinoda/)に allowed_callsigns.txt, dropped_ips.json, known_ips.json が存在することを前提としています。もし dmruser を /usr/local/bin のような別の場所に配置する場合、これらのデータファイルをシンボリックリンクで貼るか、同じディレクトリにコピーするなどして、dmruser がデータファイルを見つけられるようにする必要があります。推奨はシンボリックリンクです。
例:
Bash
sudo ln -s /home/kshinoda/allowed_callsigns.txt /usr/local/bin/allowed_callsigns.txt
sudo ln -s /home/kshinoda/dropped_ips.json /usr/local/bin/dropped_ips.json
sudo ln -s /home/kshinoda/known_ips.json /usr/local/bin/known_ips.json
より良い方法は、dmruser スクリプト内でデータファイルのパスを絶対パスで指定するか、dmr_monitor の作業ディレクトリ(systemdのWorkingDirectory)から実行されるようにすることです。 現状のコードでは、SCRIPT_DIR を基準にデータファイルを読み込むため、dmruser を /usr/local/bin に置くと、データファイルも /usr/local/bin にあると仮定してしまいます。
【改善案】: dmr_monitor5.30C.py と同様に、dmruser スクリプトの冒頭でデータファイルのパスを絶対パスで指定するか、systemd の WorkingDirectory を利用して、スクリプトがデータファイルと同じディレクトリから実行されるように設定してください。
例: dmruser の CALLSIGN_FILE などの定義を以下のように修正します。
Python
# — 設定項目 (dmr_monitor と共通) —
# SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # この行は削除またはコメントアウト
DATA_FILES_BASE_DIR = “/home/kshinoda” # ★ データファイルが実際に存在する絶対パスに修正 ★
CALLSIGN_FILE = os.path.join(DATA_FILES_BASE_DIR, “allowed_callsigns.txt”)
DROPPED_IP_FILE = os.path.join(DATA_FILES_BASE_DIR, “dropped_ips.json”)
KNOWN_IP_FILE = os.path.join(DATA_FILES_BASE_DIR, “known_ips.json”)
この修正を dmruser スクリプトに適用すると、どこから実行しても /home/kshinoda/ のデータファイルを正しく参照するようになります。
- Root権限: unblock, allow, disallow コマンドは iptables の操作やシステムファイルの書き込みを行うため、sudo を付けて実行する必要があります。
これで、DMRモニターシステムをより詳細に管理するためのdmruser コマンドが利用可能になります。