大数据常用命令
MySQL
# 启动
service mysqld start
systemctl start mysql[d]
# 关闭
service mysqld stop
#设置mysql开机启动
chkconfig mysqld on
Hadoop
# 全局组件启动与停止
start-all.sh
stop-all.sh
# HDFS 启动与停止
start-dfs.sh
stop-dfs.sh
# Yarn 启动与停止
start-yarn.sh
stop-yarn.sh
# HDFS 单个启动
hadoop-daemon.sh start namenode
# HDFS 多个启动
hadoop-daemons.sh start datanode
# Yarn 单个启动
yarn-daemon.sh start resourcemanager
# Yarn 多个启动
yarn-daemons.sh start nodemanager
# MR 历史 job记录,端口号 19888
mr-jobhistory-daemon.sh start historyserver
# Leave HDFS safe mode.
# `hadoop dfsadmin` is a deprecated entry point that only forwards to `hdfs`; call `hdfs dfsadmin` directly.
hdfs dfsadmin -safemode leave
Hive
# 启动Hive 的元数据服务
nohup /export/server/hive-2.1.0/bin/hive --service metastore &
# 启动 HiveServer2 服务(供 beeline / JDBC 客户端连接)
nohup /export/server/hive-2.1.0/bin/hiveserver2 start &
# beeline
!connect jdbc:hive2://node03:10000
# hive元数据初始化和更新
schematool -dbType mysql -initSchema
schematool -dbType mysql -upgradeSchema
# 使用动态分区
# 开启动态分区
set hive.exec.dynamic.partition=true;
# 开启非严格模式
set hive.exec.dynamic.partition.mode=nonstrict;
# 每个节点生成动态分区的最大个数
set hive.exec.max.dynamic.partitions.pernode=10000;
# 生成动态分区的最大个数
set hive.exec.max.dynamic.partitions=100000;
# 一个任务最多可以创建的文件数目
set hive.exec.max.created.files=150000;
# 限定一次最多打开的文件数
set dfs.datanode.max.xcievers=8192;
## Hive基础优化内容
# hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
# 写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
# 分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
# 并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
# 小文件合并
-- set mapred.max.split.size=2147483648;
-- set mapred.min.split.size.per.node=1000000000;
-- set mapred.min.split.size.per.rack=1000000000;
# 矢量化查询
set hive.vectorized.execution.enabled=true;
# 关联优化器
set hive.optimize.correlation=true;
# 读取零拷贝
set hive.exec.orc.zerocopy=true;
# join数据倾斜
set hive.optimize.skewjoin=true;
-- set hive.skewjoin.key=100000;
set hive.optimize.skewjoin.compiletime=true;
set hive.optimize.union.remove=true;
# group倾斜
set hive.groupby.skewindata=false;
Zookeeper
# 全局启动
zkServer.sh start
# 标准启动
zookeeper-daemon.sh start
Kafka
启动与停止
# 启动 Kafka 服务,需要先启动 zookeeper
kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
# 关闭 Kafka 服务
kafka-server-stop.sh
封装启动脚本, 记得给权限
#!/bin/bash
# Start the Kafka broker on node1, node2 and node3 over ssh.
KAFKA_HOME=/export/server/kafka_2.12-2.4.1
# Command executed on every host: enter the Kafka directory, load the profile,
# export JMX_PORT=9988 and launch the broker in the background with its output discarded.
start_cmd="cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
for idx in 1 2 3; do
  host="node${idx}"
  echo "${host}"
  /usr/bin/ssh "${host}" "${start_cmd}"
  echo "${host} started"
done
封装关闭脚本,记得给权限
#!/bin/bash
# Stop the Kafka broker on node1..node3 by running kafka-server-stop.sh over ssh.
KAFKA_HOME=/export/server/kafka_2.12-2.4.1
for number in {1..3}; do
  host="node${number}"
  echo "${host}"
  # ssh exits with the remote command's status, so a failed stop is reported
  # on stderr instead of being announced as a success.
  if /usr/bin/ssh "${host}" "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"; then
    echo "${host} stopped"
  else
    echo "${host} stop failed" >&2
  fi
