flinkcdc mysql DataStream API问题错在哪?
flinkcdc版本是2.1.1, mysql:5.7. flink:1.15.1
照着官方的葫芦画个瓢都IDEAJ没反应,在flink上跑jar提示:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
java代码:
import java.util.Properties;import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.example.utils.TransformUtil;
import org.apache.flink.streaming.api.windowing.time.Time;
public class ApplyFundExecutives {
public static void main(String[] args) {
Properties debeziumProperties = new Properties();
debeziumProperties.put("snapshot.locking.mode", "none");
MySqlSource<String> applyFundSource = MySqlSource.<String>builder()
.hostname("127.0.0.1")
.port(33061)
.databaseList("已存在的数据") // set captured database
.tableList("某张表") // set captured table
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 3s 的 checkpoint 间隔
// checkpoint every 3000 milliseconds
env.enableCheckpointing(3000);
env.fromSource(
applyFundSource,
WatermarkStrategy.noWatermarks(),
"MySQL ApplyFund Source").setParallelism(4).print().setParallelism(1);
try {
env.execute("Print MySQL Snapshot + Binlog");
}catch (Exception e){
e.printStackTrace();
}
}
}
pom:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>etl-table</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>alibaba</id>
<name>ali Repository</name>
<url>https://maven.aliyun.com/repository/public</url>
</repository>
</repositories>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- source相关的jar-->
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<!-- Flink Connectors :Stream || DataSet || Table -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.15.1</version>
</dependency>
<!-- sink相关的jar-->
<!-- https://docs.microsoft.com/zh-cn/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15#using-the-jdbc-driver-with-maven-central -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>10.2.0.jre8</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!-- CNFE:org.apache.flink.connector.base.source.reader.RecordEmitter-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.15.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.15.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.15.1</version>
</dependency>
</dependencies>
</project>
回答:
找到问题了:
1)版本不匹配: flink的版本要与使用的flinkcdc声明的一样。我使用的是2.1.1对应的flink版本是1.13.6
2)相关jar: 把IDE中依赖的非flink的jar放到flink的lib下:%FLINK_HOME%/lib. 这些除了驱动外flinkcdc依赖的版本也是不一样的.不可以混用
以上是 flinkcdc mysql DataStream API问题错在哪? 的全部内容, 来源链接: utcz.com/p/944615.html