done
彻底删除 kafka 并初始化
# 1. Make sure delete.topic.enable=true is set in server.properties on every node (a broker restart is needed for it to take effect).
# 2. Delete the topic test_data from Kafka.
bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --delete --topic test_data
# 3. Inside zkCli.sh, recursively delete the three znodes that belong to the topic.
#    These are zkCli commands, not shell commands: use `deleteall` on ZooKeeper 3.5+ (`rmr` on 3.4.x).
deleteall /brokers/topics/test_data
deleteall /config/topics/test_data
deleteall /admin/delete_topics/test_data
# 4. If the Kafka cluster is still running, shut it down.
# 5. Empty the directory configured as log.dirs=/export/data/kafka/kafka-logs, i.e. the Kafka data directory (run on every broker).
rm -rf /export/data/kafka/kafka-logs/*
# 6. Restart the Kafka cluster.
# 7. Re-create the topic.
bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test_data --partitions 3 --replication-factor 2
创建主题
kafka-topics.sh --zookeeper node3:2181 --create --topic spark_kafka --partitions 3 --replication-factor 1
kafka-topics.sh --zookeeper node3:2181 --list
启动生产者和消费者
kafka-console-producer.sh --broker-list node3:9092 --topic spark_kafka
kafka-console-consumer.sh --from-beginning --bootstrap-server node3:9092 --topic spark_kafka
kafka-console-consumer.sh --from-beginning --bootstrap-server node3:9092 --topic __consumer_offsets
Spark
启动spark-thriftserver
start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \
--hiveconf hive.server2.thrift.bind.host=node3 \
--master local[*]
启动 Spark HistoryServer服务, 端口号 18080
sbin/start-history-server.sh
structured Streaming
--memory sink
CREATE TABLE db_spark.tb_word_count (
id int NOT NULL AUTO_INCREMENT,
word varchar(255) NOT NULL,
count int NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY word (word)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
REPLACE INTO tb_word_count (id, word, count) VALUES (NULL, ?, ?);
spark yarn Pi 测试
/export/server/spark/bin/spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
10
WordCount yarn
/export/server/spark/bin/spark-submit \
--master yarn \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--class cn.test.spark._2SparkWordCount \
/opt/spark-chapter01-1.0-SNAPSHOT.jar
Spark-submit
# Run application locally on 8 cores
/export/server/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster
# (--deploy-mode can be "client" instead of "cluster" for client mode).
# The note lives up here on purpose: a comment placed after a trailing backslash
# stops the line continuation, so the options below it would run as separate commands.
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000
# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000
Sqoop数据抽取和数据验证
# Import the MySQL table dd_table (database "insurance" on 192.168.88.163:3306)
# into the Hive table insurance_ods.dd_table with Sqoop, using tab-separated fields.
# NOTE(review): --hive-overwrite / --delete-target-dir presumably replace any previous
# import of this table — confirm before pointing this at data that must be kept.
# NOTE(review): the password is given in plain text on the command line; acceptable
# for a demo only.
export SQOOP_HOME=/export/server/sqoop-1.4.7.bin_hadoop-2.6.0
$SQOOP_HOME/bin/sqoop import \
--connect jdbc:mysql://192.168.88.163:3306/insurance \
--username root \
--password 123456 \
--table dd_table \
--hive-table insurance_ods.dd_table \
--hive-import \
--hive-overwrite \
--fields-terminated-by '\t' \
--delete-target-dir \
-m 1
# Extract the row count from `sqoop eval` output read on stdin.
# Assumes the output is an ASCII table ("| count(1) |" header row followed by
# "| <n> |"): once flattened to a single line, the count is the first word of
# the 4th '|'-separated field. Prints an empty line when no such field exists.
extract_sqoop_count() {
  tr '\n' ' ' | awk -F'|' '{print $4}' | awk '{print $1}'
}

# 1. Count the rows of the MySQL table dd_table.
mysql_log=$("$SQOOP_HOME"/bin/sqoop eval \
  --connect jdbc:mysql://192.168.88.163:3306/insurance \
  --username root \
  --password 123456 \
  --query "select count(1) from dd_table")
mysql_cnt=$(printf '%s\n' "$mysql_log" | extract_sqoop_count)
# 2. Count the rows of the Hive table insurance_ods.dd_table.
hive_log=$(hive -e "select count(1) from insurance_ods.dd_table")
# 3. Compare both counts. Validate them first: an empty or non-numeric value
#    (failed query, extra output) would otherwise break the numeric comparison.
if [[ ! "$mysql_cnt" =~ ^[0-9]+$ || ! "$hive_log" =~ ^[0-9]+$ ]]; then
  echo "row count check failed: mysql='$mysql_cnt' hive='$hive_log' (expected two integers)" >&2
elif [[ "$mysql_cnt" -eq "$hive_log" ]]; then
  echo "mysql表的数据量=$mysql_cnt,hive表的数据量=$hive_log,是相等的"
else
  echo "mysql表的数据量=$mysql_cnt,hive表的数据量=$hive_log,不是相等的"
fi
FLink
Flink on Yarn
- Session 模式
# 先创建 Session 会话,-d 表示后台运行,-s 表示每个 TaskManager 的 slot 个数
flink/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 2
# 提交任务
flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \
--input hdfs://node1.test.cn:8020/wordcount/input
- Job 分离模式
# 直接提交任务,m 表示 jm 的地址,环境变量需要提前配置
/export/server/flink/bin/flink run \
-m yarn-cluster -yjm 1024 -ytm 1024 \
/export/server/flink/examples/batch/WordCount.jar \
--input hdfs://node1.test.cn:8020/wordcount/input
其它命令
ES 启动
cd /export/server/es/elasticsearch-7.6.1/
/export/server/es/elasticsearch-7.6.1/bin/elasticsearch >>/dev/null 2>&1 &
markdown代码折叠
<details>
<summary><b>点击查看完整代码</b></summary>
<pre><code>
</code></pre>
</details>
免密钥登录
ssh-keygen -t rsa
ssh-copy-id node1
scp /root/.ssh/authorized_keys node2:/root/.ssh
scp /root/.ssh/authorized_keys node3:/root/.ssh
大数据常用命令
https://jface001.github.io/2020/05/13/大数据常用命令/